Skip to content

Commit

Permalink
feat: Added support for Confluent Avro Format (#90)
Browse files Browse the repository at this point in the history
* feat: Added support for Confluent Avro Format

* Removed request source from SUPPORTED_KAFKA_BATCH_SOURCES

* rename schoma to schema_str

---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Mar 7, 2024
1 parent c540117 commit f1a1571
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 66 deletions.
7 changes: 7 additions & 0 deletions protos/feast/core/DataFormat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,17 @@ message StreamFormat {
string schema_json = 1;
}

// Confluent Avro fetches schema from a schema registry
// instead of embedding it in the message payload.
message ConfluentAvroFormat {
// Record name used by the schema registry.
string record_name = 1;
// Record namespace used by the schema registry.
string record_namespace = 2;
}

// Specifies the data format and format specific options
oneof format {
AvroFormat avro_format = 1;
ProtoFormat proto_format = 2;
JsonFormat json_format = 3;
ConfluentAvroFormat confluent_avro_format = 4;
}
}
28 changes: 28 additions & 0 deletions sdk/python/feast/data_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def from_proto(cls, proto):
return JsonFormat(schema_json=proto.json_format.schema_json)
if fmt == "proto_format":
return ProtoFormat(class_path=proto.proto_format.class_path)
if fmt == "confluent_avro_format":
return ConfluentAvroFormat(
record_name=proto.confluent_avro_format.record_name,
record_namespace=proto.confluent_avro_format.record_namespace,
)
raise NotImplementedError(f"StreamFormat is unsupported: {fmt}")


Expand Down Expand Up @@ -155,3 +160,26 @@ def to_proto(self):
return StreamFormatProto(
proto_format=StreamFormatProto.ProtoFormat(class_path=self.class_path)
)


class ConfluentAvroFormat(StreamFormat):
    """
    Defines the Confluent Avro streaming data format that encodes data in Confluent Avro format
    """

    def __init__(self, record_name: str, record_namespace: str):
        """
        Construct a new Confluent Avro data format.

        Args:
            record_name: Record name used by schema registry
            record_namespace: Record namespace used by schema registry
        """
        self.record_name = record_name
        self.record_namespace = record_namespace

    def to_proto(self) -> "StreamFormatProto":
        """
        Convert this format into its protobuf representation.

        Returns:
            A StreamFormatProto with its confluent_avro_format field populated
            from this object's record_name and record_namespace.
        """
        proto = StreamFormatProto.ConfluentAvroFormat(
            record_name=self.record_name, record_namespace=self.record_namespace
        )
        return StreamFormatProto(confluent_avro_format=proto)
39 changes: 25 additions & 14 deletions sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Copyright 2023 Expedia Group
Author: [email protected]
"""

import sys
from datetime import timedelta
from typing import Dict, List, Literal, Optional, Union
Expand All @@ -17,6 +18,7 @@
from feast.expediagroup.pydantic_models.stream_format_model import (
AnyStreamFormat,
AvroFormatModel,
ConfluentAvroFormatModel,
JsonFormatModel,
ProtoFormatModel,
)
Expand Down Expand Up @@ -230,8 +232,13 @@ def from_data_source(
)


SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, JsonFormatModel, ProtoFormatModel]
SUPPORTED_KAFKA_BATCH_SOURCES = [RequestSourceModel, SparkSourceModel]
SUPPORTED_MESSAGE_FORMATS = [
AvroFormatModel,
JsonFormatModel,
ProtoFormatModel,
ConfluentAvroFormatModel,
]
SUPPORTED_KAFKA_BATCH_SOURCES = [SparkSourceModel]


class KafkaSourceModel(DataSourceModel):
Expand Down Expand Up @@ -271,9 +278,9 @@ def to_data_source(self) -> KafkaSource:
description=self.description,
tags=self.tags,
owner=self.owner,
batch_source=self.batch_source.to_data_source()
if self.batch_source
else None,
batch_source=(
self.batch_source.to_data_source() if self.batch_source else None
),
watermark_delay_threshold=self.watermark_delay_threshold,
)

Expand Down Expand Up @@ -317,16 +324,20 @@ def from_data_source(
name=data_source.name,
timestamp_field=data_source.timestamp_field,
message_format=message_format,
kafka_bootstrap_servers=data_source.kafka_options.kafka_bootstrap_servers
if data_source.kafka_options.kafka_bootstrap_servers
else "",
topic=data_source.kafka_options.topic
if data_source.kafka_options.topic
else "",
kafka_bootstrap_servers=(
data_source.kafka_options.kafka_bootstrap_servers
if data_source.kafka_options.kafka_bootstrap_servers
else ""
),
topic=(
data_source.kafka_options.topic
if data_source.kafka_options.topic
else ""
),
created_timestamp_column=data_source.created_timestamp_column,
field_mapping=data_source.field_mapping
if data_source.field_mapping
else None,
field_mapping=(
data_source.field_mapping if data_source.field_mapping else None
),
description=data_source.description,
tags=data_source.tags if data_source.tags else None,
owner=data_source.owner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pydantic import Field as PydanticField
from typing_extensions import Annotated, Self

from feast.data_format import AvroFormat, JsonFormat, ProtoFormat
from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, ProtoFormat


class StreamFormatModel(BaseModel):
Expand Down Expand Up @@ -38,7 +38,7 @@ class AvroFormatModel(StreamFormatModel):
"""

