| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- <?php
- namespace Illuminate\Bus;
- use Closure;
- use Illuminate\Contracts\Bus\QueueingDispatcher;
- use Illuminate\Contracts\Container\Container;
- use Illuminate\Contracts\Queue\Queue;
- use Illuminate\Contracts\Queue\ShouldQueue;
- use Illuminate\Foundation\Bus\PendingChain;
- use Illuminate\Pipeline\Pipeline;
- use Illuminate\Queue\InteractsWithQueue;
- use Illuminate\Queue\Jobs\SyncJob;
- use Illuminate\Support\Collection;
- use RuntimeException;
- class Dispatcher implements QueueingDispatcher
- {
- /**
- * The container implementation.
- *
- * @var \Illuminate\Contracts\Container\Container
- */
- protected $container;
- /**
- * The pipeline instance for the bus.
- *
- * @var \Illuminate\Pipeline\Pipeline
- */
- protected $pipeline;
- /**
- * The pipes to send commands through before dispatching.
- *
- * @var array
- */
- protected $pipes = [];
- /**
- * The command to handler mapping for non-self-handling events.
- *
- * @var array
- */
- protected $handlers = [];
- /**
- * The queue resolver callback.
- *
- * @var \Closure|null
- */
- protected $queueResolver;
- /**
- * Indicates if dispatching after response is disabled.
- *
- * @var bool
- */
- protected $allowsDispatchingAfterResponses = true;
- /**
- * Create a new command dispatcher instance.
- */
- public function __construct(Container $container, ?Closure $queueResolver = null)
- {
- $this->container = $container;
- $this->queueResolver = $queueResolver;
- $this->pipeline = new Pipeline($container);
- }
- /**
- * Dispatch a command to its appropriate handler.
- *
- * @param mixed $command
- * @return mixed
- */
- public function dispatch($command)
- {
- return $this->queueResolver && $this->commandShouldBeQueued($command)
- ? $this->dispatchToQueue($command)
- : $this->dispatchNow($command);
- }
- /**
- * Dispatch a command to its appropriate handler in the current process.
- *
- * Queueable jobs will be dispatched to the "sync" queue.
- *
- * @param mixed $command
- * @param mixed $handler
- * @return mixed
- */
- public function dispatchSync($command, $handler = null)
- {
- if ($this->queueResolver &&
- $this->commandShouldBeQueued($command) &&
- method_exists($command, 'onConnection')) {
- return $this->dispatchToQueue($command->onConnection('sync'));
- }
- return $this->dispatchNow($command, $handler);
- }
- /**
- * Dispatch a command to its appropriate handler in the current process without using the synchronous queue.
- *
- * @param mixed $command
- * @param mixed $handler
- * @return mixed
- */
- public function dispatchNow($command, $handler = null)
- {
- $uses = class_uses_recursive($command);
- if (isset($uses[InteractsWithQueue::class], $uses[Queueable::class]) && ! $command->job) {
- $command->setJob(new SyncJob($this->container, json_encode([]), 'sync', 'sync'));
- }
- if ($handler || $handler = $this->getCommandHandler($command)) {
- $callback = function ($command) use ($handler) {
- $method = method_exists($handler, 'handle') ? 'handle' : '__invoke';
- return $handler->{$method}($command);
- };
- } else {
- $callback = function ($command) {
- $method = method_exists($command, 'handle') ? 'handle' : '__invoke';
- return $this->container->call([$command, $method]);
- };
- }
- return $this->pipeline->send($command)->through($this->pipes)->then($callback);
- }
- /**
- * Attempt to find the batch with the given ID.
- *
- * @return \Illuminate\Bus\Batch|null
- */
- public function findBatch(string $batchId)
- {
- return $this->container->make(BatchRepository::class)->find($batchId);
- }
- /**
- * Create a new batch of queueable jobs.
- *
- * @param \Illuminate\Support\Collection|mixed $jobs
- * @return \Illuminate\Bus\PendingBatch
- */
- public function batch($jobs)
- {
- return new PendingBatch($this->container, Collection::wrap($jobs));
- }
- /**
- * Create a new chain of queueable jobs.
- *
- * @param \Illuminate\Support\Collection|array|null $jobs
- * @return \Illuminate\Foundation\Bus\PendingChain
- */
- public function chain($jobs = null)
- {
- $jobs = Collection::wrap($jobs);
- $jobs = ChainedBatch::prepareNestedBatches($jobs);
- return new PendingChain($jobs->shift(), $jobs->toArray());
- }
- /**
- * Determine if the given command has a handler.
- *
- * @param mixed $command
- * @return bool
- */
- public function hasCommandHandler($command)
- {
- return array_key_exists(get_class($command), $this->handlers);
- }
- /**
- * Retrieve the handler for a command.
- *
- * @param mixed $command
- * @return mixed
- */
- public function getCommandHandler($command)
- {
- if ($this->hasCommandHandler($command)) {
- return $this->container->make($this->handlers[get_class($command)]);
- }
- return false;
- }
- /**
- * Determine if the given command should be queued.
- *
- * @param mixed $command
- * @return bool
- */
- protected function commandShouldBeQueued($command)
- {
- return $command instanceof ShouldQueue;
- }
- /**
- * Dispatch a command to its appropriate handler behind a queue.
- *
- * @param mixed $command
- * @return mixed
- *
- * @throws \RuntimeException
- */
- public function dispatchToQueue($command)
- {
- $connection = $command->connection ?? null;
- $queue = ($this->queueResolver)($connection);
- if (! $queue instanceof Queue) {
- throw new RuntimeException('Queue resolver did not return a Queue implementation.');
- }
- if (method_exists($command, 'queue')) {
- return $command->queue($queue, $command);
- }
- return $this->pushCommandToQueue($queue, $command);
- }
- /**
- * Push the command onto the given queue instance.
- *
- * @param \Illuminate\Contracts\Queue\Queue $queue
- * @param mixed $command
- * @return mixed
- */
- protected function pushCommandToQueue($queue, $command)
- {
- if (isset($command->delay)) {
- return $queue->later($command->delay, $command, queue: $command->queue ?? null);
- }
- return $queue->push($command, queue: $command->queue ?? null);
- }
- /**
- * Dispatch a command to its appropriate handler after the current process.
- *
- * @param mixed $command
- * @param mixed $handler
- * @return void
- */
- public function dispatchAfterResponse($command, $handler = null)
- {
- if (! $this->allowsDispatchingAfterResponses) {
- $this->dispatchSync($command);
- return;
- }
- $this->container->terminating(function () use ($command, $handler) {
- $this->dispatchSync($command, $handler);
- });
- }
- /**
- * Set the pipes through which commands should be piped before dispatching.
- *
- * @return $this
- */
- public function pipeThrough(array $pipes)
- {
- $this->pipes = $pipes;
- return $this;
- }
- /**
- * Map a command to a handler.
- *
- * @return $this
- */
- public function map(array $map)
- {
- $this->handlers = array_merge($this->handlers, $map);
- return $this;
- }
- /**
- * Allow dispatching after responses.
- *
- * @return $this
- */
- public function withDispatchingAfterResponses()
- {
- $this->allowsDispatchingAfterResponses = true;
- return $this;
- }
- /**
- * Disable dispatching after responses.
- *
- * @return $this
- */
- public function withoutDispatchingAfterResponses()
- {
- $this->allowsDispatchingAfterResponses = false;
- return $this;
- }
- }
|