Skip to content

Commit e5470cc

Browse files
feat(reactor netty): adding reactor-netty based backend service, http host client, and related components
1 parent abf4df6 commit e5470cc

File tree

16 files changed

+3643
-2
lines changed

16 files changed

+3643
-2
lines changed

components/client/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@
3232
<classifier>linux-aarch_64</classifier>
3333
</dependency>
3434

35+
<dependency>
36+
<groupId>io.projectreactor.netty</groupId>
37+
<artifactId>reactor-netty</artifactId>
38+
</dependency>
39+
3540
<dependency>
3641
<groupId>com.hotels.styx</groupId>
3742
<artifactId>styx-api</artifactId>
@@ -78,16 +83,19 @@
7883
<dependency>
7984
<groupId>org.hamcrest</groupId>
8085
<artifactId>hamcrest</artifactId>
86+
<scope>test</scope>
8187
</dependency>
8288

8389
<dependency>
8490
<groupId>org.scalatest</groupId>
8591
<artifactId>scalatest_${scala.version}</artifactId>
92+
<scope>test</scope>
8693
</dependency>
8794

8895
<dependency>
8996
<groupId>org.mockito</groupId>
9097
<artifactId>mockito-core</artifactId>
98+
<scope>test</scope>
9199
</dependency>
92100

93101
<dependency>
@@ -96,6 +104,24 @@
96104
<scope>test</scope>
97105
</dependency>
98106

107+
<dependency>
108+
<groupId>io.mockk</groupId>
109+
<artifactId>mockk-jvm</artifactId>
110+
<scope>test</scope>
111+
</dependency>
112+
113+
<dependency>
114+
<groupId>com.squareup.okhttp3</groupId>
115+
<artifactId>mockwebserver</artifactId>
116+
<scope>test</scope>
117+
</dependency>
118+
119+
<dependency>
120+
<groupId>com.squareup.okhttp3</groupId>
121+
<artifactId>okhttp-tls</artifactId>
122+
<scope>test</scope>
123+
</dependency>
124+
99125
</dependencies>
100126

