Akka to wzorowana na Erlangu biblioteka (framework?) obiecująca łatwiejsze pisanie skalowalnych, wielowątkowych i bezpiecznych aplikacji. O ile w większości popularnych języków wielowątkowość oparta jest o pamięć współdzieloną przez wiele wątków, chronioną za pomocą różnych metod synchronizacji, Akka oferuje model oparty o aktorów. Aktor to lekki obiekt, z którym komunikować się można jedynie poprzez wysyłanie mu komunikatów. Każdy aktor może przetwarzać co najwyżej jeden komunikat w jednej chwili i naturalnie może wysyłać komunikaty do innych aktorów. W ramach jednej maszyny wirtualnej pracować mogą jednocześnie miliony aktorów tworząc drzewiastą strukturę rodzic (supervisor) – dzieci, gdzie rodzic monitoruje poprawność działania swoich dzieci. Jakby tego było mało, z łatwością możemy rozdzielić naszych aktorów na wiele maszyn w klastrze bez modyfikowania kodu. Każdy aktor może mieć wewnętrzny stan (zbiór zmiennych/pól), ale komunikacja odbywa się jedynie poprzez wymianę wiadomości, nigdy poprzez współdzielenie tej samej pamięci (liczników, struktur danych, etc.)

Kombinacja tych wszystkich cech sprawia, że za cenę radykalnej zmiany sposobu, w jaki programujemy wielowątkowe aplikacje, uzyskujemy kod bezpieczniejszy, stabilniejszy i lepiej skalowany. Tyle frazesów i pustych obietnic, przejdźmy do przykładu. I nie będzie to przykład pokroju „Hello, World”, ale postaramy się zbudować niewielkie, acz kompletne rozwiązanie. W kilku najbliższych odcinkach zajmiemy się integracją z API random.org. Strona ta umożliwia pobranie prawdziwie losowych liczb (w przeciwieństwie do generatorów pseudo-losowych) opartych na szumie atmosferycznym, cokolwiek by to nie znaczyło. API nie jest przesadnie skomplikowane, odwiedźcie i odświeżcie poniższą stronę, to wszystko:

www.random.org/integers/?num=20&min=1000&max=10000&col=1&base=10&format=plain

Na czym zatem polega trudność? Czytając wytyczne dla klientów API dowiadujemy się, że:

  1. Aplikacja kliencka może odwoływać się do powyższego URLa co najwyżej z jednego wątku – czyli zakazane jest pobieranie liczb losowych wielowątkowo
  2. Powinniśmy pobierać liczby losowe w paczkach, a nie pojedynczo, po jednej liczbie na żądanie. Powyższe wywołanie pobiera num=20 liczb za jednym wywołaniem
  3. Jesteśmy ostrzeżeni przed możliwymi bardzo długimi czasami odpowiedzi
  4. Klient powinien periodycznie sprawdzać, czy nie wyczerpaliśmy limitu liczb losowych pobranych danego dnia (usługa powyżej pewnego pułapu jest płatna)

Te wszystkie wymagania sprawiają, że integracja z random.org okazuje się nietrywialna. W zapoczątkowanym właśnie cyklu będziemy ulepszali nasze rozwiązanie, krok po kroku poznając nowe możliwości Akka. Szybko przekonamy się, że dość wysoka bariera wejścia i duża inwestycja w naukę i infrastrukturę na początku szybko się zwróci. Zatem – do dzieła!

Dziś spróbujemy się zająć pierwszymi dwoma wymaganiami, czyli co najwyżej jedno połączenie w danej chwili z serwerem oraz pobieranie liczb w paczkach. Do tego kroku nie potrzebujemy Akka, wystarczy prosta synchronizacja i bufor:

val buffer = new Queue[Int]

def nextRandom(): Int = {
  this.synchronized {
    if(buffer.isEmpty) {
      buffer ++= fetchRandomNumbers(50)
    }
    buffer.dequeue()
  }
}

def fetchRandomNumbers(count: Int) = {
  val url = new URL("https://www.random.org/integers/?num=" + count + "&min=0&max=65535&col=1&base=10&format=plain&rnd=new")
  val connection = url.openConnection()
  val stream = Source.fromInputStream(connection.getInputStream)
  val randomNumbers = stream.getLines().map(_.toInt).toList
  stream.close()
  randomNumbers
}

