DynamoBatchRepository.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. <?php
  2. namespace Illuminate\Bus;
  3. use Aws\DynamoDb\DynamoDbClient;
  4. use Aws\DynamoDb\Marshaler;
  5. use Carbon\CarbonImmutable;
  6. use Closure;
  7. use Illuminate\Support\Str;
  8. class DynamoBatchRepository implements BatchRepository
  9. {
  10. /**
  11. * The batch factory instance.
  12. *
  13. * @var \Illuminate\Bus\BatchFactory
  14. */
  15. protected $factory;
  16. /**
  17. * The database connection instance.
  18. *
  19. * @var \Aws\DynamoDb\DynamoDbClient
  20. */
  21. protected $dynamoDbClient;
  22. /**
  23. * The application name.
  24. *
  25. * @var string
  26. */
  27. protected $applicationName;
  28. /**
  29. * The table to use to store batch information.
  30. *
  31. * @var string
  32. */
  33. protected $table;
  34. /**
  35. * The time-to-live value for batch records.
  36. *
  37. * @var int
  38. */
  39. protected $ttl;
  40. /**
  41. * The name of the time-to-live attribute for batch records.
  42. *
  43. * @var string
  44. */
  45. protected $ttlAttribute;
  46. /**
  47. * The DynamoDB marshaler instance.
  48. *
  49. * @var \Aws\DynamoDb\Marshaler
  50. */
  51. protected $marshaler;
  52. /**
  53. * Create a new batch repository instance.
  54. */
  55. public function __construct(
  56. BatchFactory $factory,
  57. DynamoDbClient $dynamoDbClient,
  58. string $applicationName,
  59. string $table,
  60. ?int $ttl,
  61. ?string $ttlAttribute,
  62. ) {
  63. $this->factory = $factory;
  64. $this->dynamoDbClient = $dynamoDbClient;
  65. $this->applicationName = $applicationName;
  66. $this->table = $table;
  67. $this->ttl = $ttl;
  68. $this->ttlAttribute = $ttlAttribute;
  69. $this->marshaler = new Marshaler;
  70. }
  71. /**
  72. * Retrieve a list of batches.
  73. *
  74. * @param int $limit
  75. * @param mixed $before
  76. * @return \Illuminate\Bus\Batch[]
  77. */
  78. public function get($limit = 50, $before = null)
  79. {
  80. $condition = 'application = :application';
  81. if ($before) {
  82. $condition = 'application = :application AND id < :id';
  83. }
  84. $result = $this->dynamoDbClient->query([
  85. 'TableName' => $this->table,
  86. 'KeyConditionExpression' => $condition,
  87. 'ExpressionAttributeValues' => array_filter([
  88. ':application' => ['S' => $this->applicationName],
  89. ':id' => array_filter(['S' => $before]),
  90. ]),
  91. 'Limit' => $limit,
  92. 'ScanIndexForward' => false,
  93. ]);
  94. return array_map(
  95. fn ($b) => $this->toBatch($this->marshaler->unmarshalItem($b, mapAsObject: true)),
  96. $result['Items']
  97. );
  98. }
  99. /**
  100. * Retrieve information about an existing batch.
  101. *
  102. * @param string $batchId
  103. * @return \Illuminate\Bus\Batch|null
  104. */
  105. public function find(string $batchId)
  106. {
  107. if (trim($batchId) === '') {
  108. return null;
  109. }
  110. $b = $this->dynamoDbClient->getItem([
  111. 'TableName' => $this->table,
  112. 'Key' => [
  113. 'application' => ['S' => $this->applicationName],
  114. 'id' => ['S' => $batchId],
  115. ],
  116. ]);
  117. if (! isset($b['Item'])) {
  118. // If we didn't find it via a standard read, attempt consistent read...
  119. $b = $this->dynamoDbClient->getItem([
  120. 'TableName' => $this->table,
  121. 'Key' => [
  122. 'application' => ['S' => $this->applicationName],
  123. 'id' => ['S' => $batchId],
  124. ],
  125. 'ConsistentRead' => true,
  126. ]);
  127. if (! isset($b['Item'])) {
  128. return null;
  129. }
  130. }
  131. $batch = $this->marshaler->unmarshalItem($b['Item'], mapAsObject: true);
  132. if ($batch) {
  133. return $this->toBatch($batch);
  134. }
  135. }
  136. /**
  137. * Store a new pending batch.
  138. *
  139. * @param \Illuminate\Bus\PendingBatch $batch
  140. * @return \Illuminate\Bus\Batch
  141. */
  142. public function store(PendingBatch $batch)
  143. {
  144. $id = (string) Str::orderedUuid();
  145. $batch = [
  146. 'id' => $id,
  147. 'name' => $batch->name,
  148. 'total_jobs' => 0,
  149. 'pending_jobs' => 0,
  150. 'failed_jobs' => 0,
  151. 'failed_job_ids' => [],
  152. 'options' => $this->serialize($batch->options ?? []),
  153. 'created_at' => time(),
  154. 'cancelled_at' => null,
  155. 'finished_at' => null,
  156. ];
  157. if (! is_null($this->ttl)) {
  158. $batch[$this->ttlAttribute] = time() + $this->ttl;
  159. }
  160. $this->dynamoDbClient->putItem([
  161. 'TableName' => $this->table,
  162. 'Item' => $this->marshaler->marshalItem(
  163. array_merge(['application' => $this->applicationName], $batch)
  164. ),
  165. ]);
  166. return $this->find($id);
  167. }
  168. /**
  169. * Increment the total number of jobs within the batch.
  170. *
  171. * @param string $batchId
  172. * @param int $amount
  173. * @return void
  174. */
  175. public function incrementTotalJobs(string $batchId, int $amount)
  176. {
  177. $update = 'SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val';
  178. if ($this->ttl) {
  179. $update = "SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val, #{$this->ttlAttribute} = :ttl";
  180. }
  181. $this->dynamoDbClient->updateItem(array_filter([
  182. 'TableName' => $this->table,
  183. 'Key' => [
  184. 'application' => ['S' => $this->applicationName],
  185. 'id' => ['S' => $batchId],
  186. ],
  187. 'UpdateExpression' => $update,
  188. 'ExpressionAttributeValues' => array_filter([
  189. ':val' => ['N' => "$amount"],
  190. ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
  191. ]),
  192. 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
  193. 'ReturnValues' => 'ALL_NEW',
  194. ]));
  195. }
  196. /**
  197. * Decrement the total number of pending jobs for the batch.
  198. *
  199. * @param string $batchId
  200. * @param string $jobId
  201. * @return \Illuminate\Bus\UpdatedBatchJobCounts
  202. */
  203. public function decrementPendingJobs(string $batchId, string $jobId)
  204. {
  205. $update = 'SET pending_jobs = pending_jobs - :inc';
  206. if ($this->ttl !== null) {
  207. $update = "SET pending_jobs = pending_jobs - :inc, #{$this->ttlAttribute} = :ttl";
  208. }
  209. $batch = $this->dynamoDbClient->updateItem(array_filter([
  210. 'TableName' => $this->table,
  211. 'Key' => [
  212. 'application' => ['S' => $this->applicationName],
  213. 'id' => ['S' => $batchId],
  214. ],
  215. 'UpdateExpression' => $update,
  216. 'ExpressionAttributeValues' => array_filter([
  217. ':inc' => ['N' => '1'],
  218. ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
  219. ]),
  220. 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
  221. 'ReturnValues' => 'ALL_NEW',
  222. ]));
  223. $values = $this->marshaler->unmarshalItem($batch['Attributes']);
  224. return new UpdatedBatchJobCounts(
  225. $values['pending_jobs'],
  226. $values['failed_jobs']
  227. );
  228. }
  229. /**
  230. * Increment the total number of failed jobs for the batch.
  231. *
  232. * @param string $batchId
  233. * @param string $jobId
  234. * @return \Illuminate\Bus\UpdatedBatchJobCounts
  235. */
  236. public function incrementFailedJobs(string $batchId, string $jobId)
  237. {
  238. $update = 'SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId)';
  239. if ($this->ttl !== null) {
  240. $update = "SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId), #{$this->ttlAttribute} = :ttl";
  241. }
  242. $batch = $this->dynamoDbClient->updateItem(array_filter([
  243. 'TableName' => $this->table,
  244. 'Key' => [
  245. 'application' => ['S' => $this->applicationName],
  246. 'id' => ['S' => $batchId],
  247. ],
  248. 'UpdateExpression' => $update,
  249. 'ExpressionAttributeValues' => array_filter([
  250. ':jobId' => $this->marshaler->marshalValue([$jobId]),
  251. ':inc' => ['N' => '1'],
  252. ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
  253. ]),
  254. 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
  255. 'ReturnValues' => 'ALL_NEW',
  256. ]));
  257. $values = $this->marshaler->unmarshalItem($batch['Attributes']);
  258. return new UpdatedBatchJobCounts(
  259. $values['pending_jobs'],
  260. $values['failed_jobs']
  261. );
  262. }
  263. /**
  264. * Mark the batch that has the given ID as finished.
  265. *
  266. * @param string $batchId
  267. * @return void
  268. */
  269. public function markAsFinished(string $batchId)
  270. {
  271. $update = 'SET finished_at = :timestamp';
  272. if ($this->ttl !== null) {
  273. $update = "SET finished_at = :timestamp, #{$this->ttlAttribute} = :ttl";
  274. }
  275. $this->dynamoDbClient->updateItem(array_filter([
  276. 'TableName' => $this->table,
  277. 'Key' => [
  278. 'application' => ['S' => $this->applicationName],
  279. 'id' => ['S' => $batchId],
  280. ],
  281. 'UpdateExpression' => $update,
  282. 'ExpressionAttributeValues' => array_filter([
  283. ':timestamp' => ['N' => (string) time()],
  284. ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
  285. ]),
  286. 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
  287. ]));
  288. }
  289. /**
  290. * Cancel the batch that has the given ID.
  291. *
  292. * @param string $batchId
  293. * @return void
  294. */
  295. public function cancel(string $batchId)
  296. {
  297. $update = 'SET cancelled_at = :timestamp, finished_at = :timestamp';
  298. if ($this->ttl !== null) {
  299. $update = "SET cancelled_at = :timestamp, finished_at = :timestamp, #{$this->ttlAttribute} = :ttl";
  300. }
  301. $this->dynamoDbClient->updateItem(array_filter([
  302. 'TableName' => $this->table,
  303. 'Key' => [
  304. 'application' => ['S' => $this->applicationName],
  305. 'id' => ['S' => $batchId],
  306. ],
  307. 'UpdateExpression' => $update,
  308. 'ExpressionAttributeValues' => array_filter([
  309. ':timestamp' => ['N' => (string) time()],
  310. ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
  311. ]),
  312. 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
  313. ]));
  314. }
  315. /**
  316. * Delete the batch that has the given ID.
  317. *
  318. * @param string $batchId
  319. * @return void
  320. */
  321. public function delete(string $batchId)
  322. {
  323. $this->dynamoDbClient->deleteItem([
  324. 'TableName' => $this->table,
  325. 'Key' => [
  326. 'application' => ['S' => $this->applicationName],
  327. 'id' => ['S' => $batchId],
  328. ],
  329. ]);
  330. }
  331. /**
  332. * Execute the given Closure within a storage specific transaction.
  333. *
  334. * @param \Closure $callback
  335. * @return mixed
  336. */
  337. public function transaction(Closure $callback)
  338. {
  339. return $callback();
  340. }
  341. /**
  342. * Rollback the last database transaction for the connection.
  343. *
  344. * @return void
  345. */
  346. public function rollBack()
  347. {
  348. }
  349. /**
  350. * Convert the given raw batch to a Batch object.
  351. *
  352. * @param object $batch
  353. * @return \Illuminate\Bus\Batch
  354. */
  355. protected function toBatch($batch)
  356. {
  357. return $this->factory->make(
  358. $this,
  359. $batch->id,
  360. $batch->name,
  361. (int) $batch->total_jobs,
  362. (int) $batch->pending_jobs,
  363. (int) $batch->failed_jobs,
  364. $batch->failed_job_ids,
  365. $this->unserialize($batch->options) ?? [],
  366. CarbonImmutable::createFromTimestamp($batch->created_at, date_default_timezone_get()),
  367. $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at, date_default_timezone_get()) : $batch->cancelled_at,
  368. $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at, date_default_timezone_get()) : $batch->finished_at
  369. );
  370. }
  371. /**
  372. * Create the underlying DynamoDB table.
  373. *
  374. * @return void
  375. */
  376. public function createAwsDynamoTable(): void
  377. {
  378. $definition = [
  379. 'TableName' => $this->table,
  380. 'AttributeDefinitions' => [
  381. [
  382. 'AttributeName' => 'application',
  383. 'AttributeType' => 'S',
  384. ],
  385. [
  386. 'AttributeName' => 'id',
  387. 'AttributeType' => 'S',
  388. ],
  389. ],
  390. 'KeySchema' => [
  391. [
  392. 'AttributeName' => 'application',
  393. 'KeyType' => 'HASH',
  394. ],
  395. [
  396. 'AttributeName' => 'id',
  397. 'KeyType' => 'RANGE',
  398. ],
  399. ],
  400. 'BillingMode' => 'PAY_PER_REQUEST',
  401. ];
  402. $this->dynamoDbClient->createTable($definition);
  403. if (! is_null($this->ttl)) {
  404. $this->dynamoDbClient->updateTimeToLive([
  405. 'TableName' => $this->table,
  406. 'TimeToLiveSpecification' => [
  407. 'AttributeName' => $this->ttlAttribute,
  408. 'Enabled' => true,
  409. ],
  410. ]);
  411. }
  412. }
  413. /**
  414. * Delete the underlying DynamoDB table.
  415. */
  416. public function deleteAwsDynamoTable(): void
  417. {
  418. $this->dynamoDbClient->deleteTable([
  419. 'TableName' => $this->table,
  420. ]);
  421. }
  422. /**
  423. * Get the expiry time based on the configured time-to-live.
  424. *
  425. * @return string|null
  426. */
  427. protected function getExpiryTime(): ?string
  428. {
  429. return is_null($this->ttl) ? null : (string) (time() + $this->ttl);
  430. }
  431. /**
  432. * Get the expression attribute name for the time-to-live attribute.
  433. *
  434. * @return array
  435. */
  436. protected function ttlExpressionAttributeName(): array
  437. {
  438. return is_null($this->ttl) ? [] : ["#{$this->ttlAttribute}" => $this->ttlAttribute];
  439. }
  440. /**
  441. * Serialize the given value.
  442. *
  443. * @param mixed $value
  444. * @return string
  445. */
  446. protected function serialize($value)
  447. {
  448. return serialize($value);
  449. }
  450. /**
  451. * Unserialize the given value.
  452. *
  453. * @param string $serialized
  454. * @return mixed
  455. */
  456. protected function unserialize($serialized)
  457. {
  458. return unserialize($serialized);
  459. }
  460. /**
  461. * Get the underlying DynamoDB client instance.
  462. *
  463. * @return \Aws\DynamoDb\DynamoDbClient
  464. */
  465. public function getDynamoClient(): DynamoDbClient
  466. {
  467. return $this->dynamoDbClient;
  468. }
  469. /**
  470. * The name of the table that contains the batch records.
  471. *
  472. * @return string
  473. */
  474. public function getTable(): string
  475. {
  476. return $this->table;
  477. }
  478. }