All files / src/operations aggregate.ts

96.22% Statements 51/53
97.82% Branches 45/46
80% Functions 4/5
96.22% Lines 51/53

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172  404x   404x         404x 404x 404x 404x       404x 404x                                                                           404x             190104x   190104x     190104x   190104x     190104x 190104x 303x 303x 189801x 174698x 174698x 5552x       190104x 5855x   184249x     190104x 458x         189646x 229x         209434x       179867x                       208556x 208556x 208556x   208556x 232x     208556x 2044x     208556x 203x     208556x 1145x     208556x       208556x 458x         208556x 3533x     208556x 208556x 5924x     208556x                   404x            
import type { Document } from '../bson';
import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses';
import { type CursorTimeoutMode } from '../cursor/abstract_cursor';
import { MongoInvalidArgumentError } from '../error';
import { type ExplainOptions } from '../explain';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects, type Hint } from './operation';
 
/**
 * Value used as the `aggregate` target when the namespace has no collection name
 * (null, undefined or the empty string), instead of a collection name.
 * @internal
 */
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
export const DB_AGGREGATE_COLLECTION = 1 as const;
// Servers reporting a max wire version below this value have their read concern
// cleared by AggregateOperation when the pipeline contains a write stage.
// NOTE(review): wire version 8 presumably corresponds to MongoDB 4.2 — confirm.
const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8;
 
/** @public */
export interface AggregateOptions extends Omit<CommandOperationOptions, 'explain'> {
  /** allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 \>). */
  allowDiskUse?: boolean;
  /** The number of documents to return per batch. See [aggregation documentation](https://www.mongodb.com/docs/manual/reference/command/aggregate). */
  batchSize?: number;
  /** Allow driver to bypass schema validation. */
  bypassDocumentValidation?: boolean;
  /** Return the query as a cursor; on 2.6 \> it returns a real cursor, on pre 2.6 it returns an emulated cursor. Must be an object when provided. */
  cursor?: Document;
  /**
   * Specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
   */
  maxTimeMS?: number;
  /** The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor query. */
  maxAwaitTimeMS?: number;
  /** Specify collation. */
  collation?: CollationOptions;
  /** Add an index selection hint to an aggregation command */
  hint?: Hint;
  /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */
  let?: Document;

  /**
   * Name of an output collection. When this is a string, a `{ $out: <value> }` stage is
   * appended to the end of the pipeline and the aggregation is treated as containing a write stage.
   */
  out?: string;

  /**
   * Specifies the verbosity mode for the explain output.
   * @deprecated This API is deprecated in favor of `collection.aggregate().explain()`
   * or `db.aggregate().explain()`.
   */
  explain?: ExplainOptions['explain'];
  /** @internal */
  timeoutMode?: CursorTimeoutMode;
}
 
/**
 * Builds and sends an `aggregate` command, either against a collection or, when the
 * namespace carries no collection name, against the database itself.
 *
 * @internal
 */
export class AggregateOperation extends CommandOperation<CursorResponse> {
  /** Shallow copy of the options passed to the constructor. */
  override options: AggregateOptions;
  /** Collection name, or `DB_AGGREGATE_COLLECTION` for a database-level aggregation. */
  target: string | typeof DB_AGGREGATE_COLLECTION;
  /** Stages sent as the command's `pipeline` field. */
  pipeline: Document[];
  /** True when the `out` option was a string or the final stage is `$out` / `$merge`. */
  hasWriteStage: boolean;

