Skip to content

Commit ef60ff2

Browse files
committed
Authenticate with Microsoft Entra ID using Token Cache
1 parent beb0618 commit ef60ff2

File tree

3 files changed

+174
-4
lines changed

3 files changed

+174
-4
lines changed

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,10 +78,16 @@
7878
<version>${pulsar.version}</version>
7979
</dependency>
8080

81+
<dependency>
82+
<groupId>com.azure</groupId>
83+
<artifactId>azure-identity</artifactId>
84+
<version>1.11.2</version>
85+
</dependency>
86+
8187
<dependency>
8288
<groupId>redis.clients</groupId>
8389
<artifactId>jedis</artifactId>
84-
<version>4.4.3</version>
90+
<version>5.1.0</version>
8591
<type>jar</type>
8692
<scope>compile</scope>
8793
</dependency>

src/main/java/fi/hsl/common/pulsar/PulsarApplication.java

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,20 @@
11
package fi.hsl.common.pulsar;
22

3+
import com.azure.core.credential.AccessToken;
4+
import com.azure.core.credential.TokenRequestContext;
5+
import com.azure.identity.DefaultAzureCredential;
6+
import com.azure.identity.DefaultAzureCredentialBuilder;
37
import com.typesafe.config.Config;
48
import fi.hsl.common.health.HealthServer;
9+
import fi.hsl.common.redis.RedisUtils;
510
import org.apache.pulsar.client.admin.PulsarAdmin;
611
import org.apache.pulsar.client.api.*;
712
import org.jetbrains.annotations.NotNull;
813
import org.jetbrains.annotations.Nullable;
914
import org.slf4j.Logger;
1015
import org.slf4j.LoggerFactory;
1116
import redis.clients.jedis.Jedis;
17+
import redis.clients.jedis.exceptions.JedisException;
1218

1319
import java.util.Arrays;
1420
import java.util.HashMap;
@@ -19,6 +25,9 @@
1925
import java.util.regex.Pattern;
2026
import java.util.stream.Collectors;
2127

