Skip to content

Commit e55c981

Browse files
authored
#35 Add event type verification which prevents events from being processed if the type in metadata does not match with the expected event type (#37)
1 parent 3a5c84c commit e55c981

File tree

9 files changed

+150
-52
lines changed

9 files changed

+150
-52
lines changed

Diff for: paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/client/impl/ClientImpl.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiCursor;
3434
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiEventBatch;
3535
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
36+
import de.zalando.paradox.nakadi.consumer.core.exceptions.InvalidEventTypeException;
3637
import de.zalando.paradox.nakadi.consumer.core.http.HttpResponseChunk;
3738
import de.zalando.paradox.nakadi.consumer.core.http.handlers.EventUtils;
3839
import de.zalando.paradox.nakadi.consumer.core.http.okhttp.RxHttpRequest;
@@ -104,7 +105,8 @@ private List<NakadiPartition> getPartitions(final String content) {
104105
@Override
105106
public Single<String> getEvent(final EventTypeCursor cursor) {
106107
final Observable<HttpResponseChunk> request = getContent0(cursor, 1);
107-
return request.map(chunk -> getEvent0(chunk.getContent())).firstOrDefault(null).toSingle();
108+
return request.map(chunk -> getEvent0(chunk.getContent(), cursor.getEventType())).firstOrDefault(null)
109+
.toSingle();
108110
}
109111

110112
@Override
@@ -159,8 +161,8 @@ private Observable<HttpResponseChunk> getContent0(final EventTypeCursor cursor,
159161
});
160162
}
161163

