5, 'heartbeat_interval' => 30, 'max_retries' => 10 ] ) { $this->connect(); } private function connect(): void { try { $this->socket = new \WebSocket\Client($this->url, [ 'timeout' => $this->options['timeout'] ]); } catch (\Exception $e) { throw new \RuntimeException("连接失败: " . $e->getMessage()); } } public function on(int $msgId, callable $handler): void { $this->handlers[$msgId] = $handler; } public function sendBinary(int $msgId, string $data): void { try { $binMsgId = pack('n', $msgId); // 2字节无符号短整型 $binData = $binMsgId . $data; $this->socket->binary($binData); } catch (\Exception $e) { $this->handleDisconnect($e); } } public function startListening(): void { if ($this->isRunning) { return; } $this->isRunning = true; $lastHeartbeat = time(); while ($this->isRunning) { try { // 检查是否需要发送心跳 if (time() - $lastHeartbeat >= $this->options['heartbeat_interval']) { $this->sendHeartbeat(); $lastHeartbeat = time(); } // 接收消息 $message = $this->socket->receive(); if ($message) { $this->dispatchBinaryMessage($message); } usleep(10000); // 10ms 避免CPU过高使用 } catch (\WebSocket\ConnectionException $e) { $this->handleDisconnect($e); $lastHeartbeat = time(); // 重置心跳时间 } catch (\Exception $e) { error_log("错误: " . $e->getMessage()); } } } private function dispatchBinaryMessage(string $binary): void { if (strlen($binary) < 2) { error_log("无效的消息长度"); return; } try { // 解析消息ID(前2字节) $msgId = unpack('n', substr($binary, 0, 2))[1]; $data = substr($binary, 2); if (isset($this->handlers[$msgId])) { call_user_func($this->handlers[$msgId], $data); } } catch (\Exception $e) { error_log("消息处理错误: " . $e->getMessage()); } } private function sendHeartbeat(): void { try { // 这里假设心跳消息ID为1,根据实际协议调整 $this->sendBinary(1, ''); } catch (\Exception $e) { error_log("心跳发送失败: " . $e->getMessage()); } } private function handleDisconnect(\Throwable $e): void { error_log("连接断开: " . $e->getMessage()); if ($this->retryCount >= $this->options['max_retries']) { $this->close(); throw new \RuntimeException("达到最大重试次数"); } $delay = min(pow(2, $this->retryCount), 30); error_log("{$delay}秒后重试连接..."); sleep($delay); try { $this->connect(); $this->retryCount++; } catch (\Exception $e) { $this->handleDisconnect($e); } } public function close(): void { $this->isRunning = false; try { $this->socket->close(); } catch (\Exception $e) { error_log("关闭连接错误: " . $e->getMessage()); } } }