Zabawa zabawą, nauka nauką, ale zmierzywszy czasy odpowiedzi stworzonej w poprzednim odcinku klasy RandomOrgRandom zauważymy bardzo niepokojące zjawisko (wykres przedstawia czas wywołania w milisekundach dla kolejnych prób):

Okazuje się, że dość regularnie czas odpowiedzi jest o kilka rzędów wielkości większy, skacząc z 1 milisekundy do pół sekundy (skala logarytmiczna!) Oczywiście pamiętając poprzednią implementację powód jest oczywisty: nasz aktor pobiera gorliwie 50 liczb losowych do bufora i zwraca je jedna za drugą. Jeśli bufor jest pusty, wykonuje blokujące zapytanie serwera, które jak się domyślamy zajmuje około pół sekundy. To właśnie widzimy na wykresie – co pięćdziesiąte wywołanie zajmuje znacznie więcej czasu niż zwykle. W pewnych zastosowaniach takie zachowanie byłoby dopuszczalne (podobnie jak nieregularne wywołania garbage-collectora potrafią zwiększyć czas odpowiedzi naszej aplikacji w nieprzewidywalnych momentach). Niemniej jednak spróbujmy naprawić naszą implementację.

Jaki macie pomysł na usprawnienie (spłaszczenie) czasów odpowiedzi? Ja proponuję monitorowanie rozmiaru bufora i gdy zbliża się on niebezpiecznie do dna, rozpoczęcie pobierania kolejnej partii liczb z zewnętrznego serwera w tle. Dzięki temu bufor niemal nigdy nie będzie pusty a my nie będziemy zmuszeni do gorliwego pobierania ogromnej ilości liczb – których być może nie wykorzystamy. Zacznijmy od tego, że naiwna implementacja stosująca globalną synchronizację zachowywałaby się dokładnie tak samo jak nasz obecny system oparty o jednego aktora. Jednak dopiero teraz Akka pokaże prawdziwy pazur. Aby zaimplementować nasze usprawnienie w Javie potrzebowaliśmy zewnętrznego wątku do pobierania w tle oraz jakieś metody synchronizacji przynajmniej na buforze (wszak korzysta zeń wiele wątków).

W Akka każdy aktor jest efektywnie jednowątkowy, dzięki czemu nie musimy się martwić synchronizacją. Na początek podzielmy odpowiedzialności: jeden aktor przechowuje bufor liczb losowych na przyszłość i zwraca je, gdy jest o to poproszony odpowiednim komunikatem. Drugi aktor zajmuje się li tylko pobieraniem nowych liczb losowych ze zdalnego serwera. Drugi aktor uaktywnia się w chwili, gdy pierwszy aktor zauważa niski poziom bufora i wysyła komunikat do drugiego aktora inicjujący pobieranie liczb losowych. Gdy drugi aktor otrzyma odpowiedź z zewnętrznego serwera, odsyła komunikat zawierający liczby losowe do pierwszego aktora, który umieszcza nowe dane na końcu bufora.

Zacznijmy od drugiego aktora, odpowiedzialnego za asynchroniczne pobieranie liczb losowych. Aktor ten rozpoczyna w chwili otrzymania komunikatu FetchFromRandomOrg, zawierającego ponadto pożądaną przez nadawcę ilość liczb losowych. Gdy przychodzi odpowiedź z serwera random.org, parsujemy wynik i wysyłamy liczby z powrotem do nadawcy. Kod ten niewiele się różni od poznanej wcześniej metody fetchRandomNumbers():

case class FetchFromRandomOrg(batchSize: Int)

case class RandomOrgServerResponse(randomNumbers: List[Int])

class RandomOrgClient extends Actor {
  protected def receive = LoggingReceive {
    case FetchFromRandomOrg(batchSize) =>
      val url = new URL("https://www.random.org/integers/?num=" + batchSize + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
      val connection = url.openConnection()
      val stream = Source.fromInputStream(connection.getInputStream)
      sender ! RandomOrgServerResponse(stream.getLines().map(_.toInt).toList)
      stream.close()
  }
}

Teraz pora na właściwego aktora obsługującego żądania potencjalnie wielu klientów proszących o pojedyncze liczby losowe. Niestety logika staje się nieco bardziej skomplikowana. Po pierwsze nasz aktor RandomOrgBuffer musi teraz obsługiwać dwa różne komunikaty: RandomRequest jak przedtem od wszystkich klientów oraz RandomOrgServerResponse zawierający nową paczkę liczb losowych pobranych przez RandomOrgClient. Po drugie RandomOrgBuffer musi pamiętać, że zainicjował proces pobierania nowej paczki liczb losowych poprzez wysłanie FetchFromRandomOrg. W przeciwnym wypadku ryzykujemy rozpoczęciem kilku równoległych połączeń.

class RandomOrgBuffer extends Actor with ActorLogging {

  val BatchSize = 50

  val buffer = new Queue[Int]
  var waitingForResponse = false

  val randomOrgClient = context.actorOf(Props[RandomOrgClient], name="client")
  preFetchIfAlmostEmpty()

  def receive = {
    case RandomRequest =>
      preFetchIfAlmostEmpty()
      sender ! buffer.dequeue()
    case RandomOrgServerResponse(randomNumbers) =>
      buffer ++= randomNumbers
      waitingForResponse = false
  }

