PoolTest.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. <?php
  2. namespace test;
  3. use PHPUnit\Framework\TestCase;
  4. use ReflectionMethod;
  5. use Workerman\Coroutine;
  6. use Workerman\Coroutine\Exception\PoolException;
  7. use Workerman\Coroutine\Pool;
  8. use Psr\Log\LoggerInterface;
  9. use ReflectionClass;
  10. use stdClass;
  11. use Exception;
  12. use Workerman\Events\Event;
  13. use Workerman\Events\Select;
  14. use Workerman\Timer;
  15. use Workerman\Worker;
  16. class PoolTest extends TestCase
  17. {
  18. public function testConstructorWithConfig()
  19. {
  20. $config = [
  21. 'min_connections' => 2,
  22. 'idle_timeout' => 30,
  23. 'heartbeat_interval' => 10,
  24. 'wait_timeout' => 5,
  25. ];
  26. $pool = new Pool(10, $config);
  27. $this->assertEquals(10, $this->getPrivateProperty($pool, 'maxConnections'));
  28. $this->assertEquals(2, $this->getPrivateProperty($pool, 'minConnections'));
  29. $this->assertEquals(30, $this->getPrivateProperty($pool, 'idleTimeout'));
  30. $this->assertEquals(10, $this->getPrivateProperty($pool, 'heartbeatInterval'));
  31. $this->assertEquals(5, $this->getPrivateProperty($pool, 'waitTimeout'));
  32. }
  33. public function testSetConnectionCreator()
  34. {
  35. $pool = new Pool(5);
  36. $connectionCreator = function () {
  37. return new stdClass();
  38. };
  39. $pool->setConnectionCreator($connectionCreator);
  40. $this->assertSame($connectionCreator, $this->getPrivateProperty($pool, 'connectionCreateHandler'));
  41. }
  42. public function testSetConnectionCloser()
  43. {
  44. $pool = new Pool(5);
  45. $connectionCloser = function ($conn) {
  46. // Close connection.
  47. };
  48. $pool->setConnectionCloser($connectionCloser);
  49. $this->assertSame($connectionCloser, $this->getPrivateProperty($pool, 'connectionDestroyHandler'));
  50. }
  51. public function testGetConnection()
  52. {
  53. $pool = new Pool(5);
  54. $connectionMock = $this->createMock(stdClass::class);
  55. // 设置连接创建器
  56. $pool->setConnectionCreator(function () use ($connectionMock) {
  57. return $connectionMock;
  58. });
  59. $connection = $pool->get();
  60. $this->assertSame($connectionMock, $connection);
  61. $this->assertEquals(1, $this->getCurrentConnections($pool));
  62. // 检查 WeakMap 是否更新
  63. $connections = $this->getPrivateProperty($pool, 'connections');
  64. $lastUsedTimes = $this->getPrivateProperty($pool, 'lastUsedTimes');
  65. $lastHeartbeatTimes = $this->getPrivateProperty($pool, 'lastHeartbeatTimes');
  66. $this->assertTrue($connections->offsetExists($connection));
  67. $this->assertTrue($lastUsedTimes->offsetExists($connection));
  68. $this->assertTrue($lastHeartbeatTimes->offsetExists($connection));
  69. }
  70. public function testPutConnection()
  71. {
  72. $pool = new Pool(5);
  73. $connectionMock = $this->createMock(stdClass::class);
  74. $pool->setConnectionCreator(function () use ($connectionMock) {
  75. return $connectionMock;
  76. });
  77. $connection = $pool->get();
  78. $pool->put($connection);
  79. if (Coroutine::isCoroutine()) {
  80. $channel = $this->getPrivateProperty($pool, 'channel');
  81. $this->assertEquals(1, $channel->length());
  82. }
  83. $this->assertEquals(1, $pool->getConnectionCount());
  84. }
  85. public function testPutConnectionDoesNotBelong()
  86. {
  87. $this->expectException(PoolException::class);
  88. $this->expectExceptionMessage('The connection does not belong to the connection pool.');
  89. $pool = new Pool(5);
  90. $connection = new stdClass();
  91. $pool->put($connection);
  92. }
  93. public function testCreateConnection()
  94. {
  95. $pool = new Pool(5);
  96. $connectionMock = $this->createMock(stdClass::class);
  97. $pool->setConnectionCreator(function () use ($connectionMock) {
  98. return $connectionMock;
  99. });
  100. $connection = $pool->createConnection();
  101. $this->assertSame($connectionMock, $connection);
  102. // 确保 currentConnections 增加
  103. $this->assertEquals(1, $this->getCurrentConnections($pool));
  104. // 检查 WeakMap 是否更新
  105. $connections = $this->getPrivateProperty($pool, 'connections');
  106. $lastUsedTimes = $this->getPrivateProperty($pool, 'lastUsedTimes');
  107. $lastHeartbeatTimes = $this->getPrivateProperty($pool, 'lastHeartbeatTimes');
  108. $this->assertTrue($connections->offsetExists($connection));
  109. $this->assertTrue($lastUsedTimes->offsetExists($connection));
  110. $this->assertTrue($lastHeartbeatTimes->offsetExists($connection));
  111. }
  112. public function testCreateMaxConnections()
  113. {
  114. if (in_array(Worker::$eventLoopClass, [Select::class, Event::class])) {
  115. $this->assertTrue(true);
  116. return;
  117. }
  118. $maxConnections = 2;
  119. $pool = new Pool($maxConnections);
  120. $pool->setConnectionCreator(function () {
  121. Timer::sleep(0.01);
  122. return $this->createMock(stdClass::class);
  123. });
  124. $connections = [];
  125. for ($i = 0; $i < 3; $i++) {
  126. Coroutine::create(function () use ($pool, &$connections) {
  127. $connections[] = $pool->get();
  128. });
  129. }
  130. Timer::sleep(0.1);
  131. $this->assertEquals($maxConnections, $this->getCurrentConnections($pool));
  132. $lastUsedTimes = $this->getPrivateProperty($pool, 'lastUsedTimes');
  133. $lastHeartbeatTimes = $this->getPrivateProperty($pool, 'lastHeartbeatTimes');
  134. $this->assertCount($maxConnections, $lastUsedTimes);
  135. $this->assertCount($maxConnections, $lastHeartbeatTimes);
  136. foreach ($connections as $connection) {
  137. $pool->put($connection);
  138. }
  139. }
  140. public function testCreateConnectionThrowsException()
  141. {
  142. $pool = new Pool(5);
  143. $pool->setConnectionCreator(function () {
  144. throw new Exception('Failed to create connection');
  145. });
  146. $this->expectException(Exception::class);
  147. $this->expectExceptionMessage('Failed to create connection');
  148. try {
  149. $pool->createConnection();
  150. } finally {
  151. // 确保 currentConnections 减少
  152. $this->assertEquals(0, $this->getCurrentConnections($pool));
  153. }
  154. }
  155. public function testCloseConnection()
  156. {
  157. $pool = new Pool(5);
  158. $connection = $this->createMock(ConnectionMock::class);
  159. // 模拟连接属于连接池
  160. $connections = $this->getPrivateProperty($pool, 'connections');
  161. $connections[$connection] = time();
  162. $connection->expects($this->once())->method('close');
  163. $pool->setConnectionCloser(function ($conn) {
  164. $conn->close();
  165. });
  166. $pool->closeConnection($connection);
  167. // 确保 currentConnections 减少
  168. $this->assertEquals(0, $this->getCurrentConnections($pool));
  169. // 确保连接从 WeakMap 中移除
  170. $this->assertFalse($connections->offsetExists($connection));
  171. }
  172. public function testCloseConnections()
  173. {
  174. $maxConnections = 5;
  175. $pool = new Pool($maxConnections);
  176. $pool->setConnectionCreator(function () {
  177. $connection = $this->createMock(ConnectionMock::class);
  178. $connection->expects($this->once())->method('close');
  179. return $connection;
  180. });
  181. $pool->setConnectionCloser(function ($conn) {
  182. $conn->close();
  183. });
  184. $connections = [];
  185. for ($i = 0; $i < $maxConnections; $i++) {
  186. $connections[] = $pool->get();
  187. }
  188. $this->assertEquals(Coroutine::isCoroutine() ? $maxConnections : 1, $this->getCurrentConnections($pool));
  189. $pool->closeConnections();
  190. $this->assertEquals(Coroutine::isCoroutine() ? $maxConnections : 0, $this->getCurrentConnections($pool));
  191. if (!Coroutine::isCoroutine()) {
  192. return;
  193. }
  194. foreach ($connections as $connection) {
  195. $pool->put($connection);
  196. }
  197. $this->assertEquals($maxConnections, $this->getCurrentConnections($pool));
  198. $pool->closeConnections();
  199. $this->assertEquals(0, $this->getCurrentConnections($pool));
  200. $connections = [];
  201. for ($i = 0; $i < $maxConnections; $i++) {
  202. $connections[] = $pool->get();
  203. }
  204. $this->assertEquals($maxConnections, $this->getCurrentConnections($pool));
  205. foreach ($connections as $connection) {
  206. $pool->put($connection);
  207. }
  208. $pool->closeConnections();
  209. unset($connections);
  210. $this->assertEquals(0, $this->getCurrentConnections($pool));
  211. }
  212. public function testCloseConnectionWithExceptionInDestroyHandler()
  213. {
  214. $pool = new Pool(5);
  215. $connection = $this->createMock(stdClass::class);
  216. // 模拟连接属于连接池
  217. $connections = $this->getPrivateProperty($pool, 'connections');
  218. $connections[$connection] = time();
  219. $exception = new Exception('Error closing connection');
  220. $pool->setConnectionCloser(function ($conn) use ($exception) {
  221. throw $exception;
  222. });
  223. // 设置日志记录器
  224. $loggerMock = $this->createMock(LoggerInterface::class);
  225. $loggerMock->expects($this->once())
  226. ->method('info')
  227. ->with($this->stringContains('Error closing connection'));
  228. $this->setPrivateProperty($pool, 'logger', $loggerMock);
  229. $pool->closeConnection($connection);
  230. // 确保 currentConnections 减少
  231. $this->assertEquals(0, $this->getCurrentConnections($pool));
  232. // 确保连接从 WeakMap 中移除
  233. $this->assertFalse($connections->offsetExists($connection));
  234. }
  235. public function testHeartbeatChecker()
  236. {
  237. $pool = $this->getMockBuilder(Pool::class)
  238. ->setConstructorArgs([5])
  239. ->onlyMethods(['closeConnection'])
  240. ->getMock();
  241. $connection = $this->createMock(stdClass::class);
  242. // 设置连接心跳检测器
  243. $pool->setHeartbeatChecker(function ($conn) {
  244. // 模拟心跳检测
  245. });
  246. // 模拟连接在通道中
  247. $channel = $this->getPrivateProperty($pool, 'channel');
  248. $channel->push($connection);
  249. // 设置连接的上次使用时间和心跳时间
  250. $connections = $this->getPrivateProperty($pool, 'connections');
  251. $connections[$connection] = time();
  252. $lastUsedTimes = $this->getPrivateProperty($pool, 'lastUsedTimes');
  253. $lastUsedTimes[$connection] = time();
  254. $lastHeartbeatTimes = $this->getPrivateProperty($pool, 'lastHeartbeatTimes');
  255. $lastHeartbeatTimes[$connection] = time() - 100; // 超过心跳间隔
  256. // 调用受保护的 checkConnections 方法
  257. $reflectedMethod = new ReflectionMethod($pool, 'checkConnections');
  258. $reflectedMethod->setAccessible(true);
  259. $reflectedMethod->invoke($pool);
  260. // 检查心跳时间是否更新
  261. $lastHeartbeatTimes = $this->getPrivateProperty($pool, 'lastHeartbeatTimes');
  262. $this->assertGreaterThan(time() - 2, $lastHeartbeatTimes[$connection]);
  263. }
  264. public function testConnectionDestroyedWithoutReturn()
  265. {
  266. $pool = new Pool(5);
  267. // 设置连接创建器
  268. $pool->setConnectionCreator(function () {
  269. return new stdClass;
  270. });
  271. // 获取初始的 currentConnections
  272. $initialConnections = $this->getCurrentConnections($pool);
  273. // 从连接池获取一个连接
  274. $connection = $pool->get();
  275. // 检查 currentConnections 是否增加
  276. $this->assertEquals(Coroutine::isCoroutine() ? $initialConnections + 1 : 1, $this->getCurrentConnections($pool));
  277. // 不归还连接,并销毁连接对象
  278. unset($connection);
  279. // 检查 currentConnections 是否减少
  280. $this->assertEquals(Coroutine::isCoroutine() ? $initialConnections : 1, $this->getCurrentConnections($pool));
  281. }
  282. private function getPrivateProperty($object, string $property)
  283. {
  284. $reflection = new ReflectionClass($object);
  285. $prop = $reflection->getProperty($property);
  286. $prop->setAccessible(true);
  287. return $prop->getValue($object);
  288. }
  289. private function setPrivateProperty($object, string $property, $value)
  290. {
  291. $reflection = new ReflectionClass($object);
  292. $prop = $reflection->getProperty($property);
  293. $prop->setAccessible(true);
  294. $prop->setValue($object, $value);
  295. }
  296. private function getCurrentConnections($object): int
  297. {
  298. return $object->getConnectionCount();
  299. }
  300. }
  301. // 定义 ConnectionMock 类用于测试
  302. class ConnectionMock
  303. {
  304. public function close()
  305. {
  306. // 模拟关闭连接
  307. }
  308. }