QQ扫一扫联系
trino 服务端测试使用
docker run -it --name trino -p 8080:8080 trinodb/trino
laravel 的 php 测试代码
$YsPhpTrino = new \App\Libs\YsPhpTrino([ 'host'=>'http://127.0.0.1:8080', 'basic_auth_user'=>'user', 'basic_auth_password'=>'', ]); $result = $YsPhpTrino->query('select * from jmx.information_schema.tables limit 10'); dd($result);
trino 连接的 laravel 类代码
<?php namespace App\Libs; /** * 参考 * https://github.com/tagomoris/presto-client-node/tree/master */ class YsPhpTrino { public $options; const LOG_PRE = 'ysphptrino'; public $columns = []; public $rows = []; public $nextUriNum = 0; public $timeStart = 0; /** * 构造方法 */ public function __construct($options = []) { $this->setOptions($options); } /** * 设置 配置 */ public function setOptions($options) { $this->options = $options; return $this; } /** * 查询 */ public function query($sql, $catalog = '', $schema = '', $source = '') { $this->columns = []; $this->rows = []; $this->nextUriNum = 0; $this->timeStart = time(); $this->log('', "ysphptrino_query:开始: query:".$sql); $header = []; if (!empty($catalog)) { $header['X-Presto-Catalog'] = $catalog; } if (!empty($schema)) { $header['X-Presto-Schema'] = $schema; } // 认证 if (!empty($this->options['basic_auth_user']) || !empty($this->options['basic_auth_password'])) { $str = base64_encode(trim(@$this->options['basic_auth_user']).':'.trim(@$this->options['basic_auth_password'])); $header['Authorization'] = 'Basic '.$str; } $result = $this->request('POST', $this->getHost().'/v1/statement', $header, $sql); if ($result['code'] !== 0) return $result; $data = json_decode($result['data'], true); $state = trim(@$data['stats']['state']); if (in_array($state, ['QUEUED', 'PLANNING', 'STARTING']) && empty($data['data'])) { $result = $this->nextUri(@$data['nextUri']); $this->log('', "ysphptrino_query:开始结束用时: " . (time()-$this->timeStart) . " 秒.query:".$sql); return $result; } return $this->back(1, '未知的state:'.$state); } /** * nextUri */ public function nextUri($url) { $this->nextUriNum++; if ($this->nextUriNum > 100) return $this->back(1, 'nextUri请求次数过百'); if (empty($url)) return $this->back(1, 'nextUri为空'); $result = $this->request('GET', $url, [], ''); if ($result['code'] !== 0) return $result; $data = json_decode($result['data'], true); $state = trim(@$data['stats']['state']); if (in_array($state, ['FAILED'])) { $msg = $data['stats']['state'].':'.trim(@$data['error']['message']); return $this->back(1, $msg); } if (in_array($state, ['QUEUED', 'PLANNING', 'STARTING', 'RUNNING', 'FINISHING']) && empty($data['data'])) { return $this->nextUri(@$data['nextUri']); } if (in_array($state, ['RUNNING'])) { // 这里的 data 必不为空 $this->columns = @$data['columns']; $this->rows = array_merge($this->rows, $data['data']); return $this->nextUri(@$data['nextUri']); } if (in_array($state, ['FINISHED'])) { if (!empty($data['data'])) { $this->rows = array_merge($this->rows, $data['data']); } return $this->back(0, '', ['columns'=>$this->columns, 'rows'=>$this->rows]); } return $this->back(1, '未知的state:'.$state); } /** * 请求 */ public function request($method, $url, $header, $body) { $client = new \GuzzleHttp\Client(); $logPath = ''; $this->log($logPath, ['requestApi-start', $method, $url, $header, $body]); try { $response = $client->request($method, $url, [ 'body'=>$body, 'headers'=>$header, ]); } catch (ClientException $e) { $this->log($logPath, "服务器" . $e->getResponse()->getStatusCode() . "异常,请联系管理员"); return $this->back(2, $url." 服务器" . $e->getResponse()->getStatusCode() . "异常,请联系管理员"); } catch (ConnectException $e) { $this->log($logPath, "网络异常,请联系管理员"); return $this->back(2, $url ." 网络异常,请联系管理员"); } catch (\GuzzleHttp\Exception\RequestException $e) { $msg = $e->getMessage(); $msgOther = []; if ($e->getResponse()) { $msg = $e->getResponse()->getStatusCode(); $msgOther = [$e->getResponse()->getBody()->getContents()]; } if ($e->getResponse()) $msg = $e->getResponse()->getStatusCode(); $this->log($logPath, "服务器" . $msg . "异常,请联系管理员", $msgOther); return $this->back(2, $url." 服务器" . $msg . "异常,请联系管理员"); } catch (\GuzzleHttp\Exception\ConnectException $e) { $code = '异常代码:'.$e->getCode(); $msg = '异常消息:'.$e->getMessage(); $this->log($logPath, $code.' '.$msg); return $this->back(2, $url ." 网络异常,请联系管理员"); } $data = $response->getBody()->getContents(); $this->log($logPath, $data, [$response->getStatusCode()]); if (trim($response->getStatusCode()) !== '200') { return $this->back(2, $url." 服务器" . $response->getStatusCode() . "异常,请联系管理员"); } return $this->back(0, '', $data); } /** * log */ public function log($logPath, $msg, $context = []) { if (gettype($msg) !== 'string') { $msg = json_encode($msg); } $logN = \Log::build([ 'driver' => 'daily', 'path' => storage_path('/logs/' .self::LOG_PRE. $logPath.'/ysphptrino.log'), ]); $logN->info($msg, $context); } /** * 获取host * 不以 / 结尾 */ public function getHost() { $host = trim(@$this->options['host']); if (substr($host, -1) === '/') $host = substr($host, 0, -1); return $host; } /** * 返回的数据 */ public function back($code, $msg = '', $data = []) { return ['code'=>$code, 'msg'=>$msg, 'message'=>$msg, 'data'=>$data]; } }