All files / src/operations aggregate.ts

96.22% Statements 51/53
97.82% Branches 45/46
80% Functions 4/5
96.22% Lines 51/53

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172  143x   143x         143x 143x 143x 143x       143x 143x                                                                           143x             39103x   39103x     39103x   39103x     39103x 39103x 58x 58x 39045x 36017x 36017x 1072x       39103x 1130x   37973x     39103x 88x         39015x 44x         43088x       36818x                       42940x 42940x 42940x   42940x 42x     42940x 344x     42940x 48x     42940x 220x     42940x       42940x 88x         42940x 688x     42940x 42940x 1169x     42940x                   143x            
import type { Document } from '../bson';
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { MongoInvalidArgumentError } from '../error';
import { type ExplainOptions } from '../explain';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects, type Hint } from './operation';
 
/**
 * Aggregation target used in place of a collection name when the namespace
 * passed to `AggregateOperation` has no collection (null, undefined or the
 * empty string).
 * @internal
 */
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
export const DB_AGGREGATE_COLLECTION = 1 as const;
// Servers reporting a max wire version below this value have the read concern
// cleared before a pipeline containing a write stage is sent (see `execute`).
const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8;
 
/**
 * Options accepted by an aggregate operation.
 *
 * Every `CommandOperationOptions` field is inherited except `explain`, which is
 * redeclared below.
 * @public
 */
export interface AggregateOptions extends Omit<CommandOperationOptions, 'explain'> {
  /** allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 \>). */
  allowDiskUse?: boolean;
  /** The number of documents to return per batch. See [aggregation documentation](https://www.mongodb.com/docs/manual/reference/command/aggregate). */
  batchSize?: number;
  /** Allow driver to bypass schema validation. */
  bypassDocumentValidation?: boolean;
  /** Return the query as cursor, on 2.6 \> it returns as a real cursor on pre 2.6 it returns as an emulated cursor. */
  cursor?: Document;
  /**
   * Specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
   */
  maxTimeMS?: number;
  /** The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor query. */
  maxAwaitTimeMS?: number;
  /** Specify collation. */
  collation?: CollationOptions;
  /** Add an index selection hint to an aggregation command */
  hint?: Hint;
  /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
  let?: Document;

  /**
   * When a string is supplied, `AggregateOperation` appends a `{ $out: <value> }` stage
   * to the pipeline and treats the aggregation as containing a write stage.
   */
  out?: string;

  /**
   * Specifies the verbosity mode for the explain output.
   * @deprecated This API is deprecated in favor of `collection.aggregate().explain()`
   * or `db.aggregate().explain()`.
   */
  explain?: ExplainOptions['explain'];
  /** @internal */
  timeoutMode?: CursorTimeoutMode;
}
 
/**
 * Command operation that builds and runs an `aggregate` command.
 *
 * The operation records whether the pipeline ends in a write stage (`$out` or
 * `$merge`), because that decides whether the read can be retried, whether a
 * write concern is sent, and whether `batchSize` is forwarded.
 * @internal
 */
export class AggregateOperation extends CommandOperation<CursorResponse> {
  override options: AggregateOptions;
  /** Collection name to aggregate over, or `DB_AGGREGATE_COLLECTION` when the namespace has none. */
  target: string | typeof DB_AGGREGATE_COLLECTION;
  /** Stages sent to the server as the command's `pipeline` field. */
  pipeline: Document[];
  /** True when `options.out` is a string or the final pipeline stage is `$out` / `$merge`. */
  hasWriteStage: boolean;

