| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- <?php
- namespace Illuminate\Bus;
- use Closure;
- use Illuminate\Queue\CallQueuedClosure;
- use Illuminate\Support\Arr;
- use Illuminate\Support\Collection;
- use Laravel\SerializableClosure\SerializableClosure;
- use PHPUnit\Framework\Assert as PHPUnit;
- use RuntimeException;
- use function Illuminate\Support\enum_value;
trait Queueable
{
    /**
     * Name of the queue connection this job is sent to.
     *
     * @var string|null
     */
    public $connection;

    /**
     * Name of the queue this job is sent to.
     *
     * @var string|null
     */
    public $queue;

    /**
     * Message "group" this job is sent to.
     *
     * @var string|null
     */
    public $messageGroup;

    /**
     * Callback the job uses to generate its deduplication ID.
     *
     * @var \Laravel\SerializableClosure\SerializableClosure|null
     */
    public $deduplicator;

    /**
     * Delay before the job should be made available.
     *
     * @var \DateTimeInterface|\DateInterval|array|int|null
     */
    public $delay;

    /**
     * Whether dispatching should wait until all database transactions have committed.
     *
     * @var bool|null
     */
    public $afterCommit;

    /**
     * Middleware the job is dispatched through.
     *
     * @var array
     */
    public $middleware = [];

    /**
     * Serialized jobs to run once this job succeeds.
     *
     * @var array
     */
    public $chained = [];

    /**
     * Name of the connection the chained jobs are sent to.
     *
     * @var string|null
     */
    public $chainConnection;

    /**
     * Name of the queue the chained jobs are sent to.
     *
     * @var string|null
     */
    public $chainQueue;

    /**
     * Callbacks executed when the chain fails.
     *
     * @var array|null
     */
    public $chainCatchCallbacks;

    /**
     * Choose the connection this job is sent to.
     *
     * Enum arguments are passed through enum_value() before being stored.
     *
     * @param  \UnitEnum|string|null  $connection
     * @return $this
     */
    public function onConnection($connection)
    {
        $this->connection = enum_value($connection);

        return $this;
    }

    /**
     * Choose the queue this job is sent to.
     *
     * Enum arguments are passed through enum_value() before being stored.
     *
     * @param  \UnitEnum|string|null  $queue
     * @return $this
     */
    public function onQueue($queue)
    {
        $this->queue = enum_value($queue);

        return $this;
    }

    /**
     * Choose the job "group" for this job.
     *
     * This feature is only supported by some queues, such as Amazon SQS.
     *
     * @param  \UnitEnum|string  $group
     * @return $this
     */
    public function onGroup($group)
    {
        $this->messageGroup = enum_value($group);

        return $this;
    }

    /**
     * Register the callback used to produce the job's deduplication ID.
     *
     * This feature is only supported by some queues, such as Amazon SQS FIFO.
     *
     * @param  callable|null  $deduplicator
     * @return $this
     */
    public function withDeduplicator($deduplicator)
    {
        // Plain closures are wrapped in a SerializableClosure; any other
        // value (including null) is stored exactly as given.
        if ($deduplicator instanceof Closure) {
            $this->deduplicator = new SerializableClosure($deduplicator);
        } else {
            $this->deduplicator = $deduplicator;
        }

        return $this;
    }

    /**
     * Choose the connection for the whole chain, this job included.
     *
     * @param  \UnitEnum|string|null  $connection
     * @return $this
     */
    public function allOnConnection($connection)
    {
        // The value is resolved once and shared by both properties.
        $this->connection = $this->chainConnection = enum_value($connection);

        return $this;
    }

    /**
     * Choose the queue for the whole chain, this job included.
     *
     * @param  \UnitEnum|string|null  $queue
     * @return $this
     */
    public function allOnQueue($queue)
    {
        // The value is resolved once and shared by both properties.
        $this->queue = $this->chainQueue = enum_value($queue);

        return $this;
    }

    /**
     * Store the delay to apply before the job becomes available.
     *
     * @param  \DateTimeInterface|\DateInterval|array|int|null  $delay
     * @return $this
     */
    public function delay($delay)
    {
        $this->delay = $delay;

        return $this;
    }

    /**
     * Reset the job's delay to zero seconds.
     *
     * @return $this
     */
    public function withoutDelay()
    {
        $this->delay = 0;

        return $this;
    }

    /**
     * Mark the job to be dispatched only after all database transactions have committed.
     *
     * @return $this
     */
    public function afterCommit()
    {
        $this->afterCommit = true;

        return $this;
    }

    /**
     * Mark the job to be dispatched without waiting for database transactions to commit.
     *
     * @return $this
     */
    public function beforeCommit()
    {
        $this->afterCommit = false;

        return $this;
    }

