W poprzedniej części zaimplementowaliśmy naszego pierwszego aktora i wysłaliśmy do niego komunikat. Niestety aktor nie był w stanie w żaden sposób zwrócić rezultatu przetwarzania tego komunikatu, co czyniło go raczej bezużytecznym. W tej części nauczymy się jak wysyłać komunikat zwrotny do nadawcy oraz jak zintegrować blokujące API z asynchronicznym z natury systemem opartym na wymianie komunikatów.

Zanim przystąpimy do pracy musimy wyraźnie zaznaczyć różnicę między aktorem (rozszerzającym trait Actor) a referencją aktora typu ActorRef. Implementując aktora rozszerzamy trait Actor co zmusza nas do implementacji metody receive. Jednak nie tworzymy aktorów ręcznie, tylko prosimy o to klasę ActorSystem:

val randomOrgBuffer: ActorRef = system.actorOf(Props[RandomOrgBuffer], "buffer")

Ku naszemu zaskoczeniu zwrócony obiekt wcale nie jest typu RandomOrgBuffer jak nasz aktor, ani nawet typu Actor. ActorRef to „opakowanie” naszego aktora dzięki któremu:

  • wewnętrzny stan, tj. pola, metody, etc. aktora są niedostępne (enkapsulacja)
  • system zapewnia, że metoda receive każdego aktora obsługuje tylko jeden komunikat w danej chwili (jednowątkowość) oraz kolejkuje pozostałe komunikaty
  • właściwy aktor może się znajdować na innej maszynie w klastrze, referencja przezroczyście serializuje i przesyła komunikat po sieci

Bogatsi w tą wiedzę spróbujmy jakoś zwrócić wygenerowane liczby losowe w naszym aktorze. Okazuje się, że wewnątrz każdego aktora mamy dostęp do referencji o obiecującej nazwie sender. Nie będzie zaskoczeniem jeśli powiem, że jest to obiekt typu ActorRef wskazujący aktora, który przesłał do nas właśnie przetwarzany komunikat:

object Bootstrap extends App {
	//...
	randomOrgBuffer ! RandomRequest
	//...
}

//...
	
class RandomOrgBuffer extends Actor with ActorLogging {

	def receive = {
		case RandomRequest =>
			if(buffer.isEmpty) {
				buffer ++= fetchRandomNumbers(50)
			}
			sender ! buffer.dequeue()
	}
}

Mam nadzieję, że przywykliście już do notacji ! służącej do wysłania komunikatu. Jeśli nie, istnieją bardziej konserwatywne alternatywy:

sender tell buffer.dequeue()
sender.tell(buffer.dequeue())

