Laravel: Event Dispatcher

10.12.2020 в 19:43
4275
+1

Event Dispatcher является одним из самых простых и в то же время одним из самых полезных компонентов Laravel. Почему простым, потому что в общем случае dispatcher просто хранит массив вида событие ⇒ слушатели и в момент, когда вы вызываете dispatch(событие), достает эти слушатели и по очереди их запускает. А почему полезным, потому что события предоставляют точки расширения логики без изменения кода. Таким образом вы уменьшаете каплинг (связанность) вашего приложения, что является хорошим преимуществом на пути масштабирования функционала в будущем.

Давайте для начала напишем свой небольшой dispatcher, чтобы посмотреть, как в общем случае устроен механизм:

  <?php

require_once __DIR__ . '/vendor/autoload.php';

use Illuminate\Support\Str;

final class EventDispatcher
{
    private $listeners = [];

    /**
     * @param string          $event
     * @param string|callable $listener
     */
    public function listen(string $event, $listener): void
    {
        $this->listeners[$event][] = $this->makeListener($listener);
    }

    /**
     * @param object $event
     */
    public function dispatch(object $event): void
    {
        $eventName = get_class($event);

        if (!isset($this->listeners[$eventName])) {
            return;
        }

        $listeners = $this->listeners[$eventName];

        foreach ($listeners as $listener) {
            $listener($event);
        }
    }

    /**
     * @param $listener
     *
     * @return callable
     */
    private function makeListener($listener): callable
    {
        if (\is_string($listener)) {
            [$class, $method] = Str::parseCallback($listener, 'handle');

            return [new $class, $method];
        }

        if (\is_callable($listener)) {
            return $listener;
        }

        throw new RuntimeException(sprintf('Listener must be class-string or callable, got %s', gettype($listener)));
    }
}

final class TestEvent
{
    public $payload;
}

final class TestEventListener
{
    /**
     * @param object $event
     * @return mixed|void
     */
    public function handle(TestEvent $event)
    {
        $event->payload[] = [__CLASS__, __METHOD__];
    }
}

$dispatcher = new EventDispatcher();

$dispatcher->listen(TestEvent::class, function (TestEvent $event) {
    $event->payload[] = [__FUNCTION__];
});

$dispatcher->listen(TestEvent::class, TestEventListener::class);

$dispatcher->dispatch($event = new TestEvent);

dd(
    $event,
    // TestEvent {#10
    //    +payload: array:2 [
    //      0 => array:1 [
    //        0 => "{closure}"
    //     ]
    //      1 => array:2 [
    //        0 => "TestEventListener"
    //        1 => "TestEventListener::handle"
    //      ]
    //    ]
    // }
);

В первую очередь, нам нужен метод, с помощью которого мы сможем зарегистрировать наши слушатели. Это метод listen:


public function listen(string $event, $listener): void
{
    $this->listeners[$event][] = $this->makeListener($listener);
}

Для названия события, которое является ключом массива, добавляем слушатели, предварительно их создав. Мы позволяем регистрировать два вида слушателей: класс-строка или замыкание. Причем класс-строку можно зарегистрировать с кастомным методом. Например: TestEventListener::class.'@on', где on - название метода внутри класса TestEventListener, который будет обрабатывать событие. По умолчанию будет использоваться метод handle. В ларавеле это работает точно так же, но об этом позже.

private function makeListener($listener): callable
{
    if (\is_string($listener)) {
        [$class, $method] = Str::parseCallback($listener, 'handle');

        return [new $class, $method];
    }

    if (\is_callable($listener)) {
        return $listener;
    }

    throw new RuntimeException(sprintf('Listener must be class-string or callable, got %s', gettype($listener)));
}

Приводим объект к замыканию, применив трюк с массивом, где первый элемент — объект, а второй — метод этого объекта. Теперь вы можете вызвать массив в качестве замыкания.

Регистрируем наши слушатели и диспатчим событие:

$dispatcher = new EventDispatcher();

