RedisConnection.php 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. <?php
  2. /**
  3. * This file is part of webman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Webman\RedisQueue;
  15. use RedisException;
  16. use Throwable;
  17. class RedisConnection extends \Redis
  18. {
  19. /**
  20. * @var array
  21. */
  22. protected array $config = [];
  23. /**
  24. * @param array $config
  25. * @return void
  26. * @throws RedisException
  27. */
  28. public function connectWithConfig(array $config = []): void
  29. {
  30. if ($config) {
  31. $this->config = $config;
  32. }
  33. if (false === $this->connect($this->config['host'], $this->config['port'], $this->config['timeout'] ?? 2)) {
  34. throw new \RuntimeException("Redis connect {$this->config['host']}:{$this->config['port']} fail.");
  35. }
  36. if (!empty($this->config['auth'])) {
  37. $this->auth($this->config['auth']);
  38. }
  39. if (!empty($this->config['db'])) {
  40. $this->select($this->config['db']);
  41. }
  42. if (!empty($this->config['prefix'])) {
  43. $this->setOption(\Redis::OPT_PREFIX, $this->config['prefix']);
  44. }
  45. }
  46. /**
  47. * @param $command
  48. * @param ...$args
  49. * @return mixed
  50. * @throws Throwable
  51. */
  52. protected function execCommand($command, ...$args)
  53. {
  54. try {
  55. return $this->{$command}(...$args);
  56. } catch (Throwable $e) {
  57. $msg = strtolower($e->getMessage());
  58. if ($msg === 'connection lost' || strpos($msg, 'went away')) {
  59. $this->connectWithConfig();
  60. return $this->{$command}(...$args);
  61. }
  62. throw $e;
  63. }
  64. }
  65. /**
  66. * @param $queue
  67. * @param $data
  68. * @param int $delay
  69. * @return bool
  70. * @throws Throwable
  71. */
  72. public function send($queue, $data, int $delay = 0): bool
  73. {
  74. $queue_waiting = '{redis-queue}-waiting';
  75. $queue_delay = '{redis-queue}-delayed';
  76. $now = time();
  77. $package_str = json_encode([
  78. 'id' => time().rand(),
  79. 'time' => $now,
  80. 'delay' => $delay,
  81. 'attempts' => 0,
  82. 'queue' => $queue,
  83. 'data' => $data
  84. ]);
  85. if ($delay) {
  86. return (bool)$this->execCommand('zAdd' ,$queue_delay, $now + $delay, $package_str);
  87. }
  88. return (bool)$this->execCommand('lPush', $queue_waiting.$queue, $package_str);
  89. }
  90. }