All files / src/operations/client_bulk_write results_merger.ts

100% Statements 58/58
93.33% Branches 28/30
100% Functions 6/6
100% Lines 58/58

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261392x     392x                         392x                                                                                                             392x                     190x               6104x 6104x 6104x 6104x 6104x                       6104x 3256x 3256x 3256x               4153x                                           6621x 6621x   9506340x 5770x 5770x           9500570x 380x     380x       380x 380x   9500190x               2773x 357x 357x               357x 95x 95x 95x         2416x       6621x 4483x 4483x       6621x       4205x 357x 357x 357x           4205x                   5865x   5865x 1622x         5865x 2811x           2811x 950x   2811x     5865x 1432x                     4840x 4840x 4840x 4840x 4840x      
import { MongoWriteConcernError } from '../..';
import { type Document } from '../../bson';
import { type ClientBulkWriteCursor } from '../../cursor/client_bulk_write_cursor';
import { MongoClientBulkWriteError } from '../../error';
import {
  type ClientBulkWriteError,
  type ClientBulkWriteOptions,
  type ClientBulkWriteResult,
  type ClientDeleteResult,
  type ClientInsertOneResult,
  type ClientUpdateResult
} from './common';
 
/**
 * Unacknowledged bulk writes are always the same: `acknowledged` is false, every
 * count is zero and none of the per-operation result maps are present.
 *
 * This single object is what `ClientBulkWriteResultsMerger.unacknowledged()` returns,
 * so every caller receives the same instance rather than a fresh copy.
 */
const UNACKNOWLEDGED = {
  acknowledged: false,
  insertedCount: 0,
  upsertedCount: 0,
  matchedCount: 0,
  modifiedCount: 0,
  deletedCount: 0,
  insertResults: undefined,
  updateResults: undefined,
  deleteResults: undefined
};
 
/**
 * Mutable running totals kept by `ClientBulkWriteResultsMerger` while it merges
 * batches. The counts are incremented after each merged cursor. The three result
 * maps are only created when the `verboseResults` option is set, and their keys
 * are the result document's `idx` plus the merger's current batch offset.
 */
interface ClientBulkWriteResultAccumulation {
  /**
   * Whether the bulk write was acknowledged.
   */
  acknowledged: boolean;
  /**
   * The total number of documents inserted across all insert operations.
   */
  insertedCount: number;
  /**
   * The total number of documents upserted across all update operations.
   */
  upsertedCount: number;
  /**
   * The total number of documents matched across all update operations.
   */
  matchedCount: number;
  /**
   * The total number of documents modified across all update operations.
   */
  modifiedCount: number;
  /**
   * The total number of documents deleted across all delete operations.
   */
  deletedCount: number;
  /**
   * The results of each individual insert operation that was successfully performed.
   */
  insertResults?: Map<number, ClientInsertOneResult>;
  /**
   * The results of each individual update operation that was successfully performed.
   */
  updateResults?: Map<number, ClientUpdateResult>;
  /**
   * The results of each individual delete operation that was successfully performed.
   */
  deleteResults?: Map<number, ClientDeleteResult>;
}
 
/**
 * Merges client bulk write cursor responses together into a single result.
 *
 * One merger instance is reused for every batch of a bulk write. Each call to
 * `merge` folds one cursor's results into the running totals and then advances
 * the batch offset by the number of operations in that cursor, so per-operation
 * results and write errors are keyed by `idx` plus the size of all previously
 * merged batches.
 * @internal
 */
export class ClientBulkWriteResultsMerger {
  private result: ClientBulkWriteResultAccumulation;
  private options: ClientBulkWriteOptions;
  /** Sum of `cursor.operations.length` over every cursor merged so far. */
  private currentBatchOffset: number;
  /** Write concern errors collected across batches as `{ code, message }` documents. */
  writeConcernErrors: Document[];
  /** Individual write errors collected during unordered bulk writes, keyed by offset index. */
  writeErrors: Map<number, ClientBulkWriteError>;

  /**
   * @returns The standard unacknowledged bulk write result.
   */
  static unacknowledged(): ClientBulkWriteResult {
    return UNACKNOWLEDGED;
  }

  /**
   * Instantiate the merger.
   * @param options - The options. `verboseResults` controls whether per-operation
   * result maps are created; `ordered` controls whether a write error aborts the merge.
   */
  constructor(options: ClientBulkWriteOptions) {
    this.options = options;
    this.currentBatchOffset = 0;
    this.writeConcernErrors = [];
    this.writeErrors = new Map();
    this.result = {
      acknowledged: true,
      insertedCount: 0,
      upsertedCount: 0,
      matchedCount: 0,
      modifiedCount: 0,
      deletedCount: 0,
      insertResults: undefined,
      updateResults: undefined,
      deleteResults: undefined
    };

    // The per-operation maps only exist for verbose results.
    if (options.verboseResults) {
      this.result.insertResults = new Map<number, ClientInsertOneResult>();
      this.result.updateResults = new Map<number, ClientUpdateResult>();
      this.result.deleteResults = new Map<number, ClientDeleteResult>();
    }
  }

