| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536 |
- <?php
- namespace Illuminate\Bus;
- use Aws\DynamoDb\DynamoDbClient;
- use Aws\DynamoDb\Marshaler;
- use Carbon\CarbonImmutable;
- use Closure;
- use Illuminate\Support\Str;
- class DynamoBatchRepository implements BatchRepository
- {
- /**
- * The batch factory instance.
- *
- * @var \Illuminate\Bus\BatchFactory
- */
- protected $factory;
- /**
- * The database connection instance.
- *
- * @var \Aws\DynamoDb\DynamoDbClient
- */
- protected $dynamoDbClient;
- /**
- * The application name.
- *
- * @var string
- */
- protected $applicationName;
- /**
- * The table to use to store batch information.
- *
- * @var string
- */
- protected $table;
- /**
- * The time-to-live value for batch records.
- *
- * @var int
- */
- protected $ttl;
- /**
- * The name of the time-to-live attribute for batch records.
- *
- * @var string
- */
- protected $ttlAttribute;
- /**
- * The DynamoDB marshaler instance.
- *
- * @var \Aws\DynamoDb\Marshaler
- */
- protected $marshaler;
- /**
- * Create a new batch repository instance.
- */
- public function __construct(
- BatchFactory $factory,
- DynamoDbClient $dynamoDbClient,
- string $applicationName,
- string $table,
- ?int $ttl,
- ?string $ttlAttribute,
- ) {
- $this->factory = $factory;
- $this->dynamoDbClient = $dynamoDbClient;
- $this->applicationName = $applicationName;
- $this->table = $table;
- $this->ttl = $ttl;
- $this->ttlAttribute = $ttlAttribute;
- $this->marshaler = new Marshaler;
- }
- /**
- * Retrieve a list of batches.
- *
- * @param int $limit
- * @param mixed $before
- * @return \Illuminate\Bus\Batch[]
- */
- public function get($limit = 50, $before = null)
- {
- $condition = 'application = :application';
- if ($before) {
- $condition = 'application = :application AND id < :id';
- }
- $result = $this->dynamoDbClient->query([
- 'TableName' => $this->table,
- 'KeyConditionExpression' => $condition,
- 'ExpressionAttributeValues' => array_filter([
- ':application' => ['S' => $this->applicationName],
- ':id' => array_filter(['S' => $before]),
- ]),
- 'Limit' => $limit,
- 'ScanIndexForward' => false,
- ]);
- return array_map(
- fn ($b) => $this->toBatch($this->marshaler->unmarshalItem($b, mapAsObject: true)),
- $result['Items']
- );
- }
- /**
- * Retrieve information about an existing batch.
- *
- * @param string $batchId
- * @return \Illuminate\Bus\Batch|null
- */
- public function find(string $batchId)
- {
- if (trim($batchId) === '') {
- return null;
- }
- $b = $this->dynamoDbClient->getItem([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- ]);
- if (! isset($b['Item'])) {
- // If we didn't find it via a standard read, attempt consistent read...
- $b = $this->dynamoDbClient->getItem([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- 'ConsistentRead' => true,
- ]);
- if (! isset($b['Item'])) {
- return null;
- }
- }
- $batch = $this->marshaler->unmarshalItem($b['Item'], mapAsObject: true);
- if ($batch) {
- return $this->toBatch($batch);
- }
- }
- /**
- * Store a new pending batch.
- *
- * @param \Illuminate\Bus\PendingBatch $batch
- * @return \Illuminate\Bus\Batch
- */
- public function store(PendingBatch $batch)
- {
- $id = (string) Str::orderedUuid();
- $batch = [
- 'id' => $id,
- 'name' => $batch->name,
- 'total_jobs' => 0,
- 'pending_jobs' => 0,
- 'failed_jobs' => 0,
- 'failed_job_ids' => [],
- 'options' => $this->serialize($batch->options ?? []),
- 'created_at' => time(),
- 'cancelled_at' => null,
- 'finished_at' => null,
- ];
- if (! is_null($this->ttl)) {
- $batch[$this->ttlAttribute] = time() + $this->ttl;
- }
- $this->dynamoDbClient->putItem([
- 'TableName' => $this->table,
- 'Item' => $this->marshaler->marshalItem(
- array_merge(['application' => $this->applicationName], $batch)
- ),
- ]);
- return $this->find($id);
- }
- /**
- * Increment the total number of jobs within the batch.
- *
- * @param string $batchId
- * @param int $amount
- * @return void
- */
- public function incrementTotalJobs(string $batchId, int $amount)
- {
- $update = 'SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val';
- if ($this->ttl) {
- $update = "SET total_jobs = total_jobs + :val, pending_jobs = pending_jobs + :val, #{$this->ttlAttribute} = :ttl";
- }
- $this->dynamoDbClient->updateItem(array_filter([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- 'UpdateExpression' => $update,
- 'ExpressionAttributeValues' => array_filter([
- ':val' => ['N' => "$amount"],
- ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
- ]),
- 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
- 'ReturnValues' => 'ALL_NEW',
- ]));
- }
- /**
- * Decrement the total number of pending jobs for the batch.
- *
- * @param string $batchId
- * @param string $jobId
- * @return \Illuminate\Bus\UpdatedBatchJobCounts
- */
- public function decrementPendingJobs(string $batchId, string $jobId)
- {
- $update = 'SET pending_jobs = pending_jobs - :inc';
- if ($this->ttl !== null) {
- $update = "SET pending_jobs = pending_jobs - :inc, #{$this->ttlAttribute} = :ttl";
- }
- $batch = $this->dynamoDbClient->updateItem(array_filter([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- 'UpdateExpression' => $update,
- 'ExpressionAttributeValues' => array_filter([
- ':inc' => ['N' => '1'],
- ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
- ]),
- 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
- 'ReturnValues' => 'ALL_NEW',
- ]));
- $values = $this->marshaler->unmarshalItem($batch['Attributes']);
- return new UpdatedBatchJobCounts(
- $values['pending_jobs'],
- $values['failed_jobs']
- );
- }
- /**
- * Increment the total number of failed jobs for the batch.
- *
- * @param string $batchId
- * @param string $jobId
- * @return \Illuminate\Bus\UpdatedBatchJobCounts
- */
- public function incrementFailedJobs(string $batchId, string $jobId)
- {
- $update = 'SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId)';
- if ($this->ttl !== null) {
- $update = "SET failed_jobs = failed_jobs + :inc, failed_job_ids = list_append(failed_job_ids, :jobId), #{$this->ttlAttribute} = :ttl";
- }
- $batch = $this->dynamoDbClient->updateItem(array_filter([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- 'UpdateExpression' => $update,
- 'ExpressionAttributeValues' => array_filter([
- ':jobId' => $this->marshaler->marshalValue([$jobId]),
- ':inc' => ['N' => '1'],
- ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
- ]),
- 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
- 'ReturnValues' => 'ALL_NEW',
- ]));
- $values = $this->marshaler->unmarshalItem($batch['Attributes']);
- return new UpdatedBatchJobCounts(
- $values['pending_jobs'],
- $values['failed_jobs']
- );
- }
- /**
- * Mark the batch that has the given ID as finished.
- *
- * @param string $batchId
- * @return void
- */
- public function markAsFinished(string $batchId)
- {
- $update = 'SET finished_at = :timestamp';
- if ($this->ttl !== null) {
- $update = "SET finished_at = :timestamp, #{$this->ttlAttribute} = :ttl";
- }
- $this->dynamoDbClient->updateItem(array_filter([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- 'UpdateExpression' => $update,
- 'ExpressionAttributeValues' => array_filter([
- ':timestamp' => ['N' => (string) time()],
- ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
- ]),
- 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
- ]));
- }
- /**
- * Cancel the batch that has the given ID.
- *
- * @param string $batchId
- * @return void
- */
- public function cancel(string $batchId)
- {
- $update = 'SET cancelled_at = :timestamp, finished_at = :timestamp';
- if ($this->ttl !== null) {
- $update = "SET cancelled_at = :timestamp, finished_at = :timestamp, #{$this->ttlAttribute} = :ttl";
- }
- $this->dynamoDbClient->updateItem(array_filter([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- 'UpdateExpression' => $update,
- 'ExpressionAttributeValues' => array_filter([
- ':timestamp' => ['N' => (string) time()],
- ':ttl' => array_filter(['N' => $this->getExpiryTime()]),
- ]),
- 'ExpressionAttributeNames' => $this->ttlExpressionAttributeName(),
- ]));
- }
- /**
- * Delete the batch that has the given ID.
- *
- * @param string $batchId
- * @return void
- */
- public function delete(string $batchId)
- {
- $this->dynamoDbClient->deleteItem([
- 'TableName' => $this->table,
- 'Key' => [
- 'application' => ['S' => $this->applicationName],
- 'id' => ['S' => $batchId],
- ],
- ]);
- }
- /**
- * Execute the given Closure within a storage specific transaction.
- *
- * @param \Closure $callback
- * @return mixed
- */
- public function transaction(Closure $callback)
- {
- return $callback();
- }
- /**
- * Rollback the last database transaction for the connection.
- *
- * @return void
- */
- public function rollBack()
- {
- }
- /**
- * Convert the given raw batch to a Batch object.
- *
- * @param object $batch
- * @return \Illuminate\Bus\Batch
- */
- protected function toBatch($batch)
- {
- return $this->factory->make(
- $this,
- $batch->id,
- $batch->name,
- (int) $batch->total_jobs,
- (int) $batch->pending_jobs,
- (int) $batch->failed_jobs,
- $batch->failed_job_ids,
- $this->unserialize($batch->options) ?? [],
- CarbonImmutable::createFromTimestamp($batch->created_at, date_default_timezone_get()),
- $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at, date_default_timezone_get()) : $batch->cancelled_at,
- $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at, date_default_timezone_get()) : $batch->finished_at
- );
- }
- /**
- * Create the underlying DynamoDB table.
- *
- * @return void
- */
- public function createAwsDynamoTable(): void
- {
- $definition = [
- 'TableName' => $this->table,
- 'AttributeDefinitions' => [
- [
- 'AttributeName' => 'application',
- 'AttributeType' => 'S',
- ],
- [
- 'AttributeName' => 'id',
- 'AttributeType' => 'S',
- ],
- ],
- 'KeySchema' => [
- [
- 'AttributeName' => 'application',
- 'KeyType' => 'HASH',
- ],
- [
- 'AttributeName' => 'id',
- 'KeyType' => 'RANGE',
- ],
- ],
- 'BillingMode' => 'PAY_PER_REQUEST',
- ];
- $this->dynamoDbClient->createTable($definition);
- if (! is_null($this->ttl)) {
- $this->dynamoDbClient->updateTimeToLive([
- 'TableName' => $this->table,
- 'TimeToLiveSpecification' => [
- 'AttributeName' => $this->ttlAttribute,
- 'Enabled' => true,
- ],
- ]);
- }
- }
- /**
- * Delete the underlying DynamoDB table.
- */
- public function deleteAwsDynamoTable(): void
- {
- $this->dynamoDbClient->deleteTable([
- 'TableName' => $this->table,
- ]);
- }
- /**
- * Get the expiry time based on the configured time-to-live.
- *
- * @return string|null
- */
- protected function getExpiryTime(): ?string
- {
- return is_null($this->ttl) ? null : (string) (time() + $this->ttl);
- }
- /**
- * Get the expression attribute name for the time-to-live attribute.
- *
- * @return array
- */
- protected function ttlExpressionAttributeName(): array
- {
- return is_null($this->ttl) ? [] : ["#{$this->ttlAttribute}" => $this->ttlAttribute];
- }
- /**
- * Serialize the given value.
- *
- * @param mixed $value
- * @return string
- */
- protected function serialize($value)
- {
- return serialize($value);
- }
- /**
- * Unserialize the given value.
- *
- * @param string $serialized
- * @return mixed
- */
- protected function unserialize($serialized)
- {
- return unserialize($serialized);
- }
- /**
- * Get the underlying DynamoDB client instance.
- *
- * @return \Aws\DynamoDb\DynamoDbClient
- */
- public function getDynamoClient(): DynamoDbClient
- {
- return $this->dynamoDbClient;
- }
- /**
- * The name of the table that contains the batch records.
- *
- * @return string
- */
- public function getTable(): string
- {
- return $this->table;
- }
- }
|