From 382b0ee64e975a516bd89f78a3c4e7ec0d59a556 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 10:28:49 +0200 Subject: [PATCH 01/10] CS --- src/Server/Process.php | 1 - src/Server/Process/Foreground.php | 1 - src/Server/Process/Logger.php | 6 +++--- src/Server/Processes/Unix.php | 2 -- src/ServerFactory.php | 1 - src/Servers/Unix.php | 1 - tests/Server/Process/BackgroundTest.php | 6 +----- tests/Server/Process/ForegroundTest.php | 7 +------ tests/Server/Processes/RemoteTest.php | 1 - tests/Servers/LoggerTest.php | 1 - tests/Servers/RemoteTest.php | 1 - 11 files changed, 5 insertions(+), 23 deletions(-) diff --git a/src/Server/Process.php b/src/Server/Process.php index 34d7862..867c409 100644 --- a/src/Server/Process.php +++ b/src/Server/Process.php @@ -6,7 +6,6 @@ use Innmind\Server\Control\Server\Process\{ Pid, Output, - ExitCode, TimedOut, Failed, Signaled, diff --git a/src/Server/Process/Foreground.php b/src/Server/Process/Foreground.php index f7f12b8..93cb6af 100644 --- a/src/Server/Process/Foreground.php +++ b/src/Server/Process/Foreground.php @@ -6,7 +6,6 @@ use Innmind\Server\Control\Server\Process; use Innmind\Immutable\{ Sequence, - Str, Maybe, Either, }; diff --git a/src/Server/Process/Logger.php b/src/Server/Process/Logger.php index abf645b..5b6d783 100644 --- a/src/Server/Process/Logger.php +++ b/src/Server/Process/Logger.php @@ -68,18 +68,18 @@ public function wait(): Either ->wait() ->leftMap(function($e) { [$message, $context] = match (\get_class($e)) { - Process\Signaled::class => [ + Signaled::class => [ 'Command {command} stopped due to external signal', ['command' => $this->command->toString()], ], - Process\Failed::class => [ + Failed::class => [ 'Command {command} failed with {exitCode}', [ 'command' => $this->command->toString(), 'exitCode' => $e->exitCode()->toInt(), ], ], - Process\TimedOut::class => [ + TimedOut::class => [ 'Command {command} timed out', ['command' => $this->command->toString()], ], diff --git a/src/Server/Processes/Unix.php b/src/Server/Processes/Unix.php index e431760..14ac5f1 100644 --- a/src/Server/Processes/Unix.php +++ b/src/Server/Processes/Unix.php @@ -15,9 +15,7 @@ }; use Innmind\TimeContinuum\{ Clock, - ElapsedPeriod, Period, - Earth, Earth\Period\Second, }; use Innmind\TimeWarp\Halt; diff --git a/src/ServerFactory.php b/src/ServerFactory.php index b64ab6d..501b94f 100644 --- a/src/ServerFactory.php +++ b/src/ServerFactory.php @@ -9,7 +9,6 @@ }; use Innmind\TimeContinuum\{ Clock, - ElapsedPeriod, Period, }; use Innmind\TimeWarp\Halt; diff --git a/src/Servers/Unix.php b/src/Servers/Unix.php index 2366a63..bc046a5 100644 --- a/src/Servers/Unix.php +++ b/src/Servers/Unix.php @@ -10,7 +10,6 @@ }; use Innmind\TimeContinuum\{ Clock, - ElapsedPeriod, Period, }; use Innmind\TimeWarp\Halt; diff --git a/tests/Server/Process/BackgroundTest.php b/tests/Server/Process/BackgroundTest.php index 017be56..3a4c63b 100644 --- a/tests/Server/Process/BackgroundTest.php +++ b/tests/Server/Process/BackgroundTest.php @@ -13,14 +13,10 @@ }; use Innmind\TimeContinuum\Earth\{ Clock, - ElapsedPeriod, Period\Second, }; use Innmind\TimeWarp\Halt\Usleep; -use Innmind\Stream\{ - Watch\Select, - Streams, -}; +use Innmind\Stream\Streams; use PHPUnit\Framework\TestCase; class BackgroundTest extends TestCase diff --git a/tests/Server/Process/ForegroundTest.php b/tests/Server/Process/ForegroundTest.php index eae9479..c18ab91 100644 --- a/tests/Server/Process/ForegroundTest.php +++ b/tests/Server/Process/ForegroundTest.php @@ -7,7 +7,6 @@ Process\Foreground, Process\Unix, Process, - Process\Pid, Process\Output, Process\Output\Type, Process\Failed, @@ -16,14 +15,10 @@ }; use Innmind\TimeContinuum\Earth\{ Clock, - ElapsedPeriod, Period\Second, }; use Innmind\TimeWarp\Halt\Usleep; -use Innmind\Stream\{ - Watch\Select, - Streams, -}; +use Innmind\Stream\Streams; use Innmind\Immutable\Str; use PHPUnit\Framework\TestCase; diff --git a/tests/Server/Processes/RemoteTest.php b/tests/Server/Processes/RemoteTest.php index 3a8f271..8a3b4b8 100644 --- a/tests/Server/Processes/RemoteTest.php +++ b/tests/Server/Processes/RemoteTest.php @@ -10,7 +10,6 @@ Command, Signal, Process\Pid, - Process\ExitCode, }; use Innmind\Url\{ Path, diff --git a/tests/Servers/LoggerTest.php b/tests/Servers/LoggerTest.php index e78c595..2bba05a 100644 --- a/tests/Servers/LoggerTest.php +++ b/tests/Servers/LoggerTest.php @@ -8,7 +8,6 @@ Server, Server\Processes, Server\Process, - Server\Process\ExitCode, Server\Command, Server\Volumes, }; diff --git a/tests/Servers/RemoteTest.php b/tests/Servers/RemoteTest.php index eba40cd..8d99355 100644 --- a/tests/Servers/RemoteTest.php +++ b/tests/Servers/RemoteTest.php @@ -8,7 +8,6 @@ Server, Server\Processes, Server\Process, - Server\Process\ExitCode, Server\Command, Server\Volumes, }; From 25fe546f5141529a44d586c20455d5c1e64df820 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 10:30:27 +0200 Subject: [PATCH 02/10] throw when the status can't be retrieved --- src/Server/Process/Foreground.php | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/Server/Process/Foreground.php b/src/Server/Process/Foreground.php index 93cb6af..cd9a813 100644 --- a/src/Server/Process/Foreground.php +++ b/src/Server/Process/Foreground.php @@ -3,7 +3,10 @@ namespace Innmind\Server\Control\Server\Process; -use Innmind\Server\Control\Server\Process; +use Innmind\Server\Control\{ + Server\Process, + Exception\RuntimeException, +}; use Innmind\Immutable\{ Sequence, Maybe, @@ -66,16 +69,12 @@ public function wait(): Either $_ = $this->output->foreach(static fn() => null); } + if (\is_null($this->status)) { + throw new RuntimeException('Unable to retrieve the status'); + } + // the status should always be set here because we iterated over the - // output above but we stil coalesce to the wait() call to please psalm - return $this->status ??= $this - ->process - ->wait() - ->map(fn() => new Success($this->output)) - ->leftMap(fn($error) => match ($error) { - 'timed-out' => new TimedOut($this->output), - 'signaled' => new Signaled($this->output), - default => new Failed($error, $this->output), - }); + // output above + return $this->status; } } From 6c8514f946e6f45fbffeb053bf7fec068849dd6e Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 11:46:55 +0200 Subject: [PATCH 03/10] use a property to store the Watch to allow only returning Sequences when reading the pipes --- src/Server/Process/Started.php | 80 ++++++++++++++++------------------ 1 file changed, 37 insertions(+), 43 deletions(-) diff --git a/src/Server/Process/Started.php b/src/Server/Process/Started.php index 9a0245e..5531673 100644 --- a/src/Server/Process/Started.php +++ b/src/Server/Process/Started.php @@ -82,14 +82,6 @@ public function __construct( Maybe $content, ) { $this->clock = $clock; - // We use a short timeout to watch the streams when there is a timeout - // defined on the command to make sure we're as close as possible to the - // defined value without using polling. - // When simply reading the output we can't wait forever as the tests - // hang forever on Linux. - $this->watch = $capabilities - ->watch() - ->timeoutAfter(ElapsedPeriod::of(100)); $this->halt = $halt; $this->grace = $grace; $this->background = $background; @@ -105,6 +97,19 @@ public function __construct( $capabilities->readable()->acquire($pipes[2]), ); $this->input = $capabilities->writable()->acquire($pipes[0]); + // We use a short timeout to watch the streams when there is a timeout + // defined on the command to make sure we're as close as possible to the + // defined value without using polling. + // When simply reading the output we can't wait forever as the tests + // hang forever on Linux. + $this->watch = $capabilities + ->watch() + ->timeoutAfter(ElapsedPeriod::of(100)) + ->forRead( + $this->output, + $this->error, + ) + ->forWrite($this->input); $this->timeout = $timeout; $this->content = $content; $this->pid = new Pid($this->status()['pid']); @@ -140,24 +145,16 @@ public function output(bool $keepOutputWhileWriting = true): \Generator { $this->ensureExecuteOnce(); - $watch = $this - ->watch - ->forRead( - $this->output, - $this->error, - ) - ->forWrite($this->input); - - [$watch, $chunks] = $this->writeInputAndRead($watch, $keepOutputWhileWriting); + $chunks = $this->writeInputAndRead($keepOutputWhileWriting); foreach ($chunks->toList() as $value) { yield $value; } - $watch = $watch->unwatch($this->input); + $this->watch = $this->watch->unwatch($this->input); do { - [$watch, $chunks] = $this->readOnce($watch); + $chunks = $this->readOnce(); foreach ($chunks->toList() as $value) { yield $value; @@ -183,7 +180,7 @@ public function output(bool $keepOutputWhileWriting = true): \Generator while (!$this->background && $this->outputStillOpen()) { // even though the process is no longer running there might stil be // data to be read in the streams - [$watch, $chunks] = $this->readOnce($watch); + $chunks = $this->readOnce(); foreach ($chunks->toList() as $value) { yield $value; @@ -235,12 +232,11 @@ private function status(): array * this process because the output will be kept in memory before being able * to send it back to the caller. This may result in an "out of memory" error * - * @return array{0: Watch, 1: Sequence} + * @return Sequence */ private function writeInputAndRead( - Watch $watch, bool $keepOutputWhileWriting, - ): array { + ): Sequence { return $this ->content ->map(static fn($content) => $content->chunks()) @@ -252,13 +248,12 @@ private function writeInputAndRead( }) ->match( fn($chunks) => $this->writeAndRead( - $watch, $this->input, $chunks, Sequence::of(), $keepOutputWhileWriting, ), - static fn() => [$watch, Sequence::of()], + static fn() => Sequence::of(), ); } @@ -266,25 +261,24 @@ private function writeInputAndRead( * @param Sequence $chunks * @param Sequence $output * - * @return array{0: Watch, 1: Sequence} + * @return Sequence */ private function writeAndRead( - Watch $watch, Writable $stream, Sequence $chunks, Sequence $output, bool $keepOutputWhileWriting, - ): array { - [$watch, $output, $stream] = $chunks + ): Sequence { + [$output, $stream] = $chunks ->map(static fn($chunk) => $chunk->toEncoding(Str\Encoding::ascii)) ->reduce( - [$watch, $output, $stream], + [$output, $stream], function($state, $chunk) use ($keepOutputWhileWriting) { /** * @psalm-suppress MixedAssignment * @psalm-suppress MixedArrayAccess */ - [$watch, $output, $stream] = $state; + [$output, $stream] = $state; // leave the exception here in case we can't write to the // input stream because for now there is no clear way to // handle this case @@ -294,31 +288,31 @@ function($state, $chunk) use ($keepOutputWhileWriting) { // done at this moment for sake of simplicity while the case // has never been encountered $stream = $this - ->waitAvailable($watch, $stream) + ->waitAvailable($stream) ->write($chunk) ->match( static fn($stream) => $stream, static fn($e) => throw new RuntimeException($e::class), ); - [$watch, $read] = $this->readOnce($watch); + $read = $this->readOnce(); if ($keepOutputWhileWriting) { $output = $output->append($read); } - return [$watch, $output, $stream]; + return [$output, $stream]; }, ); $this->closeInput($stream); - return [$watch, $output]; + return $output; } - private function waitAvailable(Watch $watch, Writable $stream): Writable + private function waitAvailable(Writable $stream): Writable { do { /** @var Set */ - $toWrite = $watch()->match( + $toWrite = ($this->watch)()->match( static fn($ready) => $ready->toWrite(), static fn() => Set::of(), ); @@ -373,12 +367,12 @@ private function read(Readable $stream): Str } /** - * @return array{0: Watch, 1: Sequence} + * @return Sequence */ - private function readOnce(Watch $watch): array + private function readOnce(): Sequence { /** @var Set */ - $toRead = $watch()->match( + $toRead = ($this->watch)()->match( static fn($ready) => $ready->toRead(), static fn() => Set::of(), ); @@ -392,12 +386,12 @@ private function readOnce(Watch $watch): array ->filter(static fn($pair) => !$pair[0]->empty()) ->toList(); - $watch = $toRead->reduce( - $watch, + $this->watch = $toRead->reduce( + $this->watch, $this->maybeUnwatch(...), ); - return [$watch, Sequence::of(...$chunks)]; + return Sequence::of(...$chunks); } /** From a11279aee99063935c897fab8309a3d66511779f Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 12:12:34 +0200 Subject: [PATCH 04/10] use a lazy Sequence to return the output read while writing input --- src/Server/Process/Started.php | 48 +++++++++++----------------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/src/Server/Process/Started.php b/src/Server/Process/Started.php index 5531673..d2f7a97 100644 --- a/src/Server/Process/Started.php +++ b/src/Server/Process/Started.php @@ -129,7 +129,7 @@ public function wait(): Either // stream as this data will never be exposed to caller, so by discarding // this data we prevent ourself from reaching a possible "out of memory" // error - $output = $this->output(false); + $output = $this->output(); foreach ($output as $_) { // do nothing with the output @@ -141,11 +141,11 @@ public function wait(): Either /** * @return \Generator> */ - public function output(bool $keepOutputWhileWriting = true): \Generator + public function output(): \Generator { $this->ensureExecuteOnce(); - $chunks = $this->writeInputAndRead($keepOutputWhileWriting); + $chunks = $this->writeInputAndRead(); foreach ($chunks->toList() as $value) { yield $value; @@ -234,9 +234,8 @@ private function status(): array * * @return Sequence */ - private function writeInputAndRead( - bool $keepOutputWhileWriting, - ): Sequence { + private function writeInputAndRead(): Sequence + { return $this ->content ->map(static fn($content) => $content->chunks()) @@ -250,8 +249,6 @@ private function writeInputAndRead( fn($chunks) => $this->writeAndRead( $this->input, $chunks, - Sequence::of(), - $keepOutputWhileWriting, ), static fn() => Sequence::of(), ); @@ -259,26 +256,17 @@ private function writeInputAndRead( /** * @param Sequence $chunks - * @param Sequence $output * * @return Sequence */ private function writeAndRead( Writable $stream, Sequence $chunks, - Sequence $output, - bool $keepOutputWhileWriting, ): Sequence { - [$output, $stream] = $chunks - ->map(static fn($chunk) => $chunk->toEncoding(Str\Encoding::ascii)) - ->reduce( - [$output, $stream], - function($state, $chunk) use ($keepOutputWhileWriting) { - /** - * @psalm-suppress MixedAssignment - * @psalm-suppress MixedArrayAccess - */ - [$output, $stream] = $state; + return Sequence::lazy(function() use ($chunks, $stream) { + yield $chunks + ->map(static fn($chunk) => $chunk->toEncoding(Str\Encoding::ascii)) + ->flatMap(function($chunk) use ($stream) { // leave the exception here in case we can't write to the // input stream because for now there is no clear way to // handle this case @@ -287,25 +275,19 @@ function($state, $chunk) use ($keepOutputWhileWriting) { // the part of the chunk that hasn't be written. This is not // done at this moment for sake of simplicity while the case // has never been encountered - $stream = $this + $_ = $this ->waitAvailable($stream) ->write($chunk) ->match( static fn($stream) => $stream, static fn($e) => throw new RuntimeException($e::class), ); - $read = $this->readOnce(); - - if ($keepOutputWhileWriting) { - $output = $output->append($read); - } - - return [$output, $stream]; - }, - ); - $this->closeInput($stream); - return $output; + return $this->readOnce(); + }); + $this->closeInput($stream); + }) + ->flatMap(static fn($chunks) => $chunks); } private function waitAvailable(Writable $stream): Writable From 1db226448304caeaa2306029a5dd7da6d29d64a6 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 12:14:00 +0200 Subject: [PATCH 05/10] use the property when closing the input --- src/Server/Process/Started.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Server/Process/Started.php b/src/Server/Process/Started.php index d2f7a97..f853dee 100644 --- a/src/Server/Process/Started.php +++ b/src/Server/Process/Started.php @@ -240,7 +240,7 @@ private function writeInputAndRead(): Sequence ->content ->map(static fn($content) => $content->chunks()) ->otherwise(function() { - $this->closeInput($this->input); + $this->closeInput(); /** @var Maybe> */ return Maybe::nothing(); @@ -285,7 +285,7 @@ private function writeAndRead( return $this->readOnce(); }); - $this->closeInput($stream); + $this->closeInput(); }) ->flatMap(static fn($chunks) => $chunks); } @@ -303,14 +303,14 @@ private function waitAvailable(Writable $stream): Writable return $stream; } - private function closeInput(Writable $input): void + private function closeInput(): void { // we crash the app if we fail to close the input stream be cause the // underlying process receiving the input may not behave correctly, in // some cases this could result on this process hanging forever // there is no way to recover safely from unpredictable behaviour so it's // better to stop everything - $_ = $input->close()->match( + $_ = $this->input->close()->match( static fn() => null, // closed correctly static fn() => throw new RuntimeException('Failed to close input stream'), ); From 1089b2784f6d0f3a183555a92bc19946c95f2029 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 13:40:25 +0200 Subject: [PATCH 06/10] allow to access the output chunks while writing the input stream --- src/Server/Process/Background.php | 2 +- src/Server/Process/Foreground.php | 36 +++++---- src/Server/Process/Started.php | 119 +++++++++++++++--------------- tests/Server/Process/UnixTest.php | 10 +-- 4 files changed, 87 insertions(+), 80 deletions(-) diff --git a/src/Server/Process/Background.php b/src/Server/Process/Background.php index eff2740..5d4976b 100644 --- a/src/Server/Process/Background.php +++ b/src/Server/Process/Background.php @@ -20,7 +20,7 @@ public function __construct(Started $process) // wait for the process to be started in the background otherwise the // process will be killed // this also allows to send any input to the stream - $process->wait(); + $process->output()->memoize(); /** @var Sequence */ $output = Sequence::of(); $this->output = new Output\Output($output); diff --git a/src/Server/Process/Foreground.php b/src/Server/Process/Foreground.php index cd9a813..1acce9d 100644 --- a/src/Server/Process/Foreground.php +++ b/src/Server/Process/Foreground.php @@ -24,26 +24,32 @@ public function __construct(Started $process, bool $streamOutput = false) { $this->process = $process; $yieldOutput = function() use ($process): \Generator { - $output = $process->output(); + yield $process + ->output() + ->map(function($chunk) { + if ($chunk instanceof Either) { + $this->status = $chunk + ->map(fn() => new Success($this->output)) + ->leftMap(fn($error) => match ($error) { + 'timed-out' => new TimedOut($this->output), + 'signaled' => new Signaled($this->output), + default => new Failed($error, $this->output), + }); + } - foreach ($output as $chunk) { - yield $chunk; - } - - $this->status = $output - ->getReturn() - ->map(fn() => new Success($this->output)) - ->leftMap(fn($error) => match ($error) { - 'timed-out' => new TimedOut($this->output), - 'signaled' => new Signaled($this->output), - default => new Failed($error, $this->output), - }); + return $chunk; + }) + ->filter(\is_array(...)); }; if ($streamOutput) { - $output = Sequence::lazy($yieldOutput); + $output = Sequence::lazy($yieldOutput)->flatMap( + static fn($chunks) => $chunks, + ); } else { - $output = Sequence::defer($yieldOutput()); + $output = Sequence::defer($yieldOutput())->flatMap( + static fn($chunks) => $chunks, + ); } $this->output = new Output\Output($output); diff --git a/src/Server/Process/Started.php b/src/Server/Process/Started.php index f853dee..ce3d68a 100644 --- a/src/Server/Process/Started.php +++ b/src/Server/Process/Started.php @@ -34,6 +34,7 @@ Either, SideEffect, Set, + Predicate\Instance, }; /** @@ -129,89 +130,89 @@ public function wait(): Either // stream as this data will never be exposed to caller, so by discarding // this data we prevent ourself from reaching a possible "out of memory" // error - $output = $this->output(); - - foreach ($output as $_) { - // do nothing with the output - } - - return $output->getReturn(); + /** @var Either */ + return $this + ->output() + ->last() + ->keep(Instance::of(Either::class)) + ->match( + static fn($return) => $return, + static fn() => throw new RuntimeException('Unable to retrieve process result'), + ); } /** - * @return \Generator> + * @return Sequence> */ - public function output(): \Generator + public function output(): Sequence { $this->ensureExecuteOnce(); - $chunks = $this->writeInputAndRead(); + return Sequence::lazy(function() { + yield $this->writeInputAndRead(); - foreach ($chunks->toList() as $value) { - yield $value; - } + $this->watch = $this->watch->unwatch($this->input); - $this->watch = $this->watch->unwatch($this->input); + do { + yield $this->readOnce(); - do { - $chunks = $this->readOnce(); + $timedOut = $this->checkTimeout()->match( + static fn() => true, + static fn() => false, + ); - foreach ($chunks->toList() as $value) { - yield $value; - } + if ($timedOut) { + /** @var Sequence> */ + yield Sequence::of(Either::left($this->abort())); - $timedOut = $this->checkTimeout()->match( - static fn() => true, - static fn() => false, - ); + return; + } - if ($timedOut) { - /** @var Either */ - return Either::left($this->abort()); - } + $status = $this->status(); + } while ($status['running']); - $status = $this->status(); - } while ($status['running']); + // we don't read the remaining data in the streams for background + // processes because it will hang until the concrete process is really + // finished, thus defeating the purpose of launching the process in the + // background + while (!$this->background && $this->outputStillOpen()) { + // even though the process is no longer running there might stil be + // data to be read in the streams + $chunks = $this->readOnce(); - // we don't read the remaining data in the streams for background - // processes because it will hang until the concrete process is really - // finished, thus defeating the purpose of launching the process in the - // background - while (!$this->background && $this->outputStillOpen()) { - // even though the process is no longer running there might stil be - // data to be read in the streams - $chunks = $this->readOnce(); + yield $chunks; - foreach ($chunks->toList() as $value) { - yield $value; - } + if ($chunks->empty()) { + // do not try to continue reading the streams when no output + // otherwise for commands like "tail -f" it will run forever + break; + } - if ($chunks->empty()) { - // do not try to continue reading the streams when no output - // otherwise for commands like "tail -f" it will run forever - break; + // no need to check for timeouts here since the process is no longer + // running } - // no need to check for timeouts here since the process is no longer - // running - } + $this->close(); - $this->close(); + if ($status['signaled'] || $status['stopped']) { + /** @var Sequence> */ + yield Sequence::of(Either::left('signaled')); - if ($status['signaled'] || $status['stopped']) { - /** @var Either */ - return Either::left('signaled'); - } + return; + } - $exitCode = new ExitCode($status['exitcode']); + $exitCode = new ExitCode($status['exitcode']); - if (!$exitCode->successful()) { - /** @var Either */ - return Either::left($exitCode); - } + if (!$exitCode->successful()) { + /** @var Sequence> */ + yield Sequence::of(Either::left($exitCode)); - /** @var Either */ - return Either::right(new SideEffect); + return; + } + + /** @var Sequence> */ + yield Sequence::of(Either::right(new SideEffect)); + })->flatMap(static fn($chunks) => $chunks); } /** diff --git a/tests/Server/Process/UnixTest.php b/tests/Server/Process/UnixTest.php index a01ee4a..67ac7d0 100644 --- a/tests/Server/Process/UnixTest.php +++ b/tests/Server/Process/UnixTest.php @@ -46,7 +46,7 @@ public function testSimpleOutput() $count = 0; $process = $cat(); - foreach ($process->output() as [$value, $type]) { + foreach ($process->output()->filter(\is_array(...))->toList() as [$value, $type]) { $this->assertSame(Type::output, $type); $this->assertSame("hello\n", $value->toString()); ++$count; @@ -74,7 +74,7 @@ public function testOutput() $process = $cat(); $output = ''; - foreach ($process->output() as [$value, $type]) { + foreach ($process->output()->filter(\is_array(...))->toList() as [$value, $type]) { $output .= $value->toString(); } @@ -100,7 +100,7 @@ public function testSlowOutput() $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - foreach ($process->output() as [$chunk, $type]) { + foreach ($process->output()->filter(\is_array(...))->toList() as [$chunk, $type]) { $output .= $chunk->toString(); $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $type); ++$count; @@ -128,7 +128,7 @@ public function testTimeoutSlowOutput() $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - foreach ($process->output() as [$chunk, $type]) { + foreach ($process->output()->filter(\is_array(...))->toList() as [$chunk, $type]) { $output .= $chunk->toString(); $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $type); ++$count; @@ -226,7 +226,7 @@ public function testWithInput() ); $output = ''; - foreach ($cat()->output() as [$value]) { + foreach ($cat()->output()->filter(\is_array(...))->toList() as [$value]) { $output .= $value->toString(); } From 5a00a08b3a6656bbf5b6c2c87c346554fc8e3b40 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 13:58:18 +0200 Subject: [PATCH 07/10] use a Chunk to wrap data and type instead of using arrays --- src/Server/Process/Foreground.php | 6 +++-- src/Server/Process/Output/Chunk.php | 36 ++++++++++++++++++++++++++++ src/Server/Process/Output/Output.php | 11 +++++++++ src/Server/Process/Started.php | 21 +++++++--------- tests/Server/Process/UnixTest.php | 32 ++++++++++++++----------- 5 files changed, 78 insertions(+), 28 deletions(-) create mode 100644 src/Server/Process/Output/Chunk.php diff --git a/src/Server/Process/Foreground.php b/src/Server/Process/Foreground.php index 1acce9d..600a58c 100644 --- a/src/Server/Process/Foreground.php +++ b/src/Server/Process/Foreground.php @@ -5,12 +5,14 @@ use Innmind\Server\Control\{ Server\Process, + Server\Process\Output\Chunk, Exception\RuntimeException, }; use Innmind\Immutable\{ Sequence, Maybe, Either, + Predicate\Instance, }; final class Foreground implements Process @@ -39,7 +41,7 @@ public function __construct(Started $process, bool $streamOutput = false) return $chunk; }) - ->filter(\is_array(...)); + ->keep(Instance::of(Chunk::class)); }; if ($streamOutput) { @@ -52,7 +54,7 @@ public function __construct(Started $process, bool $streamOutput = false) ); } - $this->output = new Output\Output($output); + $this->output = Output\Output::of($output); } public function pid(): Maybe diff --git a/src/Server/Process/Output/Chunk.php b/src/Server/Process/Output/Chunk.php new file mode 100644 index 0000000..77037d3 --- /dev/null +++ b/src/Server/Process/Output/Chunk.php @@ -0,0 +1,36 @@ +data; + } + + public function type(): Type + { + return $this->type; + } +} diff --git a/src/Server/Process/Output/Output.php b/src/Server/Process/Output/Output.php index b19cb7e..6f3a9f6 100644 --- a/src/Server/Process/Output/Output.php +++ b/src/Server/Process/Output/Output.php @@ -27,6 +27,17 @@ public function __construct(Sequence $output) $this->output = $output; } + /** + * @param Sequence $chunks + */ + public static function of(Sequence $chunks): self + { + return new self($chunks->map(static fn($chunk) => [ + $chunk->data(), + $chunk->type(), + ])); + } + /** * @param callable(Str, Type): void $function */ diff --git a/src/Server/Process/Started.php b/src/Server/Process/Started.php index ce3d68a..7739637 100644 --- a/src/Server/Process/Started.php +++ b/src/Server/Process/Started.php @@ -4,15 +4,13 @@ namespace Innmind\Server\Control\Server\Process; use Innmind\Server\Control\{ + Server\Process\Output\Chunk, Server\Process\Output\Type, Server\Second, Server\Signal, Exception\RuntimeException, }; -use Innmind\Filesystem\{ - File\Content, - Chunk, -}; +use Innmind\Filesystem\File\Content; use Innmind\TimeContinuum\{ Clock, PointInTime, @@ -142,7 +140,7 @@ public function wait(): Either } /** - * @return Sequence> + * @return Sequence> */ public function output(): Sequence { @@ -233,7 +231,7 @@ private function status(): array * this process because the output will be kept in memory before being able * to send it back to the caller. This may result in an "out of memory" error * - * @return Sequence + * @return Sequence */ private function writeInputAndRead(): Sequence { @@ -258,7 +256,7 @@ private function writeInputAndRead(): Sequence /** * @param Sequence $chunks * - * @return Sequence + * @return Sequence */ private function writeAndRead( Writable $stream, @@ -350,7 +348,7 @@ private function read(Readable $stream): Str } /** - * @return Sequence + * @return Sequence */ private function readOnce(): Sequence { @@ -360,13 +358,12 @@ private function readOnce(): Sequence static fn() => Set::of(), ); - /** @var list */ $chunks = $toRead ->map(fn($stream) => match ($stream) { - $this->output => [$this->read($stream), Type::output], - $this->error => [$this->read($stream), Type::error], + $this->output => Chunk::of($this->read($stream), Type::output), + $this->error => Chunk::of($this->read($stream), Type::error), }) - ->filter(static fn($pair) => !$pair[0]->empty()) + ->filter(static fn($chunk) => !$chunk->data()->empty()) ->toList(); $this->watch = $toRead->reduce( diff --git a/tests/Server/Process/UnixTest.php b/tests/Server/Process/UnixTest.php index 67ac7d0..f953f60 100644 --- a/tests/Server/Process/UnixTest.php +++ b/tests/Server/Process/UnixTest.php @@ -5,6 +5,7 @@ use Innmind\Server\Control\{ Server\Process\Unix, + Server\Process\Output\Chunk, Server\Process\Output\Type, Server\Process\ExitCode, Server\Command, @@ -23,7 +24,10 @@ Streams, Watch\Select, }; -use Innmind\Immutable\SideEffect; +use Innmind\Immutable\{ + SideEffect, + Predicate\Instance, +}; use PHPUnit\Framework\TestCase; use Innmind\BlackBox\{ PHPUnit\BlackBox, @@ -46,9 +50,9 @@ public function testSimpleOutput() $count = 0; $process = $cat(); - foreach ($process->output()->filter(\is_array(...))->toList() as [$value, $type]) { - $this->assertSame(Type::output, $type); - $this->assertSame("hello\n", $value->toString()); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $this->assertSame(Type::output, $chunk->type()); + $this->assertSame("hello\n", $chunk->data()->toString()); ++$count; } @@ -74,8 +78,8 @@ public function testOutput() $process = $cat(); $output = ''; - foreach ($process->output()->filter(\is_array(...))->toList() as [$value, $type]) { - $output .= $value->toString(); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); } $this->assertSame("$echo\n", $output); @@ -100,9 +104,9 @@ public function testSlowOutput() $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - foreach ($process->output()->filter(\is_array(...))->toList() as [$chunk, $type]) { - $output .= $chunk->toString(); - $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $type); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); + $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $chunk->type()); ++$count; } @@ -128,9 +132,9 @@ public function testTimeoutSlowOutput() $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - foreach ($process->output()->filter(\is_array(...))->toList() as [$chunk, $type]) { - $output .= $chunk->toString(); - $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $type); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); + $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $chunk->type()); ++$count; } @@ -226,8 +230,8 @@ public function testWithInput() ); $output = ''; - foreach ($cat()->output()->filter(\is_array(...))->toList() as [$value]) { - $output .= $value->toString(); + foreach ($cat()->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); } $this->assertSame( From 4b2132116bcb9bf3d9e44b9fce89e3f3ec25ee87 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 14:04:05 +0200 Subject: [PATCH 08/10] =?UTF-8?q?suppression=20m=C3=A9thode=20uniquement?= =?UTF-8?q?=20utilis=C3=A9e=20dans=20les=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Server/Process/Started.php | 21 ------------- tests/Server/Process/UnixTest.php | 52 +++++++++++++++++++++---------- 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/src/Server/Process/Started.php b/src/Server/Process/Started.php index 7739637..eac00e5 100644 --- a/src/Server/Process/Started.php +++ b/src/Server/Process/Started.php @@ -32,7 +32,6 @@ Either, SideEffect, Set, - Predicate\Instance, }; /** @@ -119,26 +118,6 @@ public function pid(): Pid return $this->pid; } - /** - * @return Either - */ - public function wait(): Either - { - // we don't need to keep the output read while writing to the input - // stream as this data will never be exposed to caller, so by discarding - // this data we prevent ourself from reaching a possible "out of memory" - // error - /** @var Either */ - return $this - ->output() - ->last() - ->keep(Instance::of(Either::class)) - ->match( - static fn($return) => $return, - static fn() => throw new RuntimeException('Unable to retrieve process result'), - ); - } - /** * @return Sequence> */ diff --git a/tests/Server/Process/UnixTest.php b/tests/Server/Process/UnixTest.php index f953f60..df2e562 100644 --- a/tests/Server/Process/UnixTest.php +++ b/tests/Server/Process/UnixTest.php @@ -167,10 +167,15 @@ public function testTimeoutWaitSlowProcess() $started = \microtime(true); $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - $e = $process->wait()->match( - static fn() => null, - static fn($e) => $e, - ); + $e = $process + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn() => null, + static fn($e) => $e, + ); $this->assertSame('timed-out', $e); // 3 because of the grace period $this->assertEqualsWithDelta(3, \microtime(true) - $started, 0.5); @@ -186,10 +191,15 @@ public function testWaitSuccess() Command::foreground('echo')->withArgument('hello'), ); - $value = $cat()->wait()->match( - static fn($value) => $value, - static fn() => null, - ); + $value = $cat() + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn($value) => $value, + static fn() => null, + ); $this->assertInstanceOf(SideEffect::class, $value); } @@ -206,10 +216,15 @@ public function testWaitFail() ->withEnvironment('PATH', $_SERVER['PATH']), ); - $value = $cat()->wait()->match( - static fn() => null, - static fn($e) => $e, - ); + $value = $cat() + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn() => null, + static fn($e) => $e, + ); $this->assertInstanceOf(ExitCode::class, $value); $this->assertSame(1, $value->toInt()); @@ -257,10 +272,15 @@ public function testOverwrite() ->overwrite(Path::of('test.log')), ); - $value = $cat()->wait()->match( - static fn($value) => $value, - static fn() => null, - ); + $value = $cat() + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn($value) => $value, + static fn() => null, + ); $this->assertInstanceOf(SideEffect::class, $value); $this->assertSame( From 8412f49e90d6c29c437d87cf4b6712db980c4353 Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 14:08:57 +0200 Subject: [PATCH 09/10] update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d2adb4..c32f943 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [Unreleased] + +### Fixed + +- Output generated while writing the input is now directly available (previously it was when the whole input was written) + ## 5.2.1 - 2023-11-11 ### Fixed From de4943f65ba0c87dcfe7d0464c275315cdf2e4ed Mon Sep 17 00:00:00 2001 From: Baptiste Langlade Date: Thu, 25 Jul 2024 14:30:55 +0200 Subject: [PATCH 10/10] specify next release --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c32f943..4067a3f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## [Unreleased] +## 5.2.2 - 2024-07-25 ### Fixed