Server : nginx/1.20.2 System : Linux VM-4-4-centos 3.10.0-1160.66.1.el7.x86_64 #1 SMP Wed May 18 16:02:34 UTC 2022 x86_64 User : www ( 1000) PHP Version : 5.6.40 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv Directory : /www/wwwroot/greatapp.cn/vendor/topthink/think-queue/src/queue/connector/ |
<?php // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- // | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- namespace think\queue\connector; use think\Db; use think\queue\Connector; use think\queue\job\Database as DatabaseJob; class Database extends Connector { protected $db; protected $options = [ 'expire' => 60, 'default' => 'default', 'table' => 'jobs', 'dsn' => [] ]; public function __construct($options) { if (!empty($options)) { $this->options = array_merge($this->options, $options); } $this->db = Db::connect($this->options['dsn']); } public function push($job, $data = '', $queue = null) { return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data)); } public function later($delay, $job, $data = '', $queue = null) { return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data)); } public function pop($queue = null) { $queue = $this->getQueue($queue); if (!is_null($this->options['expire'])) { $this->releaseJobsThatHaveBeenReservedTooLong($queue); } if ($job = $this->getNextAvailableJob($queue)) { $this->markJobAsReserved($job->id); $this->db->commit(); return new DatabaseJob($this, $job, $queue); } $this->db->commit(); } /** * 重新发布任务 * @param string $queue * @param \StdClass $job * @param int $delay * @return mixed */ public function release($queue, $job, $delay) { return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts); } /** * Push a raw payload to the database with a given delay. * * @param \DateTime|int $delay * @param string|null $queue * @param string $payload * @param int $attempts * @return mixed */ protected function pushToDatabase($delay, $queue, $payload, $attempts = 0) { return $this->db->name($this->options['table'])->insert([ 'queue' => $this->getQueue($queue), 'payload' => $payload, 'attempts' => $attempts, 'reserved' => 0, 'reserved_at' => null, 'available_at' => time() + $delay, 'created_at' => time() ]); } /** * 获取下个有效任务 * * @param string|null $queue * @return \StdClass|null */ protected function getNextAvailableJob($queue) { $this->db->startTrans(); $job = $this->db->name($this->options['table']) ->lock(true) ->where('queue', $this->getQueue($queue)) ->where('reserved', 0) ->where('available_at', '<=', time()) ->order('id', 'asc') ->find(); return $job ? (object) $job : null; } /** * 标记任务正在执行. * * @param string $id * @return void */ protected function markJobAsReserved($id) { $this->db->name($this->options['table'])->where('id', $id)->update([ 'reserved' => 1, 'reserved_at' => time() ]); } /** * 重新发布超时的任务 * * @param string $queue * @return void */ protected function releaseJobsThatHaveBeenReservedTooLong($queue) { $expired = time() - $this->options['expire']; $this->db->name($this->options['table']) ->where('queue', $this->getQueue($queue)) ->where('reserved', 1) ->where('reserved_at', '<=', $expired) ->update([ 'reserved' => 0, 'reserved_at' => null, 'attempts' => ['exp', 'attempts + 1'] ]); } /** * 删除任务 * @param string $id * @return void */ public function deleteReserved($id) { $this->db->name($this->options['table'])->delete($id); } protected function getQueue($queue) { return $queue ?: $this->options['default']; } }