Server : nginx/1.20.2
System : Linux VM-4-4-centos 3.10.0-1160.66.1.el7.x86_64 #1 SMP Wed May 18 16:02:34 UTC 2022 x86_64
User : www ( 1000)
PHP Version : 5.6.40
Disable Function : passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
Directory :  /www/wwwroot/greatapp.cn/vendor/topthink/think-queue/src/queue/connector/
Upload File :
Current Directory [ Writeable ] Root Directory [ Writeable ]


Current File : /www/wwwroot/greatapp.cn/vendor/topthink/think-queue/src/queue/connector/Topthink.php
<?php
// +----------------------------------------------------------------------
// | ThinkPHP [ WE CAN DO IT JUST THINK IT ]
// +----------------------------------------------------------------------
// | Copyright (c) 2006-2015 http://thinkphp.cn All rights reserved.
// +----------------------------------------------------------------------
// | Licensed ( http://www.apache.org/licenses/LICENSE-2.0 )
// +----------------------------------------------------------------------
// | Author: yunwuxin <448901948@qq.com>
// +----------------------------------------------------------------------

namespace think\queue\connector;

use think\exception\HttpException;
use think\queue\Connector;
use think\Request;
use think\queue\job\Topthink as TopthinkJob;
use think\Response;

class Topthink extends Connector
{
    protected $options = [
        'token'       => '',
        'project_id'  => '',
        'protocol'    => 'https',
        'host'        => 'qns.topthink.com',
        'port'        => 443,
        'api_version' => 1,
        'max_retries' => 3,
        'default'     => 'default'
    ];

    /** @var  Request */
    protected $request;

    protected $url;

    protected $curl = null;

    protected $last_status;

    protected $headers = [];

    public function __construct($options)
    {
        if (!empty($options)) {
            $this->options = array_merge($this->options, $options);
        }

        $this->url = "{$this->options['protocol']}://{$this->options['host']}:{$this->options['port']}/v{$this->options['api_version']}/";

        $this->headers['Authorization'] = "Bearer {$this->options['token']}";

        $this->request = Request::instance();
    }

    public function push($job, $data = '', $queue = null)
    {
        return $this->pushRaw(0, $queue, $this->createPayload($job, $data));
    }

    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->pushRaw($delay, $queue, $this->createPayload($job, $data));
    }

    public function release($queue, $job, $delay)
    {
        return $this->pushRaw($delay, $queue, $job->payload, $job->attempts);
    }

    public function marshal()
    {
        $job = new TopthinkJob($this, $this->marshalPushedJob(), $this->request->header('topthink-message-queue'));
        if ($this->request->header('topthink-message-status') == 'success') {
            $job->fire();
        } else {
            $job->failed();
        }
        return new Response('OK');
    }

    public function pushRaw($delay, $queue, $payload, $attempts = 0)
    {
        $queue_name = $this->getQueue($queue);
        $queue      = rawurlencode($queue_name);
        $url        = "project/{$this->options['project_id']}/queue/{$queue}/message";
        $message    = [
            'payload'  => $payload,
            'attempts' => $attempts,
            'delay'    => $delay
        ];

        return $this->apiCall('POST', $url, $message)->id;
    }

    public function deleteMessage($queue, $id)
    {
        $queue = rawurlencode($queue);
        $url   = "project/{$this->options['project_id']}/queue/{$queue}/message/{$id}";
        return $this->apiCall('DELETE', $url);
    }

    protected function apiCall($type, $url, $params = [])
    {
        $url = "{$this->url}$url";

        if ($this->curl == null) {
            $this->curl = curl_init();
        }

        switch ($type = strtoupper($type)) {
            case 'DELETE':
                curl_setopt($this->curl, CURLOPT_URL, $url);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
                break;
            case 'PUT':
                curl_setopt($this->curl, CURLOPT_URL, $url);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, json_encode($params));
                break;
            case 'POST':
                curl_setopt($this->curl, CURLOPT_URL, $url);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_POST, true);
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, $params);
                break;
            case 'GET':
                curl_setopt($this->curl, CURLOPT_POSTFIELDS, null);
                curl_setopt($this->curl, CURLOPT_CUSTOMREQUEST, $type);
                curl_setopt($this->curl, CURLOPT_HTTPGET, true);
                $url .= '?' . http_build_query($params);
                curl_setopt($this->curl, CURLOPT_URL, $url);
                break;
        }

        curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false);
        curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true);

        $headers = [];
        foreach ($this->headers as $k => $v) {
            if ($k == 'Connection') {
                $v = 'Close';
            }
            $headers[] = "$k: $v";
        }

        curl_setopt($this->curl, CURLOPT_HTTPHEADER, $headers);
        curl_setopt($this->curl, CURLOPT_CONNECTTIMEOUT, 10);

        return $this->callWithRetries();
    }

    protected function callWithRetries()
    {
        for ($retry = 0; $retry < $this->options['max_retries']; $retry++) {
            $out = curl_exec($this->curl);
            if ($out === false) {
                $this->reportHttpError(0, curl_error($this->curl));
            }
            $this->last_status = curl_getinfo($this->curl, CURLINFO_HTTP_CODE);

            if ($this->last_status >= 200 && $this->last_status < 300) {
                return self::jsonDecode($out);
            } elseif ($this->last_status >= 500) {
                self::waitRandomInterval($retry);
            } else {
                $this->reportHttpError($this->last_status, $out);
            }
        }
        $this->reportHttpError($this->last_status, "Service unavailable");
        return;
    }

    protected static function jsonDecode($response)
    {
        $data = json_decode($response);

        $json_error = json_last_error();
        if ($json_error != JSON_ERROR_NONE) {
            throw new \RuntimeException($json_error);
        }

        return $data;
    }

    protected static function waitRandomInterval($retry)
    {
        $max_delay = pow(4, $retry) * 100 * 1000;
        usleep(rand(0, $max_delay));
    }

    protected function reportHttpError($status, $text)
    {
        throw new HttpException($status, "http error: {$status} | {$text}");
    }

    /**
     * Marshal out the pushed job and payload.
     *
     * @return object
     */
    protected function marshalPushedJob()
    {
        return (object) [
            'id'       => $this->request->header('topthink-message-id'),
            'payload'  => $this->request->getContent(),
            'attempts' => $this->request->header('topthink-message-attempts')
        ];
    }

    public function __destruct()
    {
        if ($this->curl != null) {
            curl_close($this->curl);
            $this->curl = null;
        }
    }

    public function pop($queue = null)
    {
        throw new \RuntimeException('pop queues not support for this type');
    }
}