Server : nginx/1.20.2 System : Linux VM-4-4-centos 3.10.0-1160.66.1.el7.x86_64 #1 SMP Wed May 18 16:02:34 UTC 2022 x86_64 User : www ( 1000) PHP Version : 5.6.40 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv Directory : /www/wwwroot/greatapp.cn/vendor/topthink/think-queue/src/queue/command/ |
<?php // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- // | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- namespace think\queue\command; use think\Config; use think\console\Command; use think\console\Input; use think\console\input\Option; use think\console\Output; use think\Hook; use think\queue\Job; use think\queue\Worker; use Exception; use Throwable; use think\Cache; use think\exception\Handle; use think\exception\ThrowableError; class Work extends Command { /** * The queue worker instance. * @var \think\queue\Worker */ protected $worker; protected function initialize(Input $input, Output $output) { $this->worker = new Worker(); } protected function configure() { $this->setName('queue:work') ->addOption('queue', null, Option::VALUE_OPTIONAL, 'The queue to listen on') ->addOption('daemon', null, Option::VALUE_NONE, 'Run the worker in daemon mode') ->addOption('delay', null, Option::VALUE_OPTIONAL, 'Amount of time to delay failed jobs', 0) ->addOption('force', null, Option::VALUE_NONE, 'Force the worker to run even in maintenance mode') ->addOption('memory', null, Option::VALUE_OPTIONAL, 'The memory limit in megabytes', 128) ->addOption('sleep', null, Option::VALUE_OPTIONAL, 'Number of seconds to sleep when no job is available', 3) ->addOption('tries', null, Option::VALUE_OPTIONAL, 'Number of times to attempt a job before logging it failed', 0) ->setDescription('Process the next job on a queue'); } /** * Execute the console command. * @param Input $input * @param Output $output * @return int|null|void */ public function execute(Input $input, Output $output) { $queue = $input->getOption('queue'); $delay = $input->getOption('delay'); $memory = $input->getOption('memory'); if ($input->getOption('daemon')) { Hook::listen('worker_daemon_start', $queue); $this->daemon( $queue, $delay, $memory, $input->getOption('sleep'), $input->getOption('tries') ); } else { $response = $this->worker->pop($queue, $delay, $input->getOption('sleep'), $input->getOption('tries')); $this->output($response); } } protected function output($response) { if (!is_null($response['job'])) { /** @var Job $job */ $job = $response['job']; if ($response['failed']) { $this->output->writeln('<error>Failed:</error> ' . $job->getName()); } else { $this->output->writeln('<info>Processed:</info> ' . $job->getName()); } } } /** * 启动一个守护进程执行任务. * * @param string $queue * @param int $delay * @param int $memory * @param int $sleep * @param int $maxTries * @return array */ protected function daemon($queue = null, $delay = 0, $memory = 128, $sleep = 3, $maxTries = 0) { $lastRestart = $this->getTimestampOfLastQueueRestart(); while (true) { $this->runNextJobForDaemon( $queue, $delay, $sleep, $maxTries ); if ( $this->memoryExceeded($memory) ) { Hook::listen('worker_memory_exceeded', $queue); $this->stop(); } if ( $this->queueShouldRestart($lastRestart) ) { Hook::listen('worker_queue_restart', $queue); $this->stop(); } } } /** * 以守护进程的方式执行下个任务. * * @param string $queue * @param int $delay * @param int $sleep * @param int $maxTries * @return void */ protected function runNextJobForDaemon($queue, $delay, $sleep, $maxTries) { try { $response = $this->worker->pop($queue, $delay, $sleep, $maxTries); $this->output($response); } catch (Exception $e) { $this->getExceptionHandler()->report($e); } catch (Throwable $e) { $this->getExceptionHandler()->report(new ThrowableError($e)); } } /** * 获取上次重启守护进程的时间 * * @return int|null */ protected function getTimestampOfLastQueueRestart() { return Cache::get('think:queue:restart'); } /** * 检查是否要重启守护进程 * * @param int|null $lastRestart * @return bool */ protected function queueShouldRestart($lastRestart) { return $this->getTimestampOfLastQueueRestart() != $lastRestart; } /** * 检查内存是否超出 * @param int $memoryLimit * @return bool */ protected function memoryExceeded($memoryLimit) { return (memory_get_usage() / 1024 / 1024) >= $memoryLimit; } /** * 获取异常处理实例 * * @return \think\exception\Handle */ protected function getExceptionHandler() { static $handle; if (!$handle) { if ($class = Config::get('exception_handle')) { if (class_exists($class) && is_subclass_of($class, "\\think\\exception\\Handle")) { $handle = new $class; } } if (!$handle) { $handle = new Handle(); } } return $handle; } /** * 停止执行任务的守护进程. * @return void */ public function stop() { die; } }