Server : nginx/1.20.2 System : Linux VM-4-4-centos 3.10.0-1160.66.1.el7.x86_64 #1 SMP Wed May 18 16:02:34 UTC 2022 x86_64 User : www ( 1000) PHP Version : 5.6.40 Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv Directory : /www/wwwroot/greatapp.cn/vendor/topthink/think-queue/src/queue/connector/ |
<?php // +---------------------------------------------------------------------- // | ThinkPHP [ WE CAN DO IT JUST THINK IT ] // +---------------------------------------------------------------------- // | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved. // +---------------------------------------------------------------------- // | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 ) // +---------------------------------------------------------------------- // | Author: yunwuxin <448901948@qq.com> // +---------------------------------------------------------------------- namespace think\queue\connector; use think\exception\HttpException; use think\queue\Connector; use think\Request; use think\queue\job\Topthink as TopthinkJob; use think\Response; class Topthink extends Connector { protected $options = [ 'token' => '', 'project_id' => '', 'protocol' => 'https', 'host' => 'qns.topthink.com', 'port' => 443, 'api_version' => 1, 'max_retries' => 3, 'default' => 'default' ]; /** @var Request */ protected $request; protected $url; protected $curl = null; protected $last_status; protected $headers = []; public function __construct($options) { if (!empty($options)) { $this->options = array_merge($this->options, $options); } $this->url = "{$this->options['protocol']}://{$this->options['host']}:{$this->options['port']}/v{$this->options['api_version']}/"; $this->headers['Authorization'] = "Bearer {$this->options['token']}"; $this->request = Request::instance(); } public function push($job, $data = '', $queue = null) { return $this->pushRaw(0, $queue, $this->createPayload($job, $data)); } public function later($delay, $job, $data = '', $queue = null) { return $this->pushRaw($delay, $queue, $this->createPayload($job, $data)); } public function release($queue, $job, $delay) { return $this->pushRaw($delay, $queue, $job->payload, $job->attempts); } public function marshal() { $job = new TopthinkJob($this, $this->marshalPushedJob(), $this->request->header('topthink-message-queue')); if ($this->request->header('topthink-message-status') == 'success') { $job->fire(); } else { $job->failed(); } return new Response('OK'); } public function pushRaw($delay, $queue, $payload, $attempts = 0) { $queue_name = $this->getQueue($queue); $queue = rawurlencode($queue_name); $url = "project/{$this->options['project_id']}/queue/{$queue}/message"; $message = [ 'payload' => $payload, 'attempts' => $attempts, 'delay' => $delay ]; return $this->apiCall('POST', $url, $message)->id; } public function deleteMessage($queue, $id) { $queue = rawurlencode($queue); $url = "project/{$this->options['project_id']}/queue/{$queue}/message/{$id}"; return $this->apiCall('DELETE', $url); } protected function apiCall($type, $url, $params = []) { $url = "{$this->url}$url"; if ($this->curl == null) { $this->curl = curl_init(); } switch ($type = strtoupper($type)) { case 'DELETE': curl_setopt($this->curl, CURLOPT_URL, $url); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params)); break; case 'PUT': curl_setopt($this->curl, CURLOPT_URL, $url); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params)); break; case 'POST': curl_setopt($this->curl, CURLOPT_URL, $url); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_POST, true); curl_setopt($this->curl, CURLOPT_POSTFIELDS, $params); break; case 'GET': curl_setopt($this->curl, CURLOPT_POSTFIELDS, null); curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type); curl_setopt($this->curl, CURLOPT_HTTPGET, true); $url .= '?' . http_build_query($params); curl_setopt($this->curl, CURLOPT_URL, $url); break; } curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false); curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true); $headers = []; foreach ($this->headers as $k => $v) { if ($k == 'Connection') { $v = 'Close'; } $headers[] = "$k: $v"; } curl_setopt($this->curl, CURLOPT_HTTPHEADER, $headers); curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, 10); return $this->callWithRetries(); } protected function callWithRetries() { for ($retry = 0; $retry < $this->options['max_retries']; $retry++) { $out = curl_exec($this->curl); if ($out === false) { $this->reportHttpError(0, curl_error($this->curl)); } $this->last_status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE); if ($this->last_status >= 200 && $this->last_status < 300) { return self::jsonDecode($out); } elseif ($this->last_status >= 500) { self::waitRandomInterval($retry); } else { $this->reportHttpError($this->last_status, $out); } } $this->reportHttpError($this->last_status, "Service unavailable"); return; } protected static function jsonDecode($response) { $data = json_decode($response); $json_error = json_last_error(); if ($json_error != JSON_ERROR_NONE) { throw new \RuntimeException($json_error); } return $data; } protected static function waitRandomInterval($retry) { $max_delay = pow(4, $retry) * 100 * 1000; usleep(rand(0, $max_delay)); } protected function reportHttpError($status, $text) { throw new HttpException($status, "http error: {$status} | {$text}"); } /** * Marshal out the pushed job and payload. * * @return object */ protected function marshalPushedJob() { return (object) [ 'id' => $this->request->header('topthink-message-id'), 'payload' => $this->request->getContent(), 'attempts' => $this->request->header('topthink-message-attempts') ]; } public function __destruct() { if ($this->curl != null) { curl_close($this->curl); $this->curl = null; } } public function pop($queue = null) { throw new \RuntimeException('pop queues not support for this type'); } }