Skip to content

Commit

Permalink
Merge pull request #178 from clue-labs/types
Browse files Browse the repository at this point in the history
Add native types to public API
  • Loading branch information
WyriHaximus authored May 19, 2024
2 parents 212e923 + 74dd763 commit cf107f5
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 71 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ See also `pause()`.

#### pipe()

The `pipe(WritableStreamInterface $dest, array $options = [])` method can be used to
The `pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface` method can be used to
pipe all the data from this readable source into the given writable destination.

Automatically sends all incoming data to the destination.
Expand Down
16 changes: 8 additions & 8 deletions src/CompositeStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ public function __construct(ReadableStreamInterface $readable, WritableStreamInt
$this->writable->on('close', [$this, 'close']);
}

public function isReadable()
public function isReadable(): bool
{
return $this->readable->isReadable();
}

public function pause()
public function pause(): void
{
$this->readable->pause();
}

public function resume()
public function resume(): void
{
if (!$this->writable->isWritable()) {
return;
Expand All @@ -46,28 +46,28 @@ public function resume()
$this->readable->resume();
}

public function pipe(WritableStreamInterface $dest, array $options = [])
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
{
return Util::pipe($this, $dest, $options);
}

public function isWritable()
public function isWritable(): bool
{
return $this->writable->isWritable();
}

public function write($data)
public function write($data): bool
{
return $this->writable->write($data);
}

public function end($data = null)
public function end($data = null): void
{
$this->readable->pause();
$this->writable->end($data);
}

public function close()
public function close(): void
{
if ($this->closed) {
return;
Expand Down
20 changes: 10 additions & 10 deletions src/DuplexResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ final class DuplexResourceStream extends EventEmitter implements DuplexStreamInt
* @param ?int $readChunkSize
* @param ?WritableStreamInterface $buffer
*/
public function __construct($stream, ?LoopInterface $loop = null, $readChunkSize = null, ?WritableStreamInterface $buffer = null)
public function __construct($stream, ?LoopInterface $loop = null, ?int $readChunkSize = null, ?WritableStreamInterface $buffer = null)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
Expand Down Expand Up @@ -75,7 +75,7 @@ public function __construct($stream, ?LoopInterface $loop = null, $readChunkSize

$this->stream = $stream;
$this->loop = $loop ?: Loop::get();
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
$this->bufferSize = $readChunkSize ?? 65536;
$this->buffer = $buffer;

$this->buffer->on('error', function ($error) {
Expand All @@ -91,33 +91,33 @@ public function __construct($stream, ?LoopInterface $loop = null, $readChunkSize
$this->resume();
}

public function isReadable()
public function isReadable(): bool
{
return $this->readable;
}

public function isWritable()
public function isWritable(): bool
{
return $this->writable;
}

public function pause()
public function pause(): void
{
if ($this->listening) {
$this->loop->removeReadStream($this->stream);
$this->listening = false;
}
}

public function resume()
public function resume(): void
{
if (!$this->listening && $this->readable) {
$this->loop->addReadStream($this->stream, [$this, 'handleData']);
$this->listening = true;
}
}

public function write($data)
public function write($data): bool
{
if (!$this->writable) {
return false;
Expand All @@ -126,7 +126,7 @@ public function write($data)
return $this->buffer->write($data);
}

public function close()
public function close(): void
{
if (!$this->writable && !$this->closing) {
return;
Expand All @@ -147,7 +147,7 @@ public function close()
}
}

public function end($data = null)
public function end($data = null): void
{
if (!$this->writable) {
return;
Expand All @@ -162,7 +162,7 @@ public function end($data = null)
$this->buffer->end($data);
}

public function pipe(WritableStreamInterface $dest, array $options = [])
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
{
return Util::pipe($this, $dest, $options);
}
Expand Down
14 changes: 7 additions & 7 deletions src/ReadableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final class ReadableResourceStream extends EventEmitter implements ReadableStrea
* @param ?LoopInterface $loop
* @param ?int $readChunkSize
*/
public function __construct($stream, ?LoopInterface $loop = null, $readChunkSize = null)
public function __construct($stream, ?LoopInterface $loop = null, ?int $readChunkSize = null)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
throw new InvalidArgumentException('First parameter must be a valid stream resource');
Expand All @@ -72,38 +72,38 @@ public function __construct($stream, ?LoopInterface $loop = null, $readChunkSize

$this->stream = $stream;
$this->loop = $loop ?: Loop::get();
$this->bufferSize = ($readChunkSize === null) ? 65536 : (int)$readChunkSize;
$this->bufferSize = $readChunkSize ?? 65536;

$this->resume();
}

public function isReadable()
public function isReadable(): bool
{
return !$this->closed;
}

public function pause()
public function pause(): void
{
if ($this->listening) {
$this->loop->removeReadStream($this->stream);
$this->listening = false;
}
}

public function resume()
public function resume(): void
{
if (!$this->listening && !$this->closed) {
$this->loop->addReadStream($this->stream, [$this, 'handleData']);
$this->listening = true;
}
}

public function pipe(WritableStreamInterface $dest, array $options = [])
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
{
return Util::pipe($this, $dest, $options);
}

public function close()
public function close(): void
{
if ($this->closed) {
return;
Expand Down
10 changes: 5 additions & 5 deletions src/ReadableStreamInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ interface ReadableStreamInterface extends EventEmitterInterface
*
* @return bool
*/
public function isReadable();
public function isReadable(): bool;

/**
* Pauses reading incoming data events.
Expand Down Expand Up @@ -226,7 +226,7 @@ public function isReadable();
* @see self::resume()
* @return void
*/
public function pause();
public function pause(): void;

/**
* Resumes reading incoming data events.
Expand All @@ -247,7 +247,7 @@ public function pause();
* @see self::pause()
* @return void
*/
public function resume();
public function resume(): void;

/**
* Pipes all the data from this readable source into the given writable destination.
Expand Down Expand Up @@ -322,7 +322,7 @@ public function resume();
* @param array $options
* @return WritableStreamInterface $dest stream as-is
*/
public function pipe(WritableStreamInterface $dest, array $options = []);
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface;

/**
* Closes the stream (forcefully).
Expand Down Expand Up @@ -358,5 +358,5 @@ public function pipe(WritableStreamInterface $dest, array $options = []);
* @return void
* @see WritableStreamInterface::close()
*/
public function close();
public function close(): void;
}
22 changes: 9 additions & 13 deletions src/ThroughStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,18 @@ final class ThroughStream extends EventEmitter implements DuplexStreamInterface
private $drain = false;
private $callback;

public function __construct($callback = null)
public function __construct(?callable $callback = null)
{
if ($callback !== null && !\is_callable($callback)) {
throw new InvalidArgumentException('Invalid transformation callback given');
}

$this->callback = $callback;
}

public function pause()
public function pause(): void
{
// only allow pause if still readable, false otherwise
$this->paused = $this->readable;
}

public function resume()
public function resume(): void
{
$this->paused = false;

Expand All @@ -108,22 +104,22 @@ public function resume()
}
}

public function pipe(WritableStreamInterface $dest, array $options = [])
public function pipe(WritableStreamInterface $dest, array $options = []): WritableStreamInterface
{
return Util::pipe($this, $dest, $options);
}

public function isReadable()
public function isReadable(): bool
{
return $this->readable;
}

public function isWritable()
public function isWritable(): bool
{
return $this->writable;
}

public function write($data)
public function write($data): bool
{
if (!$this->writable) {
return false;
Expand Down Expand Up @@ -151,7 +147,7 @@ public function write($data)
return $this->writable && !$this->paused;
}

public function end($data = null)
public function end($data = null): void
{
if (!$this->writable) {
return;
Expand All @@ -175,7 +171,7 @@ public function end($data = null)
$this->close();
}

public function close()
public function close(): void
{
if ($this->closed) {
return;
Expand Down
10 changes: 8 additions & 2 deletions src/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ final class Util
* @return WritableStreamInterface $dest stream as-is
* @see ReadableStreamInterface::pipe() for more details
*/
public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = [])
public static function pipe(ReadableStreamInterface $source, WritableStreamInterface $dest, array $options = []): WritableStreamInterface
{
// source not readable => NO-OP
if (!$source->isReadable()) {
Expand Down Expand Up @@ -64,7 +64,13 @@ public static function pipe(ReadableStreamInterface $source, WritableStreamInter
return $dest;
}

public static function forwardEvents($source, $target, array $events)
/**
* @param ReadableStreamInterface|WritableStreamInterface $source
* @param ReadableStreamInterface|WritableStreamInterface $target
* @param string[] $events
* @return void
*/
public static function forwardEvents($source, $target, array $events): void
{
foreach ($events as $event) {
$source->on($event, function () use ($event, $target) {
Expand Down
14 changes: 7 additions & 7 deletions src/WritableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ final class WritableResourceStream extends EventEmitter implements WritableStrea
* @param ?int $writeBufferSoftLimit
* @param ?int $writeChunkSize
*/
public function __construct($stream, ?LoopInterface $loop = null, $writeBufferSoftLimit = null, $writeChunkSize = null)
public function __construct($stream, ?LoopInterface $loop = null, ?int $writeBufferSoftLimit = null, ?int $writeChunkSize = null)
{
if (!\is_resource($stream) || \get_resource_type($stream) !== "stream") {
throw new \InvalidArgumentException('First parameter must be a valid stream resource');
Expand All @@ -54,16 +54,16 @@ public function __construct($stream, ?LoopInterface $loop = null, $writeBufferSo

$this->stream = $stream;
$this->loop = $loop ?: Loop::get();
$this->softLimit = ($writeBufferSoftLimit === null) ? 65536 : (int)$writeBufferSoftLimit;
$this->writeChunkSize = ($writeChunkSize === null) ? -1 : (int)$writeChunkSize;
$this->softLimit = $writeBufferSoftLimit ?? 65536;
$this->writeChunkSize = $writeChunkSize ?? -1;
}

public function isWritable()
public function isWritable(): bool
{
return $this->writable;
}

public function write($data)
public function write($data): bool
{
if (!$this->writable) {
return false;
Expand All @@ -80,7 +80,7 @@ public function write($data)
return !isset($this->data[$this->softLimit - 1]);
}

public function end($data = null)
public function end($data = null): void
{
if (null !== $data) {
$this->write($data);
Expand All @@ -95,7 +95,7 @@ public function end($data = null)
}
}

public function close()
public function close(): void
{
if ($this->closed) {
return;
Expand Down
Loading

0 comments on commit cf107f5

Please sign in to comment.