Skip to content

Commit f1b5c32

Browse files
author
twitter-team
committed
Open-sourcing User Signal Service
User Signal Service (USS) is a centralized online platform that supplies comprehensive data on user actions and behaviors on Twitter. This service stores information on both explicit signals, such as Favorites, Retweets, and replies, and implicit signals like Tweet clicks, profile visits, and more.
1 parent 94ff4ca commit f1b5c32

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+2951
-0
lines changed

user-signal-service/README.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# User Signal Service #
2+
3+
**User Signal Service** (USS) is a centralized online platform that supplies comprehensive data on user actions and behaviors on Twitter. This information encompasses both explicit signals, such as favoriting, retweeting, and replying, and implicit signals, including tweet clicks, video views, profile visits, and more.
4+
5+
To ensure consistency and accuracy, USS gathers these signals from various underlying datasets and online services, processing them into uniform formats. These standardized source signals are then utilized in candidate retrieval and machine learning features for ranking stages.

user-signal-service/server/BUILD

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# Runnable JVM binary for the service. `main` names the Strato federated server entry
# point object; the dependencies pull in the logback logging backends, the bundled
# resources target, and the server's Scala library.
jvm_binary(
2+
name = "bin",
3+
basename = "user-signal-service",
4+
main = "com.twitter.usersignalservice.UserSignalServiceStratoFedServerMain",
5+
runtime_platform = "java11",
6+
tags = ["bazel-compatible"],
7+
dependencies = [
8+
"3rdparty/jvm/ch/qos/logback:logback-classic",
9+
"loglens/loglens-logback/src/main/scala/com/twitter/loglens/logback",
10+
"strato/src/main/scala/com/twitter/strato/logging/logback",
11+
"user-signal-service/server/src/main/resources",
12+
"user-signal-service/server/src/main/scala/com/twitter/usersignalservice",
13+
],
14+
)
15+
16+
# Aurora Workflows build phase convention requires a jvm_app named with ${project-name}-app
17+
# Packages the `:bin` binary target as a zip archive.
jvm_app(
18+
name = "user-signal-service-app",
19+
archive = "zip",
20+
binary = ":bin",
21+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Resource bundle: every XML and YAML file in this directory, plus YAML files under config/.
resources(
2+
sources = [
3+
"*.xml",
4+
"*.yml",
5+
"config/*.yml",
6+
],
7+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
test_value:
2+
comment: Test Value
3+
default_availability: 10000
4+
dark_traffic_percent:
5+
comment: Percentage of traffic to send to dark traffic destination
6+
default_availability: 0
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
<configuration>
2+
<shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
3+
<property name="async_queue_size" value="${queue.size:-50000}"/>
4+
<property name="async_max_flush_time" value="${max.flush.time:-0}"/>
5+
<!-- ===================================================== -->
6+
<!-- Structured Logging -->
7+
<!-- ===================================================== -->
8+
<!-- Only sample 0.1% of the requests -->
9+
<property name="splunk_sampling_rate" value="${splunk_sampling_rate:-0.001}"/>
10+
<include resource="structured-logger-logback.xml"/>
11+
<!-- ===================================================== -->
12+
<!-- Service Config -->
13+
<!-- ===================================================== -->
14+
<property name="DEFAULT_SERVICE_PATTERN"
15+
value="%-16X{transactionId} %logger %msg"/>
16+
17+
<!-- ===================================================== -->
18+
<!-- Common Config -->
19+
<!-- ===================================================== -->
20+
21+
<!-- JUL/JDK14 to Logback bridge -->
22+
<contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
23+
<resetJUL>true</resetJUL>
24+
</contextListener>
25+
26+
<!-- Service Log (Rollover every 50MB, max 11 logs) -->
27+
<appender name="SERVICE" class="ch.qos.logback.core.rolling.RollingFileAppender">
28+
<file>${log.service.output}</file>
29+
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
30+
<fileNamePattern>${log.service.output}.%i</fileNamePattern>
31+
<minIndex>1</minIndex>
32+
<maxIndex>10</maxIndex>
33+
</rollingPolicy>
34+
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
35+
<maxFileSize>50MB</maxFileSize>
36+
</triggeringPolicy>
37+
<encoder>
38+
<pattern>%date %.-3level ${DEFAULT_SERVICE_PATTERN}%n</pattern>
39+
</encoder>
40+
</appender>
41+
42+
<!-- Strato package only log (Rollover every 50MB, max 11 logs) -->
43+
<appender name="STRATO-ONLY" class="ch.qos.logback.core.rolling.RollingFileAppender">
44+
<file>${log.strato_only.output}</file>
45+
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
46+
<fileNamePattern>${log.strato_only.output}.%i</fileNamePattern>
47+
<minIndex>1</minIndex>
48+
<maxIndex>10</maxIndex>
49+
</rollingPolicy>
50+
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
51+
<maxFileSize>50MB</maxFileSize>
52+
</triggeringPolicy>
53+
<encoder>
54+
<pattern>%date %.-3level ${DEFAULT_SERVICE_PATTERN}%n</pattern>
55+
</encoder>
56+
</appender>
57+
58+
<!-- LogLens -->
59+
<appender name="LOGLENS" class="com.twitter.loglens.logback.LoglensAppender">
60+
<mdcAdditionalContext>true</mdcAdditionalContext>
61+
<category>loglens</category>
62+
<index>${log.lens.index}</index>
63+
<tag>${log.lens.tag}/service</tag>
64+
<encoder>
65+
<pattern>%msg%n</pattern>
66+
</encoder>
67+
<turboFilter class="ch.qos.logback.classic.turbo.DuplicateMessageFilter">
68+
<cacheSize>500</cacheSize>
69+
<allowedRepetitions>50</allowedRepetitions>
70+
</turboFilter>
71+
<filter class="com.twitter.strato.logging.logback.RegexFilter">
72+
<forLogger>manhattan-client</forLogger>
73+
<excludeRegex>.*InvalidRequest.*</excludeRegex>
74+
</filter>
75+
</appender>
76+
77+
<!-- ===================================================== -->
78+
<!-- Primary Async Appenders -->
79+
<!-- ===================================================== -->
80+
81+
<appender name="ASYNC-SERVICE" class="ch.qos.logback.classic.AsyncAppender">
82+
<queueSize>${async_queue_size}</queueSize>
83+
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
84+
<appender-ref ref="SERVICE"/>
85+
</appender>
86+
87+
<appender name="ASYNC-STRATO-ONLY" class="ch.qos.logback.classic.AsyncAppender">
88+
<queueSize>${async_queue_size}</queueSize>
89+
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
90+
<appender-ref ref="STRATO-ONLY"/>
91+
</appender>
92+
93+
<appender name="ASYNC-LOGLENS" class="ch.qos.logback.classic.AsyncAppender">
94+
<queueSize>${async_queue_size}</queueSize>
95+
<maxFlushTime>${async_max_flush_time}</maxFlushTime>
96+
<appender-ref ref="LOGLENS"/>
97+
</appender>
98+
99+
<!-- ===================================================== -->
100+
<!-- Package Config -->
101+
<!-- ===================================================== -->
102+
103+
<!-- Per-Package Config (shared) -->
104+
<logger name="com.twitter" level="info"/>
105+
106+
<!--
107+
By default, we leave the strato package at INFO level.
108+
However, this line allows us to set the entire strato package, or a subset of it, to
109+
a specific level. For example, if you pass -Dstrato_log_package=streaming -Dstrato_log_level=DEBUG
110+
only loggers under com.twitter.strato.streaming.* will be set to DEBUG level. Passing only
111+
-Dstrato_log_level will set all of strato.* to the specified level.
112+
-->
113+
<logger name="com.twitter.strato${strato_log_package:-}" level="${strato_log_level:-INFO}"/>
114+
115+
<logger name="com.twitter.wilyns" level="warn"/>
116+
<logger name="com.twitter.finagle.mux" level="warn"/>
117+
<logger name="com.twitter.finagle.serverset2" level="warn"/>
118+
<logger name="com.twitter.logging.ScribeHandler" level="warn"/>
119+
<logger name="com.twitter.zookeeper.client.internal" level="warn"/>
120+
<logger name="com.twitter.decider.StoreDecider" level="warn"/>
121+
122+
<!-- Per-Package Config (Strato) -->
123+
<logger name="com.twitter.distributedlog.client" level="warn"/>
124+
<logger name="com.twitter.finagle.mtls.authorization.config.AccessControlListConfiguration" level="warn"/>
125+
<logger name="com.twitter.finatra.kafka.common.kerberoshelpers" level="warn"/>
126+
<logger name="com.twitter.finatra.kafka.utils.BootstrapServerUtils" level="warn"/>
127+
<logger name="com.twitter.server.coordinate" level="error"/>
128+
<logger name="com.twitter.zookeeper.client" level="info"/>
129+
<logger name="org.apache.zookeeper" level="error"/>
130+
<logger name="org.apache.zookeeper.ClientCnxn" level="warn"/>
131+
<logger name="ZkSession" level="info"/>
132+
<logger name="OptimisticLockingCache" level="off"/>
133+
<logger name="manhattan-client" level="warn"/>
134+
<logger name="strato.op" level="warn"/>
135+
<logger name="org.apache.kafka.clients.NetworkClient" level="error"/>
136+
<logger name="org.apache.kafka.clients.consumer.internals" level="error"/>
137+
<logger name="org.apache.kafka.clients.producer.internals" level="error"/>
138+
<!-- produce a lot of messages like: Building client authenticator with server name kafka -->
139+
<logger name="org.apache.kafka.common.network" level="warn"/>
140+
141+
<!-- Root Config -->
142+
<root level="${log_level:-INFO}">
143+
<appender-ref ref="ASYNC-SERVICE"/>
144+
<appender-ref ref="ASYNC-LOGLENS"/>
145+
</root>
146+
147+
<!-- Strato package only logging-->
148+
<!--
  Attach the strato-only appender without setting a level on this element. An explicit
  level="info" here is applied after the com.twitter.strato${strato_log_package:-} logger
  configured earlier and, whenever strato_log_package is unset, overrides the level chosen
  via -Dstrato_log_level. With no level set, this logger keeps whatever that earlier
  element assigned, or otherwise inherits the com.twitter level (info).
-->
<logger name="com.twitter.strato" additivity="true">
  <appender-ref ref="ASYNC-STRATO-ONLY" />
</logger>
153+
154+
155+
</configuration>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# Scala library compiled with the fatal_warnings option set; aggregates the `base`,
# `columns`, and `config` sub-packages of the service.
# NOTE(review): the server main imports com.twitter.usersignalservice.module.*, which is
# not listed below - presumably reached transitively; confirm.
scala_library(
2+
compiler_option_sets = ["fatal_warnings"],
3+
tags = ["bazel-compatible"],
4+
dependencies = [
5+
"user-signal-service/server/src/main/scala/com/twitter/usersignalservice/base",
6+
"user-signal-service/server/src/main/scala/com/twitter/usersignalservice/columns",
7+
"user-signal-service/server/src/main/scala/com/twitter/usersignalservice/config",
8+
],
9+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.twitter.usersignalservice
2+
3+
import com.google.inject.Module
4+
import com.twitter.inject.thrift.modules.ThriftClientIdModule
5+
import com.twitter.usersignalservice.columns.UserSignalServiceColumn
6+
import com.twitter.strato.fed._
7+
import com.twitter.strato.fed.server._
8+
import com.twitter.usersignalservice.module.CacheModule
9+
import com.twitter.usersignalservice.module.MHMtlsParamsModule
10+
import com.twitter.usersignalservice.module.SocialGraphServiceClientModule
11+
import com.twitter.usersignalservice.module.TimerModule
12+
13+
/**
 * Concrete server object; its fully-qualified name is the `main` class referenced by the
 * service's `jvm_binary` BUILD target.
 */
object UserSignalServiceStratoFedServerMain extends UserSignalServiceStratoFedServer
14+
15+
/**
 * Strato federated server definition for User Signal Service. It supplies three things to
 * [[StratoFedServer]]: the `dest` string, the column classes to serve, and the Guice
 * modules used for wiring.
 */
trait UserSignalServiceStratoFedServer extends StratoFedServer {

  // NOTE(review): presumably the service-discovery path for this server - confirm against
  // StratoFedServer.
  override def dest: String = "/s/user-signal-service/user-signal-service"

  /** The single Strato column class served by this server. */
  override def columns: Seq[Class[_ <: StratoFed.Column]] = Seq(classOf[UserSignalServiceColumn])

  /** Guice modules installed by the server, kept in their original declaration order. */
  override def modules: Seq[Module] = {
    val wiring: Seq[Module] = Seq(
      CacheModule,
      MHMtlsParamsModule,
      SocialGraphServiceClientModule,
      ThriftClientIdModule,
      TimerModule
    )
    wiring
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.twitter.usersignalservice.base
2+
3+
import com.twitter.finagle.stats.StatsReceiver
4+
import com.twitter.frigate.common.base.Stats
5+
import com.twitter.storehaus.ReadableStore
6+
import com.twitter.twistly.common.UserId
7+
import com.twitter.usersignalservice.base.BaseSignalFetcher.Timeout
8+
import com.twitter.usersignalservice.thriftscala.Signal
9+
import com.twitter.usersignalservice.thriftscala.SignalType
10+
import com.twitter.util.Future
11+
import com.twitter.util.Timer
12+
13+
/**
 * Read-only store that merges the signals returned by several underlying fetchers into a
 * single ranked list of targets for one user.
 *
 * For a query, every entry in `signalsAggregationInfo` is asked for the user's signals.
 * Signals with a defined `targetInternalId` are grouped by that id, and each group is
 * collapsed into one [[Signal]] carrying the query's signal type, the group's latest
 * timestamp, and the shared target id. Results are ordered by summed weight (highest
 * first), ties broken by the more recent timestamp, then truncated to `query.maxResults`
 * when it is set.
 *
 * @param signalsAggregationInfo fetchers (with their signal types) to aggregate over
 * @param signalsWeightMapInfo   weight per signal type; types absent here contribute 0.0
 * @param stats                  parent stats receiver; scoped by this class's canonical name
 * @param timer                  timer used to enforce `Timeout` on the whole lookup
 */
case class AggregatedSignalController(
  signalsAggregationInfo: Seq[SignalAggregatedInfo],
  signalsWeightMapInfo: Map[SignalType, Double],
  stats: StatsReceiver,
  timer: Timer)
    extends ReadableStore[Query, Seq[Signal]] {

  val name: String = this.getClass.getCanonicalName
  val statsReceiver: StatsReceiver = stats.scope(name)

  /**
   * Fetches, aggregates, and ranks signals for `query.userId`.
   *
   * @return `Some(rankedSignals)`. When the lookup fails or exceeds `Timeout`, a counter
   *         named after the exception class is incremented and `Some(Seq.empty)` is
   *         returned instead of a failed Future.
   */
  override def get(query: Query): Future[Option[Seq[Signal]]] = {
    Stats
      .trackItems(statsReceiver) {
        // Query every fetcher, then drop missing results and concatenate the rest.
        val allSignalsFut =
          Future
            .collect(signalsAggregationInfo.map(_.getSignals(query.userId)))
            .map(_.flatten.flatten)
        val aggregatedSignals =
          allSignalsFut.map { allSignals =>
            allSignals
              .groupBy(_.targetInternalId).collect {
                // Signals without a target id are discarded by this pattern.
                case (Some(internalId), signals) =>
                  // groupBy never produces an empty group, so `max` is safe here.
                  val mostRecentEngagementTime = signals.map(_.timestamp).max
                  val totalWeight =
                    signals
                      .map(signal => signalsWeightMapInfo.getOrElse(signal.signalType, 0.0)).sum
                  val aggregated =
                    Signal(query.signalType, mostRecentEngagementTime, Some(internalId))
                  (aggregated, totalWeight)
              }.toSeq.sortBy { case (signal, weight) => (-weight, -signal.timestamp) }
              .map(_._1)
              .take(query.maxResults.getOrElse(Int.MaxValue))
          }
        aggregatedSignals.map(Some(_))
      }.raiseWithin(Timeout)(timer).handle {
        case e =>
          // getCanonicalName returns null for anonymous/local classes; fall back to getName
          // so the counter is never created with a null name.
          val exceptionName =
            Option(e.getClass.getCanonicalName).getOrElse(e.getClass.getName)
          statsReceiver.counter(exceptionName).incr()
          Some(Seq.empty[Signal])
      }
  }
}
51+
52+
/**
 * Pairs a signal type with the store used to fetch signals of that type.
 *
 * @param signalType    signal type placed in every query sent to `signalFetcher`
 * @param signalFetcher store queried for the signals
 */
case class SignalAggregatedInfo(
  signalType: SignalType,
  signalFetcher: ReadableStore[Query, Seq[Signal]]) {

  /**
   * Queries `signalFetcher` for `userId` using this entry's signal type.
   *
   * The third `Query` argument is left as `None` (presumably the max-results cap, i.e.
   * unbounded - confirm against the `Query` definition).
   */
  def getSignals(userId: UserId): Future[Option[Seq[Signal]]] = {
    val fetchQuery = Query(userId, signalType, None)
    signalFetcher.get(fetchQuery)
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# Scala library compiled with the fatal_warnings option set. Its dependencies cover
# storehaus, Finagle stats, frigate/hermit store helpers, relevance-platform injections,
# Manhattan storage, twistly common types, and the SimClusters v2 and User Signal Service
# thrift definitions.
# NOTE(review): presumably the BUILD file for the `base` package - the file path is not
# shown here; confirm.
scala_library(
2+
compiler_option_sets = ["fatal_warnings"],
3+
tags = ["bazel-compatible"],
4+
dependencies = [
5+
"3rdparty/src/jvm/com/twitter/storehaus:core",
6+
"finagle/finagle-stats",
7+
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store",
8+
"frigate/frigate-common/src/main/scala/com/twitter/frigate/common/store/strato",
9+
"hermit/hermit-core/src/main/scala/com/twitter/hermit/store/common",
10+
"relevance-platform/src/main/scala/com/twitter/relevance_platform/common/injection",
11+
"src/scala/com/twitter/storehaus_internal/manhattan",
12+
"src/scala/com/twitter/twistly/common",
13+
"src/thrift/com/twitter/simclusters_v2:simclusters_v2-thrift-scala",
14+
"user-signal-service/thrift/src/main/thrift:thrift-scala",
15+
],
16+
)

0 commit comments

Comments
 (0)