diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf new file mode 100644 index 0000000..935d3b3 --- /dev/null +++ b/src/main/resources/application.conf @@ -0,0 +1,28 @@ +es-priority-mailbox { + mailbox-type = "de.upb.cs.swt.delphi.webapi.ElasticPriorityMailbox" +} + +akka.actor.deployment { + /espriomailboxactor { + mailbox = es-priority-mailbox + } +} + +akka { + http { + server { + remote-address-header = on + } + } +} + +# Use this dispatcher for actors that make blocking calls to the Elasticsearch database +elasticsearch-handling-dispatcher { + type = Dispatcher + executor = "thread-pool-executor" + thread-pool-executor { + fixed-pool-size = 4 + # This thread pool is intended for development purposes, and should be increased for production + } + throughput = 1 +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala new file mode 100644 index 0000000..673d2f2 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/AppLogging.scala @@ -0,0 +1,8 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.event.{BusLogging, LoggingAdapter} + +trait AppLogging { + def log(implicit system: ActorSystem): LoggingAdapter = new BusLogging(system.eventStream, this.getClass.getName, this.getClass, system.asInstanceOf[ExtendedActorSystem].logFilter) +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala index f61f6b6..53ed334 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Configuration.scala @@ -1,13 +1,20 @@ package de.upb.cs.swt.delphi.webapi -import com.sksamuel.elastic4s.ElasticsearchClientUri +import com.sksamuel.elastic4s.{ElasticsearchClientUri, IndexAndType} +import com.sksamuel.elastic4s.http.ElasticDsl._ /** * @author Ben Hermann */ -class Configuration(val bindHost: String = "0.0.0.0", +class Configuration( //Server and Elasticsearch configuration + val bindHost: String = "0.0.0.0", val bindPort: Int = 8080, val elasticsearchClientUri: ElasticsearchClientUri = ElasticsearchClientUri( - sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200"))) { + sys.env.getOrElse("DELPHI_ELASTIC_URI", "elasticsearch://localhost:9200")), + val esProjectIndex: IndexAndType = "delphi" / "project", + + //Actor system configuration + val elasticActorPoolSize: Int = 8 + ) { } diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala new file mode 100644 index 0000000..afe76fe --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActor.scala @@ -0,0 +1,41 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{Actor, ActorLogging, Props} +import com.sksamuel.elastic4s.IndexAndType +import com.sksamuel.elastic4s.http.ElasticDsl._ +import com.sksamuel.elastic4s.http.HttpClient +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +class ElasticActor(configuration: Configuration, index: IndexAndType) extends Actor with ActorLogging{ + + implicit val executionContext: ExecutionContext = context.system.dispatchers.lookup("elasticsearch-handling-dispatcher") + val client = HttpClient(configuration.elasticsearchClientUri) + + override def preStart(): Unit = log.info("Search actor started") + override def postStop(): Unit = log.info("Search actor shut down") + context.setReceiveTimeout(2 seconds) + + override def receive = { + case Enqueue(id) => getSource(id) + case Retrieve(id) => getSource(id) + } + + private def getSource(id: String) = { + log.info("Executing get on entry {}", id) + def source = client.execute{ + get(id).from(index) + }.await match { + case Right(res) => res.body.get + case Left(_) => Option.empty + } + sender().tell(source, context.self) + } +} + +object ElasticActor{ + def props(configuration: Configuration, index: IndexAndType) : Props = Props(new ElasticActor(configuration, index)) + .withMailbox("es-priority-mailbox") +} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala new file mode 100644 index 0000000..aa8f6fc --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticActorManager.scala @@ -0,0 +1,44 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.{Actor, ActorLogging, Props, Terminated} +import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router} +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage + +class ElasticActorManager(configuration: Configuration) extends Actor with ActorLogging{ + + private val index = configuration.esProjectIndex + private var elasticRouter = { + val routees = Vector.fill(configuration.elasticActorPoolSize) { + val r = context.actorOf(ElasticActor.props(configuration, index)) + context watch r + ActorRefRoutee(r) + } + Router(RoundRobinRoutingLogic(), routees) + } + + override def preStart(): Unit = log.info("Actor manager started") + override def postStop(): Unit = log.info("Actor manager shut down") + + override def receive = { + case em: ElasticMessage => { + log.info("Forwarding request {} to ElasticActor", em) + elasticRouter.route(em, sender()) + } + case Terminated(id) => { + elasticRouter.removeRoutee(id) + val r = context.actorOf(ElasticActor.props(configuration, index)) + context watch r + elasticRouter = elasticRouter.addRoutee(r) + } + } +} + +object ElasticActorManager{ + def props(configuration: Configuration) : Props = Props(new ElasticActorManager(configuration)) + .withMailbox("es-priority-mailbox") + + sealed trait ElasticMessage + + final case class Retrieve(id: String) extends ElasticMessage + final case class Enqueue(id: String) extends ElasticMessage +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala deleted file mode 100644 index 8e12d13..0000000 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticClient.scala +++ /dev/null @@ -1,20 +0,0 @@ -package de.upb.cs.swt.delphi.webapi - -import com.sksamuel.elastic4s.http.ElasticDsl._ -import com.sksamuel.elastic4s.http.HttpClient - -object ElasticClient { - - val configuration = new Configuration() - val client = HttpClient(configuration.elasticsearchClientUri) - val index = "delphi" / "project" - - //Returns an entry with the given ID as an option - def getSource(id: String) = - client.execute{ - get(id).from(index) - }.await match { - case Right(res) => res.body - case Left(_) => Option.empty - } -} diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticPriorityMailbox.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticPriorityMailbox.scala new file mode 100644 index 0000000..5600c01 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticPriorityMailbox.scala @@ -0,0 +1,14 @@ +package de.upb.cs.swt.delphi.webapi + +import akka.actor.ActorSystem +import akka.dispatch.{PriorityGenerator, UnboundedStablePriorityMailbox} +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} +import com.typesafe.config.Config + +class ElasticPriorityMailbox (settings: ActorSystem.Settings, config: Config) + extends UnboundedStablePriorityMailbox( + PriorityGenerator{ + case Retrieve(_) => 5 + case Enqueue(_) => 1 + case _ => 2 + }) diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala new file mode 100644 index 0000000..4395630 --- /dev/null +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/ElasticRequestLimiter.scala @@ -0,0 +1,82 @@ +package de.upb.cs.swt.delphi.webapi + + +import akka.actor.{Actor, ActorLogging, ActorRef, Props} +import akka.actor.Timers +import akka.http.scaladsl.model.RemoteAddress +import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._ +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage + +import scala.concurrent.duration._ +import scala.collection.mutable + +//Limits the number of requests any given IP can make by tracking how many requests an IP has made within a given +// window of time, and timing out any IP that exceeds a threshold by rejecting any further request for a period of time +class ElasticRequestLimiter(configuration: Configuration, nextActor: ActorRef) extends Actor with ActorLogging with Timers { + + private val window = 1 second + private val threshold = 10 + private val timeout = 2 hours + + private var recentIPs: mutable.Map[String, Int] = mutable.Map() + private var blockedIPs: mutable.Set[String] = mutable.Set() + + override def preStart(): Unit = { + log.info("Request limiter started") + timers.startPeriodicTimer(ClearTimer, ClearLogs, window) + } + override def postStop(): Unit = log.info("Request limiter shut down") + + override def receive = { + case Validate(rawIp, message) => { + val ip = rawIp.toOption.map(_.getHostAddress).getOrElse("unknown") + //First, reject IPs marked as blocked + if (blockedIPs.contains(ip)) { + rejectRequest() + } else { + //Check if this IP has made any requests recently + if (recentIPs.contains(ip)) { + //If so, increment their counter and test if they have exceeded the request threshold + recentIPs.update(ip, recentIPs(ip) + 1) + if (recentIPs(ip) > threshold) { + //If the threshold has been exceeded, mark this IP as blocked and reject it, and set up a message to unblock it after a period + blockedIPs += ip + log.info("Blocked IP {} due to exceeding request frequency threshold", ip) + timers.startSingleTimer(ForgiveTimer(ip), Forgive(ip), timeout) + rejectRequest() + } else { + //Else, forward this message + nextActor forward message + } + } else { + //Else, register their request in the map and pass it to the next actor + recentIPs += (ip -> 1) + nextActor forward message + } + } + } + case ClearLogs => + recentIPs.clear() + case Forgive(ip) => { + blockedIPs -= ip + log.info("Forgave IP {} after timeout", ip) + } + } + + //Rejects requests from blocked IPs + private def rejectRequest() = + sender() ! "Sorry, you have exceeded the limit on request frequency for unregistered users.\n" + + "As a result, you have been timed out.\n" + + "Please wait a while or register an account with us to continue using this service." +} + +object ElasticRequestLimiter{ + def props(configuration: Configuration, nextActor: ActorRef) : Props = Props(new ElasticRequestLimiter(configuration, nextActor)) + + final case class Validate(rawIp: RemoteAddress, message: ElasticMessage) + final case object ClearLogs + final case class Forgive(ip: String) + + final case object ClearTimer + final case class ForgiveTimer(ip: String) +} \ No newline at end of file diff --git a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala index 16926b4..02e72ee 100644 --- a/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala +++ b/src/main/scala/de/upb/cs/swt/delphi/webapi/Server.scala @@ -1,19 +1,33 @@ package de.upb.cs.swt.delphi.webapi +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem import akka.http.scaladsl.server.HttpApp +import akka.pattern.ask +import akka.util.Timeout import de.upb.cs.swt.delphi.featuredefinitions.FeatureListMapping +import de.upb.cs.swt.delphi.webapi.ElasticActorManager.{Enqueue, Retrieve} +import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter.Validate import spray.json._ /** * Web server configuration for Delphi web API. */ -object Server extends HttpApp with JsonSupport { +object Server extends HttpApp with JsonSupport with AppLogging { + + private val configuration = new Configuration() + private val system = ActorSystem("delphi-webapi") + private val actorManager = system.actorOf(ElasticActorManager.props(configuration)) + private val requestLimiter = system.actorOf(ElasticRequestLimiter.props(configuration, actorManager)) + implicit val timeout = Timeout(5, TimeUnit.SECONDS) override def routes = path("version") { version } ~ path("features") { features } ~ pathPrefix("search" / Remaining) { query => search(query) } ~ - pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } + pathPrefix("retrieve" / Remaining) { identifier => retrieve(identifier) } ~ + pathPrefix("enqueue" / Remaining) { identifier => enqueue(identifier) } private def version = { @@ -34,9 +48,25 @@ object Server extends HttpApp with JsonSupport { def retrieve(identifier: String) = { get { - complete( - ElasticClient.getSource(identifier) - ) + pass { //TODO: Require authentication here + complete( + (actorManager ? Retrieve(identifier)).mapTo[String] + ) + } ~ extractClientIP{ ip => + complete( + (requestLimiter ? Validate(ip, Retrieve(identifier))).mapTo[String] + ) + } + } + } + + def enqueue(identifier: String) = { + get { + pass { //TODO: Require authorization here + complete( + (actorManager ? Enqueue(identifier)).mapTo[String] + ) + } } } @@ -51,6 +81,7 @@ object Server extends HttpApp with JsonSupport { def main(args: Array[String]): Unit = { val configuration = new Configuration() Server.startServer(configuration.bindHost, configuration.bindPort) + system.terminate() }