Skip to content

Commit 7671a32

Browse files
committed
Added reentrancy test
1 parent e2130d3 commit 7671a32

1 file changed

Lines changed: 155 additions & 0 deletions

File tree

src/packages/dumbo/src/storage/sqlite/sqlite3/transactions/transactions.int.spec.ts

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -653,6 +653,161 @@ describe('SQLite3 Transactions', () => {
653653
}
654654
});
655655

656+
// Reentrancy: a writer-bound call from inside an already-active writer
657+
// task (same async stack) must bypass the queue instead of deadlocking.
658+
// These tests exercise the patterns emmett relies on (workflow processor
659+
// calling messageStore.appendToStream inside its tx handler).
660+
it(
661+
`allows pool.withConnection reentry from inside pool.withTransaction with ${testName} database`,
662+
{ timeout: 5000 },
663+
async () => {
664+
const pool = sqlite3Pool({
665+
fileName,
666+
transactionOptions: { allowNestedTransactions: true },
667+
});
668+
669+
try {
670+
await pool.execute.command(
671+
SQL`CREATE TABLE reentry_with_conn (id INTEGER PRIMARY KEY, value TEXT)`,
672+
);
673+
674+
await pool.withTransaction(async (tx) => {
675+
await tx.execute.command(
676+
SQL`INSERT INTO reentry_with_conn (id, value) VALUES (1, 'outer')`,
677+
);
678+
679+
await pool.withConnection(async (conn) => {
680+
await conn.execute.command(
681+
SQL`INSERT INTO reentry_with_conn (id, value) VALUES (2, 'inner')`,
682+
);
683+
});
684+
});
685+
686+
const rows = await pool.execute.query<{ count: number }>(
687+
SQL`SELECT COUNT(*) as count FROM reentry_with_conn`,
688+
);
689+
assert.strictEqual(rows.rows[0]?.count, 2);
690+
} finally {
691+
await pool.close();
692+
}
693+
},
694+
);
695+
696+
it(
697+
`allows pool.execute.command reentry from inside pool.withTransaction with ${testName} database`,
698+
{ timeout: 5000 },
699+
async () => {
700+
const pool = sqlite3Pool({
701+
fileName,
702+
transactionOptions: { allowNestedTransactions: true },
703+
});
704+
705+
try {
706+
await pool.execute.command(
707+
SQL`CREATE TABLE reentry_with_cmd (id INTEGER PRIMARY KEY, value TEXT)`,
708+
);
709+
710+
await pool.withTransaction(async (tx) => {
711+
await tx.execute.command(
712+
SQL`INSERT INTO reentry_with_cmd (id, value) VALUES (1, 'outer')`,
713+
);
714+
715+
await pool.execute.command(
716+
SQL`INSERT INTO reentry_with_cmd (id, value) VALUES (2, 'inner-plain')`,
717+
);
718+
});
719+
720+
const rows = await pool.execute.query<{ count: number }>(
721+
SQL`SELECT COUNT(*) as count FROM reentry_with_cmd`,
722+
);
723+
assert.strictEqual(rows.rows[0]?.count, 2);
724+
} finally {
725+
await pool.close();
726+
}
727+
},
728+
);
729+
730+
it(
731+
`allows nested pool.withTransaction reentry with ${testName} database`,
732+
{ timeout: 5000 },
733+
async () => {
734+
const pool = sqlite3Pool({
735+
fileName,
736+
transactionOptions: { allowNestedTransactions: true },
737+
});
738+
739+
try {
740+
await pool.execute.command(
741+
SQL`CREATE TABLE reentry_nested_tx (id INTEGER PRIMARY KEY, value TEXT)`,
742+
);
743+
744+
await pool.withTransaction(async (outerTx) => {
745+
await outerTx.execute.command(
746+
SQL`INSERT INTO reentry_nested_tx (id, value) VALUES (1, 'outer')`,
747+
);
748+
749+
await pool.withTransaction(async (innerTx) => {
750+
await innerTx.execute.command(
751+
SQL`INSERT INTO reentry_nested_tx (id, value) VALUES (2, 'inner')`,
752+
);
753+
});
754+
});
755+
756+
const rows = await pool.execute.query<{ count: number }>(
757+
SQL`SELECT COUNT(*) as count FROM reentry_nested_tx`,
758+
);
759+
assert.strictEqual(rows.rows[0]?.count, 2);
760+
} finally {
761+
await pool.close();
762+
}
763+
},
764+
);
765+
766+
// Mirrors the emmett workflow scenario: outer pool.withConnection holds
767+
// the writer, the workflow opens connection.withTransaction on it, then
768+
// an inner messageStore.appendToStream re-enters pool.withConnection,
769+
// which itself opens connection.withTransaction. That's the exact stack
770+
// that deadlocked the LLMAgentWorkflow.
771+
it(
772+
`survives nested pool.withConnection inside connection.withTransaction with ${testName} database`,
773+
{ timeout: 5000 },
774+
async () => {
775+
const pool = sqlite3Pool({
776+
fileName,
777+
transactionOptions: { allowNestedTransactions: true },
778+
});
779+
780+
try {
781+
await pool.execute.command(
782+
SQL`CREATE TABLE reentry_emmett (id INTEGER PRIMARY KEY, value TEXT)`,
783+
);
784+
785+
await pool.withConnection(async (outerConn) => {
786+
await outerConn.withTransaction(async (outerTx) => {
787+
await outerTx.execute.command(
788+
SQL`INSERT INTO reentry_emmett (id, value) VALUES (1, 'outer-tx')`,
789+
);
790+
791+
await pool.withConnection(async (innerConn) => {
792+
await innerConn.withTransaction(async (innerTx) => {
793+
await innerTx.execute.command(
794+
SQL`INSERT INTO reentry_emmett (id, value) VALUES (2, 'inner-tx')`,
795+
);
796+
});
797+
});
798+
});
799+
});
800+
801+
const rows = await pool.execute.query<{ count: number }>(
802+
SQL`SELECT COUNT(*) as count FROM reentry_emmett`,
803+
);
804+
assert.strictEqual(rows.rows[0]?.count, 2);
805+
} finally {
806+
await pool.close();
807+
}
808+
},
809+
);
810+
656811
it(`serializes concurrent withConnection writes with ${testName} database`, async () => {
657812
const pool = sqlite3Pool({
658813
fileName,

0 commit comments

Comments
 (0)