ConcurrencyLimiter.php 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. <?php
  2. namespace Illuminate\Redis\Limiters;
  3. use Illuminate\Contracts\Redis\LimiterTimeoutException;
  4. use Illuminate\Support\Sleep;
  5. use Illuminate\Support\Str;
  6. use Throwable;
  7. class ConcurrencyLimiter
  8. {
  9. /**
  10. * The Redis factory implementation.
  11. *
  12. * @var \Illuminate\Redis\Connections\Connection
  13. */
  14. protected $redis;
  15. /**
  16. * The name of the limiter.
  17. *
  18. * @var string
  19. */
  20. protected $name;
  21. /**
  22. * The allowed number of concurrent tasks.
  23. *
  24. * @var int
  25. */
  26. protected $maxLocks;
  27. /**
  28. * The number of seconds a slot should be maintained.
  29. *
  30. * @var int
  31. */
  32. protected $releaseAfter;
  33. /**
  34. * Create a new concurrency limiter instance.
  35. *
  36. * @param \Illuminate\Redis\Connections\Connection $redis
  37. * @param string $name
  38. * @param int $maxLocks
  39. * @param int $releaseAfter
  40. */
  41. public function __construct($redis, $name, $maxLocks, $releaseAfter)
  42. {
  43. $this->name = $name;
  44. $this->redis = $redis;
  45. $this->maxLocks = $maxLocks;
  46. $this->releaseAfter = $releaseAfter;
  47. }
  48. /**
  49. * Attempt to acquire the lock for the given number of seconds.
  50. *
  51. * @param int $timeout
  52. * @param callable|null $callback
  53. * @param int $sleep
  54. * @return mixed
  55. *
  56. * @throws \Illuminate\Contracts\Redis\LimiterTimeoutException
  57. * @throws \Throwable
  58. */
  59. public function block($timeout, $callback = null, $sleep = 250)
  60. {
  61. $starting = time();
  62. $id = Str::random(20);
  63. while (! $slot = $this->acquire($id)) {
  64. if (time() - $timeout >= $starting) {
  65. throw new LimiterTimeoutException;
  66. }
  67. Sleep::usleep($sleep * 1000);
  68. }
  69. if (is_callable($callback)) {
  70. try {
  71. return tap($callback(), function () use ($slot, $id) {
  72. $this->release($slot, $id);
  73. });
  74. } catch (Throwable $exception) {
  75. $this->release($slot, $id);
  76. throw $exception;
  77. }
  78. }
  79. return true;
  80. }
  81. /**
  82. * Attempt to acquire the lock.
  83. *
  84. * @param string $id A unique identifier for this lock
  85. * @return mixed
  86. */
  87. protected function acquire($id)
  88. {
  89. $slots = array_map(function ($i) {
  90. return $this->name.$i;
  91. }, range(1, $this->maxLocks));
  92. return $this->redis->eval(...array_merge(
  93. [$this->lockScript(), count($slots)],
  94. array_merge($slots, [$this->name, $this->releaseAfter, $id])
  95. ));
  96. }
  97. /**
  98. * Get the Lua script for acquiring a lock.
  99. *
  100. * KEYS - The keys that represent available slots
  101. * ARGV[1] - The limiter name
  102. * ARGV[2] - The number of seconds the slot should be reserved
  103. * ARGV[3] - The unique identifier for this lock
  104. *
  105. * @return string
  106. */
  107. protected function lockScript()
  108. {
  109. return <<<'LUA'
  110. for index, value in pairs(redis.call('mget', unpack(KEYS))) do
  111. if not value then
  112. redis.call('set', KEYS[index], ARGV[3], "EX", ARGV[2])
  113. return ARGV[1]..index
  114. end
  115. end
  116. LUA;
  117. }
  118. /**
  119. * Release the lock.
  120. *
  121. * @param string $key
  122. * @param string $id
  123. * @return void
  124. */
  125. protected function release($key, $id)
  126. {
  127. $this->redis->eval($this->releaseScript(), 1, $key, $id);
  128. }
  129. /**
  130. * Get the Lua script to atomically release a lock.
  131. *
  132. * KEYS[1] - The name of the lock
  133. * ARGV[1] - The unique identifier for this lock
  134. *
  135. * @return string
  136. */
  137. protected function releaseScript()
  138. {
  139. return <<<'LUA'
  140. if redis.call('get', KEYS[1]) == ARGV[1]
  141. then
  142. return redis.call('del', KEYS[1])
  143. else
  144. return 0
  145. end
  146. LUA;
  147. }
  148. }