Skip to content

Commit 376a325

Browse files
authored
Add OAuth examples (confluentinc#1947)
* Add OAuth examples * Add comments and copyright * Address comments
1 parent 9afe648 commit 376a325

File tree

7 files changed

+633
-0
lines changed

7 files changed

+633
-0
lines changed

Confluent.Kafka.sln

+42
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.SchemaRegistry.Se
6363
EndProject
6464
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ExactlyOnceOldBroker", "examples\ExactlyOnceOldBroker\ExactlyOnceOldBroker.csproj", "{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}"
6565
EndProject
66+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuthConsumer", "examples\OAuthConsumer\OAuthConsumer.csproj", "{85ABD85A-53A2-4222-BE99-CE51F639F623}"
67+
EndProject
68+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuthOIDC", "examples\OAuthOIDC\OAuthOIDC.csproj", "{493C1E83-B424-488D-B6D6-713D07EF4152}"
69+
EndProject
70+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "OAuthProducer", "examples\OAuthProducer\OAuthProducer.csproj", "{E72DAB16-FAF7-4365-8151-9450007C93A0}"
71+
EndProject
6672
Global
6773
GlobalSection(SolutionConfigurationPlatforms) = preSolution
6874
Debug|Any CPU = Debug|Any CPU
@@ -400,6 +406,42 @@ Global
400406
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|x64.Build.0 = Release|Any CPU
401407
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|x86.ActiveCfg = Release|Any CPU
402408
{8F582FFF-EA30-47F1-89D2-81A37F5E7E0C}.Release|x86.Build.0 = Release|Any CPU
409+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
410+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Debug|Any CPU.Build.0 = Debug|Any CPU
411+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Debug|x64.ActiveCfg = Debug|Any CPU
412+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Debug|x64.Build.0 = Debug|Any CPU
413+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Debug|x86.ActiveCfg = Debug|Any CPU
414+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Debug|x86.Build.0 = Debug|Any CPU
415+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Release|Any CPU.ActiveCfg = Release|Any CPU
416+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Release|Any CPU.Build.0 = Release|Any CPU
417+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Release|x64.ActiveCfg = Release|Any CPU
418+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Release|x64.Build.0 = Release|Any CPU
419+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Release|x86.ActiveCfg = Release|Any CPU
420+
{85ABD85A-53A2-4222-BE99-CE51F639F623}.Release|x86.Build.0 = Release|Any CPU
421+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
422+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Debug|Any CPU.Build.0 = Debug|Any CPU
423+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Debug|x64.ActiveCfg = Debug|Any CPU
424+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Debug|x64.Build.0 = Debug|Any CPU
425+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Debug|x86.ActiveCfg = Debug|Any CPU
426+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Debug|x86.Build.0 = Debug|Any CPU
427+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Release|Any CPU.ActiveCfg = Release|Any CPU
428+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Release|Any CPU.Build.0 = Release|Any CPU
429+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Release|x64.ActiveCfg = Release|Any CPU
430+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Release|x64.Build.0 = Release|Any CPU
431+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Release|x86.ActiveCfg = Release|Any CPU
432+
{493C1E83-B424-488D-B6D6-713D07EF4152}.Release|x86.Build.0 = Release|Any CPU
433+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
434+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Debug|Any CPU.Build.0 = Debug|Any CPU
435+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Debug|x64.ActiveCfg = Debug|Any CPU
436+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Debug|x64.Build.0 = Debug|Any CPU
437+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Debug|x86.ActiveCfg = Debug|Any CPU
438+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Debug|x86.Build.0 = Debug|Any CPU
439+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|Any CPU.ActiveCfg = Release|Any CPU
440+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|Any CPU.Build.0 = Release|Any CPU
441+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|x64.ActiveCfg = Release|Any CPU
442+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|x64.Build.0 = Release|Any CPU
443+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|x86.ActiveCfg = Release|Any CPU
444+
{E72DAB16-FAF7-4365-8151-9450007C93A0}.Release|x86.Build.0 = Release|Any CPU
403445
EndGlobalSection
404446
GlobalSection(NestedProjects) = preSolution
405447
{09C3255B-1972-4EB8-91D0-FB9F5CD82BCB} = {1EFCD839-0726-4BCE-B745-1E829991B1BC}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
5+
<AssemblyName>OAuthConsumer</AssemblyName>
6+
<TargetFramework>net6.0</TargetFramework>
7+
<OutputType>Exe</OutputType>
8+
<LangVersion>7.1</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.9.3" /> -->
13+
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
14+
</ItemGroup>
15+
16+
<ItemGroup>
17+
<PackageReference Include="NJsonSchema" Version="10.8.0" />
18+
</ItemGroup>
19+
20+
</Project>

examples/OAuthConsumer/Program.cs

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
// Copyright 2022 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
using System;
18+
using System.Collections.Generic;
19+
using System.Text;
20+
using System.Text.RegularExpressions;
21+
using System.Threading;
22+
using System.Threading.Tasks;
23+
using Newtonsoft.Json;
24+
25+
/// <summary>
26+
/// An example showing consumer
27+
/// with a custom OAUTHBEARER token implementation.
28+
/// </summary>
29+
namespace Confluent.Kafka.Examples.OAuthConsumer
30+
{
31+
/// <summary>
32+
/// A class to store the token and related properties.
33+
/// </summary>
34+
class OAuthBearerToken
35+
{
36+
public string TokenValue { get; set; }
37+
public long Expiration { get; set; }
38+
public String Principal { get; set; }
39+
public Dictionary<String, String> Extensions { get; set; }
40+
}
41+
42+
public class Program
43+
{
44+
private const String OauthConfigRegexPattern = "^(\\s*(\\w+)\\s*=\\s*(\\w+))+\\s*$"; // 1 or more name=value pairs with optional ignored whitespace
45+
private const String OauthConfigKeyValueRegexPattern = "(\\w+)\\s*=\\s*(\\w+)"; // Extract key=value pairs from OAuth Config
46+
private const String PrincipalClaimNameKey = "principalClaimName";
47+
private const String PrincipalKey = "principal";
48+
private const String ScopeKey = "scope";
49+
50+
51+
public static async Task Main(string[] args)
52+
{
53+
if (args.Length != 5)
54+
{
55+
Console.WriteLine("Usage: .. brokerList topic group \"principal=<value> scope=<scope>\"");
56+
return;
57+
}
58+
string bootstrapServers = args[1];
59+
string topicName = args[2];
60+
string groupId = args[3];
61+
string oauthConf = args[4];
62+
63+
if (!Regex.IsMatch(oauthConf, OauthConfigRegexPattern))
64+
{
65+
Console.WriteLine($"Invalid OAuth config {oauthConf} passed.");
66+
Environment.Exit(1);
67+
}
68+
69+
var consumerConfig = new ConsumerConfig
70+
{
71+
BootstrapServers = bootstrapServers,
72+
SecurityProtocol = SecurityProtocol.SaslPlaintext,
73+
SaslMechanism = SaslMechanism.OAuthBearer,
74+
SaslOauthbearerConfig = oauthConf,
75+
GroupId = groupId,
76+
AutoOffsetReset = AutoOffsetReset.Earliest,
77+
EnableAutoOffsetStore = false,
78+
};
79+
80+
// Callback to handle OAuth bearer token refresh. It creates an unsecured JWT based on the configuration defined
81+
// in OAuth Config and sets the token on the client for use in any future authentication attempt.
82+
// It must be invoked whenever the client requires a token (i.e. when it first starts and when the
83+
// previously-received token is 80% of the way to its expiration time).
84+
void OauthCallback(IClient client, string cfg)
85+
{
86+
try
87+
{
88+
var token = retrieveUnsecuredToken(cfg);
89+
client.OAuthBearerSetToken(token.TokenValue, token.Expiration, token.Principal);
90+
}
91+
catch (Exception e)
92+
{
93+
client.OAuthBearerSetTokenFailure(e.ToString());
94+
}
95+
}
96+
97+
98+
using (var consumer = new ConsumerBuilder<string, string>(consumerConfig)
99+
.SetOAuthBearerTokenRefreshHandler(OauthCallback).Build())
100+
{
101+
Console.WriteLine("\n-----------------------------------------------------------------------");
102+
Console.WriteLine($"Consumer {consumer.Name} consuming from topic {topicName}.");
103+
Console.WriteLine("-----------------------------------------------------------------------");
104+
Console.WriteLine("Ctrl-C to quit.\n");
105+
106+
consumer.Subscribe(topicName);
107+
CancellationTokenSource cts = new CancellationTokenSource();
108+
Console.CancelKeyPress += (_, e) =>
109+
{
110+
e.Cancel = true; // prevent the process from terminating.
111+
cts.Cancel();
112+
};
113+
114+
try
115+
{
116+
while (true)
117+
{
118+
try
119+
{
120+
var consumeResult = consumer.Consume(cts.Token);
121+
122+
Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
123+
try
124+
{
125+
consumer.StoreOffset(consumeResult);
126+
}
127+
catch (KafkaException e)
128+
{
129+
Console.WriteLine($"Store Offset error: {e.Error.Reason}");
130+
}
131+
}
132+
catch (ConsumeException e)
133+
{
134+
Console.WriteLine($"Consume error: {e.Error.Reason}");
135+
}
136+
}
137+
}
138+
catch (OperationCanceledException)
139+
{
140+
Console.WriteLine("Closing consumer.");
141+
consumer.Close();
142+
}
143+
}
144+
}
145+
146+
private static string ToUnpaddedBase64(string s)
147+
=> Convert.ToBase64String(Encoding.UTF8.GetBytes(s)).TrimEnd('=');
148+
149+
private static OAuthBearerToken retrieveUnsecuredToken(String oauthConfig)
150+
{
151+
Console.WriteLine("Refreshing the token");
152+
153+
var parsedConfig = new Dictionary<String, String>();
154+
foreach (Match match in Regex.Matches(oauthConfig, OauthConfigKeyValueRegexPattern))
155+
{
156+
parsedConfig[match.Groups[1].ToString()] = match.Groups[2].ToString();
157+
}
158+
159+
if (!parsedConfig.ContainsKey(PrincipalKey) || !parsedConfig.ContainsKey(ScopeKey) || parsedConfig.Count > 2)
160+
{
161+
throw new Exception($"Invalid OAuth config {oauthConfig} passed.");
162+
}
163+
164+
var principalClaimName = parsedConfig.ContainsKey(PrincipalClaimNameKey) ? parsedConfig[PrincipalClaimNameKey] : "sub";
165+
var principal = parsedConfig[PrincipalKey];
166+
var scopeValue = parsedConfig[ScopeKey];
167+
168+
var issuedAt = DateTimeOffset.UtcNow;
169+
var expiresAt = issuedAt.AddSeconds(5); // setting a low value to show the token refresh in action.
170+
171+
var header = new
172+
{
173+
alg = "none",
174+
typ = "JWT"
175+
};
176+
177+
var payload = new Dictionary<String, Object>
178+
{
179+
{principalClaimName, principal},
180+
{"iat", issuedAt.ToUnixTimeSeconds()},
181+
{"exp", expiresAt.ToUnixTimeSeconds()},
182+
{ScopeKey, scopeValue}
183+
};
184+
185+
var headerJson = JsonConvert.SerializeObject(header);
186+
var payloadJson = JsonConvert.SerializeObject(payload);
187+
188+
return new OAuthBearerToken
189+
{
190+
TokenValue = $"{ToUnpaddedBase64(headerJson)}.{ToUnpaddedBase64(payloadJson)}.",
191+
Expiration = expiresAt.ToUnixTimeMilliseconds(),
192+
Principal = principal,
193+
};
194+
}
195+
}
196+
197+
}

examples/OAuthOIDC/OAuthOIDC.csproj

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<ProjectTypeGuids>{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
5+
<AssemblyName>OAuthOIDC</AssemblyName>
6+
<TargetFramework>net6.0</TargetFramework>
7+
<OutputType>Exe</OutputType>
8+
<LangVersion>7.1</LangVersion>
9+
</PropertyGroup>
10+
11+
<ItemGroup>
12+
<!-- nuget package reference: <PackageReference Include="Confluent.Kafka" Version="1.9.3" /> -->
13+
<ProjectReference Include="../../src/Confluent.Kafka/Confluent.Kafka.csproj" />
14+
</ItemGroup>
15+
16+
<ItemGroup>
17+
<PackageReference Include="NJsonSchema" Version="10.8.0" />
18+
</ItemGroup>
19+
20+
</Project>

0 commit comments

Comments
 (0)