Connection.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. <?php
  2. /**
  3. * Copyright (C) 2014-2022 Textalk/Abicart and contributors.
  4. *
  5. * This file is part of Websocket PHP and is free software under the ISC License.
  6. * License text: https://raw.githubusercontent.com/Textalk/websocket-php/master/COPYING
  7. */
  8. namespace WebSocket;
  9. use Psr\Log\{
  10. LoggerAwareInterface,
  11. LoggerAwareTrait,
  12. LoggerInterface, NullLogger
  13. };
  14. use WebSocket\Message\{
  15. Factory,
  16. Message
  17. };
  18. class Connection implements LoggerAwareInterface
  19. {
  20. use LoggerAwareTrait;
  21. use OpcodeTrait;
  22. private $stream;
  23. private $read_buffer;
  24. private $msg_factory;
  25. private $options = [];
  26. protected $is_closing = false;
  27. protected $close_status = null;
  28. private $uid;
  29. /* ---------- Construct & Destruct ----------------------------------------------- */
  30. public function __construct($stream, array $options = [])
  31. {
  32. $this->stream = $stream;
  33. $this->setOptions($options);
  34. $this->setLogger(new NullLogger());
  35. $this->msg_factory = new Factory();
  36. }
  37. public function __destruct()
  38. {
  39. if ($this->getType() === 'stream') {
  40. fclose($this->stream);
  41. }
  42. }
  43. public function setOptions(array $options = []): void
  44. {
  45. $this->options = array_merge($this->options, $options);
  46. }
  47. public function getCloseStatus(): ?int
  48. {
  49. return $this->close_status;
  50. }
  51. /**
  52. * Tell the socket to close.
  53. *
  54. * @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4
  55. * @param string $message A closing message, max 125 bytes.
  56. */
  57. public function close(int $status = 1000, string $message = 'ttfn'): void
  58. {
  59. if (!$this->isConnected()) {
  60. return;
  61. }
  62. $status_binstr = sprintf('%016b', $status);
  63. $status_str = '';
  64. foreach (str_split($status_binstr, 8) as $binstr) {
  65. $status_str .= chr(bindec($binstr));
  66. }
  67. $message = $this->msg_factory->create('close', $status_str . $message);
  68. $this->pushMessage($message, true);
  69. $this->logger->debug("Closing with status: {$status}.");
  70. $this->is_closing = true;
  71. while (true) {
  72. $message = $this->pullMessage();
  73. if ($message->getOpcode() == 'close') {
  74. break;
  75. }
  76. }
  77. }
  78. /* ---------- Message methods ---------------------------------------------------- */
  79. // Push a message to stream
  80. public function pushMessage(Message $message, bool $masked = true): void
  81. {
  82. $frames = $message->getFrames($masked, $this->options['fragment_size']);
  83. foreach ($frames as $frame) {
  84. $this->pushFrame($frame);
  85. }
  86. $this->logger->info("[connection] Pushed {$message}", [
  87. 'opcode' => $message->getOpcode(),
  88. 'content-length' => $message->getLength(),
  89. 'frames' => count($frames),
  90. ]);
  91. }
  92. // Pull a message from stream
  93. public function pullMessage(): Message
  94. {
  95. do {
  96. $frame = $this->pullFrame();
  97. $frame = $this->autoRespond($frame);
  98. list ($final, $payload, $opcode, $masked) = $frame;
  99. if ($opcode == 'close') {
  100. $this->close();
  101. }
  102. // Continuation and factual opcode
  103. $continuation = $opcode == 'continuation';
  104. $payload_opcode = $continuation ? $this->read_buffer['opcode'] : $opcode;
  105. // First continuation frame, create buffer
  106. if (!$final && !$continuation) {
  107. $this->read_buffer = ['opcode' => $opcode, 'payload' => $payload, 'frames' => 1];
  108. continue; // Continue reading
  109. }
  110. // Subsequent continuation frames, add to buffer
  111. if ($continuation) {
  112. $this->read_buffer['payload'] .= $payload;
  113. $this->read_buffer['frames']++;
  114. }
  115. } while (!$final);
  116. // Final, return payload
  117. $frames = 1;
  118. if ($continuation) {
  119. $payload = $this->read_buffer['payload'];
  120. $frames = $this->read_buffer['frames'];
  121. $this->read_buffer = null;
  122. }
  123. $message = $this->msg_factory->create($payload_opcode, $payload);
  124. $this->logger->info("[connection] Pulled {$message}", [
  125. 'opcode' => $payload_opcode,
  126. 'content-length' => strlen($payload),
  127. 'frames' => $frames,
  128. ]);
  129. return $message;
  130. }
  131. /* ---------- Frame I/O methods -------------------------------------------------- */
  132. // Pull frame from stream
  133. private function pullFrame(): array
  134. {
  135. // Read the fragment "header" first, two bytes.
  136. $data = $this->read(2);
  137. list ($byte_1, $byte_2) = array_values(unpack('C*', $data));
  138. $final = (bool)($byte_1 & 0b10000000); // Final fragment marker.
  139. $rsv = $byte_1 & 0b01110000; // Unused bits, ignore
  140. // Parse opcode
  141. $opcode_int = $byte_1 & 0b00001111;
  142. $opcode_ints = array_flip(self::$opcodes);
  143. if (!array_key_exists($opcode_int, $opcode_ints)) {
  144. $warning = "Bad opcode in websocket frame: {$opcode_int}";
  145. $this->logger->warning($warning);
  146. throw new ConnectionException($warning, ConnectionException::BAD_OPCODE);
  147. }
  148. $opcode = $opcode_ints[$opcode_int];
  149. // Masking bit
  150. $masked = (bool)($byte_2 & 0b10000000);
  151. $payload = '';
  152. // Payload length
  153. $payload_length = $byte_2 & 0b01111111;
  154. if ($payload_length > 125) {
  155. if ($payload_length === 126) {
  156. $data = $this->read(2); // 126: Payload is a 16-bit unsigned int
  157. $payload_length = current(unpack('n', $data));
  158. } else {
  159. $data = $this->read(8); // 127: Payload is a 64-bit unsigned int
  160. $payload_length = current(unpack('J', $data));
  161. }
  162. }
  163. // Get masking key.
  164. if ($masked) {
  165. $masking_key = $this->read(4);
  166. }
  167. // Get the actual payload, if any (might not be for e.g. close frames.
  168. if ($payload_length > 0) {
  169. $data = $this->read($payload_length);
  170. if ($masked) {
  171. // Unmask payload.
  172. for ($i = 0; $i < $payload_length; $i++) {
  173. $payload .= ($data[$i] ^ $masking_key[$i % 4]);
  174. }
  175. } else {
  176. $payload = $data;
  177. }
  178. }
  179. $this->logger->debug("[connection] Pulled '{opcode}' frame", [
  180. 'opcode' => $opcode,
  181. 'final' => $final,
  182. 'content-length' => strlen($payload),
  183. ]);
  184. return [$final, $payload, $opcode, $masked];
  185. }
  186. // Push frame to stream
  187. private function pushFrame(array $frame): void
  188. {
  189. list ($final, $payload, $opcode, $masked) = $frame;
  190. $data = '';
  191. $byte_1 = $final ? 0b10000000 : 0b00000000; // Final fragment marker.
  192. $byte_1 |= self::$opcodes[$opcode]; // Set opcode.
  193. $data .= pack('C', $byte_1);
  194. $byte_2 = $masked ? 0b10000000 : 0b00000000; // Masking bit marker.
  195. // 7 bits of payload length...
  196. $payload_length = strlen($payload);
  197. if ($payload_length > 65535) {
  198. $data .= pack('C', $byte_2 | 0b01111111);
  199. $data .= pack('J', $payload_length);
  200. } elseif ($payload_length > 125) {
  201. $data .= pack('C', $byte_2 | 0b01111110);
  202. $data .= pack('n', $payload_length);
  203. } else {
  204. $data .= pack('C', $byte_2 | $payload_length);
  205. }
  206. // Handle masking
  207. if ($masked) {
  208. // generate a random mask:
  209. $mask = '';
  210. for ($i = 0; $i < 4; $i++) {
  211. $mask .= chr(rand(0, 255));
  212. }
  213. $data .= $mask;
  214. // Append payload to frame:
  215. for ($i = 0; $i < $payload_length; $i++) {
  216. $data .= $payload[$i] ^ $mask[$i % 4];
  217. }
  218. } else {
  219. $data .= $payload;
  220. }
  221. $this->write($data);
  222. $this->logger->debug("[connection] Pushed '{$opcode}' frame", [
  223. 'opcode' => $opcode,
  224. 'final' => $final,
  225. 'content-length' => strlen($payload),
  226. ]);
  227. }
  228. // Trigger auto response for frame
  229. private function autoRespond(array $frame)
  230. {
  231. list ($final, $payload, $opcode, $masked) = $frame;
  232. $payload_length = strlen($payload);
  233. switch ($opcode) {
  234. case 'ping':
  235. // If we received a ping, respond with a pong
  236. $this->logger->debug("[connection] Received 'ping', sending 'pong'.");
  237. $message = $this->msg_factory->create('pong', $payload);
  238. $this->pushMessage($message, $masked);
  239. return [$final, $payload, $opcode, $masked];
  240. case 'close':
  241. // If we received close, possibly acknowledge and close connection
  242. $status_bin = '';
  243. $status = '';
  244. if ($payload_length > 0) {
  245. $status_bin = $payload[0] . $payload[1];
  246. $status = current(unpack('n', $payload));
  247. $this->close_status = $status;
  248. }
  249. // Get additional close message
  250. if ($payload_length >= 2) {
  251. $payload = substr($payload, 2);
  252. }
  253. $this->logger->debug("[connection] Received 'close', status: {$status}.");
  254. if (!$this->is_closing) {
  255. $ack = "{$status_bin}Close acknowledged: {$status}";
  256. $message = $this->msg_factory->create('close', $ack);
  257. $this->pushMessage($message, $masked);
  258. } else {
  259. $this->is_closing = false; // A close response, all done.
  260. }
  261. $this->disconnect();
  262. return [$final, $payload, $opcode, $masked];
  263. default:
  264. return [$final, $payload, $opcode, $masked];
  265. }
  266. }
  267. /* ---------- Stream I/O methods ------------------------------------------------- */
  268. /**
  269. * Close connection stream.
  270. * @return bool
  271. */
  272. public function disconnect(): bool
  273. {
  274. $this->logger->debug('Closing connection');
  275. return fclose($this->stream);
  276. }
  277. /**
  278. * If connected to stream.
  279. * @return bool
  280. */
  281. public function isConnected(): bool
  282. {
  283. return in_array($this->getType(), ['stream', 'persistent stream']);
  284. }
  285. /**
  286. * Return type of connection.
  287. * @return string|null Type of connection or null if invalid type.
  288. */
  289. public function getType(): ?string
  290. {
  291. return get_resource_type($this->stream);
  292. }
  293. /**
  294. * Get name of local socket, or null if not connected.
  295. * @return string|null
  296. */
  297. public function getName(): ?string
  298. {
  299. return stream_socket_get_name($this->stream, false);
  300. }
  301. /**
  302. * Get name of remote socket, or null if not connected.
  303. * @return string|null
  304. */
  305. public function getRemoteName(): ?string
  306. {
  307. return stream_socket_get_name($this->stream, true);
  308. }
  309. /**
  310. * Get meta data for connection.
  311. * @return array
  312. */
  313. public function getMeta(): array
  314. {
  315. return stream_get_meta_data($this->stream);
  316. }
  317. /**
  318. * Returns current position of stream pointer.
  319. * @return int
  320. * @throws ConnectionException
  321. */
  322. public function tell(): int
  323. {
  324. $tell = ftell($this->stream);
  325. if ($tell === false) {
  326. $this->throwException('Could not resolve stream pointer position');
  327. }
  328. return $tell;
  329. }
  330. /**
  331. * If stream pointer is at end of file.
  332. * @return bool
  333. */
  334. public function eof(): int
  335. {
  336. return feof($this->stream);
  337. }
  338. /* ---------- Stream option methods ---------------------------------------------- */
  339. /**
  340. * Set time out on connection.
  341. * @param int $seconds Timeout part in seconds
  342. * @param int $microseconds Timeout part in microseconds
  343. * @return bool
  344. */
  345. public function setTimeout(int $seconds, int $microseconds = 0): bool
  346. {
  347. $this->logger->debug("Setting timeout {$seconds}:{$microseconds} seconds");
  348. return stream_set_timeout($this->stream, $seconds, $microseconds);
  349. }
  350. /* ---------- Stream read/write methods ------------------------------------------ */
  351. /**
  352. * Read line from stream.
  353. * @param int $length Maximum number of bytes to read
  354. * @param string $ending Line delimiter
  355. * @return string Read data
  356. */
  357. public function getLine(int $length, string $ending): string
  358. {
  359. $line = stream_get_line($this->stream, $length, $ending);
  360. if ($line === false) {
  361. $this->throwException('Could not read from stream');
  362. }
  363. $read = strlen($line);
  364. $this->logger->debug("Read {$read} bytes of line.");
  365. return $line;
  366. }
  367. /**
  368. * Read line from stream.
  369. * @param int $length Maximum number of bytes to read
  370. * @return string Read data
  371. */
  372. public function gets(int $length): string
  373. {
  374. $line = fgets($this->stream, $length);
  375. if ($line === false) {
  376. $this->throwException('Could not read from stream');
  377. }
  378. $read = strlen($line);
  379. $this->logger->debug("Read {$read} bytes of line.");
  380. return $line;
  381. }
  382. /**
  383. * Read characters from stream.
  384. * @param int $length Maximum number of bytes to read
  385. * @return string Read data
  386. */
  387. public function read(string $length): string
  388. {
  389. $data = '';
  390. while (strlen($data) < $length) {
  391. $buffer = fread($this->stream, $length - strlen($data));
  392. if (!$buffer) {
  393. $meta = stream_get_meta_data($this->stream);
  394. if (!empty($meta['timed_out'])) {
  395. $message = 'Client read timeout';
  396. $this->logger->error($message, $meta);
  397. throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
  398. }
  399. }
  400. if ($buffer === false) {
  401. $read = strlen($data);
  402. $this->throwException("Broken frame, read {$read} of stated {$length} bytes.");
  403. }
  404. if ($buffer === '') {
  405. $this->throwException("Empty read; connection dead?");
  406. }
  407. $data .= $buffer;
  408. $read = strlen($data);
  409. $this->logger->debug("Read {$read} of {$length} bytes.");
  410. }
  411. return $data;
  412. }
  413. /**
  414. * Write characters to stream.
  415. * @param string $data Data to read
  416. */
  417. public function write(string $data): void
  418. {
  419. $length = strlen($data);
  420. $written = fwrite($this->stream, $data);
  421. if ($written === false) {
  422. $this->throwException("Failed to write {$length} bytes.");
  423. }
  424. if ($written < strlen($data)) {
  425. $this->throwException("Could only write {$written} out of {$length} bytes.");
  426. }
  427. $this->logger->debug("Wrote {$written} of {$length} bytes.");
  428. }
  429. /* ---------- Internal helper methods -------------------------------------------- */
  430. private function throwException(string $message, int $code = 0): void
  431. {
  432. $meta = ['closed' => true];
  433. if ($this->isConnected()) {
  434. $meta = $this->getMeta();
  435. $this->disconnect();
  436. if (!empty($meta['timed_out'])) {
  437. $this->logger->error($message, $meta);
  438. throw new TimeoutException($message, ConnectionException::TIMED_OUT, $meta);
  439. }
  440. if (!empty($meta['eof'])) {
  441. $code = ConnectionException::EOF;
  442. }
  443. }
  444. $this->logger->error($message, $meta);
  445. throw new ConnectionException($message, $code, $meta);
  446. }
  447. }