format: Literal["AvroFormatModel"] = "AvroFormatModel"
schoma: str
schema_str: str

def to_stream_format(self) -> AvroFormat:
"""
Expand All @@ -47,7 +47,7 @@ def to_stream_format(self) -> AvroFormat:
Returns:
An AvroFormat.
"""
return AvroFormat(schema_json=self.schoma)
return AvroFormat(schema_json=self.schema_str)

@classmethod
def from_stream_format(
Expand All @@ -60,7 +60,7 @@ def from_stream_format(
Returns:
An AvroFormatModel.
"""
return cls(schoma=avro_format.schema_json)
return cls(schema_str=avro_format.schema_json)


class JsonFormatModel(StreamFormatModel):
Expand All @@ -69,7 +69,7 @@ class JsonFormatModel(StreamFormatModel):
"""

format: Literal["JsonFormatModel"] = "JsonFormatModel"
schoma: str
schema_str: str

def to_stream_format(self) -> JsonFormat:
"""
Expand All @@ -78,7 +78,7 @@ def to_stream_format(self) -> JsonFormat:
Returns:
A JsonFormat.
"""
return JsonFormat(schema_json=self.schoma)
return JsonFormat(schema_json=self.schema_str)

@classmethod
def from_stream_format(
Expand All @@ -91,7 +91,7 @@ def from_stream_format(
Returns:
A JsonFormatModel.
"""
return cls(schoma=json_format.schema_json)
return cls(schema_str=json_format.schema_json)


class ProtoFormatModel(StreamFormatModel):
Expand Down Expand Up @@ -125,9 +125,46 @@ def from_stream_format(
return cls(class_path=proto_format.class_path)


class ConfluentAvroFormatModel(StreamFormatModel):
    """
    Pydantic Model of a Feast ConfluentAvroFormat.
    """

    format: Literal["ConfluentAvroFormatModel"] = "ConfluentAvroFormatModel"
    record_name: str
    record_namespace: str

    def to_stream_format(self) -> ConfluentAvroFormat:
        """
        Given a Pydantic ConfluentAvroFormatModel, create and return a
        ConfluentAvroFormat.

        Returns:
            A ConfluentAvroFormat.
        """
        return ConfluentAvroFormat(
            record_name=self.record_name, record_namespace=self.record_namespace
        )

    @classmethod
    def from_stream_format(
        cls,
        confluent_avro_format,
    ) -> Self:  # type: ignore
        """
        Converts a ConfluentAvroFormat object to its pydantic model representation.

        Returns:
            A ConfluentAvroFormatModel.
        """
        return cls(
            record_name=confluent_avro_format.record_name,
            record_namespace=confluent_avro_format.record_namespace,
        )


# https://blog.devgenius.io/deserialize-child-classes-with-pydantic-that-gonna-work-784230e1cf83
# This lets us discriminate child classes of DataSourceModel with type hints.
AnyStreamFormat = Annotated[
Union[AvroFormatModel, JsonFormatModel, ProtoFormatModel],
Union[AvroFormatModel, JsonFormatModel, ProtoFormatModel, ConfluentAvroFormatModel],
PydanticField(discriminator="format"),
]
Loading

0 comments on commit f1a1571

Please sign in to comment.