Skip to content

fix(NODE-4845): allocate sessions lazily in cursors #4575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 61 additions & 40 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { ReadConcern, type ReadConcernLike } from '../read_concern';
import { ReadPreference, type ReadPreferenceLike } from '../read_preference';
import { type AsyncDisposable, configureResourceManagement } from '../resource_management';
import type { Server } from '../sdam/server';
import { ClientSession, maybeClearPinnedConnection } from '../sessions';
import { type ClientSession, maybeClearPinnedConnection } from '../sessions';
import { type CSOTTimeoutContext, type Timeout, TimeoutContext } from '../timeout';
import {
addAbortListener,
Expand Down Expand Up @@ -227,7 +227,7 @@ export abstract class AbstractCursor<
/** @internal */
private cursorId: Long | null;
/** @internal */
private cursorSession: ClientSession;
private cursorSession: ClientSession | null;
/** @internal */
private selectedServer?: Server;
/** @internal */
Expand Down Expand Up @@ -352,11 +352,7 @@ export abstract class AbstractCursor<
this.cursorOptions.maxAwaitTimeMS = options.maxAwaitTimeMS;
}

if (options.session instanceof ClientSession) {
this.cursorSession = options.session;
} else {
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });
}
this.cursorSession = options.session ?? null;

this.deserializationOptions = {
...this.cursorOptions,
Expand Down Expand Up @@ -413,7 +409,7 @@ export abstract class AbstractCursor<
}

/** @internal */
get session(): ClientSession {
get session(): ClientSession | null {
return this.cursorSession;
}

Expand Down Expand Up @@ -877,11 +873,12 @@ export abstract class AbstractCursor<
this.trackCursor();

// We only want to end this session if we created it, and it hasn't ended yet
if (this.cursorSession.explicit === false) {
if (this.cursorSession?.explicit === false) {
if (!this.cursorSession.hasEnded) {
this.cursorSession.endSession().then(undefined, squashError);
}
this.cursorSession = this.cursorClient.startSession({ owner: this, explicit: false });

this.cursorSession = null;
}
}

Expand All @@ -907,6 +904,13 @@ export abstract class AbstractCursor<
'Unexpected null selectedServer. A cursor creating command should have set this'
);
}

if (!this.cursorSession) {
throw new MongoRuntimeError(
'Unexpected null session. A cursor creating command should have set this'
);
}

const getMoreOptions = {
...this.cursorOptions,
session: this.cursorSession,
Expand Down Expand Up @@ -941,6 +945,7 @@ export abstract class AbstractCursor<
);
}
try {
this.cursorSession ??= this.cursorClient.startSession({ owner: this, explicit: false });
const state = await this._initialize(this.cursorSession);
// Set omitMaxTimeMS to the value needed for subsequent getMore calls
this.cursorOptions.omitMaxTimeMS = this.cursorOptions.timeoutMS != null;
Expand Down Expand Up @@ -1032,41 +1037,57 @@ export abstract class AbstractCursor<
return this.timeoutContext?.refreshed();
}
};
try {
if (
!this.isKilled &&
this.cursorId &&
!this.cursorId.isZero() &&
this.cursorNamespace &&
this.selectedServer &&
!this.cursorSession.hasEnded
) {
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;

await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session: this.cursorSession
}),
timeoutContextForKillCursors()
);

const withEmitClose = async (fn: () => Promise<void>) => {
try {
await fn();
} finally {
this.emitClose();
}
} catch (error) {
squashError(error);
} finally {
};

const close = async () => {
// if no session has been defined on the cursor, the cursor was never initialized
// or the cursor was re-wound and never re-iterated. In either case, we
// 1. do not need to end the session (there is no session after all)
// 2. do not need to kill the cursor server-side
const session = this.cursorSession;
if (!session) return;

try {
if (this.cursorSession?.owner === this) {
await this.cursorSession.endSession({ error });
}
if (!this.cursorSession?.inTransaction()) {
maybeClearPinnedConnection(this.cursorSession, { error });
if (
!this.isKilled &&
this.cursorId &&
!this.cursorId.isZero() &&
this.cursorNamespace &&
this.selectedServer &&
!session.hasEnded
) {
this.isKilled = true;
const cursorId = this.cursorId;
this.cursorId = Long.ZERO;

await executeOperation(
this.cursorClient,
new KillCursorsOperation(cursorId, this.cursorNamespace, this.selectedServer, {
session
}),
timeoutContextForKillCursors()
);
}
} catch (error) {
squashError(error);
} finally {
this.emitClose();
if (session.owner === this) {
await session.endSession({ error });
}
if (!session?.inTransaction()) {
maybeClearPinnedConnection(session, { error });
}
}
}
};

await withEmitClose(close);
}