162-
private String getEvent0(final String content) {
163-
final NakadiEventBatch<String> events = EventUtils.getRawEventBatch(objectMapper, content);
164+
private String getEvent0(final String content, final EventType eventType) {
165+
final NakadiEventBatch<String> events = EventUtils.getRawEventBatch(objectMapper, content, eventType);
164166
checkArgument(events != null);
165167
return Iterables.getOnlyElement(events.getEvents());
166168
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package de.zalando.paradox.nakadi.consumer.core.exceptions;
2+
3+
public class InvalidEventTypeException extends UnrecoverableException {
4+
public InvalidEventTypeException() {
5+
super();
6+
}
7+
8+
public InvalidEventTypeException(final String message) {
9+
super(message);
10+
}
11+
12+
public InvalidEventTypeException(final String message, final Throwable cause) {
13+
super(message, cause);
14+
}
15+
16+
public InvalidEventTypeException(final Throwable cause) {
17+
super(cause);
18+
}
19+
}

Diff for: paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/http/handlers/AbstractResponseHandler.java

+3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.ObjectMapper;
88

9+
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
910
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
1011
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiEventCursor;
1112
import de.zalando.paradox.nakadi.consumer.core.partitioned.PartitionCoordinator;
@@ -19,6 +20,7 @@ abstract class AbstractResponseHandler implements ResponseHandler {
1920
protected final EventTypePartition eventTypePartition;
2021
protected final PartitionCoordinator coordinator;
2122
protected final String consumerName;
23+
protected final EventType eventType;
2224

2325
AbstractResponseHandler(final String consumerName, final EventTypePartition eventTypePartition,
2426
final PartitionCoordinator coordinator, final Logger log, final ObjectMapper jsonMapper) {
@@ -27,6 +29,7 @@ abstract class AbstractResponseHandler implements ResponseHandler {
2729
this.log = log;
2830
this.jsonMapper = jsonMapper;
2931
this.consumerName = consumerName;
32+
this.eventType = eventTypePartition.getEventType();
3033
}
3134

3235
String[] getEvents(final String string) {

Diff for: paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/http/handlers/EventUtils.java

+31-17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package de.zalando.paradox.nakadi.consumer.core.http.handlers;
22

3+
import static java.lang.String.format;
4+
35
import static java.util.Objects.requireNonNull;
46

57
import static com.google.common.base.Preconditions.checkArgument;
@@ -20,16 +22,19 @@
2022
import com.fasterxml.jackson.databind.ObjectMapper;
2123
import com.fasterxml.jackson.databind.node.ArrayNode;
2224

25+
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
2326
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiCursor;
2427
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiEventBatch;
28+
import de.zalando.paradox.nakadi.consumer.core.exceptions.InvalidEventTypeException;
2529
import de.zalando.paradox.nakadi.consumer.core.utils.ThrowableUtils;
2630

2731
public class EventUtils {
2832
private static final Logger LOGGER = LoggerFactory.getLogger(EventUtils.class);
2933

3034
private EventUtils() { }
3135

32-
public static NakadiEventBatch<String> getRawEventBatch(final ObjectMapper jsonMapper, final String string) {
36+
public static NakadiEventBatch<String> getRawEventBatch(final ObjectMapper jsonMapper, final String string,
37+
final EventType eventType) {
3338
try {
3439
final EventReader reader = new EventReader(jsonMapper, string).invoke();
3540
final JsonNode eventsNode = reader.getEventsNode();
@@ -40,18 +45,17 @@ public static NakadiEventBatch<String> getRawEventBatch(final ObjectMapper jsonM
4045

4146
final ArrayNode arrayNode = (ArrayNode) eventsNode;
4247
arrayNode.elements().forEachRemaining(element -> {
43-
if (!element.isNull()) {
44-
String rawEvent = null;
45-
try {
46-
rawEvent = jsonMapper.writeValueAsString(element);
47-
} catch (JsonProcessingException e) {
48-
ThrowableUtils.throwException(e);
49-
}
50-
51-
// back to string -> better solution should be provided
52-
if (StringUtils.isNotEmpty(rawEvent)) {
53-
rawEvents.add(rawEvent);
54-
}
48+
String rawEvent = null;
49+
try {
50+
checkEventType(element, eventType);
51+
rawEvent = jsonMapper.writeValueAsString(element);
52+
} catch (JsonProcessingException e) {
53+
ThrowableUtils.throwException(e);
54+
}
55+
56+
// back to string -> better solution should be provided
57+
if (StringUtils.isNotEmpty(rawEvent)) {
58+
rawEvents.add(rawEvent);
5559
}
5660
});
5761
} else {
@@ -66,7 +70,8 @@ public static NakadiEventBatch<String> getRawEventBatch(final ObjectMapper jsonM
6670
}
6771
}
6872

69-
public static NakadiEventBatch<JsonNode> getJsonEventBatch(final ObjectMapper jsonMapper, final String string) {
73+
public static NakadiEventBatch<JsonNode> getJsonEventBatch(final ObjectMapper jsonMapper, final String string,
74+
final EventType eventType) {
7075
try {
7176
final EventReader reader = new EventReader(jsonMapper, string).invoke();
7277
final JsonNode eventsNode = reader.getEventsNode();
@@ -77,9 +82,8 @@ public static NakadiEventBatch<JsonNode> getJsonEventBatch(final ObjectMapper js
7782

7883
final ArrayNode arrayNode = (ArrayNode) eventsNode;
7984
arrayNode.elements().forEachRemaining(element -> {
80-
if (!element.isNull()) {
81-
jsonEvents.add(element);
82-
}
85+
checkEventType(element, eventType);
86+
jsonEvents.add(element);
8387
});
8488
} else {
8589
jsonEvents = Collections.emptyList();
@@ -93,6 +97,16 @@ public static NakadiEventBatch<JsonNode> getJsonEventBatch(final ObjectMapper js
9397
}
9498
}
9599

100+
private static void checkEventType(final JsonNode element, final EventType eventType) {
101+
if (!element.isNull() && element.has("metadata")) {
102+
if (element.get("metadata").has("event_type")
103+
&& !element.get("metadata").get("event_type").asText().equals(eventType.getName())) {
104+
throw new InvalidEventTypeException(format("Unexpected event type (expected=[%s], actual=[%s])",
105+
eventType.getName(), element.get("metadata").get("event_type").asText()));
106+
}
107+
}
108+
}
109+
96110
private static class EventReader {
97111
private ObjectMapper jsonMapper;
98112
private String string;

Diff for: paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/http/handlers/JsonEventResponseBulkHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,6 @@ public JsonEventResponseBulkHandler(final String consumerName, final EventTypePa
1717

1818
@Override
1919
NakadiEventBatch<JsonNode> getEventBatch(final String string) {
20-
return EventUtils.getJsonEventBatch(jsonMapper, string);
20+
return EventUtils.getJsonEventBatch(jsonMapper, string, eventType);
2121
}
2222
}

Diff for: paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/http/handlers/JsonEventResponseHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ public JsonEventResponseHandler(final String consumerName, final EventTypePartit
1616

1717
@Override
1818
NakadiEventBatch<JsonNode> getEventBatch(final String string) {
19-
return EventUtils.getJsonEventBatch(jsonMapper, string);
19+
return EventUtils.getJsonEventBatch(jsonMapper, string, eventType);
2020
}
2121
}

Diff for: paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/http/handlers/RawEventResponseBulkHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ public RawEventResponseBulkHandler(final String consumerName, final EventTypePar
1515

1616
@Override
1717
NakadiEventBatch<String> getEventBatch(final String string) {
18-
return EventUtils.getRawEventBatch(jsonMapper, string);
18+
return EventUtils.getRawEventBatch(jsonMapper, string, eventType);
1919
}
2020
}

Diff for: paradox-nakadi-consumer-core/src/main/java/de/zalando/paradox/nakadi/consumer/core/http/handlers/RawEventResponseHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,6 @@ public RawEventResponseHandler(final String consumerName, final EventTypePartiti
1515

1616
@Override
1717
NakadiEventBatch<String> getEventBatch(final String string) {
18-
return EventUtils.getRawEventBatch(jsonMapper, string);
18+
return EventUtils.getRawEventBatch(jsonMapper, string, eventType);
1919
}
2020
}

Diff for: paradox-nakadi-consumer-core/src/test/java/de/zalando/paradox/nakadi/consumer/core/client/impl/ClientImplTest.java

+88-28
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package de.zalando.paradox.nakadi.consumer.core.client.impl;
22

3+
import static java.lang.String.format;
4+
35
import static org.assertj.core.api.Assertions.assertThat;
46
import static org.assertj.core.api.Assertions.assertThatThrownBy;
57

@@ -10,34 +12,74 @@
1012
import java.util.List;
1113
import java.util.Optional;
1214

13-
import org.junit.Ignore;
15+
import org.junit.After;
16+
import org.junit.Before;
1417
import org.junit.Test;
1518

1619
import de.zalando.paradox.nakadi.consumer.core.domain.EventType;
1720
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypeCursor;
1821
import de.zalando.paradox.nakadi.consumer.core.domain.EventTypePartition;
1922
import de.zalando.paradox.nakadi.consumer.core.domain.NakadiPartition;
23+
import de.zalando.paradox.nakadi.consumer.core.exceptions.InvalidEventTypeException;
24+
import de.zalando.paradox.nakadi.consumer.core.exceptions.UnrecoverableException;
2025
import de.zalando.paradox.nakadi.consumer.core.http.handlers.testdomain.OrderReceived;
2126

27+
import okhttp3.mockwebserver.MockResponse;
28+
import okhttp3.mockwebserver.MockWebServer;
29+
2230
import rx.Single;
2331

2432
public class ClientImplTest {
2533

26-
private static final String NAKADI_URL = "http://localhost:8080";
27-
28-
private static EventType ORDER_RECEIVED = EventType.of("order.ORDER_RECEIVED");
34+
private static final EventType ORDER_RECEIVED = EventType.of("order.ORDER_RECEIVED");
35+
36+
private static final String FIRST_ORDER = "ORDER_001";
37+
private static final String SECOND_ORDER = "ORDER_002";
38+
39+
private static final String WRONG_EVENT_TYPE = "__not_configured__";
40+
41+
private static final String NAKADI_RESPONSE_PARTITIONS = "[{\"oldest_available_offset\":\"000000000000000000\","
42+
+ "\"newest_available_offset\":\"BEGIN\",\"partition\":\"0\"}]";
43+
private static final String NAKADI_RESPONSE_ONE_ORDER = "{\"cursor\":{\"partition\":\"0\","
44+
+ "\"offset\":\"000000000000000000\"},\"events\":[{\"metadata\": "
45+
+ "{\"eid\": \"4ae5011e-eb01-11e5-8b4a-1c6f65464fc6\",\"occurred_at\": \"2016-03-15T23:56:11+01:00\"},"
46+
+ "\"order_number\": \"" + FIRST_ORDER + "\"}]}";
47+
private static final String NAKADI_RESPONSE_NO_EVENTS_FOUND = "{\"cursor\":{\"partition\":\"0\","
48+
+ "\"offset\":\"000000000000000000\"},\"events\":[]}";
49+
private static final String NAKADI_RESPONSE_NOT_FOUND = "{\"type\":\"http://httpstatus.es/404\","
50+
+ "\"title\":\"Not Found\",\"status\":404,\"detail\":\"topic not found\"}";
51+
private static final String NAKADI_RESPONSE_TWO_ORDERS_ONE_WRONG_TYPE = "{\"cursor\":{\"partition\":\"0\","
52+
+ "\"offset\":\"000000000000000000\"},\"events\":[{\"metadata\": "
53+
+ "{\"eid\": \"4ae5011e-eb01-11e5-8b4a-1c6f65464fc6\",\"occurred_at\": \"2016-03-15T23:56:11+01:00\","
54+
+ "\"event_type\": \"" + WRONG_EVENT_TYPE + "\"},\"order_number\": \"" + FIRST_ORDER + "\"},{\"metadata\": "
55+
+ "{\"eid\": \"4ae5011e-eb01-11e5-8b4a-1c6f65464fc6\",\"occurred_at\": \"2016-03-15T23:56:11+01:00\","
56+
+ "\"event_type\": \"" + ORDER_RECEIVED.getName() + "\"},\"order_number\": \"" + SECOND_ORDER + "\"}]}";
57+
58+
private MockWebServer mockNakadi;
59+
60+
private static ClientImpl clientImpl;
61+
62+
@Before
63+
public void setUp() throws IOException {
64+
mockNakadi = new MockWebServer();
65+
mockNakadi.start(0);
66+
clientImpl = ClientImpl.Builder.of(mockNakadi.url("/").toString()).build();
67+
}
2968

30-
private static ClientImpl clientImpl = ClientImpl.Builder.of(NAKADI_URL).build();
69+
@After
70+
public void tearDown() throws IOException {
71+
mockNakadi.close();
72+
}
3173

3274
@Test
3375
public void testShouldThrowWhenRequestingUnconsumedEventsWithNullCursors() {
34-
assertThatThrownBy(() -> { clientImpl.getCursorsLag(null); }).isInstanceOf(NullPointerException.class)
35-
.hasMessage("cursors must not be null");
76+
assertThatThrownBy(() -> clientImpl.getCursorsLag(null)).isInstanceOf(NullPointerException.class).hasMessage(
77+
"cursors must not be null");
3678
}
3779

3880
@Test
3981
public void testShouldThrowWhenRequestingUnconsumedEventsWithEmptyCursors() {
40-
assertThatThrownBy(() -> { clientImpl.getCursorsLag(Collections.emptyList()); }).isInstanceOf(
82+
assertThatThrownBy(() -> clientImpl.getCursorsLag(Collections.emptyList())).isInstanceOf(
4183
IllegalArgumentException.class).hasMessage("cursors must not be empty");
4284
}
4385

@@ -47,63 +89,81 @@ public void testShouldThrowWhenRequestingUnconsumedEventsForDifferentEventTypes(
4789
"0"), "BEGIN");
4890
final EventTypeCursor eventTypeCursorDifferentType = EventTypeCursor.of(EventTypePartition.of(
4991
EventType.of("test-event-two"), "0"), "BEGIN");
50-
assertThatThrownBy(() -> {
51-
clientImpl.getCursorsLag(
52-
Arrays.asList(eventTypeCursor, eventTypeCursorDifferentType));
53-
}).isInstanceOf(IllegalArgumentException.class).hasMessage(
54-
"cursors must contain cursors of only one type");
92+
assertThatThrownBy(() ->
93+
clientImpl.getCursorsLag(
94+
Arrays.asList(eventTypeCursor, eventTypeCursorDifferentType))).isInstanceOf(
95+
IllegalArgumentException.class).hasMessage("cursors must contain cursors of only one type");
5596
}
5697

5798
@Test
58-
@Ignore("Needs Nakadi server")
59-
public void testGetNakadiPartitionsFound() {
99+
public void testGetNakadiPartitionsFound() throws IOException {
100+
mockNakadi.enqueue(new MockResponse().setBody(NAKADI_RESPONSE_PARTITIONS));
101+
60102
final List<NakadiPartition> list = clientImpl.getPartitions(ORDER_RECEIVED).toBlocking().value();
61103
assertThat(list).isNotEmpty();
62104
}
63105

64106
@Test
65-
@Ignore("Needs Nakadi server")
66-
public void testGetNakadiPartitionsNotFound() {
67-
final List<NakadiPartition> list = clientImpl.getPartitions(EventType.of("__not_configured__"))
107+
public void testGetNakadiPartitionsNotFound() throws IOException {
108+
mockNakadi.enqueue(new MockResponse().setResponseCode(404).setBody(NAKADI_RESPONSE_NOT_FOUND));
109+
110+
final List<NakadiPartition> list = clientImpl.getPartitions(EventType.of(WRONG_EVENT_TYPE))
68111
.onErrorReturn(throwable -> Collections.emptyList()).toBlocking()
69112
.value();
70113
assertThat(list).isEmpty();
71114
}
72115

73116
@Test
74-
@Ignore("Needs Nakadi server")
75-
public void testGetNakadiEventPayload() {
117+
public void testGetNakadiEventPayload() throws IOException {
118+
mockNakadi.enqueue(new MockResponse().setBody(NAKADI_RESPONSE_ONE_ORDER));
119+
76120
final EventTypeCursor cursor = EventTypeCursor.of(EventTypePartition.of(ORDER_RECEIVED, "0"), "0");
77121
final String event = clientImpl.getEvent(cursor).toBlocking().value();
78122
assertThat(event).startsWith("{\"metadata\":{");
79123
assertThat(event).contains("\"order_number\"");
80124
}
81125

82126
@Test
83-
@Ignore("Needs Nakadi server")
84-
public void testGetNakadiEventStream() {
127+
public void testGetNakadiEventStream() throws IOException {
128+
mockNakadi.enqueue(new MockResponse().setBody(NAKADI_RESPONSE_ONE_ORDER));
129+
85130
final EventTypeCursor cursor = EventTypeCursor.of(EventTypePartition.of(ORDER_RECEIVED, "0"), "BEGIN");
86131
final String stream = clientImpl.getContent(cursor).toBlocking().value();
87-
assertThat(stream).startsWith("{\"cursor\":{\"partition\":\"0\",\"offset\":\"0\"}");
132+
assertThat(stream).startsWith("{\"cursor\":{\"partition\":\"0\",\"offset\":\"000000000000000000\"}");
88133
assertThat(stream).contains("\"order_number\"");
89134
}
90135

91136
@Test
92-
@Ignore("Needs Nakadi server")
93-
public void testGetNakadiEventObject() {
137+
public void testGetNakadiEventObject() throws IOException {
138+
mockNakadi.enqueue(new MockResponse().setBody(NAKADI_RESPONSE_ONE_ORDER));
139+
94140
final EventTypeCursor cursor = EventTypeCursor.of(EventTypePartition.of(ORDER_RECEIVED, "0"), "0");
95141
final OrderReceived event = getEvent(cursor, OrderReceived.class).toBlocking().value();
96142
assertThat(event.getOrderNumber()).isNotEmpty();
97143
}
98144

99145
@Test
100-
@Ignore("Needs Nakadi server")
101-
public void testGetNakadiEventNotFound() {
102-
final EventTypeCursor cursor = EventTypeCursor.of(EventTypePartition.of(ORDER_RECEIVED, "0"), "100000");
146+
public void testGetNakadiEventNotFound() throws IOException {
147+
mockNakadi.enqueue(new MockResponse().setBody(NAKADI_RESPONSE_NO_EVENTS_FOUND));
148+
149+
final EventTypeCursor cursor = EventTypeCursor.of(EventTypePartition.of(ORDER_RECEIVED, "0"),
150+
"00000000000100000");
103151
final String event = clientImpl.getEvent(cursor).onErrorReturn(throwable -> null).toBlocking().value();
104152
assertThat(event).isNull();
105153
}
106154

155+
@Test
156+
public void testShouldStopProcessingAndThrowUnrecoverableExceptionWhenItReceivesWrongEventTypes()
157+
throws IOException {
158+
mockNakadi.enqueue(new MockResponse().setBody(NAKADI_RESPONSE_TWO_ORDERS_ONE_WRONG_TYPE));
159+
160+
final EventTypeCursor cursor = EventTypeCursor.of(EventTypePartition.of(ORDER_RECEIVED, "0"), "0");
161+
assertThatThrownBy(() -> getEvent(cursor, OrderReceived.class).toBlocking().value()).isInstanceOf(
162+
InvalidEventTypeException.class).isInstanceOf(UnrecoverableException.class)
163+
.hasMessage(format("Unexpected event type (expected=[%s], actual=[%s])",
164+
ORDER_RECEIVED.getName(), WRONG_EVENT_TYPE));
165+
}
166+
107167
private <T> Single<T> getEvent(final EventTypeCursor cursor, final Class<T> clazz) {
108168
return clientImpl.getEvent(cursor).map(content -> getEvent0(content, clazz)).map(o -> o.orElse(null));
109169
}

0 commit comments

Comments
 (0)