Skip to content

Commit 37b23f1

Browse files
authored
Improve SKIP LOCKED implementation (#7811)
* Cleanup * SKIP LOCKED re-implemented * Fixed handling of returning clause by EraseNode, perhaps more complex solution required. Correct (simplify) "skip locked" handling in VIO_chase. * Follow @dyemanov suggestion
1 parent c5d7c72 commit 37b23f1

32 files changed

+239
-144
lines changed

src/dsql/StmtNodes.cpp

Lines changed: 90 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,40 @@ namespace
152152
{}
153153
};
154154

155+
// Combined conditional savepoint and its change marker.
156+
class CondSavepointAndMarker
157+
{
158+
public:
159+
CondSavepointAndMarker(thread_db* tdbb, jrd_tra* trans, bool cond) :
160+
m_savepoint(tdbb, trans, cond),
161+
m_marker(cond ? trans->tra_save_point : nullptr)
162+
{}
163+
164+
~CondSavepointAndMarker()
165+
{
166+
rollback();
167+
}
168+
169+
void release()
170+
{
171+
m_marker.done();
172+
m_savepoint.release();
173+
}
174+
175+
void rollback()
176+
{
177+
m_marker.done();
178+
m_savepoint.rollback();
179+
}
180+
181+
private:
182+
// Prohibit unwanted creation/copying
183+
CondSavepointAndMarker(const CondSavepointAndMarker&) = delete;
184+
CondSavepointAndMarker& operator=(const CondSavepointAndMarker&) = delete;
185+
186+
AutoSavePoint m_savepoint;
187+
Savepoint::ChangeMarker m_marker;
188+
};
155189
} // namespace
156190

157191

@@ -2314,7 +2348,7 @@ StmtNode* EraseNode::dsqlPass(DsqlCompilerScratch* dsqlScratch)
23142348
PASS1_limit(dsqlScratch, dsqlRows->length, dsqlRows->skip, rse);
23152349

23162350
if (dsqlSkipLocked)
2317-
rse->flags |= RseNode::FLAG_WRITELOCK | RseNode::FLAG_SKIP_LOCKED;
2351+
rse->flags |= RseNode::FLAG_SKIP_LOCKED;
23182352
}
23192353

23202354
if (dsqlReturning && dsqlScratch->isPsql())
@@ -2348,6 +2382,7 @@ string EraseNode::internalPrint(NodePrinter& printer) const
23482382
NODE_PRINT(printer, dsqlReturning);
23492383
NODE_PRINT(printer, dsqlRse);
23502384
NODE_PRINT(printer, dsqlContext);
2385+
NODE_PRINT(printer, dsqlSkipLocked);
23512386
NODE_PRINT(printer, statement);
23522387
NODE_PRINT(printer, subStatement);
23532388
NODE_PRINT(printer, stream);
@@ -2356,6 +2391,8 @@ string EraseNode::internalPrint(NodePrinter& printer) const
23562391
return "EraseNode";
23572392
}
23582393

2394+
// The EraseNode::erase() depends on generated nodes layout in case when
2395+
// RETURNING specified.
23592396
void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
23602397
{
23612398
std::optional<USHORT> tableNumber;
@@ -2388,7 +2425,6 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
23882425
if (dsqlReturning)
23892426
{
23902427
dsqlScratch->appendUChar(blr_begin);
2391-
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber);
23922428
}
23932429

23942430
dsqlScratch->appendUChar(blr_erase);
@@ -2399,6 +2435,8 @@ void EraseNode::genBlr(DsqlCompilerScratch* dsqlScratch)
23992435

