AsyncQueue.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. /*
  2. MIT License http://www.opensource.org/licenses/mit-license.php
  3. Author Tobias Koppers @sokra
  4. */
  5. "use strict";
  6. const { SyncHook, AsyncSeriesHook } = require("tapable");
  7. const { makeWebpackError } = require("../HookWebpackError");
  8. const WebpackError = require("../WebpackError");
  9. const ArrayQueue = require("./ArrayQueue");
  10. const QUEUED_STATE = 0;
  11. const PROCESSING_STATE = 1;
  12. const DONE_STATE = 2;
  13. let inHandleResult = 0;
  14. /**
  15. * @template T
  16. * @callback Callback
  17. * @param {(WebpackError | null)=} err
  18. * @param {(T | null)=} result
  19. */
  20. /**
  21. * @template T
  22. * @template K
  23. * @template R
  24. */
  25. class AsyncQueueEntry {
  26. /**
  27. * @param {T} item the item
  28. * @param {Callback<R>} callback the callback
  29. */
  30. constructor(item, callback) {
  31. this.item = item;
  32. /** @type {typeof QUEUED_STATE | typeof PROCESSING_STATE | typeof DONE_STATE} */
  33. this.state = QUEUED_STATE;
  34. /** @type {Callback<R> | undefined} */
  35. this.callback = callback;
  36. /** @type {Callback<R>[] | undefined} */
  37. this.callbacks = undefined;
  38. /** @type {R | null | undefined} */
  39. this.result = undefined;
  40. /** @type {WebpackError | null | undefined} */
  41. this.error = undefined;
  42. }
  43. }
  44. /**
  45. * @template T, K
  46. * @typedef {function(T): K} getKey
  47. */
  48. /**
  49. * @template T, R
  50. * @typedef {function(T, Callback<R>): void} Processor
  51. */
  52. /**
  53. * @template T
  54. * @template K
  55. * @template R
  56. */
  57. class AsyncQueue {
  58. /**
  59. * @param {object} options options object
  60. * @param {string=} options.name name of the queue
  61. * @param {number=} options.parallelism how many items should be processed at once
  62. * @param {string=} options.context context of execution
  63. * @param {AsyncQueue<any, any, any>=} options.parent parent queue, which will have priority over this queue and with shared parallelism
  64. * @param {getKey<T, K>=} options.getKey extract key from item
  65. * @param {Processor<T, R>} options.processor async function to process items
  66. */
  67. constructor({ name, context, parallelism, parent, processor, getKey }) {
  68. this._name = name;
  69. this._context = context || "normal";
  70. this._parallelism = parallelism || 1;
  71. this._processor = processor;
  72. this._getKey =
  73. getKey ||
  74. /** @type {getKey<T, K>} */ (item => /** @type {T & K} */ (item));
  75. /** @type {Map<K, AsyncQueueEntry<T, K, R>>} */
  76. this._entries = new Map();
  77. /** @type {ArrayQueue<AsyncQueueEntry<T, K, R>>} */
  78. this._queued = new ArrayQueue();
  79. /** @type {AsyncQueue<any, any, any>[] | undefined} */
  80. this._children = undefined;
  81. this._activeTasks = 0;
  82. this._willEnsureProcessing = false;
  83. this._needProcessing = false;
  84. this._stopped = false;
  85. /** @type {AsyncQueue<any, any, any>} */
  86. this._root = parent ? parent._root : this;
  87. if (parent) {
  88. if (this._root._children === undefined) {
  89. this._root._children = [this];
  90. } else {
  91. this._root._children.push(this);
  92. }
  93. }
  94. this.hooks = {
  95. /** @type {AsyncSeriesHook<[T]>} */
  96. beforeAdd: new AsyncSeriesHook(["item"]),
  97. /** @type {SyncHook<[T]>} */
  98. added: new SyncHook(["item"]),
  99. /** @type {AsyncSeriesHook<[T]>} */
  100. beforeStart: new AsyncSeriesHook(["item"]),
  101. /** @type {SyncHook<[T]>} */
  102. started: new SyncHook(["item"]),
  103. /** @type {SyncHook<[T, WebpackError | null | undefined, R | null | undefined]>} */
  104. result: new SyncHook(["item", "error", "result"])
  105. };
  106. this._ensureProcessing = this._ensureProcessing.bind(this);
  107. }
  108. /**
  109. * @returns {string} context of execution
  110. */
  111. getContext() {
  112. return this._context;
  113. }
  114. /**
  115. * @param {string} value context of execution
  116. */
  117. setContext(value) {
  118. this._context = value;
  119. }
  120. /**
  121. * @param {T} item an item
  122. * @param {Callback<R>} callback callback function
  123. * @returns {void}
  124. */
  125. add(item, callback) {
  126. if (this._stopped) return callback(new WebpackError("Queue was stopped"));
  127. this.hooks.beforeAdd.callAsync(item, err => {
  128. if (err) {
  129. callback(
  130. makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeAdd`)
  131. );
  132. return;
  133. }
  134. const key = this._getKey(item);
  135. const entry = this._entries.get(key);
  136. if (entry !== undefined) {
  137. if (entry.state === DONE_STATE) {
  138. if (inHandleResult++ > 3) {
  139. process.nextTick(() => callback(entry.error, entry.result));
  140. } else {
  141. callback(entry.error, entry.result);
  142. }
  143. inHandleResult--;
  144. } else if (entry.callbacks === undefined) {
  145. entry.callbacks = [callback];
  146. } else {
  147. entry.callbacks.push(callback);
  148. }
  149. return;
  150. }
  151. const newEntry = new AsyncQueueEntry(item, callback);
  152. if (this._stopped) {
  153. this.hooks.added.call(item);
  154. this._root._activeTasks++;
  155. process.nextTick(() =>
  156. this._handleResult(newEntry, new WebpackError("Queue was stopped"))
  157. );
  158. } else {
  159. this._entries.set(key, newEntry);
  160. this._queued.enqueue(newEntry);
  161. const root = this._root;
  162. root._needProcessing = true;
  163. if (root._willEnsureProcessing === false) {
  164. root._willEnsureProcessing = true;
  165. setImmediate(root._ensureProcessing);
  166. }
  167. this.hooks.added.call(item);
  168. }
  169. });
  170. }
  171. /**
  172. * @param {T} item an item
  173. * @returns {void}
  174. */
  175. invalidate(item) {
  176. const key = this._getKey(item);
  177. const entry =
  178. /** @type {AsyncQueueEntry<T, K, R>} */
  179. (this._entries.get(key));
  180. this._entries.delete(key);
  181. if (entry.state === QUEUED_STATE) {
  182. this._queued.delete(entry);
  183. }
  184. }
  185. /**
  186. * Waits for an already started item
  187. * @param {T} item an item
  188. * @param {Callback<R>} callback callback function
  189. * @returns {void}
  190. */
  191. waitFor(item, callback) {
  192. const key = this._getKey(item);
  193. const entry = this._entries.get(key);
  194. if (entry === undefined) {
  195. return callback(
  196. new WebpackError(
  197. "waitFor can only be called for an already started item"
  198. )
  199. );
  200. }
  201. if (entry.state === DONE_STATE) {
  202. process.nextTick(() => callback(entry.error, entry.result));
  203. } else if (entry.callbacks === undefined) {
  204. entry.callbacks = [callback];
  205. } else {
  206. entry.callbacks.push(callback);
  207. }
  208. }
  209. /**
  210. * @returns {void}
  211. */
  212. stop() {
  213. this._stopped = true;
  214. const queue = this._queued;
  215. this._queued = new ArrayQueue();
  216. const root = this._root;
  217. for (const entry of queue) {
  218. this._entries.delete(
  219. this._getKey(/** @type {AsyncQueueEntry<T, K, R>} */ (entry).item)
  220. );
  221. root._activeTasks++;
  222. this._handleResult(
  223. /** @type {AsyncQueueEntry<T, K, R>} */ (entry),
  224. new WebpackError("Queue was stopped")
  225. );
  226. }
  227. }
  228. /**
  229. * @returns {void}
  230. */
  231. increaseParallelism() {
  232. const root = this._root;
  233. root._parallelism++;
  234. /* istanbul ignore next */
  235. if (root._willEnsureProcessing === false && root._needProcessing) {
  236. root._willEnsureProcessing = true;
  237. setImmediate(root._ensureProcessing);
  238. }
  239. }
  240. /**
  241. * @returns {void}
  242. */
  243. decreaseParallelism() {
  244. const root = this._root;
  245. root._parallelism--;
  246. }
  247. /**
  248. * @param {T} item an item
  249. * @returns {boolean} true, if the item is currently being processed
  250. */
  251. isProcessing(item) {
  252. const key = this._getKey(item);
  253. const entry = this._entries.get(key);
  254. return entry !== undefined && entry.state === PROCESSING_STATE;
  255. }
  256. /**
  257. * @param {T} item an item
  258. * @returns {boolean} true, if the item is currently queued
  259. */
  260. isQueued(item) {
  261. const key = this._getKey(item);
  262. const entry = this._entries.get(key);
  263. return entry !== undefined && entry.state === QUEUED_STATE;
  264. }
  265. /**
  266. * @param {T} item an item
  267. * @returns {boolean} true, if the item is currently queued
  268. */
  269. isDone(item) {
  270. const key = this._getKey(item);
  271. const entry = this._entries.get(key);
  272. return entry !== undefined && entry.state === DONE_STATE;
  273. }
  274. /**
  275. * @returns {void}
  276. */
  277. _ensureProcessing() {
  278. while (this._activeTasks < this._parallelism) {
  279. const entry = this._queued.dequeue();
  280. if (entry === undefined) break;
  281. this._activeTasks++;
  282. entry.state = PROCESSING_STATE;
  283. this._startProcessing(entry);
  284. }
  285. this._willEnsureProcessing = false;
  286. if (this._queued.length > 0) return;
  287. if (this._children !== undefined) {
  288. for (const child of this._children) {
  289. while (this._activeTasks < this._parallelism) {
  290. const entry = child._queued.dequeue();
  291. if (entry === undefined) break;
  292. this._activeTasks++;
  293. entry.state = PROCESSING_STATE;
  294. child._startProcessing(entry);
  295. }
  296. if (child._queued.length > 0) return;
  297. }
  298. }
  299. if (!this._willEnsureProcessing) this._needProcessing = false;
  300. }
  301. /**
  302. * @param {AsyncQueueEntry<T, K, R>} entry the entry
  303. * @returns {void}
  304. */
  305. _startProcessing(entry) {
  306. this.hooks.beforeStart.callAsync(entry.item, err => {
  307. if (err) {
  308. this._handleResult(
  309. entry,
  310. makeWebpackError(err, `AsyncQueue(${this._name}).hooks.beforeStart`)
  311. );
  312. return;
  313. }
  314. let inCallback = false;
  315. try {
  316. this._processor(entry.item, (e, r) => {
  317. inCallback = true;
  318. this._handleResult(entry, e, r);
  319. });
  320. } catch (err) {
  321. if (inCallback) throw err;
  322. this._handleResult(entry, /** @type {WebpackError} */ (err), null);
  323. }
  324. this.hooks.started.call(entry.item);
  325. });
  326. }
  327. /**
  328. * @param {AsyncQueueEntry<T, K, R>} entry the entry
  329. * @param {(WebpackError | null)=} err error, if any
  330. * @param {(R | null)=} result result, if any
  331. * @returns {void}
  332. */
  333. _handleResult(entry, err, result) {
  334. this.hooks.result.callAsync(entry.item, err, result, hookError => {
  335. const error = hookError
  336. ? makeWebpackError(hookError, `AsyncQueue(${this._name}).hooks.result`)
  337. : err;
  338. const callback = /** @type {Callback<R>} */ (entry.callback);
  339. const callbacks = entry.callbacks;
  340. entry.state = DONE_STATE;
  341. entry.callback = undefined;
  342. entry.callbacks = undefined;
  343. entry.result = result;
  344. entry.error = error;
  345. const root = this._root;
  346. root._activeTasks--;
  347. if (root._willEnsureProcessing === false && root._needProcessing) {
  348. root._willEnsureProcessing = true;
  349. setImmediate(root._ensureProcessing);
  350. }
  351. if (inHandleResult++ > 3) {
  352. process.nextTick(() => {
  353. callback(error, result);
  354. if (callbacks !== undefined) {
  355. for (const callback of callbacks) {
  356. callback(error, result);
  357. }
  358. }
  359. });
  360. } else {
  361. callback(error, result);
  362. if (callbacks !== undefined) {
  363. for (const callback of callbacks) {
  364. callback(error, result);
  365. }
  366. }
  367. }
  368. inHandleResult--;
  369. });
  370. }
  371. clear() {
  372. this._entries.clear();
  373. this._queued.clear();
  374. this._activeTasks = 0;
  375. this._willEnsureProcessing = false;
  376. this._needProcessing = false;
  377. this._stopped = false;
  378. }
  379. }
  380. module.exports = AsyncQueue;