Skip to content

Commit

Permalink
Cleanup printlns in schema registry code
Browse files Browse the repository at this point in the history
  • Loading branch information
mwylde authored and jbeisen committed Dec 9, 2023
1 parent 38079a2 commit 0de45fb
Show file tree
Hide file tree
Showing 31 changed files with 194 additions and 72 deletions.
1 change: 1 addition & 0 deletions arroyo-connectors/src/blackhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl Connector for BlackholeConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
invalid_data_behavior: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl Connector for DeltaLakeConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl Connector for FileSystemConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/fluvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl Connector for FluvioConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
2 changes: 2 additions & 0 deletions arroyo-connectors/src/impulse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub fn impulse_schema() -> ConnectionSchema {
ConnectionSchema {
format: None,
framing: None,
invalid_data_behavior: None,
struct_name: Some("arroyo_types::ImpulseEvent".to_string()),
fields: vec![
source_field("counter", Primitive(PrimitiveType::UInt64)),
Expand Down Expand Up @@ -161,6 +162,7 @@ impl Connector for ImpulseConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
invalid_data_behavior: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl Connector for KafkaConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/kinesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl Connector for KinesisConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
2 changes: 2 additions & 0 deletions arroyo-connectors/src/nexmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn nexmark_schema() -> ConnectionSchema {
use arroyo_rpc::api_types::connections::PrimitiveType::*;
ConnectionSchema {
format: None,
invalid_data_behavior: None,
framing: None,
struct_name: Some("arroyo_types::nexmark::Event".to_string()),
fields: vec![
Expand Down Expand Up @@ -207,6 +208,7 @@ impl Connector for NexmarkConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: None,
invalid_data_behavior: None,
framing: None,
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/polling_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ impl Connector for PollingHTTPConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ impl Connector for RedisConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/single_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl Connector for SingleFileConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ impl Connector for SSEConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/webhook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl Connector for WebhookConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
1 change: 1 addition & 0 deletions arroyo-connectors/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ impl Connector for WebsocketConnector {
table: serde_json::to_value(table).unwrap(),
rate_limit: None,
format: Some(format),
invalid_data_behavior: schema.invalid_data_behavior.clone(),
framing: schema.framing.clone(),
};

Expand Down
2 changes: 1 addition & 1 deletion arroyo-formats/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ tracing = "0.1"
anyhow = "1"
chrono = "0.4"
bincode = "2.0.0-rc.3"
memchr = "2"
memchr = "2"
2 changes: 1 addition & 1 deletion arroyo-formats/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ mod tests {

let mut format = AvroFormat::new(false, true, true);
format.add_reader_schema(Schema::parse_str(&schema_str).unwrap());
let mut deserializer = DataDeserializer::new(Format::Avro(format), None);
let mut deserializer = DataDeserializer::new(Format::Avro(format), None, None);

let v: Result<Vec<RawJson>, _> = deserializer.deserialize_slice(&data[..]).await.collect();

Expand Down
50 changes: 38 additions & 12 deletions arroyo-formats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::bail;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_array::cast::AsArray;
use arrow_array::{RecordBatch, StringArray};
use arroyo_rpc::formats::{AvroFormat, Format, Framing, FramingMethod};
use arroyo_rpc::formats::{AvroFormat, Format, Framing, FramingMethod, InvalidDataBehavior};
use arroyo_rpc::schema_resolver::{FailingSchemaResolver, FixedSchemaResolver, SchemaResolver};
use arroyo_types::{Data, Debezium, RawJson, UserError};
use serde::de::DeserializeOwned;
Expand All @@ -14,6 +14,7 @@ use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::warn;

pub mod avro;
pub mod json;
Expand Down Expand Up @@ -251,14 +252,19 @@ impl<'a> Iterator for FramingIterator<'a> {
#[derive(Clone)]
pub struct DataDeserializer<T: SchemaData> {
format: Arc<Format>,
invalid_data_behavior: Option<InvalidDataBehavior>,
framing: Option<Arc<Framing>>,
schema_registry: Arc<Mutex<HashMap<u32, apache_avro::schema::Schema>>>,
schema_resolver: Arc<dyn SchemaResolver + Sync>,
_t: PhantomData<T>,
}

impl<T: SchemaData> DataDeserializer<T> {
pub fn new(format: Format, framing: Option<Framing>) -> Self {
pub fn new(
format: Format,
invalid_data_behavior: Option<InvalidDataBehavior>,
framing: Option<Framing>,
) -> Self {
let resolver = if let Format::Avro(AvroFormat {
reader_schema: Some(schema),
..
Expand All @@ -270,16 +276,18 @@ impl<T: SchemaData> DataDeserializer<T> {
Arc::new(FailingSchemaResolver::new()) as Arc<dyn SchemaResolver + Sync>
};

Self::with_schema_resolver(format, framing, resolver)
Self::with_schema_resolver(format, invalid_data_behavior, framing, resolver)
}

pub fn with_schema_resolver(
format: Format,
invalid_data_behavior: Option<InvalidDataBehavior>,
framing: Option<Framing>,
schema_resolver: Arc<dyn SchemaResolver + Sync>,
) -> Self {
Self {
format: Arc::new(format),
invalid_data_behavior,
framing: framing.map(|f| Arc::new(f)),
schema_registry: Arc::new(Mutex::new(HashMap::new())),
schema_resolver,
Expand All @@ -290,7 +298,7 @@ impl<T: SchemaData> DataDeserializer<T> {
pub async fn deserialize_slice<'a>(
&mut self,
msg: &'a [u8],
) -> impl Iterator<Item = Result<T, UserError>> + 'a + Send {
) -> impl Iterator<Item = Result<Option<T>, UserError>> + 'a + Send {
match &*self.format {
Format::Avro(avro) => {
let schema_registry = self.schema_registry.clone();
Expand All @@ -299,18 +307,22 @@ impl<T: SchemaData> DataDeserializer<T> {
.await
{
Ok(iter) => Box::new(iter),
Err(e) => Box::new(
vec![Err(UserError::new("Avro deserialization failed", e))].into_iter(),
)
as Box<dyn Iterator<Item = Result<T, UserError>> + Send>,
Err(e) => {
// TODO: apply invalid data behavior here too

Box::new(
vec![Err(UserError::new("Avro deserialization failed", e))].into_iter(),
)
as Box<dyn Iterator<Item = Result<Option<T>, UserError>> + Send>
}
}
}
_ => {
let new_self = self.clone();
Box::new(
FramingIterator::new(self.framing.clone(), msg)
.map(move |t| new_self.deserialize_single(t)),
) as Box<dyn Iterator<Item = Result<T, UserError>> + Send>
) as Box<dyn Iterator<Item = Result<Option<T>, UserError>> + Send>
}
}
}
Expand All @@ -319,8 +331,8 @@ impl<T: SchemaData> DataDeserializer<T> {
self.format.clone()
}

pub fn deserialize_single(&self, msg: &[u8]) -> Result<T, UserError> {
match &*self.format {
pub fn deserialize_single(&self, msg: &[u8]) -> Result<Option<T>, UserError> {
let deserialization = match &*self.format {
Format::Json(json) => json::deserialize_slice_json(json, msg),
Format::Avro(_) => unreachable!("avro should be handled by here"),
Format::Parquet(_) => todo!("parquet is not supported as an input format"),
Expand All @@ -335,7 +347,21 @@ impl<T: SchemaData> DataDeserializer<T> {
e
),
)
})
});

match deserialization {
Ok(t) => Ok(Some(t)),
Err(e) => match self.invalid_data_behavior {
None | Some(InvalidDataBehavior::Drop) => {
warn!("Dropping invalid data: {}", e.details.clone());
// TODO: increment counter
Ok(None)
}
Some(InvalidDataBehavior::Fail) => {
return Err(e);
}
},
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion arroyo-rpc/src/api_types/connections.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::formats::{Format, Framing};
use crate::formats::{Format, Framing, InvalidDataBehavior};
use anyhow::bail;
use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter};
Expand Down Expand Up @@ -128,6 +128,7 @@ pub enum SchemaDefinition {
#[serde(rename_all = "camelCase")]
pub struct ConnectionSchema {
pub format: Option<Format>,
pub invalid_data_behavior: Option<InvalidDataBehavior>,
pub framing: Option<Framing>,
pub struct_name: Option<String>,
pub fields: Vec<SourceField>,
Expand All @@ -138,6 +139,7 @@ pub struct ConnectionSchema {
impl ConnectionSchema {
pub fn try_new(
format: Option<Format>,
invalid_data_behavior: Option<InvalidDataBehavior>,
framing: Option<Framing>,
struct_name: Option<String>,
fields: Vec<SourceField>,
Expand All @@ -146,6 +148,7 @@ impl ConnectionSchema {
) -> anyhow::Result<Self> {
let s = ConnectionSchema {
format,
invalid_data_behavior,
framing,
struct_name,
fields,
Expand Down
22 changes: 22 additions & 0 deletions arroyo-rpc/src/formats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,28 @@ impl Format {
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
pub enum InvalidDataBehavior {
Drop, // TODO: add optional limit
Fail,
}

impl InvalidDataBehavior {
pub fn from_opts(opts: &mut HashMap<String, String>) -> Result<Option<Self>, String> {
let Some(method) = opts.remove("invalid_data_behavior") else {
return Ok(None);
};

let method = match method.as_str() {
"drop" => InvalidDataBehavior::Drop,
"fail" => InvalidDataBehavior::Fail,
f => return Err(format!("Unknown invalid data behavior '{}'", f)),
};

Ok(Some(method))
}
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct Framing {
Expand Down
4 changes: 3 additions & 1 deletion arroyo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
use std::{fs, time::SystemTime};

use crate::api_types::connections::PrimitiveType;
use crate::formats::{Format, Framing};
use crate::formats::{Format, Framing, InvalidDataBehavior};
use crate::grpc::{LoadCompactedDataReq, SubtaskCheckpointMetadata};
use arroyo_types::CheckpointBarrier;
use grpc::{StopMode, TaskCheckpointEventType};
Expand Down Expand Up @@ -165,6 +165,7 @@ pub struct OperatorConfig {
pub connection: Value,
pub table: Value,
pub format: Option<Format>,
pub invalid_data_behavior: Option<InvalidDataBehavior>,
pub framing: Option<Framing>,
pub rate_limit: Option<RateLimit>,
}
Expand All @@ -175,6 +176,7 @@ impl Default for OperatorConfig {
connection: serde_json::from_str("{}").unwrap(),
table: serde_json::from_str("{}").unwrap(),
format: None,
invalid_data_behavior: None,
framing: None,
rate_limit: None,
}
Expand Down
1 change: 1 addition & 0 deletions arroyo-sql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ pub fn get_test_expression(
let struct_def = test_struct_def();
let schema = ConnectionSchema {
format: Some(Format::Json(JsonFormat::default())),
invalid_data_behavior: None,
framing: None,
struct_name: struct_def.name.clone(),
fields: struct_def
Expand Down
6 changes: 5 additions & 1 deletion arroyo-sql/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use arroyo_datastream::{ConnectorOp, Operator};
use arroyo_rpc::api_types::connections::{
ConnectionProfile, ConnectionSchema, ConnectionType, SchemaDefinition, SourceField,
};
use arroyo_rpc::formats::{Format, Framing};
use arroyo_rpc::formats::{Format, Framing, InvalidDataBehavior};
use datafusion::sql::sqlparser::ast::Query;
use datafusion::{
optimizer::{analyzer::Analyzer, optimizer::Optimizer, OptimizerContext},
Expand Down Expand Up @@ -190,6 +190,9 @@ impl ConnectorTable {

let format = Format::from_opts(options).map_err(|e| anyhow!("invalid format: '{e}'"))?;

let invalid_data_behavior = InvalidDataBehavior::from_opts(options)
.map_err(|e| anyhow!("Invalid invalid_data_behavior: '{e}'"))?;

let framing = Framing::from_opts(options).map_err(|e| anyhow!("invalid framing: '{e}'"))?;

let schema_fields: Result<Vec<SourceField>> = fields
Expand All @@ -209,6 +212,7 @@ impl ConnectorTable {

let schema = ConnectionSchema::try_new(
format,
invalid_data_behavior,
framing,
None,
schema_fields?,
Expand Down
1 change: 1 addition & 0 deletions arroyo-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,7 @@ pub static BYTES_RECV: &str = "arroyo_worker_bytes_recv";
pub static BYTES_SENT: &str = "arroyo_worker_bytes_sent";
pub static TX_QUEUE_SIZE: &str = "arroyo_worker_tx_queue_size";
pub static TX_QUEUE_REM: &str = "arroyo_worker_tx_queue_rem";
pub static DESERIALIZATION_ERRORS: &str = "arroyo_worker_deserialization_errors";

#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct CheckpointBarrier {
Expand Down
Loading

0 comments on commit 0de45fb

Please sign in to comment.