123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- <?php
- namespace App;
- require_once __DIR__ . '/../vendor/autoload.php';
- class BinaryWebSocketClient
- {
- private \WebSocket\Client $socket;
- private array $handlers = [];
- private bool $isRunning = false;
- private int $retryCount = 0;
- public function __construct(
- private string $url,
- private array $options = [
- 'timeout' => 5,
- 'heartbeat_interval' => 30,
- 'max_retries' => 10
- ]
- ) {
- $this->connect();
- }
- private function connect(): void
- {
- try {
- $this->socket = new \WebSocket\Client($this->url, [
- 'timeout' => $this->options['timeout']
- ]);
- } catch (\Exception $e) {
- throw new \RuntimeException("连接失败: " . $e->getMessage());
- }
- }
- public function on(int $msgId, callable $handler): void
- {
- $this->handlers[$msgId] = $handler;
- }
- public function sendBinary(int $msgId, string $data): void
- {
- try {
- $binMsgId = pack('n', $msgId); // 2字节无符号短整型
- $binData = $binMsgId . $data;
- $this->socket->binary($binData);
- } catch (\Exception $e) {
- $this->handleDisconnect($e);
- }
- }
- public function startListening(): void
- {
- if ($this->isRunning) {
- return;
- }
- $this->isRunning = true;
- $lastHeartbeat = time();
- while ($this->isRunning) {
- try {
- // 检查是否需要发送心跳
- if (time() - $lastHeartbeat >= $this->options['heartbeat_interval']) {
- $this->sendHeartbeat();
- $lastHeartbeat = time();
- }
- // 接收消息
- $message = $this->socket->receive();
- if ($message) {
- $this->dispatchBinaryMessage($message);
- }
-
- usleep(10000); // 10ms 避免CPU过高使用
- } catch (\WebSocket\ConnectionException $e) {
- $this->handleDisconnect($e);
- $lastHeartbeat = time(); // 重置心跳时间
- } catch (\Exception $e) {
- error_log("错误: " . $e->getMessage());
- }
- }
- }
- private function dispatchBinaryMessage(string $binary): void
- {
- if (strlen($binary) < 2) {
- error_log("无效的消息长度");
- return;
- }
- try {
- // 解析消息ID(前2字节)
- $msgId = unpack('n', substr($binary, 0, 2))[1];
- $data = substr($binary, 2);
- if (isset($this->handlers[$msgId])) {
- call_user_func($this->handlers[$msgId], $data);
- }
- } catch (\Exception $e) {
- error_log("消息处理错误: " . $e->getMessage());
- }
- }
- private function sendHeartbeat(): void
- {
- try {
- // 这里假设心跳消息ID为1,根据实际协议调整
- $this->sendBinary(1, '');
- } catch (\Exception $e) {
- error_log("心跳发送失败: " . $e->getMessage());
- }
- }
- private function handleDisconnect(\Throwable $e): void
- {
- error_log("连接断开: " . $e->getMessage());
- if ($this->retryCount >= $this->options['max_retries']) {
- $this->close();
- throw new \RuntimeException("达到最大重试次数");
- }
- $delay = min(pow(2, $this->retryCount), 30);
- error_log("{$delay}秒后重试连接...");
- sleep($delay);
- try {
- $this->connect();
- $this->retryCount++;
- } catch (\Exception $e) {
- $this->handleDisconnect($e);
- }
- }
- public function close(): void
- {
- $this->isRunning = false;
- try {
- $this->socket->close();
- } catch (\Exception $e) {
- error_log("关闭连接错误: " . $e->getMessage());
- }
- }
- }
|