Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 | 155x 155x 3715283x 3713987x 3713987x 3713987x 3713987x 3713987x 3727947x 3727947x 3727947x 3727947x 3727947x 3727947x 3727947x 3668567x 40175x 40175x 3713987x 3713987x 3713987x 3713987x 152x 3713987x 3713987x 3713987x 3713987x 3682527x 3682527x 45572x 45572x 152x 45572x 3714139x 3714139x 3714139x 3714139x 3714139x 3714139x 3714139x 3714139x | import { type EventEmitter } from 'events'; import { type Abortable } from '../../mongo_types'; import { type TimeoutContext } from '../../timeout'; import { addAbortListener, kDispose, List, promiseWithResolvers } from '../../utils'; /** * @internal * An object holding references to a promise's resolve and reject functions. */ type PendingPromises = Omit< ReturnType<typeof promiseWithResolvers<IteratorResult<Buffer>>>, 'promise' >; /** * onData is adapted from Node.js' events.on helper * https://nodejs.org/api/events.html#eventsonemitter-eventname-options * * Returns an AsyncIterator that iterates each 'data' event emitted from emitter. * It will reject upon an error event. */ export function onData( emitter: EventEmitter, { timeoutContext, signal }: { timeoutContext?: TimeoutContext } & Abortable ) { signal?.throwIfAborted(); // Setup pending events and pending promise lists /** * When the caller has not yet called .next(), we store the * value from the event in this list. Next time they call .next() * we pull the first value out of this list and resolve a promise with it. */ const unconsumedEvents = new List<Buffer>(); /** * When there has not yet been an event, a new promise will be created * and implicitly stored in this list. When an event occurs we take the first * promise in this list and resolve it. */ const unconsumedPromises = new List<PendingPromises>(); /** * Stored an error created by an error event. * This error will turn into a rejection for the subsequent .next() call */ let error: Error | null = null; /** Set to true only after event listeners have been removed. */ let finished = false; const iterator: AsyncGenerator<Buffer> = { next() { // First, we consume all unread events const value = unconsumedEvents.shift(); Iif (value != null) { return Promise.resolve({ value, done: false }); } // Then we error, if an error happened // This happens one time if at all, because after 'error' // we stop listening Iif (error != null) { const p = Promise.reject(error); // Only the first element errors error = null; return p; } // If the iterator is finished, resolve to done Iif (finished) return closeHandler(); // Wait until an event happens const { promise, resolve, reject } = promiseWithResolvers<IteratorResult<Buffer>>(); unconsumedPromises.push({ resolve, reject }); return promise; }, return() { return closeHandler(); }, throw(err: Error) { errorHandler(err); return Promise.resolve({ value: undefined, done: true }); }, [Symbol.asyncIterator]() { return this; } }; // Adding event handlers emitter.on('data', eventHandler); emitter.on('error', errorHandler); const abortListener = addAbortListener(signal, function () { errorHandler(this.reason); }); const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead; timeoutForSocketRead?.throwIfExpired(); timeoutForSocketRead?.then(undefined, errorHandler); return iterator; function eventHandler(value: Buffer) { const promise = unconsumedPromises.shift(); Eif (promise != null) promise.resolve({ value, done: false }); else unconsumedEvents.push(value); } function errorHandler(err: Error) { const promise = unconsumedPromises.shift(); if (promise != null) promise.reject(err); else error = err; void closeHandler(); } function closeHandler() { // Adding event handlers emitter.off('data', eventHandler); emitter.off('error', errorHandler); abortListener?.[kDispose](); finished = true; timeoutForSocketRead?.clear(); const doneResult = { value: undefined, done: finished } as const; for (const promise of unconsumedPromises) { promise.resolve(doneResult); } return Promise.resolve(doneResult); } } |