RedisManager.php 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. <?php
  2. namespace Webman\Redis;
  3. use Illuminate\Events\Dispatcher;
  4. use Illuminate\Redis\Connections\Connection;
  5. use StdClass;
  6. use Throwable;
  7. use WeakMap;
  8. use Webman\Context;
  9. use Workerman\Coroutine\Pool;
  10. class RedisManager extends \Illuminate\Redis\RedisManager
  11. {
  12. /**
  13. * @var Pool[]
  14. */
  15. protected static array $pools = [];
  16. /**
  17. * @var WeakMap
  18. */
  19. protected WeakMap $allConnections;
  20. /**
  21. * Get connection.
  22. *
  23. * @param $name
  24. * @return Connection|mixed|StdClass|null
  25. * @throws Throwable
  26. */
  27. public function connection($name = null)
  28. {
  29. $name = $name ?: 'default';
  30. $key = "redis.connections.$name";
  31. $connection = Context::get($key);
  32. if (!$connection) {
  33. if (!isset(static::$pools[$name])) {
  34. $poolConfig = $this->config[$name]['pool'] ?? [];
  35. $pool = new Pool($poolConfig['max_connections'] ?? 10, $poolConfig);
  36. $pool->setConnectionCreator(function () use ($name) {
  37. $connection = $this->configure($this->resolve($name), $name);
  38. if (class_exists(Dispatcher::class)) {
  39. $connection->setEventDispatcher(new Dispatcher());
  40. }
  41. $this->allConnections ??= new WeakMap();
  42. $this->allConnections[$connection] = true;
  43. return $connection;
  44. });
  45. $pool->setConnectionCloser(function ($connection) {
  46. $connection->client()->close();
  47. });
  48. $pool->setHeartbeatChecker(function ($connection) {
  49. $connection->get('PING');
  50. });
  51. static::$pools[$name] = $pool;
  52. }
  53. try {
  54. $connection = static::$pools[$name]->get();
  55. Context::set($key, $connection);
  56. } finally {
  57. Context::onDestroy(function () use ($connection, $name) {
  58. try {
  59. $connection && static::$pools[$name]->put($connection);
  60. } catch (Throwable) {
  61. // ignore
  62. }
  63. });
  64. }
  65. }
  66. return $connection;
  67. }
  68. /**
  69. * Return all the created connections.
  70. *
  71. * @return array
  72. */
  73. public function connections()
  74. {
  75. if (empty($this->allConnections)) {
  76. return [];
  77. }
  78. $connections = [];
  79. foreach ($this->allConnections as $connection => $_) {
  80. $connections[] = $connection;
  81. }
  82. return $connections;
  83. }
  84. }