| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514 | 'use strict';const zlib = require('zlib');const bufferUtil = require('./buffer-util');const Limiter = require('./limiter');const { kStatusCode } = require('./constants');const FastBuffer = Buffer[Symbol.species];const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);const kPerMessageDeflate = Symbol('permessage-deflate');const kTotalLength = Symbol('total-length');const kCallback = Symbol('callback');const kBuffers = Symbol('buffers');const kError = Symbol('error');//// We limit zlib concurrency, which prevents severe memory fragmentation// as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913// and https://github.com/websockets/ws/issues/1202//// Intentionally global; it's the global thread pool that's an issue.//let zlibLimiter;/** * permessage-deflate implementation. */class PerMessageDeflate {  /**   * Creates a PerMessageDeflate instance.   *   * @param {Object} [options] Configuration options   * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support   *     for, or request, a custom client window size   * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/   *     acknowledge disabling of client context takeover   * @param {Number} [options.concurrencyLimit=10] The number of concurrent   *     calls to zlib   * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the   *     use of a custom server window size   * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept   *     disabling of server context takeover   * @param {Number} [options.threshold=1024] Size (in bytes) below which   *     messages should not be compressed if context takeover is disabled   * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on   *     deflate   * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on   *     inflate   * @param {Boolean} [isServer=false] Create the instance in either server or   *     client mode   * @param {Number} [maxPayload=0] The maximum allowed message length   */  constructor(options, isServer, maxPayload) {    this._maxPayload = maxPayload | 0;    this._options = options || {};    this._threshold =      this._options.threshold !== undefined ? this._options.threshold : 1024;    this._isServer = !!isServer;    this._deflate = null;    this._inflate = null;    this.params = null;    if (!zlibLimiter) {      const concurrency =        this._options.concurrencyLimit !== undefined          ? this._options.concurrencyLimit          : 10;      zlibLimiter = new Limiter(concurrency);    }  }  /**   * @type {String}   */  static get extensionName() {    return 'permessage-deflate';  }  /**   * Create an extension negotiation offer.   *   * @return {Object} Extension parameters   * @public   */  offer() {    const params = {};    if (this._options.serverNoContextTakeover) {      params.server_no_context_takeover = true;    }    if (this._options.clientNoContextTakeover) {      params.client_no_context_takeover = true;    }    if (this._options.serverMaxWindowBits) {      params.server_max_window_bits = this._options.serverMaxWindowBits;    }    if (this._options.clientMaxWindowBits) {      params.client_max_window_bits = this._options.clientMaxWindowBits;    } else if (this._options.clientMaxWindowBits == null) {      params.client_max_window_bits = true;    }    return params;  }  /**   * Accept an extension negotiation offer/response.   *   * @param {Array} configurations The extension negotiation offers/reponse   * @return {Object} Accepted configuration   * @public   */  accept(configurations) {    configurations = this.normalizeParams(configurations);    this.params = this._isServer      ? this.acceptAsServer(configurations)      : this.acceptAsClient(configurations);    return this.params;  }  /**   * Releases all resources used by the extension.   *   * @public   */  cleanup() {    if (this._inflate) {      this._inflate.close();      this._inflate = null;    }    if (this._deflate) {      const callback = this._deflate[kCallback];      this._deflate.close();      this._deflate = null;      if (callback) {        callback(          new Error(            'The deflate stream was closed while data was being processed'          )        );      }    }  }  /**   *  Accept an extension negotiation offer.   *   * @param {Array} offers The extension negotiation offers   * @return {Object} Accepted configuration   * @private   */  acceptAsServer(offers) {    const opts = this._options;    const accepted = offers.find((params) => {      if (        (opts.serverNoContextTakeover === false &&          params.server_no_context_takeover) ||        (params.server_max_window_bits &&          (opts.serverMaxWindowBits === false ||            (typeof opts.serverMaxWindowBits === 'number' &&              opts.serverMaxWindowBits > params.server_max_window_bits))) ||        (typeof opts.clientMaxWindowBits === 'number' &&          !params.client_max_window_bits)      ) {        return false;      }      return true;    });    if (!accepted) {      throw new Error('None of the extension offers can be accepted');    }    if (opts.serverNoContextTakeover) {      accepted.server_no_context_takeover = true;    }    if (opts.clientNoContextTakeover) {      accepted.client_no_context_takeover = true;    }    if (typeof opts.serverMaxWindowBits === 'number') {      accepted.server_max_window_bits = opts.serverMaxWindowBits;    }    if (typeof opts.clientMaxWindowBits === 'number') {      accepted.client_max_window_bits = opts.clientMaxWindowBits;    } else if (      accepted.client_max_window_bits === true ||      opts.clientMaxWindowBits === false    ) {      delete accepted.client_max_window_bits;    }    return accepted;  }  /**   * Accept the extension negotiation response.   *   * @param {Array} response The extension negotiation response   * @return {Object} Accepted configuration   * @private   */  acceptAsClient(response) {    const params = response[0];    if (      this._options.clientNoContextTakeover === false &&      params.client_no_context_takeover    ) {      throw new Error('Unexpected parameter "client_no_context_takeover"');    }    if (!params.client_max_window_bits) {      if (typeof this._options.clientMaxWindowBits === 'number') {        params.client_max_window_bits = this._options.clientMaxWindowBits;      }    } else if (      this._options.clientMaxWindowBits === false ||      (typeof this._options.clientMaxWindowBits === 'number' &&        params.client_max_window_bits > this._options.clientMaxWindowBits)    ) {      throw new Error(        'Unexpected or invalid parameter "client_max_window_bits"'      );    }    return params;  }  /**   * Normalize parameters.   *   * @param {Array} configurations The extension negotiation offers/reponse   * @return {Array} The offers/response with normalized parameters   * @private   */  normalizeParams(configurations) {    configurations.forEach((params) => {      Object.keys(params).forEach((key) => {        let value = params[key];        if (value.length > 1) {          throw new Error(`Parameter "${key}" must have only a single value`);        }        value = value[0];        if (key === 'client_max_window_bits') {          if (value !== true) {            const num = +value;            if (!Number.isInteger(num) || num < 8 || num > 15) {              throw new TypeError(                `Invalid value for parameter "${key}": ${value}`              );            }            value = num;          } else if (!this._isServer) {            throw new TypeError(              `Invalid value for parameter "${key}": ${value}`            );          }        } else if (key === 'server_max_window_bits') {          const num = +value;          if (!Number.isInteger(num) || num < 8 || num > 15) {            throw new TypeError(              `Invalid value for parameter "${key}": ${value}`            );          }          value = num;        } else if (          key === 'client_no_context_takeover' ||          key === 'server_no_context_takeover'        ) {          if (value !== true) {            throw new TypeError(              `Invalid value for parameter "${key}": ${value}`            );          }        } else {          throw new Error(`Unknown parameter "${key}"`);        }        params[key] = value;      });    });    return configurations;  }  /**   * Decompress data. Concurrency limited.   *   * @param {Buffer} data Compressed data   * @param {Boolean} fin Specifies whether or not this is the last fragment   * @param {Function} callback Callback   * @public   */  decompress(data, fin, callback) {    zlibLimiter.add((done) => {      this._decompress(data, fin, (err, result) => {        done();        callback(err, result);      });    });  }  /**   * Compress data. Concurrency limited.   *   * @param {(Buffer|String)} data Data to compress   * @param {Boolean} fin Specifies whether or not this is the last fragment   * @param {Function} callback Callback   * @public   */  compress(data, fin, callback) {    zlibLimiter.add((done) => {      this._compress(data, fin, (err, result) => {        done();        callback(err, result);      });    });  }  /**   * Decompress data.   *   * @param {Buffer} data Compressed data   * @param {Boolean} fin Specifies whether or not this is the last fragment   * @param {Function} callback Callback   * @private   */  _decompress(data, fin, callback) {    const endpoint = this._isServer ? 'client' : 'server';    if (!this._inflate) {      const key = `${endpoint}_max_window_bits`;      const windowBits =        typeof this.params[key] !== 'number'          ? zlib.Z_DEFAULT_WINDOWBITS          : this.params[key];      this._inflate = zlib.createInflateRaw({        ...this._options.zlibInflateOptions,        windowBits      });      this._inflate[kPerMessageDeflate] = this;      this._inflate[kTotalLength] = 0;      this._inflate[kBuffers] = [];      this._inflate.on('error', inflateOnError);      this._inflate.on('data', inflateOnData);    }    this._inflate[kCallback] = callback;    this._inflate.write(data);    if (fin) this._inflate.write(TRAILER);    this._inflate.flush(() => {      const err = this._inflate[kError];      if (err) {        this._inflate.close();        this._inflate = null;        callback(err);        return;      }      const data = bufferUtil.concat(        this._inflate[kBuffers],        this._inflate[kTotalLength]      );      if (this._inflate._readableState.endEmitted) {        this._inflate.close();        this._inflate = null;      } else {        this._inflate[kTotalLength] = 0;        this._inflate[kBuffers] = [];        if (fin && this.params[`${endpoint}_no_context_takeover`]) {          this._inflate.reset();        }      }      callback(null, data);    });  }  /**   * Compress data.   *   * @param {(Buffer|String)} data Data to compress   * @param {Boolean} fin Specifies whether or not this is the last fragment   * @param {Function} callback Callback   * @private   */  _compress(data, fin, callback) {    const endpoint = this._isServer ? 'server' : 'client';    if (!this._deflate) {      const key = `${endpoint}_max_window_bits`;      const windowBits =        typeof this.params[key] !== 'number'          ? zlib.Z_DEFAULT_WINDOWBITS          : this.params[key];      this._deflate = zlib.createDeflateRaw({        ...this._options.zlibDeflateOptions,        windowBits      });      this._deflate[kTotalLength] = 0;      this._deflate[kBuffers] = [];      this._deflate.on('data', deflateOnData);    }    this._deflate[kCallback] = callback;    this._deflate.write(data);    this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {      if (!this._deflate) {        //        // The deflate stream was closed while data was being processed.        //        return;      }      let data = bufferUtil.concat(        this._deflate[kBuffers],        this._deflate[kTotalLength]      );      if (fin) {        data = new FastBuffer(data.buffer, data.byteOffset, data.length - 4);      }      //      // Ensure that the callback will not be called again in      // `PerMessageDeflate#cleanup()`.      //      this._deflate[kCallback] = null;      this._deflate[kTotalLength] = 0;      this._deflate[kBuffers] = [];      if (fin && this.params[`${endpoint}_no_context_takeover`]) {        this._deflate.reset();      }      callback(null, data);    });  }}module.exports = PerMessageDeflate;/** * The listener of the `zlib.DeflateRaw` stream `'data'` event. * * @param {Buffer} chunk A chunk of data * @private */function deflateOnData(chunk) {  this[kBuffers].push(chunk);  this[kTotalLength] += chunk.length;}/** * The listener of the `zlib.InflateRaw` stream `'data'` event. * * @param {Buffer} chunk A chunk of data * @private */function inflateOnData(chunk) {  this[kTotalLength] += chunk.length;  if (    this[kPerMessageDeflate]._maxPayload < 1 ||    this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload  ) {    this[kBuffers].push(chunk);    return;  }  this[kError] = new RangeError('Max payload size exceeded');  this[kError].code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';  this[kError][kStatusCode] = 1009;  this.removeListener('data', inflateOnData);  this.reset();}/** * The listener of the `zlib.InflateRaw` stream `'error'` event. * * @param {Error} err The emitted error * @private */function inflateOnError(err) {  //  // There is no need to call `Zlib#close()` as the handle is automatically  // closed when an error is emitted.  //  this[kPerMessageDeflate]._inflate = null;  err[kStatusCode] = 1007;  this[kCallback](err);}
 |