Batch.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508
  1. <?php
  2. namespace Illuminate\Bus;
  3. use Carbon\CarbonImmutable;
  4. use Closure;
  5. use Illuminate\Contracts\Queue\Factory as QueueFactory;
  6. use Illuminate\Contracts\Support\Arrayable;
  7. use Illuminate\Queue\CallQueuedClosure;
  8. use Illuminate\Support\Arr;
  9. use Illuminate\Support\Collection;
  10. use JsonSerializable;
  11. use Throwable;
  12. class Batch implements Arrayable, JsonSerializable
  13. {
  14. /**
  15. * The queue factory implementation.
  16. *
  17. * @var \Illuminate\Contracts\Queue\Factory
  18. */
  19. protected $queue;
  20. /**
  21. * The repository implementation.
  22. *
  23. * @var \Illuminate\Bus\BatchRepository
  24. */
  25. protected $repository;
  26. /**
  27. * The batch ID.
  28. *
  29. * @var string
  30. */
  31. public $id;
  32. /**
  33. * The batch name.
  34. *
  35. * @var string
  36. */
  37. public $name;
  38. /**
  39. * The total number of jobs that belong to the batch.
  40. *
  41. * @var int
  42. */
  43. public $totalJobs;
  44. /**
  45. * The total number of jobs that are still pending.
  46. *
  47. * @var int
  48. */
  49. public $pendingJobs;
  50. /**
  51. * The total number of jobs that have failed.
  52. *
  53. * @var int
  54. */
  55. public $failedJobs;
  56. /**
  57. * The IDs of the jobs that have failed.
  58. *
  59. * @var array
  60. */
  61. public $failedJobIds;
  62. /**
  63. * The batch options.
  64. *
  65. * @var array
  66. */
  67. public $options;
  68. /**
  69. * The date indicating when the batch was created.
  70. *
  71. * @var \Carbon\CarbonImmutable
  72. */
  73. public $createdAt;
  74. /**
  75. * The date indicating when the batch was cancelled.
  76. *
  77. * @var \Carbon\CarbonImmutable|null
  78. */
  79. public $cancelledAt;
  80. /**
  81. * The date indicating when the batch was finished.
  82. *
  83. * @var \Carbon\CarbonImmutable|null
  84. */
  85. public $finishedAt;
  86. /**
  87. * Create a new batch instance.
  88. *
  89. * @param \Illuminate\Contracts\Queue\Factory $queue
  90. * @param \Illuminate\Bus\BatchRepository $repository
  91. * @param string $id
  92. * @param string $name
  93. * @param int $totalJobs
  94. * @param int $pendingJobs
  95. * @param int $failedJobs
  96. * @param array $failedJobIds
  97. * @param array $options
  98. * @param \Carbon\CarbonImmutable $createdAt
  99. * @param \Carbon\CarbonImmutable|null $cancelledAt
  100. * @param \Carbon\CarbonImmutable|null $finishedAt
  101. */
  102. public function __construct(
  103. QueueFactory $queue,
  104. BatchRepository $repository,
  105. string $id,
  106. string $name,
  107. int $totalJobs,
  108. int $pendingJobs,
  109. int $failedJobs,
  110. array $failedJobIds,
  111. array $options,
  112. CarbonImmutable $createdAt,
  113. ?CarbonImmutable $cancelledAt = null,
  114. ?CarbonImmutable $finishedAt = null,
  115. ) {
  116. $this->queue = $queue;
  117. $this->repository = $repository;
  118. $this->id = $id;
  119. $this->name = $name;
  120. $this->totalJobs = $totalJobs;
  121. $this->pendingJobs = $pendingJobs;
  122. $this->failedJobs = $failedJobs;
  123. $this->failedJobIds = $failedJobIds;
  124. $this->options = $options;
  125. $this->createdAt = $createdAt;
  126. $this->cancelledAt = $cancelledAt;
  127. $this->finishedAt = $finishedAt;
  128. }
  129. /**
  130. * Get a fresh instance of the batch represented by this ID.
  131. *
  132. * @return self
  133. */
  134. public function fresh()
  135. {
  136. return $this->repository->find($this->id);
  137. }
  138. /**
  139. * Add additional jobs to the batch.
  140. *
  141. * @param \Illuminate\Support\Enumerable|object|array $jobs
  142. * @return self
  143. */
  144. public function add($jobs)
  145. {
  146. $count = 0;
  147. $jobs = Collection::wrap($jobs)->map(function ($job) use (&$count) {
  148. $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;
  149. if (is_array($job)) {
  150. $count += count($job);
  151. $chain = $this->prepareBatchedChain($job);
  152. return $chain->first()
  153. ->allOnQueue($this->options['queue'] ?? null)
  154. ->allOnConnection($this->options['connection'] ?? null)
  155. ->chain($chain->slice(1)->values()->all());
  156. } else {
  157. $job->withBatchId($this->id);
  158. $count++;
  159. }
  160. return $job;
  161. });
  162. $this->repository->transaction(function () use ($jobs, $count) {
  163. $this->repository->incrementTotalJobs($this->id, $count);
  164. $this->queue->connection($this->options['connection'] ?? null)->bulk(
  165. $jobs->all(),
  166. $data = '',
  167. $this->options['queue'] ?? null
  168. );
  169. });
  170. return $this->fresh();
  171. }
  172. /**
  173. * Prepare a chain that exists within the jobs being added.
  174. *
  175. * @param array $chain
  176. * @return \Illuminate\Support\Collection
  177. */
  178. protected function prepareBatchedChain(array $chain)
  179. {
  180. return (new Collection($chain))->map(function ($job) {
  181. $job = $job instanceof Closure ? CallQueuedClosure::create($job) : $job;
  182. return $job->withBatchId($this->id);
  183. });
  184. }
  185. /**
  186. * Get the total number of jobs that have been processed by the batch thus far.
  187. *
  188. * @return int
  189. */
  190. public function processedJobs()
  191. {
  192. return $this->totalJobs - $this->pendingJobs;
  193. }
  194. /**
  195. * Get the percentage of jobs that have been processed (between 0-100).
  196. *
  197. * @return int
  198. */
  199. public function progress()
  200. {
  201. return $this->totalJobs > 0 ? round(($this->processedJobs() / $this->totalJobs) * 100) : 0;
  202. }
  203. /**
  204. * Record that a job within the batch finished successfully, executing any callbacks if necessary.
  205. *
  206. * @param string $jobId
  207. * @return void
  208. */
  209. public function recordSuccessfulJob(string $jobId)
  210. {
  211. $counts = $this->decrementPendingJobs($jobId);
  212. if ($this->hasProgressCallbacks()) {
  213. $this->invokeCallbacks('progress');
  214. }
  215. if ($counts->pendingJobs === 0) {
  216. $this->repository->markAsFinished($this->id);
  217. }
  218. if ($counts->pendingJobs === 0 && $this->hasThenCallbacks()) {
  219. $this->invokeCallbacks('then');
  220. }
  221. if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
  222. $this->invokeCallbacks('finally');
  223. }
  224. }
  225. /**
  226. * Decrement the pending jobs for the batch.
  227. *
  228. * @param string $jobId
  229. * @return \Illuminate\Bus\UpdatedBatchJobCounts
  230. */
  231. public function decrementPendingJobs(string $jobId)
  232. {
  233. return $this->repository->decrementPendingJobs($this->id, $jobId);
  234. }
  235. /**
  236. * Invoke the callbacks of the given type.
  237. */
  238. protected function invokeCallbacks(string $type, ?Throwable $e = null): void
  239. {
  240. $batch = $this->fresh();
  241. foreach ($this->options[$type] ?? [] as $handler) {
  242. $this->invokeHandlerCallback($handler, $batch, $e);
  243. }
  244. }
  245. /**
  246. * Determine if the batch has finished executing.
  247. *
  248. * @return bool
  249. */
  250. public function finished()
  251. {
  252. return ! is_null($this->finishedAt);
  253. }
  254. /**
  255. * Determine if the batch has "progress" callbacks.
  256. *
  257. * @return bool
  258. */
  259. public function hasProgressCallbacks()
  260. {
  261. return isset($this->options['progress']) && ! empty($this->options['progress']);
  262. }
  263. /**
  264. * Determine if the batch has "success" callbacks.
  265. *
  266. * @return bool
  267. */
  268. public function hasThenCallbacks()
  269. {
  270. return isset($this->options['then']) && ! empty($this->options['then']);
  271. }
  272. /**
  273. * Determine if the batch allows jobs to fail without cancelling the batch.
  274. *
  275. * @return bool
  276. */
  277. public function allowsFailures()
  278. {
  279. return Arr::get($this->options, 'allowFailures', false) === true;
  280. }
  281. /**
  282. * Determine if the batch has job failures.
  283. *
  284. * @return bool
  285. */
  286. public function hasFailures()
  287. {
  288. return $this->failedJobs > 0;
  289. }
  290. /**
  291. * Record that a job within the batch failed to finish successfully, executing any callbacks if necessary.
  292. *
  293. * @param string $jobId
  294. * @param \Throwable $e
  295. * @return void
  296. */
  297. public function recordFailedJob(string $jobId, $e)
  298. {
  299. $counts = $this->incrementFailedJobs($jobId);
  300. if ($counts->failedJobs === 1 && ! $this->allowsFailures()) {
  301. $this->cancel();
  302. }
  303. if ($this->allowsFailures()) {
  304. if ($this->hasProgressCallbacks()) {
  305. $this->invokeCallbacks('progress', $e);
  306. }
  307. if ($this->hasFailureCallbacks()) {
  308. $this->invokeCallbacks('failure', $e);
  309. }
  310. }
  311. if ($counts->failedJobs === 1 && $this->hasCatchCallbacks()) {
  312. $this->invokeCallbacks('catch', $e);
  313. }
  314. if ($counts->allJobsHaveRanExactlyOnce() && $this->hasFinallyCallbacks()) {
  315. $this->invokeCallbacks('finally');
  316. }
  317. }
  318. /**
  319. * Increment the failed jobs for the batch.
  320. *
  321. * @param string $jobId
  322. * @return \Illuminate\Bus\UpdatedBatchJobCounts
  323. */
  324. public function incrementFailedJobs(string $jobId)
  325. {
  326. return $this->repository->incrementFailedJobs($this->id, $jobId);
  327. }
  328. /**
  329. * Determine if the batch has "catch" callbacks.
  330. *
  331. * @return bool
  332. */
  333. public function hasCatchCallbacks()
  334. {
  335. return isset($this->options['catch']) && ! empty($this->options['catch']);
  336. }
  337. /**
  338. * Determine if the batch has "failure" callbacks.
  339. */
  340. public function hasFailureCallbacks(): bool
  341. {
  342. return isset($this->options['failure']) && ! empty($this->options['failure']);
  343. }
  344. /**
  345. * Determine if the batch has "finally" callbacks.
  346. *
  347. * @return bool
  348. */
  349. public function hasFinallyCallbacks()
  350. {
  351. return isset($this->options['finally']) && ! empty($this->options['finally']);
  352. }
  353. /**
  354. * Cancel the batch.
  355. *
  356. * @return void
  357. */
  358. public function cancel()
  359. {
  360. $this->repository->cancel($this->id);
  361. }
  362. /**
  363. * Determine if the batch has been cancelled.
  364. *
  365. * @return bool
  366. */
  367. public function canceled()
  368. {
  369. return $this->cancelled();
  370. }
  371. /**
  372. * Determine if the batch has been cancelled.
  373. *
  374. * @return bool
  375. */
  376. public function cancelled()
  377. {
  378. return ! is_null($this->cancelledAt);
  379. }
  380. /**
  381. * Delete the batch from storage.
  382. *
  383. * @return void
  384. */
  385. public function delete()
  386. {
  387. $this->repository->delete($this->id);
  388. }
  389. /**
  390. * Invoke a batch callback handler.
  391. *
  392. * @param callable $handler
  393. * @param \Illuminate\Bus\Batch $batch
  394. * @param \Throwable|null $e
  395. * @return void
  396. */
  397. protected function invokeHandlerCallback($handler, Batch $batch, ?Throwable $e = null)
  398. {
  399. try {
  400. $handler($batch, $e);
  401. } catch (Throwable $e) {
  402. if (function_exists('report')) {
  403. report($e);
  404. }
  405. }
  406. }
  407. /**
  408. * Convert the batch to an array.
  409. *
  410. * @return array
  411. */
  412. public function toArray()
  413. {
  414. return [
  415. 'id' => $this->id,
  416. 'name' => $this->name,
  417. 'totalJobs' => $this->totalJobs,
  418. 'pendingJobs' => $this->pendingJobs,
  419. 'processedJobs' => $this->processedJobs(),
  420. 'progress' => $this->progress(),
  421. 'failedJobs' => $this->failedJobs,
  422. 'options' => $this->options,
  423. 'createdAt' => $this->createdAt,
  424. 'cancelledAt' => $this->cancelledAt,
  425. 'finishedAt' => $this->finishedAt,
  426. ];
  427. }
  428. /**
  429. * Get the JSON serializable representation of the object.
  430. *
  431. * @return array
  432. */
  433. public function jsonSerialize(): array
  434. {
  435. return $this->toArray();
  436. }
  437. /**
  438. * Dynamically access the batch's "options" via properties.
  439. *
  440. * @param string $key
  441. * @return mixed
  442. */
  443. public function __get($key)
  444. {
  445. return $this->options[$key] ?? null;
  446. }
  447. }