AsyncReadStream.php 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. <?php
  2. namespace GuzzleHttp\Stream;
  3. /**
  4. * Represents an asynchronous read-only stream that supports a drain event and
  5. * pumping data from a source stream.
  6. *
  7. * The AsyncReadStream can be used as a completely asynchronous stream, meaning
  8. * the data you can read from the stream will immediately return only
  9. * the data that is currently buffered.
  10. *
  11. * AsyncReadStream can also be used in a "blocking" manner if a "pump" function
  12. * is provided. When a caller requests more bytes than are available in the
  13. * buffer, then the pump function is used to block until the requested number
  14. * of bytes are available or the remote source stream has errored, closed, or
  15. * timed-out. This behavior isn't strictly "blocking" because the pump function
  16. * can send other transfers while waiting on the desired buffer size to be
  17. * ready for reading (e.g., continue to tick an event loop).
  18. *
  19. * @unstable This class is subject to change.
  20. */
  21. class AsyncReadStream implements StreamInterface
  22. {
  23. use StreamDecoratorTrait;
  24. /** @var callable|null Fn used to notify writers the buffer has drained */
  25. private $drain;
  26. /** @var callable|null Fn used to block for more data */
  27. private $pump;
  28. /** @var int|null Highwater mark of the underlying buffer */
  29. private $hwm;
  30. /** @var bool Whether or not drain needs to be called at some point */
  31. private $needsDrain;
  32. /** @var int The expected size of the remote source */
  33. private $size;
  34. /**
  35. * In order to utilize high water marks to tell writers to slow down, the
  36. * provided stream must answer to the "hwm" stream metadata variable,
  37. * providing the high water mark. If no "hwm" metadata value is available,
  38. * then the "drain" functionality is not utilized.
  39. *
  40. * This class accepts an associative array of configuration options.
  41. *
  42. * - drain: (callable) Function to invoke when the stream has drained,
  43. * meaning the buffer is now writable again because the size of the
  44. * buffer is at an acceptable level (e.g., below the high water mark).
  45. * The function accepts a single argument, the buffer stream object that
  46. * has drained.
  47. * - pump: (callable) A function that accepts the number of bytes to read
  48. * from the source stream. This function will block until all of the data
  49. * that was requested has been read, EOF of the source stream, or the
  50. * source stream is closed.
  51. * - size: (int) The expected size in bytes of the data that will be read
  52. * (if known up-front).
  53. *
  54. * @param StreamInterface $buffer Buffer that contains the data that has
  55. * been read by the event loop.
  56. * @param array $config Associative array of options.
  57. *
  58. * @throws \InvalidArgumentException if the buffer is not readable and
  59. * writable.
  60. */
  61. public function __construct(
  62. StreamInterface $buffer,
  63. array $config = []
  64. ) {
  65. if (!$buffer->isReadable() || !$buffer->isWritable()) {
  66. throw new \InvalidArgumentException(
  67. 'Buffer must be readable and writable'
  68. );
  69. }
  70. if (isset($config['size'])) {
  71. $this->size = $config['size'];
  72. }
  73. static $callables = ['pump', 'drain'];
  74. foreach ($callables as $check) {
  75. if (isset($config[$check])) {
  76. if (!is_callable($config[$check])) {
  77. throw new \InvalidArgumentException(
  78. $check . ' must be callable'
  79. );
  80. }
  81. $this->{$check} = $config[$check];
  82. }
  83. }
  84. $this->hwm = $buffer->getMetadata('hwm');
  85. // Cannot drain when there's no high water mark.
  86. if ($this->hwm === null) {
  87. $this->drain = null;
  88. }
  89. $this->stream = $buffer;
  90. }
  91. /**
  92. * Factory method used to create new async stream and an underlying buffer
  93. * if no buffer is provided.
  94. *
  95. * This function accepts the same options as AsyncReadStream::__construct,
  96. * but added the following key value pairs:
  97. *
  98. * - buffer: (StreamInterface) Buffer used to buffer data. If none is
  99. * provided, a default buffer is created.
  100. * - hwm: (int) High water mark to use if a buffer is created on your
  101. * behalf.
  102. * - max_buffer: (int) If provided, wraps the utilized buffer in a
  103. * DroppingStream decorator to ensure that buffer does not exceed a given
  104. * length. When exceeded, the stream will begin dropping data. Set the
  105. * max_buffer to 0, to use a NullStream which does not store data.
  106. * - write: (callable) A function that is invoked when data is written
  107. * to the underlying buffer. The function accepts the buffer as the first
  108. * argument, and the data being written as the second. The function MUST
  109. * return the number of bytes that were written or false to let writers
  110. * know to slow down.
  111. * - drain: (callable) See constructor documentation.
  112. * - pump: (callable) See constructor documentation.
  113. *
  114. * @param array $options Associative array of options.
  115. *
  116. * @return array Returns an array containing the buffer used to buffer
  117. * data, followed by the ready to use AsyncReadStream object.
  118. */
  119. public static function create(array $options = [])
  120. {
  121. $maxBuffer = isset($options['max_buffer'])
  122. ? $options['max_buffer']
  123. : null;
  124. if ($maxBuffer === 0) {
  125. $buffer = new NullStream();
  126. } elseif (isset($options['buffer'])) {
  127. $buffer = $options['buffer'];
  128. } else {
  129. $hwm = isset($options['hwm']) ? $options['hwm'] : 16384;
  130. $buffer = new BufferStream($hwm);
  131. }
  132. if ($maxBuffer > 0) {
  133. $buffer = new DroppingStream($buffer, $options['max_buffer']);
  134. }
  135. // Call the on_write callback if an on_write function was provided.
  136. if (isset($options['write'])) {
  137. $onWrite = $options['write'];
  138. $buffer = FnStream::decorate($buffer, [
  139. 'write' => function ($string) use ($buffer, $onWrite) {
  140. $result = $buffer->write($string);
  141. $onWrite($buffer, $string);
  142. return $result;
  143. }
  144. ]);
  145. }
  146. return [$buffer, new self($buffer, $options)];
  147. }
  148. public function getSize()
  149. {
  150. return $this->size;
  151. }
  152. public function isWritable()
  153. {
  154. return false;
  155. }
  156. public function write($string)
  157. {
  158. return false;
  159. }
  160. public function read($length)
  161. {
  162. if (!$this->needsDrain && $this->drain) {
  163. $this->needsDrain = $this->stream->getSize() >= $this->hwm;
  164. }
  165. $result = $this->stream->read($length);
  166. // If we need to drain, then drain when the buffer is empty.
  167. if ($this->needsDrain && $this->stream->getSize() === 0) {
  168. $this->needsDrain = false;
  169. $drainFn = $this->drain;
  170. $drainFn($this->stream);
  171. }
  172. $resultLen = strlen($result);
  173. // If a pump was provided, the buffer is still open, and not enough
  174. // data was given, then block until the data is provided.
  175. if ($this->pump && $resultLen < $length) {
  176. $pumpFn = $this->pump;
  177. $result .= $pumpFn($length - $resultLen);
  178. }
  179. return $result;
  180. }
  181. }