Skip to content

Commit

Permalink
feat: Added list proto methods
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhargav Dodla committed Oct 23, 2024
1 parent 5571425 commit 9818798
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 0 deletions.
108 changes: 108 additions & 0 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@
from feast.permissions.permission import Permission
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
Expand Down Expand Up @@ -467,3 +478,100 @@ def _start_thread_async_refresh(self, cache_ttl_seconds):

def _exit_handler(self):
self.registry_refresh_thread.cancel()

# Methods to improve the registry calls

@abstractmethod
def _list_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureViewProtoList]:
pass

def list_feature_views_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[FeatureViewProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_views_proto(
self.cached_registry_proto, project, tags
)
return self._list_feature_views_proto(project, tags)

@abstractmethod
def _list_entities_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[EntityProtoList]:
pass

def list_entities_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[EntityProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_entities_proto(
self.cached_registry_proto, project, tags
)
return self._list_entities_proto(project, tags)

@abstractmethod
def _list_data_sources_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[DataSourceProtoList]:
pass

def list_data_sources_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[DataSourceProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_data_sources_proto(
self.cached_registry_proto, project, tags
)
return self._list_data_sources_proto(project, tags)

@abstractmethod
def _list_on_demand_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[OnDemandFeatureViewProtoList]:
pass

def list_on_demand_feature_views_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[OnDemandFeatureViewProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_on_demand_feature_views_proto(
self.cached_registry_proto, project, tags
)
return self._list_on_demand_feature_views_proto(project, tags)

@abstractmethod
def _list_feature_services_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureServiceProtoList]:
pass

def list_feature_services_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> List[FeatureServiceProtoList]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_services_proto(
self.cached_registry_proto, project, tags
)
return self._list_feature_services_proto(project, tags)
76 changes: 76 additions & 0 deletions sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
from feast.permissions.permission import Permission
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.saved_dataset import SavedDataset, ValidationReference
Expand Down Expand Up @@ -367,3 +378,68 @@ def get_project(registry_proto: RegistryProto, name: str) -> Project:
if projects_proto.spec.name == name:
return Project.from_proto(projects_proto)
raise ProjectObjectNotFoundException(name=name)


@registry_proto_cache_with_tags
def list_feature_views_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureViewProtoList]:
feature_views: List[FeatureViewProtoList] = []
for feature_view_proto in registry_proto.feature_views:
if feature_view_proto.spec.project == project and utils.has_all_tags(
feature_view_proto.spec.tags, tags
):
feature_views.append(feature_view_proto)
return feature_views


@registry_proto_cache_with_tags
def list_feature_services_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureServiceProtoList]:
feature_services = []
for feature_service_proto in registry_proto.feature_services:
if feature_service_proto.spec.project == project and utils.has_all_tags(
feature_service_proto.spec.tags, tags
):
feature_services.append(feature_service_proto)
return feature_services


@registry_proto_cache_with_tags
def list_on_demand_feature_views_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[OnDemandFeatureViewProtoList]:
on_demand_feature_views = []
for on_demand_feature_view in registry_proto.on_demand_feature_views:
if on_demand_feature_view.spec.project == project and utils.has_all_tags(
on_demand_feature_view.spec.tags, tags
):
on_demand_feature_views.append(on_demand_feature_view)
return on_demand_feature_views


@registry_proto_cache_with_tags
def list_entities_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[EntityProtoList]:
entities = []
for entity_proto in registry_proto.entities:
if entity_proto.spec.project == project and utils.has_all_tags(
entity_proto.spec.tags, tags
):
entities.append(entity_proto)
return entities


@registry_proto_cache_with_tags
def list_data_sources_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> List[DataSourceProtoList]:
data_sources = []
for data_source_proto in registry_proto.data_sources:
if data_source_proto.project == project and utils.has_all_tags(
data_source_proto.tags, tags
):
data_sources.append(data_source_proto)
return data_sources
86 changes: 86 additions & 0 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,26 @@
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Permission_pb2 import Permission as PermissionProto
from feast.protos.feast.core.Project_pb2 import Project as ProjectProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand Down Expand Up @@ -1326,3 +1337,78 @@ def get_project_metadata(
datetime.utcfromtimestamp(int(metadata_value))
)
return project_metadata_model

def _list_objects_proto(
self,
table: Table,
project: str,
proto_class: Any,
proto_field_name: str,
tags: Optional[dict[str, str]] = None,
):
with self.read_engine.begin() as conn:
stmt = select(table).where(table.c.project_id == project)
rows = conn.execute(stmt).all()
if rows:
objects = []
for row in rows:
obj = proto_class.FromString(row._mapping[proto_field_name])
if utils.has_all_tags(dict(obj.spec.tags), tags):
objects.append(obj)
return objects
return []

def _list_feature_services_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureServiceProtoList]:
return self._list_objects_proto(
feature_services,
project,
FeatureServiceProto,
"feature_service_proto",
tags=tags,
)

def _list_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[FeatureViewProtoList]:
return self._list_objects_proto(
feature_views,
project,
FeatureViewProto,
"feature_view_proto",
tags=tags,
)

def _list_on_demand_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[OnDemandFeatureViewProtoList]:
return self._list_objects_proto(
on_demand_feature_views,
project,
OnDemandFeatureViewProto,
"feature_view_proto",
tags=tags,
)

def _list_entities_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[EntityProtoList]:
return self._list_objects_proto(
entities,
project,
EntityProto,
"entity_proto",
tags=tags,
)

def _list_data_sources_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> List[DataSourceProtoList]:
return self._list_objects_proto(
data_sources,
project,
DataSourceProto,
"data_source_proto",
tags=tags,
)

0 comments on commit 9818798

Please sign in to comment.