  /**
   * Get the bulk write result object.
   *
   * A new wrapper object is built on every access, but the result maps it exposes
   * are the same Map instances the merger keeps writing to.
   */
  get bulkWriteResult(): ClientBulkWriteResult {
    return {
      acknowledged: this.result.acknowledged,
      insertedCount: this.result.insertedCount,
      upsertedCount: this.result.upsertedCount,
      matchedCount: this.result.matchedCount,
      modifiedCount: this.result.modifiedCount,
      deletedCount: this.result.deletedCount,
      insertResults: this.result.insertResults,
      updateResults: this.result.updateResults,
      deleteResults: this.result.deleteResults
    };
  }

  /**
   * Merge the results in the cursor to the existing result.
   *
   * A `MongoWriteConcernError` raised while iterating is not rethrown: its counts
   * are added to the totals, its first batch is processed for verbose results and
   * the write concern error is recorded in `writeConcernErrors`.
   * @param cursor - The cursor whose result documents, response counts and
   * operations are merged.
   * @returns The accumulated result so far.
   * @throws MongoClientBulkWriteError if the bulk write is ordered and a result
   * document is not `ok: 1`. Any other error raised while iterating is rethrown.
   */
  async merge(cursor: ClientBulkWriteCursor): Promise<ClientBulkWriteResult> {
    let writeConcernErrorResult;
    try {
      for await (const document of cursor) {
        // Only add to maps if ok: 1
        if (document.ok === 1) {
          if (this.options.verboseResults) {
            this.processDocument(cursor, document);
          }
        } else {
          // If an individual write error is encountered during an ordered bulk write, drivers MUST
          // record the error in writeErrors and immediately throw the exception. Otherwise, drivers
          // MUST continue to iterate the results cursor and execute any further bulkWrite batches.
          if (this.options.ordered) {
            const error = new MongoClientBulkWriteError({
              message: 'Mongo client ordered bulk write encountered a write error.'
            });
            error.writeErrors.set(document.idx + this.currentBatchOffset, {
              code: document.code,
              message: document.errmsg
            });
            error.partialResult = this.result;
            throw error;
          } else {
            this.writeErrors.set(document.idx + this.currentBatchOffset, {
              code: document.code,
              message: document.errmsg
            });
          }
        }
      }
    } catch (error) {
      if (error instanceof MongoWriteConcernError) {
        // Translate the server's n* counters into the result's *Count names so
        // they can be fed through incrementCounts below.
        const result = error.result;
        writeConcernErrorResult = {
          insertedCount: result.nInserted,
          upsertedCount: result.nUpserted,
          matchedCount: result.nMatched,
          modifiedCount: result.nModified,
          deletedCount: result.nDeleted,
          writeConcernError: result.writeConcernError
        };
        if (this.options.verboseResults && result.cursor.firstBatch) {
          for (const document of result.cursor.firstBatch) {
            if (document.ok === 1) {
              this.processDocument(cursor, document);
            }
          }
        }
      } else {
        throw error;
      }
    } finally {
      // Update the counts from the cursor response.
      if (cursor.response) {
        const response = cursor.response;
        this.incrementCounts(response);
      }

      // Increment the batch offset. This runs even when an error propagates, and
      // only after every document of this cursor was keyed with the old offset.
      this.currentBatchOffset += cursor.operations.length;
    }

    // If we have write concern errors ensure they are added.
    if (writeConcernErrorResult) {
      const writeConcernError = writeConcernErrorResult.writeConcernError as Document;
      this.incrementCounts(writeConcernErrorResult);
      this.writeConcernErrors.push({
        code: writeConcernError.code,
        message: writeConcernError.errmsg
      });
    }

    return this.result;
  }

  /**
   * Process an individual document in the results, recording it in the verbose
   * result map that matches the kind of operation it belongs to.
   * @param cursor - The cursor, used to look up the operation at `document.idx`.
   * @param document - The document to process.
   */
  private processDocument(cursor: ClientBulkWriteCursor, document: Document): void {
    // Get the corresponding operation from the command.
    const operation = cursor.operations[document.idx];
    // Handle insert results.
    if ('insert' in operation) {
      this.result.insertResults?.set(document.idx + this.currentBatchOffset, {
        insertedId: operation.document._id
      });
    }
    // Handle update results.
    if ('update' in operation) {
      const result: ClientUpdateResult = {
        matchedCount: document.n,
        modifiedCount: document.nModified ?? 0,
        // Check if the bulk did actually upsert.
        didUpsert: document.upserted != null
      };
      if (document.upserted) {
        result.upsertedId = document.upserted._id;
      }
      this.result.updateResults?.set(document.idx + this.currentBatchOffset, result);
    }
    // Handle delete results.
    if ('delete' in operation) {
      this.result.deleteResults?.set(document.idx + this.currentBatchOffset, {
        deletedCount: document.n
      });
    }
  }

  /**
   * Increment the result counts.
   * @param document - The document with the results; its `insertedCount`,
   * `upsertedCount`, `matchedCount`, `modifiedCount` and `deletedCount` are added
   * to the running totals.
   */
  private incrementCounts(document: Document): void {
    this.result.insertedCount += document.insertedCount;
    this.result.upsertedCount += document.upsertedCount;
    this.result.matchedCount += document.matchedCount;
    this.result.modifiedCount += document.modifiedCount;
    this.result.deletedCount += document.deletedCount;
  }
}