Tak czy inaczej miast wypisywać losową liczbę na ekranie wysyłamy ją z powrotem do nadawcy. Szybki test programu i… nic się nie dzieje. Przyglądając się z bliska referencji sender odkrywamy, że prowadzi do aktora o nazwie: Actor[akka://Akka/deadLetters]. Nie wróży to najlepiej, ale jest logiczne. sender reprezentuje referencję aktora, który wysłał dany komunikat. My wysłaliśmy nasz komunikat z poziomu zwykłej klasy niebędącej aktorem. Gdybyśmy mieli dwóch aktorów i pierwszy wysłałby komunikat drugiemu, wtedy ten drugi mógłby użyć referencji sender do odesłania odpowiedzi pierwszemu. Oczywiście wtedy nadal nie potrafilibyśmy odebrać odpowiedzi, mimo wprowadzenia dodatkowej warstwy abstrakcji.

Scenariuszom z wieloma aktorami przyjrzymy się już wkrótce, teraz musimy się nauczyć jak zintegrować zwykły kod aplikacji z Akka. Innymi słowy jak otrzymać odpowiedź, by Akka nie było jedynie studnią, do której wysyłamy komunikaty. Otóż sposób jest bardzo prosty – możemy bez trudu zaczekać na odpowiedź!

implicit val timeout = Timeout(1 minutes)

val future = randomOrgBuffer ? RandomRequest
val veryRandom: Int = Await.result(future.mapTo[Int], 1 minute)

Nazwa future nie jest przypadkowa, co prawda nie jest to zmienna typu java.util.concurrent.Future, ale semantycznie znaczy dokładnie to samo. Przede wszystkim jednak zwróćcie uwagę, że zamiast wykrzyknika użyliśmy pytajnika (?) do wysłania komunikatu. Ten model komunikacji określa się mianem ask w przeciwieństwie do poznanego już tell. W skrócie Akka przygotowała dla nas tymczasowego aktora o nazwie Actor[akka://Akka/temp/$d], wysyła komunikat i teraz czeka do jednej minuty na odpowiedź przesłaną do wspomnianego tymczasowego aktora. Wysłanie komunikatu jest nadal nieblokujące a obiekt future reprezentuje rezultat operacji, która potencjalnie jeszcze nie została wykonana (będzie dostępna w przyszłości). Następnie (już w trybie blokującym) czekamy na odpowiedź. Dodatkowe mapTo[Int] jest konieczne, ponieważ Akka nie wie jakiego typu odpowiedzi ma się spodziewać.

Musicie pamiętać, że korzystanie z wzorca ask, a zwłaszcza blokujące czekanie na odpowiedź są bardzo rzadko stosowane. Z reguły stawiamy na asynchroniczne komunikaty i pracę zdarzeniową. Jak jednak wspomniałem w tym konkretnym przypadku potrzebujemy mieć bezpośredni dostęp do wartości zwróconej, a nie czekać na wywołanie zwrotne. Co ciekawego możemy teraz zaimplementować? Np. naszą własną wersję java.util.Random opartą w całości na idealnie losowych liczbach!

class RandomOrgRandom(randomOrgBuffer: ActorRef) extends java.util.Random {
	implicit val timeout = Timeout(1 minutes)

	override def next(bits: Int) = {
		if(bits <= 16) {
			random16Bits() & ((1 << bits) - 1)
		} else {
			(next(bits - 16) << 16) + random16Bits()
		}
	}

	private def random16Bits(): Int = {
		val future = randomOrgBuffer ? RandomRequest
		Await.result(future.mapTo[Int], 1 minute)
	}
}

Szczegóły nie są istotne, dość powiedzieć, że musimy zaimplementować metodę next() zwracającą zadaną ilość losowych bitów podczas gdy nasz aktor zwraca zawsze liczbę 16-bitową. Teraz do szczęścia potrzebne jest nam jedynie lekkie opakowanie klasy java.util.Random w postaci klasy scala.util.Random i możemy cieszyć się idealnie posortowanym zbiorem liczb:

val javaRandom = new RandomOrgRandom(randomOrgBuffer)
val scalaRandom = new scala.util.Random(javaRandom)
println(scalaRandom.shuffle((1 to 20).toList))
//List(17, 15, 14, 6, 10, 2, 1, 9, 8, 3, 4, 16, 7, 18, 13, 11, 19, 5, 12, 20)

Podsumujmy. Napisaliśmy prosty system oparty o jednego aktora, który w razie potrzeby łączy się z zewnętrznym serwerem i pobiera na zapas zbiór liczb losowych. W odpowiedzi na komunikat aktor odsyła jedną losową liczbę. Następnie zintegrowaliśmy asynchroniczny świat aktorów i wymiany komunikatów z synchronicznym wymogiem stawianym przez istniejące API. Opakowując wysłanie komunikatu i blokujące oczekiwanie na odpowiedź, napisaliśmy własną implementację klasy java.util.Random (zob. też alternatywę pod postacią SecureRandom). Klasy tej możemy teraz używać wszędzie tam, gdzie potrzebujemy liczb losowych bardzo wysokiej jakości. Kod jest jednak daleki od ideału, o czym przekonamy się w kolejnych odcinkach.

Tagged with →  
Share →
Buffer
Przeczytaj poprzedni wpis:
Poznajemy Akka: pierwszy komunikat

Akka to wzorowana na Erlangu biblioteka (framework?) obiecująca łatwiejsze pisanie skalowalnych, wielowątkowych i bezpiecznych aplikacji. O ile w większości popularnych...

Zamknij