Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement rework cancel #2095

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1479,19 +1479,13 @@ public int EndExecuteNonQueryAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
if (!_internalEndExecuteInitiated && _stateObj != null)
{
lock (_stateObj)
{
return EndExecuteNonQueryInternal(asyncResult);
}
}
else
{
return EndExecuteNonQueryInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}
return EndExecuteNonQueryInternal(asyncResult);
}
}

Expand Down Expand Up @@ -1900,19 +1894,14 @@ private XmlReader EndExecuteXmlReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
// If we have already initiate the End call internally, we have already done that, so no point doing it again.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteXmlReaderInternal(asyncResult);
}
}
else
if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteXmlReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot
// happen after we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteXmlReaderInternal(asyncResult);
}
}

Expand Down Expand Up @@ -2097,18 +2086,14 @@ internal SqlDataReader EndExecuteReaderAsync(IAsyncResult asyncResult)
else
{
ThrowIfReconnectionHasBeenCanceled();
// lock on _stateObj prevents races with close/cancel.
if (!_internalEndExecuteInitiated)
{
lock (_stateObj)
{
return EndExecuteReaderInternal(asyncResult);
}
}
else
if (!_internalEndExecuteInitiated && _stateObj != null)
{
return EndExecuteReaderInternal(asyncResult);
// call SetCancelStateClosed on the stateobject to ensure that cancel cannot happen after
// we have changed started the end processing
_stateObj.SetCancelStateClosed();
}

return EndExecuteReaderInternal(asyncResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal abstract partial class TdsParserStateObject
// Timeout variables
private readonly WeakReference _cancellationOwner = new WeakReference(null);

// Async
// Async
private StateSnapshot _cachedSnapshot;
private SnapshottedStateFlags _snapshottedState;

Expand Down Expand Up @@ -108,6 +108,10 @@ internal bool TryInitialize(TdsParserStateObject stateObj, int columnsCount)
/////////////////////
// General methods //
/////////////////////
internal bool SetCancelStateClosed()
{
return Interlocked.CompareExchange(ref _cancelState, CancelState.Closed, CancelState.Unset) == CancelState.Unset && _cancelState == CancelState.Closed;
}

// This method is only called by the command or datareader as a result of a user initiated
// cancel request.
Expand All @@ -116,61 +120,38 @@ internal void Cancel(object caller)
Debug.Assert(caller != null, "Null caller for Cancel!");
Debug.Assert(caller is SqlCommand || caller is SqlDataReader, "Calling API with invalid caller type: " + caller.GetType());

bool hasLock = false;
try
// only change state if it is Unset, so don't check the return value
Interlocked.CompareExchange(ref _cancelState, CancelState.Cancelled, CancelState.Unset);

if ((_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken)
&& (_cancellationOwner.Target == caller) && HasPendingData && !_attentionSent)
{
// Keep looping until we either grabbed the lock (and therefore sent attention) or the connection closes\breaks
while ((!hasLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
Monitor.TryEnter(this, WaitForCancellationLockPollTimeout, ref hasLock);
if (hasLock)
{ // Lock for the time being - since we need to synchronize the attention send.
// This lock is also protecting against concurrent close and async continuations

// Ensure that, once we have the lock, that we are still the owner
if ((!_cancelled) && (_cancellationOwner.Target == caller))
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: WaitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_cancelled = true;

if (HasPendingData && !_attentionSent)
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: WaitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
}
finally
{
if (hasLock)
{
Monitor.Exit(this);
}
}
}

private void ResetCancelAndProcessAttention()
Expand All @@ -181,7 +162,7 @@ private void ResetCancelAndProcessAttention()
lock (this)
{
// Reset cancel state.
_cancelled = false;
_cancelState = CancelState.Unset;
_cancellationOwner.Target = null;

if (_attentionSent)
Expand Down Expand Up @@ -2383,7 +2364,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)
byte packetNumber = _outputPacketNumber;

// Set Status byte based whether this is end of message or not
bool willCancel = (_cancelled) && (_parser._asyncWrite);
bool willCancel = (_cancelState != CancelState.Unset) && (_parser._asyncWrite);
if (willCancel)
{
status = TdsEnums.ST_EOM | TdsEnums.ST_IGNORE;
Expand Down Expand Up @@ -2432,7 +2413,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)

private void CancelWritePacket()
{
Debug.Assert(_cancelled, "Should not call CancelWritePacket if _cancelled is not set");
Debug.Assert(_cancelState != CancelState.Unset, "Should not call CancelWritePacket if _cancelled is not set");

_parser.Connection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we are holding the lock
try
Expand Down Expand Up @@ -3014,7 +2995,7 @@ internal void AssertStateIsClean()
Debug.Assert(_delayedWriteAsyncCallbackException == null, "StateObj has an unobserved exceptions from an async write");
// Attention\Cancellation\Timeouts
Debug.Assert(!HasReceivedAttention && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {HasReceivedAttention}, Sending: {_attentionSending}");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(_cancelState == CancelState.Unset, "StateObj still has cancellation set");
Debug.Assert(_timeoutState == TimeoutState.Stopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,62 +163,40 @@ internal bool TryInitialize(TdsParserStateObject stateObj, int columnsCount)
// cancel request.
internal void Cancel(int objectID)
{
bool hasLock = false;
try
// only change state if it is Unset, so don't check the return value
Interlocked.CompareExchange(ref _cancelState, CancelState.Cancelled, CancelState.Unset);

// don't allow objectID -1 since it is reserved for 'not associated with a command'
// yes, the 2^32-1 comand won't cancel - but it also won't cancel when we don't want it
if ((_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken)
&& (objectID == _allowObjectID) && (objectID != -1) && _pendingData && !_attentionSent)
{
// Keep looping until we either grabbed the lock (and therefore sent attention) or the connection closes\breaks
while ((!hasLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
Monitor.TryEnter(this, WaitForCancellationLockPollTimeout, ref hasLock);
if (hasLock)
{ // Lock for the time being - since we need to synchronize the attention send.
// This lock is also protecting against concurrent close and async continuations

// don't allow objectID -1 since it is reserved for 'not associated with a command'
// yes, the 2^32-1 comand won't cancel - but it also won't cancel when we don't want it
if ((!_cancelled) && (objectID == _allowObjectID) && (objectID != -1))
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: WaitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_cancelled = true;

if (HasPendingData && !_attentionSent)
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
bool hasParserLock = false;
// Keep looping until we have the parser lock (and so are allowed to write), or the connection closes\breaks
while ((!hasParserLock) && (_parser.State != TdsParserState.Closed) && (_parser.State != TdsParserState.Broken))
{
try
{
_parser.Connection._parserLock.Wait(canReleaseFromAnyThread: false, timeout: WaitForCancellationLockPollTimeout, lockTaken: ref hasParserLock);
if (hasParserLock)
{
_parser.Connection.ThreadHasParserLockForClose = true;
SendAttention();
}
}
finally
{
if (hasParserLock)
{
if (_parser.Connection.ThreadHasParserLockForClose)
{
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
_parser.Connection.ThreadHasParserLockForClose = false;
}
_parser.Connection._parserLock.Release();
}
}
}
}
finally
{
if (hasLock)
{
Monitor.Exit(this);
}
}
}

private void ResetCancelAndProcessAttention()
Expand All @@ -229,7 +207,7 @@ private void ResetCancelAndProcessAttention()
lock (this)
{
// Reset cancel state.
_cancelled = false;
_cancelState = CancelState.Unset;
_allowObjectID = -1;

if (_attentionSent)
Expand Down Expand Up @@ -2443,7 +2421,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)
byte packetNumber = _outputPacketNumber;

// Set Status byte based whether this is end of message or not
bool willCancel = (_cancelled) && (_parser._asyncWrite);
bool willCancel = (_cancelState != CancelState.Unset) && (_parser._asyncWrite);
if (willCancel)
{
status = TdsEnums.ST_EOM | TdsEnums.ST_IGNORE;
Expand Down Expand Up @@ -2492,7 +2470,7 @@ internal Task WritePacket(byte flushMode, bool canAccumulate = false)

private void CancelWritePacket()
{
Debug.Assert(_cancelled, "Should not call CancelWritePacket if _cancelled is not set");
Debug.Assert(_cancelState != CancelState.Unset, "Should not call CancelWritePacket if _cancelled is not set");

_parser.Connection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we are holding the lock
try
Expand Down Expand Up @@ -3164,7 +3142,7 @@ internal void AssertStateIsClean()
Debug.Assert(_delayedWriteAsyncCallbackException == null, "StateObj has an unobserved exceptions from an async write");
// Attention\Cancellation\Timeouts
Debug.Assert(!HasReceivedAttention && !_attentionSent && !_attentionSending, $"StateObj is still dealing with attention: Sent: {_attentionSent}, Received: {HasReceivedAttention}, Sending: {_attentionSending}");
Debug.Assert(!_cancelled, "StateObj still has cancellation set");
Debug.Assert(_cancelState == CancelState.Unset, "StateObj still has cancellation set");
Debug.Assert(_timeoutState == TimeoutState.Stopped, "StateObj still has internal timeout set");
// Errors and Warnings
Debug.Assert(!_hasErrorOrWarning, "StateObj still has stored errors or warnings");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ partial class TdsParserStateObject
private static int s_objectTypeCount; // EventSource counter
internal readonly int _objectID = Interlocked.Increment(ref s_objectTypeCount);

private static class CancelState
{
public const int Unset = 0;
public const int Closed = 1;
public const int Cancelled = 2;
}

private int _cancelState;

[Flags]
internal enum SnapshottedStateFlags : byte
{
Expand Down Expand Up @@ -158,7 +167,6 @@ public TimeoutState(int value)
// 2) post first packet write, but before session return - a call to cancel will send an
// attention to the server
// 3) post session close - no attention is allowed
private bool _cancelled;
private const int WaitForCancellationLockPollTimeout = 100;

// Cache the transaction for which this command was executed so upon completion we can
Expand Down Expand Up @@ -787,10 +795,10 @@ internal Task ExecuteFlush()
{
lock (this)
{
if (_cancelled && 1 == _outputPacketNumber)
if (_cancelState != CancelState.Unset && 1 == _outputPacketNumber)
{
ResetBuffer();
_cancelled = false;
_cancelState = CancelState.Unset;
throw SQL.OperationCancelled();
}
else
Expand Down
Loading