Skip to content

Commit

Permalink
Implemented non-blocking stdin and event-based stream switching
Browse files Browse the repository at this point in the history
  • Loading branch information
kamermans committed Aug 11, 2016
1 parent 515fe71 commit 04ac844
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 59 deletions.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,35 @@ $cmd = Command::factory('ls')
->run(null, true);
```

### Streaming large command output
The STDOUT and STDERR is collected inside PHP by default. If you have a large amount of data to pass into the command, you should stream it in (see STDIN from a stream below). If you have a large amount of output from the command, you should stream it out using a callback:

```php
use kamermans\Command\Command;

require_once __DIR__.'/../vendor/autoload.php';

$filename = __DIR__.'/../README.md';
$stdin = fopen($filename, 'r');

// This will read README.md and grep for lines containing 'the'
$cmd = Command::factory("grep 'the'")
->setCallback(function($pipe, $data) {
// Change the text to uppercase
$data = strtoupper($data);

if ($pipe === Command::STDERR) {
Command::echoStdErr($data);
} else {
echo $data;
}
})
->run($stdin);

fclose($stdin);

```

### Running a Command without Escaping
By default, the command passed to `Command::factory(string $command, bool $escape)` is escaped, so characters like `|` and `>` will replaced with `\|` and `\>` respectively. To prevent the command factory from escaping your command, you can pass `true` as the second argument:

Expand Down
196 changes: 137 additions & 59 deletions src/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Command
protected $_readbuffer = 16536;
protected $_separator = ' ';
protected $_cmd;
protected $_args = array();
protected $_args = [];
protected $_exitcode;
protected $_stdout;
protected $_stderr;
Expand All @@ -35,7 +35,7 @@ class Command
protected $_timeend;
protected $_cwd;
protected $_env;
protected $_conf = array();
protected $_conf = [];

/**
* Creates a new Command object
Expand All @@ -47,7 +47,7 @@ class Command
static public function factory($cmd = null, $noescape = false)
{
$obj = new self();
if ($cmd !== NULL) {
if ($cmd !== null) {
$obj->command($cmd, $noescape);
}
return $obj;
Expand Down Expand Up @@ -105,7 +105,7 @@ public function setDirectory($cwd)
* @param array $env
* @return Command - Fluent
*/
public function setEnv($env = array())
public function setEnv($env = [])
{
$this->_env = $env;
return $this;
Expand Down Expand Up @@ -158,12 +158,12 @@ public function command($cmd, $noescape = false)
*/
public function option($left, $right = null, $sep = null)
{
if ($right !== NULL) {
if ($right !== null) {
$right = escapeshellarg($right);
if (empty($right)) {
$right = "''";
}
$left .= ($sep === NULL ? $this->_separator : $sep) . $right;
$left .= ($sep === null ? $this->_separator : $sep) . $right;
}

$this->_args[] = $left;
Expand Down Expand Up @@ -198,11 +198,11 @@ public function run($stdin = null, $throw_exceptions = true)
$this->_timestart = microtime(true);

// Prepare the buffers structure
$buffers = array(
$buffers = [
0 => $stdin,
1 => &$this->_stdout,
2 => &$this->_stderr,
);
];
$this->_exitcode = self::exec($this->getFullCommand(), $buffers, $this->_callback, $this->_callbacklines, $this->_readbuffer, $this->_cwd, $this->_env, $this->_conf);
$this->_timeend = microtime(true);

Expand All @@ -225,7 +225,7 @@ public function __toString()
*/
public function getFullCommand()
{
$parts = array_merge(array($this->_cmd), $this->_args);
$parts = array_merge([$this->_cmd], $this->_args);
return implode($this->_separator, $parts);
}

Expand Down Expand Up @@ -295,74 +295,124 @@ public static function echoStdErr($content)
public static function exec($cmd, &$buffers, $callback = null, $callbacklines = false, $readbuffer = 16536, $cwd = null, $env = null, $conf = null)
{
if (!is_array($buffers)) {
$buffers = array();
$buffers = [];
}

// Define the pipes to configure for the process
$descriptors = array(
self::STDIN => array('pipe', 'r'),
self::STDOUT => array('pipe', 'w'),
self::STDERR => array('pipe', 'w'),
);
$descriptors = [
self::STDIN => ['pipe', 'r'],
self::STDOUT => ['pipe', 'w'],
self::STDERR => ['pipe', 'w'],
];

// Start the process
$ph = proc_open($cmd, $descriptors, $pipes, $cwd, $env, $conf);
if (!is_resource($ph)) {
return null;
}

// Feed the process with the stdin if any and close it
// Prepare STDIN
$stdin_position = 0;
$stdin = $buffers[self::STDIN];
if (is_resource($stdin)) {
// It seems this method is less memory-intensive that the stream copying builtin:
// stream_copy_to_stream(resource $source, resource $dest)
while(!feof($stdin)) {
fwrite($pipes[self::STDIN], fread($stdin, $readbuffer));
}

} else if (!empty($stdin)) {
fwrite($pipes[self::STDIN], $stdin);
$stdin_is_stream = is_resource($stdin);
$use_stdin = $stdin_is_stream || !empty($stdin);
$stdin_length = null;

if (!$use_stdin) {
fclose($pipes[self::STDIN]);
} else if (!$stdin_is_stream) {
$stdin_length = strlen($stdin);
}

fclose($pipes[self::STDIN]);
// Setup all streams to non-blocking mode
stream_set_blocking($pipes[self::STDIN], false);
stream_set_blocking($pipes[self::STDOUT], false);
stream_set_blocking($pipes[self::STDERR], false);

// Setup non-blocking behaviour for stdout and stderr
stream_set_blocking($pipes[self::STDOUT], 0);
stream_set_blocking($pipes[self::STDERR], 0);
$stream_select_timeout_sec = null;
$stream_select_timeout_usec = null;

$delay = 0;
$code = null;
$open = array(self::STDOUT, self::STDERR);

$buffers[self::STDIN] = '';
$buffers[self::STDOUT] = empty($buffers[self::STDOUT]) ? '' : $buffers[self::STDOUT];
$buffers[self::STDERR] = empty($buffers[self::STDERR]) ? '' : $buffers[self::STDERR];

while (!empty($open)) {
// Read from the process' STDOUT and STDERR
$reads = [
$pipes[self::STDOUT],
$pipes[self::STDERR],
];

// Write to the process' STDIN
$writes = [
$pipes[self::STDIN],
];

$stream_id_map = [
self::STDIN => $pipes[self::STDIN],
self::STDOUT => $pipes[self::STDOUT],
self::STDERR => $pipes[self::STDERR],
];

// Read/write loop
while (true) {

// Setup streams before each iteration since they are changed by stream_select()
$streams = [
'read' => array_filter($reads, 'is_resource'),
'write' => array_filter($writes, 'is_resource'),
'except' => [],
];

// This line will block until a stream is ready for input or output
$ready_streams = stream_select(
$streams['read'],
$streams['write'],
$streams['except'],
$stream_select_timeout_sec,
$stream_select_timeout_usec
);

// Try to find the exit code of the command before buggy proc_close()
if ($code === NULL) {
if ($code === null) {
$status = proc_get_status($ph);
if (!$status['running']) {
$code = $status['exitcode'];
break;
}
}

// Go thru all open pipes and check for data
foreach ($open as $i=>$pipe) {
// Try to get some data
$str = fread($pipes[$pipe], $readbuffer);
if (strlen($str)) {
$buffers[$pipe] .= $str;
if ($ready_streams === 0) {
// Stream timeout; no streams ready, retry stream_select
continue;
}

if ($ready_streams === false) {
throw new \Exception("stream_select() failed while waiting for I/O on command");
}

// Read from all ready streams
foreach ($streams['read'] as $stream) {

$stream_id = array_search($stream, $stream_id_map, true);

$str = stream_get_contents($stream, $readbuffer);
if (strlen($str) !== 0) {
$buffers[$stream_id] .= $str;

if ($callback) {
if ($callbacklines) {
// Note: \r will be left in the line in case of CRLF,
// and we will need to add \n to the end of each line
$lines = explode("\n", $buffers[$pipe]);
$lines = explode("\n", $buffers[$stream_id]);

// This is left over and does not end with the delimiter
$buffers[$pipe] = array_pop($lines);
$buffers[$stream_id] = array_pop($lines);

foreach ($lines as $line) {
$callback_return = call_user_func($callback, $pipe, "$line\n");
$callback_return = call_user_func($callback, $stream_id, "$line\n");
if ($callback_return === false) {
// We killed the proc early, set code to 0
$code = 0;
Expand All @@ -371,8 +421,8 @@ public static function exec($cmd, &$buffers, $callback = null, $callbacklines =
}

} else {
$callback_return = call_user_func($callback, $pipe, $buffers[$pipe]);
$buffers[$pipe] = '';
$callback_return = call_user_func($callback, $stream_id, $buffers[$stream_id]);
$buffers[$stream_id] = '';
if ($callback_return === false) {
// We killed the proc early, set code to 0
$code = 0;
Expand All @@ -384,28 +434,56 @@ public static function exec($cmd, &$buffers, $callback = null, $callbacklines =
// Since we've got some data we don't need to sleep :)
$delay = 0;
// Check if we have consumed all the data in the current pipe
} else if (feof($pipes[$pipe])) {
if ($callback) {
if (call_user_func($callback, $pipe, null) === false) {
break 2;
}
}

// Write to all write ready streams (STDIN of the process)
foreach ($streams['write'] as $stream) {

$stream_id = array_search($stream, $stream_id_map, true);

if ($stdin_is_stream) {
// It seems this method is less memory-intensive that the stream copying builtin:
// stream_copy_to_stream(resource $source, resource $dest)
if (feof($stdin)) {
fclose($stream);
} else {
if (strlen($buffers[$stream_id]) < $readbuffer) {
// The STDIN buffer is running low
$buffers[$stream_id] .= stream_get_contents($stdin, $readbuffer);
}

$bytes_written = fwrite($stream, $buffers[$stream_id]);

if ($bytes_written === false) {
continue;
}

$buffer_length = strlen($buffers[$stream_id]);

if ($bytes_written === $buffer_length) {
$buffers[$stream_id] = '';
} else {
// Only part of the buffer was written so we remove that part
$buffers[$stream_id] = substr($buffers[$stream_id], $bytes_written);
}
}
unset($open[$i]);
continue 2;

} else {
if ($stdin_position >= $stdin_length) {
fclose($stream);
} else {
$chunk = substr($stdin, $stdin_position, $readbuffer);
$bytes_written = fwrite($stream, $chunk);
$stdin_position += $bytes_written;
}
}
}

// Check if we have to sleep for a bit to be nice on the CPU
if ($delay) {
usleep($delay * 1000);
$delay = ceil(min(self::SLEEP_MAX, $delay*self::SLEEP_FACTOR));
} else {
$delay = self::SLEEP_START;
}
}
} // End read/write loop

// Make sure all pipes are closed
foreach ($pipes as $pipe=>$desc) {
foreach ($pipes as $pipe => $desc) {
if (is_resource($desc)) {
if ($callback) {
call_user_func($callback, $pipe, null);
Expand All @@ -421,7 +499,7 @@ public static function exec($cmd, &$buffers, $callback = null, $callbacklines =
}

// Find out the exit code
if ($code === NULL) {
if ($code === null) {
$code = proc_close($ph);
} else {
proc_close($ph);
Expand Down

0 comments on commit 04ac844

Please sign in to comment.