All files / src/cursor change_stream_cursor.ts

98.18% Statements 54/55
93.75% Branches 30/32
88.88% Functions 8/9
98.18% Lines 54/55

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173  392x               392x   392x   392x   392x 392x                                 392x                                               135604x   135376x 135376x 135376x 135376x   135376x 637x 134739x 67125x         257541x 257541x       134771x       66629x       66629x 199887x     66629x 66573x 80x   66493x   56x 12x     66629x       36027x 27826x   8201x   36027x       191527x 191527x 188526x   188526x 153640x                       125526x           125526x           120611x 120611x   120611x           53359x     120611x   120611x 120611x   120611x       145091x   70916x 70916x   70916x 70916x 70916x      
import type { Document } from '../bson';
import {
  ChangeStream,
  type ChangeStreamDocument,
  type ChangeStreamEvents,
  type OperationTime,
  type ResumeToken
} from '../change_stream';
import { type CursorResponse } from '../cmap/wire_protocol/responses';
import { INIT, RESPONSE } from '../constants';
import type { MongoClient } from '../mongo_client';
import { AggregateOperation } from '../operations/aggregate';
import type { CollationOptions } from '../operations/command';
import { executeOperation } from '../operations/execute_operation';
import type { ClientSession } from '../sessions';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import {
  AbstractCursor,
  type AbstractCursorOptions,
  type InitialCursorResponse
} from './abstract_cursor';
 
/** @internal */
export interface ChangeStreamCursorOptions extends AbstractCursorOptions {
  startAtOperationTime?: OperationTime;
  resumeAfter?: ResumeToken;
  startAfter?: ResumeToken;
  maxAwaitTimeMS?: number;
  collation?: CollationOptions;
  fullDocument?: string;
}
 
/** @internal */
export class ChangeStreamCursor<
  TSchema extends Document = Document,
  TChange extends Document = ChangeStreamDocument<TSchema>
> extends AbstractCursor<TChange, ChangeStreamEvents> {
  private _resumeToken: ResumeToken;
  private startAtOperationTime: OperationTime | null;
  private hasReceived?: boolean;
  private readonly changeStreamCursorOptions: ChangeStreamCursorOptions;
  private postBatchResumeToken?: ResumeToken;
  private readonly pipeline: Document[];
 
  /**
   * @internal
   *
   * used to determine change stream resumability
   */
  maxWireVersion: number | undefined;
 
  constructor(
    client: MongoClient,
    namespace: MongoDBNamespace,
    pipeline: Document[] = [],
    options: ChangeStreamCursorOptions = {}
  ) {
    super(client, namespace, { ...options, tailable: true, awaitData: true });
 
    this.pipeline = pipeline;
    this.changeStreamCursorOptions = options;
    this._resumeToken = null;
    this.startAtOperationTime = options.startAtOperationTime ?? null;
 
    if (options.startAfter) {
      this.resumeToken = options.startAfter;
    } else if (options.resumeAfter) {
      this.resumeToken = options.resumeAfter;
    }
  }
 
  set resumeToken(token: ResumeToken) {
    this._resumeToken = token;
    this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token);
  }
 
  get resumeToken(): ResumeToken {
    return this._resumeToken;
  }
 
  get resumeOptions(): ChangeStreamCursorOptions {
    const options: ChangeStreamCursorOptions = {
      ...this.changeStreamCursorOptions
    };
 
    for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) {
      delete options[key];
    }
 
    if (this.resumeToken != null) {
      if (this.changeStreamCursorOptions.startAfter && !this.hasReceived) {
        options.startAfter = this.resumeToken;
      } else {
        options.resumeAfter = this.resumeToken;
      }
    } else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) {
      options.startAtOperationTime = this.startAtOperationTime;
    }
 
    return options;
  }
 
  cacheResumeToken(resumeToken: ResumeToken): void {
    if (this.bufferedCount() === 0 && this.postBatchResumeToken) {
      this.resumeToken = this.postBatchResumeToken;
    } else {
      this.resumeToken = resumeToken;
    }
    this.hasReceived = true;
  }
 
  _processBatch(response: CursorResponse): void {
    const { postBatchResumeToken } = response;
    if (postBatchResumeToken) {
      this.postBatchResumeToken = postBatchResumeToken;
 
      if (response.batchSize === 0) {
        this.resumeToken = postBatchResumeToken;
      }
    }
  }
 
  clone(): AbstractCursor<TChange> {
    return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, {
      ...this.cursorOptions
    });
  }
 
  async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
    const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, {
      ...this.cursorOptions,
      ...this.changeStreamCursorOptions,
      session
    });
 
    const response = await executeOperation(
      session.client,
      aggregateOperation,
      this.timeoutContext
    );
 
    const server = aggregateOperation.server;
    this.maxWireVersion = maxWireVersion(server);
 
    if (
      this.startAtOperationTime == null &&
      this.changeStreamCursorOptions.resumeAfter == null &&
      this.changeStreamCursorOptions.startAfter == null &&
      this.maxWireVersion >= 7
    ) {
      this.startAtOperationTime = response.operationTime;
    }
 
    this._processBatch(response);
 
    this.emit(INIT, response);
    this.emit(RESPONSE);
 
    return { server, session, response };
  }
 
  override async getMore(batchSize: number): Promise<CursorResponse> {
    const response = await super.getMore(batchSize);
 
    this.maxWireVersion = maxWireVersion(this.server);
    this._processBatch(response);
 
    this.emit(ChangeStream.MORE, response);
    this.emit(ChangeStream.RESPONSE);
    return response;
  }
}