Skip to content

Commit 43cdcf2

Browse files
author
twitter-team
committed
Open-sourcing Representation Manager
Representation Manager (RMS) serves as a centralized embedding management system, providing SimClusters or other embeddings as facade of the underlying storage or services.
1 parent 197bf2c commit 43cdcf2

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+3439
-0
lines changed

representation-manager/BUILD.bazel

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# This prevents SQ query from grabbing //:all since it traverses up once to find a BUILD

representation-manager/README.md

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Representation Manager #
2+
3+
**Representation Manager** (RMS) serves as a centralized embedding management system, providing SimClusters or other embeddings as facade of the underlying storage or services.
4+

representation-manager/bin/deploy.sh

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
#!/usr/bin/env bash
2+
3+
JOB=representation-manager bazel run --ui_event_filters=-info,-stdout,-stderr --noshow_progress \
4+
//relevance-platform/src/main/python/deploy -- "$@"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
scala_library(
2+
compiler_option_sets = ["fatal_warnings"],
3+
platform = "java8",
4+
tags = ["bazel-compatible"],
5+
dependencies = [
6+
"finatra/inject/inject-thrift-client",
7+
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato",
8+
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
9+
"relevance-platform/src/main/scala/com/twitter/relevance_platform/common/readablestore",
10+
"representation-manager/client/src/main/scala/com/twitter/representation_manager/config",
11+
"representation-manager/server/src/main/thrift:thrift-scala",
12+
"src/scala/com/twitter/simclusters_v2/common",
13+
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
14+
"stitch/stitch-storehaus",
15+
"strato/src/main/scala/com/twitter/strato/client",
16+
],
17+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
package com.twitter.representation_manager
2+
3+
import com.twitter.finagle.memcached.{Client => MemcachedClient}
4+
import com.twitter.finagle.stats.StatsReceiver
5+
import com.twitter.frigate.common.store.strato.StratoFetchableStore
6+
import com.twitter.hermit.store.common.ObservedCachedReadableStore
7+
import com.twitter.hermit.store.common.ObservedReadableStore
8+
import com.twitter.representation_manager.config.ClientConfig
9+
import com.twitter.representation_manager.config.DisabledInMemoryCacheParams
10+
import com.twitter.representation_manager.config.EnabledInMemoryCacheParams
11+
import com.twitter.representation_manager.thriftscala.SimClustersEmbeddingView
12+
import com.twitter.simclusters_v2.common.SimClustersEmbedding
13+
import com.twitter.simclusters_v2.thriftscala.InternalId
14+
import com.twitter.simclusters_v2.thriftscala.LocaleEntityId
15+
import com.twitter.simclusters_v2.thriftscala.SimClustersEmbeddingId
16+
import com.twitter.simclusters_v2.thriftscala.TopicId
17+
import com.twitter.simclusters_v2.thriftscala.{SimClustersEmbedding => ThriftSimClustersEmbedding}
18+
import com.twitter.storehaus.ReadableStore
19+
import com.twitter.strato.client.{Client => StratoClient}
20+
import com.twitter.strato.thrift.ScroogeConvImplicits._
21+
22+
/**
23+
* This is the class that offers features to build readable stores for a given
24+
* SimClustersEmbeddingView (i.e. embeddingType and modelVersion). It applies ClientConfig
25+
* for a particular service and build ReadableStores which implement that config.
26+
*/
27+
class StoreBuilder(
28+
clientConfig: ClientConfig,
29+
stratoClient: StratoClient,
30+
memCachedClient: MemcachedClient,
31+
globalStats: StatsReceiver,
32+
) {
33+
private val stats =
34+
globalStats.scope("representation_manager_client").scope(this.getClass.getSimpleName)
35+
36+
// Column consts
37+
private val ColPathPrefix = "recommendations/representation_manager/"
38+
private val SimclustersTweetColPath = ColPathPrefix + "simClustersEmbedding.Tweet"
39+
private val SimclustersUserColPath = ColPathPrefix + "simClustersEmbedding.User"
40+
private val SimclustersTopicIdColPath = ColPathPrefix + "simClustersEmbedding.TopicId"
41+
private val SimclustersLocaleEntityIdColPath =
42+
ColPathPrefix + "simClustersEmbedding.LocaleEntityId"
43+
44+
def buildSimclustersTweetEmbeddingStore(
45+
embeddingColumnView: SimClustersEmbeddingView
46+
): ReadableStore[Long, SimClustersEmbedding] = {
47+
val rawStore = StratoFetchableStore
48+
.withView[Long, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
49+
stratoClient,
50+
SimclustersTweetColPath,
51+
embeddingColumnView)
52+
.mapValues(SimClustersEmbedding(_))
53+
54+
addCacheLayer(rawStore, embeddingColumnView)
55+
}
56+
57+
def buildSimclustersUserEmbeddingStore(
58+
embeddingColumnView: SimClustersEmbeddingView
59+
): ReadableStore[Long, SimClustersEmbedding] = {
60+
val rawStore = StratoFetchableStore
61+
.withView[Long, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
62+
stratoClient,
63+
SimclustersUserColPath,
64+
embeddingColumnView)
65+
.mapValues(SimClustersEmbedding(_))
66+
67+
addCacheLayer(rawStore, embeddingColumnView)
68+
}
69+
70+
def buildSimclustersTopicIdEmbeddingStore(
71+
embeddingColumnView: SimClustersEmbeddingView
72+
): ReadableStore[TopicId, SimClustersEmbedding] = {
73+
val rawStore = StratoFetchableStore
74+
.withView[TopicId, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
75+
stratoClient,
76+
SimclustersTopicIdColPath,
77+
embeddingColumnView)
78+
.mapValues(SimClustersEmbedding(_))
79+
80+
addCacheLayer(rawStore, embeddingColumnView)
81+
}
82+
83+
def buildSimclustersLocaleEntityIdEmbeddingStore(
84+
embeddingColumnView: SimClustersEmbeddingView
85+
): ReadableStore[LocaleEntityId, SimClustersEmbedding] = {
86+
val rawStore = StratoFetchableStore
87+
.withView[LocaleEntityId, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
88+
stratoClient,
89+
SimclustersLocaleEntityIdColPath,
90+
embeddingColumnView)
91+
.mapValues(SimClustersEmbedding(_))
92+
93+
addCacheLayer(rawStore, embeddingColumnView)
94+
}
95+
96+
def buildSimclustersTweetEmbeddingStoreWithEmbeddingIdAsKey(
97+
embeddingColumnView: SimClustersEmbeddingView
98+
): ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
99+
val rawStore = StratoFetchableStore
100+
.withView[Long, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
101+
stratoClient,
102+
SimclustersTweetColPath,
103+
embeddingColumnView)
104+
.mapValues(SimClustersEmbedding(_))
105+
val embeddingIdAsKeyStore = rawStore.composeKeyMapping[SimClustersEmbeddingId] {
106+
case SimClustersEmbeddingId(_, _, InternalId.TweetId(tweetId)) =>
107+
tweetId
108+
}
109+
110+
addCacheLayer(embeddingIdAsKeyStore, embeddingColumnView)
111+
}
112+
113+
def buildSimclustersUserEmbeddingStoreWithEmbeddingIdAsKey(
114+
embeddingColumnView: SimClustersEmbeddingView
115+
): ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
116+
val rawStore = StratoFetchableStore
117+
.withView[Long, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
118+
stratoClient,
119+
SimclustersUserColPath,
120+
embeddingColumnView)
121+
.mapValues(SimClustersEmbedding(_))
122+
val embeddingIdAsKeyStore = rawStore.composeKeyMapping[SimClustersEmbeddingId] {
123+
case SimClustersEmbeddingId(_, _, InternalId.UserId(userId)) =>
124+
userId
125+
}
126+
127+
addCacheLayer(embeddingIdAsKeyStore, embeddingColumnView)
128+
}
129+
130+
def buildSimclustersTopicEmbeddingStoreWithEmbeddingIdAsKey(
131+
embeddingColumnView: SimClustersEmbeddingView
132+
): ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
133+
val rawStore = StratoFetchableStore
134+
.withView[TopicId, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
135+
stratoClient,
136+
SimclustersTopicIdColPath,
137+
embeddingColumnView)
138+
.mapValues(SimClustersEmbedding(_))
139+
val embeddingIdAsKeyStore = rawStore.composeKeyMapping[SimClustersEmbeddingId] {
140+
case SimClustersEmbeddingId(_, _, InternalId.TopicId(topicId)) =>
141+
topicId
142+
}
143+
144+
addCacheLayer(embeddingIdAsKeyStore, embeddingColumnView)
145+
}
146+
147+
def buildSimclustersTopicIdEmbeddingStoreWithEmbeddingIdAsKey(
148+
embeddingColumnView: SimClustersEmbeddingView
149+
): ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
150+
val rawStore = StratoFetchableStore
151+
.withView[TopicId, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
152+
stratoClient,
153+
SimclustersTopicIdColPath,
154+
embeddingColumnView)
155+
.mapValues(SimClustersEmbedding(_))
156+
val embeddingIdAsKeyStore = rawStore.composeKeyMapping[SimClustersEmbeddingId] {
157+
case SimClustersEmbeddingId(_, _, InternalId.TopicId(topicId)) =>
158+
topicId
159+
}
160+
161+
addCacheLayer(embeddingIdAsKeyStore, embeddingColumnView)
162+
}
163+
164+
def buildSimclustersLocaleEntityIdEmbeddingStoreWithEmbeddingIdAsKey(
165+
embeddingColumnView: SimClustersEmbeddingView
166+
): ReadableStore[SimClustersEmbeddingId, SimClustersEmbedding] = {
167+
val rawStore = StratoFetchableStore
168+
.withView[LocaleEntityId, SimClustersEmbeddingView, ThriftSimClustersEmbedding](
169+
stratoClient,
170+
SimclustersLocaleEntityIdColPath,
171+
embeddingColumnView)
172+
.mapValues(SimClustersEmbedding(_))
173+
val embeddingIdAsKeyStore = rawStore.composeKeyMapping[SimClustersEmbeddingId] {
174+
case SimClustersEmbeddingId(_, _, InternalId.LocaleEntityId(localeEntityId)) =>
175+
localeEntityId
176+
}
177+
178+
addCacheLayer(embeddingIdAsKeyStore, embeddingColumnView)
179+
}
180+
181+
private def addCacheLayer[K](
182+
rawStore: ReadableStore[K, SimClustersEmbedding],
183+
embeddingColumnView: SimClustersEmbeddingView,
184+
): ReadableStore[K, SimClustersEmbedding] = {
185+
// Add in-memory caching based on ClientConfig
186+
val inMemCacheParams = clientConfig.inMemoryCacheConfig
187+
.getCacheSetup(embeddingColumnView.embeddingType, embeddingColumnView.modelVersion)
188+
189+
val statsPerStore = stats
190+
.scope(embeddingColumnView.embeddingType.name).scope(embeddingColumnView.modelVersion.name)
191+
192+
inMemCacheParams match {
193+
case DisabledInMemoryCacheParams =>
194+
ObservedReadableStore(
195+
store = rawStore
196+
)(statsPerStore)
197+
case EnabledInMemoryCacheParams(ttl, maxKeys, cacheName) =>
198+
ObservedCachedReadableStore.from[K, SimClustersEmbedding](
199+
rawStore,
200+
ttl = ttl,
201+
maxKeys = maxKeys,
202+
cacheName = cacheName,
203+
windowSize = 10000L
204+
)(statsPerStore)
205+
}
206+
}
207+
208+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
scala_library(
2+
compiler_option_sets = ["fatal_warnings"],
3+
platform = "java8",
4+
tags = ["bazel-compatible"],
5+
dependencies = [
6+
"finatra/inject/inject-thrift-client",
7+
"representation-manager/server/src/main/scala/com/twitter/representation_manager/common",
8+
"representation-manager/server/src/main/thrift:thrift-scala",
9+
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
10+
"strato/src/main/scala/com/twitter/strato/client",
11+
],
12+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.twitter.representation_manager.config
2+
3+
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
4+
import com.twitter.simclusters_v2.thriftscala.ModelVersion
5+
6+
/*
7+
* This is RMS client config class.
8+
* We only support setting up in memory cache params for now, but we expect to enable other
9+
* customisations in the near future e.g. request timeout
10+
*
11+
* --------------------------------------------
12+
* PLEASE NOTE:
13+
* Having in-memory cache is not necessarily a free performance win, anyone considering it should
14+
* investigate rather than blindly enabling it
15+
* */
16+
class ClientConfig(inMemCacheParamsOverrides: Map[
17+
(EmbeddingType, ModelVersion),
18+
InMemoryCacheParams
19+
] = Map.empty) {
20+
// In memory cache config per embedding
21+
val inMemCacheParams = DefaultInMemoryCacheConfig.cacheParamsMap ++ inMemCacheParamsOverrides
22+
val inMemoryCacheConfig = new InMemoryCacheConfig(inMemCacheParams)
23+
}
24+
25+
object DefaultClientConfig extends ClientConfig
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.twitter.representation_manager.config
2+
3+
import com.twitter.simclusters_v2.thriftscala.EmbeddingType
4+
import com.twitter.simclusters_v2.thriftscala.ModelVersion
5+
import com.twitter.util.Duration
6+
7+
/*
8+
* --------------------------------------------
9+
* PLEASE NOTE:
10+
* Having in-memory cache is not necessarily a free performance win, anyone considering it should
11+
* investigate rather than blindly enabling it
12+
* --------------------------------------------
13+
* */
14+
15+
sealed trait InMemoryCacheParams
16+
17+
/*
18+
* This holds params that is required to set up a in-mem cache for a single embedding store
19+
*/
20+
case class EnabledInMemoryCacheParams(
21+
ttl: Duration,
22+
maxKeys: Int,
23+
cacheName: String)
24+
extends InMemoryCacheParams
25+
object DisabledInMemoryCacheParams extends InMemoryCacheParams
26+
27+
/*
28+
* This is the class for the in-memory cache config. Client could pass in their own cacheParamsMap to
29+
* create a new InMemoryCacheConfig instead of using the DefaultInMemoryCacheConfig object below
30+
* */
31+
class InMemoryCacheConfig(
32+
cacheParamsMap: Map[
33+
(EmbeddingType, ModelVersion),
34+
InMemoryCacheParams
35+
] = Map.empty) {
36+
37+
def getCacheSetup(
38+
embeddingType: EmbeddingType,
39+
modelVersion: ModelVersion
40+
): InMemoryCacheParams = {
41+
// When requested embedding type doesn't exist, we return DisabledInMemoryCacheParams
42+
cacheParamsMap.getOrElse((embeddingType, modelVersion), DisabledInMemoryCacheParams)
43+
}
44+
}
45+
46+
/*
47+
* Default config for the in-memory cache
48+
* Clients can directly import and use this one if they don't want to set up a customised config
49+
* */
50+
object DefaultInMemoryCacheConfig extends InMemoryCacheConfig {
51+
// set default to no in-memory caching
52+
val cacheParamsMap = Map.empty
53+
}

representation-manager/server/BUILD

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
jvm_binary(
2+
name = "bin",
3+
basename = "representation-manager",
4+
main = "com.twitter.representation_manager.RepresentationManagerFedServerMain",
5+
platform = "java8",
6+
tags = ["bazel-compatible"],
7+
dependencies = [
8+
"finatra/inject/inject-logback/src/main/scala",
9+
"loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback",
10+
"representation-manager/server/src/main/resources",
11+
"representation-manager/server/src/main/scala/com/twitter/representation_manager",
12+
"twitter-server/logback-classic/src/main/scala",
13+
],
14+
)
15+
16+
# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app
17+
jvm_app(
18+
name = "representation-manager-app",
19+
archive = "zip",
20+
binary = ":bin",
21+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
resources(
2+
sources = [
3+
"*.xml",
4+
"config/*.yml",
5+
],
6+
tags = ["bazel-compatible"],
7+
)

0 commit comments

Comments
 (0)