receiver.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679
  1. 'use strict';
  2. const { Writable } = require('stream');
  3. const PerMessageDeflate = require('./permessage-deflate');
  4. const {
  5. BINARY_TYPES,
  6. EMPTY_BUFFER,
  7. kStatusCode,
  8. kWebSocket
  9. } = require('./constants');
  10. const { concat, toArrayBuffer, unmask } = require('./buffer-util');
  11. const { isValidStatusCode, isValidUTF8 } = require('./validation');
  12. const FastBuffer = Buffer[Symbol.species];
  13. const promise = Promise.resolve();
  14. //
  15. // `queueMicrotask()` is not available in Node.js < 11.
  16. //
  17. const queueTask =
  18. typeof queueMicrotask === 'function' ? queueMicrotask : queueMicrotaskShim;
  19. const GET_INFO = 0;
  20. const GET_PAYLOAD_LENGTH_16 = 1;
  21. const GET_PAYLOAD_LENGTH_64 = 2;
  22. const GET_MASK = 3;
  23. const GET_DATA = 4;
  24. const INFLATING = 5;
  25. const WAIT_MICROTASK = 6;
  26. /**
  27. * HyBi Receiver implementation.
  28. *
  29. * @extends Writable
  30. */
  31. class Receiver extends Writable {
  32. /**
  33. * Creates a Receiver instance.
  34. *
  35. * @param {Object} [options] Options object
  36. * @param {String} [options.binaryType=nodebuffer] The type for binary data
  37. * @param {Object} [options.extensions] An object containing the negotiated
  38. * extensions
  39. * @param {Boolean} [options.isServer=false] Specifies whether to operate in
  40. * client or server mode
  41. * @param {Number} [options.maxPayload=0] The maximum allowed message length
  42. * @param {Boolean} [options.skipUTF8Validation=false] Specifies whether or
  43. * not to skip UTF-8 validation for text and close messages
  44. */
  45. constructor(options = {}) {
  46. super();
  47. this._binaryType = options.binaryType || BINARY_TYPES[0];
  48. this._extensions = options.extensions || {};
  49. this._isServer = !!options.isServer;
  50. this._maxPayload = options.maxPayload | 0;
  51. this._skipUTF8Validation = !!options.skipUTF8Validation;
  52. this[kWebSocket] = undefined;
  53. this._bufferedBytes = 0;
  54. this._buffers = [];
  55. this._compressed = false;
  56. this._payloadLength = 0;
  57. this._mask = undefined;
  58. this._fragmented = 0;
  59. this._masked = false;
  60. this._fin = false;
  61. this._opcode = 0;
  62. this._totalPayloadLength = 0;
  63. this._messageLength = 0;
  64. this._fragments = [];
  65. this._state = GET_INFO;
  66. this._loop = false;
  67. }
  68. /**
  69. * Implements `Writable.prototype._write()`.
  70. *
  71. * @param {Buffer} chunk The chunk of data to write
  72. * @param {String} encoding The character encoding of `chunk`
  73. * @param {Function} cb Callback
  74. * @private
  75. */
  76. _write(chunk, encoding, cb) {
  77. if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
  78. this._bufferedBytes += chunk.length;
  79. this._buffers.push(chunk);
  80. this.startLoop(cb);
  81. }
  82. /**
  83. * Consumes `n` bytes from the buffered data.
  84. *
  85. * @param {Number} n The number of bytes to consume
  86. * @return {Buffer} The consumed bytes
  87. * @private
  88. */
  89. consume(n) {
  90. this._bufferedBytes -= n;
  91. if (n === this._buffers[0].length) return this._buffers.shift();
  92. if (n < this._buffers[0].length) {
  93. const buf = this._buffers[0];
  94. this._buffers[0] = new FastBuffer(
  95. buf.buffer,
  96. buf.byteOffset + n,
  97. buf.length - n
  98. );
  99. return new FastBuffer(buf.buffer, buf.byteOffset, n);
  100. }
  101. const dst = Buffer.allocUnsafe(n);
  102. do {
  103. const buf = this._buffers[0];
  104. const offset = dst.length - n;
  105. if (n >= buf.length) {
  106. dst.set(this._buffers.shift(), offset);
  107. } else {
  108. dst.set(new Uint8Array(buf.buffer, buf.byteOffset, n), offset);
  109. this._buffers[0] = new FastBuffer(
  110. buf.buffer,
  111. buf.byteOffset + n,
  112. buf.length - n
  113. );
  114. }
  115. n -= buf.length;
  116. } while (n > 0);
  117. return dst;
  118. }
  119. /**
  120. * Starts the parsing loop.
  121. *
  122. * @param {Function} cb Callback
  123. * @private
  124. */
  125. startLoop(cb) {
  126. let err;
  127. this._loop = true;
  128. do {
  129. switch (this._state) {
  130. case GET_INFO:
  131. err = this.getInfo();
  132. break;
  133. case GET_PAYLOAD_LENGTH_16:
  134. err = this.getPayloadLength16();
  135. break;
  136. case GET_PAYLOAD_LENGTH_64:
  137. err = this.getPayloadLength64();
  138. break;
  139. case GET_MASK:
  140. this.getMask();
  141. break;
  142. case GET_DATA:
  143. err = this.getData(cb);
  144. break;
  145. case INFLATING:
  146. this._loop = false;
  147. return;
  148. default:
  149. //
  150. // `WAIT_MICROTASK`.
  151. //
  152. this._loop = false;
  153. queueTask(() => {
  154. this._state = GET_INFO;
  155. this.startLoop(cb);
  156. });
  157. return;
  158. }
  159. } while (this._loop);
  160. cb(err);
  161. }
  162. /**
  163. * Reads the first two bytes of a frame.
  164. *
  165. * @return {(RangeError|undefined)} A possible error
  166. * @private
  167. */
  168. getInfo() {
  169. if (this._bufferedBytes < 2) {
  170. this._loop = false;
  171. return;
  172. }
  173. const buf = this.consume(2);
  174. if ((buf[0] & 0x30) !== 0x00) {
  175. this._loop = false;
  176. return error(
  177. RangeError,
  178. 'RSV2 and RSV3 must be clear',
  179. true,
  180. 1002,
  181. 'WS_ERR_UNEXPECTED_RSV_2_3'
  182. );
  183. }
  184. const compressed = (buf[0] & 0x40) === 0x40;
  185. if (compressed && !this._extensions[PerMessageDeflate.extensionName]) {
  186. this._loop = false;
  187. return error(
  188. RangeError,
  189. 'RSV1 must be clear',
  190. true,
  191. 1002,
  192. 'WS_ERR_UNEXPECTED_RSV_1'
  193. );
  194. }
  195. this._fin = (buf[0] & 0x80) === 0x80;
  196. this._opcode = buf[0] & 0x0f;
  197. this._payloadLength = buf[1] & 0x7f;
  198. if (this._opcode === 0x00) {
  199. if (compressed) {
  200. this._loop = false;
  201. return error(
  202. RangeError,
  203. 'RSV1 must be clear',
  204. true,
  205. 1002,
  206. 'WS_ERR_UNEXPECTED_RSV_1'
  207. );
  208. }
  209. if (!this._fragmented) {
  210. this._loop = false;
  211. return error(
  212. RangeError,
  213. 'invalid opcode 0',
  214. true,
  215. 1002,
  216. 'WS_ERR_INVALID_OPCODE'
  217. );
  218. }
  219. this._opcode = this._fragmented;
  220. } else if (this._opcode === 0x01 || this._opcode === 0x02) {
  221. if (this._fragmented) {
  222. this._loop = false;
  223. return error(
  224. RangeError,
  225. `invalid opcode ${this._opcode}`,
  226. true,
  227. 1002,
  228. 'WS_ERR_INVALID_OPCODE'
  229. );
  230. }
  231. this._compressed = compressed;
  232. } else if (this._opcode > 0x07 && this._opcode < 0x0b) {
  233. if (!this._fin) {
  234. this._loop = false;
  235. return error(
  236. RangeError,
  237. 'FIN must be set',
  238. true,
  239. 1002,
  240. 'WS_ERR_EXPECTED_FIN'
  241. );
  242. }
  243. if (compressed) {
  244. this._loop = false;
  245. return error(
  246. RangeError,
  247. 'RSV1 must be clear',
  248. true,
  249. 1002,
  250. 'WS_ERR_UNEXPECTED_RSV_1'
  251. );
  252. }
  253. if (
  254. this._payloadLength > 0x7d ||
  255. (this._opcode === 0x08 && this._payloadLength === 1)
  256. ) {
  257. this._loop = false;
  258. return error(
  259. RangeError,
  260. `invalid payload length ${this._payloadLength}`,
  261. true,
  262. 1002,
  263. 'WS_ERR_INVALID_CONTROL_PAYLOAD_LENGTH'
  264. );
  265. }
  266. } else {
  267. this._loop = false;
  268. return error(
  269. RangeError,
  270. `invalid opcode ${this._opcode}`,
  271. true,
  272. 1002,
  273. 'WS_ERR_INVALID_OPCODE'
  274. );
  275. }
  276. if (!this._fin && !this._fragmented) this._fragmented = this._opcode;
  277. this._masked = (buf[1] & 0x80) === 0x80;
  278. if (this._isServer) {
  279. if (!this._masked) {
  280. this._loop = false;
  281. return error(
  282. RangeError,
  283. 'MASK must be set',
  284. true,
  285. 1002,
  286. 'WS_ERR_EXPECTED_MASK'
  287. );
  288. }
  289. } else if (this._masked) {
  290. this._loop = false;
  291. return error(
  292. RangeError,
  293. 'MASK must be clear',
  294. true,
  295. 1002,
  296. 'WS_ERR_UNEXPECTED_MASK'
  297. );
  298. }
  299. if (this._payloadLength === 126) this._state = GET_PAYLOAD_LENGTH_16;
  300. else if (this._payloadLength === 127) this._state = GET_PAYLOAD_LENGTH_64;
  301. else return this.haveLength();
  302. }
  303. /**
  304. * Gets extended payload length (7+16).
  305. *
  306. * @return {(RangeError|undefined)} A possible error
  307. * @private
  308. */
  309. getPayloadLength16() {
  310. if (this._bufferedBytes < 2) {
  311. this._loop = false;
  312. return;
  313. }
  314. this._payloadLength = this.consume(2).readUInt16BE(0);
  315. return this.haveLength();
  316. }
  317. /**
  318. * Gets extended payload length (7+64).
  319. *
  320. * @return {(RangeError|undefined)} A possible error
  321. * @private
  322. */
  323. getPayloadLength64() {
  324. if (this._bufferedBytes < 8) {
  325. this._loop = false;
  326. return;
  327. }
  328. const buf = this.consume(8);
  329. const num = buf.readUInt32BE(0);
  330. //
  331. // The maximum safe integer in JavaScript is 2^53 - 1. An error is returned
  332. // if payload length is greater than this number.
  333. //
  334. if (num > Math.pow(2, 53 - 32) - 1) {
  335. this._loop = false;
  336. return error(
  337. RangeError,
  338. 'Unsupported WebSocket frame: payload length > 2^53 - 1',
  339. false,
  340. 1009,
  341. 'WS_ERR_UNSUPPORTED_DATA_PAYLOAD_LENGTH'
  342. );
  343. }
  344. this._payloadLength = num * Math.pow(2, 32) + buf.readUInt32BE(4);
  345. return this.haveLength();
  346. }
  347. /**
  348. * Payload length has been read.
  349. *
  350. * @return {(RangeError|undefined)} A possible error
  351. * @private
  352. */
  353. haveLength() {
  354. if (this._payloadLength && this._opcode < 0x08) {
  355. this._totalPayloadLength += this._payloadLength;
  356. if (this._totalPayloadLength > this._maxPayload && this._maxPayload > 0) {
  357. this._loop = false;
  358. return error(
  359. RangeError,
  360. 'Max payload size exceeded',
  361. false,
  362. 1009,
  363. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  364. );
  365. }
  366. }
  367. if (this._masked) this._state = GET_MASK;
  368. else this._state = GET_DATA;
  369. }
  370. /**
  371. * Reads mask bytes.
  372. *
  373. * @private
  374. */
  375. getMask() {
  376. if (this._bufferedBytes < 4) {
  377. this._loop = false;
  378. return;
  379. }
  380. this._mask = this.consume(4);
  381. this._state = GET_DATA;
  382. }
  383. /**
  384. * Reads data bytes.
  385. *
  386. * @param {Function} cb Callback
  387. * @return {(Error|RangeError|undefined)} A possible error
  388. * @private
  389. */
  390. getData(cb) {
  391. let data = EMPTY_BUFFER;
  392. if (this._payloadLength) {
  393. if (this._bufferedBytes < this._payloadLength) {
  394. this._loop = false;
  395. return;
  396. }
  397. data = this.consume(this._payloadLength);
  398. if (
  399. this._masked &&
  400. (this._mask[0] | this._mask[1] | this._mask[2] | this._mask[3]) !== 0
  401. ) {
  402. unmask(data, this._mask);
  403. }
  404. }
  405. if (this._opcode > 0x07) return this.controlMessage(data);
  406. if (this._compressed) {
  407. this._state = INFLATING;
  408. this.decompress(data, cb);
  409. return;
  410. }
  411. if (data.length) {
  412. //
  413. // This message is not compressed so its length is the sum of the payload
  414. // length of all fragments.
  415. //
  416. this._messageLength = this._totalPayloadLength;
  417. this._fragments.push(data);
  418. }
  419. return this.dataMessage();
  420. }
  421. /**
  422. * Decompresses data.
  423. *
  424. * @param {Buffer} data Compressed data
  425. * @param {Function} cb Callback
  426. * @private
  427. */
  428. decompress(data, cb) {
  429. const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
  430. perMessageDeflate.decompress(data, this._fin, (err, buf) => {
  431. if (err) return cb(err);
  432. if (buf.length) {
  433. this._messageLength += buf.length;
  434. if (this._messageLength > this._maxPayload && this._maxPayload > 0) {
  435. return cb(
  436. error(
  437. RangeError,
  438. 'Max payload size exceeded',
  439. false,
  440. 1009,
  441. 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH'
  442. )
  443. );
  444. }
  445. this._fragments.push(buf);
  446. }
  447. const er = this.dataMessage();
  448. if (er) return cb(er);
  449. this.startLoop(cb);
  450. });
  451. }
  452. /**
  453. * Handles a data message.
  454. *
  455. * @return {(Error|undefined)} A possible error
  456. * @private
  457. */
  458. dataMessage() {
  459. if (this._fin) {
  460. const messageLength = this._messageLength;
  461. const fragments = this._fragments;
  462. this._totalPayloadLength = 0;
  463. this._messageLength = 0;
  464. this._fragmented = 0;
  465. this._fragments = [];
  466. if (this._opcode === 2) {
  467. let data;
  468. if (this._binaryType === 'nodebuffer') {
  469. data = concat(fragments, messageLength);
  470. } else if (this._binaryType === 'arraybuffer') {
  471. data = toArrayBuffer(concat(fragments, messageLength));
  472. } else {
  473. data = fragments;
  474. }
  475. this.emit('message', data, true);
  476. } else {
  477. const buf = concat(fragments, messageLength);
  478. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  479. this._loop = false;
  480. return error(
  481. Error,
  482. 'invalid UTF-8 sequence',
  483. true,
  484. 1007,
  485. 'WS_ERR_INVALID_UTF8'
  486. );
  487. }
  488. this.emit('message', buf, false);
  489. }
  490. }
  491. this._state = WAIT_MICROTASK;
  492. }
  493. /**
  494. * Handles a control message.
  495. *
  496. * @param {Buffer} data Data to handle
  497. * @return {(Error|RangeError|undefined)} A possible error
  498. * @private
  499. */
  500. controlMessage(data) {
  501. if (this._opcode === 0x08) {
  502. this._loop = false;
  503. if (data.length === 0) {
  504. this.emit('conclude', 1005, EMPTY_BUFFER);
  505. this.end();
  506. this._state = GET_INFO;
  507. } else {
  508. const code = data.readUInt16BE(0);
  509. if (!isValidStatusCode(code)) {
  510. return error(
  511. RangeError,
  512. `invalid status code ${code}`,
  513. true,
  514. 1002,
  515. 'WS_ERR_INVALID_CLOSE_CODE'
  516. );
  517. }
  518. const buf = new FastBuffer(
  519. data.buffer,
  520. data.byteOffset + 2,
  521. data.length - 2
  522. );
  523. if (!this._skipUTF8Validation && !isValidUTF8(buf)) {
  524. return error(
  525. Error,
  526. 'invalid UTF-8 sequence',
  527. true,
  528. 1007,
  529. 'WS_ERR_INVALID_UTF8'
  530. );
  531. }
  532. this.emit('conclude', code, buf);
  533. this.end();
  534. this._state = GET_INFO;
  535. }
  536. } else if (this._opcode === 0x09) {
  537. this.emit('ping', data);
  538. this._state = WAIT_MICROTASK;
  539. } else {
  540. this.emit('pong', data);
  541. this._state = WAIT_MICROTASK;
  542. }
  543. }
  544. }
  545. module.exports = Receiver;
  546. /**
  547. * Builds an error object.
  548. *
  549. * @param {function(new:Error|RangeError)} ErrorCtor The error constructor
  550. * @param {String} message The error message
  551. * @param {Boolean} prefix Specifies whether or not to add a default prefix to
  552. * `message`
  553. * @param {Number} statusCode The status code
  554. * @param {String} errorCode The exposed error code
  555. * @return {(Error|RangeError)} The error
  556. * @private
  557. */
  558. function error(ErrorCtor, message, prefix, statusCode, errorCode) {
  559. const err = new ErrorCtor(
  560. prefix ? `Invalid WebSocket frame: ${message}` : message
  561. );
  562. Error.captureStackTrace(err, error);
  563. err.code = errorCode;
  564. err[kStatusCode] = statusCode;
  565. return err;
  566. }
  567. /**
  568. * A shim for `queueMicrotask()`.
  569. *
  570. * @param {Function} cb Callback
  571. */
  572. function queueMicrotaskShim(cb) {
  573. promise.then(cb).catch(throwErrorNextTick);
  574. }
  575. /**
  576. * Throws an error.
  577. *
  578. * @param {Error} err The error to throw
  579. * @private
  580. */
  581. function throwError(err) {
  582. throw err;
  583. }
  584. /**
  585. * Throws an error in the next tick.
  586. *
  587. * @param {Error} err The error to throw
  588. * @private
  589. */
  590. function throwErrorNextTick(err) {
  591. process.nextTick(throwError, err);
  592. }