DatabaseBatchRepository.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. <?php
  2. namespace Illuminate\Bus;
  3. use Carbon\CarbonImmutable;
  4. use Closure;
  5. use DateTimeInterface;
  6. use Illuminate\Database\Connection;
  7. use Illuminate\Database\PostgresConnection;
  8. use Illuminate\Database\Query\Expression;
  9. use Illuminate\Support\Str;
  10. use Throwable;
  /**
   * Batch repository that persists job-batch bookkeeping rows in a database table.
   *
   * Every query goes through the injected connection's query builder against
   * the configured table. Timestamps are written as integer Unix times (time()).
   */
  class DatabaseBatchRepository implements PrunableBatchRepository
  {
      /**
       * The batch factory instance.
       *
       * @var \Illuminate\Bus\BatchFactory
       */
      protected $factory;

      /**
       * The database connection instance.
       *
       * @var \Illuminate\Database\Connection
       */
      protected $connection;

      /**
       * The database table to use to store batch information.
       *
       * @var string
       */
      protected $table;

      /**
       * Create a new batch repository instance.
       *
       * @param  \Illuminate\Bus\BatchFactory  $factory
       * @param  \Illuminate\Database\Connection  $connection
       * @param  string  $table
       */
      public function __construct(BatchFactory $factory, Connection $connection, string $table)
      {
          $this->factory = $factory;
          $this->connection = $connection;
          $this->table = $table;
      }

      /**
       * Retrieve a list of batches, ordered by descending ID.
       *
       * When $before is truthy, only rows whose ID sorts below it are returned,
       * which allows callers to page backwards through the table.
       *
       * @param  int  $limit
       * @param  mixed  $before
       * @return \Illuminate\Bus\Batch[]
       */
      public function get($limit = 50, $before = null)
      {
          $rows = $this->connection->table($this->table)
              ->orderByDesc('id')
              ->limit($limit)
              ->when($before, fn ($q) => $q->where('id', '<', $before))
              ->get();

          return $rows
              ->map(fn ($row) => $this->toBatch($row))
              ->all();
      }

      /**
       * Retrieve information about an existing batch.
       *
       * The lookup is forced onto the write PDO.
       * NOTE(review): presumably so a batch is visible immediately after store()
       * on read/write-split connections — confirm.
       *
       * @param  string  $batchId
       * @return \Illuminate\Bus\Batch|null
       */
      public function find(string $batchId)
      {
          $row = $this->connection->table($this->table)
              ->useWritePdo()
              ->where('id', $batchId)
              ->first();

          if (! $row) {
              return null;
          }

          return $this->toBatch($row);
      }

      /**
       * Store a new pending batch.
       *
       * The row is inserted with zeroed counters and an empty failed-job list,
       * then read back through find() so the caller receives a hydrated Batch.
       *
       * @param  \Illuminate\Bus\PendingBatch  $batch
       * @return \Illuminate\Bus\Batch
       */
      public function store(PendingBatch $batch)
      {
          $id = (string) Str::orderedUuid();

          $this->connection->table($this->table)->insert([
              'id' => $id,
              'name' => $batch->name,
              'total_jobs' => 0,
              'pending_jobs' => 0,
              'failed_jobs' => 0,
              'failed_job_ids' => '[]',
              'options' => $this->serialize($batch->options),
              'created_at' => time(),
              'cancelled_at' => null,
              'finished_at' => null,
          ]);

          return $this->find($id);
      }

      /**
       * Increment the total number of jobs within the batch.
       *
       * Both the total and pending counters grow by $amount, and finished_at is
       * reset to null in the same statement.
       *
       * @param  string  $batchId
       * @param  int  $amount
       * @return void
       */
      public function incrementTotalJobs(string $batchId, int $amount)
      {
          // $amount is typed int, so concatenating it into the raw expression is safe.
          $this->connection->table($this->table)->where('id', $batchId)->update([
              'total_jobs' => new Expression('total_jobs + '.$amount),
              'pending_jobs' => new Expression('pending_jobs + '.$amount),
              'finished_at' => null,
          ]);
      }

      /**
       * Decrement the total number of pending jobs for the batch.
       *
       * The given job ID is also removed from the stored failed-job ID list.
       *
       * NOTE(review): when no row matches $batchId, updateAtomicValues() returns
       * an empty array and the key reads below hit undefined keys — confirm how
       * callers expect a missing batch to be handled.
       *
       * @param  string  $batchId
       * @param  string  $jobId
       * @return \Illuminate\Bus\UpdatedBatchJobCounts
       */
      public function decrementPendingJobs(string $batchId, string $jobId)
      {
          $values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) {
              $failedIds = (array) json_decode($batch->failed_job_ids, true);

              return [
                  'pending_jobs' => $batch->pending_jobs - 1,
                  'failed_jobs' => $batch->failed_jobs,
                  // array_values() re-indexes so the stored JSON stays a list, not an object.
                  'failed_job_ids' => json_encode(array_values(array_diff($failedIds, [$jobId]))),
              ];
          });

          return new UpdatedBatchJobCounts(
              $values['pending_jobs'],
              $values['failed_jobs']
          );
      }

      /**
       * Increment the total number of failed jobs for the batch.
       *
       * The given job ID is appended to the stored failed-job ID list
       * (duplicates are collapsed); the pending counter is left untouched.
       *
       * NOTE(review): when no row matches $batchId, updateAtomicValues() returns
       * an empty array and the key reads below hit undefined keys — confirm how
       * callers expect a missing batch to be handled.
       *
       * @param  string  $batchId
       * @param  string  $jobId
       * @return \Illuminate\Bus\UpdatedBatchJobCounts
       */
      public function incrementFailedJobs(string $batchId, string $jobId)
      {
          $values = $this->updateAtomicValues($batchId, function ($batch) use ($jobId) {
              $failedIds = (array) json_decode($batch->failed_job_ids, true);

              return [
                  'pending_jobs' => $batch->pending_jobs,
                  'failed_jobs' => $batch->failed_jobs + 1,
                  'failed_job_ids' => json_encode(array_values(array_unique(array_merge($failedIds, [$jobId])))),
              ];
          });

          return new UpdatedBatchJobCounts(
              $values['pending_jobs'],
              $values['failed_jobs']
          );
      }

      /**
       * Update values on the batch row inside a transaction with a row lock.
       *
       * The row is selected with lockForUpdate(), handed to the callback, and the
       * column => value array the callback returns is written back to the row.
       *
       * @param  string  $batchId
       * @param  \Closure  $callback  Receives the locked row object; returns the columns to update.
       * @return array  The values written, or an empty array when no row matches.
       */
      protected function updateAtomicValues(string $batchId, Closure $callback)
      {
          return $this->connection->transaction(function () use ($batchId, $callback) {
              $batch = $this->connection->table($this->table)
                  ->where('id', $batchId)
                  ->lockForUpdate()
                  ->first();

              if (is_null($batch)) {
                  return [];
              }

              $values = $callback($batch);

              $this->connection->table($this->table)->where('id', $batchId)->update($values);

              return $values;
          });
      }

      /**
       * Mark the batch that has the given ID as finished.
       *
       * @param  string  $batchId
       * @return void
       */
      public function markAsFinished(string $batchId)
      {
          $this->connection->table($this->table)->where('id', $batchId)->update([
              'finished_at' => time(),
          ]);
      }

      /**
       * Cancel the batch that has the given ID.
       *
       * @param  string  $batchId
       * @return void
       */
      public function cancel(string $batchId)
      {
          // Capture the clock once so both columns always carry the same second.
          $now = time();

          $this->connection->table($this->table)->where('id', $batchId)->update([
              'cancelled_at' => $now,
              'finished_at' => $now,
          ]);
      }

      /**
       * Delete the batch that has the given ID.
       *
       * @param  string  $batchId
       * @return void
       */
      public function delete(string $batchId)
      {
          $this->connection->table($this->table)->where('id', $batchId)->delete();
      }

      /**
       * Prune all of the finished entries that finished before the given date.
       *
       * @param  \DateTimeInterface  $before
       * @return int  Number of rows deleted.
       */
      public function prune(DateTimeInterface $before)
      {
          $query = $this->connection->table($this->table)
              ->whereNotNull('finished_at')
              ->where('finished_at', '<', $before->getTimestamp());

          return $this->pruneInChunks($query);
      }

      /**
       * Prune all of the unfinished entries created before the given date.
       *
       * @param  \DateTimeInterface  $before
       * @return int  Number of rows deleted.
       */
      public function pruneUnfinished(DateTimeInterface $before)
      {
          $query = $this->connection->table($this->table)
              ->whereNull('finished_at')
              ->where('created_at', '<', $before->getTimestamp());

          return $this->pruneInChunks($query);
      }

      /**
       * Prune all of the cancelled entries created before the given date.
       *
       * @param  \DateTimeInterface  $before
       * @return int  Number of rows deleted.
       */
      public function pruneCancelled(DateTimeInterface $before)
      {
          $query = $this->connection->table($this->table)
              ->whereNotNull('cancelled_at')
              ->where('created_at', '<', $before->getTimestamp());

          return $this->pruneInChunks($query);
      }

      /**
       * Repeatedly delete rows matching the query, one limited chunk at a time.
       *
       * The loop ends once a delete statement reports zero affected rows.
       *
       * @param  \Illuminate\Database\Query\Builder  $query
       * @param  int  $chunkSize  Maximum rows removed per delete statement.
       * @return int  Total number of rows deleted across all chunks.
       */
      protected function pruneInChunks($query, int $chunkSize = 1000)
      {
          $totalDeleted = 0;

          do {
              $deleted = $query->limit($chunkSize)->delete();

              $totalDeleted += $deleted;
          } while ($deleted !== 0);

          return $totalDeleted;
      }

      /**
       * Execute the given Closure within a storage specific transaction.
       *
       * @param  \Closure  $callback  Invoked with no arguments.
       * @return mixed  Whatever the connection's transaction() call returns.
       */
      public function transaction(Closure $callback)
      {
          return $this->connection->transaction(fn () => $callback());
      }

      /**
       * Rollback the last database transaction for the connection.
       *
       * Rolls back to level 0 rather than a single level.
       *
       * @return void
       */
      public function rollBack()
      {
          $this->connection->rollBack(toLevel: 0);
      }

      /**
       * Serialize the given value.
       *
       * On a Postgres connection the serialized payload is additionally base64
       * encoded.
       * NOTE(review): presumably because serialize() output can contain bytes a
       * Postgres text column rejects — confirm.
       *
       * @param  mixed  $value
       * @return string
       */
      protected function serialize($value)
      {
          $serialized = serialize($value);

          if ($this->connection instanceof PostgresConnection) {
              return base64_encode($serialized);
          }

          return $serialized;
      }

      /**
       * Unserialize the given value.
       *
       * On Postgres, a payload containing neither ':' nor ';' is treated as
       * base64 (neither character is in the base64 alphabet, while serialize()
       * output always contains at least one of them) and is decoded first.
       *
       * @param  string  $serialized
       * @return mixed  The restored value, or an empty array if unserializing throws.
       */
      protected function unserialize($serialized)
      {
          $looksBase64 = $this->connection instanceof PostgresConnection
              && ! Str::contains($serialized, [':', ';']);

          if ($looksBase64) {
              $serialized = base64_decode($serialized);
          }

          try {
              return unserialize($serialized);
          } catch (Throwable) {
              // Deliberate best-effort: a payload that cannot be restored yields empty options.
              return [];
          }
      }

      /**
       * Convert the given raw batch to a Batch object.
       *
       * Counters are cast to int, the failed-job ID JSON is decoded to an array,
       * and timestamps become CarbonImmutable instances in PHP's default timezone.
       * Falsy cancelled_at / finished_at values are passed through unchanged.
       *
       * @param  object  $batch
       * @return \Illuminate\Bus\Batch
       */
      protected function toBatch($batch)
      {
          $toDate = fn ($timestamp) => CarbonImmutable::createFromTimestamp(
              $timestamp,
              date_default_timezone_get()
          );

          return $this->factory->make(
              $this,
              $batch->id,
              $batch->name,
              (int) $batch->total_jobs,
              (int) $batch->pending_jobs,
              (int) $batch->failed_jobs,
              (array) json_decode($batch->failed_job_ids, true),
              $this->unserialize($batch->options),
              $toDate($batch->created_at),
              $batch->cancelled_at ? $toDate($batch->cancelled_at) : $batch->cancelled_at,
              $batch->finished_at ? $toDate($batch->finished_at) : $batch->finished_at
          );
      }

      /**
       * Get the underlying database connection.
       *
       * @return \Illuminate\Database\Connection
       */
      public function getConnection()
      {
          return $this->connection;
      }

      /**
       * Set the underlying database connection.
       *
       * @param  \Illuminate\Database\Connection  $connection
       * @return void
       */
      public function setConnection(Connection $connection)
      {
          $this->connection = $connection;
      }
  }