Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Decoupling concurrency guarantees/behavior on existing invocation id in invocation request #2393

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions crates/types/src/invocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,26 +273,102 @@
}
}

/// Concurrency guarantee of the invocation request.
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
enum ConcurrencyGuarantee {
/// Enqueue the invocation in the inbox when the target is busy
EnqueueWhenBusy {
/// fka ServiceId.name
inbox_target: ByteString,
/// fka ServiceId.key
inbox_key: ByteString,
},
/// No queueing, just execute the request
MaxParallelism,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the previous one becomes Sequential, then this can be Concurrent

/// Use the default from the target semantics
#[default]
UseTargetDefault,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's unclear what the default means here. It might be worth adding a couple of words to explain that it'll use the default behavior based on the target type (virtual object, service, etc.)

Also, see my note below.

}

impl ConcurrencyGuarantee {
fn infer_target_default(invocation_target: &InvocationTarget) -> ConcurrencyGuarantee {
match invocation_target {
InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Exclusive, name, key, .. } => {
ConcurrencyGuarantee::EnqueueWhenBusy {
inbox_target: name.clone(),
inbox_key: key.clone(),
}
}
InvocationTarget::Service { .. } |
InvocationTarget::VirtualObject { handler_ty: VirtualObjectHandlerType::Shared, .. } |
/* For workflow, there is no enqueueing as we have the behavior on existing invocation id that guarantees correctness */
InvocationTarget::Workflow { .. } => ConcurrencyGuarantee::MaxParallelism
}
}
}

/// Behavior when sending an invocation request and the request already exists.
#[derive(Debug, Default, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
enum BehaviorOnExistingInvocationId {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: IfExists might be a reasonable option if you like it. I can already imagine the call-site being invocation::IfExists::Attach

/// Attach to the existing invocation
Attach,
/// Reply with "conflict" error response
ReplyConflict,
/// Just drop the request
Drop,
/// Use the default from the target/idempotency key semantics
#[default]
UseTargetDefault,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I like having the default variant. It adds cognitive overhead. It sounds like "it depends". An alternative strategy is to always set it on the caller-side, I suppose.

Copy link
Contributor Author

@slinkydeveloper slinkydeveloper Dec 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is "it depends" really :D

The reason i was thinking about introducing UseTargetDefault is for backward compatibility, and eventually i guess we remove it. The idea is that next release will act on this new two types and their variants, but we never write it, while the release afterward will also write those fields.

Also i tried to make clear the "it depends" adding the method below "infer target default"

}

impl BehaviorOnExistingInvocationId {
fn infer_target_default(
invocation_target_type: InvocationTargetType,
has_idempotency_key: bool,
) -> BehaviorOnExistingInvocationId {
match (invocation_target_type, has_idempotency_key) {
(InvocationTargetType::Workflow(WorkflowHandlerType::Workflow), _) => {
BehaviorOnExistingInvocationId::ReplyConflict
}
(_, true) => BehaviorOnExistingInvocationId::Attach,
_ => BehaviorOnExistingInvocationId::Drop,
}
}
}

/// Invocation request flow is as follows:
///
/// 1. Invocation is proposed in the PP log.
/// 2. PP will first check if another invocation with the same id exists. If it exists, it applies the [`BehaviorOnExistingInvocationId`], otherwise moves to point 3.
/// 3. If the invocation id doesn't exist, wait for the `execution_time` if present, otherwise continue immediately.
/// 4. Apply the given [`ConcurrencyGuarantee`].
/// 5. Finally execute it sending the request to the service endpoint.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct InvocationRequestHeader {
pub id: InvocationId,
pub target: InvocationTarget,
pub headers: Vec<Header>,
pub span_context: ServiceInvocationSpanContext,

/// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`].
pub idempotency_key: Option<ByteString>,

/// Behavior to apply on an existing invocation id.
#[serde(default)]
pub behavior_on_existing_idempotency_key: BehaviorOnExistingInvocationId,
/// Time when the request should be executed. If none, it's executed immediately.
pub execution_time: Option<MillisSinceEpoch>,
/// Concurrency behavior to apply.
#[serde(default)]
pub concurrency_guarantee: ConcurrencyGuarantee,

/// Key to use for idempotent request. If none, this request is not idempotent, or it's a workflow call. See [`InvocationRequestHeader::is_idempotent`].
/// This value is propagated only for observability purposes, as the invocation id is already deterministic given the invocation id.
pub idempotency_key: Option<ByteString>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does that need serde(default)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this field is not new, i just moved it to the bottom here.

/// Retention duration of the completed status. If none, the completed status is not retained.
pub completion_retention_duration: Option<Duration>,
}

impl InvocationRequestHeader {
pub fn initialize(id: InvocationId, target: InvocationTarget) -> Self {
Self {

Check failure on line 371 in crates/types/src/invocation.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

missing fields `behavior_on_existing_idempotency_key` and `concurrency_guarantee` in initializer of `invocation::InvocationRequestHeader`
id,
target,
headers: vec![],
Expand Down
Loading