All files / src transactions.ts

95.34% Statements 41/43
92% Branches 23/25
90.9% Functions 10/11
95.34% Lines 41/43

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188  303x   303x 303x   303x     303x                       303x                                                   303x         303x                                                   303x                     2605267x 2605267x 2605267x   2605267x 2605267x 1596x 68x     1528x     2605199x 544x     2605199x 10394x     2605199x 265x       2605199x 2605199x         4768197x       18490x       2956094x                       21528823x       12240x               31938x 31938x 31938x 31938x         15107x   31938x                   9675x 9675x           24432x       303x 2686887x    
import type { Document } from './bson';
import { MongoRuntimeError, MongoTransactionError } from './error';
import type { CommandOperationOptions } from './operations/command';
import { ReadConcern, type ReadConcernLike } from './read_concern';
import { ReadPreference, type ReadPreferenceLike } from './read_preference';
import type { Server } from './sdam/server';
import { WriteConcern } from './write_concern';
 
/**
 * The named states a transaction can be in. Every value is the same string as its key,
 * and the object is frozen so the set of states cannot be altered at runtime.
 * @internal
 */
export const TxnState = Object.freeze({
  NO_TRANSACTION: 'NO_TRANSACTION',
  STARTING_TRANSACTION: 'STARTING_TRANSACTION',
  TRANSACTION_IN_PROGRESS: 'TRANSACTION_IN_PROGRESS',
  TRANSACTION_COMMITTED: 'TRANSACTION_COMMITTED',
  TRANSACTION_COMMITTED_EMPTY: 'TRANSACTION_COMMITTED_EMPTY',
  TRANSACTION_ABORTED: 'TRANSACTION_ABORTED'
} as const);
 
/**
 * Union of the string literal values held by the `TxnState` object.
 * @internal
 */
export type TxnState = (typeof TxnState)[keyof typeof TxnState];
 
/**
 * Transition table for the transaction state machine. Each key is a current state and
 * the associated array lists every state that may legally follow it.
 */
const stateMachine: Record<TxnState, TxnState[]> = {
  // Outside a transaction: stay put, or begin one.
  [TxnState.NO_TRANSACTION]: [TxnState.NO_TRANSACTION, TxnState.STARTING_TRANSACTION],

  // Started: may proceed, or be committed / committed-empty / aborted straight away.
  [TxnState.STARTING_TRANSACTION]: [
    TxnState.TRANSACTION_IN_PROGRESS,
    TxnState.TRANSACTION_COMMITTED,
    TxnState.TRANSACTION_COMMITTED_EMPTY,
    TxnState.TRANSACTION_ABORTED
  ],

  // In progress: may continue, commit, or abort.
  [TxnState.TRANSACTION_IN_PROGRESS]: [
    TxnState.TRANSACTION_IN_PROGRESS,
    TxnState.TRANSACTION_COMMITTED,
    TxnState.TRANSACTION_ABORTED
  ],

  // Committed: may be re-entered, become committed-empty, start anew, or reset.
  [TxnState.TRANSACTION_COMMITTED]: [
    TxnState.TRANSACTION_COMMITTED,
    TxnState.TRANSACTION_COMMITTED_EMPTY,
    TxnState.STARTING_TRANSACTION,
    TxnState.NO_TRANSACTION
  ],

  // Aborted: may only start anew or reset.
  [TxnState.TRANSACTION_ABORTED]: [TxnState.STARTING_TRANSACTION, TxnState.NO_TRANSACTION],

  // Committed-empty: may be re-entered or reset.
  [TxnState.TRANSACTION_COMMITTED_EMPTY]: [
    TxnState.TRANSACTION_COMMITTED_EMPTY,
    TxnState.NO_TRANSACTION
  ]
};
 
/** States in which a transaction counts as active: just started, or already in progress. */
const ACTIVE_STATES = new Set<TxnState>([
  TxnState.STARTING_TRANSACTION,
  TxnState.TRANSACTION_IN_PROGRESS
]);
 
/**
 * States for which `Transaction.isCommitted` reports `true`.
 * NOTE(review): `TRANSACTION_ABORTED` is a member even though the name says "committed";
 * confirm with callers of `isCommitted` whether this is meant as "transaction has ended".
 */
const COMMITTED_STATES = new Set<TxnState>([
  TxnState.TRANSACTION_COMMITTED,
  TxnState.TRANSACTION_COMMITTED_EMPTY,
  TxnState.TRANSACTION_ABORTED
]);
 
/**
 * Configuration options for a transaction.
 *
 * Inherits every option of `CommandOperationOptions` except `timeoutMS`, which is omitted.
 * @public
 */
