diff --git a/README.md b/README.md index 2b94eac9..bc346f1e 100644 --- a/README.md +++ b/README.md @@ -223,7 +223,15 @@ the `COUNT` environment variable: $ COUNT=5 bin/resque -### Forking ### +### Job Srategys ### + +Php-resque implements multiple ways to seperate the worker process +from the job process to improce resilience. Supported platforms +default to the fork strategy, falling back to in-process execution. +Specific strategys can be chosen by supplyingthe `JOB_STRATEGY` +environment variable. + +#### Forking #### Similarly to the Ruby versions, supported platforms will immediately fork after picking up a job. The forked child will exit as soon as @@ -233,6 +241,30 @@ The difference with php-resque is that if a forked child does not exit nicely (PHP error or such), php-resque will automatically fail the job. + $ JOB_STRATEGY=fork php resque.php + +#### Fastcgi #### + +The fastcgi strategy executes jobs over a fastcgi connection to php-fpm. +It may offer a lower overhead per job in environments with lots of very short +jobs. To use fastcgi you must install the suggested composer package +`ebernhardson/fastcgi` + + $ JOB_STRATEGY=fastcgi php resque.php + +Fastcgi accepts two additional parameters. `FASTCGI_LOCATION` sets the +location of the php-fpm server. This can either be a host:port combination +or a path to a unix socket. `FASTCGI_SCRIPT` sets the path to the script used +to receive and run the job in the php-fpm process. + +#### In Process #### + +For cases when the other two strategys are not available the in-process +strategy will run jobs in the same process as the worker. This is not +recommended as failures in the job may turn into failures in the worker. + + $ JOB_STRATEGY=inprocess php resque.php + ### Signals ### Signals also work on supported platforms exactly as in the Ruby @@ -370,4 +402,4 @@ Called after a job has been queued using the `Resque::enqueue` method. Arguments * maetl * Matt Heath * jjfrey -* scragg0x \ No newline at end of file +* scragg0x diff --git a/bin/resque b/bin/resque index 186e1eef..a848d170 100644 --- a/bin/resque +++ b/bin/resque @@ -66,6 +66,40 @@ if(!empty($COUNT) && $COUNT > 1) { $count = $COUNT; } +$jobStrategy=null; +$JOB_STRATEGY = getenv('JOB_STRATEGY'); +switch($JOB_STRATEGY) { + case 'inprocess': + $jobStrategy = new Resque_JobStrategy_InProcess; + break; + case 'fork': + $jobStrategy = new Resque_JobStrategy_Fork; + break; + case 'fastcgi': + $fastcgiLocation = '127.0.0.1:9000'; + $FASTCGI_LOCATION = getenv('FASTCGI_LOCATION'); + if (!empty($FASTCGI_LOCATION)) { + $fastcgiLocation = $FASTCGI_LOCATION; + } + + $fastcgiScript = dirname(__FILE__).'/extras/fastcgi_worker.php'; + $FASTCGI_SCRIPT = getenv('FASTCGI_SCRIPT'); + if (!empty($FASTCGI_SCRIPT)) { + $fastcgiScript = $FASTCGI_SCRIPT; + } + + require_once dirname(__FILE__).'/lib/Resque/JobStrategy/Fastcgi.php'; + $jobStrategy = new Resque_JobStrategy_Fastcgi( + $fastcgiLocation, + $fastcgiScript, + array( + 'APP_INCLUDE' => $APP_INCLUDE, + 'REDIS_BACKEND' => $REDIS_BACKEND, + ) + ); + break; +} + if($count > 1) { for($i = 0; $i < $count; ++$i) { $pid = Resque::fork(); @@ -77,6 +111,9 @@ if($count > 1) { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; + if ($jobStrategy) { + $worker->setJobStrategy($jobStrategy); + } fwrite(STDOUT, '*** Starting worker '.$worker."\n"); $worker->work($interval); break; @@ -88,6 +125,9 @@ else { $queues = explode(',', $QUEUE); $worker = new Resque_Worker($queues); $worker->logLevel = $logLevel; + if ($jobStrategy) { + $worker->setJobStrategy($jobStrategy); + } $PIDFILE = getenv('PIDFILE'); if ($PIDFILE) { diff --git a/composer.json b/composer.json index 32f92ad9..7a842cb3 100644 --- a/composer.json +++ b/composer.json @@ -23,7 +23,8 @@ }, "suggest": { "ext-proctitle": "Allows php-resque to rename the title of UNIX processes to show the status of a worker.", - "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available." + "ext-redis": "Native PHP extension for Redis connectivity. Credis will automatically utilize when available.", + "ebernhardson/fastcgi": "Allows php-resque to execute jobs via php-fpm." }, "require-dev": { "phpunit/phpunit": "3.7.*" @@ -36,4 +37,4 @@ "Resque": "lib" } } -} \ No newline at end of file +} diff --git a/extras/fastcgi_worker.php b/extras/fastcgi_worker.php new file mode 100644 index 00000000..7910ea7d --- /dev/null +++ b/extras/fastcgi_worker.php @@ -0,0 +1,30 @@ +worker->perform($job); +} catch (\Exception $e) { + if (isset($job)) { + $job->fail($e); + } else { + header('Status: 500'); + } +} + +?> diff --git a/lib/Resque/JobStrategy/Fastcgi.php b/lib/Resque/JobStrategy/Fastcgi.php new file mode 100644 index 00000000..535a9620 --- /dev/null +++ b/lib/Resque/JobStrategy/Fastcgi.php @@ -0,0 +1,113 @@ + + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_JobStrategy_Fastcgi implements Resque_JobStrategy_Interface +{ + /** + * @var bool True when waiting for a response from fcgi server + */ + private $waiting = false; + + /** + * @var array Default enironment for FCGI requests + */ + protected $requestData = array( + 'GATEWAY_INTERFACE' => 'FastCGI/1.0', + 'REQUEST_METHOD' => 'GET', + 'SERVER_SOFTWARE' => 'php-resque-fastcgi/1.3-dev', + 'REMOTE_ADDR' => '127.0.0.1', + 'REMOTE_PORT' => 8888, + 'SERVER_ADDR' => '127.0.0.1', + 'SERVER_PORT' => 8888, + 'SERVER_PROTOCOL' => 'HTTP/1.1' + ); + + /** + * @param string $location When the location contains a `:` it will be considered a host/port pair + * otherwise a unix socket path + * @param string $script Absolute path to the script that will load resque and perform the job + * @param array $environment Additional environment variables available in $_SERVER to the fcgi script + */ + public function __construct($location, $script, $environment = array()) + { + $this->location = $location; + + $port = false; + if (false !== strpos($location, ':')) { + list($location, $port) = explode(':', $location, 2); + } + + $this->fcgi = new Client($location, $port); + $this->fcgi->setKeepAlive(true); + + $this->requestData = $environment + $this->requestData + array( + 'SCRIPT_FILENAME' => $script, + 'SERVER_NAME' => php_uname('n'), + 'RESQUE_DIR' => __DIR__.'/../../../', + ); + } + + /** + * @param Resque_Worker $worker + */ + public function setWorker(Resque_Worker $worker) + { + $this->worker = $worker; + } + + /** + * Executes the provided job over a fastcgi connection + * + * @param Resque_Job $job + */ + public function perform(Resque_Job $job) + { + $status = 'Requested fcgi job execution from ' . $this->location . ' at ' . strftime('%F %T'); + $this->worker->updateProcLine($status); + $this->worker->log($status, Resque_Worker::LOG_VERBOSE); + + $this->waiting = true; + + try { + $this->fcgi->request(array( + 'RESQUE_JOB' => urlencode(serialize($job)), + ) + $this->requestData, ''); + + $response = $this->fcgi->response(); + $this->waiting = false; + } catch (CommunicationException $e) { + $this->waiting = false; + $job->fail($e); + return; + } + + if ($response['statusCode'] !== 200) { + $job->fail(new Exception(sprintf( + 'FastCGI job returned non-200 status code: %s Stdout: %s Stderr: %s', + $response['headers']['status'], + $response['body'], + $response['stderr'] + ))); + } + } + + /** + * Shutdown the worker process. + */ + public function shutdown() + { + if ($this->waiting === false) { + $this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE); + } else { + $this->worker->log('Closing fcgi connection with job in progress.', Resque_Worker::LOG_VERBOSE); + } + $this->fcgi->close(); + } +} diff --git a/lib/Resque/JobStrategy/Fork.php b/lib/Resque/JobStrategy/Fork.php new file mode 100644 index 00000000..a2143765 --- /dev/null +++ b/lib/Resque/JobStrategy/Fork.php @@ -0,0 +1,106 @@ + + * @author Erik Bernharsdon + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_JobStrategy_Fork extends Resque_JobStrategy_InProcess +{ + /** + * @param int|null 0 for the forked child, the PID of the child for the parent, or null if no child. + */ + protected $child; + + /** + * @param Resque_Worker Instance of Resque_Worker that is starting jobs + */ + protected $worker; + + /** + * Set the Resque_Worker instance + * + * @param Resque_Worker $worker + */ + public function setWorker(Resque_Worker $worker) + { + $this->worker = $worker; + } + + /** + * Seperate the job from the worker via pcntl_fork + * + * @param Resque_Job $job + */ + public function perform(Resque_Job $job) + { + $this->child = $this->fork(); + + // Forked and we're the child. Run the job. + if ($this->child === 0) { + parent::perform($job); + exit(0); + } + + // Parent process, sit and wait + if($this->child > 0) { + $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); + $this->worker->updateProcLine($status); + $this->worker->log($status, Resque_Worker::LOG_VERBOSE); + + // Wait until the child process finishes before continuing + pcntl_wait($status); + $exitStatus = pcntl_wexitstatus($status); + if($exitStatus !== 0) { + $job->fail(new Resque_Job_DirtyExitException( + 'Job exited with exit code ' . $exitStatus + )); + } + } + + $this->child = null; + } + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently working + */ + public function shutdown() + { + if (!$this->child) { + $this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE); + return; + } + + $this->worker->log('Killing child at '.$this->child, Resque_Worker::LOG_VERBOSE); + if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { + $this->worker->log('Killing child at ' . $this->child, Resque_Worker::LOG_VERBOSE); + posix_kill($this->child, SIGKILL); + $this->child = null; + } + else { + $this->worker->log('Child ' . $this->child . ' not found, restarting.', Resque_Worker::LOG_VERBOSE); + $this->worker->shutdown(); + } + } + + /** + * Attempt to fork a child process from the parent to run a job in. + * + * Return values are those of pcntl_fork(). + * + * @return int 0 for the forked child, or the PID of the child for the parent. + * @throws RuntimeException When pcntl_fork returns -1 + */ + private function fork() + { + $pid = pcntl_fork(); + if($pid === -1) { + throw new RuntimeException('Unable to fork child worker.'); + } + + return $pid; + } +} diff --git a/lib/Resque/JobStrategy/InProcess.php b/lib/Resque/JobStrategy/InProcess.php new file mode 100644 index 00000000..68bbe740 --- /dev/null +++ b/lib/Resque/JobStrategy/InProcess.php @@ -0,0 +1,48 @@ + + * @author Erik Bernharsdon + * @license http://www.opensource.org/licenses/mit-license.php + */ +class Resque_JobStrategy_InProcess implements Resque_JobStrategy_Interface +{ + /** + * @param Resque_Worker Instance of Resque_Worker that is starting jobs + */ + protected $worker; + + /** + * Set the Resque_Worker instance + * + * @param Resque_Worker $worker + */ + public function setWorker(Resque_Worker $worker) + { + $this->worker = $worker; + } + + /** + * Run the job in the worker process + * + * @param Resque_Job $job + */ + public function perform(Resque_Job $job) + { + $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); + $this->worker->updateProcLine($status); + $this->worker->log($status, Resque_Worker::LOG_VERBOSE); + $this->worker->perform($job); + } + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently working + */ + public function shutdown() + { + $this->worker->log('No child to kill.', Resque_Worker::LOG_VERBOSE); + } +} diff --git a/lib/Resque/JobStrategy/Interface.php b/lib/Resque/JobStrategy/Interface.php new file mode 100644 index 00000000..aa547363 --- /dev/null +++ b/lib/Resque/JobStrategy/Interface.php @@ -0,0 +1,31 @@ + + * @author Erik Bernharsdon + * @license http://www.opensource.org/licenses/mit-license.php + */ +interface Resque_JobStrategy_Interface +{ + /** + * Set the Resque_Worker instance + * + * @param Resque_Worker $worker + */ + function setWorker(Resque_Worker $worker); + + /** + * Seperates the job execution context from the worker and calls $worker->perform($job). + * + * @param Resque_Job $job + */ + function perform(Resque_Job $job); + + /** + * Force an immediate shutdown of the worker, killing any child jobs + * currently working + */ + function shutdown(); +} diff --git a/lib/Resque/Worker.php b/lib/Resque/Worker.php index 5dc678df..9edbf2c9 100644 --- a/lib/Resque/Worker.php +++ b/lib/Resque/Worker.php @@ -48,11 +48,6 @@ class Resque_Worker */ private $currentJob = null; - /** - * @var int Process ID of child worker processes. - */ - private $child = null; - /** * Return all workers known to Resque as instantiated instances. * @return array @@ -137,6 +132,23 @@ public function __construct($queues) } $this->hostname = $hostname; $this->id = $this->hostname . ':'.getmypid() . ':' . implode(',', $this->queues); + + if (function_exists('pcntl_fork')) { + $this->setJobStrategy(new Resque_JobStrategy_Fork); + } else { + $this->setJobStrategy(new Resque_JobStrategy_InProcess); + } + } + + /** + * Set the JobStrategy used to seperate the job execution context from the worker + * + * @param Resque_JobStrategy_Interface + */ + public function setJobStrategy(Resque_JobStrategy_Interface $jobStrategy) + { + $this->jobStrategy = $jobStrategy; + $this->jobStrategy->setWorker($this); } /** @@ -184,36 +196,8 @@ public function work($interval = 5) Resque_Event::trigger('beforeFork', $job); $this->workingOn($job); - $this->child = Resque::fork(); + $this->jobStrategy->perform($job); - // Forked and we're the child. Run the job. - if ($this->child === 0 || $this->child === false) { - $status = 'Processing ' . $job->queue . ' since ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); - $this->perform($job); - if ($this->child === 0) { - exit(0); - } - } - - if($this->child > 0) { - // Parent process, sit and wait - $status = 'Forked ' . $this->child . ' at ' . strftime('%F %T'); - $this->updateProcLine($status); - $this->log($status, self::LOG_VERBOSE); - - // Wait until the child process finishes before continuing - pcntl_wait($status); - $exitStatus = pcntl_wexitstatus($status); - if($exitStatus !== 0) { - $job->fail(new Resque_Job_DirtyExitException( - 'Job exited with exit code ' . $exitStatus - )); - } - } - - $this->child = null; $this->doneWorking(); } @@ -304,7 +288,7 @@ private function startup() * * @param string $status The updated process title. */ - private function updateProcLine($status) + public function updateProcLine($status) { if(function_exists('setproctitle')) { setproctitle('resque-' . Resque::VERSION . ': ' . $status); @@ -391,21 +375,7 @@ public function shutdownNow() */ public function killChild() { - if(!$this->child) { - $this->log('No child to kill.', self::LOG_VERBOSE); - return; - } - - $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); - if(exec('ps -o pid,state -p ' . $this->child, $output, $returnCode) && $returnCode != 1) { - $this->log('Killing child at ' . $this->child, self::LOG_VERBOSE); - posix_kill($this->child, SIGKILL); - $this->child = null; - } - else { - $this->log('Child ' . $this->child . ' not found, restarting.', self::LOG_VERBOSE); - $this->shutdown(); - } + $this->jobStrategy->shutdown(); } /**