Press n or j to go to the next uncovered block, b, p or k for the previous block.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | 392x 392x 392x 392x 392x 392x 392x 135604x 135376x 135376x 135376x 135376x 135376x 637x 134739x 67125x 257541x 257541x 134771x 66629x 66629x 199887x 66629x 66573x 80x 66493x 56x 12x 66629x 36027x 27826x 8201x 36027x 191527x 191527x 188526x 188526x 153640x 125526x 125526x 120611x 120611x 120611x 53359x 120611x 120611x 120611x 120611x 145091x 70916x 70916x 70916x 70916x 70916x | import type { Document } from '../bson'; import { ChangeStream, type ChangeStreamDocument, type ChangeStreamEvents, type OperationTime, type ResumeToken } from '../change_stream'; import { type CursorResponse } from '../cmap/wire_protocol/responses'; import { INIT, RESPONSE } from '../constants'; import type { MongoClient } from '../mongo_client'; import { AggregateOperation } from '../operations/aggregate'; import type { CollationOptions } from '../operations/command'; import { executeOperation } from '../operations/execute_operation'; import type { ClientSession } from '../sessions'; import { maxWireVersion, type MongoDBNamespace } from '../utils'; import { AbstractCursor, type AbstractCursorOptions, type InitialCursorResponse } from './abstract_cursor'; /** @internal */ export interface ChangeStreamCursorOptions extends AbstractCursorOptions { startAtOperationTime?: OperationTime; resumeAfter?: ResumeToken; startAfter?: ResumeToken; maxAwaitTimeMS?: number; collation?: CollationOptions; fullDocument?: string; } /** @internal */ export class ChangeStreamCursor< TSchema extends Document = Document, TChange extends Document = ChangeStreamDocument<TSchema> > extends AbstractCursor<TChange, ChangeStreamEvents> { private _resumeToken: ResumeToken; private startAtOperationTime: OperationTime | null; private hasReceived?: boolean; private readonly changeStreamCursorOptions: ChangeStreamCursorOptions; private postBatchResumeToken?: ResumeToken; private readonly pipeline: Document[]; /** * @internal * * used to determine change stream resumability */ maxWireVersion: number | undefined; constructor( client: MongoClient, namespace: MongoDBNamespace, pipeline: Document[] = [], options: ChangeStreamCursorOptions = {} ) { super(client, namespace, { ...options, tailable: true, awaitData: true }); this.pipeline = pipeline; this.changeStreamCursorOptions = options; this._resumeToken = null; this.startAtOperationTime = options.startAtOperationTime ?? null; if (options.startAfter) { this.resumeToken = options.startAfter; } else if (options.resumeAfter) { this.resumeToken = options.resumeAfter; } } set resumeToken(token: ResumeToken) { this._resumeToken = token; this.emit(ChangeStream.RESUME_TOKEN_CHANGED, token); } get resumeToken(): ResumeToken { return this._resumeToken; } get resumeOptions(): ChangeStreamCursorOptions { const options: ChangeStreamCursorOptions = { ...this.changeStreamCursorOptions }; for (const key of ['resumeAfter', 'startAfter', 'startAtOperationTime'] as const) { delete options[key]; } if (this.resumeToken != null) { if (this.changeStreamCursorOptions.startAfter && !this.hasReceived) { options.startAfter = this.resumeToken; } else { options.resumeAfter = this.resumeToken; } } else if (this.startAtOperationTime != null && maxWireVersion(this.server) >= 7) { options.startAtOperationTime = this.startAtOperationTime; } return options; } cacheResumeToken(resumeToken: ResumeToken): void { if (this.bufferedCount() === 0 && this.postBatchResumeToken) { this.resumeToken = this.postBatchResumeToken; } else { this.resumeToken = resumeToken; } this.hasReceived = true; } _processBatch(response: CursorResponse): void { const { postBatchResumeToken } = response; if (postBatchResumeToken) { this.postBatchResumeToken = postBatchResumeToken; if (response.batchSize === 0) { this.resumeToken = postBatchResumeToken; } } } clone(): AbstractCursor<TChange> { return new ChangeStreamCursor(this.client, this.namespace, this.pipeline, { ...this.cursorOptions }); } async _initialize(session: ClientSession): Promise<InitialCursorResponse> { const aggregateOperation = new AggregateOperation(this.namespace, this.pipeline, { ...this.cursorOptions, ...this.changeStreamCursorOptions, session }); const response = await executeOperation( session.client, aggregateOperation, this.timeoutContext ); const server = aggregateOperation.server; this.maxWireVersion = maxWireVersion(server); if ( this.startAtOperationTime == null && this.changeStreamCursorOptions.resumeAfter == null && this.changeStreamCursorOptions.startAfter == null && this.maxWireVersion >= 7 ) { this.startAtOperationTime = response.operationTime; } this._processBatch(response); this.emit(INIT, response); this.emit(RESPONSE); return { server, session, response }; } override async getMore(batchSize: number): Promise<CursorResponse> { const response = await super.getMore(batchSize); this.maxWireVersion = maxWireVersion(this.server); this._processBatch(response); this.emit(ChangeStream.MORE, response); this.emit(ChangeStream.RESPONSE); return response; } } |