  /**
   * @param ns - Namespace to aggregate over; `ns.db` is forwarded to the base class as `dbName`.
   * @param pipeline - Aggregation stages. The array is stored by reference unless `options.out`
   *   is a string, in which case a new array with a trailing `$out` stage is stored instead.
   * @param options - Optional aggregate settings; a shallow copy is kept on `this.options`.
   * @throws MongoInvalidArgumentError if both `explain` and a write concern are in effect,
   *   or if `options.cursor` is provided but is not an object.
   */
  constructor(ns: MongoDBNamespace, pipeline: Document[], options?: AggregateOptions) {
    super(undefined, { ...options, dbName: ns.db });

    this.options = { ...options };

    // Covers when ns.collection is null, undefined or the empty string, use DB_AGGREGATE_COLLECTION
    this.target = ns.collection || DB_AGGREGATE_COLLECTION;

    this.pipeline = pipeline;

    // determine if we have a write stage, override read preference if so
    this.hasWriteStage = false;
    if (typeof options?.out === 'string') {
      // concat returns a new array, so the caller's pipeline is not modified here
      this.pipeline = this.pipeline.concat({ $out: options.out });
      this.hasWriteStage = true;
    } else if (pipeline.length > 0) {
      // only the last stage is inspected for $out / $merge
      const finalStage = pipeline[pipeline.length - 1];
      if (finalStage.$out || finalStage.$merge) {
        this.hasWriteStage = true;
      }
    }

    if (this.hasWriteStage) {
      this.trySecondaryWrite = true;
    } else {
      // without a write stage the write concern is dropped from this operation's options
      delete this.options.writeConcern;
    }

    if (this.explain && this.writeConcern) {
      throw new MongoInvalidArgumentError(
        'Option "explain" cannot be used on an aggregate call with writeConcern'
      );
    }

    if (options?.cursor != null && typeof options.cursor !== 'object') {
      throw new MongoInvalidArgumentError('Cursor options must be an object');
    }
  }

  /** Name of the server command this operation issues. */
  override get commandName() {
    return 'aggregate' as const;
  }

  /** The read is retryable only when the pipeline has no write stage. */
  override get canRetryRead(): boolean {
    return !this.hasWriteStage;
  }

  /**
   * Appends a stage to the stored pipeline in place.
   *
   * `hasWriteStage` is not recomputed here.
   * @param stage - Stage document to append.
   */
  addToPipeline(stage: Document): void {
    this.pipeline.push(stage);
  }

  /**
   * Assembles the `aggregate` command document from the stored target, pipeline and options,
   * then delegates to `executeCommand` on the base class.
   *
   * @param server - Server to run the command against; its max wire version is consulted.
   * @param session - Session forwarded to `executeCommand`, if any.
   * @param timeoutContext - Timeout context forwarded to `executeCommand`.
   * @returns The response produced by `executeCommand`, requested as `ExplainedCursorResponse`
   *   when `this.explain` is set and as `CursorResponse` otherwise.
   */
  override async execute(
    server: Server,
    session: ClientSession | undefined,
    timeoutContext: TimeoutContext
  ): Promise<CursorResponse> {
    const options: AggregateOptions = this.options;
    const serverWireVersion = maxWireVersion(server);
    const command: Document = { aggregate: this.target, pipeline: this.pipeline };

    // write-stage pipelines drop the read concern on servers below the minimum wire version
    if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
      this.readConcern = undefined;
    }

    if (this.hasWriteStage && this.writeConcern) {
      WriteConcern.apply(command, this.writeConcern);
    }

    // only an explicit `true` is forwarded
    if (options.bypassDocumentValidation === true) {
      command.bypassDocumentValidation = options.bypassDocumentValidation;
    }

    // both `true` and `false` are forwarded; any non-boolean is ignored
    if (typeof options.allowDiskUse === 'boolean') {
      command.allowDiskUse = options.allowDiskUse;
    }

    if (options.hint) {
      command.hint = options.hint;
    }

    if (options.let) {
      command.let = options.let;
    }

    // we check for undefined specifically here to allow falsy values
    // eslint-disable-next-line no-restricted-syntax
    if (options.comment !== undefined) {
      command.comment = options.comment;
    }

    // the command always carries a cursor document; batchSize is added only without a write stage
    command.cursor = options.cursor || {};
    if (options.batchSize && !this.hasWriteStage) {
      command.cursor.batchSize = options.batchSize;
    }

    return await super.executeCommand(
      server,
      session,
      command,
      timeoutContext,
      this.explain ? ExplainedCursorResponse : CursorResponse
    );
  }
}
 
// Declares the aspects associated with AggregateOperation: read operation,
// retryable, explainable and cursor-creating.
// NOTE(review): how defineAspects stores these is not visible in this file — confirm in ./operation.
defineAspects(AggregateOperation, [
  Aspect.READ_OPERATION,
  Aspect.RETRYABLE,
  Aspect.EXPLAINABLE,
  Aspect.CURSOR_CREATING
]);