Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add control for deserialization error behavior #443

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arroyo-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ pub(crate) fn to_micros(dt: OffsetDateTime) -> u64 {
UdfPost,
GlobalUdf,
GlobalUdfCollection,
BadData,
)),
tags(
(name = "ping", description = "Ping endpoint"),
Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl Connector for BlackholeConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
bad_data: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Connector for DeltaLakeConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl Connector for FileSystemConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/fluvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl Connector for FluvioConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
2 changes: 2 additions & 0 deletions arroyo-connectors/src/impulse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub fn impulse_schema() -> ConnectionSchema {
ConnectionSchema {
format: None,
framing: None,
bad_data: None,
struct_name: Some("arroyo_types::ImpulseEvent".to_string()),
fields: vec![
source_field("counter", Primitive(PrimitiveType::UInt64)),
Expand Down Expand Up @@ -161,6 +162,7 @@ impl Connector for ImpulseConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
bad_data: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ impl Connector for KafkaConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl Connector for KinesisConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
2 changes: 2 additions & 0 deletions arroyo-connectors/src/nexmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn nexmark_schema() -> ConnectionSchema {
use arroyo_rpc::api_types::connections::PrimitiveType::*;
ConnectionSchema {
format: None,
bad_data: None,
framing: None,
struct_name: Some("arroyo_types::nexmark::Event".to_string()),
fields: vec![
Expand Down Expand Up @@ -207,6 +208,7 @@ impl Connector for NexmarkConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
bad_data: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ impl Connector for PollingHTTPConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl Connector for RedisConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/single_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl Connector for SingleFileConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Connector for SSEConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl Connector for WebhookConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl Connector for WebsocketConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
bad_data: schema.bad_data.clone(),
framing: schema.framing.clone(),
};

Expand Down
17 changes: 7 additions & 10 deletions arroyo-formats/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use apache_avro::{from_avro_datum, Reader, Schema, Writer};
use arrow::datatypes::{DataType, Field, Fields, TimeUnit};
use arroyo_rpc::formats::AvroFormat;
use arroyo_rpc::schema_resolver::SchemaResolver;
use arroyo_types::UserError;
use arroyo_types::SourceError;
use serde::de::DeserializeOwned;
use serde_json::{json, Value as JsonValue};
use std::collections::HashMap;
Expand All @@ -17,7 +17,7 @@ pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
schema_registry: Arc<Mutex<HashMap<u32, Schema>>>,
resolver: Arc<dyn SchemaResolver + Sync>,
mut msg: &'a [u8],
) -> Result<impl Iterator<Item = Result<T, UserError>> + 'a, String> {
) -> Result<impl Iterator<Item = Result<T, SourceError>> + 'a, String> {
let id = if format.confluent_schema_registry {
let magic_byte = msg[0];
if magic_byte != 0 {
Expand Down Expand Up @@ -72,10 +72,7 @@ pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
let into_json = format.into_unstructured_json;
Ok(messages.into_iter().map(move |record| {
let value = record.map_err(|e| {
UserError::new(
"Deserialization failed",
format!("Failed to deserialize from avro: {:?}", e),
)
SourceError::bad_data(format!("Failed to deserialize from avro: {:?}", e))
})?;

if into_json {
Expand All @@ -84,10 +81,10 @@ pub async fn deserialize_slice_avro<'a, T: DeserializeOwned>(
// for now round-trip through json in order to handle unsupported avro features
// as that allows us to rely on raw json deserialization
serde_json::from_value(avro_to_json(value)).map_err(|e| {
UserError::new(
"Deserialization failed",
format!("Failed to convert avro message into struct type: {:?}", e),
)
SourceError::bad_data(format!(
"Failed to convert avro message into struct type: {:?}",
e
))
})
}
}))
Expand Down
23 changes: 11 additions & 12 deletions arroyo-formats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, StringArray};
use arroyo_rpc::formats::{AvroFormat, Format, Framing, FramingMethod};
use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, SchemaResolver};
use arroyo_types::{Data, Debezium, RawJson, UserError};
use arroyo_types::{Data, Debezium, RawJson, SourceError};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::{json, Value};
Expand Down Expand Up @@ -290,7 +290,7 @@ impl<T: SchemaData> DataDeserializer<T> {
pub async fn deserialize_slice<'a>(
&mut self,
msg: &'a [u8],
) -> impl Iterator<Item = Result<T, UserError>> + 'a + Send {
) -> impl Iterator<Item = Result<T, SourceError>> + 'a + Send {
match &*self.format {
Format::Avro(avro) => {
let schema_registry = self.schema_registry.clone();
Expand All @@ -300,17 +300,21 @@ impl<T: SchemaData> DataDeserializer<T> {
{
Ok(iter) => Box::new(iter),
Err(e) => Box::new(
vec![Err(UserError::new("Avro deserialization failed", e))].into_iter(),
vec![Err(SourceError::other(
"Avro error",
format!("Avro deserialization failed: {}", e),
))]
.into_iter(),
)
as Box<dyn Iterator<Item = Result<T, UserError>> + Send>,
as Box<dyn Iterator<Item = Result<T, SourceError>> + Send>,
}
}
_ => {
let new_self = self.clone();
Box::new(
FramingIterator::new(self.framing.clone(), msg)
.map(move |t| new_self.deserialize_single(t)),
) as Box<dyn Iterator<Item = Result<T, UserError>> + Send>
) as Box<dyn Iterator<Item = Result<T, SourceError>> + Send>
}
}
}
Expand All @@ -319,19 +323,14 @@ impl<T: SchemaData> DataDeserializer<T> {
self.format.clone()
}

pub fn deserialize_single(&self, msg: &[u8]) -> Result<T, UserError> {
pub fn deserialize_single(&self, msg: &[u8]) -> Result<T, SourceError> {
match &*self.format {
Format::Json(json) => json::deserialize_slice_json(json, msg),
Format::Avro(_) => unreachable!("avro should be handled by here"),
Format::Parquet(_) => todo!("parquet is not supported as an input format"),
Format::RawString(_) => deserialize_raw_string(msg),
}
.map_err(|e| {
UserError::new(
"Deserialization failed",
format!("Failed to deserialize: {}", e),
)
})
.map_err(|e| SourceError::bad_data(format!("Failed to deserialize: {:?}", e)))
}
}

Expand Down
5 changes: 4 additions & 1 deletion arroyo-rpc/src/api_types/connections.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::formats::{Format, Framing};
use crate::formats::{BadData, Format, Framing};
use anyhow::bail;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -128,6 +128,7 @@ pub enum SchemaDefinition {
#[serde(rename_all = "camelCase")]
pub struct ConnectionSchema {
pub format: Option<Format>,
pub bad_data: Option<BadData>,
pub framing: Option<Framing>,
pub struct_name: Option<String>,
pub fields: Vec<SourceField>,
Expand All @@ -138,6 +139,7 @@ pub struct ConnectionSchema {
impl ConnectionSchema {
pub fn try_new(
format: Option<Format>,
bad_data: Option<BadData>,
framing: Option<Framing>,
struct_name: Option<String>,
fields: Vec<SourceField>,
Expand All @@ -146,6 +148,7 @@ impl ConnectionSchema {
) -> anyhow::Result<Self> {
let s = ConnectionSchema {
format,
bad_data,
framing,
struct_name,
fields,
Expand Down
23 changes: 23 additions & 0 deletions arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,29 @@ impl Format {
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum BadData {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this will be extensible to policies that require configuration (like allowing 10 bad records/sec). This turns into a JSON schema like

      "BadData": {
        "type": "string",
        "enum": [
          "drop",
          "fail"
        ]
      },

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if/when we add policies like that we'll want something like this:

struct BadData {
    limit: i32,
    per_seconds: i32,
}

or we could even represent it as a percentage, like an SLA.

But either way, I'm not sure there's a clean way of implementing something backwards compatible with the current binary option.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made the enum variants structs so it gets serialized as a json object so we can add more complex ones in the future.

Fail {},
Drop {},
}

impl BadData {
pub fn from_opts(opts: &mut HashMap<String, String>) -> Result<Option<Self>, String> {
let Some(method) = opts.remove("bad_data") else {
return Ok(None);
};

let method = match method.as_str() {
"drop" => BadData::Drop {},
"fail" => BadData::Fail {},
f => return Err(format!("Unknown invalid data behavior '{}'", f)),
};

Ok(Some(method))
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct Framing {
Expand Down
4 changes: 3 additions & 1 deletion arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use std::{fs, time::SystemTime};

use crate::api_types::connections::PrimitiveType;
use crate::formats::{Format, Framing};
use crate::formats::{BadData, Format, Framing};
use crate::grpc::{LoadCompactedDataReq, SubtaskCheckpointMetadata};
use arroyo_types::CheckpointBarrier;
use grpc::{StopMode, TaskCheckpointEventType};
Expand Down Expand Up @@ -165,6 +165,7 @@ pub struct OperatorConfig {
pub connection: Value,
pub table: Value,
pub format: Option<Format>,
pub bad_data: Option<BadData>,
pub framing: Option<Framing>,
pub rate_limit: Option<RateLimit>,
}
Expand All @@ -175,6 +176,7 @@ impl Default for OperatorConfig {
connection: serde_json::from_str("{}").unwrap(),
table: serde_json::from_str("{}").unwrap(),
format: None,
bad_data: None,
framing: None,
rate_limit: None,
}
Expand Down
2 changes: 2 additions & 0 deletions arroyo-sql-macro/src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub fn get_json_schema_source() -> Result<Connection> {
})),
None,
None,
None,
struct_fields
.into_iter()
.map(|field| field.try_into().unwrap())
Expand Down Expand Up @@ -187,6 +188,7 @@ pub fn get_avro_source() -> Result<Connection> {
Some(Format::Avro(format)),
None,
None,
None,
struct_fields
.into_iter()
.map(|field| field.try_into().unwrap())
Expand Down
1 change: 1 addition & 0 deletions arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ pub fn get_test_expression(
let struct_def = test_struct_def();
let schema = ConnectionSchema {
format: Some(Format::Json(JsonFormat::default())),
bad_data: None,
framing: None,
struct_name: struct_def.name.clone(),
fields: struct_def
Expand Down
6 changes: 5 additions & 1 deletion arroyo-sql/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arroyo_datastream::{ConnectorOp, Operator};
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, SchemaDefinition, SourceField,
};
use arroyo_rpc::formats::{Format, Framing};
use arroyo_rpc::formats::{BadData, Format, Framing};
use datafusion::sql::sqlparser::ast::Query;
use datafusion::{
optimizer::{analyzer::Analyzer, optimizer::Optimizer, OptimizerContext},
Expand Down Expand Up @@ -190,6 +190,9 @@ impl ConnectorTable {

let format = Format::from_opts(options).map_err(|e| anyhow!("invalid format: '{e}'"))?;

let bad_data =
BadData::from_opts(options).map_err(|e| anyhow!("Invalid bad_data: '{e}'"))?;

let framing = Framing::from_opts(options).map_err(|e| anyhow!("invalid framing: '{e}'"))?;

let schema_fields: Result<Vec<SourceField>> = fields
Expand All @@ -209,6 +212,7 @@ impl ConnectorTable {

let schema = ConnectionSchema::try_new(
format,
bad_data,
framing,
None,
schema_fields?,
Expand Down
Loading
Loading