From 8c272ac560e1bce577e3462d6c0f7d86e76bbaf1 Mon Sep 17 00:00:00 2001 From: Ilya Shapkin Date: Fri, 14 Apr 2017 15:15:25 +0300 Subject: [PATCH] Fix segmentation fault on PHP 7+ for version 2.x --- Service/GearmanExecute.php | 291 ++++++++++++++++++++++++++++++++++--- 1 file changed, 267 insertions(+), 24 deletions(-) diff --git a/Service/GearmanExecute.php b/Service/GearmanExecute.php index 0c10ec9..9029f26 100644 --- a/Service/GearmanExecute.php +++ b/Service/GearmanExecute.php @@ -3,21 +3,34 @@ /** * Gearman Bundle for Symfony2 * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Feel free to edit as you please, and have fun. + * * @author Marc Morera - * @since 2013 */ namespace Mmoreram\GearmanBundle\Service; -use Symfony\Component\DependencyInjection\ContainerInterface; +use Symfony\Component\Console\Output\NullOutput; +use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\DependencyInjection\ContainerAwareInterface; +use Symfony\Component\DependencyInjection\ContainerInterface; +use Symfony\Component\EventDispatcher\EventDispatcherInterface; +use Mmoreram\GearmanBundle\Command\Util\GearmanOutputAwareInterface; +use Mmoreram\GearmanBundle\Event\GearmanWorkExecutedEvent; +use Mmoreram\GearmanBundle\Event\GearmanWorkStartingEvent; +use Mmoreram\GearmanBundle\GearmanEvents; use Mmoreram\GearmanBundle\Service\Abstracts\AbstractGearmanService; +use Mmoreram\GearmanBundle\Exceptions\ServerConnectionException; +use Symfony\Component\OptionsResolver\OptionsResolver; /** * Gearman execute methods. All Worker methods * - * @author Marc Morera + * @since 2.3.1 */ class GearmanExecute extends AbstractGearmanService { @@ -28,6 +41,82 @@ class GearmanExecute extends AbstractGearmanService */ private $container; + /** + * @var EventDispatcherInterface + * + * EventDispatcher instance + */ + protected $eventDispatcher; + + /** + * @var OutputInterface + * + * Output instance + */ + protected $output; + + /** + * @var OptionsResolver + */ + protected $executeOptionsResolver; + + /** + * Boolean to track if a system signal has been received + * @var boolean + */ + protected $stopWorkSignalReceived; + + /** + * Bucket with worker objects configuration for PECL + * @var array + */ + protected $workersBucket = []; + + /** + * Construct method + * + * @param GearmanCacheWrapper $gearmanCacheWrapper GearmanCacheWrapper + * @param array $defaultSettings The default settings for the bundle + */ + public function __construct(GearmanCacheWrapper $gearmanCacheWrapper, array $defaultSettings) + { + parent::__construct($gearmanCacheWrapper, $defaultSettings); + + $this->executeOptionsResolver = new OptionsResolver(); + $this->executeOptionsResolver + ->setDefaults(array( + 'iterations' => null, + 'minimum_execution_time' => null, + 'timeout' => null, + )) + ->setAllowedTypes('iterations', array('null', 'scalar')) + ->setAllowedTypes('minimum_execution_time', array('null', 'scalar')) + ->setAllowedTypes('timeout', array('null', 'scalar')); + + + $this->stopWorkSignalReceived = false; + + /** + * If the pcntl_signal exists, subscribe to the terminate and restart events for graceful worker stops. + */ + if(false !== function_exists('pcntl_signal')) + { + declare(ticks = 1); + pcntl_signal(SIGTERM, array($this,"handleSystemSignal")); + pcntl_signal(SIGHUP, array($this,"handleSystemSignal")); + + } + } + + /** + * Toggles that work should be stopped, we only subscribe to SIGTERM and SIGHUP + * @param int $signno Signal number + */ + public function handleSystemSignal($signo) + { + $this->stopWorkSignalReceived = true; + } + /** * Set container * @@ -37,22 +126,52 @@ class GearmanExecute extends AbstractGearmanService */ public function setContainer(ContainerInterface $container) { - $this->container = $container; + + return $this; + } + + /** + * Set event dispatcher + * + * @param EventDispatcherInterface $eventDispatcher + * + * @return GearmanExecute self Object + */ + public function setEventDispatcher(EventDispatcherInterface $eventDispatcher) + { + $this->eventDispatcher = $eventDispatcher; + + return $this; + } + + /** + * Set output + * + * @param OutputInterface $output + * + * @return GearmanExecute self Object + */ + public function setOutput(OutputInterface $output) + { + $this->output = $output; + + return $this; } /** * Executes a job given a jobName and given settings and annotations of job * * @param string $jobName Name of job to be executed + * @param array $options Array of options passed to the callback + * @param \GearmanWorker $gearmanWorker Worker instance to use */ - public function executeJob($jobName) + public function executeJob($jobName, array $options = array(), \GearmanWorker $gearmanWorker = null) { $worker = $this->getJob($jobName); if (false !== $worker) { - - $this->callJob($worker); + $this->callJob($worker, $options, $gearmanWorker); } } @@ -60,28 +179,68 @@ public function executeJob($jobName) * Given a worker, execute GearmanWorker function defined by job. * * @param array $worker Worker definition + * @param array $options Array of options passed to the callback + * @param \GearmanWorker $gearmanWorker Worker instance to use + * + * @throws ServerConnectionException if a connection to a server was not possible. * * @return GearmanExecute self Object */ - private function callJob(Array $worker) + private function callJob(Array $worker, array $options = array(), \GearmanWorker $gearmanWorker = null) { - $gearmanWorker = new \GearmanWorker; + if(is_null($gearmanWorker)) { + $gearmanWorker = new \GearmanWorker; + } if (isset($worker['job'])) { $jobs = array($worker['job']); $iterations = $worker['job']['iterations']; - $this->addServers($gearmanWorker, $worker['job']['servers']); + $minimumExecutionTime = $worker['job']['minimumExecutionTime']; + $timeout = $worker['job']['timeout']; + $successes = $this->addServers($gearmanWorker, $worker['job']['servers']); } else { $jobs = $worker['jobs']; $iterations = $worker['iterations']; - $this->addServers($gearmanWorker, $worker['servers']); + $minimumExecutionTime = $worker['minimumExecutionTime']; + $timeout = $worker['timeout']; + $successes = $this->addServers($gearmanWorker, $worker['servers']); + } + + $options = $this->executeOptionsResolver->resolve($options); + + $iterations = $options['iterations'] ?: $iterations; + $minimumExecutionTime = $options['minimum_execution_time'] ?: $minimumExecutionTime; + $timeout = $options['timeout'] ?: $timeout; + + if (count($successes) < 1) { + if ($minimumExecutionTime > 0) { + sleep($minimumExecutionTime); + } + throw new ServerConnectionException('Worker was unable to connect to any server.'); } $objInstance = $this->createJob($worker); - $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations); + + /** + * Start the timer before running the worker. + */ + $time = time(); + $this->runJob($gearmanWorker, $objInstance, $jobs, $iterations, $timeout); + + /** + * If there is a minimum expected duration, wait out the remaining period if there is any. + */ + if ($minimumExecutionTime > 0) { + $now = time(); + $remaining = $minimumExecutionTime - ($now - $time); + + if ($remaining > 0) { + sleep($remaining); + } + } return $this; } @@ -131,29 +290,58 @@ private function createJob(array $worker) * @param Object $objInstance Job instance * @param array $jobs Array of jobs to subscribe * @param integer $iterations Number of iterations + * @param integer $timeout Timeout * * @return GearmanExecute self Object */ - private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations) + private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs, $iterations, $timeout = null) { + /** + * Set the output of this instance, this should allow workers to use the console output. + */ + if ($objInstance instanceof GearmanOutputAwareInterface) { + $objInstance->setOutput($this->output ? : new NullOutput()); + } /** * Every job defined in worker is added into GearmanWorker */ foreach ($jobs as $job) { - $gearmanWorker->addFunction($job['realCallableName'], array($objInstance, $job['methodName'])); + /** + * worker needs to have it's context into separated memory space; + * if it's passed as a value, then garbage collector remove the target + * what causes a segfault + */ + $this->workersBucket[$job['realCallableName']] = [ + 'job_object_instance' => $objInstance, + 'job_method' => $job['methodName'], + 'jobs' => $jobs, + ]; + $gearmanWorker->addFunction( + $job['realCallableName'], + array($this, 'handleJob') + ); } /** * If iterations value is 0, is like worker will never die */ - $alive = (0 == $iterations); + $alive = (0 === $iterations); + + if ($timeout > 0) { + $gearmanWorker->setTimeout($timeout * 1000); + } /** * Executes GearmanWorker with all jobs defined */ - while ($gearmanWorker->work()) { + while (false === $this->stopWorkSignalReceived && $gearmanWorker->work()) { + + $iterations--; + + $event = new GearmanWorkExecutedEvent($jobs, $iterations, $gearmanWorker->returnCode()); + $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_EXECUTED, $event); if ($gearmanWorker->returnCode() != GEARMAN_SUCCESS) { @@ -164,12 +352,11 @@ private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs * Only finishes its execution if alive is false and iterations * arrives to 0 */ - if (!$alive && --$iterations <= 0) { + if (!$alive && $iterations <= 0) { break; } } - } /** @@ -178,18 +365,29 @@ private function runJob(\GearmanWorker $gearmanWorker, $objInstance, array $jobs * * @param \GearmanWorker $gmworker Worker to perform configuration * @param array $servers Servers array + * + * @throws ServerConnectionException if a connection to a server was not possible. + * + * @return array Successfully added servers */ - private function addServers(\GearmanWorker $gmworker, Array $servers) + private function addServers(\GearmanWorker $gmworker, array $servers) { + $successes = array(); + if (!empty($servers)) { foreach ($servers as $server) { - - $gmworker->addServer($server['host'], $server['port']); + if (@$gmworker->addServer($server['host'], $server['port'])) { + $successes[] = $server; + } } } else { - $gmworker->addServer(); + if (@$gmworker->addServer()) { + $successes[] = array('127.0.0.1', 4730); + } } + + return $successes; } /** @@ -198,13 +396,58 @@ private function addServers(\GearmanWorker $gmworker, Array $servers) * * @param string $workerName Name of worker to be executed */ - public function executeWorker($workerName) + public function executeWorker($workerName, array $options = array()) { $worker = $this->getWorker($workerName); if (false !== $worker) { - $this->callJob($worker); + $this->callJob($worker, $options); + } + } + + /** + * Wrapper function handler for all registered functions + * This allows us to do some nice logging when jobs are started/finished + * + * @see https://github.com/brianlmoon/GearmanManager/blob/ffc828dac2547aff76cb4962bb3fcc4f454ec8a2/GearmanPeclManager.php#L95-206 + * + * @param \GearmanJob $job + * @param mixed $context + * + * @return mixed + */ + public function handleJob(\GearmanJob $job) + { + if (!isset($this->workersBucket[$job->functionName()])) { + $context = false; + } else { + $context = $this->workersBucket[$job->functionName()]; + } + + if ( + !is_array($context) + || !array_key_exists('job_object_instance', $context) + || !array_key_exists('job_method', $context) + ) { + throw new \InvalidArgumentException('$context shall be an array with job_object_instance and job_method key.'); } + + $event = new GearmanWorkStartingEvent($context['jobs']); + $this->eventDispatcher->dispatch(GearmanEvents::GEARMAN_WORK_STARTING, $event); + + $result = call_user_func_array( + array($context['job_object_instance'], $context['job_method']), + array($job, $context) + ); + + /** + * Workaround for PECL bug #17114 + * http://pecl.php.net/bugs/bug.php?id=17114 + */ + $type = gettype($result); + settype($result, $type); + + return $result; } }