All files / src transactions.ts

95.34% Statements 41/43
92% Branches 23/25
90.9% Functions 10/11
95.34% Lines 41/43

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188  564x   564x 564x   564x     564x                       564x                                                   564x         564x                                                   564x                     12018483x 12018483x 12018483x   12018483x 12018483x 7866x 328x     7538x     12018155x 2924x     12018155x 46489x     12018155x 1190x       12018155x 12018155x         17557362x       76205x       12590623x                       94212336x       1056299x               142878x 142878x 142878x 142878x         65702x   142878x                   25800x 25800x           96227x       564x 12475302x    
import type { Document } from './bson';
import { MongoRuntimeError, MongoTransactionError } from './error';
import type { CommandOperationOptions } from './operations/command';
import { ReadConcern, type ReadConcernLike } from './read_concern';
import { ReadPreference, type ReadPreferenceLike } from './read_preference';
import type { Server } from './sdam/server';
import { WriteConcern } from './write_concern';
 
/**
 * The named states a transaction can be in.
 *
 * Every key maps to a string literal identical to the key, and the object is
 * frozen so the table cannot be mutated at runtime.
 * @internal
 */
export const TxnState = Object.freeze({
  NO_TRANSACTION: 'NO_TRANSACTION',
  STARTING_TRANSACTION: 'STARTING_TRANSACTION',
  TRANSACTION_IN_PROGRESS: 'TRANSACTION_IN_PROGRESS',
  TRANSACTION_COMMITTED: 'TRANSACTION_COMMITTED',
  TRANSACTION_COMMITTED_EMPTY: 'TRANSACTION_COMMITTED_EMPTY',
  TRANSACTION_ABORTED: 'TRANSACTION_ABORTED'
} as const);
 
/**
 * Union of the string literal values held by the `TxnState` constant object.
 * @internal
 */
export type TxnState = (typeof TxnState)[keyof typeof TxnState];
 
/**
 * Transition table: for each current state, the list of states that
 * `Transaction.transition` accepts as the next state.
 */
const stateMachine: Record<TxnState, TxnState[]> = {
  // No transaction: may stay idle or begin one.
  [TxnState.NO_TRANSACTION]: [
    TxnState.NO_TRANSACTION,
    TxnState.STARTING_TRANSACTION
  ],

  // Started, nothing sent yet.
  [TxnState.STARTING_TRANSACTION]: [
    TxnState.TRANSACTION_IN_PROGRESS,
    TxnState.TRANSACTION_COMMITTED,
    TxnState.TRANSACTION_COMMITTED_EMPTY,
    TxnState.TRANSACTION_ABORTED
  ],

  // In progress.
  [TxnState.TRANSACTION_IN_PROGRESS]: [
    TxnState.TRANSACTION_IN_PROGRESS,
    TxnState.TRANSACTION_COMMITTED,
    TxnState.TRANSACTION_ABORTED
  ],

  // Committed: may be re-entered, or followed by a new or no transaction.
  [TxnState.TRANSACTION_COMMITTED]: [
    TxnState.TRANSACTION_COMMITTED,
    TxnState.TRANSACTION_COMMITTED_EMPTY,
    TxnState.STARTING_TRANSACTION,
    TxnState.NO_TRANSACTION
  ],

  // Aborted.
  [TxnState.TRANSACTION_ABORTED]: [
    TxnState.STARTING_TRANSACTION,
    TxnState.NO_TRANSACTION
  ],

  // Committed-empty.
  [TxnState.TRANSACTION_COMMITTED_EMPTY]: [
    TxnState.TRANSACTION_COMMITTED_EMPTY,
    TxnState.NO_TRANSACTION
  ]
};
 
/** States for which `Transaction.isActive` reports true. */
const ACTIVE_STATES = new Set<TxnState>([
  TxnState.STARTING_TRANSACTION,
  TxnState.TRANSACTION_IN_PROGRESS
]);
 
/**
 * States for which `Transaction.isCommitted` reports true.
 *
 * NOTE(review): TRANSACTION_ABORTED is a member, so `isCommitted` is also true
 * after an abort. Presumably callers rely on this — confirm before changing.
 */
const COMMITTED_STATES = new Set<TxnState>([
  TxnState.TRANSACTION_COMMITTED,
  TxnState.TRANSACTION_COMMITTED_EMPTY,
  TxnState.TRANSACTION_ABORTED
]);
 
/**
 * Configuration options for a transaction.
 *
 * Inherits every field of `CommandOperationOptions` except `timeoutMS`.
 * @public
 */
