Skip to content

Commit b933a80

Browse files
committed
Improved priority handling
1 parent c6c53dc commit b933a80

File tree

2 files changed

+23
-25
lines changed

2 files changed

+23
-25
lines changed

Diff for: src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala

+16-14
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import akka.actor.{Actor, ActorLogging, Props}
44
import com.sksamuel.elastic4s.IndexAndType
55
import com.sksamuel.elastic4s.http.ElasticDsl._
66
import com.sksamuel.elastic4s.http.HttpClient
7-
import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource
7+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
88

99
import scala.concurrent.ExecutionContext
1010
import scala.concurrent.duration._
1111

12-
class ElasticActor(configuration: Configuration) extends Actor with ActorLogging{
12+
class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{
1313

1414
implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher")
1515
val client = HttpClient(configuration.elasticsearchClientUri)
@@ -19,21 +19,23 @@ class ElasticActor(configuration: Configuration) extends Actor with ActorLogging
1919
context.setReceiveTimeout(2 seconds)
2020

2121
override def receive = {
22-
case GetSource(id, index) => {
23-
log.info("Executing get on entry {}", id)
24-
def source = client.execute{
25-
get(id).from(index)
26-
}.await match {
27-
case Right(res) => res.body.get
28-
case Left(_) => Option.empty
29-
}
30-
sender().tell(source, context.self)
22+
case Enqueue(id) => getSource(id)
23+
case Retrieve(id) => getSource(id)
24+
}
25+
26+
private def getSource(id: String) = {
27+
log.info("Executing get on entry {}", id)
28+
def source = client.execute{
29+
get(id).from(index)
30+
}.await match {
31+
case Right(res) => res.body.get
32+
case Left(_) => Option.empty
3133
}
34+
sender().tell(source, context.self)
3235
}
3336
}
3437

3538
object ElasticActor{
36-
def props(configuration: Configuration) : Props = Props(new ElasticActor(configuration))
37-
38-
final case class GetSource(id: String, index: IndexAndType)
39+
def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index))
40+
.withMailbox("es-priority-mailbox")
3941
}

Diff for: src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala

+7-11
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,14 @@ package de.upb.cs.swt.delphi.webapi
22

33
import akka.actor.{Actor, ActorLogging, Props, Terminated}
44
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}
5-
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve}
6-
import de.upb.cs.swt.delphi.webapi.ElasticActor.GetSource
5+
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage
76

87
class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{
98

109
private val index = configuration.esProjectIndex
1110
private var elasticRouter = {
1211
val routees = Vector.fill(configuration.elasticActorPoolSize) {
13-
val r = context.actorOf(ElasticActor.props(configuration))
12+
val r = context.actorOf(ElasticActor.props(configuration, index))
1413
context watch r
1514
ActorRefRoutee(r)
1615
}
@@ -21,20 +20,17 @@ class ElasticActorManager(configuration: Configuration) extends Actor with Actor
2120
override def postStop(): Unit = log.info("Actor manager shut down")
2221

2322
override def receive = {
24-
case Retrieve(id) => getSource(id)
25-
case Enqueue(id) => getSource(id)
23+
case em: ElasticMessage => {
24+
log.info("Forwarding request {} to ElasticActor", em)
25+
elasticRouter.route(em, sender())
26+
}
2627
case Terminated(id) => {
2728
elasticRouter.removeRoutee(id)
28-
val r = context.actorOf(ElasticActor.props(configuration))
29+
val r = context.actorOf(ElasticActor.props(configuration, index))
2930
context watch r
3031
elasticRouter = elasticRouter.addRoutee(r)
3132
}
3233
}
33-
34-
private def getSource(id: String) = {
35-
log.info("Forwarding search for entry {}", id)
36-
elasticRouter.route(GetSource(id, index), sender())
37-
}
3834
}
3935

4036
object ElasticActorManager{

0 commit comments

Comments
 (0)