ParallelTest.php 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. <?php
  2. namespace tests;
  3. use PHPUnit\Framework\TestCase;
  4. use Workerman\Coroutine\Parallel;
  5. use Workerman\Coroutine;
  6. use Workerman\Timer;
  7. /**
  8. * Test cases for the Workerman\Coroutine\Parallel class.
  9. */
  10. class ParallelTest extends TestCase
  11. {
  12. /**
  13. * Test that callables are added and executed, and results are collected properly.
  14. */
  15. public function testAddAndWait()
  16. {
  17. $parallel = new Parallel();
  18. $parallel->add(function () {
  19. // Simulate some work.
  20. Timer::sleep(0.01);
  21. return 1;
  22. }, 'task1');
  23. $parallel->add(function () {
  24. // Simulate some work.
  25. Timer::sleep(0.005);
  26. return 2;
  27. }, 'task2');
  28. $results = $parallel->wait();
  29. $this->assertEquals(['task1' => 1, 'task2' => 2], $results);
  30. }
  31. /**
  32. * Test that exceptions thrown in callables are caught and can be retrieved.
  33. */
  34. public function testExceptions()
  35. {
  36. $parallel = new Parallel();
  37. $parallel->add(function () {
  38. throw new \Exception('Test exception');
  39. }, 'task_with_exception');
  40. $parallel->add(function () {
  41. return 'normal result';
  42. }, 'normal_task');
  43. $results = $parallel->wait();
  44. $exceptions = $parallel->getExceptions();
  45. // Check that the normal task result is present.
  46. $this->assertEquals(['normal_task' => 'normal result'], $results);
  47. // Check that the exception is captured for the failing task.
  48. $this->assertArrayHasKey('task_with_exception', $exceptions);
  49. $this->assertInstanceOf(\Exception::class, $exceptions['task_with_exception']);
  50. $this->assertEquals('Test exception', $exceptions['task_with_exception']->getMessage());
  51. }
  52. /**
  53. * Test concurrency control by limiting the number of concurrent tasks.
  54. */
  55. public function testConcurrencyLimit()
  56. {
  57. $concurrentLimit = 2;
  58. $parallel = new Parallel($concurrentLimit);
  59. $startTimes = [];
  60. $endTimes = [];
  61. for ($i = 0; $i < 5; $i++) {
  62. $parallel->add(function () use (&$startTimes, &$endTimes, $i) {
  63. $startTimes[$i] = microtime(true);
  64. // Simulate some work.
  65. Timer::sleep(0.1); // 100 milliseconds
  66. $endTimes[$i] = microtime(true);
  67. return $i;
  68. }, "task{$i}");
  69. }
  70. $parallel->wait();
  71. // Since we limited concurrency to 2, tasks should finish in batches.
  72. // We'll check that at no point more than $concurrentLimit tasks were running simultaneously.
  73. // Collect start and end times into an array of intervals.
  74. $intervals = [];
  75. for ($i = 0; $i < 5; $i++) {
  76. $intervals[] = ['start' => $startTimes[$i], 'end' => $endTimes[$i]];
  77. }
  78. // Check the maximum number of overlapping intervals does not exceed the concurrency limit.
  79. $maxConcurrent = $this->getMaxConcurrentIntervals($intervals);
  80. $this->assertLessThanOrEqual($concurrentLimit, $maxConcurrent);
  81. }
  82. /**
  83. * Helper function to determine the maximum number of overlapping intervals.
  84. *
  85. * @param array $intervals
  86. * @return int
  87. */
  88. private function getMaxConcurrentIntervals(array $intervals)
  89. {
  90. $events = [];
  91. foreach ($intervals as $interval) {
  92. $events[] = ['time' => $interval['start'], 'type' => 'start'];
  93. $events[] = ['time' => $interval['end'], 'type' => 'end'];
  94. }
  95. // Sort events by time, 'start' before 'end' if times are equal.
  96. usort($events, function ($a, $b) {
  97. if ($a['time'] == $b['time']) {
  98. return $a['type'] === 'start' ? -1 : 1;
  99. }
  100. return $a['time'] < $b['time'] ? -1 : 1;
  101. });
  102. $maxConcurrent = 0;
  103. $currentConcurrent = 0;
  104. foreach ($events as $event) {
  105. if ($event['type'] === 'start') {
  106. $currentConcurrent++;
  107. if ($currentConcurrent > $maxConcurrent) {
  108. $maxConcurrent = $currentConcurrent;
  109. }
  110. } else {
  111. $currentConcurrent--;
  112. }
  113. }
  114. return $maxConcurrent;
  115. }
  116. /**
  117. * Test that callables are executed in parallel when no concurrency limit is set.
  118. */
  119. public function testParallelExecutionWithoutConcurrencyLimit()
  120. {
  121. $parallel = new Parallel();
  122. $startTimes = [];
  123. $endTimes = [];
  124. $parallel->add(function () use (&$startTimes, &$endTimes) {
  125. $startTimes[] = microtime(true);
  126. Timer::sleep(0.1); // 100 milliseconds
  127. $endTimes[] = microtime(true);
  128. return 'task1';
  129. }, 'task1');
  130. $parallel->add(function () use (&$startTimes, &$endTimes) {
  131. $startTimes[] = microtime(true);
  132. Timer::sleep(0.1);// 100 milliseconds
  133. $endTimes[] = microtime(true);
  134. return 'task2';
  135. }, 'task2');
  136. $parallel->wait();
  137. // Calculate total elapsed time.
  138. $totalTime = max($endTimes) - min($startTimes);
  139. // The total time should be approximately the duration of one task, not the sum of both.
  140. $this->assertLessThan(0.2, $totalTime);
  141. }
  142. /**
  143. * Test adding callables without specifying keys and ensure results are correctly indexed.
  144. */
  145. public function testAddWithoutKeys()
  146. {
  147. $parallel = new Parallel();
  148. $parallel->add(function () {
  149. return 'result1';
  150. });
  151. $parallel->add(function () {
  152. return 'result2';
  153. });
  154. $results = $parallel->wait();
  155. // Since no keys were specified, indices should be 0 and 1.
  156. $this->assertEquals(['result1', 'result2'], $results);
  157. }
  158. /**
  159. * Test that the Parallel class can handle a large number of tasks.
  160. */
  161. public function testLargeNumberOfTasks()
  162. {
  163. $parallel = new Parallel();
  164. $taskCount = 100;
  165. for ($i = 0; $i < $taskCount; $i++) {
  166. $parallel->add(function () use ($i) {
  167. return $i * $i;
  168. }, "task{$i}");
  169. }
  170. $results = $parallel->wait();
  171. // Verify that all tasks have been completed and results are correct.
  172. for ($i = 0; $i < $taskCount; $i++) {
  173. $this->assertEquals($i * $i, $results["task{$i}"]);
  174. }
  175. }
  176. /**
  177. * Test that adding a non-callable throws a TypeError.
  178. */
  179. public function testAddNonCallable()
  180. {
  181. $this->expectException(\TypeError::class);
  182. $parallel = new Parallel();
  183. $parallel->add('not a callable');
  184. }
  185. /**
  186. * Test that the wait method can be called multiple times safely.
  187. */
  188. public function testMultipleWaitCalls()
  189. {
  190. $parallel = new Parallel();
  191. $parallel->add(function () {
  192. return 'first call';
  193. }, 'task1');
  194. $resultsFirst = $parallel->wait();
  195. $this->assertEquals(['task1' => 'first call'], $resultsFirst);
  196. // Add another task after first wait.
  197. $parallel->add(function () {
  198. return 'second call';
  199. }, 'task2');
  200. $resultsSecond = $parallel->wait();
  201. // Since the callbacks array is not cleared after wait, results should include both tasks.
  202. $this->assertEquals(['task1' => 'first call', 'task2' => 'second call'], $resultsSecond);
  203. }
  204. /**
  205. * Test that the class properly handles empty tasks (no callables added).
  206. */
  207. public function testNoTasks()
  208. {
  209. $parallel = new Parallel();
  210. $results = $parallel->wait();
  211. $this->assertEmpty($results);
  212. }
  213. /**
  214. * Test that the class handles tasks that return null.
  215. */
  216. public function testTasksReturningNull()
  217. {
  218. $parallel = new Parallel();
  219. $parallel->add(function () {
  220. // No return statement, implicitly returns null.
  221. }, 'nullTask');
  222. $results = $parallel->wait();
  223. $this->assertArrayHasKey('nullTask', $results);
  224. $this->assertNull($results['nullTask']);
  225. }
  226. /**
  227. * Test defer can be used in tasks.
  228. */
  229. public function testWithDefer()
  230. {
  231. $parallel = new Parallel();
  232. $results = [];
  233. $parallel->add(function () use (&$results) {
  234. Coroutine::defer(function () use (&$results) {
  235. $results[] = 'defer1';
  236. });
  237. });
  238. $parallel->wait();
  239. $this->assertEquals(['defer1'], $results);
  240. }
  241. }