24002436
if (dsqlReturning)
24012437
{
2438+
dsqlGenReturning(dsqlScratch, dsqlReturning, tableNumber);
2439+
24022440
dsqlScratch->appendUChar(blr_end);
24032441

24042442
if (!dsqlScratch->isPsql() && dsqlCursorName.isEmpty())
@@ -2665,7 +2703,7 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
26652703

26662704
if (rpb->rpb_runtime_flags & RPB_refetch)
26672705
{
2668-
VIO_refetch_record(tdbb, rpb, transaction, RecordLock::NONE, false);
2706+
VIO_refetch_record(tdbb, rpb, transaction, false, false);
26692707
rpb->rpb_runtime_flags &= ~RPB_refetch;
26702708
}
26712709

@@ -2674,6 +2712,12 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
26742712

26752713
SavepointChangeMarker scMarker(transaction);
26762714

2715+
// Prepare to undo changes by PRE-triggers if record is locked by another
2716+
// transaction and delete should be skipped.
2717+
const bool skipLocked = rpb->rpb_stream_flags & RPB_s_skipLocked;
2718+
CondSavepointAndMarker spPreTriggers(tdbb, transaction,
2719+
skipLocked && !(transaction->tra_flags & TRA_system) && relation->rel_pre_erase);
2720+
26772721
// Handle pre-operation trigger.
26782722
preModifyEraseTriggers(tdbb, &relation->rel_pre_erase, whichTrig, rpb, NULL, TRIGGER_DELETE);
26792723

@@ -2683,23 +2727,36 @@ const StmtNode* EraseNode::erase(thread_db* tdbb, Request* request, WhichTrigger
26832727
VirtualTable::erase(tdbb, rpb);
26842728
else if (!relation->rel_view_rse)
26852729
{
2686-
// VIO_erase returns false if there is an update conflict in Read Consistency
2687-
// transaction. Before returning false it disables statement-level snapshot
2688-
// (via setting req_update_conflict flag) so re-fetch should see new data.
2730+
// VIO_erase returns false if:
2731+
// a) there is an update conflict in Read Consistency transaction.
2732+
// Before returning false it disables statement-level snapshot (via
2733+
// setting req_update_conflict flag) so re-fetch should see new data.
2734+
// b) record is locked by another transaction and should be skipped.
26892735

26902736
if (!VIO_erase(tdbb, rpb, transaction))
26912737
{
2738+
// Record was not deleted, flow control should be passed to the
2739+
// parent ForNode. Note, If RETURNING clause was specified, then
2740+
// parent node is CompoundStmtNode, not ForNode. If\when this
2741+
// will be changed, the code below should be changed accordingly.
2742+
2743+
if (skipLocked)
2744+
return forNode;
2745+
2746+
spPreTriggers.release();
2747+
26922748
forceWriteLock(tdbb, rpb, transaction);
26932749

26942750
if (!forNode)
26952751
restartRequest(request, transaction);
26962752

26972753
forNode->setWriteLockMode(request);
2698-
return parentStmt;
2754+
return forNode;
26992755
}
27002756

27012757
REPL_erase(tdbb, rpb, transaction);
27022758
}
2759+
spPreTriggers.release();
27032760

27042761
// Handle post operation trigger.
27052762
if (relation->rel_post_erase && whichTrig != PRE_TRIG)
@@ -7199,6 +7256,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up
71997256
}
72007257

72017258
node->dsqlCursorName = dsqlCursorName;
7259+
node->dsqlSkipLocked = dsqlSkipLocked;
72027260

72037261
if (dsqlCursorName.hasData() && dsqlScratch->isPsql())
72047262
{
@@ -7305,7 +7363,7 @@ StmtNode* ModifyNode::internalDsqlPass(DsqlCompilerScratch* dsqlScratch, bool up
73057363
PASS1_limit(dsqlScratch, dsqlRows->length, dsqlRows->skip, rse);
73067364

73077365
if (dsqlSkipLocked)
7308-
rse->flags |= RseNode::FLAG_WRITELOCK | RseNode::FLAG_SKIP_LOCKED;
7366+
rse->flags |= RseNode::FLAG_SKIP_LOCKED;
73097367
}
73107368

73117369
node->dsqlReturning = dsqlProcessReturning(dsqlScratch,
@@ -7366,6 +7424,7 @@ string ModifyNode::internalPrint(NodePrinter& printer) const
73667424
NODE_PRINT(printer, dsqlRseFlags);
73677425
NODE_PRINT(printer, dsqlRse);
73687426
NODE_PRINT(printer, dsqlContext);
7427+
NODE_PRINT(printer, dsqlSkipLocked);
73697428
NODE_PRINT(printer, statement);
73707429
NODE_PRINT(printer, statement2);
73717430
NODE_PRINT(printer, subMod);
@@ -7715,6 +7774,12 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, Request* request, WhichTrigg
77157774

77167775
SavepointChangeMarker scMarker(transaction);
77177776

7777+
// Prepare to undo changed by PRE-triggers if record is locked by another
7778+
// transaction and update should be skipped.
7779+
const bool skipLocked = orgRpb->rpb_stream_flags & RPB_s_skipLocked;
7780+
CondSavepointAndMarker spPreTriggers(tdbb, transaction,
7781+
skipLocked && !(transaction->tra_flags & TRA_system) && relation->rel_pre_modify);
7782+
77187783
preModifyEraseTriggers(tdbb, &relation->rel_pre_modify, whichTrig, orgRpb, newRpb,
77197784
TRIGGER_UPDATE);
77207785

@@ -7727,24 +7792,31 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, Request* request, WhichTrigg
77277792
VirtualTable::modify(tdbb, orgRpb, newRpb);
77287793
else if (!relation->rel_view_rse)
77297794
{
7730-
// VIO_modify returns false if there is an update conflict in Read Consistency
7731-
// transaction. Before returning false it disables statement-level snapshot
7732-
// (via setting req_update_conflict flag) so re-fetch should see new data.
7795+
// VIO_modify returns false if:
7796+
// a) there is an update conflict in Read Consistency transaction.
7797+
// Before returning false it disables statement-level snapshot (via
7798+
// setting req_update_conflict flag) so re-fetch should see new data.
7799+
// b) record is locked by another transaction and should be skipped.
77337800

77347801
if (!VIO_modify(tdbb, orgRpb, newRpb, transaction))
77357802
{
7736-
forceWriteLock(tdbb, orgRpb, transaction);
7803+
if (!skipLocked)
7804+
{
7805+
spPreTriggers.release();
7806+
forceWriteLock(tdbb, orgRpb, transaction);
77377807

7738-
if (!forNode)
7739-
restartRequest(request, transaction);
7808+
if (!forNode)
7809+
restartRequest(request, transaction);
77407810

7741-
forNode->setWriteLockMode(request);
7811+
forNode->setWriteLockMode(request);
7812+
}
77427813
return parentStmt;
77437814
}
77447815

77457816
IDX_modify(tdbb, orgRpb, newRpb, transaction);
77467817
REPL_modify(tdbb, orgRpb, newRpb, transaction);
77477818
}
7819+
spPreTriggers.release();
77487820

77497821
newRpb->rpb_number = orgRpb->rpb_number;
77507822
newRpb->rpb_number.setValid(true);
@@ -7815,7 +7887,7 @@ const StmtNode* ModifyNode::modify(thread_db* tdbb, Request* request, WhichTrigg
78157887

78167888
if (orgRpb->rpb_runtime_flags & RPB_refetch)
78177889
{
7818-
VIO_refetch_record(tdbb, orgRpb, transaction, RecordLock::NONE, false);
7890+
VIO_refetch_record(tdbb, orgRpb, transaction, false, false);
78197891
orgRpb->rpb_runtime_flags &= ~RPB_refetch;
78207892
}
78217893

@@ -11281,13 +11353,13 @@ static void cleanupRpb(thread_db* tdbb, record_param* rpb)
1128111353
// Try to set write lock on record until success or record exists
1128211354
static void forceWriteLock(thread_db* tdbb, record_param* rpb, jrd_tra* transaction)
1128311355
{
11284-
while (VIO_refetch_record(tdbb, rpb, transaction, RecordLock::LOCK, true))
11356+
while (VIO_refetch_record(tdbb, rpb, transaction, true, true))
1128511357
{
1128611358
rpb->rpb_runtime_flags &= ~RPB_refetch;
1128711359

1128811360
// VIO_writelock returns false if record has been deleted or modified
1128911361
// by someone else.
11290-
if (VIO_writelock(tdbb, rpb, transaction, false) == WriteLockResult::LOCKED)
11362+
if (VIO_writelock(tdbb, rpb, transaction) == WriteLockResult::LOCKED)
1129111363
break;
1129211364
}
1129311365
}

src/jrd/Savepoint.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,9 +620,12 @@ Savepoint* Savepoint::release(Savepoint* prior)
620620

621621
// AutoSavePoint implementation
622622

623-
AutoSavePoint::AutoSavePoint(thread_db* tdbb, jrd_tra* trans)
623+
AutoSavePoint::AutoSavePoint(thread_db* tdbb, jrd_tra* trans, bool cond)
624624
: m_tdbb(tdbb), m_transaction(trans), m_number(0)
625625
{
626+
if (!cond)
627+
return;
628+
626629
const auto savepoint = trans->startSavepoint();
627630
m_number = savepoint->getNumber();
628631
}

src/jrd/Savepoint.h

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,15 @@ namespace Jrd
291291
++m_savepoint->m_count;
292292
}
293293

294+
void done()
295+
{
296+
if (m_savepoint)
297+
{
298+
--m_savepoint->m_count;
299+
m_savepoint = nullptr;
300+
}
301+
}
302+
294303
~ChangeMarker()
295304
{
296305
if (m_savepoint)
@@ -301,7 +310,7 @@ namespace Jrd
301310
ChangeMarker(const ChangeMarker&);
302311
ChangeMarker& operator=(const ChangeMarker&);
303312

304-
Savepoint* const m_savepoint;
313+
Savepoint* m_savepoint;
305314
};
306315

307316
private:
@@ -330,7 +339,7 @@ namespace Jrd
330339
class AutoSavePoint
331340
{
332341
public:
333-
AutoSavePoint(thread_db* tdbb, jrd_tra* trans);
342+
AutoSavePoint(thread_db* tdbb, jrd_tra* trans, bool cond = true);
334343
~AutoSavePoint();
335344

336345
void release();

src/jrd/Statement.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ Statement::Statement(thread_db* tdbb, MemoryPool* p, CompilerScratch* csb)
191191
if (tail->csb_flags & csb_unstable)
192192
rpb->rpb_stream_flags |= RPB_s_unstable;
193193

194+
if (tail->csb_flags & csb_skip_locked)
195+
rpb->rpb_stream_flags |= RPB_s_skipLocked;
196+
194197
rpb->rpb_relation = tail->csb_relation;
195198

196199
delete tail->csb_fields;

src/jrd/exe.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ const int csb_erase = 256; // we are processing an erase
120120
const int csb_unmatched = 512; // stream has conjuncts unmatched by any index
121121
const int csb_update = 1024; // erase or modify for relation
122122
const int csb_unstable = 2048; // unstable explicit cursor
123+
const int csb_skip_locked = 4096; // skip locked record
123124

124125

125126
// Aggregate Sort Block (for DISTINCT aggregates)

src/jrd/optimizer/Optimizer.cpp

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,13 +1024,20 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
10241024
if (invariantBoolean)
10251025
rsb = FB_NEW_POOL(getPool()) PreFilteredStream(csb, rsb, invariantBoolean);
10261026

1027-
// Handle SKIP, WITH LOCK and FIRST.
1028-
// The SKIP must (if present) appear in the rsb list deeper than FIRST.
1029-
// WITH LOCK must appear between them to work correct with SKIP LOCKED.
1027+
// Handle first and/or skip. The skip MUST (if present)
1028+
// appear in the rsb list AFTER the first. Since the gen_first and gen_skip
1029+
// functions add their nodes at the beginning of the rsb list we MUST call
1030+
// gen_skip before gen_first.
10301031

10311032
if (rse->rse_skip)
10321033
rsb = FB_NEW_POOL(getPool()) SkipRowsStream(csb, rsb, rse->rse_skip);
10331034

1035+
if (rse->rse_first)
1036+
rsb = FB_NEW_POOL(getPool()) FirstRowsStream(csb, rsb, rse->rse_first);
1037+
1038+
if (rse->isSingular())
1039+
rsb = FB_NEW_POOL(getPool()) SingularStream(csb, rsb);
1040+
10341041
if (rse->hasWriteLock())
10351042
{
10361043
for (const auto compileStream : compileStreams)
@@ -1045,14 +1052,16 @@ RecordSource* Optimizer::compile(BoolExprNodeStack* parentStack)
10451052
SCL_update, obj_relations, tail->csb_relation->rel_name);
10461053
}
10471054

1048-
rsb = FB_NEW_POOL(getPool()) LockedStream(csb, rsb, rse->hasSkipLocked());
1055+
rsb = FB_NEW_POOL(getPool()) LockedStream(csb, rsb);
10491056
}
10501057

1051-
if (rse->rse_first)
1052-
rsb = FB_NEW_POOL(getPool()) FirstRowsStream(csb, rsb, rse->rse_first);
1053-
1054-
if (rse->isSingular())
1055-
rsb = FB_NEW_POOL(getPool()) SingularStream(csb, rsb);
1058+
if (rse->hasSkipLocked())
1059+
{
1060+
for (const auto compileStream : compileStreams)
1061+
{
1062+
csb->csb_rpt[compileStream].csb_flags |= csb_skip_locked;
1063+
}
1064+
}
10561065

10571066
if (rse->isScrollable())
10581067
rsb = FB_NEW_POOL(getPool()) BufferedStream(csb, rsb);

src/jrd/par.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1322,13 +1322,6 @@ RseNode* PAR_rse(thread_db* tdbb, CompilerScratch* csb, SSHORT rse_op)
13221322
break;
13231323

13241324
case blr_skip_locked:
1325-
if (!rse->hasWriteLock())
1326-
{
1327-
PAR_error(csb,
1328-
Arg::Gds(isc_random) <<
1329-
"blr_skip_locked cannot be used without previous blr_writelock",
1330-
false);
1331-
}
13321325
rse->flags |= RseNode::FLAG_SKIP_LOCKED;
13331326
break;
13341327

src/jrd/recsrc/AggregatedStream.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ bool BaseAggWinStream<ThisType, NextType>::refetchRecord(thread_db* tdbb) const
109109
}
110110

111111
template <typename ThisType, typename NextType>
112-
WriteLockResult BaseAggWinStream<ThisType, NextType>::lockRecord(thread_db* /*tdbb*/, bool /*skipLocked*/) const
112+
WriteLockResult BaseAggWinStream<ThisType, NextType>::lockRecord(thread_db* /*tdbb*/) const
113113
{
114114
status_exception::raise(Arg::Gds(isc_record_lock_not_supp));
115115
}

src/jrd/recsrc/BufferedStream.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,9 +309,9 @@ bool BufferedStream::refetchRecord(thread_db* tdbb) const
309309
return m_next->refetchRecord(tdbb);
310310
}
311311

312-
WriteLockResult BufferedStream::lockRecord(thread_db* tdbb, bool skipLocked) const
312+
WriteLockResult BufferedStream::lockRecord(thread_db* tdbb) const
313313
{
314-
return m_next->lockRecord(tdbb, skipLocked);
314+
return m_next->lockRecord(tdbb);
315315
}
316316

317317
void BufferedStream::getLegacyPlan(thread_db* tdbb, string& plan, unsigned level) const

0 commit comments

Comments
 (0)