forked from delphi-hub/delphi-webapi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathElasticRequestLimiter.scala
82 lines (70 loc) · 3.15 KB
/
ElasticRequestLimiter.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package de.upb.cs.swt.delphi.webapi
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.actor.Timers
import akka.http.scaladsl.model.RemoteAddress
import de.upb.cs.swt.delphi.webapi.ElasticRequestLimiter._
import de.upb.cs.swt.delphi.webapi.ElasticActorManager.ElasticMessage
import scala.concurrent.duration._
import scala.collection.mutable
/**
  * Limits how many requests a single IP address may make.
  *
  * Every [[ElasticRequestLimiter.Validate]] message is counted against the client's IP address. The counters
  * are reset once per `window` (a fixed window, not a sliding one). An IP whose counter exceeds `threshold`
  * within one window is marked as blocked: its requests are answered with a rejection text instead of being
  * forwarded, until a [[ElasticRequestLimiter.Forgive]] message arrives after `timeout`.
  *
  * Clients whose [[akka.http.scaladsl.model.RemoteAddress]] yields no host address are all tracked under the
  * single key "unknown", so they share one counter and one block state.
  *
  * @param configuration application configuration; not read by this class at the moment, the limits below
  *                      are fixed values
  * @param nextActor     actor that accepted requests are forwarded to (original sender is preserved)
  */
class ElasticRequestLimiter(configuration: Configuration, nextActor: ActorRef) extends Actor with ActorLogging with Timers {

  // Method-call syntax (`1.second`) is used instead of postfix syntax (`1 second`) so that the file does not
  // depend on the `scala.language.postfixOps` language feature being enabled.

  /** Length of one counting window; all request counters are cleared at this interval. */
  private val window: FiniteDuration = 1.second

  /** Maximum number of requests per IP and window that are still forwarded. */
  private val threshold: Int = 10

  /** How long an IP stays blocked after exceeding the threshold. */
  private val timeout: FiniteDuration = 2.hours

  // The collections themselves are mutable, so the references never need to be reassigned.
  private val recentIPs: mutable.Map[String, Int] = mutable.Map.empty
  private val blockedIPs: mutable.Set[String] = mutable.Set.empty

  override def preStart(): Unit = {
    log.info("Request limiter started")
    // Periodically send ClearLogs to ourselves to start a fresh counting window.
    timers.startPeriodicTimer(ClearTimer, ClearLogs, window)
  }

  override def postStop(): Unit = log.info("Request limiter shut down")

  override def receive: Receive = {
    case Validate(rawIp, message) =>
      val ip = rawIp.toOption.map(_.getHostAddress).getOrElse("unknown")
      if (blockedIPs.contains(ip)) {
        // Blocked IPs are rejected without being counted again.
        rejectRequest()
      } else {
        // Register this request; an IP not seen in the current window starts at 1.
        val requestCount = recentIPs.getOrElse(ip, 0) + 1
        recentIPs.update(ip, requestCount)
        if (requestCount > threshold) {
          // Threshold exceeded: block the IP, schedule its unblocking and reject this request.
          blockedIPs += ip
          log.info("Blocked IP {} due to exceeding request frequency threshold", ip)
          timers.startSingleTimer(ForgiveTimer(ip), Forgive(ip), timeout)
          rejectRequest()
        } else {
          nextActor.forward(message)
        }
      }

    case ClearLogs =>
      // Only the per-window counters are reset; blocked IPs stay blocked until forgiven.
      recentIPs.clear()

    case Forgive(ip) =>
      blockedIPs -= ip
      log.info("Forgave IP {} after timeout", ip)
  }

  /** Replies to the current sender with the rejection text instead of forwarding its request. */
  private def rejectRequest(): Unit =
    sender() ! "Sorry, you have exceeded the limit on request frequency for unregistered users.\n" +
      "As a result, you have been timed out.\n" +
      "Please wait a while or register an account with us to continue using this service."
}
/** Companion of the request limiter actor: its `Props` factory, its messages and its timer keys. */
object ElasticRequestLimiter {

  // --- Messages handled by the actor ---

  /** Asks the limiter to check the client address `rawIp` before `message` is passed on. */
  final case class Validate(rawIp: RemoteAddress, message: ElasticMessage)

  /** Tells the limiter to remove `ip` from its set of blocked IPs. */
  final case class Forgive(ip: String)

  /** Tells the limiter to reset all per-IP request counters. */
  final case object ClearLogs

  // --- Timer keys ---

  /** Key of the periodic timer that delivers [[ClearLogs]]. */
  final case object ClearTimer

  /** Key of the single-shot timer that delivers [[Forgive]] for `ip`. */
  final case class ForgiveTimer(ip: String)

  // --- Factory ---

  /**
    * Creates the `Props` for an [[ElasticRequestLimiter]].
    *
    * @param configuration configuration handed to the actor's constructor
    * @param nextActor     actor that accepted requests are forwarded to
    */
  def props(configuration: Configuration, nextActor: ActorRef): Props =
    Props(new ElasticRequestLimiter(configuration, nextActor))
}