  /**
   * @param ns - Namespace to aggregate on; a falsy `ns.collection` selects a database-level aggregation.
   * @param pipeline - Aggregation stages. When the `out` option is not used this same array
   *   instance is kept as `this.pipeline` (it is not copied).
   * @param options - Optional aggregate settings.
   * @throws MongoInvalidArgumentError if `explain` is combined with a write concern, or if
   *   `options.cursor` is provided but is not an object.
   */
  constructor(ns: MongoDBNamespace, pipeline: Document[], options?: AggregateOptions) {
    super(undefined, { ...options, dbName: ns.db });

    this.options = { ...options };

    // Covers when ns.collection is null, undefined or the empty string, use DB_AGGREGATE_COLLECTION
    this.target = ns.collection || DB_AGGREGATE_COLLECTION;

    this.pipeline = pipeline;

    // Determine whether the pipeline writes: either the legacy `out` option was supplied
    // (in which case a $out stage is appended to a new array), or the caller's final stage
    // is already $out / $merge.
    this.hasWriteStage = false;
    if (typeof options?.out === 'string') {
      this.pipeline = this.pipeline.concat({ $out: options.out });
      this.hasWriteStage = true;
    } else if (pipeline.length > 0) {
      const finalStage = pipeline[pipeline.length - 1];
      if (finalStage.$out || finalStage.$merge) {
        this.hasWriteStage = true;
      }
    }

    if (this.hasWriteStage) {
      // NOTE(review): flag defined outside this class; presumably it influences server
      // selection for pipelines that write — confirm against the operation base class.
      this.trySecondaryWrite = true;
    } else {
      // Without a write stage the stored options must not carry a write concern.
      delete this.options.writeConcern;
    }

    if (this.explain && this.writeConcern) {
      throw new MongoInvalidArgumentError(
        'Option "explain" cannot be used on an aggregate call with writeConcern'
      );
    }

    if (options?.cursor != null && typeof options.cursor !== 'object') {
      throw new MongoInvalidArgumentError('Cursor options must be an object');
    }
  }

  /** Name of the server command issued by this operation. */
  override get commandName() {
    return 'aggregate' as const;
  }

  /** Aggregations are only reported as retryable reads when they have no write stage. */
  override get canRetryRead(): boolean {
    return !this.hasWriteStage;
  }

  /**
   * Appends a stage to the end of the pipeline.
   *
   * NOTE: `hasWriteStage` is computed once in the constructor and is not re-evaluated here.
   *
   * @param stage - The aggregation stage to append.
   */
  addToPipeline(stage: Document): void {
    this.pipeline.push(stage);
  }

  /**
   * Assembles the `aggregate` command document from the stored options and runs it.
   *
   * @param server - Server the command is sent to; its max wire version gates read concern use.
   * @param session - Optional session passed through to the command execution.
   * @param timeoutContext - Timeout context passed through to the command execution.
   * @returns The cursor response, parsed as an explain response when `explain` is set.
   */
  override async execute(
    server: Server,
    session: ClientSession | undefined,
    timeoutContext: TimeoutContext
  ): Promise<CursorResponse> {
    const options: AggregateOptions = this.options;
    const serverWireVersion = maxWireVersion(server);
    const command: Document = { aggregate: this.target, pipeline: this.pipeline };

    // Drop the read concern for write-stage pipelines on servers older than the minimum
    // wire version that supports it.
    if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) {
      this.readConcern = undefined;
    }

    if (this.hasWriteStage && this.writeConcern) {
      WriteConcern.apply(command, this.writeConcern);
    }

    if (options.bypassDocumentValidation === true) {
      command.bypassDocumentValidation = options.bypassDocumentValidation;
    }

    if (typeof options.allowDiskUse === 'boolean') {
      command.allowDiskUse = options.allowDiskUse;
    }

    if (options.hint) {
      command.hint = options.hint;
    }

    if (options.let) {
      command.let = options.let;
    }

    // we check for undefined specifically here to allow falsy values
    // eslint-disable-next-line no-restricted-syntax
    if (options.comment !== undefined) {
      command.comment = options.comment;
    }

    // Copy the cursor options so that setting batchSize below never writes into the
    // object the caller handed in (this.options is only a shallow copy of it).
    command.cursor = { ...options.cursor };
    if (options.batchSize && !this.hasWriteStage) {
      command.cursor.batchSize = options.batchSize;
    }

    return await super.executeCommand(
      server,
      session,
      command,
      timeoutContext,
      this.explain ? ExplainedCursorResponse : CursorResponse
    );
  }
}
 
// Register the aspects declared for AggregateOperation: it is a read operation,
// retryable, explainable, and creates a cursor.
// NOTE(review): how each aspect is consumed is defined outside this file — see ./operation.
defineAspects(AggregateOperation, [
  Aspect.READ_OPERATION,
  Aspect.RETRYABLE,
  Aspect.EXPLAINABLE,
  Aspect.CURSOR_CREATING
]);