Add control for deserialization error behavior #443
Conversation
Force-pushed from 1fa5420 to 0de45fb.
This is a great start. A few general thoughts:

- `invalid_data_behavior` is pretty long — can we come up with a shorter name for this (at least in SQL)?
- I think the key design decision here will be what part of the logic is handled in the connectors vs. the deserializer. In the very near future we might want to support things like "allow 10 failures per second" (of possibly processing or event time) or "10 failures per 100 messages". Where would that logic be possible/easy to implement? (A rough sketch of that kind of policy follows this list.)
- For Avro with a schema registry, there are different kinds of errors, some of which may be transient. For example, if we fail to fetch the schema for a record's id, that may be because the schema registry is temporarily down or temporarily inconsistent. We may want to handle those differently than bad-data errors.
- Longer term, we want to support redirecting invalid records elsewhere in the pipeline, for example allowing users to define a sink as a dead-letter queue. That will likely need a more complex implementation of this feature that's more integrated with the dataflow, but we don't have to solve that now.
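Purely as an illustration of where such a policy's state would have to live, a windowed failure budget like "allow 10 failures per second" could be tracked by whichever component sees the stream of deserialization results. The type and method names below are hypothetical and not part of this PR:

```rust
use std::time::{Duration, Instant};

// Hypothetical sketch of a failure budget ("at most `limit` bad records per `window`").
// Whatever owns this state (connector or deserializer) needs to see every bad record.
pub struct FailureBudget {
    limit: u32,
    window: Duration,
    window_start: Instant,
    count: u32,
}

impl FailureBudget {
    pub fn new(limit: u32, window: Duration) -> Self {
        Self { limit, window, window_start: Instant::now(), count: 0 }
    }

    /// Records one failure; returns true once the budget for the current window is exhausted.
    pub fn record_failure(&mut self) -> bool {
        let now = Instant::now();
        if now.duration_since(self.window_start) >= self.window {
            // Start a new window and reset the counter.
            self.window_start = now;
            self.count = 0;
        }
        self.count += 1;
        self.count > self.limit
    }
}
```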
arroyo-formats/src/lib.rs (outdated):

```diff
@@ -290,7 +298,7 @@ impl<T: SchemaData> DataDeserializer<T> {
     pub async fn deserialize_slice<'a>(
         &mut self,
         msg: &'a [u8],
-    ) -> impl Iterator<Item = Result<T, UserError>> + 'a + Send {
+    ) -> impl Iterator<Item = Result<Option<T>, UserError>> + 'a + Send {
```
Why is this now an option? It looks like the deserializer is responsible for handling the InvalidDataBehavior, and all of the users just ignore None values.
True, changed.
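For illustration only, here is a standalone sketch of the agreed-on shape: the deserializer yields plain `Result` items (no `Option` wrapper) and leaves the drop-or-fail decision to the consumer of the iterator. The types here (a `String` error, newline-delimited JSON input) are simplifications, not Arroyo's actual `DataDeserializer`:

```rust
use serde::de::DeserializeOwned;

// Simplified stand-in for deserialize_slice: treat the input as newline-delimited JSON
// and yield one Result per line, without deciding here what to do about failures.
fn deserialize_slice<'a, T: DeserializeOwned + 'a>(
    msg: &'a [u8],
) -> impl Iterator<Item = Result<T, String>> + 'a {
    msg.split(|b| *b == b'\n')
        .filter(|line| !line.is_empty())
        .map(|line| serde_json::from_slice::<T>(line).map_err(|e| e.to_string()))
}

// A source operator consuming this iterator can then apply the configured policy:
// for result in deserialize_slice::<MyRecord>(&buf) {
//     match result {
//         Ok(record) => { /* collect the record */ }
//         Err(err) => { /* drop, fail, or report depending on the bad-data setting */ }
//     }
// }
```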
arroyo-formats/src/lib.rs (outdated):

```rust
Ok(t) => Ok(Some(t)),
Err(e) => match self.invalid_data_behavior {
    None | Some(InvalidDataBehavior::Drop) => {
        warn!("Dropping invalid data: {}", e.details.clone());
```
I think in this case we still want to report back to the user. We also likely need some amount of rate-limiting on logging to the console to avoid a perf impact (imagine you're reading 100k messages/s and something changes such that 10% of them are invalid — you're going to want to know, but also not log every one of them).
(unrelatedly — the clone isn't necessary)
I've moved this logic to source_collector.rs and added rate limiting.
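A minimal sketch of the kind of rate limiter that could back this, assuming it only needs to throttle how often a warning or user error is emitted; the actual `RateLimiter` added in this PR may be structured differently:

```rust
use std::time::{Duration, Instant};

// Emits at most once per interval; calls made while throttled are silently skipped.
pub struct RateLimiter {
    interval: Duration,
    last_emitted: Option<Instant>,
}

impl RateLimiter {
    pub fn new(interval: Duration) -> Self {
        Self { interval, last_emitted: None }
    }

    /// Runs `emit` only if `interval` has elapsed since the last time it ran.
    pub fn rate_limit(&mut self, emit: impl FnOnce()) {
        let now = Instant::now();
        let due = self
            .last_emitted
            .map_or(true, |last| now.duration_since(last) >= self.interval);
        if due {
            self.last_emitted = Some(now);
            emit();
        }
    }
}

// Example: log the "dropping invalid data" warning at most once every 5 seconds.
// limiter.rate_limit(|| tracing::warn!("dropping invalid data: {}", details));
```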
arroyo-worker/src/connectors/sse.rs (outdated):

```rust
    key: None,
    value,
}).await;
if let Some(value) = value {
```
The HTTP sources (SSE and WebSocket) have their own implementation of this that we probably want to replace.
Force-pushed from 60b62d1 to c25839f.
I've made a few changes. Regarding the Avro data, I left the existing behavior for non-deserialization errors, which is that the job fails. Should we change that?
Force-pushed from 6d2a9e6 to 8381f4f, then from 8381f4f to 1aa80a2.
```diff
@@ -260,6 +260,29 @@ impl Format {
     }
 }
 
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
+#[serde(rename_all = "snake_case")]
+pub enum BadData {
```
I don't think this will be extensible to policies that require configuration (like allowing 10 bad records/sec). This turns into a JSON schema like:

```
"BadData": {
  "type": "string",
  "enum": [
    "drop",
    "fail"
  ]
},
```
I think if/when we add policies like that, we'll want something like this:

```rust
struct BadData {
    limit: i32,
    per_seconds: i32,
}
```

Or we could even represent it as a percentage, like an SLA. But either way, I'm not sure there's a clean way of implementing something backwards-compatible with the current binary option.
I've made the enum variants structs so they get serialized as JSON objects, which lets us add more complex variants in the future.
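A small sketch of the serialization point being made here; the variant names and derives are illustrative, not necessarily the exact definition in the PR:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "snake_case")]
pub enum BadData {
    // Struct-like variants serialize as a tagged JSON object rather than a bare string.
    Drop {},
    Fail {},
}

fn main() {
    // Prints {"drop":{}} instead of "drop", so a future variant that carries
    // configuration (e.g. rate_limit { limit, per_seconds }) can be added without
    // breaking the shape of existing configs.
    println!("{}", serde_json::to_string(&BadData::Drop {}).unwrap());
}
```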
```rust
    key: None,
    value: value?,
}).await;
match value {
```
Does this need to be updated to use collect_source_record?
Yup, missed this. It's done now.
```rust
Err(e) => {
    ctx.report_user_error(e).await;
}
if let Err(e) = collect_source_record(ctx, SystemTime::now(), record, &self.bad_data, &mut self.rate_limiter).await {
```
I think this needs to be updated to remove the existing bad-data handling code.
It looks like most sources panic on user errors, except for this one and SSE. Is there a reason for that or should we make them all consistent?
I've removed this and made all the sources consistent in handling user errors.
arroyo-worker/src/connectors/sse.rs (outdated):

```rust
}).await;
}
Err(e) => {
if let Err(e) = collect_source_record(ctx, SystemTime::now(), v, &self.bad_data, &mut self.rate_limiter).await {
```
Same here — there's existing error handling code that is now duplicative of what collect_source_record is doing.
I've removed this.
Force-pushed from ec31279 to 0b42734.
Adds a config option 'bad_data' to control whether deserialization errors result in dropping the data or failing the job. The logic is mostly contained in `source_collector.rs`, and it uses a new `RateLimiter` struct to limit the logging frequency and user error reporting.
Force-pushed from 0b42734 to 152090c.
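To tie the pieces together, here is a rough sketch of how a collector helper along the lines of `collect_source_record` might apply the `bad_data` setting and the rate limiter. All types and signatures below are simplified placeholders, not the actual `source_collector.rs` API:

```rust
use std::fmt::Display;
use std::time::{Duration, Instant};

// Minimal stand-ins for the config enum and rate limiter sketched earlier.
pub enum BadData {
    Drop {},
    Fail {},
}

pub struct RateLimiter {
    interval: Duration,
    last_emitted: Option<Instant>,
}

impl RateLimiter {
    pub fn rate_limit(&mut self, emit: impl FnOnce()) {
        let now = Instant::now();
        if self.last_emitted.map_or(true, |l| now.duration_since(l) >= self.interval) {
            self.last_emitted = Some(now);
            emit();
        }
    }
}

/// Collects a successfully deserialized record, or applies the bad-data policy.
pub fn collect_source_record<Rec, Err: Display>(
    record: Result<Rec, Err>,
    bad_data: &BadData,
    rate_limiter: &mut RateLimiter,
    mut collect: impl FnMut(Rec),
) -> Result<(), Err> {
    match record {
        Ok(rec) => {
            collect(rec);
            Ok(())
        }
        Err(e) => match bad_data {
            BadData::Drop {} => {
                // Drop the record but surface a rate-limited warning/user error.
                rate_limiter.rate_limit(|| eprintln!("dropping invalid data: {}", e));
                Ok(())
            }
            // Fail: propagate the error so the source operator can stop the job.
            BadData::Fail {} => Err(e),
        },
    }
}
```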