$dispatcher->listen(TestEvent::class, function (TestEvent $event) {
    $event->payload[] = [__FUNCTION__];
});

$dispatcher->listen(TestEvent::class, TestEventListener::class);

$dispatcher->dispatch($event = new TestEvent);

В момент диспатча происходит следующее:

  1. Для начала проверяем, есть ли для брошенного события слушатели: для этого берем полное имя класса события и ищем по нему в массиве listeners;
  2. Если не нашли, просто выходим;
  3. Если нашли, обходим их и запускаем как замыкания, передав в качестве аргумента объект события.
public function dispatch(object $event): void
{
     $eventName = get_class($event);

     if (!isset($this->listeners[$eventName])) {
         return;
     }

     $listeners = $this->listeners[$eventName];

     foreach ($listeners as $listener) {
         $listener($event);
     }
}

Это и есть основная способность диспатчера, никакой магии.

Теперь разберем, как это работает в Laravel и что еще умеет диспатчер там.

Для начала вспомним, что для регистрации событий используется сервис-провайдер, но не обычный сервис-провайдер, а так называемый EventServiceProvider. Отнаследовав его, вы должны зарегистрировать слушатели в свойстве $listen или в свойстве $subscribe. Отличие заключается в том, что у подписчика (subscriber) есть метод subscribe, куда фреймворк передаст Dispatcher, чтобы вы зарегистрировали события:

final class UserStatesSubscriber
{
   public function (Dispatcher $dispatcher)
   {
      $dispatcher->listen(
            UserCreated::class,
            self::class . '@whenUserCreated',
        );
   }
}

Такая возможность позволяет вам логически группировать несколько событий в одном месте, однако на моей практике подписчики в ларавеле не снискали популярность. Тем не менее, в итоге подписчик разделяется на несколько слушателей. Так работает в ларавеле, так работает и в симфони.

Передаем в метод listen событие(-я) и слушатель:

public function listen($events, $listener)
{
     foreach ((array) $events as $event) {
        if (Str::contains($event, '*')) {
             $this->setupWildcardListen($event, $listener);
        } else {
           $this->listeners[$event][] = $this->makeListener($listener);
        }
     }
}

Вы можете прослушивать несколько событий сразу, для этого используется wildcard, например: 'user_states.*'. В этом случае вы будете ловить в этом слушателе все события вида user_states.create, user_states.change, user_states.ban и так далее. Так или иначе добавляем событие в массив в качестве ключа и в качестве значения — объект слушателя, созданный контейнером. Если вы захотите передать объект сами, вам необходимо будет использовать метод __invoke, чтобы его можно было вызвать как функцию:

public function makeListener($listener, $wildcard = false)
    {
        if (is_string($listener)) {
            return $this->createClassListener($listener, $wildcard);
        }

        return function ($event, $payload) use ($listener, $wildcard) {
            if ($wildcard) {
                return $listener($event, $payload);
            }

            return $listener(...array_values($payload));
        };
    }

Внутри диспатчера ларавел активно использует контейнер. Разумеется, это большой минус ларавел, так как при желании стянуть себе на проект их диспатчер, вы потянете и его (контейнер) тоже. Разработчики ларавел явно не слышали про Common-Closure принцип по отношению к пакетам или почему-то его не любят. По теме принципов хорошего дизайна пакетов советую почитать issue на гитхабе к пакету infection от одного из его авторов.

С замыканиями все понятно, давайте посмотрим, как ларавел создает слушатель из класса-строки:

public function createClassListener($listener, $wildcard = false)
    {
        return function ($event, $payload) use ($listener, $wildcard) {
            if ($wildcard) {
                return call_user_func($this->createClassCallable($listener), $event, $payload);
            }

            return call_user_func_array(
                $this->createClassCallable($listener), $payload
            );
        };
    }

Нам необходимо создать класс как замыкание, чтобы привести слушатели к одному виду, для этого диспатчер вызывает метод createClassCallable:


    protected function createClassCallable($listener)
    {
        [$class, $method] = $this->parseClassCallable($listener);

        if ($this->handlerShouldBeQueued($class)) {
            return $this->createQueuedHandlerCallable($class, $method);
        }

        return [$this->container->make($class), $method];
    }

