FsaNodeWriteStream.js 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.FsaNodeWriteStream = void 0;
  4. const stream_1 = require("stream");
  5. const Defer_1 = require("../thingies/Defer");
  6. const concurrency_1 = require("../thingies/concurrency");
  7. const util_1 = require("../node/util");
  8. const queueMicrotask_1 = require("../queueMicrotask");
  /**
   * This WriteStream implementation does not build on top of the `fs` module,
   * but instead uses the lower-level `FileSystemFileHandle` interface. The reason
   * is the different semantics of `fs` and FSA (File System Access API) write
   * streams.
   *
   * When data is written to an FSA file, a new FSA stream is created, which
   * copies the file to a temporary swap file. After each written chunk, that
   * swap file is closed and the original file is replaced with the swap file.
   * This means that if WriteStream was built on top of `fs`, each chunk write
   * would result in file copy, write, close and rename operations, which is not
   * what we want.
   *
   * Instead, this implementation hooks into the lower level and closes the swap
   * file only once the stream is closed. The downside is that the written data
   * is not immediately visible to other processes (because it is written to the
   * swap file), but that is the trade-off we have to make.
   *
   * @todo Could make this flush the data to the original file periodically, so
   *     that the data is visible to other processes.
   * @todo This stream could work through `FileSystemSyncAccessHandle.write` in a
   *     Worker thread instead.
   */
  class FsaNodeWriteStream extends stream_1.Writable {
    /**
     * @param handle Awaited to obtain an object exposing `fd` and `file`; the
     *     FSA writable is created via `file.createWritable(...)`.
     * @param path Stored on the instance as `path`.
     * @param options Stream options. The fields read here are `start`, `fd`,
     *     `flags` (defaults to `'w'`) and `emitClose`. Defaults to `{}`.
     * @throws {TypeError} If `options.start` is given and is not a number, or is
     *     negative.
     */
    constructor(handle, path, options = {}) {
      super();
      this.path = path;
      this.options = options;
      this.__pending__ = true;
      this.__closed__ = false;
      this.__bytes__ = 0;
      // Concurrency limit of 1: writes and the close run strictly one at a time.
      this.__mutex__ = (0, concurrency_1.concurrency)(1);
      if (options.start !== undefined) {
        if (typeof options.start !== 'number') {
          throw new TypeError('"start" option must be a Number');
        }
        if (options.start < 0) {
          throw new TypeError('"start" must be >= zero');
        }
      }
      // The FSA writable is opened asynchronously; writes and close await this
      // promise, so they may be issued before the open has finished.
      const stream = new Defer_1.Defer();
      this.__stream__ = stream.promise;
      (async () => {
        const fsaHandle = await handle;
        // 'open' is only emitted when no `fd` was supplied through the options.
        const fileWasOpened = !options.fd;
        if (fileWasOpened)
          this.emit('open', fsaHandle.fd);
        const flagSpec = options.flags !== null && options.flags !== undefined ? options.flags : 'w';
        const flags = (0, util_1.flagsToNumber)(flagSpec);
        const keepExistingData = flags & 1024 /* FLAG.O_APPEND */ ? true : false;
        const writable = await fsaHandle.file.createWritable({ keepExistingData });
        if (keepExistingData) {
          // NOTE(review): when O_APPEND is set and no `start` is given, no seek
          // is performed here — confirm the handle positions the cursor at the
          // end of the file in that case.
          const startOption = options.start !== null && options.start !== undefined ? options.start : 0;
          const start = Number(startOption);
          if (start)
            await writable.seek(start);
        }
        this.__pending__ = false;
        stream.resolve(writable);
      })().catch(error => {
        stream.reject(error);
      });
    }
    /**
     * Writes the given buffers, in order, to the FSA writable and adds their
     * byte lengths to the `bytesWritten` counter. Runs under the stream mutex.
     *
     * @param buffers Iterable of chunks exposing `byteLength`.
     * @returns Promise settled once all buffers were written (or skipped).
     */
    async ___write___(buffers) {
      await this.__mutex__(async () => {
        // Writes issued after the stream was closed are dropped silently.
        if (this.__closed__)
          return;
        const writable = await this.__stream__;
        for (const buffer of buffers) {
          await writable.write(buffer);
          this.__bytes__ += buffer.byteLength;
        }
      });
    }
    /**
     * Closes the FSA writable. Runs under the stream mutex, so it executes
     * after every write queued before it.
     *
     * Closing is idempotent: once the stream has been marked closed, later
     * calls never touch the FSA writable again. If `options.emitClose` is set
     * they only re-emit `'close'` on a microtask.
     *
     * Failures while obtaining or closing the writable are reported through the
     * `'error'` event (followed by `'close'` when `options.emitClose` is set).
     */
    async __close__() {
      const emitClose = this.options.emitClose;
      await this.__mutex__(async () => {
        if (this.__closed__) {
          // Already closed: calling `writable.close()` a second time would fail
          // and surface a spurious 'error', regardless of `emitClose`.
          if (emitClose)
            (0, queueMicrotask_1.default)(() => this.emit('close'));
          return;
        }
        try {
          const writable = await this.__stream__;
          // Mark as closed before awaiting, so a failed close is not retried.
          this.__closed__ = true;
          await writable.close();
          if (emitClose)
            this.emit('close');
        }
        catch (error) {
          this.emit('error', error);
          if (emitClose)
            this.emit('close', error);
        }
      });
    }
    // ------------------------------------------------------------- IWriteStream
    /** Number of bytes successfully handed to the FSA writable so far. */
    get bytesWritten() {
      return this.__bytes__;
    }
    /** `true` until the FSA writable has been created successfully. */
    get pending() {
      return this.__pending__;
    }
    /**
     * Closes the stream.
     *
     * @param cb Optional listener registered once for the `'close'` event.
     */
    close(cb) {
      if (cb)
        this.once('close', cb);
      // Close failures are emitted as 'error' events inside `__close__`; a
      // rejection of the returned promise is intentionally ignored here.
      this.__close__().catch(() => { });
    }
    // ----------------------------------------------------------------- Writable
    /** `Writable` hook: writes a single chunk; `encoding` is not used. */
    _write(chunk, encoding, callback) {
      this.___write___([chunk])
        .then(() => {
          if (callback)
            callback(null);
        })
        .catch(error => {
          if (callback)
            callback(error);
        });
    }
    /** `Writable` hook: writes a batch of buffered `{ chunk }` entries in order. */
    _writev(chunks, callback) {
      const buffers = chunks.map(({ chunk }) => chunk);
      this.___write___(buffers)
        .then(() => {
          if (callback)
            callback(null);
        })
        .catch(error => {
          if (callback)
            callback(error);
        });
    }
    /** `Writable` hook: closes the FSA writable before the stream finishes. */
    _final(callback) {
      this.__close__()
        .then(() => {
          if (callback)
            callback(null);
        })
        .catch(error => {
          if (callback)
            callback(error);
        });
    }
  }
  150. exports.FsaNodeWriteStream = FsaNodeWriteStream;
  151. //# sourceMappingURL=FsaNodeWriteStream.js.map