All files / src/operations/client_bulk_write client_bulk_write.ts

92.85% Statements 26/28
84.61% Branches 11/13
100% Functions 5/5
92.85% Lines 26/28

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116392x 392x       392x 392x 392x               392x         7577x       7157x 7157x 7157x 7157x       420x       12558x                               7577x 1185x   1185x   1155x     1125x   30x   1155x                           6392x                 6392x               6480x 1071x   6480x                     392x              
import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta';
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
import type { Server } from '../../sdam/server';
import type { ClientSession } from '../../sessions';
import { type TimeoutContext } from '../../timeout';
import { MongoDBNamespace } from '../../utils';
import { CommandOperation } from '../command';
import { Aspect, defineAspects } from '../operation';
import { type ClientBulkWriteCommandBuilder } from './command_builder';
import { type ClientBulkWriteOptions } from './common';
 
/**
 * Executes a single client bulk write operation within a potential batch.
 *
 * The operation always targets the `admin.$cmd` namespace and delegates the
 * construction of each batch to the supplied command builder.
 * @internal
 */
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
  /** Builder that produces the `bulkWrite` command document for each batch. */
  commandBuilder: ClientBulkWriteCommandBuilder;
  /** Options this operation was created with; `willRetryWrite` may be cleared in `execute`. */
  override options: ClientBulkWriteOptions;

  /** The name of the command sent to the server. */
  override get commandName() {
    return 'bulkWrite' as const;
  }

  /**
   * @param commandBuilder - Builder used to create the command for each batch.
   * @param options - The client bulk write options.
   */
  constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) {
    super(undefined, options);
    this.commandBuilder = commandBuilder;
    this.options = options;
    this.ns = new MongoDBNamespace('admin', '$cmd');
  }

  /**
   * Resets the current batch by delegating to the command builder.
   * @returns Whatever the command builder's `resetBatch` reports.
   */
  override resetBatch(): boolean {
    return this.commandBuilder.resetBatch();
  }

  /** Whether the batch currently held by the command builder is retryable. */
  override get canRetryWrite(): boolean {
    return this.commandBuilder.isBatchRetryable;
  }

  /**
   * Execute the command. Superclass will handle write concern, etc.
   *
   * The size limits used to build the batch come from the connection's hello
   * response when talking to a load balancer, and from the server description
   * otherwise.
   * @param server - The server.
   * @param session - The session.
   * @param timeoutContext - The timeout context, used for connection checkout and command execution.
   * @returns The response.
   * @throws MongoClientBulkWriteExecutionError when the server is a load balancer and no session
   * was provided, or when the server description is missing any of the required size limits.
   */
  override async execute(
    server: Server,
    session: ClientSession | undefined,
    timeoutContext: TimeoutContext
  ): Promise<ClientBulkWriteCursorResponse> {
    let command;

    if (server.description.type === ServerType.LoadBalancer) {
      if (session) {
        let connection;
        if (!session.pinnedConnection) {
          // Checkout a connection to build the command.
          connection = await server.pool.checkOut({ timeoutContext });
          // Pin the connection to the session so it gets used to execute the command and we do not
          // perform a double check-in/check-out.
          session.pin(connection);
        } else {
          connection = session.pinnedConnection;
        }
        command = this.commandBuilder.buildBatch(
          connection.hello?.maxMessageSizeBytes,
          connection.hello?.maxWriteBatchSize,
          connection.hello?.maxBsonObjectSize
        );
      } else {
        throw new MongoClientBulkWriteExecutionError(
          'Session provided to the client bulk write operation must be present.'
        );
      }
    } else {
      // At this point we have a server and the auto connect code has already
      // run in executeOperation, so the server description will be populated.
      // We can use that to build the command.
      if (
        !server.description.maxWriteBatchSize ||
        !server.description.maxMessageSizeBytes ||
        !server.description.maxBsonObjectSize
      ) {
        throw new MongoClientBulkWriteExecutionError(
          'In order to execute a client bulk write, both maxWriteBatchSize, maxMessageSizeBytes and maxBsonObjectSize must be provided by the servers hello response.'
        );
      }
      command = this.commandBuilder.buildBatch(
        server.description.maxMessageSizeBytes,
        server.description.maxWriteBatchSize,
        server.description.maxBsonObjectSize
      );
    }

    // Check after the batch is built if we cannot retry it and override the option.
    // This has to happen after buildBatch because canRetryWrite reads the builder's
    // isBatchRetryable flag.
    if (!this.canRetryWrite) {
      this.options.willRetryWrite = false;
    }
    return await super.executeCommand(
      server,
      session,
      command,
      timeoutContext,
      ClientBulkWriteCursorResponse
    );
  }
}
 
// Register the aspects of ClientBulkWriteOperation: it is declared as a
// retryable, cursor-creating write operation that supports command batching.
// Skipping the collation as it goes on the individual ops.
defineAspects(ClientBulkWriteOperation, [
  Aspect.WRITE_OPERATION,
  Aspect.SKIP_COLLATION,
  Aspect.CURSOR_CREATING,
  Aspect.RETRYABLE,
  Aspect.COMMAND_BATCHING
]);