Парсим строку (помним, что мы зарегистрировали слушатель как строку), получая класс и метод. Напомню, как выглядит регистрация слушателя:

use Illuminate\Foundation\Support\Providers\EventServiceProvider as LaravelEventServiceProvider;

final class EventServiceProvider extends LaravelEventServiceProvider
{
     protected $listen = [
        UserRegistered::class => [
            SendEmailVerificationNotification::class,
            AnotherUserRegisteredListener::class . '@on',
        ],
    ];
}

Если вы не указали метод с помощью конструкции '@on', будет использован метод handle.

Смотрим, нужно ли отложить выполнение слушателя (да, слушатель можно превратить в джобу, если вы вдруг не знали):

if ($this->handlerShouldBeQueued($class)) {
    return $this->createQueuedHandlerCallable($class, $method);
}

И тут можно обнаружить интересные косяки диспатчера. Чтобы вы лучше поняли проблему, о которой я расскажу, давайте посмотрим на то, как можно слушатель превратить в джобу:

final class SomeEventListener implements ShouldQueue
{
    public $connection = 'rabbitmq';

    public $queue = 'queued_events';

    public function __construct() {}

    public function handle(SomeEvent $event) {}
}

Мы помечаем слушатель интерфейсом-маркером ShouldQueue, увидев который, диспатчер поймет, что слушатель необходимо отложить, то есть запустить в очереди. Вы можете настроить то, в какую очередь попадет ваш слушатель и какой драйвер будет использоваться: база данных, кролик, sqs, beanstalkd и так далее. Например, локально вы используете один драйвер (например, базу данных), а на проде — другой. В случае ларавеля это очень удобно, и за это ребят стоит похвалить, так как вам нет нужды поднимать локально брокеры вроде кролика или кафки, потому что все настройки актуальны для всех драйверов, и поэтому локально хватит и базы данных. Однако... вы не сможете переопределить соединение в зависимости от окружения или от чего-либо еще, потому что единственный способ определить соединение — это свойство класса. И нет, даже если вы определите его в конструкторе, где вы смогли бы использовать хелперы вроде config или контекстный биндинг, диспатчер все равно его не увидит. Давайте разберемся, почему:

protected function handlerShouldBeQueued($class)
    {
        try {
            return (new ReflectionClass($class))->implementsInterface(
                ShouldQueue::class
            );
        } catch (Exception $e) {
            return false;
        }
    }

Имплементит ли наш слушатель интерфейс ShouldQueue? Да, значит, откладываем его:

  protected function createQueuedHandlerCallable($class, $method)
    {
        return function () use ($class, $method) {
            $arguments = array_map(function ($a) {
                return is_object($a) ? clone $a : $a;
            }, func_get_args());

            if ($this->handlerWantsToBeQueued($class, $arguments)) {
                $this->queueHandler($class, $method, $arguments);
            }
        };
    }

А может, слушатель все же не хочет выполняться вообще (handlerWantsToBeQueued)? Да, пусть название метода (shouldQueue) не сбивает вас с толку. Если вы подумали, что результат метода влияет на то, как будет исполняться слушатель — синхронно или асинхронно, — то вы ошибаетесь, отрицательный результат приведет к тому, что слушатель не будет выполнен вообще. Именование очень странное, как минимум понятнее было бы назови они его shouldBeExecuted или как-то так:

protected function handlerWantsToBeQueued($class, $arguments)
    {
        $instance = $this->container->make($class);

        if (method_exists($instance, 'shouldQueue')) {
            return $instance->shouldQueue($arguments[0]);
        }

        return true;
    }

Обратите внимание на следующую строку:

$instance = $this->container->make($class);

