Nadszedł czas by przyjrzeć się dobrym praktykom podczas implementowania aktorów. Kluczowym wymaganiem stawianym aktorom jest unikanie blokowania na operacjach wejścia/wyjścia, aktywnego oczekiwania, spania, etc. Innymi słowy aktor w trakcie obsługi komunikat powinien zużywać czas procesora – bądź oddać go natychmiast innym aktorom. Jeśli będziemy się trzymali ściśle tej rady, Akka sprosta obsłudze setek tysięcy komunikatów wykorzystując jedynie garść wątków. Nie będzie bowiem dla was zaskoczeniem, że chociaż nasza aplikacja może się składać z tysięcy pozornie niezależnych aktorów (np. jeden aktor na jedno połączenie HTTP, jednego gracza w masowej rozgrywce sieciowej, itp.), Akka przydziela im ograniczony czas procesora i ilość wątków. Przy domyślnych 10 wątkach obsługujących wszystkich aktorów, wystarczy by jeden blokował się na wywołaniu usługi sieciowej albo spał by ograniczyć przepustowość o 10%. 10 aktorów oczekujących jednocześnie na zdalny zasób oznacza całkowite zatrzymanie pracy systemu.

Z tego powodu wywołania metody sleep() są poważnym błędem a blokujące odpytywanie zewnętrznych usług webowych dużym niedopatrzeniem. Niestety nie powstała jeszcze (?) dojrzała biblioteka oferująca asynchroniczny dostęp do bazy danych a NIO jest dość kłopotliwe w użyciu. Ale powinniśmy eliminować blokujący kod gdzie to tylko możliwe. W naszej przykładowej aplikacji pobieramy liczby losowe z zewnętrznego serwera w dość naiwny sposób:

val url = new URL("https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
val connection = url.openConnection()
val stream = Source.fromInputStream(connection.getInputStream)
sender ! RandomOrgServerResponse(stream.getLines().map(_.toInt).toList)
stream.close()

Kod ten blokująco czeka na odpowiedź serwera, która jak przekonaliśmy się w pierwszej części cyklu może przybyć nawet po minucie. Oznacza to, że przez minutę nasz aktor nie może obsłużyć żadnego innego komunikatu. Mało tego, w tym czasie okupuje on również jeden wątek z puli roboczej Akka – która to pula w teorii powinna wystarczyć tysiącom aktorów. Dość samolubne, nieprawdaż?

Na szczęście istnieją (dojrzałe) biblioteki umożliwiające asynchroniczną komunikację z usługami webowymi, mianowicie async-http-client i Jetty HttpClient. Do testów wykorzystamy tą pierwszą. API jest dość oczywiste: prosi nas o URL, pod który ma zostać wysłane żądanie i obiekt zwrotny, którego metody będą wywołane gdy nadejdzie odpowiedź. Tym samym wywołanie HTTP jest nieblokujące (aktor szybko kończy obsługę komunikatu i może rozpocząć obsługę kolejnych) podczas gdy odpowiedź zjawia się asynchronicznie:

implicit def block2completionHandler[T](block: Response => T) = new AsyncCompletionHandler[T]() {
  def onCompleted(response: Response) = block(response)
}

def receive = {
  case FetchFromRandomOrg(batchSize) =>
    val curSender = sender
    val url = "https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new"
    client.prepareGet(url).execute {
      response: Response =>
        val numbers = response.getResponseBody.lines.map(_.toInt).toList
        curSender ! RandomOrgServerResponse(numbers)
    }
}

W powyższym kodzie ocieramy się o bardzo niebezpieczny błąd. Zauważcie, że przed wysłaniem żądania kopiuję do lokalnej zmiennej aktualną wartość sender. Gdybym tego nie zrobił, blok kodu wywołany w chwili odebrania komunikatu odczytałby bieżącą wartość sender. Bieżącą, tzn. gdyby aktor był w trakcie obsługi innego komunikatu, byłby to nadawca tego komunikatu. To dlatego wszystkie zmienne z zewnętrznego kontekstu, do których odwołują się anonimowe klasy wewnętrzne w Javie, muszą być finalne.

Odejdźmy na chwilę od naszej przykładowej aplikacji. Wyobraźcie sobie jak skalowalne byłoby to rozwiązanie w aplikacji pobierającej aktualizacje kanałów RSS/Atom. Dla każdego adresu URL (a monitorujemy ich np. kilka tysięcy) tworzymy jednego aktora. Aktor wysyła asynchroniczne żądanie i oczekuje na odpowiedź, którą natychmiast po otrzymaniu odsyła. Teoretycznie przy pomocy jednego wątku jesteśmy w stanie jednocześnie monitorować tysiące adresów URL, na bieżąco przetwarzając rezultaty (czasy odpowiedzi są wszakże silnie uzależnione od zewnętrznego serwera). W klasycznym, synchronicznym modelu bylibyśmy w stanie jednocześnie oczekiwać jedynie na tyle odpowiedzi, ile wątków byśmy na to przeznaczyli (na pewno nie kilka tysięcy).

Jeżeli dostrzegacie pewne podobieństwo do node.js to jesteście na właściwej ścieżce. Framework ten bazuje w całości na asynchronicznym I/O, dzięki czemu może obsłużyć bardzo duże ilości równoległych połączeń przy użyciu jednego (!) wątku.

Kod źródłowy tej lekcji jest dostępny na GitHubie.

Tagged with →  
Share →
Buffer
  • Marek Żebrowski

    Dobrą asynchroniczną komunikację po HTTP w scali zapewnia Dispatch http://notes.implicit.ly/post/30567701311/dispatch-0-9-1  Jest to nakładka na async-http-client ze scalowym DSLem. Warto użyć w scalowym projekcie zamiast bezpośredniego odwoływania się do async-http-clienta

Przeczytaj poprzedni wpis:
Polimorfizm parametryczny w Scali. Typy abstrakcyjne a typy generyczne.

Polimorfizm - w informatyce, występowanie w obrębie jednej deklaracji, typu różnych jego postaci. Polimorfizm jest używany do pisania generycznego kodu...

Zamknij