/** @internal */
Expand Down
8 changes: 7 additions & 1 deletion src/cursor/run_command_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { BSONSerializeOptions, Document } from '../bson';
import { CursorResponse } from '../cmap/wire_protocol/responses';
import type { Db } from '../db';
import { MongoAPIError } from '../error';
import { MongoAPIError, MongoRuntimeError } from '../error';
import { executeOperation } from '../operations/execute_operation';
import { GetMoreOperation } from '../operations/get_more';
import { RunCommandOperation } from '../operations/run_command';
Expand Down Expand Up @@ -161,6 +161,12 @@ export class RunCommandCursor extends AbstractCursor {

/** @internal */
override async getMore(_batchSize: number): Promise<CursorResponse> {
if (!this.session) {
throw new MongoRuntimeError(
'Unexpected null session. A cursor creating command should have set this'
);
}

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const getMoreOperation = new GetMoreOperation(this.namespace, this.id!, this.server!, {
...this.cursorOptions,
Expand Down
70 changes: 34 additions & 36 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1676,22 +1676,24 @@ describe('Cursor', function () {
const collection = await client.db().collection('test');

const cursor = collection.find({});
await cursor.next();

const clonedCursor = cursor.clone();

expect(cursor).to.have.property('session');
expect(clonedCursor).to.have.property('session');
expect(cursor.session).to.not.equal(clonedCursor.session);
expect(cursor).to.have.property('session').not.to.be.null;
expect(clonedCursor).to.have.property('session').to.be.null;
});

it('removes session when cloning an aggregation cursor', async function () {
const collection = await client.db().collection('test');

const cursor = collection.aggregate([{ $match: {} }]);
await cursor.next();

const clonedCursor = cursor.clone();

expect(cursor).to.have.property('session');
expect(clonedCursor).to.have.property('session');
expect(cursor.session).to.not.equal(clonedCursor.session);
expect(cursor).to.have.property('session').not.to.be.null;
expect(clonedCursor).to.have.property('session').to.be.null;
});

it('destroying a stream stops it', async function () {
Expand Down Expand Up @@ -3598,42 +3600,38 @@ describe('Cursor', function () {
});

context('when executing on a find cursor', function () {
it('removes the existing session from the cloned cursor', function () {
it('removes the existing session from the cloned cursor', async function () {
const docs = [{ name: 'test1' }, { name: 'test2' }];
return collection.insertMany(docs).then(() => {
const cursor = collection.find({}, { batchSize: 1 });
return cursor
.next()
.then(doc => {
expect(doc).to.exist;
const clonedCursor = cursor.clone();
expect(clonedCursor.cursorOptions.session).to.not.exist;
expect(clonedCursor.session).to.have.property('_serverSession', null); // session is brand new and has not been used
})
.finally(() => {
return cursor.close();
});
});
await collection.insertMany(docs);

const cursor = collection.find({}, { batchSize: 1 });
try {
const doc = await cursor.next();
expect(doc).to.exist;

const clonedCursor = cursor.clone();
expect(clonedCursor.session).to.be.null;
} finally {
await cursor.close();
}
});
});

context('when executing on an aggregation cursor', function () {
it('removes the existing session from the cloned cursor', function () {
it('removes the existing session from the cloned cursor', async function () {
const docs = [{ name: 'test1' }, { name: 'test2' }];
return collection.insertMany(docs).then(() => {
const cursor = collection.aggregate([{ $match: {} }], { batchSize: 1 });
return cursor
.next()
.then(doc => {
expect(doc).to.exist;
const clonedCursor = cursor.clone();
expect(clonedCursor.cursorOptions.session).to.not.exist;
expect(clonedCursor.session).to.have.property('_serverSession', null); // session is brand new and has not been used
})
.finally(() => {
return cursor.close();
});
});
await collection.insertMany(docs);

const cursor = collection.aggregate([{ $match: {} }], { batchSize: 1 });
try {
const doc = await cursor.next();
expect(doc).to.exist;

const clonedCursor = cursor.clone();
expect(clonedCursor.session).to.be.null;
} finally {
await cursor.close();
}
});
});
});
Expand Down
37 changes: 37 additions & 0 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,43 @@ import { clearFailPoint, configureFailPoint } from '../../tools/utils';
import { filterForCommands } from '../shared';

describe('class AbstractCursor', function () {
describe('lazy implicit session acquisition', function () {
let client: MongoClient;
let collection: Collection;
const docs = [{ count: 0 }, { count: 10 }];

beforeEach(async function () {
client = this.configuration.newClient();

collection = client.db('abstract_cursor_integration').collection('test');

await collection.insertMany(docs);
});

afterEach(async function () {
await collection.deleteMany({});
await client.close();
});

it('does not allocate a session when the cursor is constructed', function () {
const cursor = collection.find();
expect(cursor.session).to.be.null;
});

it('allocates a session once the cursor is initialized', async function () {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
expect(cursor.session).not.to.be.null;
});

it('sets the session to `null` when rewound', async function () {
const cursor = collection.find({}, { batchSize: 1 });
await cursor.next();
cursor.rewind();
expect(cursor.session).to.be.null;
});
});

describe('regression tests NODE-5372', function () {
let client: MongoClient;
let collection: Collection;
Expand Down
6 changes: 3 additions & 3 deletions test/unit/cursor/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
AbstractCursor,
type AbstractCursorOptions,
type Callback,
ClientSession,
type ClientSession,
type ExecutionResult,
MongoClient,
ns,
Expand Down Expand Up @@ -32,9 +32,9 @@ describe('class AbstractCursor', () => {
});

context('#constructor', () => {
it('creates a session if none passed in', () => {
it('does not create a session if none passed in', () => {
const cursor = new ConcreteCursor(client);
expect(cursor).to.have.property('session').that.is.instanceOf(ClientSession);
expect(cursor).to.have.property('session').that.is.null;
});

it('uses the passed in session', async () => {
Expand Down
3 changes: 1 addition & 2 deletions test/unit/cursor/aggregation_cursor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ describe('class AggregationCursor', () => {
});

context('clone()', () => {
it('returns a new cursor with a different session', () => {
it('returns a new cursor', () => {
const cloned = cursor.clone();
expect(cursor).to.not.equal(cloned);
expect(cursor.session).to.not.equal(cloned.session);
});
});

Expand Down