Абсолютно для всех слушателей, которые хотят отложиться, мы в этом методе всегда создаем полноценный объект с помощью контейнера и вызываем метод shouldQueue. Ну, вызвали и вызвали, а проблема в чем? А проблема в методе queueHandler, вызываемом следом, если слушатель все же надо отложить:


    protected function queueHandler($class, $method, $arguments)
    {
        [$listener, $job] = $this->createListenerAndJob($class, $method, $arguments);

        $connection = $this->resolveQueue()->connection(
            $listener->connection ?? null
        );

        $queue = method_exists($listener, 'viaQueue')
                    ? $listener->viaQueue()
                    : $listener->queue ?? null;

        isset($listener->delay)
                    ? $connection->laterOn($queue, $listener->delay, $job)
                    : $connection->pushOn($queue, $job);
    }

В самом его начале диспатчер вызывает метод createListenerAndJob, чтобы создать слушатель и джобу для него, однако:

protected function createListenerAndJob($class, $method, $arguments)
    {
        $listener = (new ReflectionClass($class))->newInstanceWithoutConstructor();

        return [$listener, $this->propagateListenerOptions(
            $listener, new CallQueuedListener($class, $method, $arguments)
        )];
    }

Однако он зачем-то снова создает инстанс слушателя, но уже с помощью рефлексии и без конструктора. Без конструктора, Карл! Теперь вы понимаете, почему у вас не получится переопределять соединение слушателя, используя конструктор? Зато у вас получится переопределить очередь, куда попадет слушатель, вам достаточно определить метод viaQueue(). Но это, во-первых, чрезвычайно идиотское решение, а во-вторых, вы можете себе представить ситуацию, когда вам надо будет сменить очередь? Чаще всего вам потребуется сменить именно соединение, а не очередь. Я заводил PR на эту тему, на что мне ответили, что у них нет в планах делать так много изменений в диспатчере, но предложили сделать PR с реализацией метода viaConnection (facepalm). Поэтому придется или переопределять диспатчер, или использовать джобы.

Итак, если слушатель надо отложить, диспатчер создает джобу CallQueuedListener. Это специальная джоба-обертка конкретно для слушателей, она принимает класс слушателя, метод и аргументы (объект события). После того, как джоба будет запущена, выполнится метод handle:

public function handle(Container $container)
    {
        $this->prepareData();

        $handler = $this->setJobInstanceIfNecessary(
            $this->job, $container->make($this->class)
        );

        $handler->{$this->method}(...array_values($this->data));
    }

Рассериализуем данные (объект события), создадим инстанс слушателя и выполним метод (handle или любой другой, который вы определили в сервис-провайдере), передав туда объект события. Таким образом, повесив интерфейс ShouldQueue, вы сделали свой синхронный слушатель асинхронным, и это чрезвычайно полезная возможность диспатчера ларавеля, несмотря на проблемы, которые я описал выше.

Вернемся немного назад, туда, где создавалась джоба:

protected function queueHandler($class, $method, $arguments)
    {
        [$listener, $job] = $this->createListenerAndJob($class, $method, $arguments);

        $connection = $this->resolveQueue()->connection(
            $listener->connection ?? null
        );

        $queue = method_exists($listener, 'viaQueue')
                    ? $listener->viaQueue()
                    : $listener->queue ?? null;

        isset($listener->delay)
                    ? $connection->laterOn($queue, $listener->delay, $job)
                    : $connection->pushOn($queue, $job);
    }

Чтобы джобу куда-то запушить, необходимо определить соединение, для этого в диспатчере есть queueResolver, сетится он в провайдере в момент бутстрапа приложения. Используем его, чтобы определить драйвер. Если вы не указали у слушателя свойство connection, будет использован драйвер по умолчанию, указанный в .env переменной QUEUE_CONNECTION. Если указано свойство delay, откладываем джобу, если нет — пушим ее в очередь, после чего она выполнится сразу, как только до нее доберется воркер (демон).

В качестве примера возьмем DatabaseQueue. Для начала мы должны распарсить джобу:

