Skip to content

Commit c6c53dc

Browse files
committed
Improved actor routing
1 parent 39b0374 commit c6c53dc

File tree

6 files changed

+46
-15
lines changed

6 files changed

+46
-15
lines changed

src/main/resources/application.conf

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,15 @@ akka {
1414
remote-address-header = on
1515
}
1616
}
17+
}
18+
19+
# Use this dispatcher for actors that make blocking calls to the Elasticsearch database
20+
elasticsearch-handling-dispatcher {
21+
type = Dispatcher
22+
executor = "thread-pool-executor"
23+
thread-pool-executor {
24+
fixed-pool-size = 4
25+
# This thread pool is intended for development purposes, and should be increased for production
26+
}
27+
throughput = 1
1728
}
Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
package de.upb.cs.swt.delphi.webapi
22

3-
import com.sksamuel.elastic4s.ElasticsearchClientUri
3+
import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType}
4+
import com.sksamuel.elastic4s.http.ElasticDsl._
45

56
/**
67
* @author Ben Hermann
78
*/
8-
class Configuration(val bindHost: String = "0.0.0.0",
9+
class Configuration( //Server and Elasticsearch configuration
10+
val bindHost: String = "0.0.0.0",
911
val bindPort: Int = 8080,
1012
val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri(
11-
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200"))) {
13+
sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200")),
14+
val esProjectIndex: IndexAndType = "delphi" / "project",
15+
16+
//Actor system configuration
17+
val elasticActorPoolSize: Int = 8
18+
) {
1219

1320
}

src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package de.upb.cs.swt.delphi.webapi
22

3-
import akka.actor.{Actor, ActorLogging, Props, ReceiveTimeout}
3+
import akka.actor.{Actor, ActorLogging, Props}
44
import com.sksamuel.elastic4s.IndexAndType
55
import com.sksamuel.elastic4s.http.ElasticDsl._
66
import com.sksamuel.elastic4s.http.HttpClient
77
import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource
88

9+
import scala.concurrent.ExecutionContext
910
import scala.concurrent.duration._
1011

1112
class ElasticActor(configuration: Configuration) extends Actor with ActorLogging{
1213

14+
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher")
1315
val client = HttpClient(configuration.elasticsearchClientUri)
1416

1517
override def preStart(): Unit = log.info("Search actor started")
@@ -26,9 +28,7 @@ class ElasticActor(configuration: Configuration) extends Actor with ActorLogging
2628
case Left(_) => Option.empty
2729
}
2830
sender().tell(source, context.self)
29-
context.stop(self)
3031
}
31-
case ReceiveTimeout => context.stop(self)
3232
}
3333
}
3434

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,48 @@
11
package de.upb.cs.swt.delphi.webapi
22

3-
import akka.actor.{Actor, ActorLogging, Props}
4-
import com.sksamuel.elastic4s.http.ElasticDsl._
3+
import akka.actor.{Actor, ActorLogging, Props, Terminated}
4+
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
55
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
66
import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource
77

88
class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{
99

10-
private val index = "delphi" / "project"
10+
private val index = configuration.esProjectIndex
11+
private var elasticRouter = {
12+
val routees = Vector.fill(configuration.elasticActorPoolSize) {
13+
val r = context.actorOf(ElasticActor.props(configuration))
14+
context watch r
15+
ActorRefRoutee(r)
16+
}
17+
Router(RoundRobinRoutingLogic(), routees)
18+
}
1119

1220
override def preStart(): Unit = log.info("Actor manager started")
1321
override def postStop(): Unit = log.info("Actor manager shut down")
1422

1523
override def receive = {
1624
case Retrieve(id) => getSource(id)
1725
case Enqueue(id) => getSource(id)
26+
case Terminated(id) => {
27+
elasticRouter.removeRoutee(id)
28+
val r = context.actorOf(ElasticActor.props(configuration))
29+
context watch r
30+
elasticRouter = elasticRouter.addRoutee(r)
31+
}
1832
}
1933

2034
private def getSource(id: String) = {
21-
log.info("Creating actor to search for entry {}", id)
22-
val retrieveActor = context.actorOf(ElasticActor.props(configuration))
23-
retrieveActor forward GetSource(id, index)
35+
log.info("Forwarding search for entry {}", id)
36+
elasticRouter.route(GetSource(id, index), sender())
2437
}
2538
}
2639

2740
object ElasticActorManager{
2841
def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration))
2942
.withMailbox("es-priority-mailbox")
3043

44+
sealed trait ElasticMessage
45+
3146
final case class Retrieve(id: String) extends ElasticMessage
3247
final case class Enqueue(id: String) extends ElasticMessage
3348
}

src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticMessage.scala

Lines changed: 0 additions & 3 deletions
This file was deleted.

src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import akka.actor.{Actor, ActorLogging, ActorRef, Props}
55
import akka.actor.Timers
66
import akka.http.scaladsl.model.RemoteAddress
77
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._
8+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage
89

910
import scala.concurrent.duration._
1011
import scala.collection.mutable

0 commit comments

Comments
 (0)