Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 | 404x 404x 404x 404x 404x 404x 404x 404x 404x 190104x 190104x 190104x 190104x 190104x 190104x 303x 303x 189801x 174698x 174698x 5552x 190104x 5855x 184249x 190104x 458x 189646x 229x 209434x 179867x 208556x 208556x 208556x 208556x 232x 208556x 2044x 208556x 203x 208556x 1145x 208556x 208556x 458x 208556x 3533x 208556x 208556x 5924x 208556x 404x | import type { Document } from '../bson'; import { CursorResponse, ExplainedCursorResponse } from '../cmap/wire_protocol/responses'; import { type CursorTimeoutMode } from '../cursor/abstract_cursor'; import { MongoInvalidArgumentError } from '../error'; import { type ExplainOptions } from '../explain'; import type { Server } from '../sdam/server'; import type { ClientSession } from '../sessions'; import { type TimeoutContext } from '../timeout'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { WriteConcern } from '../write_concern'; import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command'; import { Aspect, defineAspects, type Hint } from './operation'; /** @internal */ // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion export const DB_AGGREGATE_COLLECTION = 1 as const; const MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT = 8; /** @public */ export interface AggregateOptions extends Omit<CommandOperationOptions, 'explain'> { /** allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 \>). */ allowDiskUse?: boolean; /** The number of documents to return per batch. See [aggregation documentation](https://www.mongodb.com/docs/manual/reference/command/aggregate). */ batchSize?: number; /** Allow driver to bypass schema validation. */ bypassDocumentValidation?: boolean; /** Return the query as cursor, on 2.6 \> it returns as a real cursor on pre 2.6 it returns as an emulated cursor. */ cursor?: Document; /** * Specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point. */ maxTimeMS?: number; /** The maximum amount of time for the server to wait on new documents to satisfy a tailable cursor query. */ maxAwaitTimeMS?: number; /** Specify collation. */ collation?: CollationOptions; /** Add an index selection hint to an aggregation command */ hint?: Hint; /** Map of parameter names and values that can be accessed using $$var (requires MongoDB 5.0). */ let?: Document; out?: string; /** * Specifies the verbosity mode for the explain output. * @deprecated This API is deprecated in favor of `collection.aggregate().explain()` * or `db.aggregate().explain()`. */ explain?: ExplainOptions['explain']; /** @internal */ timeoutMode?: CursorTimeoutMode; } /** @internal */ export class AggregateOperation extends CommandOperation<CursorResponse> { override options: AggregateOptions; target: string | typeof DB_AGGREGATE_COLLECTION; pipeline: Document[]; hasWriteStage: boolean; constructor(ns: MongoDBNamespace, pipeline: Document[], options?: AggregateOptions) { super(undefined, { ...options, dbName: ns.db }); this.options = { ...options }; // Covers when ns.collection is null, undefined or the empty string, use DB_AGGREGATE_COLLECTION this.target = ns.collection || DB_AGGREGATE_COLLECTION; this.pipeline = pipeline; // determine if we have a write stage, override read preference if so this.hasWriteStage = false; if (typeof options?.out === 'string') { this.pipeline = this.pipeline.concat({ $out: options.out }); this.hasWriteStage = true; } else if (pipeline.length > 0) { const finalStage = pipeline[pipeline.length - 1]; if (finalStage.$out || finalStage.$merge) { this.hasWriteStage = true; } } if (this.hasWriteStage) { this.trySecondaryWrite = true; } else { delete this.options.writeConcern; } if (this.explain && this.writeConcern) { throw new MongoInvalidArgumentError( 'Option "explain" cannot be used on an aggregate call with writeConcern' ); } if (options?.cursor != null && typeof options.cursor !== 'object') { throw new MongoInvalidArgumentError('Cursor options must be an object'); } } override get commandName() { return 'aggregate' as const; } override get canRetryRead(): boolean { return !this.hasWriteStage; } addToPipeline(stage: Document): void { this.pipeline.push(stage); } override async execute( server: Server, session: ClientSession | undefined, timeoutContext: TimeoutContext ): Promise<CursorResponse> { const options: AggregateOptions = this.options; const serverWireVersion = maxWireVersion(server); const command: Document = { aggregate: this.target, pipeline: this.pipeline }; if (this.hasWriteStage && serverWireVersion < MIN_WIRE_VERSION_$OUT_READ_CONCERN_SUPPORT) { this.readConcern = undefined; } if (this.hasWriteStage && this.writeConcern) { WriteConcern.apply(command, this.writeConcern); } if (options.bypassDocumentValidation === true) { command.bypassDocumentValidation = options.bypassDocumentValidation; } if (typeof options.allowDiskUse === 'boolean') { command.allowDiskUse = options.allowDiskUse; } Iif (options.hint) { command.hint = options.hint; } if (options.let) { command.let = options.let; } // we check for undefined specifically here to allow falsy values // eslint-disable-next-line no-restricted-syntax if (options.comment !== undefined) { command.comment = options.comment; } command.cursor = options.cursor || {}; if (options.batchSize && !this.hasWriteStage) { command.cursor.batchSize = options.batchSize; } return await super.executeCommand( server, session, command, timeoutContext, this.explain ? ExplainedCursorResponse : CursorResponse ); } } defineAspects(AggregateOperation, [ Aspect.READ_OPERATION, Aspect.RETRYABLE, Aspect.EXPLAINABLE, Aspect.CURSOR_CREATING ]); |