  private def preFetchIfAlmostEmpty() {
    if(buffer.size <= BatchSize / 4 && !waitingForResponse) {
      randomOrgClient ! FetchFromRandomOrg(BatchSize)
      waitingForResponse = true
    }
  }

}

Kluczową jest metoda preFetchIfAlmostEmpty() inicjująca proces pobierania liczb losowych (w tle, przez innego aktora). Jeśli poziom bufora jest zbyt niski i nie oczekujemy już na odpowiedź z serwera, wysyłamy odpowiedni komunikat do aktora RandomOrgClient. Metodę tą wołamy również natychmiast po utworzeniu aktora (gdy bufor jest jeszcze pusty) aby w chwili otrzymania pierwszego komunikatu mieć już gotowy zbiór liczb losowych. Zwróćcie uwagę jak jeden aktor może stworzyć instancję drugiego aktora poprzez wywołanie context.actorOf. Tym samym nasz aktor musi obsłużyć zarówno podstawowy komunikat RandomRequest zwracając jedną liczbę z bufora, ale również RandomOrgServerResponse – dodając do bufora kolejną paczkę liczb losowych. Kod ten ma jednak kolosalny błąd, dostrzegasz go?

Wyobraź sobie co się stanie, jeśli RandomOrgBuffer otrzyma nagle mnóstwo komunikatów RandomRequest, więcej niż pojemność bufora nawet tuż po napełnieniu? Albo gdy komunikat ten przyjdzie tuż po utworzeniu aktora, kiedy nie zakończyło się jeszcze pierwsze zapytanie do serwera random.org? Niestety musimy się zabezpieczyć przed sytuacją, kiedy w chwili nadejścia RandomRequest bufor jest pusty. Nie możemy jednak „zaczekać”, zablokować obsługi komunikatu do chwili otrzymania odpowiedzi i napełnienia bufora. Nie możemy choćby z tego prostego powodu, że zasypiając w trakcie obsługi komunikatu RandomRequest nigdy nie będziemy w stanie obsłużyć RandomOrgServerResponse – wszak aktor może obsługiwać tylko jeden komunikat w jednej chwili. Nie dość, że doprowadziliśmy do dead-locku, to jeszcze wszelkie formy (aktywnego) oczekiwania, spania czy blokowania są w aktorach zabronione – o czym w następnych częściach.

Poprawną formą obsługi tego problemu jest stworzenie kolejki „oczekujących” – listy aktorów, którzy wysłali do nas RandomRequest którego nie byliśmy w stanie w danej chwili obsłużyć. Gdy tylko dotrze do nas RandomOrgServerResponse, w pierwszej kolejności obsługujemy oczekujących. Jeśli oczekujących było tak wielu, że bufor prawie się wyczerpał, natychmiast wysyłamy kolejny komunikat FetchFromRandomOrg. W skrajnej sytuacji nawet mimo napełnienia bufora nie uda nam się obsłużyć wszystkich oczekujących – wtedy czekamy na kolejną paczkę odebraną z serwera:

class RandomOrgBuffer extends Actor with ActorLogging {

  val BatchSize = 50

  val buffer = new Queue[Int]
  val backlog = new Queue[ActorRef]
  var waitingForResponse = false

  //...

  def receive = LoggingReceive {
    case RandomRequest =>
      preFetchIfAlmostEmpty()
      if(buffer.isEmpty) {
        backlog += sender
      } else {
        sender ! buffer.dequeue()
      }
    case RandomOrgServerResponse(randomNumbers) =>
      buffer ++= randomNumbers
      waitingForResponse = false
      while(!backlog.isEmpty && !buffer.isEmpty) {
        backlog.dequeue() ! buffer.dequeue()
      }
      preFetchIfAlmostEmpty()
  }

}

Kluczowe fragmenty ostatecznej wersji naszego aktora to kolejka backlog opisana powyżej. Zwróćcie uwagę na obsługę komunikatu RandomOrgServerResponse, która po otrzymaniu nowego zbioru liczb losowych próbuje odpowiedzieć tak wielu oczekującym aktorom, jak to możliwe. Oczywiście skoro naszym celem była eliminacja znacznie dłuższych czasów odpowiedzi, powinniśmy dążyć do minimalizacji wykorzystania kolejki backlog. Idealnie byłoby, gdyby była ona zawsze pusta a aktor dostosowywał się dynamicznie do obciążenia (pobierając z serwera mniej lub więcej liczb za jednym razem oraz dostosowując próg, poniżej którego rozpoczynamy pobieranie nowej paczki). Ale ta implementacja jest dla nas póki co wystarczająca. Oto czas odpowiedzi aktora przy niezbyt dużym obciążeniu (skala liniowa, dane nie są aż tak rozbieżne jak początkowo):

Słowem podsumowania, obsługa nadchodzących komunikatów stała się nieco zbyt skomplikowana, w szczególności boli mnie obecność logicznej flagi waitingForResponse. W następnym odcinku pozbędziemy się jej w bardzo obiektowy i czytelny sposób, dostępny dla każdego aktora w Akka.

Tagged with →  
Share →
Buffer
Przeczytaj poprzedni wpis:
Poznajemy Akka: żądanie i odpowiedź

W poprzedniej części zaimplementowaliśmy naszego pierwszego aktora i wysłaliśmy do niego komunikat. Niestety aktor nie był w stanie w żaden...

Zamknij