Skip to content

Commit

Permalink
Added post-processing limit
Browse files Browse the repository at this point in the history
  • Loading branch information
ValentinZakharov committed Feb 12, 2025
1 parent 94715ff commit 6d5c67d
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.datadog.appsec.api.security;

import java.util.concurrent.atomic.AtomicLong;

// Number of post-processing tasks (e.g. AppSecRequestContext keep opened)
public class PostProcessingCounter extends AtomicLong {
public static final long MAX_POST_PROCESSING_TASKS = 16;

public boolean tryIncrement() {
while (true) {
long current = this.get();
if (current >= MAX_POST_PROCESSING_TASKS) {
// Do not increment it's already at the maximum
return false;
}
if (this.compareAndSet(current, current + 1)) {
return true;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import com.datadog.appsec.AppSecSystem;
import com.datadog.appsec.api.security.ApiSecurityRequestSampler;
import com.datadog.appsec.api.security.PostProcessingCounter;
import com.datadog.appsec.config.TraceSegmentPostProcessor;
import com.datadog.appsec.event.EventProducerService;
import com.datadog.appsec.event.EventProducerService.DataSubscriberInfo;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class GatewayBridge {

private static final String METASTRUCT_EXPLOIT = "exploit";

private final PostProcessingCounter postProcessingCounter = new PostProcessingCounter();

private final SubscriptionService subscriptionService;
private final EventProducerService producerService;
private final ApiSecurityRequestSampler requestSampler;
Expand Down Expand Up @@ -833,9 +836,10 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) {
if (route instanceof String) {
ctx.setRoute((String) route);
}
if (requestSampler.preSampleRequest(ctx)) {
if (requestSampler.preSampleRequest(ctx) && postProcessingCounter.tryIncrement()) {
// The request is pre-sampled - we need to post-process it
spanInfo.setRequiresPostProcessing(true);
postProcessingCounter.incrementAndGet();
}

return NoopFlow.INSTANCE;
Expand Down Expand Up @@ -905,6 +909,8 @@ private void onPostProcessing(RequestContext ctx_) {

maybeExtractSchemas(ctx);
ctx.close();
// Decrease the counter to allow the next request to be post-processed
postProcessingCounter.decrementAndGet();
}

public void stop() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.datadog.appsec.api.security

import spock.lang.Specification

import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

class PostProcessingCounterSpec extends Specification {

def "should increment successfully if below the limit"() {
given:
def counter = new PostProcessingCounter()

when:
def result = counter.tryIncrement()

then:
result == true
counter.get() == 1
}

def "should not increment if max limit is reached"() {
given:
def counter = new PostProcessingCounter()
counter.set(PostProcessingCounter.MAX_POST_PROCESSING_TASKS) // Manually setting to max

when:
def result = counter.tryIncrement()

then:
result == false
counter.get() == PostProcessingCounter.MAX_POST_PROCESSING_TASKS // Should remain unchanged
}

def "should handle concurrent increments safely"() {
given:
def counter = new PostProcessingCounter()
def threads = 20
def executor = Executors.newFixedThreadPool(threads)
def latch = new CountDownLatch(threads)
def successes = Collections.synchronizedList([])

when:
(1..threads).each {
executor.submit {
def result = counter.tryIncrement()
if (result) {
successes.add(1)
}
latch.countDown()
}
}
latch.await()
executor.shutdown()

then:
successes.size() <= PostProcessingCounter.MAX_POST_PROCESSING_TASKS // Only max increments should succeed
counter.get() == PostProcessingCounter.MAX_POST_PROCESSING_TASKS // Should be exactly at the limit
}
}

0 comments on commit 6d5c67d

Please sign in to comment.