Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | 143x 143x 143x 143x 143x 143x 143x 143x 143x 39103x 39103x 39103x 39103x 39103x 39103x 58x 58x 39045x 36017x 36017x 1072x 39103x 1130x 37973x 39103x 88x 39015x 44x 43088x 36818x 42940x 42940x 42940x 42940x 42x 42940x 344x 42940x 48x 42940x 220x 42940x 42940x 88x 42940x 688x 42940x 42940x 1169x 42940x 143x | import type { Document } from '../bson'; import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses'; import { type CursorTimeoutMode } from '../cursor/abstract_cursor'; import { MongoInvalidArgumentError } from '../error'; import { type ExplainOptions } from '../explain'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { WriteConcern } from '../write_concern'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects, type Hint } from './operation'; /** @internal */ // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion export const DB_AGGREGATE_COLLECTION = 1 as const; const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8; /** @public */ export interface AggregateOptions extends Omit<CommandOperationOptions, 'explain'> { /** allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 \>). */ allowDiskUse?: boolean; /** The number of documents to return per batch. See [aggregation documentation](https://www.mongodb.com/docs/manual/reference/command/aggregate). */ batchSize?: number; /** Allow driver to bypass schema validation. */ bypassDocumentValidation?: boolean; /** Return the query as cursor, on 2.6 \> it returns as a real cursor on pre 2.6 it returns as an emulated cursor. */ cursor?: Document; /** * Specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point. */ maxTimeMS?: number; /** The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor query. */ maxAwaitTimeMS?: number; /** Specify collation. */ collation?: CollationOptions; /** Add an index selection hint to an aggregation command */ hint?: Hint; /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */ let?: Document; out?: string; /** * Specifies the verbosity mode for the explain output. * @deprecated This API is deprecated in favor of `collection.aggregate().explain()` * or `db.aggregate().explain()`. */ explain?: ExplainOptions['explain']; /** @internal */ timeoutMode?: CursorTimeoutMode; } /** @internal */ export class AggregateOperation extends CommandOperation<CursorResponse> { override options: AggregateOptions; target: string | typeof DB_AGGREGATE_COLLECTION; pipeline: Document[]; hasWriteStage: boolean; constructor(ns: MongoDBNamespace, pipeline: Document[], options?: AggregateOptions) { super(undefined, { ...options, dbName: ns.db }); this.options = { ...options }; // Covers when ns.collection is null, undefined or the empty string, use DB_AGGREGATE_COLLECTION this.target = ns.collection || DB_AGGREGATE_COLLECTION; this.pipeline = pipeline; // determine if we have a write stage, override read preference if so this.hasWriteStage = false; if (typeof options?.out === 'string') { this.pipeline = this.pipeline.concat({ $out: options.out }); this.hasWriteStage = true; } else if (pipeline.length > 0) { const finalStage = pipeline[pipeline.length - 1]; if (finalStage.$out || finalStage.$merge) { this.hasWriteStage = true; } } if (this.hasWriteStage) { this.trySecondaryWrite = true; } else { delete this.options.writeConcern; } if (this.explain && this.writeConcern) { throw new MongoInvalidArgumentError( 'Option "explain" cannot be used on an aggregate call with writeConcern' ); } if (options?.cursor != null && typeof options.cursor !== 'object') { throw new MongoInvalidArgumentError('Cursor options must be an object'); } } override get commandName() { return 'aggregate' as const; } override get canRetryRead(): boolean { return !this.hasWriteStage; } addToPipeline(stage: Document): void { this.pipeline.push(stage); } override async execute( server: Server, session: ClientSession | undefined, timeoutContext: TimeoutContext ): Promise<CursorResponse> { const options: AggregateOptions = this.options; const serverWireVersion = maxWireVersion(server); const command: Document = { aggregate: this.target, pipeline: this.pipeline }; if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) { this.readConcern = undefined; } if (this.hasWriteStage && this.writeConcern) { WriteConcern.apply(command, this.writeConcern); } if (options.bypassDocumentValidation === true) { command.bypassDocumentValidation = options.bypassDocumentValidation; } if (typeof options.allowDiskUse === 'boolean') { command.allowDiskUse = options.allowDiskUse; } Iif (options.hint) { command.hint = options.hint; } if (options.let) { command.let = options.let; } // we check for undefined specifically here to allow falsy values // eslint-disable-next-line no-restricted-syntax if (options.comment !== undefined) { command.comment = options.comment; } command.cursor = options.cursor || {}; if (options.batchSize && !this.hasWriteStage) { command.cursor.batchSize = options.batchSize; } return await super.executeCommand( server, session, command, timeoutContext, this.explain ? ExplainedCursorResponse : CursorResponse ); } } defineAspects(AggregateOperation, [ Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.EXPLAINABLE, Aspect.CURSOR_CREATING ]); |