Skip to content

Commit a65271d

Browse files
authored
Merge pull request #13 from almacken/feature/enqueue
Added Enqueue Route
2 parents 5bbd2ab + b933a80 commit a65271d

9 files changed

+263
-28
lines changed

Diff for: src/main/resources/application.conf

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
es-priority-mailbox {
2+
mailbox-type = "de.upb.cs.swt.delphi.webapi.ElasticPriorityMailbox"
3+
}
4+
5+
akka.actor.deployment {
6+
/espriomailboxactor {
7+
mailbox = es-priority-mailbox
8+
}
9+
}
10+
11+
akka {
12+
http {
13+
server {
14+
remote-address-header = on
15+
}
16+
}
17+
}
18+
19+
# Use this dispatcher for actors that make blocking calls to the Elasticsearch database
20+
elasticsearch-handling-dispatcher {
21+
type = Dispatcher
22+
executor = "thread-pool-executor"
23+
thread-pool-executor {
24+
fixed-pool-size = 4
25+
# This thread pool is intended for development purposes, and should be increased for production
26+
}
27+
throughput = 1
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.{ActorSystem, ExtendedActorSystem}
4+
import akka.event.{BusLogging, LoggingAdapter}
5+
6+
trait AppLogging {
7+
def log(implicit system: ActorSystem): LoggingAdapter = new BusLogging(system.eventStream, this.getClass.getName, this.getClass, system.asInstanceOf[ExtendedActorSystem].logFilter)
8+
}
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
package de.upb.cs.swt.delphi.webapi
22

3-
import com.sksamuel.elastic4s.ElasticsearchClientUri
3+
import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
4+
import com.sksamuel.elastic4s.http.ElasticDsl._
45

56
/**
67
* @author Ben Hermann
78
*/
8-
class Configuration(val bindHost: String = "0.0.0.0",
9+
class Configuration( //Server and Elasticsearch configuration
10+
val bindHost: String = "0.0.0.0",
911
val bindPort: Int = 8080,
1012
val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(
11-
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200"))) {
13+
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200")),
14+
val esProjectIndex: IndexAndType = "delphi" / "project",
15+
16+
//Actor system configuration
17+
val elasticActorPoolSize: Int = 8
18+
) {
1219

1320
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.{Actor, ActorLogging, Props}
4+
import com.sksamuel.elastic4s.IndexAndType
5+
import com.sksamuel.elastic4s.http.ElasticDsl._
6+
import com.sksamuel.elastic4s.http.HttpClient
7+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
8+
9+
import scala.concurrent.ExecutionContext
10+
import scala.concurrent.duration._
11+
12+
class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{
13+
14+
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher")
15+
val client = HttpClient(configuration.elasticsearchClientUri)
16+
17+
override def preStart(): Unit = log.info("Search actor started")
18+
override def postStop(): Unit = log.info("Search actor shut down")
19+
context.setReceiveTimeout(2 seconds)
20+
21+
override def receive = {
22+
case Enqueue(id) => getSource(id)
23+
case Retrieve(id) => getSource(id)
24+
}
25+
26+
private def getSource(id: String) = {
27+
log.info("Executing get on entry {}", id)
28+
def source = client.execute{
29+
get(id).from(index)
30+
}.await match {
31+
case Right(res) => res.body.get
32+
case Left(_) => Option.empty
33+
}
34+
sender().tell(source, context.self)
35+
}
36+
}
37+
38+
object ElasticActor{
39+
def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index))
40+
.withMailbox("es-priority-mailbox")
41+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.{Actor, ActorLogging, Props, Terminated}
4+
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
5+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage
6+
7+
class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{
8+
9+
private val index = configuration.esProjectIndex
10+
private var elasticRouter = {
11+
val routees = Vector.fill(configuration.elasticActorPoolSize) {
12+
val r = context.actorOf(ElasticActor.props(configuration, index))
13+
context watch r
14+
ActorRefRoutee(r)
15+
}
16+
Router(RoundRobinRoutingLogic(), routees)
17+
}
18+
19+
override def preStart(): Unit = log.info("Actor manager started")
20+
override def postStop(): Unit = log.info("Actor manager shut down")
21+
22+
override def receive = {
23+
case em: ElasticMessage => {
24+
log.info("Forwarding request {} to ElasticActor", em)
25+
elasticRouter.route(em, sender())
26+
}
27+
case Terminated(id) => {
28+
elasticRouter.removeRoutee(id)
29+
val r = context.actorOf(ElasticActor.props(configuration, index))
30+
context watch r
31+
elasticRouter = elasticRouter.addRoutee(r)
32+
}
33+
}
34+
}
35+
36+
object ElasticActorManager{
37+
def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration))
38+
.withMailbox("es-priority-mailbox")
39+
40+
sealed trait ElasticMessage
41+
42+
final case class Retrieve(id: String) extends ElasticMessage
43+
final case class Enqueue(id: String) extends ElasticMessage
44+
}

Diff for: src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala

-20
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
import akka.actor.ActorSystem
4+
import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox}
5+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
6+
import com.typesafe.config.Config
7+
8+
class ElasticPriorityMailbox (settings: ActorSystem.Settings, config: Config)
9+
extends UnboundedStablePriorityMailbox(
10+
PriorityGenerator{
11+
case Retrieve(_) => 5
12+
case Enqueue(_) => 1
13+
case _ => 2
14+
})
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package de.upb.cs.swt.delphi.webapi
2+
3+
4+
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
5+
import akka.actor.Timers
6+
import akka.http.scaladsl.model.RemoteAddress
7+
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._
8+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage
9+
10+
import scala.concurrent.duration._
11+
import scala.collection.mutable
12+
13+
//Limits the number of requests any given IP can make by tracking how many requests an IP has made within a given
14+
// window of time, and timing out any IP that exceeds a threshold by rejecting any further request for a period of time
15+
class ElasticRequestLimiter(configuration: Configuration, nextActor: ActorRef) extends Actor with ActorLogging with Timers {
16+
17+
private val window = 1 second
18+
private val threshold = 10
19+
private val timeout = 2 hours
20+
21+
private var recentIPs: mutable.Map[String, Int] = mutable.Map()
22+
private var blockedIPs: mutable.Set[String] = mutable.Set()
23+
24+
override def preStart(): Unit = {
25+
log.info("Request limiter started")
26+
timers.startPeriodicTimer(ClearTimer, ClearLogs, window)
27+
}
28+
override def postStop(): Unit = log.info("Request limiter shut down")
29+
30+
override def receive = {
31+
case Validate(rawIp, message) => {
32+
val ip = rawIp.toOption.map(_.getHostAddress).getOrElse("unknown")
33+
//First, reject IPs marked as blocked
34+
if (blockedIPs.contains(ip)) {
35+
rejectRequest()
36+
} else {
37+
//Check if this IP has made any requests recently
38+
if (recentIPs.contains(ip)) {
39+
//If so, increment their counter and test if they have exceeded the request threshold
40+
recentIPs.update(ip, recentIPs(ip) + 1)
41+
if (recentIPs(ip) > threshold) {
42+
//If the threshold has been exceeded, mark this IP as blocked and reject it, and set up a message to unblock it after a period
43+
blockedIPs += ip
44+
log.info("Blocked IP {} due to exceeding request frequency threshold", ip)
45+
timers.startSingleTimer(ForgiveTimer(ip), Forgive(ip), timeout)
46+
rejectRequest()
47+
} else {
48+
//Else, forward this message
49+
nextActor forward message
50+
}
51+
} else {
52+
//Else, register their request in the map and pass it to the next actor
53+
recentIPs += (ip -> 1)
54+
nextActor forward message
55+
}
56+
}
57+
}
58+
case ClearLogs =>
59+
recentIPs.clear()
60+
case Forgive(ip) => {
61+
blockedIPs -= ip
62+
log.info("Forgave IP {} after timeout", ip)
63+
}
64+
}
65+
66+
//Rejects requests from blocked IPs
67+
private def rejectRequest() =
68+
sender() ! "Sorry, you have exceeded the limit on request frequency for unregistered users.\n" +
69+
"As a result, you have been timed out.\n" +
70+
"Please wait a while or register an account with us to continue using this service."
71+
}
72+
73+
object ElasticRequestLimiter{
74+
def props(configuration: Configuration, nextActor: ActorRef) : Props = Props(new ElasticRequestLimiter(configuration, nextActor))
75+
76+
final case class Validate(rawIp: RemoteAddress, message: ElasticMessage)
77+
final case object ClearLogs
78+
final case class Forgive(ip: String)
79+
80+
final case object ClearTimer
81+
final case class ForgiveTimer(ip: String)
82+
}

