1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
<?php
namespace React\Promise\Stream;
use Evenement\EventEmitter;
use InvalidArgumentException;
use React\Promise\PromiseInterface;
use React\Stream\ReadableStreamInterface;
use React\Stream\ThroughStream;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;
/**
 * Readable stream standing in for a stream that only becomes available once a promise settles.
 *
 * Events of the eventual input stream are re-emitted on this instance, and
 * closing either side closes the other one as well.
 *
 * @internal
 * @see unwrapReadable() instead
 */
class UnwrapReadableStream extends EventEmitter implements ReadableStreamInterface
{
    /** @var PromiseInterface|null promise chain yielding the input stream; reset to null by close() */
    private $promise;

    /** @var bool set to true by close(); isReadable() reports the inverse */
    private $closed = false;

    /**
     * Wrap the given promise, which is expected to resolve with a `ReadableStreamInterface`.
     *
     * A resolution value that is not a readable stream, as well as a rejection
     * of the promise, is reported as an `error` event followed by `close`
     * (unless this stream has already been closed by then).
     *
     * @param PromiseInterface<ReadableStreamInterface> $promise
     */
    public function __construct(PromiseInterface $promise)
    {
        $out = $this;
        $closed =& $this->closed;

        // first stage: only let readable streams pass, anything else is turned into an exception
        $assertReadable = function ($stream) {
            if (!$stream instanceof ReadableStreamInterface) {
                throw new InvalidArgumentException('Not a readable stream');
            }

            return $stream;
        };

        // second stage (success): wire the input stream up to this output stream
        $attach = function (ReadableStreamInterface $input) use ($out, &$closed) {
            // input has been closed in the meantime => nothing to read, close the output too
            if (!$input->isReadable()) {
                $out->close();

                return $input;
            }

            // output was closed before the promise resolved => silently discard the input
            if ($closed) {
                $input->close();

                return $input;
            }

            // pass every chunk of incoming data on to the consumers of the output
            $input->on('data', function ($chunk) use ($out) {
                $out->emit('data', array($chunk, $out));
            });

            // a regular end of the input ends and then closes the output (only once)
            $input->on('end', function () use ($out, &$closed) {
                if ($closed) {
                    return;
                }

                $out->emit('end', array($out));
                $out->close();
            });

            // an input error is reported on the output, which is closed right after
            $input->on('error', function ($reason) use ($out) {
                $out->emit('error', array($reason, $out));
                $out->close();
            });

            // keep both ends in sync: whichever closes first takes the other one with it
            $input->on('close', array($out, 'close'));
            $out->on('close', array($input, 'close'));

            return $input;
        };

        // second stage (failure): report the reason unless the output is closed already
        $fail = function ($reason) use ($out, &$closed) {
            if (!$closed) {
                $out->emit('error', array($reason, $out));
                $out->close();
            }

            // pause() and resume() may still attach to the stored promise and
            // expect a stream, so hand them an already closed NOOP stream.
            $noop = new ThroughStream();
            $noop->close();

            return $noop;
        };

        $this->promise = $promise->then($assertReadable)->then($attach, $fail);
    }

    /**
     * Tell whether this stream is still open.
     *
     * @return bool false once close() has been invoked, true before that
     */
    public function isReadable()
    {
        return !$this->closed;
    }

    /**
     * Pause the input stream as soon as the stored promise provides it.
     *
     * Does nothing after this stream has been closed.
     *
     * @return void
     */
    public function pause()
    {
        if ($this->promise === null) {
            return;
        }

        $this->promise->then(function (ReadableStreamInterface $input) {
            $input->pause();
        });
    }

    /**
     * Resume the input stream as soon as the stored promise provides it.
     *
     * Does nothing after this stream has been closed.
     *
     * @return void
     */
    public function resume()
    {
        if ($this->promise === null) {
            return;
        }

        $this->promise->then(function (ReadableStreamInterface $input) {
            $input->resume();
        });
    }

    /**
     * Pipe this stream into the given destination via `Util::pipe()`.
     *
     * @param WritableStreamInterface $dest    stream to write to
     * @param array                   $options options handed to `Util::pipe()` unchanged
     * @return WritableStreamInterface the given destination stream
     */
    public function pipe(WritableStreamInterface $dest, array $options = array())
    {
        Util::pipe($this, $dest, $options);

        return $dest;
    }

    /**
     * Close this stream; calling it again is a no-op.
     *
     * Marks the stream as closed, cancels the stored promise where it offers a
     * `cancel()` method, emits `close` and finally drops all listeners.
     *
     * @return void
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }

        // flag first, so handlers triggered by the steps below already see a closed stream
        $this->closed = true;

        // cancellation is optional: only attempt it if the promise has a cancel() method
        if ($this->promise !== null && \method_exists($this->promise, 'cancel')) {
            $this->promise->cancel();
        }
        $this->promise = null;

        $this->emit('close');
        $this->removeAllListeners();
    }
}
|