讨论 Laravel社区 laravel 连接 trino

laravel 连接 trino

小晨晨 发表于    阅读:386    回复:0

trino 服务端测试使用

docker run -it --name trino -p 8080:8080 trinodb/trino

laravel 的 php 测试代码

$YsPhpTrino = new \App\Libs\YsPhpTrino([
    'host'=>'http://127.0.0.1:8080',
    'basic_auth_user'=>'user',
    'basic_auth_password'=>'',
]);
$result = $YsPhpTrino->query('select * from jmx.information_schema.tables limit 10');
dd($result);

trino 连接的 laravel 类代码

<?php
namespace App\Libs;

/**
 * 参考
 * https://github.com/tagomoris/presto-client-node/tree/master
 */
class YsPhpTrino
{

    public $options;

    const LOG_PRE = 'ysphptrino';

    public $columns = [];
    public $rows = [];
    public $nextUriNum = 0;
    public $timeStart = 0;


    /**
     *  构造方法
     */
    public function __construct($options = []) {
        $this->setOptions($options);
    }

    /**
     * 设置 配置
     */
    public function setOptions($options) {
        $this->options = $options;
        return $this;
    }

    /**
     * 查询
     */
    public function query($sql, $catalog = '', $schema = '', $source = '') {
        $this->columns = [];
        $this->rows = [];
        $this->nextUriNum = 0;
        $this->timeStart = time();
        $this->log('', "ysphptrino_query:开始: query:".$sql);
        $header = [];
        if (!empty($catalog)) {
            $header['X-Presto-Catalog'] = $catalog;
        }
        if (!empty($schema)) {
            $header['X-Presto-Schema'] = $schema;
        }
        // 认证
        if (!empty($this->options['basic_auth_user']) || !empty($this->options['basic_auth_password'])) {
            $str = base64_encode(trim(@$this->options['basic_auth_user']).':'.trim(@$this->options['basic_auth_password']));
            $header['Authorization'] = 'Basic '.$str;
        }
        $result = $this->request('POST', $this->getHost().'/v1/statement', $header, $sql);
        if ($result['code'] !== 0) return $result;
        $data = json_decode($result['data'], true);
        $state = trim(@$data['stats']['state']);
        if (in_array($state, ['QUEUED', 'PLANNING', 'STARTING']) && empty($data['data'])) {
            $result = $this->nextUri(@$data['nextUri']);
            $this->log('', "ysphptrino_query:开始结束用时: " . (time()-$this->timeStart) . " 秒.query:".$sql);
            return $result;
        }
        return $this->back(1, '未知的state:'.$state);
    }

    /**
     * nextUri
     */
    public function nextUri($url) {
        $this->nextUriNum++;
        if ($this->nextUriNum > 100) return $this->back(1, 'nextUri请求次数过百');
        if (empty($url)) return $this->back(1, 'nextUri为空');
        $result = $this->request('GET', $url, [], '');
        if ($result['code'] !== 0) return $result;
        $data = json_decode($result['data'], true);
        $state = trim(@$data['stats']['state']);
        if (in_array($state, ['FAILED'])) {
            $msg = $data['stats']['state'].':'.trim(@$data['error']['message']);
            return $this->back(1, $msg);
        }
        if (in_array($state, ['QUEUED', 'PLANNING', 'STARTING', 'RUNNING', 'FINISHING']) && empty($data['data'])) {
            return $this->nextUri(@$data['nextUri']);
        }
        if (in_array($state, ['RUNNING'])) {
            // 这里的 data 必不为空
            $this->columns = @$data['columns'];
            $this->rows = array_merge($this->rows, $data['data']);
            return $this->nextUri(@$data['nextUri']);
        }
        if (in_array($state, ['FINISHED'])) {
            if (!empty($data['data'])) {
                $this->rows = array_merge($this->rows, $data['data']);
            }
            return $this->back(0, '', ['columns'=>$this->columns, 'rows'=>$this->rows]);
        }
        return $this->back(1, '未知的state:'.$state);
    }

    /**
     * 请求
     */
    public function request($method, $url, $header, $body) {
        $client = new \GuzzleHttp\Client();
        $logPath = '';
        $this->log($logPath, ['requestApi-start', $method, $url, $header, $body]);
        try {
            $response = $client->request($method, $url, [
                'body'=>$body,
                'headers'=>$header,
            ]);
        } catch (ClientException $e) {
            $this->log($logPath, "服务器" . $e->getResponse()->getStatusCode() . "异常,请联系管理员");
            return $this->back(2, $url." 服务器" . $e->getResponse()->getStatusCode() . "异常,请联系管理员");
        } catch (ConnectException $e) {
            $this->log($logPath, "网络异常,请联系管理员");
            return $this->back(2, $url ." 网络异常,请联系管理员");

        } catch (\GuzzleHttp\Exception\RequestException $e) {
            $msg = $e->getMessage();
            $msgOther = [];
            if ($e->getResponse()) {
                $msg = $e->getResponse()->getStatusCode();
                $msgOther = [$e->getResponse()->getBody()->getContents()];
            }
            if ($e->getResponse()) $msg = $e->getResponse()->getStatusCode();
            $this->log($logPath, "服务器" . $msg . "异常,请联系管理员", $msgOther);
            return $this->back(2, $url." 服务器" . $msg . "异常,请联系管理员");
        } catch (\GuzzleHttp\Exception\ConnectException $e) {
            $code = '异常代码:'.$e->getCode();
            $msg = '异常消息:'.$e->getMessage();
            $this->log($logPath, $code.' '.$msg);
            return $this->back(2, $url ." 网络异常,请联系管理员");
        }
        $data = $response->getBody()->getContents();
        $this->log($logPath, $data, [$response->getStatusCode()]);
        if (trim($response->getStatusCode()) !== '200') {
            return $this->back(2, $url." 服务器" . $response->getStatusCode() . "异常,请联系管理员");
        }
        return $this->back(0, '', $data);
    }

    /**
     * log
     */
    public function log($logPath, $msg, $context = []) {
        if (gettype($msg) !== 'string') {
            $msg = json_encode($msg);
        }
        $logN = \Log::build([
                'driver' => 'daily',
                'path' => storage_path('/logs/' .self::LOG_PRE. $logPath.'/ysphptrino.log'),
            ]);
        $logN->info($msg, $context);
    }

    /**
     * 获取host
     * 不以 / 结尾
     */
    public function getHost() {
        $host = trim(@$this->options['host']);
        if (substr($host, -1) === '/') $host = substr($host, 0, -1);
        return $host;
    }

    /**
     * 返回的数据
     */
    public function back($code, $msg = '', $data = []) {
        return ['code'=>$code, 'msg'=>$msg, 'message'=>$msg, 'data'=>$data];
    }
}


我来评论
QQ
微信
客服