Skip to content

Commit e622a88

Browse files
authored
Port feast 0.10+ data model to feast-serving (#37)
* Update feast dep to 0.12 Signed-off-by: Achal Shah <[email protected]> * Port feast 0.10+ data model to feast-serving Signed-off-by: Achal Shah <[email protected]> * Fix tests Signed-off-by: Achal Shah <[email protected]> * Fix integ tests Signed-off-by: Achal Shah <[email protected]> * Fix integ tests Signed-off-by: Achal Shah <[email protected]> * remove logging Signed-off-by: Achal Shah <[email protected]> * Fix ilnt Signed-off-by: Achal Shah <[email protected]> * Fix serialization Signed-off-by: Achal Shah <[email protected]> * Implement EntityKeySerialization correctly Signed-off-by: Achal Shah <[email protected]> * Update workflows Signed-off-by: Achal Shah <[email protected]> * Update python version Signed-off-by: Achal Shah <[email protected]> * Change redis ports Signed-off-by: Achal Shah <[email protected]> * materialize into redis Signed-off-by: Achal Shah <[email protected]> * fix path Signed-off-by: Achal Shah <[email protected]> * Install redis vairant Signed-off-by: Achal Shah <[email protected]> * Remove odfv Signed-off-by: Achal Shah <[email protected]> * Include test file Signed-off-by: Achal Shah <[email protected]> * update source Signed-off-by: Achal Shah <[email protected]> * update source Signed-off-by: Achal Shah <[email protected]> * update source Signed-off-by: Achal Shah <[email protected]> * update source Signed-off-by: Achal Shah <[email protected]> * Wrestling with spring Signed-off-by: Achal Shah <[email protected]> * Tests Signed-off-by: Achal Shah <[email protected]> * Remove github action Signed-off-by: Achal Shah <[email protected]> * Add registry Signed-off-by: Achal Shah <[email protected]> * Remove redundant stuff Signed-off-by: Achal Shah <[email protected]> * Rename test Signed-off-by: Achal Shah <[email protected]> * awaitTermination Signed-off-by: Achal Shah <[email protected]> * lint Signed-off-by: Achal Shah <[email protected]> * lint Signed-off-by: Achal Shah <[email protected]> * dynamic properties instead Signed-off-by: Achal Shah <[email protected]> * dirtiescontext Signed-off-by: Achal Shah <[email protected]> * python 3.7 Signed-off-by: Achal Shah <[email protected]> * spotless Signed-off-by: Achal Shah <[email protected]> * Dirty Context after test method as well Signed-off-by: Achal Shah <[email protected]> * Cleanup Signed-off-by: Achal Shah <[email protected]> * Cleanup Signed-off-by: Achal Shah <[email protected]> * cr Signed-off-by: Achal Shah <[email protected]> * spotless Signed-off-by: Achal Shah <[email protected]>
1 parent c039416 commit e622a88

30 files changed

+821
-43
lines changed

.github/workflows/complete.yml

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,16 @@ jobs:
4242
integration-test:
4343
runs-on: ubuntu-latest
4444
needs: unit-test-java
45+
services:
46+
redis:
47+
image: redis
48+
ports:
49+
- 6389:6379
50+
options: >-
51+
--health-cmd "redis-cli ping"
52+
--health-interval 10s
53+
--health-timeout 5s
54+
--health-retries 5
4555
steps:
4656
- uses: actions/checkout@v2
4757
with:
@@ -54,7 +64,7 @@ jobs:
5464
architecture: x64
5565
- uses: actions/setup-python@v2
5666
with:
57-
python-version: '3.6'
67+
python-version: '3.7'
5868
architecture: 'x64'
5969
- uses: actions/cache@v2
6070
with:

deps/feast

Submodule feast updated 883 files

serving/src/main/java/feast/serving/config/ContextClosedHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import java.util.concurrent.ScheduledExecutorService;
2020
import org.springframework.beans.factory.annotation.Autowired;
21+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
2122
import org.springframework.context.ApplicationListener;
2223
import org.springframework.context.event.ContextClosedEvent;
2324
import org.springframework.stereotype.Component;
2425

2526
@Component
27+
@ConditionalOnBean(CoreCondition.class)
2628
public class ContextClosedHandler implements ApplicationListener<ContextClosedEvent> {
2729

2830
@Autowired ScheduledExecutorService executor;
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2021 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.serving.config;
18+
19+
import org.springframework.context.annotation.Condition;
20+
import org.springframework.context.annotation.ConditionContext;
21+
import org.springframework.core.env.Environment;
22+
import org.springframework.core.type.AnnotatedTypeMetadata;
23+
24+
/**
25+
* A {@link Condition} to signal that the ServingService should get feature definitions and metadata
26+
* from Core service.
27+
*/
28+
public class CoreCondition implements Condition {
29+
@Override
30+
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
31+
final Environment env = context.getEnvironment();
32+
return env.getProperty("feast.registry") == null;
33+
}
34+
}

serving/src/main/java/feast/serving/config/FeastProperties.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,16 @@ public FeastProperties() {}
7272
/* Feast Core port to connect to. */
7373
@Positive private int coreGrpcPort;
7474

75+
private String registry;
76+
77+
public String getRegistry() {
78+
return registry;
79+
}
80+
81+
public void setRegistry(final String registry) {
82+
this.registry = registry;
83+
}
84+
7585
private CoreAuthenticationProperties coreAuthentication;
7686

7787
public CoreAuthenticationProperties getCoreAuthentication() {
@@ -82,7 +92,6 @@ public void setCoreAuthentication(CoreAuthenticationProperties coreAuthenticatio
8292
this.coreAuthentication = coreAuthentication;
8393
}
8494

85-
/* Feast Core port to connect to. */
8695
@Positive private int coreCacheRefreshInterval;
8796

8897
private SecurityProperties security;
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright 2018-2021 The Feast Authors
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* https://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package feast.serving.config;
18+
19+
import org.springframework.context.annotation.Condition;
20+
import org.springframework.context.annotation.ConditionContext;
21+
import org.springframework.core.env.Environment;
22+
import org.springframework.core.type.AnnotatedTypeMetadata;
23+
24+
/**
25+
* A {@link Condition} to signal that the ServingService should get feature definitions and metadata
26+
* from the Registry object. This is needed for versions of the feature store written by feast
27+
* 0.10+.
28+
*/
29+
public class RegistryCondition implements Condition {
30+
31+
@Override
32+
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
33+
final Environment env = context.getEnvironment();
34+
return env.getProperty("feast.registry") != null;
35+
}
36+
}

serving/src/main/java/feast/serving/config/ServingServiceConfigV2.java

Lines changed: 62 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@
2020
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
2121
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
2222
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
23+
import com.google.protobuf.AbstractMessageLite;
24+
import feast.serving.registry.LocalRegistryRepo;
2325
import feast.serving.service.OnlineServingServiceV2;
2426
import feast.serving.service.ServingServiceV2;
2527
import feast.serving.specs.CachedSpecService;
28+
import feast.serving.specs.CoreFeatureSpecRetriever;
29+
import feast.serving.specs.FeatureSpecRetriever;
30+
import feast.serving.specs.RegistryFeatureSpecRetriever;
2631
import feast.storage.api.retriever.OnlineRetrieverV2;
2732
import feast.storage.connectors.bigtable.retriever.BigTableOnlineRetriever;
2833
import feast.storage.connectors.bigtable.retriever.BigTableStoreConfig;
@@ -32,13 +37,15 @@
3237
import io.opentracing.Tracer;
3338
import java.io.IOException;
3439
import java.net.InetSocketAddress;
40+
import java.nio.file.Paths;
3541
import java.util.Arrays;
3642
import java.util.List;
3743
import java.util.stream.Collectors;
3844
import org.slf4j.Logger;
3945
import org.springframework.beans.factory.annotation.Autowired;
4046
import org.springframework.context.ApplicationContext;
4147
import org.springframework.context.annotation.Bean;
48+
import org.springframework.context.annotation.Conditional;
4249
import org.springframework.context.annotation.Configuration;
4350
import org.springframework.context.annotation.Lazy;
4451

@@ -64,27 +71,26 @@ public BigtableDataClient bigtableClient(FeastProperties feastProperties) throws
6471
}
6572

