diff --git a/README.md b/README.md index 1c34246..99804ec 100644 --- a/README.md +++ b/README.md @@ -64,6 +64,35 @@ $cmd = Command::factory('ls') ->run(null, true); ``` +### Streaming large command output +STDOUT and STDERR are collected inside PHP by default. If you have a large amount of data to pass into the command, you should stream it in (see STDIN from a stream below). If you have a large amount of output from the command, you should stream it out using a callback: + +```php +use kamermans\Command\Command; + +require_once __DIR__.'/../vendor/autoload.php'; + +$filename = __DIR__.'/../README.md'; +$stdin = fopen($filename, 'r'); + +// This will read README.md and grep for lines containing 'the' +$cmd = Command::factory("grep 'the'") + ->setCallback(function($pipe, $data) { + // Change the text to uppercase + $data = strtoupper($data); + + if ($pipe === Command::STDERR) { + Command::echoStdErr($data); + } else { + echo $data; + } + }) + ->run($stdin); + +fclose($stdin); + +``` + ### Running a Command without Escaping By default, the command passed to `Command::factory(string $command, bool $escape)` is escaped, so characters like `|` and `>` will replaced with `\|` and `\>` respectively. 
To prevent the command factory from escaping your command, you can pass `true` as the second argument: diff --git a/src/Command.php b/src/Command.php index 548746e..91c8ef3 100644 --- a/src/Command.php +++ b/src/Command.php @@ -25,7 +25,7 @@ class Command protected $_readbuffer = 16536; protected $_separator = ' '; protected $_cmd; - protected $_args = array(); + protected $_args = []; protected $_exitcode; protected $_stdout; protected $_stderr; @@ -35,7 +35,7 @@ class Command protected $_timeend; protected $_cwd; protected $_env; - protected $_conf = array(); + protected $_conf = []; /** * Creates a new Command object @@ -47,7 +47,7 @@ class Command static public function factory($cmd = null, $noescape = false) { $obj = new self(); - if ($cmd !== NULL) { + if ($cmd !== null) { $obj->command($cmd, $noescape); } return $obj; @@ -105,7 +105,7 @@ public function setDirectory($cwd) * @param array $env * @return Command - Fluent */ - public function setEnv($env = array()) + public function setEnv($env = []) { $this->_env = $env; return $this; @@ -158,12 +158,12 @@ public function command($cmd, $noescape = false) */ public function option($left, $right = null, $sep = null) { - if ($right !== NULL) { + if ($right !== null) { $right = escapeshellarg($right); if (empty($right)) { $right = "''"; } - $left .= ($sep === NULL ? $this->_separator : $sep) . $right; + $left .= ($sep === null ? $this->_separator : $sep) . 
$right; } $this->_args[] = $left; @@ -198,11 +198,11 @@ public function run($stdin = null, $throw_exceptions = true) $this->_timestart = microtime(true); // Prepare the buffers structure - $buffers = array( + $buffers = [ 0 => $stdin, 1 => &$this->_stdout, 2 => &$this->_stderr, - ); + ]; $this->_exitcode = self::exec($this->getFullCommand(), $buffers, $this->_callback, $this->_callbacklines, $this->_readbuffer, $this->_cwd, $this->_env, $this->_conf); $this->_timeend = microtime(true); @@ -225,7 +225,7 @@ public function __toString() */ public function getFullCommand() { - $parts = array_merge(array($this->_cmd), $this->_args); + $parts = array_merge([$this->_cmd], $this->_args); return implode($this->_separator, $parts); } @@ -295,15 +295,15 @@ public static function echoStdErr($content) public static function exec($cmd, &$buffers, $callback = null, $callbacklines = false, $readbuffer = 16536, $cwd = null, $env = null, $conf = null) { if (!is_array($buffers)) { - $buffers = array(); + $buffers = []; } // Define the pipes to configure for the process - $descriptors = array( - self::STDIN => array('pipe', 'r'), - self::STDOUT => array('pipe', 'w'), - self::STDERR => array('pipe', 'w'), - ); + $descriptors = [ + self::STDIN => ['pipe', 'r'], + self::STDOUT => ['pipe', 'w'], + self::STDERR => ['pipe', 'w'], + ]; // Start the process $ph = proc_open($cmd, $descriptors, $pipes, $cwd, $env, $conf); @@ -311,58 +311,108 @@ public static function exec($cmd, &$buffers, $callback = null, $callbacklines = return null; } - // Feed the process with the stdin if any and close it + // Prepare STDIN + $stdin_position = 0; $stdin = $buffers[self::STDIN]; - if (is_resource($stdin)) { - // It seems this method is less memory-intensive that the stream copying builtin: - // stream_copy_to_stream(resource $source, resource $dest) - while(!feof($stdin)) { - fwrite($pipes[self::STDIN], fread($stdin, $readbuffer)); - } - - } else if (!empty($stdin)) { - fwrite($pipes[self::STDIN], $stdin); 
+ $stdin_is_stream = is_resource($stdin); + $use_stdin = $stdin_is_stream || !empty($stdin); + $stdin_length = null; + + if (!$use_stdin) { + fclose($pipes[self::STDIN]); + } else if (!$stdin_is_stream) { + $stdin_length = strlen($stdin); } - fclose($pipes[self::STDIN]); + // Setup all streams to non-blocking mode + stream_set_blocking($pipes[self::STDIN], false); + stream_set_blocking($pipes[self::STDOUT], false); + stream_set_blocking($pipes[self::STDERR], false); - // Setup non-blocking behaviour for stdout and stderr - stream_set_blocking($pipes[self::STDOUT], 0); - stream_set_blocking($pipes[self::STDERR], 0); + $stream_select_timeout_sec = null; + $stream_select_timeout_usec = null; $delay = 0; $code = null; - $open = array(self::STDOUT, self::STDERR); + + $buffers[self::STDIN] = ''; $buffers[self::STDOUT] = empty($buffers[self::STDOUT]) ? '' : $buffers[self::STDOUT]; $buffers[self::STDERR] = empty($buffers[self::STDERR]) ? '' : $buffers[self::STDERR]; - while (!empty($open)) { + // Read from the process' STDOUT and STDERR + $reads = [ + $pipes[self::STDOUT], + $pipes[self::STDERR], + ]; + + // Write to the process' STDIN + $writes = [ + $pipes[self::STDIN], + ]; + + $stream_id_map = [ + self::STDIN => $pipes[self::STDIN], + self::STDOUT => $pipes[self::STDOUT], + self::STDERR => $pipes[self::STDERR], + ]; + + // Read/write loop + while (true) { + + // Setup streams before each iteration since they are changed by stream_select() + $streams = [ + 'read' => array_filter($reads, 'is_resource'), + 'write' => array_filter($writes, 'is_resource'), + 'except' => [], + ]; + + // This line will block until a stream is ready for input or output + $ready_streams = stream_select( + $streams['read'], + $streams['write'], + $streams['except'], + $stream_select_timeout_sec, + $stream_select_timeout_usec + ); + // Try to find the exit code of the command before buggy proc_close() - if ($code === NULL) { + if ($code === null) { $status = proc_get_status($ph); if 
(!$status['running']) { $code = $status['exitcode']; + break; } } - // Go thru all open pipes and check for data - foreach ($open as $i=>$pipe) { - // Try to get some data - $str = fread($pipes[$pipe], $readbuffer); - if (strlen($str)) { - $buffers[$pipe] .= $str; + if ($ready_streams === 0) { + // Stream timeout; no streams ready, retry stream_select + continue; + } + + if ($ready_streams === false) { + throw new \Exception("stream_select() failed while waiting for I/O on command"); + } + + // Read from all ready streams + foreach ($streams['read'] as $stream) { + + $stream_id = array_search($stream, $stream_id_map, true); + + $str = stream_get_contents($stream, $readbuffer); + if (strlen($str) !== 0) { + $buffers[$stream_id] .= $str; if ($callback) { if ($callbacklines) { // Note: \r will be left in the line in case of CRLF, // and we will need to add \n to the end of each line - $lines = explode("\n", $buffers[$pipe]); + $lines = explode("\n", $buffers[$stream_id]); // This is left over and does not end with the delimiter - $buffers[$pipe] = array_pop($lines); + $buffers[$stream_id] = array_pop($lines); foreach ($lines as $line) { - $callback_return = call_user_func($callback, $pipe, "$line\n"); + $callback_return = call_user_func($callback, $stream_id, "$line\n"); if ($callback_return === false) { // We killed the proc early, set code to 0 $code = 0; @@ -371,8 +421,8 @@ public static function exec($cmd, &$buffers, $callback = null, $callbacklines = } } else { - $callback_return = call_user_func($callback, $pipe, $buffers[$pipe]); - $buffers[$pipe] = ''; + $callback_return = call_user_func($callback, $stream_id, $buffers[$stream_id]); + $buffers[$stream_id] = ''; if ($callback_return === false) { // We killed the proc early, set code to 0 $code = 0; @@ -384,28 +434,56 @@ public static function exec($cmd, &$buffers, $callback = null, $callbacklines = // Since we've got some data we don't need to sleep :) $delay = 0; // Check if we have consumed all the data in 
the current pipe - } else if (feof($pipes[$pipe])) { - if ($callback) { - if (call_user_func($callback, $pipe, null) === false) { - break 2; + } + } + + // Write to all write ready streams (STDIN of the process) + foreach ($streams['write'] as $stream) { + + $stream_id = array_search($stream, $stream_id_map, true); + + if ($stdin_is_stream) { + // It seems this method is less memory-intensive that the stream copying builtin: + // stream_copy_to_stream(resource $source, resource $dest) + if (feof($stdin)) { + fclose($stream); + } else { + if (strlen($buffers[$stream_id]) < $readbuffer) { + // The STDIN buffer is running low + $buffers[$stream_id] .= stream_get_contents($stdin, $readbuffer); + } + + $bytes_written = fwrite($stream, $buffers[$stream_id]); + + if ($bytes_written === false) { + continue; + } + + $buffer_length = strlen($buffers[$stream_id]); + + if ($bytes_written === $buffer_length) { + $buffers[$stream_id] = ''; + } else { + // Only part of the buffer was written so we remove that part + $buffers[$stream_id] = substr($buffers[$stream_id], $bytes_written); } } - unset($open[$i]); - continue 2; + + } else { + if ($stdin_position >= $stdin_length) { + fclose($stream); + } else { + $chunk = substr($stdin, $stdin_position, $readbuffer); + $bytes_written = fwrite($stream, $chunk); + $stdin_position += $bytes_written; + } } } - // Check if we have to sleep for a bit to be nice on the CPU - if ($delay) { - usleep($delay * 1000); - $delay = ceil(min(self::SLEEP_MAX, $delay*self::SLEEP_FACTOR)); - } else { - $delay = self::SLEEP_START; - } - } + } // End read/write loop // Make sure all pipes are closed - foreach ($pipes as $pipe=>$desc) { + foreach ($pipes as $pipe => $desc) { if (is_resource($desc)) { if ($callback) { call_user_func($callback, $pipe, null); @@ -421,7 +499,7 @@ public static function exec($cmd, &$buffers, $callback = null, $callbacklines = } // Find out the exit code - if ($code === NULL) { + if ($code === null) { $code = proc_close($ph); } 
else { proc_close($ph);