    /**
     * Replace the middleware the job is dispatched through.
     *
     * A single middleware object is wrapped into an array.
     *
     * @param  array|object  $middleware
     * @return $this
     */
    public function through($middleware)
    {
        $this->middleware = Arr::wrap($middleware);

        return $this;
    }

    /**
     * Replace the list of jobs that run after this job succeeds.
     *
     * Each job is stored in serialized form.
     *
     * @param  array  $chain
     * @return $this
     */
    public function chain($chain)
    {
        $prepared = ChainedBatch::prepareNestedBatches(new Collection($chain));

        $this->chained = $prepared
            ->map(function ($queued) {
                return $this->serializeJob($queued);
            })
            ->all();

        return $this;
    }

    /**
     * Put one or more jobs at the front of the chain so they run right after the current job.
     *
     * @param  mixed  $job
     * @return $this
     */
    public function prependToChain($job)
    {
        $prepared = ChainedBatch::prepareNestedBatches(Collection::wrap($job));

        // Walk the jobs back to front: each one is pushed onto the head of
        // the chain, so the first given job ends up first.
        foreach ($prepared->reverse() as $queued) {
            $this->chained = Arr::prepend($this->chained, $this->serializeJob($queued));
        }

        return $this;
    }

    /**
     * Put one or more jobs at the end of the chain.
     *
     * @param  mixed  $job
     * @return $this
     */
    public function appendToChain($job)
    {
        $prepared = ChainedBatch::prepareNestedBatches(Collection::wrap($job));

        foreach ($prepared as $queued) {
            $tail = [$this->serializeJob($queued)];

            $this->chained = array_merge($this->chained, $tail);
        }

        return $this;
    }

    /**
     * Turn a job into its serialized string form for queuing.
     *
     * Closures are first converted with CallQueuedClosure::create().
     *
     * @param  mixed  $job
     * @return string
     *
     * @throws \RuntimeException if a closure is given but CallQueuedClosure is not available
     */
    protected function serializeJob($job)
    {
        if (! $job instanceof Closure) {
            return serialize($job);
        }

        if (! class_exists(CallQueuedClosure::class)) {
            throw new RuntimeException(
                'To enable support for closure jobs, please install the illuminate/queue package.'
            );
        }

        return serialize(CallQueuedClosure::create($job));
    }

    /**
     * Dispatch the next job on the chain, if there is one.
     *
     * The next job takes over the remaining chain, the chain-wide connection
     * and queue, and the chain's catch callbacks.
     *
     * @return void
     */
    public function dispatchNextJobInChain()
    {
        if (empty($this->chained)) {
            return;
        }

        $next = unserialize(array_shift($this->chained));

        $next->chained = $this->chained;

        // A connection or queue already set on the next job takes precedence
        // over the chain-wide value.
        $next->onConnection($next->connection ?: $this->chainConnection);
        $next->onQueue($next->queue ?: $this->chainQueue);

        $next->chainConnection = $this->chainConnection;
        $next->chainQueue = $this->chainQueue;
        $next->chainCatchCallbacks = $this->chainCatchCallbacks;

        dispatch($next);
    }

    /**
     * Call every failure callback registered for the chain.
     *
     * @param  \Throwable  $e
     * @return void
     */
    public function invokeChainCatchCallbacks($e)
    {
        $callbacks = (new Collection($this->chainCatchCallbacks))->all();

        foreach ($callbacks as $callback) {
            $callback($e);
        }
    }

    /**
     * Assert that the job carries exactly the given chain of jobs.
     *
     * If any expected entry is an object, the expected entries are serialized
     * and compared with the stored serialized chain. Otherwise the stored
     * chain is reduced to class names and compared with the expected list.
     *
     * @param  array  $expectedChain
     * @return void
     */
    public function assertHasChain($expectedChain)
    {
        $expected = new Collection($expectedChain);

        PHPUnit::assertTrue(
            $expected->isNotEmpty(),
            'The expected chain can not be empty.'
        );

        $containsObjects = $expected->contains(fn ($entry) => is_object($entry));

        if ($containsObjects) {
            $expectedChain = $expected->map(fn ($entry) => serialize($entry))->all();
            $actualChain = $this->chained;
        } else {
            $actualChain = (new Collection($this->chained))
                ->map(fn ($entry) => get_class(unserialize($entry)))
                ->all();
        }

        PHPUnit::assertTrue(
            $expectedChain === $actualChain,
            'The job does not have the expected chain.'
        );
    }

    /**
     * Assert that no chained jobs remain on the job.
     *
     * @return void
     */
    public function assertDoesntHaveChain()
    {
        PHPUnit::assertEmpty($this->chained, 'The job has chained jobs.');
    }
}
|