diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d2adb4..c32f943 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [Unreleased] + +### Fixed + +- Output generated while writing the input is now directly available (previously it was when the whole input was written) + ## 5.2.1 - 2023-11-11 ### Fixed diff --git a/src/Server/Process.php b/src/Server/Process.php index 34d7862..867c409 100644 --- a/src/Server/Process.php +++ b/src/Server/Process.php @@ -6,7 +6,6 @@ use Innmind\Server\Control\Server\Process\{ Pid, Output, - ExitCode, TimedOut, Failed, Signaled, diff --git a/src/Server/Process/Background.php b/src/Server/Process/Background.php index eff2740..5d4976b 100644 --- a/src/Server/Process/Background.php +++ b/src/Server/Process/Background.php @@ -20,7 +20,7 @@ public function __construct(Started $process) // wait for the process to be started in the background otherwise the // process will be killed // this also allows to send any input to the stream - $process->wait(); + $process->output()->memoize(); /** @var Sequence */ $output = Sequence::of(); $this->output = new Output\Output($output); diff --git a/src/Server/Process/Foreground.php b/src/Server/Process/Foreground.php index f7f12b8..600a58c 100644 --- a/src/Server/Process/Foreground.php +++ b/src/Server/Process/Foreground.php @@ -3,12 +3,16 @@ namespace Innmind\Server\Control\Server\Process; -use Innmind\Server\Control\Server\Process; +use Innmind\Server\Control\{ + Server\Process, + Server\Process\Output\Chunk, + Exception\RuntimeException, +}; use Innmind\Immutable\{ Sequence, - Str, Maybe, Either, + Predicate\Instance, }; final class Foreground implements Process @@ -22,29 +26,35 @@ public function __construct(Started $process, bool $streamOutput = false) { $this->process = $process; $yieldOutput = function() use ($process): \Generator { - $output = $process->output(); - - foreach ($output as $chunk) { - yield $chunk; - } + yield $process + ->output() + ->map(function($chunk) { + if ($chunk instanceof Either) { + $this->status = $chunk + ->map(fn() => new Success($this->output)) + ->leftMap(fn($error) => match ($error) { + 'timed-out' => new TimedOut($this->output), + 'signaled' => new Signaled($this->output), + default => new Failed($error, $this->output), + }); + } - $this->status = $output - ->getReturn() - ->map(fn() => new Success($this->output)) - ->leftMap(fn($error) => match ($error) { - 'timed-out' => new TimedOut($this->output), - 'signaled' => new Signaled($this->output), - default => new Failed($error, $this->output), - }); + return $chunk; + }) + ->keep(Instance::of(Chunk::class)); }; if ($streamOutput) { - $output = Sequence::lazy($yieldOutput); + $output = Sequence::lazy($yieldOutput)->flatMap( + static fn($chunks) => $chunks, + ); } else { - $output = Sequence::defer($yieldOutput()); + $output = Sequence::defer($yieldOutput())->flatMap( + static fn($chunks) => $chunks, + ); } - $this->output = new Output\Output($output); + $this->output = Output\Output::of($output); } public function pid(): Maybe @@ -67,16 +77,12 @@ public function wait(): Either $_ = $this->output->foreach(static fn() => null); } + if (\is_null($this->status)) { + throw new RuntimeException('Unable to retrieve the status'); + } + // the status should always be set here because we iterated over the - // output above but we stil coalesce to the wait() call to please psalm - return $this->status ??= $this - ->process - ->wait() - ->map(fn() => new Success($this->output)) - ->leftMap(fn($error) => match ($error) { - 'timed-out' => new TimedOut($this->output), - 'signaled' => new Signaled($this->output), - default => new Failed($error, $this->output), - }); + // output above + return $this->status; } } diff --git a/src/Server/Process/Logger.php b/src/Server/Process/Logger.php index abf645b..5b6d783 100644 --- a/src/Server/Process/Logger.php +++ b/src/Server/Process/Logger.php @@ -68,18 +68,18 @@ public function wait(): Either ->wait() ->leftMap(function($e) { [$message, $context] = match (\get_class($e)) { - Process\Signaled::class => [ + Signaled::class => [ 'Command {command} stopped due to external signal', ['command' => $this->command->toString()], ], - Process\Failed::class => [ + Failed::class => [ 'Command {command} failed with {exitCode}', [ 'command' => $this->command->toString(), 'exitCode' => $e->exitCode()->toInt(), ], ], - Process\TimedOut::class => [ + TimedOut::class => [ 'Command {command} timed out', ['command' => $this->command->toString()], ], diff --git a/src/Server/Process/Output/Chunk.php b/src/Server/Process/Output/Chunk.php new file mode 100644 index 0000000..77037d3 --- /dev/null +++ b/src/Server/Process/Output/Chunk.php @@ -0,0 +1,36 @@ +data; + } + + public function type(): Type + { + return $this->type; + } +} diff --git a/src/Server/Process/Output/Output.php b/src/Server/Process/Output/Output.php index b19cb7e..6f3a9f6 100644 --- a/src/Server/Process/Output/Output.php +++ b/src/Server/Process/Output/Output.php @@ -27,6 +27,17 @@ public function __construct(Sequence $output) $this->output = $output; } + /** + * @param Sequence $chunks + */ + public static function of(Sequence $chunks): self + { + return new self($chunks->map(static fn($chunk) => [ + $chunk->data(), + $chunk->type(), + ])); + } + /** * @param callable(Str, Type): void $function */ diff --git a/src/Server/Process/Started.php b/src/Server/Process/Started.php index 9a0245e..eac00e5 100644 --- a/src/Server/Process/Started.php +++ b/src/Server/Process/Started.php @@ -4,15 +4,13 @@ namespace Innmind\Server\Control\Server\Process; use Innmind\Server\Control\{ + Server\Process\Output\Chunk, Server\Process\Output\Type, Server\Second, Server\Signal, Exception\RuntimeException, }; -use Innmind\Filesystem\{ - File\Content, - Chunk, -}; +use Innmind\Filesystem\File\Content; use Innmind\TimeContinuum\{ Clock, PointInTime, @@ -82,14 +80,6 @@ public function __construct( Maybe $content, ) { $this->clock = $clock; - // We use a short timeout to watch the streams when there is a timeout - // defined on the command to make sure we're as close as possible to the - // defined value without using polling. - // When simply reading the output we can't wait forever as the tests - // hang forever on Linux. - $this->watch = $capabilities - ->watch() - ->timeoutAfter(ElapsedPeriod::of(100)); $this->halt = $halt; $this->grace = $grace; $this->background = $background; @@ -105,6 +95,19 @@ public function __construct( $capabilities->readable()->acquire($pipes[2]), ); $this->input = $capabilities->writable()->acquire($pipes[0]); + // We use a short timeout to watch the streams when there is a timeout + // defined on the command to make sure we're as close as possible to the + // defined value without using polling. + // When simply reading the output we can't wait forever as the tests + // hang forever on Linux. + $this->watch = $capabilities + ->watch() + ->timeoutAfter(ElapsedPeriod::of(100)) + ->forRead( + $this->output, + $this->error, + ) + ->forWrite($this->input); $this->timeout = $timeout; $this->content = $content; $this->pid = new Pid($this->status()['pid']); @@ -116,105 +119,77 @@ public function pid(): Pid } /** - * @return Either - */ - public function wait(): Either - { - // we don't need to keep the output read while writing to the input - // stream as this data will never be exposed to caller, so by discarding - // this data we prevent ourself from reaching a possible "out of memory" - // error - $output = $this->output(false); - - foreach ($output as $_) { - // do nothing with the output - } - - return $output->getReturn(); - } - - /** - * @return \Generator> + * @return Sequence> */ - public function output(bool $keepOutputWhileWriting = true): \Generator + public function output(): Sequence { $this->ensureExecuteOnce(); - $watch = $this - ->watch - ->forRead( - $this->output, - $this->error, - ) - ->forWrite($this->input); + return Sequence::lazy(function() { + yield $this->writeInputAndRead(); - [$watch, $chunks] = $this->writeInputAndRead($watch, $keepOutputWhileWriting); + $this->watch = $this->watch->unwatch($this->input); - foreach ($chunks->toList() as $value) { - yield $value; - } + do { + yield $this->readOnce(); - $watch = $watch->unwatch($this->input); + $timedOut = $this->checkTimeout()->match( + static fn() => true, + static fn() => false, + ); - do { - [$watch, $chunks] = $this->readOnce($watch); + if ($timedOut) { + /** @var Sequence> */ + yield Sequence::of(Either::left($this->abort())); - foreach ($chunks->toList() as $value) { - yield $value; - } + return; + } - $timedOut = $this->checkTimeout()->match( - static fn() => true, - static fn() => false, - ); + $status = $this->status(); + } while ($status['running']); - if ($timedOut) { - /** @var Either */ - return Either::left($this->abort()); - } + // we don't read the remaining data in the streams for background + // processes because it will hang until the concrete process is really + // finished, thus defeating the purpose of launching the process in the + // background + while (!$this->background && $this->outputStillOpen()) { + // even though the process is no longer running there might stil be + // data to be read in the streams + $chunks = $this->readOnce(); - $status = $this->status(); - } while ($status['running']); + yield $chunks; - // we don't read the remaining data in the streams for background - // processes because it will hang until the concrete process is really - // finished, thus defeating the purpose of launching the process in the - // background - while (!$this->background && $this->outputStillOpen()) { - // even though the process is no longer running there might stil be - // data to be read in the streams - [$watch, $chunks] = $this->readOnce($watch); + if ($chunks->empty()) { + // do not try to continue reading the streams when no output + // otherwise for commands like "tail -f" it will run forever + break; + } - foreach ($chunks->toList() as $value) { - yield $value; + // no need to check for timeouts here since the process is no longer + // running } - if ($chunks->empty()) { - // do not try to continue reading the streams when no output - // otherwise for commands like "tail -f" it will run forever - break; - } + $this->close(); - // no need to check for timeouts here since the process is no longer - // running - } + if ($status['signaled'] || $status['stopped']) { + /** @var Sequence> */ + yield Sequence::of(Either::left('signaled')); - $this->close(); + return; + } - if ($status['signaled'] || $status['stopped']) { - /** @var Either */ - return Either::left('signaled'); - } + $exitCode = new ExitCode($status['exitcode']); - $exitCode = new ExitCode($status['exitcode']); + if (!$exitCode->successful()) { + /** @var Sequence> */ + yield Sequence::of(Either::left($exitCode)); - if (!$exitCode->successful()) { - /** @var Either */ - return Either::left($exitCode); - } + return; + } - /** @var Either */ - return Either::right(new SideEffect); + /** @var Sequence> */ + yield Sequence::of(Either::right(new SideEffect)); + })->flatMap(static fn($chunks) => $chunks); } /** @@ -235,56 +210,41 @@ private function status(): array * this process because the output will be kept in memory before being able * to send it back to the caller. This may result in an "out of memory" error * - * @return array{0: Watch, 1: Sequence} + * @return Sequence */ - private function writeInputAndRead( - Watch $watch, - bool $keepOutputWhileWriting, - ): array { + private function writeInputAndRead(): Sequence + { return $this ->content ->map(static fn($content) => $content->chunks()) ->otherwise(function() { - $this->closeInput($this->input); + $this->closeInput(); /** @var Maybe> */ return Maybe::nothing(); }) ->match( fn($chunks) => $this->writeAndRead( - $watch, $this->input, $chunks, - Sequence::of(), - $keepOutputWhileWriting, ), - static fn() => [$watch, Sequence::of()], + static fn() => Sequence::of(), ); } /** * @param Sequence $chunks - * @param Sequence $output * - * @return array{0: Watch, 1: Sequence} + * @return Sequence */ private function writeAndRead( - Watch $watch, Writable $stream, Sequence $chunks, - Sequence $output, - bool $keepOutputWhileWriting, - ): array { - [$watch, $output, $stream] = $chunks - ->map(static fn($chunk) => $chunk->toEncoding(Str\Encoding::ascii)) - ->reduce( - [$watch, $output, $stream], - function($state, $chunk) use ($keepOutputWhileWriting) { - /** - * @psalm-suppress MixedAssignment - * @psalm-suppress MixedArrayAccess - */ - [$watch, $output, $stream] = $state; + ): Sequence { + return Sequence::lazy(function() use ($chunks, $stream) { + yield $chunks + ->map(static fn($chunk) => $chunk->toEncoding(Str\Encoding::ascii)) + ->flatMap(function($chunk) use ($stream) { // leave the exception here in case we can't write to the // input stream because for now there is no clear way to // handle this case @@ -293,32 +253,26 @@ function($state, $chunk) use ($keepOutputWhileWriting) { // the part of the chunk that hasn't be written. This is not // done at this moment for sake of simplicity while the case // has never been encountered - $stream = $this - ->waitAvailable($watch, $stream) + $_ = $this + ->waitAvailable($stream) ->write($chunk) ->match( static fn($stream) => $stream, static fn($e) => throw new RuntimeException($e::class), ); - [$watch, $read] = $this->readOnce($watch); - - if ($keepOutputWhileWriting) { - $output = $output->append($read); - } - - return [$watch, $output, $stream]; - }, - ); - $this->closeInput($stream); - return [$watch, $output]; + return $this->readOnce(); + }); + $this->closeInput(); + }) + ->flatMap(static fn($chunks) => $chunks); } - private function waitAvailable(Watch $watch, Writable $stream): Writable + private function waitAvailable(Writable $stream): Writable { do { /** @var Set */ - $toWrite = $watch()->match( + $toWrite = ($this->watch)()->match( static fn($ready) => $ready->toWrite(), static fn() => Set::of(), ); @@ -327,14 +281,14 @@ private function waitAvailable(Watch $watch, Writable $stream): Writable return $stream; } - private function closeInput(Writable $input): void + private function closeInput(): void { // we crash the app if we fail to close the input stream be cause the // underlying process receiving the input may not behave correctly, in // some cases this could result on this process hanging forever // there is no way to recover safely from unpredictable behaviour so it's // better to stop everything - $_ = $input->close()->match( + $_ = $this->input->close()->match( static fn() => null, // closed correctly static fn() => throw new RuntimeException('Failed to close input stream'), ); @@ -373,31 +327,30 @@ private function read(Readable $stream): Str } /** - * @return array{0: Watch, 1: Sequence} + * @return Sequence */ - private function readOnce(Watch $watch): array + private function readOnce(): Sequence { /** @var Set */ - $toRead = $watch()->match( + $toRead = ($this->watch)()->match( static fn($ready) => $ready->toRead(), static fn() => Set::of(), ); - /** @var list */ $chunks = $toRead ->map(fn($stream) => match ($stream) { - $this->output => [$this->read($stream), Type::output], - $this->error => [$this->read($stream), Type::error], + $this->output => Chunk::of($this->read($stream), Type::output), + $this->error => Chunk::of($this->read($stream), Type::error), }) - ->filter(static fn($pair) => !$pair[0]->empty()) + ->filter(static fn($chunk) => !$chunk->data()->empty()) ->toList(); - $watch = $toRead->reduce( - $watch, + $this->watch = $toRead->reduce( + $this->watch, $this->maybeUnwatch(...), ); - return [$watch, Sequence::of(...$chunks)]; + return Sequence::of(...$chunks); } /** diff --git a/src/Server/Processes/Unix.php b/src/Server/Processes/Unix.php index e431760..14ac5f1 100644 --- a/src/Server/Processes/Unix.php +++ b/src/Server/Processes/Unix.php @@ -15,9 +15,7 @@ }; use Innmind\TimeContinuum\{ Clock, - ElapsedPeriod, Period, - Earth, Earth\Period\Second, }; use Innmind\TimeWarp\Halt; diff --git a/src/ServerFactory.php b/src/ServerFactory.php index b64ab6d..501b94f 100644 --- a/src/ServerFactory.php +++ b/src/ServerFactory.php @@ -9,7 +9,6 @@ }; use Innmind\TimeContinuum\{ Clock, - ElapsedPeriod, Period, }; use Innmind\TimeWarp\Halt; diff --git a/src/Servers/Unix.php b/src/Servers/Unix.php index 2366a63..bc046a5 100644 --- a/src/Servers/Unix.php +++ b/src/Servers/Unix.php @@ -10,7 +10,6 @@ }; use Innmind\TimeContinuum\{ Clock, - ElapsedPeriod, Period, }; use Innmind\TimeWarp\Halt; diff --git a/tests/Server/Process/BackgroundTest.php b/tests/Server/Process/BackgroundTest.php index 017be56..3a4c63b 100644 --- a/tests/Server/Process/BackgroundTest.php +++ b/tests/Server/Process/BackgroundTest.php @@ -13,14 +13,10 @@ }; use Innmind\TimeContinuum\Earth\{ Clock, - ElapsedPeriod, Period\Second, }; use Innmind\TimeWarp\Halt\Usleep; -use Innmind\Stream\{ - Watch\Select, - Streams, -}; +use Innmind\Stream\Streams; use PHPUnit\Framework\TestCase; class BackgroundTest extends TestCase diff --git a/tests/Server/Process/ForegroundTest.php b/tests/Server/Process/ForegroundTest.php index eae9479..c18ab91 100644 --- a/tests/Server/Process/ForegroundTest.php +++ b/tests/Server/Process/ForegroundTest.php @@ -7,7 +7,6 @@ Process\Foreground, Process\Unix, Process, - Process\Pid, Process\Output, Process\Output\Type, Process\Failed, @@ -16,14 +15,10 @@ }; use Innmind\TimeContinuum\Earth\{ Clock, - ElapsedPeriod, Period\Second, }; use Innmind\TimeWarp\Halt\Usleep; -use Innmind\Stream\{ - Watch\Select, - Streams, -}; +use Innmind\Stream\Streams; use Innmind\Immutable\Str; use PHPUnit\Framework\TestCase; diff --git a/tests/Server/Process/UnixTest.php b/tests/Server/Process/UnixTest.php index a01ee4a..df2e562 100644 --- a/tests/Server/Process/UnixTest.php +++ b/tests/Server/Process/UnixTest.php @@ -5,6 +5,7 @@ use Innmind\Server\Control\{ Server\Process\Unix, + Server\Process\Output\Chunk, Server\Process\Output\Type, Server\Process\ExitCode, Server\Command, @@ -23,7 +24,10 @@ Streams, Watch\Select, }; -use Innmind\Immutable\SideEffect; +use Innmind\Immutable\{ + SideEffect, + Predicate\Instance, +}; use PHPUnit\Framework\TestCase; use Innmind\BlackBox\{ PHPUnit\BlackBox, @@ -46,9 +50,9 @@ public function testSimpleOutput() $count = 0; $process = $cat(); - foreach ($process->output() as [$value, $type]) { - $this->assertSame(Type::output, $type); - $this->assertSame("hello\n", $value->toString()); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $this->assertSame(Type::output, $chunk->type()); + $this->assertSame("hello\n", $chunk->data()->toString()); ++$count; } @@ -74,8 +78,8 @@ public function testOutput() $process = $cat(); $output = ''; - foreach ($process->output() as [$value, $type]) { - $output .= $value->toString(); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); } $this->assertSame("$echo\n", $output); @@ -100,9 +104,9 @@ public function testSlowOutput() $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - foreach ($process->output() as [$chunk, $type]) { - $output .= $chunk->toString(); - $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $type); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); + $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $chunk->type()); ++$count; } @@ -128,9 +132,9 @@ public function testTimeoutSlowOutput() $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - foreach ($process->output() as [$chunk, $type]) { - $output .= $chunk->toString(); - $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $type); + foreach ($process->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); + $this->assertSame($count % 2 === 0 ? Type::output : Type::error, $chunk->type()); ++$count; } @@ -163,10 +167,15 @@ public function testTimeoutWaitSlowProcess() $started = \microtime(true); $this->assertGreaterThanOrEqual(2, $process->pid()->toInt()); - $e = $process->wait()->match( - static fn() => null, - static fn($e) => $e, - ); + $e = $process + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn() => null, + static fn($e) => $e, + ); $this->assertSame('timed-out', $e); // 3 because of the grace period $this->assertEqualsWithDelta(3, \microtime(true) - $started, 0.5); @@ -182,10 +191,15 @@ public function testWaitSuccess() Command::foreground('echo')->withArgument('hello'), ); - $value = $cat()->wait()->match( - static fn($value) => $value, - static fn() => null, - ); + $value = $cat() + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn($value) => $value, + static fn() => null, + ); $this->assertInstanceOf(SideEffect::class, $value); } @@ -202,10 +216,15 @@ public function testWaitFail() ->withEnvironment('PATH', $_SERVER['PATH']), ); - $value = $cat()->wait()->match( - static fn() => null, - static fn($e) => $e, - ); + $value = $cat() + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn() => null, + static fn($e) => $e, + ); $this->assertInstanceOf(ExitCode::class, $value); $this->assertSame(1, $value->toInt()); @@ -226,8 +245,8 @@ public function testWithInput() ); $output = ''; - foreach ($cat()->output() as [$value]) { - $output .= $value->toString(); + foreach ($cat()->output()->keep(Instance::of(Chunk::class))->toList() as $chunk) { + $output .= $chunk->data()->toString(); } $this->assertSame( @@ -253,10 +272,15 @@ public function testOverwrite() ->overwrite(Path::of('test.log')), ); - $value = $cat()->wait()->match( - static fn($value) => $value, - static fn() => null, - ); + $value = $cat() + ->output() + ->last() + ->either() + ->flatMap(static fn($result) => $result) + ->match( + static fn($value) => $value, + static fn() => null, + ); $this->assertInstanceOf(SideEffect::class, $value); $this->assertSame( diff --git a/tests/Server/Processes/RemoteTest.php b/tests/Server/Processes/RemoteTest.php index 3a8f271..8a3b4b8 100644 --- a/tests/Server/Processes/RemoteTest.php +++ b/tests/Server/Processes/RemoteTest.php @@ -10,7 +10,6 @@ Command, Signal, Process\Pid, - Process\ExitCode, }; use Innmind\Url\{ Path, diff --git a/tests/Servers/LoggerTest.php b/tests/Servers/LoggerTest.php index e78c595..2bba05a 100644 --- a/tests/Servers/LoggerTest.php +++ b/tests/Servers/LoggerTest.php @@ -8,7 +8,6 @@ Server, Server\Processes, Server\Process, - Server\Process\ExitCode, Server\Command, Server\Volumes, }; diff --git a/tests/Servers/RemoteTest.php b/tests/Servers/RemoteTest.php index eba40cd..8d99355 100644 --- a/tests/Servers/RemoteTest.php +++ b/tests/Servers/RemoteTest.php @@ -8,7 +8,6 @@ Server, Server\Processes, Server\Process, - Server\Process\ExitCode, Server\Command, Server\Volumes, };