28+
import static fi.hsl.common.redis.RedisUtils.createJedisClient;
29+
import static fi.hsl.common.redis.RedisUtils.extractUsernameFromToken;
30+
2231
public class PulsarApplication implements AutoCloseable {
2332
private static final Logger log = LoggerFactory.getLogger(PulsarApplication.class);
2433

@@ -152,9 +161,54 @@ public PulsarApplicationContext initialize(@NotNull Config config) throws Except
152161
@NotNull
153162
protected Jedis createRedisClient(@NotNull String redisHost, int port, int connTimeOutSecs) {
154163
log.info("Connecting to Redis at " + redisHost + ":" + port + " with connection timeout of (s): "+ connTimeOutSecs);
155-
int timeOutMs = connTimeOutSecs * 1000;
156-
Jedis jedis = new Jedis(redisHost, port, timeOutMs);
157-
jedis.connect();
164+
165+
//Construct a Token Credential from Identity library, e.g. DefaultAzureCredential / ClientSecretCredential / Client CertificateCredential / ManagedIdentityCredential etc.
166+
DefaultAzureCredential defaultAzureCredential = new DefaultAzureCredentialBuilder().build();
167+
168+
// Fetch a Microsoft Entra token to be used for authentication. This token will be used as the password.
169+
TokenRequestContext trc = new TokenRequestContext().addScopes("https://redis.azure.com/.default");
170+
RedisUtils.TokenRefreshCache tokenRefreshCache = new RedisUtils.TokenRefreshCache(defaultAzureCredential, trc);
171+
AccessToken accessToken = tokenRefreshCache.getAccessToken();
172+
173+
// SSL connection is required.
174+
boolean useSsl = true;
175+
String username = extractUsernameFromToken(accessToken.getToken());
176+
177+
// Create Jedis client and connect to the Azure Cache for Redis over the TLS/SSL port using the access token as password.
178+
// Note: Cache Host Name, Port, Microsoft Entra access token and SSL connections are required below.
179+
Jedis jedis = createJedisClient(redisHost, port, username, accessToken, useSsl);
180+
181+
// Configure the jedis instance for proactive authentication before token expires.
182+
tokenRefreshCache.setJedisInstanceToAuthenticate(jedis);
183+
184+
int maxTries = 3;
185+
int i = 0;
186+
187+
while (i < maxTries) {
188+
try {
189+
// Set a value against your key in the Redis cache.
190+
jedis.set("Az:key", "testValue");
191+
System.out.println(jedis.get("Az:key"));
192+
break;
193+
} catch (JedisException e) {
194+
// Handle The Exception as required in your application.
195+
e.printStackTrace();
196+
197+
// For Exceptions containing Invalid Username Password / Permissions not granted error messages, look at troubleshooting section at the end of document.
198+
199+
// Check if the client is broken, if it is then close and recreate it to create a new healthy connection.
200+
if (jedis.isBroken()) {
201+
jedis.close();
202+
accessToken = tokenRefreshCache.getAccessToken();
203+
jedis = createJedisClient(redisHost, port, username, accessToken, useSsl);
204+
205+
// Configure the jedis instance for proactive authentication before token expires.
206+
tokenRefreshCache.setJedisInstanceToAuthenticate(jedis);
207+
}
208+
}
209+
i++;
210+
}
211+
158212
log.info("Redis connected: " + jedis.isConnected());
159213
return jedis;
160214
}

src/main/java/fi/hsl/common/redis/RedisUtils.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package fi.hsl.common.redis;
22

3+
import com.azure.core.credential.AccessToken;
4+
import com.azure.core.credential.TokenCredential;
5+
import com.azure.core.credential.TokenRequestContext;
6+
import com.azure.core.util.CoreUtils;
7+
import com.google.gson.JsonObject;
8+
import com.google.gson.JsonParser;
39
import fi.hsl.common.pulsar.PulsarApplicationContext;
410
import fi.hsl.common.transitdata.TransitdataProperties;
511
import org.jetbrains.annotations.NotNull;
@@ -10,9 +16,12 @@
1016
import redis.clients.jedis.params.ScanParams;
1117
import redis.clients.jedis.resps.ScanResult;
1218

19+
import java.nio.charset.StandardCharsets;
20+
import java.time.Duration;
1321
import java.time.OffsetDateTime;
1422
import java.time.format.DateTimeFormatter;
1523
import java.util.*;
24+
import java.util.concurrent.ThreadLocalRandom;
1625

1726
public class RedisUtils {
1827
private static final Logger log = LoggerFactory.getLogger(RedisUtils.class);
@@ -227,4 +236,105 @@ public boolean checkResponse(@Nullable final String response) {
227236
public boolean checkResponse(@Nullable final Long response) {
228237
return response != null && response == 1;
229238
}
239+
240+
// Azure Cache for Redis helper code
241+
public static Jedis createJedisClient(String cacheHostname, int port, String username, AccessToken accessToken, boolean useSsl) {
242+
return new Jedis(cacheHostname, port, DefaultJedisClientConfig.builder()
243+
.password(accessToken.getToken())
244+
.user(username)
245+
.ssl(useSsl)
246+
.build());
247+
}
248+
249+
public static String extractUsernameFromToken(String token) {
250+
String[] parts = token.split("\\.");
251+
String base64 = parts[1];
252+
253+
switch (base64.length() % 4) {
254+
case 2:
255+
base64 += "==";
256+
break;
257+
case 3:
258+
base64 += "=";
259+
break;
260+
}
261+
262+
byte[] jsonBytes = Base64.getDecoder().decode(base64);
263+
String json = new String(jsonBytes, StandardCharsets.UTF_8);
264+
JsonObject jwt = JsonParser.parseString(json).getAsJsonObject();
265+
266+
return jwt.get("oid").getAsString();
267+
}
268+
269+
/**
270+
* The token cache to store and proactively refresh the access token.
271+
*/
272+
public static class TokenRefreshCache {
273+
private final TokenCredential tokenCredential;
274+
private final TokenRequestContext tokenRequestContext;
275+
private final Timer timer;
276+
private volatile AccessToken accessToken;
277+
private final Duration maxRefreshOffset = Duration.ofMinutes(5);
278+
private final Duration baseRefreshOffset = Duration.ofMinutes(2);
279+
private Jedis jedisInstanceToAuthenticate;
280+
private String username;
281+
282+
/**
283+
* Creates an instance of TokenRefreshCache
284+
* @param tokenCredential the token credential to be used for authentication.
285+
* @param tokenRequestContext the token request context to be used for authentication.
286+
*/
287+
public TokenRefreshCache(TokenCredential tokenCredential, TokenRequestContext tokenRequestContext) {
288+
this.tokenCredential = tokenCredential;
289+
this.tokenRequestContext = tokenRequestContext;
290+
this.timer = new Timer();
291+
}
292+
293+
/**
294+
* Gets the cached access token.
295+
* @return the AccessToken
296+
*/
297+
public AccessToken getAccessToken() {
298+
if (accessToken != null) {
299+
return accessToken;
300+
} else {
301+
TokenRefreshTask tokenRefreshTask = new TokenRefreshTask();
302+
accessToken = tokenCredential.getToken(tokenRequestContext).block();
303+
timer.schedule(tokenRefreshTask, getTokenRefreshDelay());
304+
return accessToken;
305+
}
306+
}
307+
308+
private class TokenRefreshTask extends TimerTask {
309+
// Add your task here
310+
public void run() {
311+
accessToken = tokenCredential.getToken(tokenRequestContext).block();
312+
username = extractUsernameFromToken(accessToken.getToken());
313+
System.out.println("Refreshed Token with Expiry: " + accessToken.getExpiresAt().toEpochSecond());
314+
315+
if (jedisInstanceToAuthenticate != null && !CoreUtils.isNullOrEmpty(username)) {
316+
jedisInstanceToAuthenticate.auth(username, accessToken.getToken());
317+
System.out.println("Refreshed Jedis Connection with fresh access token, token expires at : "
318+
+ accessToken.getExpiresAt().toEpochSecond());
319+
}
320+
timer.schedule(new TokenRefreshTask(), getTokenRefreshDelay());
321+
}
322+
}
323+
324+
private long getTokenRefreshDelay() {
325+
return ((accessToken.getExpiresAt()
326+
.minusSeconds(ThreadLocalRandom.current().nextLong(baseRefreshOffset.getSeconds(), maxRefreshOffset.getSeconds()))
327+
.toEpochSecond() - OffsetDateTime.now().toEpochSecond()) * 1000);
328+
}
329+
330+
/**
331+
* Sets the Jedis to proactively authenticate before token expiry.
332+
* @param jedisInstanceToAuthenticate the instance to authenticate
333+
* @return the updated instance
334+
*/
335+
public TokenRefreshCache setJedisInstanceToAuthenticate(Jedis jedisInstanceToAuthenticate) {
336+
this.jedisInstanceToAuthenticate = jedisInstanceToAuthenticate;
337+
return this;
338+
}
339+
}
230340
}

0 commit comments

Comments
 (0)