Diff for: src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala

+36-5
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,33 @@
11
package de.upb.cs.swt.delphi.webapi
22

3+
import java.util.concurrent.TimeUnit
4+
5+
import akka.actor.ActorSystem
36
import akka.http.scaladsl.server.HttpApp
7+
import akka.pattern.ask
8+
import akka.util.Timeout
49
import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping
10+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
11+
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter.Validate
512
import spray.json._
613

714
/**
815
* Web server configuration for Delphi web API.
916
*/
10-
object Server extends HttpApp with JsonSupport {
17+
object Server extends HttpApp with JsonSupport with AppLogging {
18+
19+
private val configuration = new Configuration()
20+
private val system = ActorSystem("delphi-webapi")
21+
private val actorManager = system.actorOf(ElasticActorManager.props(configuration))
22+
private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager))
23+
implicit val timeout = Timeout(5, TimeUnit.SECONDS)
1124

1225
override def routes =
1326
path("version") { version } ~
1427
path("features") { features } ~
1528
pathPrefix("search" / Remaining) { query => search(query) } ~
16-
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) }
29+
pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~
30+
pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) }
1731

1832

1933
private def version = {
@@ -34,9 +48,25 @@ object Server extends HttpApp with JsonSupport {
3448

3549
def retrieve(identifier: String) = {
3650
get {
37-
complete(
38-
ElasticClient.getSource(identifier)
39-
)
51+
pass { //TODO: Require authentication here
52+
complete(
53+
(actorManager ? Retrieve(identifier)).mapTo[String]
54+
)
55+
} ~ extractClientIP{ ip =>
56+
complete(
57+
(requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String]
58+
)
59+
}
60+
}
61+
}
62+
63+
def enqueue(identifier: String) = {
64+
get {
65+
pass { //TODO: Require authorization here
66+
complete(
67+
(actorManager ? Enqueue(identifier)).mapTo[String]
68+
)
69+
}
4070
}
4171
}
4272

@@ -51,6 +81,7 @@ object Server extends HttpApp with JsonSupport {
5181
def main(args: Array[String]): Unit = {
5282
val configuration = new Configuration()
5383
Server.startServer(configuration.bindHost, configuration.bindPort)
84+
system.terminate()
5485
}
5586

5687

0 commit comments

Comments
 (0)