Ten kod działa i odpowiada użyciu słowa kluczowego synchronized w Javie. Funkcjonowanie nextRandom() jest chyba oczywiste: jeśli bufor jest pusty: napełnij go pobierając 50 losowych liczb z serwera. Na koniec zwróć pierwszą liczbę z bufora. Kod ten ma jednak szereg wad, z których owo synchronized jest najpoważniejszą. Dość kosztowna synchronizacja każdego wywołania wydaje się nieco zbyt rygorystyczna. A przecież i tak nie zadziała w klastrze, kiedy wymaganiem jest co najwyżej jeden wątek w ramach całego klastra, a nie pojedynczego węzła.

Zacznijmy od zaimplementowania aktora. Aktor to po prostu klasa rozszerzająca trait Actor i implementująca metodę receive. Zadaniem tej metody jest odebranie i obsługa komunikatu. Przypomnijmy jeszcze raz: każdy aktor może obsługiwać co najwyżej jeden komunikat w danej chwili, pozostałe czekają cierpliwie w kolejce dedykowanej danemu aktorowi. Dzięki temu unikamy jakiejkolwiek synchronizacji w naszym kodzie:

case object RandomRequest

class RandomOrgBuffer extends Actor {

  val buffer = new Queue[Int]

  def receive = {
    case RandomRequest =>
      if(buffer.isEmpty) {
        buffer ++= fetchRandomNumbers(50)
      }
      println(buffer.dequeue())
  }

}

Metoda fetchRandomNumbers() nie uległa zmianie w porównaniu z poprzednim przykładem. Jednowątkowość wywołań zewnętrznego serwera random.org uzyskaliśmy za darmo, ponieważ aktor może obsługiwać tylko jeden komunikat jednocześnie. Właśnie, komunikatem w tym przypadku jest case object RandomRequest – pusty obiekt nieniosący żadnej informacji. W Akka prawie zawsze komunikaty są instancjami case classes lub typami immutable (niezmiennymi). Gdybyśmy zatem zapragnęli obsługiwać generowanie dowolnej ilości liczb losowych, musielibyśmy pożądaną ilość przekazać jako część komunikatu:

case class RandomRequest(howMany: Int)

class RandomOrgBuffer extends Actor with ActorLogging {

  val buffer = new Queue[Int]

  def receive = {
    case RandomRequest(howMany) =>
      if(buffer.isEmpty) {
        buffer ++= fetchRandomNumbers(50)
      }
      for(_ <- 1 to (howMany min 50)) {
        println(buffer.dequeue())
      }
  }

Na koniec wypadałoby wysłać coś do naszego aktora. Oczywiście nie możemy po prostu wywołać metody receive z komunikatem jako argumentem. Najpierw musimy wystartować framework i poprosić o referencję aktora. Tej referencji wysyłamy komunikat przy użyciu nieintuicyjnej na początku, zaczerpniętej z Erlanga metody !:

object Bootstrap extends App {
  val system = ActorSystem("Akka")
  val randomOrgBuffer = system.actorOf(Props[RandomOrgBuffer], "randomOrg")

  randomOrgBuffer ! RandomRequest(10)  //wysłanie komunikatu

  system.shutdown()
}

Po uruchomieniu tego programu na konsoli powinno się pojawić 10 losowych liczb. Poeksperymentujcie nieco z tą prostą aplikacją. W szczególności zwróćcie uwagę, że wysłanie komunikatu jest nieblokujące a jego obsługa przez aktora odbywa się w tle w innym wątku (widać dużą analogię do JMS). Spróbujcie wysłać aktorowi komunikat innego typu oraz poprawić metodę receive tak, aby aktor potrafił obsłużyć więcej, niż jeden typ.

Użyteczność naszej aplikacji w obecnej chwili jest dalece wątpliwa. Chcielibyśmy móc uzyskać jakoś nasze losowe dane miast tylko wypisywać je na konsolę. Jak się zapewne domyślacie, skoro komunikacja z aktorem odbywa się tylko poprzez wymianę komunikatów (aktor nie może „zwrócić” rezultatu, nie powinien też umieszczać go w jakiejkolwiek globalnie dostępnej pamięci) – tak też odzyskamy rezultat. Aktor odeśle wynik z powrotem w postaci odpowiednio zaadresowanego komunikatu. Ale o tym w następnej części.

Tagged with →  
Share →
Buffer
Przeczytaj poprzedni wpis:
Polimorfizm funkcji

W tym artykule opisuję relacje polimorficzne, jakie zachodzą między różnymi funkcjami o tej samej liczbie parametrów, mających różne wartości parametrów...

Zamknij