Kolejkowanie zadań w Symfony 5 z wykorzystaniem RabbitMQ

 lis, 29 - 2020   PHP

Kolejkowanie zadań jest częstym rozwiązaniem wykorzystywanym podczas konieczności przetwarzania dużej ilości danych lub zadań wymagających określonej ilości czasu do wykonania. Idea jest prosta: dzielimy określoną logikę aplikacji na zadania, a następnie wysyłamy wysyłamy je do kolejki, której zadaniem jest ich przetworzenie. W artykule przedstawiam w jaki sposób wykorzystać do tego celu popularny framework PHP Symfony oraz system kolejkowania i obsługi wiadomości RabbitMQ.

 

 

Słowem wstępu

 

Na początku chciałbym krótko przedstawić o czym konkretnie będzie ten artykół, co będziemy robić oraz z jakich narzędzi będziemy korzystać.Zacznijmy od tytułu:

Symfony jest jednym z najbardziej popularnych frameworków ułatwiających tworzenie aplikacji opartych na języku PHP. Symfony udostępnia bibliotekę Messanger, która implementuje obsługę różnych systemów kolejkowania, w tym tych opartych na protokole amqp, który wykorzystuje RabbitMQ.

RabbitMQ to oprogramowanie napisane w języku Erlang, przeznaczone do przesyłania wiadomości pomiędzy serwerami (tzw. message broker, message-oriented middleware). Obsługuje wiele protokołów wiadomości oraz umożliwia różne konfiguracje przesyłu i odbioru wiadomości.

Zostaje jeszcze do wyjaśnienia termin kolejkowania zadań oraz kiedy warto skorzystać z tego typu funkcjonalności. Otóż, bardzo często zdarza się, że pisane przez nas oprogramowanie musi wykonać zadanie, które jest czasochłonne lub bardzo wiele powtarzalnych zadań. Mogą być to np. wykonywanie jakichś czasochłonnych obliczeń, komunikacja z zewnętrznymi serwisami, przetwarzanie zamówienia, wysyłanie wiadomości e-mail lub SMS, itp. Wszystkie takie działania mogą trwać dość długi czas oraz może się zdarzyć że z jakichś powodów w trakcie wykonywania zadania pojawił się błąd. W takich wypadkach warto przenieść tego typu akcje do kolejki, która ułatwi wykonywanie takich zadań.

 

 

Jako przykład omówimy wykorzystanie kolejki podczas funkcjonalności rejestracji użytkownika, podczas której wysyłamy użytkownikowi wiadomość e-mail, weryfikującą utworzone konto. Powiadomienie użytkownika, może zająć trochę czasu, a poza tym może być zawodne, dlatego jest to idealny przykład do przedstawienia działania kolejki. Oczywiście będzie to jedynie symulacja, przykładowy kod będzie zawierał tylko niezbędne elementy.

 

 

Konfiguracja środowiska testowego

 

Na potrzeby omawianego przykładu wykorzystałem PHP 7.4 oraz framework Symfony 5.1. Wykorzystując system zarządzania pakietami Composer zainstalowałem aplikację wraz z niezbędnymi zależnościami. Całość została uruchomiona w systemie Windows 10 z wykorzystaniem serwera Nginx. Serwis kolejki RabbitMQ został uruchomiony z wykorzystaniem kontenera Docker.

 

Instalacja Symfony i niezbędnych bibliotek

Framework Symfony instalujemy przy użyciu composera z wpisując w konsole polecenie:

Zostaną utworzony katalog projektu, w którym pobrane zostaną wszystkie niezbędne biblioteki oraz zostanie utworzona cała struktura aplikacji. Następnie musimy dodać bibliotekę odpowiedzialną za obsługę kolejki zadań. W konsoli wpisujemy:

 

Przygotowanie PHP

Wiadomości do systemu RabbitMQ wysyłane są wykorzystaniem protokołu AMQP (Advanced Message Queuing Protocol), który domyślnie nie jest obsługiwany w języku PHP, dlatego musimy zainstalować odpowiednie rozszerzenie. W przeciwnym wypadku nasza kolejka nie zadziała i dostaniemy komunikat błędu:

You cannot use the „Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection” as the „amqp” extension is not installed.

Instalacja rozszerzenia amqp dla języka PHP w systemie Windows odbywa się w kilku prostych krokach:

1) Pobieramy pliki rozszerzenia z strony https://pecl.php.net/package/amqp/
(np. https://pecl.php.net/package/amqp/1.10.2/windows). Zanim pobierzemy odpowiednią wersję należy sprawdzić czy zainstalowany PHP jest w trybie thread safe lub non-thread safe. Aby to sprawdzić używamy polecenia:

2) Rozpakowujemy paczkę i kopiujemy pliki:

  • rabbitmq.4.dll i rabbitmq.4.pdb do głównego katalogu PHP
  • php_amqp.dll i php_amqp.pdb do katalogu „ext” w głównym katalogu PHP

3) Do pliku konfiguracyjnego php.ini dopisujemy linijkę:

4) Restartujemy serwer

Dla pewności możemy sprawdzić czy rozszerzenie znajduje się na liście poleceniem rozszerzeń.

W systemie Linux możemy posłużyć się poleceniem instalacji:

a następnie wystarczy dodać do pliku php.ini linijkę:

 

Uruchomienie serwisu RabbitMQ

Jak już wspomniałem RabbitMQ uruchomimy z wykorzystaniem systemu Docker. Nie będę tutaj omawiał instalacji i konfiguracji samego Dockera – nie jest to zbyt skomplikowany proces, aczkolwiek w systemie Windows trzeba wykonać kilka dodatkowych kroków związanych z instalacją systemu WSL (Windows Subsystem for Linux). Zainteresowanych zapraszam do zadawania pytań w komentarzu.

