| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- <?php
- namespace Illuminate\Redis\Limiters;
- use Illuminate\Contracts\Redis\LimiterTimeoutException;
- use Illuminate\Support\Sleep;
- use Illuminate\Support\Str;
- use Throwable;
- class ConcurrencyLimiter
- {
- /**
- * The Redis factory implementation.
- *
- * @var \Illuminate\Redis\Connections\Connection
- */
- protected $redis;
- /**
- * The name of the limiter.
- *
- * @var string
- */
- protected $name;
- /**
- * The allowed number of concurrent tasks.
- *
- * @var int
- */
- protected $maxLocks;
- /**
- * The number of seconds a slot should be maintained.
- *
- * @var int
- */
- protected $releaseAfter;
- /**
- * Create a new concurrency limiter instance.
- *
- * @param \Illuminate\Redis\Connections\Connection $redis
- * @param string $name
- * @param int $maxLocks
- * @param int $releaseAfter
- */
- public function __construct($redis, $name, $maxLocks, $releaseAfter)
- {
- $this->name = $name;
- $this->redis = $redis;
- $this->maxLocks = $maxLocks;
- $this->releaseAfter = $releaseAfter;
- }
- /**
- * Attempt to acquire the lock for the given number of seconds.
- *
- * @param int $timeout
- * @param callable|null $callback
- * @param int $sleep
- * @return mixed
- *
- * @throws \Illuminate\Contracts\Redis\LimiterTimeoutException
- * @throws \Throwable
- */
- public function block($timeout, $callback = null, $sleep = 250)
- {
- $starting = time();
- $id = Str::random(20);
- while (! $slot = $this->acquire($id)) {
- if (time() - $timeout >= $starting) {
- throw new LimiterTimeoutException;
- }
- Sleep::usleep($sleep * 1000);
- }
- if (is_callable($callback)) {
- try {
- return tap($callback(), function () use ($slot, $id) {
- $this->release($slot, $id);
- });
- } catch (Throwable $exception) {
- $this->release($slot, $id);
- throw $exception;
- }
- }
- return true;
- }
- /**
- * Attempt to acquire the lock.
- *
- * @param string $id A unique identifier for this lock
- * @return mixed
- */
- protected function acquire($id)
- {
- $slots = array_map(function ($i) {
- return $this->name.$i;
- }, range(1, $this->maxLocks));
- return $this->redis->eval(...array_merge(
- [$this->lockScript(), count($slots)],
- array_merge($slots, [$this->name, $this->releaseAfter, $id])
- ));
- }
- /**
- * Get the Lua script for acquiring a lock.
- *
- * KEYS - The keys that represent available slots
- * ARGV[1] - The limiter name
- * ARGV[2] - The number of seconds the slot should be reserved
- * ARGV[3] - The unique identifier for this lock
- *
- * @return string
- */
- protected function lockScript()
- {
- return <<<'LUA'
- for index, value in pairs(redis.call('mget', unpack(KEYS))) do
- if not value then
- redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2])
- return ARGV[1]..index
- end
- end
- LUA;
- }
- /**
- * Release the lock.
- *
- * @param string $key
- * @param string $id
- * @return void
- */
- protected function release($key, $id)
- {
- $this->redis->eval($this->releaseScript(), 1, $key, $id);
- }
- /**
- * Get the Lua script to atomically release a lock.
- *
- * KEYS[1] - The name of the lock
- * ARGV[1] - The unique identifier for this lock
- *
- * @return string
- */
- protected function releaseScript()
- {
- return <<<'LUA'
- if redis.call('get', KEYS[1]) == ARGV[1]
- then
- return redis.call('del', KEYS[1])
- else
- return 0
- end
- LUA;
- }
- }
|