123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- "use strict";
- Object.defineProperty(exports, "__esModule", { value: true });
- exports.FsaNodeWriteStream = void 0;
- const stream_1 = require("stream");
- const Defer_1 = require("../thingies/Defer");
- const concurrency_1 = require("../thingies/concurrency");
- const util_1 = require("../node/util");
- const queueMicrotask_1 = require("../queueMicrotask");
/**
 * This WriteStream implementation does not build on top of the `fs` module,
 * but instead uses the lower-level `FileSystemFileHandle` interface. The reason
 * is the different semantics of `fs` and FSA (File System Access API) write
 * streams.
 *
 * When data is written to an FSA file, a new FSA stream is created, which
 * copies the file to a temporary swap file. When that stream is closed, the
 * original file is replaced with the swap file. This means that if WriteStream
 * was built on top of `fs`, each chunk write would result in file copy, write,
 * close and rename operations, which is not what we want.
 *
 * Instead, this implementation hooks into the lower-level interface and closes
 * the swap file only once the stream is closed. The downside is that the
 * written data is not immediately visible to other processes (because it is
 * written to the swap file), but that is the trade-off we have to make.
 *
 * @todo Could make this flush the data to the original file periodically, so that
 *     the data is visible to other processes.
 * @todo This stream could work through `FileSystemSyncAccessHandle.write` in a
 *     Worker thread instead.
 */
class FsaNodeWriteStream extends stream_1.Writable {
    /**
     * @param handle Value or promise resolving to an object exposing `fd` and
     *     `file`; `file.createWritable()` is used to open the FSA writable.
     * @param path Stored as-is on `this.path`.
     * @param options Stream options. The fields read here are `start`, `fd`,
     *     `flags` (defaults to `'w'`) and `emitClose`.
     * @throws {TypeError} If `options.start` is defined but is not a
     *     non-negative number.
     */
    constructor(handle, path, options) {
        super();
        this.path = path;
        this.options = options;
        this.__pending__ = true;
        this.__closed__ = false;
        this.__bytes__ = 0;
        // Concurrency of 1: writes and closes are executed strictly one at a time.
        this.__mutex__ = (0, concurrency_1.concurrency)(1);
        if (options.start !== undefined) {
            if (typeof options.start !== 'number') {
                throw new TypeError('"start" option must be a Number');
            }
            if (options.start < 0) {
                throw new TypeError('"start" must be >= zero');
            }
        }
        const stream = new Defer_1.Defer();
        this.__stream__ = stream.promise;
        // The FSA writable is opened asynchronously; writes and closes await
        // `this.__stream__`, so they are queued until it is ready.
        (async () => {
            const fsaHandle = await handle;
            // 'open' is only emitted when no `fd` was supplied through the options.
            const fileWasOpened = !options.fd;
            if (fileWasOpened)
                this.emit('open', fsaHandle.fd);
            const flagsOption = options.flags;
            const flags = (0, util_1.flagsToNumber)(flagsOption !== null && flagsOption !== undefined ? flagsOption : 'w');
            const keepExistingData = (flags & 1024 /* FLAG.O_APPEND */) !== 0;
            const writable = await fsaHandle.file.createWritable({ keepExistingData });
            if (keepExistingData) {
                const startOption = options.start;
                const start = Number(startOption !== null && startOption !== undefined ? startOption : 0);
                if (start)
                    await writable.seek(start);
            }
            this.__pending__ = false;
            stream.resolve(writable);
        })().catch(error => {
            stream.reject(error);
        });
    }
    /**
     * Writes the buffers sequentially to the underlying FSA writable and adds
     * their sizes to the `bytesWritten` counter. Does nothing once the stream
     * has been closed.
     *
     * @param buffers List of chunks, each exposing `byteLength`.
     */
    async ___write___(buffers) {
        await this.__mutex__(async () => {
            if (this.__closed__)
                return;
            const writable = await this.__stream__;
            for (const buffer of buffers) {
                await writable.write(buffer);
                this.__bytes__ += buffer.byteLength;
            }
        });
    }
    /**
     * Closes the underlying FSA writable exactly once. A failure while closing
     * is reported through the 'error' event instead of rejecting.
     */
    async __close__() {
        const emitClose = this.options.emitClose;
        await this.__mutex__(async () => {
            if (this.__closed__) {
                // Already closed: never close the FSA writable a second time,
                // regardless of `emitClose`. Only the event is conditional.
                if (emitClose)
                    (0, queueMicrotask_1.default)(() => this.emit('close'));
                return;
            }
            try {
                const writable = await this.__stream__;
                this.__closed__ = true;
                await writable.close();
                if (emitClose)
                    this.emit('close');
            }
            catch (error) {
                this.emit('error', error);
                if (emitClose)
                    this.emit('close', error);
            }
        });
    }
    // ------------------------------------------------------------- IWriteStream
    /** Number of bytes written to the FSA writable so far. */
    get bytesWritten() {
        return this.__bytes__;
    }
    /** `true` until the FSA writable has been opened successfully. */
    get pending() {
        return this.__pending__;
    }
    /**
     * Closes the stream.
     *
     * @param cb Optional listener registered once for the 'close' event.
     */
    close(cb) {
        if (cb)
            this.once('close', cb);
        // `__close__` reports failures through the 'error' event, so a rejection
        // here carries no additional information.
        this.__close__().catch(() => { });
    }
    // ----------------------------------------------------------------- Writable
    _write(chunk, encoding, callback) {
        // Two-argument `then`: the failure handler must not run (and invoke
        // `callback` a second time) when `callback` itself throws.
        this.___write___([chunk]).then(() => {
            if (callback)
                callback(null);
        }, error => {
            if (callback)
                callback(error);
        });
    }
    _writev(chunks, callback) {
        const buffers = chunks.map(({ chunk }) => chunk);
        this.___write___(buffers).then(() => {
            if (callback)
                callback(null);
        }, error => {
            if (callback)
                callback(error);
        });
    }
    _final(callback) {
        this.__close__().then(() => {
            if (callback)
                callback(null);
        }, error => {
            if (callback)
                callback(error);
        });
    }
}
- exports.FsaNodeWriteStream = FsaNodeWriteStream;
- //# sourceMappingURL=FsaNodeWriteStream.js.map
|