Tworzymy plik docker-compose.yaml z następującą konfiguracją:

W konfiguracji serwisu korzystam z oficjalnego obrazu pobranego z Docker Hub, udostepniającego panel zarządzania kolejkami. Jaki widać w powyższej konfiguracji mapuję dwa porty:

  • 8888:15672 – port po którym będę łączył się z panelem RabbitMQ
  • 5672:5672 – port po którym będę wysyłał zadania (wiadomości)

Uruchamiamy serwis:

Sprawdzamy, czy możemy zalogować się do panelu – w przeglądarce wpisujemy http://localhost:8888. Powinno pojawić strona z formularzem logowania – wpisujemy „guest” jako domyślne wartości w polach login i hasło. Powinniśmy zobaczyć panel monitorowania kolejek RabbitMQ.

 

Tworzymy kolejkę zadań w Symfony

 

Czas na oprogramowanie kolejki. Działanie prostej kolejki zadań można podzielić na dwie części:

  1. Rejestrowanie zadania w kolejce. W omawianym przypadku taka rejestracja następuje po poprawnym wysłaniu formularza rejestracyjnego przez użytkownika.
  2. Pobranie kolejnego zadanie z kolejki i jego wykonanie. Odbywa się automatycznie, w tle i jest obsługiwane za pomocą uruchomionego workera wiadomości.

W frameworku Symfony korzystanie z biblioteki Messanger, czyli system wysyłania, kolejkowania i obsługi wiadomości (zadań), można podzielić na 4 części:

  • Message – klasa wiadomości, zawierająca wszystkie niezbędne informacje do wykonania zadania
  • Message Handler – klasa obsługująca wiadomość, wykonuje całą logikę zadania
  • Message Bus – szyna po której transportowana jest wiadomość
  • Transport – silnik/protokół przenoszenia wiadomości (np. baza danych, RabbitMQ, Amazon SQS, itp.)

Zacznijmy od napisania klasy zadania. Zakładamy nowy katalog /src/Message, w którym tworzymy klasę wiadomości:

Klasa przyjmuje ID użytkownika, który jest zwracany z wykorzystaniem prostego gettera.

Teraz zajmijmy się klasą odpowiedzialną za obsługę wiadomości. W tym samym katalogu zakładamy kolejny o nazwie MessageHandler, w którym tworzymy klasę:

Klasa ta implementuje interfejs MessageHandlerInterface, który nie deklaruję żadnej metody, ale jego zadaniem jest rejestracja handler-a w systemie Symfony. Klasa powinna zawierać magiczną metodę __invoke, która jako argument przyjmuje klasę wiadomości, która z kolei powinna zawierać wszystkie dane, niezbędne do wykonania zadania.

Dalej zajmijmy się konfiguracją transportera. Przy instalacji biblioteki Messanger, został dodany plik konfiguracyjny /config/packages/messanger.yaml, który należy wyedytować i dodać następującą konfigurację:

Tworzymy transporter async oraz wskazujemy, że nasza wiadomość będzie przez niego obsługiwana. Jak widać konfiguracja odwołuje się do zmiennej środowiskowej MESSENGER_TRANSPORT_DSN, którą również należy odpowiednio ustawić. W pliku .env edytujemy wybrany wpis i dodajemy konfiguracje połączenia z serwisem RabbitMQ.

Ustawiamy protokół, login i hasło, host oraz port na którym nasłuchuje usługa kolejkowania.

 

 

Testowanie kolejki

 

Mamy przygotowane środowisko uruchomieniowe oraz naszą testową wiadomość. Przyszła pora na sprawdzenie jak to wszystko działa.

W pierwszej kolejności napiszmy prosty kontroler, który będzie symulował rejestrację użytkownika. Podczas tego procesu jednym z kroków będzie zarejestrowanie wiadomości w szynie transportującej (message bus).

Akcja kontrolera przyjmuje w argumencie interfejs MessageBusInterface, czyli szynę do której będziemy zapisywać wiadomości (czyli nasze zadania do wykonania). Za pomocą metody dispatch, dodajemy takie zadanie do szyny. Zadanie będzie obsługiwane asynchronicznie (w tle) i nie będzie blokowało działania aplikacji.

Zanim jeszcze uruchomimy nasz kontroler, musimy uruchomić workera, odpowiedzialnego za obsługę kolejki. W konsoli wpisujemy polecenie:

Podajemy nazwę transportera async. Flaga -vv pozwala na wyświetlanie działania kolejki podczas jej pracy. po wprowadzeniu polecenia powinniśmy otrzymać komunikat o działającym workerze.

W przypadku, gdy popełniliśmy byłą w konfiguracji, albo gdy system z jakichś powodów nie będzie mógł połączyć się z serwisem kolejkowania, zostanie wyrzucony błąd.

Ok, czas na test naszej kolejki. Uruchamiamy przeglądarkę oraz uruchamiamy nasz kontroler testowy https://symfony5.test/user/register (najlepiej odświeżyć stronę kilka razy z rzędu, żeby upewnić się, że zadanie faktycznie nie blokuje żądania a kod wykonuje się natychmiastowo). Po uruchomieniu aplikacji, po kilku sekundach w konsoli zaczęły pojawiać się informacje potwierdzające przekazanie wiadomości na szynę a następnie jej przetransportowanie do systemu kolejkowania. Przechodzimy do panelu RabbitMQ, gdzie również widzimy wynik monitorowania przekazywanej wiadomości.

Jak widzimy na powyższym screenie, RabbitMQ zaczął monitorować przedwarzanie zadań wysyłanych z naszego prostego systemu.


Przeczytaj równierz