All files / src/cmap/wire_protocol on_data.ts

86% Statements 43/50
60% Branches 6/10
100% Functions 9/9
87.23% Lines 41/47

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136        416x                                   416x       19453326x               19446450x           19446450x           19446450x     19446450x   19446450x     19520582x 19520582x             19520582x               19520582x     19520582x 19520582x 19520582x       19226673x       195502x 195502x       19446450x         19446450x 19446450x 19446450x 802x     19446450x 19446450x 19446450x   19446450x     19300805x 19300805x         220549x   220549x 802x 220549x         19447222x 19447222x 19447222x 19447222x 19447222x 19447222x   19447222x       19447222x      
import { type EventEmitter } from 'events';
 
import { type Abortable } from '../../mongo_types';
import { type TimeoutContext } from '../../timeout';
import { addAbortListener, kDispose, List, promiseWithResolvers } from '../../utils';
 
/**
 * @internal
 * An object holding references to a promise's resolve and reject functions.
 */
type PendingPromises = Omit<
  ReturnType<typeof promiseWithResolvers<IteratorResult<Buffer>>>,
  'promise'
>;
 
/**
 * onData is adapted from Node.js' events.on helper
 * https://nodejs.org/api/events.html#eventsonemitter-eventname-options
 *
 * Returns an AsyncIterator that iterates each 'data' event emitted from emitter.
 * It will reject upon an error event.
 */
export function onData(
  emitter: EventEmitter,
  { timeoutContext, signal }: { timeoutContext?: TimeoutContext } & Abortable
) {
  signal?.throwIfAborted();
 
  // Setup pending events and pending promise lists
  /**
   * When the caller has not yet called .next(), we store the
   * value from the event in this list. Next time they call .next()
   * we pull the first value out of this list and resolve a promise with it.
   */
  const unconsumedEvents = new List<Buffer>();
  /**
   * When there has not yet been an event, a new promise will be created
   * and implicitly stored in this list. When an event occurs we take the first
   * promise in this list and resolve it.
   */
  const unconsumedPromises = new List<PendingPromises>();
 
  /**
   * Stored an error created by an error event.
   * This error will turn into a rejection for the subsequent .next() call
   */
  let error: Error | null = null;
 
  /** Set to true only after event listeners have been removed. */
  let finished = false;
 
  const iterator: AsyncGenerator<Buffer> = {
    next() {
      // First, we consume all unread events
      const value = unconsumedEvents.shift();
      Iif (value != null) {
        return Promise.resolve({ value, done: false });
      }
 
      // Then we error, if an error happened
      // This happens one time if at all, because after 'error'
      // we stop listening
      Iif (error != null) {
        const p = Promise.reject(error);
        // Only the first element errors
        error = null;
        return p;
      }
 
      // If the iterator is finished, resolve to done
      Iif (finished) return closeHandler();
 
      // Wait until an event happens
      const { promise, resolve, reject } = promiseWithResolvers<IteratorResult<Buffer>>();
      unconsumedPromises.push({ resolve, reject });
      return promise;
    },
 
    return() {
      return closeHandler();
    },
 
    throw(err: Error) {
      errorHandler(err);
      return Promise.resolve({ value: undefined, done: true });
    },
 
    [Symbol.asyncIterator]() {
      return this;
    }
  };
 
  // Adding event handlers
  emitter.on('data', eventHandler);
  emitter.on('error', errorHandler);
  const abortListener = addAbortListener(signal, function () {
    errorHandler(this.reason);
  });
 
  const timeoutForSocketRead = timeoutContext?.timeoutForSocketRead;
  timeoutForSocketRead?.throwIfExpired();
  timeoutForSocketRead?.then(undefined, errorHandler);
 
  return iterator;
 
  function eventHandler(value: Buffer) {
    const promise = unconsumedPromises.shift();
    Eif (promise != null) promise.resolve({ value, done: false });
    else unconsumedEvents.push(value);
  }
 
  function errorHandler(err: Error) {
    const promise = unconsumedPromises.shift();
 
    if (promise != null) promise.reject(err);
    else error = err;
    void closeHandler();
  }
 
  function closeHandler() {
    // Adding event handlers
    emitter.off('data', eventHandler);
    emitter.off('error', errorHandler);
    abortListener?.[kDispose]();
    finished = true;
    timeoutForSocketRead?.clear();
    const doneResult = { value: undefined, done: finished } as const;
 
    for (const promise of unconsumedPromises) {
      promise.resolve(doneResult);
    }
 
    return Promise.resolve(doneResult);
  }
}