protected function createObjectPayload($job, $queue)
    {
        $payload = $this->withCreatePayloadHooks($queue, [
            'displayName' => $this->getDisplayName($job),
            'job' => 'Illuminate\Queue\CallQueuedHandler@call',
            'maxTries' => $job->tries ?? null,
            'delay' => $this->getJobRetryDelay($job),
            'timeout' => $job->timeout ?? null,
            'timeoutAt' => $this->getJobExpiration($job),
            'data' => [
                'commandName' => $job,
                'command' => $job,
            ],
        ]);

        return array_merge($payload, [
            'data' => [
                'commandName' => get_class($job),
                'command' => serialize(clone $job),
            ],
        ]);
    }

Узнаете структура массива? Если вы хоть раз запускали очередь через базу и смотрели в таблицу jobs, вам это должно быть знакомо. После этого вставляем джобу в базу:

protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
    {
        return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
            $this->getQueue($queue), $payload, $this->availableAt($delay), $attempts
        ));
    } 

protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        return [
            'queue' => $queue,
            'attempts' => $attempts,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $this->currentTime(),
            'payload' => $payload,
        ];
    }

Дальше уже воркер достанет эти записи и будет их исполнять. Подробнее про очереди и джобы поговорим в отдельной статье, а пока вернемся к слушателям.

Итак, слушатели мы зарегистрировали, что дальше? Дальше мы бросаем событие и смотрим, что происходит:

Если событие помечено интерфейсом ShouldBroadcast, значит, его надо послать по веб-сокетам.

if ($this->shouldBroadcast($payload)) {
      $this->broadcastEvent($payload[0]);
}

Достаем все слушатели для нашего события и вызываем каждый по очереди. Однако если вы передадите третьим аргументом флаг true, вызов слушателей будет идти до тех пор, пока кто-то из них вернет не nullable ответ. Если вернулся ответ, то диспатчер выйдет из цикла, а следующие слушатели выполнены не будут. Также чтобы остановить вызов слушателей, вы можете вернуть в каком-либо из них в качестве ответа false. Во всех остальных случаях диспатчер будет копить ответы от слушателей и вернет их одним массивом после того, как все они будут выполнены.

$responses = [];

foreach ($this->getListeners($event) as $listener) {
     $response = $listener($event, $payload);

     if ($halt && ! is_null($response)) {
         return $response;
     }

     if ($response === false) {
         break;
     }

     $responses[] = $response;
}

return $halt ? null : $responses;

На днях был предложен интересный PR, который позволит вам диспатчить событие внутри транзакции и не бояться, что оно исполнится, если транзакция зафейлится, так как в этом случае диспатчер создаст коллбэк у DatabaseTransactionsManager, который будет исполнен только в случае успешного завершения транзакции. PR хоть и полезный, но в духе разработчиков фреймворка сделан из рук вон плохо: например, автор PR предлагает помечать такие слушатели свойством $dispatchAfterCommit, что, конечно, лишает вас возможности управлять этим свойством в рантайме и добавляет много магии в происходящее. И к тому же вы не сможете использовать эту фичу на версиях фреймворка младше 8. Но можно обойтись своим решением:

final class ExtendedEventDispatcher extends Dispatcher
{
    /**
     * @psalm-param class-string|string $target
     * @param object $event
     * @param string $target
     *
     * @return void
     */
    public function dispatchAfter(object $event, string $target = TransactionCommitted::class): void
    {
        $this->listen($target, function (object $commited) use ($event): void {
            $this->dispatch($event);
        });
    }
}

Не забудем заменить диспатчер на свой:

final class AppServiceProvider extends ServiceProvider
{
   public function register()
   {
        $this->app->singleton('events', function ($app) {
            return (new ExtendedEventDispatcher($app))->setQueueResolver(function () use ($app) {
                return $app->make(QueueFactoryContract::class);
            });
        });
   }
}

Можно пользоваться. На этом обзор диспатчера можно закончить.

loader
10.12.2020 в 19:43
4275
+1
Комментарии
Новый комментарий

Логические задачи с собеседований