|
@@ -1,142 +1,144 @@
|
|
|
<?php
|
|
|
-// BinaryWebSocketClient.php
|
|
|
-
|
|
|
declare(strict_types=1);
|
|
|
|
|
|
-class BinaryWebSocketClient
|
|
|
+require 'vendor/autoload.php';
|
|
|
+
|
|
|
+class BinaryWebSocketClient
|
|
|
{
|
|
|
private \WebSocket\Client $socket;
|
|
|
private array $handlers = [];
|
|
|
private bool $isRunning = false;
|
|
|
+ private int $retryCount = 0;
|
|
|
|
|
|
public function __construct(
|
|
|
- string $url,
|
|
|
+ private string $url,
|
|
|
private array $options = [
|
|
|
'timeout' => 5,
|
|
|
- 'heartbeat_interval' => 30
|
|
|
+ 'heartbeat_interval' => 30,
|
|
|
+ 'max_retries' => 10
|
|
|
]
|
|
|
) {
|
|
|
- $this->socket = new \WebSocket\Client($url, [
|
|
|
- 'timeout' => $this->options['timeout']
|
|
|
- ]);
|
|
|
+ $this->connect();
|
|
|
+ }
|
|
|
+
|
|
|
+ private function connect(): void
|
|
|
+ {
|
|
|
+ try {
|
|
|
+ $this->socket = new \WebSocket\Client($this->url, [
|
|
|
+ 'timeout' => $this->options['timeout']
|
|
|
+ ]);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ throw new \RuntimeException("连接失败: " . $e->getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 注册消息处理器
|
|
|
- */
|
|
|
- public function on(int $msgId, callable $handler): void
|
|
|
+ public function on(int $msgId, callable $handler): void
|
|
|
{
|
|
|
$this->handlers[$msgId] = $handler;
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 发送二进制消息
|
|
|
- * @param int $msgId 使用MsgID常量
|
|
|
- * @param \Google\Protobuf\Internal\Message $data Protobuf消息对象
|
|
|
- */
|
|
|
- public function sendBinary(int $msgId, \Google\Protobuf\Internal\Message $data): void
|
|
|
+ public function sendBinary(int $msgId, string $data): void
|
|
|
{
|
|
|
- $binMsgId = pack('n', $msgId); // 2字节无符号短整型
|
|
|
- $binData = $binMsgId . $data->serializeToString();
|
|
|
- $this->socket->send($binData, \WebSocket\Client::BINARY);
|
|
|
+ try {
|
|
|
+ $binMsgId = pack('n', $msgId); // 2字节无符号短整型
|
|
|
+ $binData = $binMsgId . $data;
|
|
|
+ $this->socket->binary($binData);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ $this->handleDisconnect($e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 启动消息监听循环
|
|
|
- */
|
|
|
- public function startListening(): void
|
|
|
+ public function startListening(): void
|
|
|
{
|
|
|
if ($this->isRunning) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
$this->isRunning = true;
|
|
|
- $this->startHeartbeat();
|
|
|
+ $lastHeartbeat = time();
|
|
|
|
|
|
while ($this->isRunning) {
|
|
|
try {
|
|
|
+ // 检查是否需要发送心跳
|
|
|
+ if (time() - $lastHeartbeat >= $this->options['heartbeat_interval']) {
|
|
|
+ $this->sendHeartbeat();
|
|
|
+ $lastHeartbeat = time();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 接收消息
|
|
|
$message = $this->socket->receive();
|
|
|
- $this->dispatchBinaryMessage($message);
|
|
|
+ if ($message) {
|
|
|
+ $this->dispatchBinaryMessage($message);
|
|
|
+ }
|
|
|
+
|
|
|
+ usleep(10000); // 10ms 避免CPU过高使用
|
|
|
} catch (\WebSocket\ConnectionException $e) {
|
|
|
$this->handleDisconnect($e);
|
|
|
+ $lastHeartbeat = time(); // 重置心跳时间
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ error_log("错误: " . $e->getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * 解析二进制消息
|
|
|
- */
|
|
|
- private function dispatchBinaryMessage(string $binary): void
|
|
|
+ private function dispatchBinaryMessage(string $binary): void
|
|
|
{
|
|
|
if (strlen($binary) < 2) {
|
|
|
- throw new \RuntimeException("Invalid binary message length");
|
|
|
+ error_log("无效的消息长度");
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
- // 提取MsgID (前2字节)
|
|
|
- $msgId = unpack('n', substr($binary, 0, 2))[1];
|
|
|
- $protoData = substr($binary, 2);
|
|
|
+ try {
|
|
|
+ // 解析消息ID(前2字节)
|
|
|
+ $msgId = unpack('n', substr($binary, 0, 2))[1];
|
|
|
+ $data = substr($binary, 2);
|
|
|
|
|
|
- if (isset($this->handlers[$msgId])) {
|
|
|
- try {
|
|
|
- // 根据MsgID获取对应的Protobuf类
|
|
|
- $protoClass = $this->getProtoClassByMsgId($msgId);
|
|
|
- $message = new $protoClass();
|
|
|
- $message->mergeFromString($protoData);
|
|
|
- $this->handlers[$msgId]($message);
|
|
|
- } catch (\Exception $e) {
|
|
|
- error_log("Decode error for MsgID {$msgId}: " . $e->getMessage());
|
|
|
+ if (isset($this->handlers[$msgId])) {
|
|
|
+ call_user_func($this->handlers[$msgId], $data);
|
|
|
}
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ error_log("消息处理错误: " . $e->getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * MsgID到Protobuf类的映射
|
|
|
- */
|
|
|
- private function getProtoClassByMsgId(int $msgId): string
|
|
|
+ private function sendHeartbeat(): void
|
|
|
{
|
|
|
- static $map = [
|
|
|
- MsgID::REQLOGIN => \Protocol\ReqLogin::class,
|
|
|
- MsgID::RESLOGIN => \Protocol\ResLogin::class,
|
|
|
- // 其他MsgID映射...
|
|
|
- ];
|
|
|
-
|
|
|
- if (!isset($map[$msgId])) {
|
|
|
- throw new \RuntimeException("Unregistered MsgID: {$msgId}");
|
|
|
+ try {
|
|
|
+ // 这里假设心跳消息ID为1,根据实际协议调整
|
|
|
+ $this->sendBinary(1, '');
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ error_log("心跳发送失败: " . $e->getMessage());
|
|
|
}
|
|
|
-
|
|
|
- return $map[$msgId];
|
|
|
}
|
|
|
|
|
|
- private function startHeartbeat(): void
|
|
|
+ private function handleDisconnect(\Throwable $e): void
|
|
|
{
|
|
|
- if ($this->options['heartbeat_interval'] <= 0) {
|
|
|
- return;
|
|
|
- }
|
|
|
+ error_log("连接断开: " . $e->getMessage());
|
|
|
|
|
|
- \React\EventLoop\Loop::addPeriodicTimer(
|
|
|
- $this->options['heartbeat_interval'],
|
|
|
- fn() => $this->sendBinary(MsgID::REQHEARTBEAT, new \Protocol\Heartbeat())
|
|
|
- );
|
|
|
- }
|
|
|
+ if ($this->retryCount >= $this->options['max_retries']) {
|
|
|
+ $this->close();
|
|
|
+ throw new \RuntimeException("达到最大重试次数");
|
|
|
+ }
|
|
|
|
|
|
- private function handleDisconnect(\Throwable $e): void
|
|
|
- {
|
|
|
- // 实现带退避的重连
|
|
|
- static $retryCount = 0;
|
|
|
- $delay = min(pow(2, $retryCount), 30); // 指数退避,最大30秒
|
|
|
-
|
|
|
+ $delay = min(pow(2, $this->retryCount), 30);
|
|
|
+ error_log("{$delay}秒后重试连接...");
|
|
|
sleep($delay);
|
|
|
- $this->socket = new \WebSocket\Client(
|
|
|
- $this->socket->getUrl(),
|
|
|
- ['timeout' => $this->options['timeout']]
|
|
|
- );
|
|
|
-
|
|
|
- $retryCount++;
|
|
|
+
|
|
|
+ try {
|
|
|
+ $this->connect();
|
|
|
+ $this->retryCount++;
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ $this->handleDisconnect($e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- public function close(): void
|
|
|
+ public function close(): void
|
|
|
{
|
|
|
$this->isRunning = false;
|
|
|
- $this->socket->close();
|
|
|
+ try {
|
|
|
+ $this->socket->close();
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ error_log("关闭连接错误: " . $e->getMessage());
|
|
|
+ }
|
|
|
}
|
|
|
}
|