FiberChannelTest.php 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. <?php
  2. use PHPUnit\Framework\TestCase;
  3. use Workerman\Coroutine\Channel\Fiber as Channel;
  4. use Workerman\Timer;
  5. use Fiber as BaseFiber;
  6. class FiberChannelTest extends TestCase
  7. {
  8. /**
  9. * Test basic push and pop operations.
  10. */
  11. public function testBasicPushPop()
  12. {
  13. $channel = new Channel();
  14. $fiber = new BaseFiber(function() use ($channel) {
  15. $channel->push('test data');
  16. });
  17. $fiber->start();
  18. $this->assertEquals('test data', $channel->pop());
  19. }
  20. /**
  21. * Test that pop will block until data is available or timeout occurs.
  22. */
  23. public function testPopWithTimeout()
  24. {
  25. $channel = new Channel();
  26. $fiber = new BaseFiber(function() use ($channel) {
  27. $result = $channel->pop(0.5);
  28. $this->assertFalse($result);
  29. });
  30. $startTime = microtime(true);
  31. $fiber->start();
  32. // Allow time for the fiber to suspend and wait
  33. Timer::sleep(0.2); // 200 ms
  34. // Ensure that the fiber is still waiting (not timed out yet)
  35. $this->assertTrue($fiber->isSuspended());
  36. // Wait until the timeout should have occurred
  37. Timer::sleep(0.4); // 400 ms
  38. $endTime = microtime(true);
  39. $this->assertTrue($fiber->isTerminated());
  40. $this->assertGreaterThanOrEqual(0.5, $endTime - $startTime);
  41. }
  42. /**
  43. * Test that push will block when capacity is reached and timeout occurs.
  44. */
  45. public function testPushWithTimeout()
  46. {
  47. $channel = new Channel(1);
  48. $this->assertTrue($channel->push('data1'));
  49. $fiber = new BaseFiber(function() use ($channel) {
  50. $result = $channel->push('data2', 0.5);
  51. $this->assertFalse($result);
  52. });
  53. $startTime = microtime(true);
  54. $fiber->start();
  55. // Allow time for the fiber to suspend and wait
  56. Timer::sleep(0.2); // 200 ms
  57. // Ensure that the fiber is still waiting (not timed out yet)
  58. $this->assertTrue($fiber->isSuspended());
  59. // Wait until the timeout should have occurred
  60. Timer::sleep(0.4); // 400 ms
  61. $endTime = microtime(true);
  62. $this->assertTrue($fiber->isTerminated());
  63. $this->assertGreaterThanOrEqual(0.5, $endTime - $startTime);
  64. }
  65. /**
  66. * Test that push returns false immediately if capacity is full and timeout is zero.
  67. */
  68. public function testPushNonBlockingWhenFull()
  69. {
  70. $channel = new Channel(1);
  71. $this->assertTrue($channel->push('data1'));
  72. $result = $channel->push('data2', 0);
  73. $this->assertFalse($result);
  74. }
  75. /**
  76. * Test that pop returns false immediately if the channel is empty and timeout is zero.
  77. */
  78. public function testPopNonBlockingWhenEmpty()
  79. {
  80. $channel = new Channel();
  81. $result = $channel->pop(0);
  82. $this->assertFalse($result);
  83. }
  84. /**
  85. * Test closing the channel.
  86. */
  87. public function testCloseChannel()
  88. {
  89. $channel = new Channel();
  90. $channel->close();
  91. $this->assertFalse($channel->push('data'));
  92. $this->assertFalse($channel->pop());
  93. }
  94. /**
  95. * Test that waiting pushers and poppers are resumed when the channel is closed.
  96. */
  97. public function testWaitersAreResumedOnClose()
  98. {
  99. $channelPush = new Channel(1);
  100. $channelPop = new Channel(1);
  101. $pushFiber = new BaseFiber(function() use ($channelPush) {
  102. $channelPush->push('data', 1);
  103. $result = $channelPush->push('data', 1);
  104. $this->assertFalse($result);
  105. });
  106. $popFiber = new BaseFiber(function() use ($channelPop) {
  107. $result = $channelPop->pop(1);
  108. $this->assertFalse($result);
  109. });
  110. $pushFiber->start();
  111. $popFiber->start();
  112. // Allow time for fibers to suspend
  113. Timer::sleep(0.1); // 100 ms
  114. // Close the channel to resume fibers
  115. $channelPush->close();
  116. $channelPop->close();
  117. // Allow time for fibers to process after resuming
  118. Timer::sleep(0.1); // 100 ms
  119. $this->assertTrue($pushFiber->isTerminated());
  120. $this->assertTrue($popFiber->isTerminated());
  121. }
  122. /**
  123. * Test that length and getCapacity methods return correct values.
  124. */
  125. public function testLengthAndCapacity()
  126. {
  127. $capacity = 2;
  128. $channel = new Channel($capacity);
  129. $this->assertEquals(0, $channel->length());
  130. $this->assertEquals($capacity, $channel->getCapacity());
  131. $channel->push('data1');
  132. $this->assertEquals(1, $channel->length());
  133. $channel->push('data2');
  134. $this->assertEquals(2, $channel->length());
  135. $channel->pop();
  136. $this->assertEquals(1, $channel->length());
  137. $channel->pop();
  138. $this->assertEquals(0, $channel->length());
  139. }
  140. /**
  141. * Test pushing to a closed channel.
  142. */
  143. public function testPushToClosedChannel()
  144. {
  145. $channel = new Channel();
  146. $channel->close();
  147. $result = $channel->push('data');
  148. $this->assertFalse($result);
  149. }
  150. /**
  151. * Test popping from a closed channel.
  152. */
  153. public function testPopFromClosedChannel()
  154. {
  155. $channel = new Channel();
  156. $channel->push('data');
  157. $channel->close();
  158. $this->assertEquals('data', $channel->pop());
  159. $this->assertFalse($channel->pop());
  160. }
  161. /**
  162. * Test multiple push and pop operations with fibers.
  163. */
  164. public function testMultiplePushPopWithFibers()
  165. {
  166. $channel = new Channel(2);
  167. $results = [];
  168. $producerFiber = new BaseFiber(function() use ($channel) {
  169. $channel->push('data1');
  170. $channel->push('data2');
  171. $channel->push('data3');
  172. });
  173. $consumerFiber = new BaseFiber(function() use ($channel, &$results) {
  174. $results[] = $channel->pop();
  175. $results[] = $channel->pop();
  176. $results[] = $channel->pop();
  177. });
  178. $producerFiber->start();
  179. $consumerFiber->start();
  180. // Allow time for fibers to execute
  181. usleep(500000); // 500 ms
  182. $this->assertEquals(['data1', 'data2', 'data3'], $results);
  183. }
  184. /**
  185. * Test that fibers are properly blocked and resumed in push and pop operations.
  186. */
  187. public function testFiberBlockingAndResuming()
  188. {
  189. $channel = new Channel(1);
  190. $pushFiber = new BaseFiber(function() use ($channel) {
  191. $channel->push('data1');
  192. $channel->push('data2');
  193. $channel->push('data3');
  194. });
  195. $popFiber = new BaseFiber(function() use ($channel) {
  196. $this->assertEquals('data1', $channel->pop());
  197. $this->assertEquals('data2', $channel->pop());
  198. $this->assertEquals('data3', $channel->pop());
  199. });
  200. $pushFiber->start();
  201. $popFiber->start();
  202. // Allow time for fibers to execute
  203. Timer::sleep(0.5); // 500 ms
  204. $this->assertTrue($pushFiber->isTerminated());
  205. $this->assertTrue($popFiber->isTerminated());
  206. }
  207. /**
  208. * Test that pushing data after capacity is reached blocks until space is available.
  209. */
  210. public function testPushBlocksWhenFull()
  211. {
  212. $channel = new Channel(1);
  213. $channel->push('data1');
  214. $pushFiber = new BaseFiber(function() use ($channel) {
  215. $channel->push('data2');
  216. });
  217. $popFiber = new BaseFiber(function() use ($channel) {
  218. Timer::sleep(0.2); // Wait before popping
  219. $this->assertEquals('data1', $channel->pop());
  220. });
  221. $pushFiber->start();
  222. $popFiber->start();
  223. // Allow time for fibers to execute
  224. Timer::sleep(0.5); // 500 ms
  225. $this->assertTrue($pushFiber->isTerminated());
  226. $this->assertTrue($popFiber->isTerminated());
  227. }
  228. /**
  229. * Test that popping data from an empty channel blocks until data is available.
  230. */
  231. public function testPopBlocksWhenEmpty()
  232. {
  233. $channel = new Channel();
  234. $popFiber = new BaseFiber(function() use ($channel) {
  235. $this->assertEquals('data1', $channel->pop());
  236. });
  237. $pushFiber = new BaseFiber(function() use ($channel) {
  238. Timer::sleep(0.2); // Wait before pushing
  239. $channel->push('data1');
  240. });
  241. $popFiber->start();
  242. $pushFiber->start();
  243. // Allow time for fibers to execute
  244. Timer::sleep(0.5); // 500 ms
  245. $this->assertTrue($pushFiber->isTerminated());
  246. $this->assertTrue($popFiber->isTerminated());
  247. }
  248. /**
  249. * Test pushing and popping with zero timeout.
  250. */
  251. public function testPushPopWithZeroTimeout()
  252. {
  253. $channel = new Channel(1);
  254. $this->assertTrue($channel->push('data1'));
  255. $result = $channel->push('data2', 0);
  256. $this->assertFalse($result);
  257. $result = $channel->pop(0);
  258. $this->assertEquals('data1', $result);
  259. $result = $channel->pop(0);
  260. $this->assertFalse($result);
  261. }
  262. }