-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathRedis.java
488 lines (437 loc) · 16.9 KB
/
Redis.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - [email protected]
*/
package sirius.db.redis;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.params.SetParams;
import sirius.kernel.Sirius;
import sirius.kernel.Startable;
import sirius.kernel.Stoppable;
import sirius.kernel.async.CallContext;
import sirius.kernel.async.Tasks;
import sirius.kernel.commons.Explain;
import sirius.kernel.commons.Strings;
import sirius.kernel.commons.Wait;
import sirius.kernel.commons.Watch;
import sirius.kernel.di.PartCollection;
import sirius.kernel.di.std.Part;
import sirius.kernel.di.std.Parts;
import sirius.kernel.di.std.Register;
import sirius.kernel.health.Average;
import sirius.kernel.health.Exceptions;
import sirius.kernel.health.Log;
import sirius.kernel.settings.Extension;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* Provides a thin layer to access Redis.
* <p>
* The configuration is loaded from <tt>redis.pools</tt>. By default the <b>system</b> pool is used,
* but multiple database can be used at the same time.
*/
@Register(classes = {Redis.class, Startable.class, Stoppable.class})
public class Redis implements Startable, Stoppable {
/**
* Contains the pool name of the default redis being used.
*/
public static final String POOL_SYSTEM = "system";
@Parts(Subscriber.class)
private PartCollection<Subscriber> subscribers;
private List<JedisPubSub> subscriptions = new CopyOnWriteArrayList<>();
private AtomicBoolean subscriptionsActive = new AtomicBoolean(true);
@Part
private Tasks tasks;
private static final String PREFIX_LOCK = "lock_";
private static final String SUFFIX_DATE = "_date";
/**
* Contains the logger for all redis related messages.
*/
public static final Log LOG = Log.get("redis");
protected Average messageDuration = new Average();
protected Average callDuration = new Average();
protected RedisDB system;
protected Map<String, RedisDB> databases = new ConcurrentHashMap<>();
protected void handlePubSubMessage(String channel, String message, Subscriber subscriber) {
tasks.executor("redis-pubsub").start(() -> {
Watch w = Watch.start();
try {
subscriber.onMessage(message);
} catch (Exception e) {
Exceptions.handle()
.to(LOG)
.error(e)
.withSystemErrorMessage("Failed to process a message '%s' for topic '%s': %s (%s)",
message,
subscriber.getTopic())
.handle();
}
w.submitMicroTiming("REDIS", channel);
messageDuration.addValue(w.elapsedMillis());
});
}
private void subscribe(Subscriber subscriber, JedisPubSub subscription) {
while (subscriptionsActive.get()) {
try (Jedis redis = getConnection()) {
LOG.INFO("Starting subscription for: %s", subscriber.getTopic());
redis.subscribe(subscription, subscriber.getTopic());
if (subscriptionsActive.get()) {
Wait.seconds(5);
}
} catch (Exception e) {
Exceptions.handle()
.to(LOG)
.error(e)
.withSystemErrorMessage("Failed to subscribe to a topic: %s (%s)")
.handle();
Wait.seconds(1);
}
}
LOG.INFO("Terminated subscription for: %s", subscriber.getTopic());
}
@Override
public int getPriority() {
return 50;
}
@Override
@SuppressWarnings("squid:S2250")
@Explain("There aren't that many subscriptions, so there is no performance hotspot")
public void started() {
if (!isConfigured()) {
return;
}
for (Subscriber subscriber : subscribers) {
JedisPubSub subscription = new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
handlePubSubMessage(channel, message, subscriber);
}
};
subscriptions.add(subscription);
new Thread(() -> subscribe(subscriber, subscription), "redis-subscriber-" + subscriber.getTopic()).start();
}
}
@Override
public void stopped() {
if (!isConfigured()) {
return;
}
subscriptionsActive.set(false);
for (JedisPubSub subscription : subscriptions) {
try {
subscription.unsubscribe();
} catch (Exception e) {
Exceptions.handle()
.to(LOG)
.error(e)
.withSystemErrorMessage("Failed to unsubscribe from a topic: %s (%s)")
.handle();
}
}
databases.values().forEach(RedisDB::close);
}
/**
* Returns Redis database using the configuration of the given name.
* <p>
* The configuration resides in <tt>redis.pools.[name]</tt>
*
* @param name the name of the pool to fetch
* @return the database initialized with the given config
*/
public RedisDB getPool(String name) {
return databases.computeIfAbsent(name, this::makeDatabase);
}
private RedisDB makeDatabase(String name) {
return new RedisDB(this, Sirius.getSettings().getExtension("redis.pools", name));
}
/**
* Lists all known pools.
*
* @return a list of all configured pools in the system configuration
*/
public List<String> getPools() {
return Sirius.getSettings().getExtensions("redis.pools").stream().map(Extension::getId).toList();
}
/**
* Provides access the to default (system) database.
*
* @return the default database
*/
public RedisDB getSystem() {
if (system == null) {
system = getPool(POOL_SYSTEM);
}
return system;
}
private Jedis getConnection() {
return getSystem().getConnection();
}
/**
* Invokes {@link RedisDB#isConfigured()} for the {@link #getSystem() system database}.
*
* @return <tt>true</tt> if the system database is configured, <tt>false otherwise</tt>
*/
public boolean isConfigured() {
return getSystem().isConfigured();
}
/**
* Invokes {@link RedisDB#query(Supplier, Function)} for the {@link #getSystem() system database}.
*
* @param description a description of the actions performed used for debugging and tracing
* @param task the actual task to perform using redis
* @param <T> the generic type of the result
* @return a result computed by <tt>task</tt>
*/
public <T> T query(Supplier<String> description, Function<Jedis, T> task) {
return getSystem().query(description, task);
}
/**
* Invokes {@link RedisDB#exec(Supplier, Consumer)} for the {@link #getSystem() system database}.
*
* @param description a description of the actions performed used for debugging and tracing
* @param task the actual task to perform using redis
*/
public void exec(Supplier<String> description, Consumer<Jedis> task) {
getSystem().exec(description, task);
}
/**
* Invokes {@link RedisDB#pushToQueue(String, String)} for the {@link #getSystem() system database}.
*
* @param queue the name of the queue
* @param data the data to push
*/
public void pushToQueue(String queue, String data) {
getSystem().pushToQueue(queue, data);
}
/**
* Invokes {@link RedisDB#pollQueue(String)} for the {@link #getSystem() system database}.
*
* @param queue the name of the queue
* @return the next entry in the queue or <tt>null</tt> if the queue is empty
*/
@Nullable
public String pollQueue(String queue) {
return getSystem().pollQueue(queue);
}
/**
* Invokes {@link RedisDB#publish(String, String)} for the {@link #getSystem() system database}.
*
* @param topic the name of the topic to broadcast to
* @param message the message to send
*/
public void publish(String topic, String message) {
getSystem().publish(topic, message);
}
/**
* Invokes {@link RedisDB#getInfo()} for the {@link #getSystem() system database}.
*
* @return a map containing statistical values supplied by the server
*/
public Map<String, String> getInfo() {
return getSystem().getInfo();
}
/**
* Data object for storing information of a redis lock
*/
public static class LockInfo {
/**
* The full name of the lock, as found in redis
*/
public final String key;
/**
* The name of the lock, without any reids prefixes
*/
public final String name;
/**
* The current value of the lock which can be used to determine who holds the lock
*/
public final String value;
/**
* The timestamp when the lock was last acquired
*/
public final LocalDateTime since;
/**
* The maximal time to live of the lock.
* <p>
* The lock will be auto released after a certain amount of seconds in case of a server crash
*/
public final Long ttl;
protected LockInfo(String key, String name, String value, LocalDateTime since, Long ttl) {
this.key = key;
this.name = name;
this.value = value;
this.since = since;
this.ttl = ttl;
}
}
/**
* Returns a list of all currently held locks.
* <p>
* This is mainly inteded to be used for monitoring and maintenance (e.g. {@link RedisCommand})
*
* @return a list of all currently known locks
*/
public List<LockInfo> getLockList() {
List<LockInfo> result = new ArrayList<>();
exec(() -> "Get List of Locks", redis -> {
for (String key : redis.keys(PREFIX_LOCK + "*")) {
if (!key.endsWith(SUFFIX_DATE)) {
computeLockInfo(redis, key).ifPresent(result::add);
}
}
});
return result;
}
protected Optional<LockInfo> computeLockInfo(Jedis redis, String key) {
String owner = redis.get(key);
String since = redis.get(key + SUFFIX_DATE);
if (Strings.isEmpty(since)) {
return Optional.empty();
}
Long ttl = redis.ttl(key);
String name = key.substring(PREFIX_LOCK.length());
LocalDateTime sinceDate = LocalDateTime.parse(since);
if (ttl != null && ttl < 0) {
ttl = null;
}
return Optional.of(new LockInfo(key, name, owner, sinceDate, ttl));
}
/**
* Tries to acquire the given lock in the given timeslot.
* <p>
* The system will try to acquire the given lock. If the lock is currently in use, it will retry
* in regular intervals until either the lock is acquired or the <tt>acquireTimeout</tt> is over.
* <p>
* A sane value for the timeout might be in the range of 5-50s, highly depending on the algorithm
* being protected by the lock. If the value is <tt>null</tt>, no retries will be performed.
* <p>
* The <tt>lockTimeout</tt> controls the max. age of the lock. After the given period, the lock
* will be released, even if unlock wasn't called. This is to prevent a cluster from locking itself
* out due to a single node crash. However, it is very important to chose a sane value here.
*
* @param lock the name of the lock to acquire
* @param acquireTimeout the max duration during which retires (in 1 second intervals) will be performed
* @param lockTimeout the max duration for which the lock will be kept before auto-releasing it
* @return <tt>true</tt> if the lock was acquired, <tt>false</tt> otherwise
*/
public boolean tryLock(@Nonnull String lock, @Nullable Duration acquireTimeout, @Nonnull Duration lockTimeout) {
try {
long timeout = acquireTimeout == null ? 0 : Instant.now().plus(acquireTimeout).toEpochMilli();
int waitInMillis = 500;
do {
boolean locked = query(() -> "Try to Lock: " + lock, redis -> {
String key = PREFIX_LOCK + lock;
String response = redis.set(key,
CallContext.getNodeName(),
SetParams.setParams().nx().ex(lockTimeout.getSeconds()));
if ("OK".equals(response)) {
redis.setex(key + SUFFIX_DATE, lockTimeout.getSeconds(), LocalDateTime.now().toString());
return true;
}
return false;
});
if (locked) {
return true;
}
Wait.millis(waitInMillis);
waitInMillis = Math.min(1500, waitInMillis + 500);
} while (System.currentTimeMillis() < timeout);
return false;
} catch (Exception e) {
Exceptions.handle(LOG, e);
return false;
}
}
/**
* Boilerplate method to perform the given task while holding the given lock.
* <p>
* See {@link #tryLock(String, Duration, Duration)} for details on acquiring a lock.
* <p>
* If the lock cannot be acquired, nothing will happen (neighter the task will be execute nor an exception will be
* thrown).
*
* @param lock the name of the lock to acquire
* @param acquireTimeout the max duration during which retires (in 1 second intervals) will be performed
* @param lockTimeout the max duration for which the lock will be kept before auto-releasing it
* @param lockedTask the task to execute while holding the given lock. The task will not be executed if the
* lock cannot be acquired within the given period
*/
public void tryLocked(@Nonnull String lock,
@Nullable Duration acquireTimeout,
@Nonnull Duration lockTimeout,
@Nonnull Runnable lockedTask) {
if (tryLock(lock, acquireTimeout, lockTimeout)) {
try {
lockedTask.run();
} finally {
unlock(lock);
}
}
}
/**
* Determines if the given lock is currently locked by this or another node.
*
* @param lock the lock to check
* @return <tt>true</tt> if the lock is currently active, <tt>false</tt> otherwise
*/
public boolean isLocked(@Nonnull String lock) {
return query(() -> "Check If Locked: " + lock, redis -> {
String key = PREFIX_LOCK + lock;
return redis.exists(key);
});
}
/**
* Releases the lock.
*
* @param lock the lock to release
*/
public void unlock(String lock) {
unlock(lock, false);
}
/**
* Releases the given lock.
*
* @param lock the lock to release
* @param force if <tt>true</tt>, the lock will even be released if it is held by another node. This is a very
* dangerous operation and should only be used by maintenance and management tools like {@link
* RedisCommand}.
*/
public void unlock(String lock, boolean force) {
exec(() -> "Unlock: " + lock, redis -> {
String key = PREFIX_LOCK + lock;
String lockOwner = redis.get(key);
if (force || Strings.areEqual(lockOwner, CallContext.getNodeName())) {
redis.del(key);
redis.del(key + SUFFIX_DATE);
} else {
if (lockOwner == null) {
LOG.WARN("Not going to unlock '%s' for '%s' as it seems to be expired already",
lock,
CallContext.getNodeName());
} else {
LOG.WARN("Not going to unlock '%s' for '%s' as it is currently held by '%s'",
lock,
CallContext.getNodeName(),
lockOwner);
}
}
});
}
}