101127
<build>
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/*
2+
Copyright (C) 2013-2024 Expedia Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
package com.hotels.styx.client
17+
18+
import com.hotels.styx.api.HttpHeaderNames.CONTENT_LENGTH
19+
import com.hotels.styx.api.HttpHeaderNames.HOST
20+
import com.hotels.styx.api.HttpHeaderNames.TRANSFER_ENCODING
21+
import com.hotels.styx.api.HttpInterceptor
22+
import com.hotels.styx.api.HttpMethod
23+
import com.hotels.styx.api.Id
24+
import com.hotels.styx.api.LiveHttpRequest
25+
import com.hotels.styx.api.LiveHttpResponse
26+
import com.hotels.styx.api.exceptions.NoAvailableHostsException
27+
import com.hotels.styx.api.extension.Origin
28+
import com.hotels.styx.api.extension.RemoteHost
29+
import com.hotels.styx.api.extension.loadbalancing.spi.LoadBalancer
30+
import com.hotels.styx.api.extension.retrypolicy.spi.RetryPolicy
31+
import com.hotels.styx.api.extension.service.StickySessionConfig
32+
import com.hotels.styx.client.stickysession.StickySessionCookie.newStickySessionCookie
33+
import com.hotels.styx.client.stickysession.StickySessionLoadBalancingStrategy
34+
import com.hotels.styx.ext.newRequest
35+
import com.hotels.styx.ext.newResponse
36+
import com.hotels.styx.metrics.CentralisedMetrics
37+
import org.reactivestreams.Publisher
38+
import reactor.core.publisher.Mono
39+
import java.util.Objects.nonNull
40+
import java.util.Optional
41+
42+
/**
43+
* A configurable HTTP client with integration of Reactor Netty client
44+
*/
45+
class ReactorBackendServiceClient(
46+
private val id: Id,
47+
private val rewriteRuleset: RewriteRuleset,
48+
private val originsRestrictionCookieName: String?,
49+
private val stickySessionConfig: StickySessionConfig,
50+
private val originIdHeader: CharSequence,
51+
private val loadBalancer: LoadBalancer,
52+
private val retryPolicy: RetryPolicy,
53+
private val metrics: CentralisedMetrics,
54+
private val overrideHostHeader: Boolean,
55+
) : BackendServiceClient {
56+
override fun sendRequest(
57+
request: LiveHttpRequest,
58+
context: HttpInterceptor.Context,
59+
): Publisher<LiveHttpResponse> = sendRequest(rewriteUrl(request), emptyList(), 0, context)
60+
61+
private fun sendRequest(
62+
request: LiveHttpRequest,
63+
previousOrigins: List<RemoteHost>,
64+
attempt: Int,
65+
context: HttpInterceptor.Context,
66+
): Publisher<LiveHttpResponse> {
67+
if (attempt >= MAX_RETRY_ATTEMPTS) {
68+
return Mono.error(NoAvailableHostsException(id))
69+
}
70+
val remoteHost = selectOrigin(request)
71+
return if (remoteHost.isPresent) {
72+
val host = remoteHost.get()
73+
val updatedRequest = shouldOverrideHostHeader(host, request)
74+
val newPreviousOrigins = previousOrigins.toMutableList()
75+
newPreviousOrigins.add(host)
76+
Mono.from(host.hostClient().handle(updatedRequest, context))
77+
.doOnNext { recordErrorStatusMetrics(it) }
78+
.map { response ->
79+
response.addStickySessionIdentifier(host.origin())
80+
.removeUnexpectedResponseBody(updatedRequest)
81+
.removeRedundantContentLengthHeader()
82+
.addOriginId(host.id())
83+
.let { LiveHttpResponse.Builder(it).request(updatedRequest).build() }
84+
}
85+
.onErrorResume { cause ->
86+
val retryContext = RetryPolicyContext(id, attempt + 1, cause, updatedRequest, previousOrigins)
87+
retry(updatedRequest, retryContext, newPreviousOrigins, attempt + 1, cause, context)
88+
}
89+
} else {
90+
val retryContext = RetryPolicyContext(id, attempt + 1, null, request, previousOrigins)
91+
retry(request, retryContext, previousOrigins, attempt + 1, NoAvailableHostsException(id), context)
92+
}
93+
}
94+
95+
private fun recordErrorStatusMetrics(response: LiveHttpResponse) {
96+
val statusCode = response.status().code()
97+
if (statusCode.isErrorStatus()) {
98+
metrics.proxy.client.errorResponseFromOriginByStatus(statusCode).increment()
99+
}
100+
}
101+
102+
private fun Int.isErrorStatus() = this >= 400
103+
104+
private fun bodyNeedsToBeRemoved(
105+
request: LiveHttpRequest,
106+
response: LiveHttpResponse,
107+
) = isHeadRequest(request) || isBodilessResponse(response)
108+
109+
private fun responseWithoutBody(response: LiveHttpResponse) =
110+
response.newResponse {
111+
header(CONTENT_LENGTH, 0)
112+
removeHeader(TRANSFER_ENCODING)
113+
removeBody()
114+
}
115+
116+
private fun isBodilessResponse(response: LiveHttpResponse): Boolean =
117+
when (val code = response.status().code()) {
118+
204, 304 -> true
119+
else -> code / 100 == 1
120+
}
121+
122+
private fun isHeadRequest(request: LiveHttpRequest): Boolean = request.method() == HttpMethod.HEAD
123+
124+
private fun shouldOverrideHostHeader(
125+
host: RemoteHost,
126+
request: LiveHttpRequest,
127+
): LiveHttpRequest =
128+
if (overrideHostHeader && !host.origin().host().isNullOrBlank()) {
129+
request.newRequest { header(HOST, host.origin().host()) }
130+
} else {
131+
request
132+
}
133+
134+
private fun LiveHttpResponse.addOriginId(originId: Id): LiveHttpResponse =
135+
newResponse {
136+
header(originIdHeader, originId)
137+
}
138+
139+
private fun retry(
140+
request: LiveHttpRequest,
141+
retryContext: RetryPolicyContext,
142+
previousOrigins: List<RemoteHost>,
143+
attempt: Int,
144+
cause: Throwable,
145+
context: HttpInterceptor.Context,
146+
): Mono<LiveHttpResponse> {
147+
val lbContext: LoadBalancer.Preferences =
148+
object : LoadBalancer.Preferences {
149+
override fun preferredOrigins(): Optional<String> = Optional.empty()
150+
151+
override fun avoidOrigins(): List<Origin> = previousOrigins.map { it.origin() }
152+
}
153+
return if (retryPolicy.evaluate(retryContext, loadBalancer, lbContext).shouldRetry()) {
154+
Mono.from(sendRequest(request, previousOrigins, attempt, context))
155+
} else {
156+
Mono.error(cause)
157+
}
158+
}
159+
160+
private fun LiveHttpResponse.removeUnexpectedResponseBody(request: LiveHttpRequest) =
161+
if (bodyNeedsToBeRemoved(request, this)) {
162+
responseWithoutBody(this)
163+
} else {
164+
this
165+
}
166+
167+
private fun LiveHttpResponse.removeRedundantContentLengthHeader() =
168+
if (contentLength().isPresent && chunked()) {
169+
newResponse {
170+
removeHeader(CONTENT_LENGTH)
171+
}
172+
} else {
173+
this
174+
}
175+
176+
private fun selectOrigin(rewrittenRequest: LiveHttpRequest): Optional<RemoteHost> {
177+
val preferences =
178+
object : LoadBalancer.Preferences {
179+
override fun preferredOrigins(): Optional<String> {
180+
return if (nonNull(originsRestrictionCookieName)) {
181+
rewrittenRequest.cookie(originsRestrictionCookieName)
182+
.map { it.value() }
183+
.or { rewrittenRequest.cookie("styx_origin_$id").map { it.value() } }
184+
} else {
185+
rewrittenRequest.cookie("styx_origin_$id").map { it.value() }
186+
}
187+
}
188+
189+
override fun avoidOrigins(): List<Origin> = emptyList()
190+
}
191+
return loadBalancer.choose(preferences)
192+
}
193+
194+
private fun LiveHttpResponse.addStickySessionIdentifier(origin: Origin): LiveHttpResponse =
195+
if (loadBalancer is StickySessionLoadBalancingStrategy) {
196+
val maxAge = stickySessionConfig.stickySessionTimeoutSeconds()
197+
newResponse {
198+
addCookies(newStickySessionCookie(id, origin.id(), maxAge))
199+
}
200+
} else {
201+
this
202+
}
203+
204+
private fun rewriteUrl(request: LiveHttpRequest): LiveHttpRequest = rewriteRuleset.rewrite(request)
205+
206+
private class RetryPolicyContext(
207+
private val appId: Id,
208+
private val retryCount: Int,
209+
private val lastException: Throwable?,
210+
private val request: LiveHttpRequest,
211+
private val previouslyUsedOrigins: Iterable<RemoteHost>,
212+
) : RetryPolicy.Context {
213+
override fun appId(): Id = appId
214+
215+
override fun currentRetryCount(): Int = retryCount
216+
217+
override fun lastException(): Optional<Throwable> = Optional.ofNullable(lastException)
218+
219+
override fun currentRequest(): LiveHttpRequest = request
220+
221+
override fun previousOrigins(): Iterable<RemoteHost> = previouslyUsedOrigins
222+
223+
override fun toString(): String =
224+
buildString {
225+
append("appId", appId)
226+
append(", retryCount", retryCount)
227+
append(", lastException", lastException)
228+
append(", request", request.url())
229+
append(", previouslyUsedOrigins", previouslyUsedOrigins)
230+
}
231+
232+
fun hosts(): String = hosts(previouslyUsedOrigins)
233+
234+
companion object {
235+
private fun hosts(origins: Iterable<RemoteHost>): String =
236+
origins.asSequence().map { it.origin().hostAndPortString() }.joinToString(", ")
237+
}
238+
}
239+
240+
override fun toString(): String =
241+
buildString {
242+
append("id", id)
243+
append(", stickySessionConfig", stickySessionConfig)
244+
append(", retryPolicy", retryPolicy)
245+
append(", rewriteRuleset", rewriteRuleset)
246+
append(", loadBalancingStrategy", loadBalancer)
247+
append(", overrideHostHeader", overrideHostHeader)
248+
}
249+
250+
companion object {
251+
private const val MAX_RETRY_ATTEMPTS = 3
252+
}
253+
}

0 commit comments

Comments
 (0)