6673
@Bean
74+
@Conditional(CoreCondition.class)
6775
public ServingServiceV2 servingServiceV2(
6876
FeastProperties feastProperties, CachedSpecService specService, Tracer tracer) {
69-
ServingServiceV2 servingService = null;
70-
FeastProperties.Store store = feastProperties.getActiveStore();
77+
final ServingServiceV2 servingService;
78+
final FeastProperties.Store store = feastProperties.getActiveStore();
7179

80+
OnlineRetrieverV2 retrieverV2;
7281
switch (store.getType()) {
7382
case REDIS_CLUSTER:
7483
RedisClientAdapter redisClusterClient =
7584
RedisClusterClient.create(store.getRedisClusterConfig());
76-
OnlineRetrieverV2 redisClusterRetriever = new OnlineRetriever(redisClusterClient);
77-
servingService = new OnlineServingServiceV2(redisClusterRetriever, specService, tracer);
85+
retrieverV2 = new OnlineRetriever(redisClusterClient, (AbstractMessageLite::toByteArray));
7886
break;
7987
case REDIS:
8088
RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig());
81-
OnlineRetrieverV2 redisRetriever = new OnlineRetriever(redisClient);
82-
servingService = new OnlineServingServiceV2(redisRetriever, specService, tracer);
89+
retrieverV2 = new OnlineRetriever(redisClient, (AbstractMessageLite::toByteArray));
8390
break;
8491
case BIGTABLE:
8592
BigtableDataClient bigtableClient = context.getBean(BigtableDataClient.class);
86-
OnlineRetrieverV2 bigtableRetriever = new BigTableOnlineRetriever(bigtableClient);
87-
servingService = new OnlineServingServiceV2(bigtableRetriever, specService, tracer);
93+
retrieverV2 = new BigTableOnlineRetriever(bigtableClient);
8894
break;
8995
case CASSANDRA:
9096
CassandraStoreConfig config = feastProperties.getActiveStore().getCassandraConfig();
@@ -109,11 +115,57 @@ public ServingServiceV2 servingServiceV2(
109115
.withLocalDatacenter(dataCenter)
110116
.withKeyspace(keySpace)
111117
.build();
112-
OnlineRetrieverV2 cassandraRetriever = new CassandraOnlineRetriever(session);
113-
servingService = new OnlineServingServiceV2(cassandraRetriever, specService, tracer);
118+
retrieverV2 = new CassandraOnlineRetriever(session);
114119
break;
120+
default:
121+
throw new RuntimeException(
122+
String.format("Unable to identify online store type: %s", store.getType()));
115123
}
116124

125+
final FeatureSpecRetriever featureSpecRetriever;
126+
log.info("Created CoreFeatureSpecRetriever");
127+
featureSpecRetriever = new CoreFeatureSpecRetriever(specService);
128+
129+
servingService = new OnlineServingServiceV2(retrieverV2, tracer, featureSpecRetriever);
130+
131+
return servingService;
132+
}
133+
134+
@Bean
135+
@Conditional(RegistryCondition.class)
136+
public ServingServiceV2 registryBasedServingServiceV2(
137+
FeastProperties feastProperties, Tracer tracer) {
138+
final ServingServiceV2 servingService;
139+
final FeastProperties.Store store = feastProperties.getActiveStore();
140+
141+
OnlineRetrieverV2 retrieverV2;
142+
// TODO: Support more store types, and potentially use a plugin model here.
143+
switch (store.getType()) {
144+
case REDIS_CLUSTER:
145+
RedisClientAdapter redisClusterClient =
146+
RedisClusterClient.create(store.getRedisClusterConfig());
147+
retrieverV2 = new OnlineRetriever(redisClusterClient, new EntityKeySerializerV2());
148+
break;
149+
case REDIS:
150+
RedisClientAdapter redisClient = RedisClient.create(store.getRedisConfig());
151+
log.info("Created EntityKeySerializerV2");
152+
retrieverV2 = new OnlineRetriever(redisClient, new EntityKeySerializerV2());
153+
break;
154+
default:
155+
throw new RuntimeException(
156+
String.format(
157+
"Unable to identify online store type: %s for Regsitry Backed Serving Service",
158+
store.getType()));
159+
}
160+
161+
final FeatureSpecRetriever featureSpecRetriever;
162+
log.info("Created RegistryFeatureSpecRetriever");
163+
log.info("Working Directory = " + System.getProperty("user.dir"));
164+
final LocalRegistryRepo repo = new LocalRegistryRepo(Paths.get(feastProperties.getRegistry()));
165+
featureSpecRetriever = new RegistryFeatureSpecRetriever(repo);
166+
167+
servingService = new OnlineServingServiceV2(retrieverV2, tracer, featureSpecRetriever);
168+
117169
return servingService;
118170
}
119171
}