export interface TransactionOptions extends Omit<CommandOperationOptions, 'timeoutMS'> {
  // TODO(NODE-3344): These options use the proper class forms of these settings, it should accept the basic enum values too
  /** A default read concern for commands in this transaction */
  readConcern?: ReadConcernLike;
  /** A default write concern for commands in this transaction */
  writeConcern?: WriteConcern;
  /** A default read preference for commands in this transaction */
  readPreference?: ReadPreferenceLike;
  /** Specifies the maximum amount of time, in milliseconds, to allow a commit action on a transaction to run */
  maxCommitTimeMS?: number;
}
 
/**
 * @public
 * A class maintaining state related to a server transaction. Internal Only
 *
 * Holds the current state in the transaction state machine, the options resolved for the
 * transaction, the server the transaction is pinned to (if any) and a recovery token.
 */
export class Transaction {
  /** @internal */
  state: TxnState;
  /** Options resolved by the constructor from the caller-supplied options. */
  options: TransactionOptions;
  /** @internal */
  _pinnedServer?: Server;
  /** @internal */
  _recoveryToken?: Document;

  /**
   * Create a transaction
   * @internal
   * @param options - Optional settings; only the write concern, read concern, read preference
   * and `maxCommitTimeMS` are carried over onto `this.options`.
   * @throws MongoTransactionError if the resolved write concern has `w === 0`
   */
  constructor(options?: TransactionOptions) {
    options = options ?? {};
    this.state = TxnState.NO_TRANSACTION;
    this.options = {};

    const writeConcern = WriteConcern.fromOptions(options);
    if (writeConcern) {
      if (writeConcern.w === 0) {
        throw new MongoTransactionError('Transactions do not support unacknowledged write concern');
      }

      this.options.writeConcern = writeConcern;
    }

    if (options.readConcern) {
      this.options.readConcern = ReadConcern.fromOptions(options);
    }

    if (options.readPreference) {
      this.options.readPreference = ReadPreference.fromOptions(options);
    }

    // Truthiness check: an unset or zero `maxCommitTimeMS` is not copied over.
    // The value is stored under `maxTimeMS`, not under its original name.
    if (options.maxCommitTimeMS) {
      this.options.maxTimeMS = options.maxCommitTimeMS;
    }

    // TODO: This isn't technically necessary
    this._pinnedServer = undefined;
    this._recoveryToken = undefined;
  }

  /** @internal */
  get server(): Server | undefined {
    return this._pinnedServer;
  }

  /** The stored recovery token, or `undefined` when none has been set. */
  get recoveryToken(): Document | undefined {
    return this._recoveryToken;
  }

  /** Whether a server is currently pinned to this transaction. */
  get isPinned(): boolean {
    return !!this.server;
  }

  /** @returns Whether the transaction is in the `STARTING_TRANSACTION` state */
  get isStarting(): boolean {
    return this.state === TxnState.STARTING_TRANSACTION;
  }

  /**
   * @returns Whether this session is presently in a transaction
   */
  get isActive(): boolean {
    return ACTIVE_STATES.has(this.state);
  }

  /** @returns Whether the current state is a member of `COMMITTED_STATES` */
  get isCommitted(): boolean {
    return COMMITTED_STATES.has(this.state);
  }

  /**
   * Transition the transaction in the state machine
   * @internal
   * @param nextState - The new state to transition to
   * @throws MongoRuntimeError if `nextState` is not a legal successor of the current state
   */
  transition(nextState: TxnState): void {
    const nextStates = stateMachine[this.state];
    if (nextStates && nextStates.includes(nextState)) {
      this.state = nextState;
      // Entering any of these states releases the pinned server.
      if (
        this.state === TxnState.NO_TRANSACTION ||
        this.state === TxnState.STARTING_TRANSACTION ||
        this.state === TxnState.TRANSACTION_ABORTED
      ) {
        this.unpinServer();
      }
      return;
    }

    throw new MongoRuntimeError(
      `Attempted illegal state transition from [${this.state}] to [${nextState}]`
    );
  }

  /**
   * Pin `server` to this transaction. Has no effect unless the transaction is active.
   * @internal
   */
  pinServer(server: Server): void {
    if (this.isActive) {
      this._pinnedServer = server;
    }
  }

  /**
   * Clear the pinned server, if any.
   * @internal
   */
  unpinServer(): void {
    this._pinnedServer = undefined;
  }
}
 
/**
 * Report whether a command document is one of the transaction-ending commands.
 * @param command - The command document to inspect
 * @returns `true` when `commitTransaction` or `abortTransaction` is set to a truthy value
 */
export function isTransactionCommand(command: Document): boolean {
  if (command.commitTransaction) {
    return true;
  }
  return Boolean(command.abortTransaction);
}