WebSocketClient.php 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. <?php
  2. namespace App;
  3. require_once __DIR__ . '/../vendor/autoload.php';
  /**
   * WebSocket client for a simple binary protocol in which every frame is a
   * 2-byte big-endian unsigned message ID followed by an opaque payload.
   *
   * Handlers are registered per message ID with on(); startListening() runs a
   * blocking receive loop that dispatches incoming frames, sends periodic
   * heartbeats and re-creates the underlying client (with exponential backoff)
   * when the connection fails.
   */
  class BinaryWebSocketClient
  {
      /** Defaults applied to every option key the caller does not supply. */
      private const DEFAULT_OPTIONS = [
          'timeout' => 5,
          'heartbeat_interval' => 30,
          'max_retries' => 10,
      ];

      /** Largest message ID representable by the 2-byte 'n' pack format. */
      private const MAX_MSG_ID = 0xFFFF;

      /** Message ID used for heartbeats; assumed to be 1, adjust to the real protocol. */
      private const HEARTBEAT_MSG_ID = 1;

      /** Upper bound, in seconds, for the reconnect backoff delay. */
      private const MAX_RETRY_DELAY = 30;

      private \WebSocket\Client $socket;

      /** @var array<int, callable> message ID => handler receiving the payload string */
      private array $handlers = [];

      private bool $isRunning = false;

      /** Number of consecutive reconnect attempts since the last successful send/receive. */
      private int $retryCount = 0;

      /**
       * @param string $url     WebSocket endpoint passed to \WebSocket\Client.
       * @param array  $options Any subset of 'timeout', 'heartbeat_interval' (seconds)
       *                        and 'max_retries'; missing keys fall back to the defaults.
       *
       * @throws \RuntimeException when the client cannot be created.
       */
      public function __construct(
          private string $url,
          private array $options = self::DEFAULT_OPTIONS
      ) {
          // Array union keeps the caller's values and fills in only the missing
          // keys, so a partial options array no longer leaves keys undefined.
          $this->options = $options + self::DEFAULT_OPTIONS;
          $this->connect();
      }

      /**
       * Creates a fresh \WebSocket\Client for the configured URL and timeout.
       *
       * @throws \RuntimeException wrapping the library exception (kept as "previous").
       */
      private function connect(): void
      {
          try {
              $this->socket = new \WebSocket\Client($this->url, [
                  'timeout' => $this->options['timeout'],
              ]);
          } catch (\Exception $e) {
              throw new \RuntimeException("连接失败: " . $e->getMessage(), 0, $e);
          }
      }

      /**
       * Registers the handler for a message ID, replacing any previous one.
       *
       * @param int      $msgId   Message ID as decoded from the first two bytes of a frame.
       * @param callable $handler Called with the payload (frame minus the 2-byte ID).
       */
      public function on(int $msgId, callable $handler): void
      {
          $this->handlers[$msgId] = $handler;
      }

      /**
       * Sends one binary frame: 2-byte big-endian message ID followed by $data.
       *
       * If the send fails, a reconnect is attempted via handleDisconnect(); the
       * frame that failed is not re-sent.
       *
       * @throws \InvalidArgumentException when $msgId does not fit in 16 unsigned bits.
       * @throws \RuntimeException         when reconnecting exhausts max_retries.
       */
      public function sendBinary(int $msgId, string $data): void
      {
          // Validate outside the try block: pack('n') would silently truncate an
          // out-of-range ID, and a validation error must not be mistaken for a
          // connection failure.
          if ($msgId < 0 || $msgId > self::MAX_MSG_ID) {
              throw new \InvalidArgumentException(
                  'msgId must be between 0 and ' . self::MAX_MSG_ID . ", got {$msgId}"
              );
          }

          try {
              $this->socket->binary(pack('n', $msgId) . $data);
              // A successful send proves the connection works again.
              $this->retryCount = 0;
          } catch (\Exception $e) {
              $this->handleDisconnect($e);
          }
      }

      /**
       * Runs the blocking receive loop until close() is called or reconnecting
       * gives up. Calling it while it is already running is a no-op.
       *
       * @throws \RuntimeException when reconnecting exhausts max_retries.
       */
      public function startListening(): void
      {
          if ($this->isRunning) {
              return;
          }

          $this->isRunning = true;
          $lastHeartbeat = time();

          while ($this->isRunning) {
              try {
                  // Send a heartbeat once the configured interval has elapsed.
                  if (time() - $lastHeartbeat >= $this->options['heartbeat_interval']) {
                      $this->sendHeartbeat();
                      $lastHeartbeat = time();
                      // The heartbeat may have exhausted the retries and closed the client.
                      if (!$this->isRunning) {
                          break;
                      }
                  }

                  $message = $this->socket->receive();
                  // receive() returned without throwing, so the connection is healthy.
                  $this->retryCount = 0;

                  if (is_string($message) && $message !== '') {
                      $this->dispatchBinaryMessage($message);
                  }

                  usleep(10000); // 10 ms pause to keep CPU usage low
              } catch (\WebSocket\TimeoutException $e) {
                  // No data within the read timeout: an idle connection, not a
                  // disconnect. Loop again so the heartbeat check can run.
                  // NOTE(review): relies on textalk/websocket >= 1.4, where
                  // TimeoutException extends ConnectionException — confirm
                  // against the installed version.
              } catch (\WebSocket\ConnectionException $e) {
                  $this->handleDisconnect($e);
                  $lastHeartbeat = time(); // restart the heartbeat timer after reconnecting
              } catch (\Exception $e) {
                  error_log("错误: " . $e->getMessage());
              }
          }
      }

      /**
       * Decodes the 2-byte message ID and passes the remaining payload to the
       * registered handler, if any. Frames shorter than 2 bytes are logged and
       * dropped; exceptions thrown by a handler are logged, not propagated.
       */
      private function dispatchBinaryMessage(string $binary): void
      {
          if (strlen($binary) < 2) {
              error_log("无效的消息长度");
              return;
          }

          try {
              // The first two bytes are the big-endian message ID.
              $msgId = unpack('n', substr($binary, 0, 2))[1];
              $data = substr($binary, 2);

              if (isset($this->handlers[$msgId])) {
                  ($this->handlers[$msgId])($data);
              }
          } catch (\Exception $e) {
              error_log("消息处理错误: " . $e->getMessage());
          }
      }

      /**
       * Sends an empty-payload heartbeat frame; failures are logged only.
       */
      private function sendHeartbeat(): void
      {
          try {
              $this->sendBinary(self::HEARTBEAT_MSG_ID, '');
          } catch (\Exception $e) {
              error_log("心跳发送失败: " . $e->getMessage());
          }
      }

      /**
       * Logs the failure and tries to re-create the client, sleeping
       * min(2^attempt, 30) seconds before each attempt.
       *
       * The retry counter is incremented per attempt (not per success) and is
       * reset elsewhere once a send or receive succeeds, so max_retries bounds
       * consecutive failures rather than the lifetime number of disconnects.
       *
       * @throws \RuntimeException after max_retries consecutive failed attempts;
       *                           the client is closed first.
       */
      private function handleDisconnect(\Throwable $e): void
      {
          $lastError = $e;

          while (true) {
              error_log("连接断开: " . $lastError->getMessage());

              if ($this->retryCount >= $this->options['max_retries']) {
                  $this->close();
                  throw new \RuntimeException("达到最大重试次数", 0, $lastError);
              }

              // Cap the exponent so the power stays a small int: 2^5 already exceeds the cap.
              $delay = min(2 ** min($this->retryCount, 5), self::MAX_RETRY_DELAY);
              error_log("{$delay}秒后重试连接...");
              sleep($delay);

              $this->retryCount++;

              try {
                  $this->connect();
                  return;
              } catch (\Exception $connectError) {
                  $lastError = $connectError;
              }
          }
      }

      /**
       * Stops the listening loop and closes the socket; close errors are logged.
       */
      public function close(): void
      {
          $this->isRunning = false;

          try {
              $this->socket->close();
          } catch (\Exception $e) {
              error_log("关闭连接错误: " . $e->getMessage());
          }
      }
  }