Skip to content

Commit bb1438a

Browse files
authored
misc refactoring + integration tests (confluentinc#12)
* misc refactoring + integration tests * review changes * ProduceAsync method signature changes * Deserializing Consumer + misc improvements to Consumer * Removed Consumer.Start method, added Consumer.Poll * Consumer.Consume method signature change * Consumer benchmark tests * Redirect poll queue -> consumer poll queue in Consumer * get/query watermark API improvements + integration tests * DeserializingConsumer [bug fix / API changes / examples / integration tests] * misc tidyup + documentation * Offset class equality, inequality, hash fn, ToString + unit tests * OnLog integration tests + Flush method -> CLS compliant. * string.Join.. * structs -> immutable, equality ops. unit tests. name changes. misc other. * improvement to .Consume internals * error rethink * MessageInfo -> Message * timeout related API changes * tidyup * review changes
1 parent b1eaf65 commit bb1438a

File tree

102 files changed

+4368
-1506
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

102 files changed

+4368
-1506
lines changed

README.md

+28-1
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,40 @@ confluent-kafka-dotnet - .NET Apache Kafka client
55

66
Forked from [rdkafka-dotnet](https://github.com/ah-/rdkafka-dotnet) by Andreas Heider
77

8-
Copyright (c) 2016 [Confluent Inc.](https://www.confluent.io), 2015-2016, [Andreas Heider](mailto:[email protected])
8+
Copyright (c) 2016-2017 [Confluent Inc.](https://www.confluent.io), 2015-2017, [Andreas Heider](mailto:[email protected])
99

1010

1111
## Usage
1212

1313
Just reference the [confluent-kafka-dotnet NuGet package](https://www.nuget.org/packages/confluent-kafka-dotnet)
1414

15+
## Build
16+
17+
To build the library or any test or example project, run the following from within the relevant project directory:
18+
19+
```
20+
dotnet restore
21+
dotnet build
22+
```
23+
24+
To run one of the examples, use the following from within the relevant project directory:
25+
26+
```
27+
dotnet run <command line args>
28+
```
29+
30+
To run the integration or unit tests, use the following from within the relevant project directory:
31+
32+
```
33+
dotnet test
34+
```
35+
36+
To create a nuget package, use the following from wihin `src/Confluent.Kafka`:
37+
38+
```
39+
dotnet pack
40+
```
41+
1542
## Examples
1643

1744
## Documentation

examples/AdvancedConsumer/Program.cs

+199-41
Original file line numberDiff line numberDiff line change
@@ -2,90 +2,248 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Text;
5+
using Confluent.Kafka.Serialization;
56

67

7-
namespace Confluent.Kafka.AdvancedConsumer
8+
/// <summary>
9+
/// Demonstrates use of the deserializing Consumer.
10+
/// </summary>
11+
namespace Confluent.Kafka.Examples.AdvancedConsumer
812
{
913
public class Program
1014
{
11-
public static void Run(string brokerList, List<string> topics)
12-
{
13-
bool enableAutoCommit = false;
14-
15-
var config = new Dictionary<string, object>
15+
private static Dictionary<string, object> constructConfig(string brokerList, bool enableAutoCommit) =>
16+
new Dictionary<string, object>
1617
{
1718
{ "group.id", "advanced-csharp-consumer" },
1819
{ "enable.auto.commit", enableAutoCommit },
20+
{ "auto.commit.interval.ms", 5000 },
1921
{ "statistics.interval.ms", 60000 },
20-
{ "bootstrap.servers", brokerList }
22+
{ "bootstrap.servers", brokerList },
23+
{ "default.topic.config", new Dictionary<string, object>()
24+
{
25+
{ "auto.offset.reset", "smallest" }
26+
}
27+
}
2128
};
2229

23-
using (var consumer = new EventConsumer(config))
30+
/// <summary>
31+
// In this example:
32+
/// - offsets are auto commited.
33+
/// - OnMessage is used to consume messages.
34+
/// - the poll loop is performed on a background thread started using Consumer.Start().
35+
/// </summary>
36+
public static void Run_Background(string brokerList, List<string> topics)
37+
{
38+
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, true), null, new StringDeserializer(Encoding.UTF8)))
2439
{
25-
consumer.OnMessage += (obj, msg) => {
26-
string text = Encoding.UTF8.GetString(msg.Value, 0, msg.Value.Length);
27-
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
40+
// Note: All event handlers are executed on the same thread (the one created/started by the Consumer.Start())
2841

29-
if (!enableAutoCommit && msg.Offset % 10 == 0)
42+
consumer.OnMessage += (_, msg)
43+
=> Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
44+
45+
consumer.OnPartitionEOF += (_, end)
46+
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
47+
48+
consumer.OnError += (_, error)
49+
=> Console.WriteLine($"Error: {error}");
50+
51+
consumer.OnOffsetCommit += (_, commit) =>
52+
{
53+
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
54+
55+
if (commit.Error)
3056
{
31-
Console.WriteLine($"Committing offset");
32-
consumer.Commit(msg).Wait();
33-
Console.WriteLine($"Committed offset");
57+
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
3458
}
59+
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
3560
};
3661

37-
consumer.OnConsumerError += (obj, errorCode) =>
62+
consumer.OnPartitionsAssigned += (_, partitions) =>
3863
{
39-
Console.WriteLine($"Consumer Error: {errorCode}");
64+
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
65+
consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
4066
};
4167

42-
consumer.OnEndReached += (obj, end) => {
43-
Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
68+
consumer.OnPartitionsRevoked += (_, partitions) =>
69+
{
70+
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
71+
consumer.Unassign();
72+
};
73+
74+
consumer.OnStatistics += (_, json)
75+
=> Console.WriteLine($"Statistics: {json}");
76+
77+
consumer.Subscribe(topics);
78+
79+
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
80+
81+
consumer.Start();
82+
83+
Console.WriteLine($"Started consumer, press enter to stop consuming");
84+
Console.ReadLine();
85+
86+
consumer.Stop();
87+
}
88+
}
89+
90+
/// <summary>
91+
// In this example:
92+
/// - offsets are auto commited.
93+
/// - consumer.Poll / OnMessage is used to consume messages.
94+
/// - the poll loop is performed on a separate thread.
95+
/// </summary>
96+
public static void Run_Poll(string brokerList, List<string> topics)
97+
{
98+
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, true), null, new StringDeserializer(Encoding.UTF8)))
99+
{
100+
// Note: All event handlers are called on the main thread.
101+
102+
consumer.OnMessage += (_, msg)
103+
=> Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
104+
105+
consumer.OnPartitionEOF += (_, end)
106+
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
107+
108+
consumer.OnError += (_, error)
109+
=> Console.WriteLine($"Error: {error}");
110+
111+
consumer.OnOffsetCommit += (_, commit) =>
112+
{
113+
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
114+
115+
if (commit.Error)
116+
{
117+
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
118+
}
119+
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
44120
};
45121

46-
consumer.OnError += (obj, error) => {
47-
Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
122+
consumer.OnPartitionsAssigned += (_, partitions) =>
123+
{
124+
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
125+
consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
48126
};
49127

50-
if (enableAutoCommit)
128+
consumer.OnPartitionsRevoked += (_, partitions) =>
51129
{
52-
consumer.OnOffsetCommit += (obj, commit) => {
53-
if (commit.Error != ErrorCode.NO_ERROR)
54-
{
55-
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
56-
}
57-
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
58-
};
130+
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
131+
consumer.Unassign();
132+
};
133+
134+
consumer.OnStatistics += (_, json)
135+
=> Console.WriteLine($"Statistics: {json}");
136+
137+
consumer.Subscribe(topics);
138+
139+
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
140+
141+
var cancelled = false;
142+
Console.CancelKeyPress += (_, e) => {
143+
e.Cancel = true; // prevent the process from terminating.
144+
cancelled = true;
145+
};
146+
147+
Console.WriteLine("Ctrl-C to exit.");
148+
while(!cancelled)
149+
{
150+
consumer.Poll(TimeSpan.FromMilliseconds(100));
59151
}
152+
}
153+
}
154+
155+
/// <summary>
156+
/// In this example
157+
/// - offsets are manually committed.
158+
/// - consumer.Consume is used to consume messages.
159+
/// (all other events are still handled by event handlers)
160+
/// - no extra thread is created for the Poll (Consume) loop.
161+
/// </summary>
162+
public static void Run_Consume(string brokerList, List<string> topics)
163+
{
164+
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
165+
{
166+
// Note: All event handlers are called on the main thread.
167+
168+
consumer.OnPartitionEOF += (_, end)
169+
=> Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
170+
171+
consumer.OnError += (_, error)
172+
=> Console.WriteLine($"Error: {error}");
173+
174+
consumer.OnOffsetCommit += (_, commit) =>
175+
{
176+
Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
177+
178+
if (commit.Error)
179+
{
180+
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
181+
}
182+
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
183+
};
60184

61-
consumer.OnPartitionsAssigned += (obj, partitions) => {
185+
consumer.OnPartitionsAssigned += (_, partitions) =>
186+
{
62187
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
63-
consumer.Assign(partitions);
188+
consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p, Offset.Invalid)));
64189
};
65190

66-
consumer.OnPartitionsRevoked += (obj, partitions) => {
191+
consumer.OnPartitionsRevoked += (_, partitions) =>
192+
{
67193
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
68194
consumer.Unassign();
69195
};
70196

71-
consumer.OnStatistics += (obj, json) => {
72-
Console.WriteLine($"Statistics: {json}");
73-
};
197+
consumer.OnStatistics += (_, json)
198+
=> Console.WriteLine($"Statistics: {json}");
74199

75200
consumer.Subscribe(topics);
76-
consumer.Start();
77201

78-
Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
79-
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
202+
Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");
80203

81-
Console.WriteLine($"Started consumer, press enter to stop consuming");
82-
Console.ReadLine();
204+
var cancelled = false;
205+
Console.CancelKeyPress += (_, e) => {
206+
e.Cancel = true; // prevent the process from terminating.
207+
cancelled = true;
208+
};
209+
210+
while (!cancelled)
211+
{
212+
Message<Null, string> msg;
213+
if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
214+
{
215+
continue;
216+
}
217+
218+
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
219+
220+
if (msg.Offset % 5 == 0)
221+
{
222+
Console.WriteLine($"Committing offset");
223+
consumer.Commit(msg);
224+
Console.WriteLine($"Committed offset");
225+
}
226+
}
83227
}
84228
}
85229

86230
public static void Main(string[] args)
87231
{
88-
Run(args[0], args.Skip(1).ToList());
232+
var mode = args[1];
233+
var topics = args.Skip(2).ToList();
234+
235+
switch (args[0])
236+
{
237+
case "poll":
238+
Run_Poll(mode, topics);
239+
break;
240+
case "consume":
241+
Run_Consume(mode, topics);
242+
break;
243+
case "background":
244+
Run_Background(mode, topics);
245+
break;
246+
}
89247
}
90248
}
91249
}

examples/AdvancedProducer/Program.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
using System.Threading.Tasks;
66
using Confluent.Kafka.Serialization;
77

8-
namespace Confluent.Kafka.AdvancedProducer
8+
namespace Confluent.Kafka.Examples.AdvancedProducer
99
{
1010
public class Program
1111
{
@@ -49,7 +49,7 @@ public static void Main(string[] args)
4949
Console.WriteLine("Ctrl-C to quit.\n");
5050

5151
var cancelled = false;
52-
Console.CancelKeyPress += (object sender, ConsoleCancelEventArgs e) => {
52+
Console.CancelKeyPress += (_, e) => {
5353
e.Cancel = true; // prevent the process from terminating.
5454
cancelled = true;
5555
};
@@ -80,7 +80,7 @@ public static void Main(string[] args)
8080
val = text.Substring(index + 1);
8181
}
8282

83-
Task<DeliveryReport> deliveryReport = producer.ProduceAsync(topicName, key, val);
83+
var deliveryReport = producer.ProduceAsync(topicName, key, val);
8484
var result = deliveryReport.Result; // synchronously waits for message to be produced.
8585
Console.WriteLine($"Partition: {result.Partition}, Offset: {result.Offset}");
8686
}

examples/Benchmark/Benchmark.xproj

-18
This file was deleted.

0 commit comments

Comments
 (0)