serving/src/main/java/feast/serving/config/SpecServiceConfig.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*/
1717
package feast.serving.config;
1818

19-
import com.fasterxml.jackson.core.JsonProcessingException;
20-
import com.google.protobuf.InvalidProtocolBufferException;
2119
import feast.serving.specs.CachedSpecService;
2220
import feast.serving.specs.CoreSpecService;
2321
import io.grpc.CallCredentials;
@@ -28,15 +26,16 @@
2826
import org.springframework.beans.factory.ObjectProvider;
2927
import org.springframework.beans.factory.annotation.Autowired;
3028
import org.springframework.context.annotation.Bean;
29+
import org.springframework.context.annotation.Conditional;
3130
import org.springframework.context.annotation.Configuration;
3231

3332
@Configuration
3433
public class SpecServiceConfig {
3534

3635
private static final Logger log = org.slf4j.LoggerFactory.getLogger(SpecServiceConfig.class);
37-
private String feastCoreHost;
38-
private int feastCorePort;
39-
private int feastCachedSpecServiceRefreshInterval;
36+
private final String feastCoreHost;
37+
private final int feastCorePort;
38+
private final int feastCachedSpecServiceRefreshInterval;
4039

4140
@Autowired
4241
public SpecServiceConfig(FeastProperties feastProperties) {
@@ -46,6 +45,7 @@ public SpecServiceConfig(FeastProperties feastProperties) {
4645
}
4746

4847
@Bean
48+
@Conditional(CoreCondition.class)
4949
public ScheduledExecutorService cachedSpecServiceScheduledExecutorService(
5050
CachedSpecService cachedSpecStorage) {
5151
ScheduledExecutorService scheduledExecutorService =
@@ -60,8 +60,8 @@ public ScheduledExecutorService cachedSpecServiceScheduledExecutorService(
6060
}
6161

6262
@Bean
63-
public CachedSpecService specService(ObjectProvider<CallCredentials> callCredentials)
64-
throws InvalidProtocolBufferException, JsonProcessingException {
63+
@Conditional(CoreCondition.class)
64+
public CachedSpecService specService(ObjectProvider<CallCredentials> callCredentials) {
6565
CoreSpecService coreService =
6666
new CoreSpecService(feastCoreHost, feastCorePort, callCredentials);
6767
CachedSpecService cachedSpecStorage = new CachedSpecService(coreService);

serving/src/main/java/feast/serving/controller/HealthServiceController.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest;
2020
import feast.serving.interceptors.GrpcMonitoringInterceptor;
2121
import feast.serving.service.ServingServiceV2;
22-
import feast.serving.specs.CachedSpecService;
2322
import io.grpc.health.v1.HealthGrpc.HealthImplBase;
2423
import io.grpc.health.v1.HealthProto.HealthCheckRequest;
2524
import io.grpc.health.v1.HealthProto.HealthCheckResponse;
@@ -32,12 +31,10 @@
3231

3332
@GrpcService(interceptors = {GrpcMonitoringInterceptor.class})
3433
public class HealthServiceController extends HealthImplBase {
35-
private CachedSpecService specService;
36-
private ServingServiceV2 servingService;
34+
private final ServingServiceV2 servingService;
3735

3836
@Autowired
39-
public HealthServiceController(CachedSpecService specService, ServingServiceV2 servingService) {
40-
this.specService = specService;
37+
public HealthServiceController(final ServingServiceV2 servingService) {
4138
this.servingService = servingService;
4239
}
4340

@@ -47,7 +44,7 @@ public void check(
4744
// TODO: Implement proper logic to determine if ServingServiceV2 is healthy e.g.
4845
// if it's online service check that it the service can retrieve dummy/random
4946
// feature table.
50-
// Implement similary for batch service.
47+
// Implement similarly for batch service.
5148

5249
try {
5350
servingService.getFeastServingInfo(GetFeastServingInfoRequest.getDefaultInstance());

0 commit comments

Comments
 (0)