export interface TransactionOptions extends Omit<CommandOperationOptions, 'timeoutMS'> {
  // TODO(NODE-3344): These options use the proper class forms of these settings, it should accept the basic enum values too
  /** A default read concern for commands in this transaction */
  readConcern?: ReadConcernLike;
  /** A default write concern for commands in this transaction */
  writeConcern?: WriteConcern;
  /** A default read preference for commands in this transaction */
  readPreference?: ReadPreferenceLike;
  /**
   * Specifies the maximum amount of time to allow a commit action on a transaction to run in milliseconds.
   * The `Transaction` constructor copies a truthy value into its own `options.maxTimeMS`.
   */
  maxCommitTimeMS?: number;
}
 
/**
 * @public
 * A class maintaining state related to a server transaction. Internal Only
 */
export class Transaction {
  /**
   * Current position in the transaction state machine.
   * @internal
   */
  state: TxnState;
  /** Normalized options captured when the transaction was created. */
  options: TransactionOptions;
  /**
   * Server this transaction is pinned to, if any.
   * @internal
   */
  _pinnedServer?: Server;
  /**
   * Backing field for `recoveryToken`; never assigned a value inside this class.
   * @internal
   */
  _recoveryToken?: Document;

  /**
   * Create a transaction in the NO_TRANSACTION state.
   *
   * Only the write concern, read concern, read preference and
   * `maxCommitTimeMS` (stored as `maxTimeMS`) are copied from `options`.
   *
   * @internal
   * @param options - Optional transaction settings
   * @throws MongoTransactionError if the resolved write concern has `w === 0`
   */
  constructor(options?: TransactionOptions) {
    options = options ?? {};
    this.state = TxnState.NO_TRANSACTION;
    this.options = {};

    const writeConcern = WriteConcern.fromOptions(options);
    if (writeConcern) {
      if (writeConcern.w === 0) {
        throw new MongoTransactionError('Transactions do not support unacknowledged write concern');
      }

      this.options.writeConcern = writeConcern;
    }

    if (options.readConcern) {
      this.options.readConcern = ReadConcern.fromOptions(options);
    }

    if (options.readPreference) {
      this.options.readPreference = ReadPreference.fromOptions(options);
    }

    // Truthiness check: an unset or 0 `maxCommitTimeMS` leaves `maxTimeMS` unset.
    if (options.maxCommitTimeMS) {
      this.options.maxTimeMS = options.maxCommitTimeMS;
    }

    // TODO: This isn't technically necessary
    this._pinnedServer = undefined;
    this._recoveryToken = undefined;
  }

  /**
   * @internal
   * @returns The server this transaction is pinned to, or undefined when unpinned
   */
  get server(): Server | undefined {
    return this._pinnedServer;
  }

  /** @returns The stored recovery token, or undefined when none is set */
  get recoveryToken(): Document | undefined {
    return this._recoveryToken;
  }

  /** @returns Whether a server is currently pinned to this transaction */
  get isPinned(): boolean {
    return !!this.server;
  }

  /** @returns Whether the transaction is in the STARTING_TRANSACTION state */
  get isStarting(): boolean {
    return this.state === TxnState.STARTING_TRANSACTION;
  }

  /**
   * @returns Whether this session is presently in a transaction
   */
  get isActive(): boolean {
    return ACTIVE_STATES.has(this.state);
  }

  /** @returns Whether the current state is a member of `COMMITTED_STATES` */
  get isCommitted(): boolean {
    return COMMITTED_STATES.has(this.state);
  }

  /**
   * Transition the transaction in the state machine
   * @internal
   * @param nextState - The new state to transition to
   * @throws MongoRuntimeError if `nextState` is not reachable from the current state
   */
  transition(nextState: TxnState): void {
    const nextStates = stateMachine[this.state];
    if (nextStates && nextStates.includes(nextState)) {
      this.state = nextState;
      // Entering any of these states drops the pinned server.
      if (
        this.state === TxnState.NO_TRANSACTION ||
        this.state === TxnState.STARTING_TRANSACTION ||
        this.state === TxnState.TRANSACTION_ABORTED
      ) {
        this.unpinServer();
      }
      return;
    }

    throw new MongoRuntimeError(
      `Attempted illegal state transition from [${this.state}] to [${nextState}]`
    );
  }

  /**
   * Pin `server` to this transaction. Ignored unless the transaction is active.
   * @internal
   * @param server - The server to pin
   */
  pinServer(server: Server): void {
    if (this.isActive) {
      this._pinnedServer = server;
    }
  }

  /**
   * Clear the pinned server, if any.
   * @internal
   */
  unpinServer(): void {
    this._pinnedServer = undefined;
  }
}
 
/**
 * Report whether a command document ends a transaction.
 *
 * @param command - The command document to inspect
 * @returns true when `commitTransaction` or `abortTransaction` is present with a truthy value
 */
export function isTransactionCommand(command: Document): boolean {
  if (command.commitTransaction) {
    return true;
  }
  return Boolean(command.abortTransaction);
}