All files / src/cmap/wire_protocol on_data.ts

86% Statements 43/50
60% Branches 6/10
100% Functions 9/9
87.23% Lines 41/47

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136        155x                                   155x       3715283x               3713987x           3713987x           3713987x     3713987x   3713987x     3727947x 3727947x             3727947x               3727947x     3727947x 3727947x 3727947x       3668567x       40175x 40175x       3713987x         3713987x 3713987x 3713987x 152x     3713987x 3713987x 3713987x   3713987x     3682527x 3682527x         45572x   45572x 152x 45572x         3714139x 3714139x 3714139x 3714139x 3714139x 3714139x   3714139x       3714139x      
import { type EventEmitter } from 'events';
 
import { type Abortable } from '../../mongo_types';
import { type TimeoutContext } from '../../timeout';
import { addAbortListener, kDispose, List, promiseWithResolvers } from '../../utils';
 
/**
 * @internal
 * The settle callbacks of a promise created by `promiseWithResolvers`, with the
 * `promise` itself omitted.
 *
 * `onData` queues one of these for every `.next()` call that has to wait for an
 * event; the matching promise has already been handed to the caller, so only
 * `resolve` and `reject` need to be retained in order to settle it later.
 */
type PendingPromises = Omit<
  ReturnType<typeof promiseWithResolvers<IteratorResult<Buffer>>>,
  'promise'
>;
 
/**
 * onData is adapted from Node.js' events.on helper
 * https://nodejs.org/api/events.html#eventsonemitter-eventname-options
 *
 * Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
 * The iterator rejects (once) when the emitter emits 'error', when `signal`
 * aborts, or when the `timeoutForSocketRead` promise rejects; after that it
 * reports `done`.
 *
 * The second argument is an options object:
 * - `timeoutContext` (optional): its `timeoutForSocketRead`, when present, is
 *   checked once up front and then observed for rejection. It is cleared when
 *   the iterator closes.
 * - `signal` (optional): an abort signal; aborting it is treated like an error
 *   whose value is the signal's `reason`.
 *
 * @param emitter - the source of 'data' (Buffer payload) and 'error' events
 * @returns an async generator yielding each 'data' payload in arrival order
 * @throws synchronously, before any listener is attached, if `signal` is
 *   already aborted or if `timeoutForSocketRead.throwIfExpired()` throws
 */
export function onData(
  emitter: EventEmitter,
  { timeoutContext, signal }: { timeoutContext?: TimeoutContext } & Abortable
) {
  signal?.throwIfAborted();

  // Check the read timeout BEFORE registering any listeners. If this throws,
  // the caller never receives the iterator and therefore has no way to call
  // .return(); listeners attached before the throw would be left on the emitter.
  const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
  timeoutForSocketRead?.throwIfExpired();

  // Setup pending events and pending promise lists
  /**
   * When the caller has not yet called .next(), we store the
   * value from the event in this list. Next time they call .next()
   * we pull the first value out of this list and resolve a promise with it.
   */
  const unconsumedEvents = new List<Buffer>();
  /**
   * When there has not yet been an event, a new promise will be created
   * and implicitly stored in this list. When an event occurs we take the first
   * promise in this list and resolve it.
   */
  const unconsumedPromises = new List<PendingPromises>();

  /**
   * Stores an error delivered to errorHandler while no .next() call was pending.
   * This error will turn into a rejection for the subsequent .next() call
   */
  let error: Error | null = null;

  /** Set to true only after event listeners have been removed. */
  let finished = false;

  const iterator: AsyncGenerator<Buffer> = {
    next() {
      // First, we consume all unread events
      const value = unconsumedEvents.shift();
      if (value != null) {
        return Promise.resolve({ value, done: false });
      }

      // Then we error, if an error happened
      // This happens one time if at all, because after 'error'
      // we stop listening
      if (error != null) {
        const p = Promise.reject(error);
        // Only the first element errors
        error = null;
        return p;
      }

      // If the iterator is finished, resolve to done
      if (finished) return closeHandler();

      // Wait until an event happens
      const { promise, resolve, reject } = promiseWithResolvers<IteratorResult<Buffer>>();
      unconsumedPromises.push({ resolve, reject });
      return promise;
    },

    return() {
      return closeHandler();
    },

    throw(err: Error) {
      // Route through errorHandler so a pending .next() rejects (or the error is
      // stored for the following .next()), then report completion to the thrower.
      errorHandler(err);
      return Promise.resolve({ value: undefined, done: true });
    },

    [Symbol.asyncIterator]() {
      return this;
    }
  };

  // Adding event handlers
  emitter.on('data', eventHandler);
  emitter.on('error', errorHandler);
  const abortListener = addAbortListener(signal, function () {
    errorHandler(this.reason);
  });

  // A rejection of the read timeout is surfaced exactly like an 'error' event.
  timeoutForSocketRead?.then(undefined, errorHandler);

  return iterator;

  /** Hands a 'data' payload to the oldest waiting .next() call, or buffers it. */
  function eventHandler(value: Buffer) {
    const promise = unconsumedPromises.shift();
    if (promise != null) promise.resolve({ value, done: false });
    else unconsumedEvents.push(value);
  }

  /**
   * Rejects the oldest waiting .next() call with `err`, or stores `err` for the
   * following .next() call when nobody is waiting, then closes the iterator.
   */
  function errorHandler(err: Error) {
    const promise = unconsumedPromises.shift();

    if (promise != null) promise.reject(err);
    else error = err;
    void closeHandler();
  }

  /**
   * Detaches every listener, clears the read timeout, marks the iterator as
   * finished and resolves all still-waiting .next() calls with a `done` result.
   * May be called more than once (e.g. errorHandler followed by .return()).
   */
  function closeHandler() {
    // Removing event handlers
    emitter.off('data', eventHandler);
    emitter.off('error', errorHandler);
    abortListener?.[kDispose]();
    finished = true;
    timeoutForSocketRead?.clear();
    const doneResult = { value: undefined, done: finished } as const;

    for (const promise of unconsumedPromises) {
      promise.resolve(doneResult);
    }

    return Promise.resolve(doneResult);
  }
}