Skip to content

Commit 87979d8

Browse files
authored
Add ability to override disable flag and actions on a rule (confluentinc#2377)
* Add ability to override disable flag and actions on a rule * Add test
1 parent 16456fd commit 87979d8

File tree

4 files changed

+174
-11
lines changed

4 files changed

+174
-11
lines changed

src/Confluent.SchemaRegistry/AsyncSerde.cs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ protected async Task<object> ExecuteRules(
359359
for (int i = 0; i < rules.Count; i++)
360360
{
361361
Rule rule = rules[i];
362-
if (rule.Disabled)
362+
if (IsDisabled(rule))
363363
{
364364
continue;
365365
}
@@ -406,21 +406,21 @@ protected async Task<object> ExecuteRules(
406406
default:
407407
throw new ArgumentException("Unsupported rule kind " + rule.Kind);
408408
}
409-
await RunAction(ctx, ruleMode, rule, message != null ? rule.OnSuccess : rule.OnFailure,
409+
await RunAction(ctx, ruleMode, rule, message != null ? GetOnSuccess(rule) : GetOnFailure(rule),
410410
message, null, message != null ? null : ErrorAction.ActionType,
411411
ruleRegistry)
412412
.ConfigureAwait(continueOnCapturedContext: false);
413413
}
414414
catch (RuleException ex)
415415
{
416-
await RunAction(ctx, ruleMode, rule, rule.OnFailure, message,
416+
await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message,
417417
ex, ErrorAction.ActionType, ruleRegistry)
418418
.ConfigureAwait(continueOnCapturedContext: false);
419419
}
420420
}
421421
else
422422
{
423-
await RunAction(ctx, ruleMode, rule, rule.OnFailure, message,
423+
await RunAction(ctx, ruleMode, rule, GetOnFailure(rule), message,
424424
new RuleException("Could not find rule executor of type " + rule.Type),
425425
ErrorAction.ActionType, ruleRegistry)
426426
.ConfigureAwait(continueOnCapturedContext: false);
@@ -429,6 +429,45 @@ await RunAction(ctx, ruleMode, rule, rule.OnFailure, message,
429429
return message;
430430
}
431431

432+
private string GetOnSuccess(Rule rule)
433+
{
434+
if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride))
435+
{
436+
if (ruleOverride.OnSuccess != null)
437+
{
438+
return ruleOverride.OnSuccess;
439+
}
440+
}
441+
442+
return rule.OnSuccess;
443+
}
444+
445+
private string GetOnFailure(Rule rule)
446+
{
447+
if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride))
448+
{
449+
if (ruleOverride.OnFailure != null)
450+
{
451+
return ruleOverride.OnFailure;
452+
}
453+
}
454+
455+
return rule.OnFailure;
456+
}
457+
458+
private bool IsDisabled(Rule rule)
459+
{
460+
if (ruleRegistry.TryGetOverride(rule.Type, out RuleOverride ruleOverride))
461+
{
462+
if (ruleOverride.Disabled.HasValue)
463+
{
464+
return ruleOverride.Disabled.Value;
465+
}
466+
}
467+
468+
return rule.Disabled;
469+
}
470+
432471
private static IRuleExecutor GetRuleExecutor(RuleRegistry ruleRegistry, string type)
433472
{
434473
if (ruleRegistry.TryGetExecutor(type, out IRuleExecutor result))
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2024 Confluent Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// Refer to LICENSE for more information.
16+
17+
namespace Confluent.SchemaRegistry
18+
{
19+
/// <summary>
20+
/// A rule override.
21+
/// </summary>
22+
public class RuleOverride
23+
{
24+
public string Type { get; set; }
25+
26+
public string OnSuccess { get; set; }
27+
28+
public string OnFailure { get; set; }
29+
30+
public bool? Disabled { get; set; }
31+
32+
public RuleOverride(string type, string onSuccess, string onFailure, bool? disabled)
33+
{
34+
Type = type;
35+
OnSuccess = onSuccess;
36+
OnFailure = onFailure;
37+
Disabled = disabled;
38+
}
39+
}
40+
}

src/Confluent.SchemaRegistry/RuleRegistry.cs

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,16 @@ public class RuleRegistry
2626
{
2727
private readonly SemaphoreSlim ruleExecutorsMutex = new SemaphoreSlim(1);
2828
private readonly SemaphoreSlim ruleActionsMutex = new SemaphoreSlim(1);
29+
private readonly SemaphoreSlim ruleOverridesMutex = new SemaphoreSlim(1);
2930

3031
private IDictionary<string, IRuleExecutor> ruleExecutors = new Dictionary<string, IRuleExecutor>();
3132
private IDictionary<string, IRuleAction> ruleActions = new Dictionary<string, IRuleAction>();
33+
private IDictionary<string, RuleOverride> ruleOverrides = new Dictionary<string, RuleOverride>();
3234

3335
private static readonly RuleRegistry GLOBAL_INSTANCE = new RuleRegistry();
3436

3537
public static RuleRegistry GlobalInstance => GLOBAL_INSTANCE;
3638

37-
public static List<IRuleAction> GetRuleActions()
38-
{
39-
return GlobalInstance.GetActions();
40-
}
41-
4239
public void RegisterExecutor(IRuleExecutor executor)
4340
{
4441
ruleExecutorsMutex.Wait();
@@ -123,6 +120,48 @@ public List<IRuleAction> GetActions()
123120
}
124121
}
125122

123+
public void RegisterOverride(RuleOverride ruleOverride)
124+
{
125+
ruleOverridesMutex.Wait();
126+
try
127+
{
128+
if (!ruleOverrides.ContainsKey(ruleOverride.Type))
129+
{
130+
ruleOverrides.Add(ruleOverride.Type, ruleOverride);
131+
}
132+
}
133+
finally
134+
{
135+
ruleOverridesMutex.Release();
136+
}
137+
}
138+
139+
public bool TryGetOverride(string name, out RuleOverride ruleOverride)
140+
{
141+
ruleOverridesMutex.Wait();
142+
try
143+
{
144+
return ruleOverrides.TryGetValue(name, out ruleOverride);
145+
}
146+
finally
147+
{
148+
ruleOverridesMutex.Release();
149+
}
150+
}
151+
152+
public List<RuleOverride> GetOverrides()
153+
{
154+
ruleOverridesMutex.Wait();
155+
try
156+
{
157+
return new List<RuleOverride>(ruleOverrides.Values);
158+
}
159+
finally
160+
{
161+
ruleOverridesMutex.Release();
162+
}
163+
}
164+
126165
public static void RegisterRuleExecutor(IRuleExecutor executor)
127166
{
128167
GlobalInstance.RegisterExecutor(executor);
@@ -132,5 +171,10 @@ public static void RegisterRuleAction(IRuleAction action)
132171
{
133172
GlobalInstance.RegisterAction(action);
134173
}
174+
175+
public static void RegisterRuleOverride(RuleOverride ruleOverride)
176+
{
177+
GlobalInstance.RegisterOverride(ruleOverride);
178+
}
135179
}
136180
}

test/Confluent.SchemaRegistry.Serdes.UnitTests/SerializeDeserialize.cs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,12 +275,12 @@ public void ISpecificRecordCELFieldTransform()
275275
schema.RuleSet = new RuleSet(new List<Rule>(),
276276
new List<Rule>
277277
{
278-
new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null,
278+
new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null,
279279
"typeName == 'STRING' ; value + '-suffix'", null, null, false)
280280
}
281281
);
282282
store[schemaStr] = 1;
283-
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
283+
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
284284
var config = new AvroSerializerConfig
285285
{
286286
AutoRegisterSchemas = false,
@@ -305,6 +305,46 @@ public void ISpecificRecordCELFieldTransform()
305305
Assert.Equal(user.favorite_number, result.favorite_number);
306306
}
307307

308+
[Fact]
309+
public void ISpecificRecordCELFieldTransformDisable()
310+
{
311+
var schemaStr = User._SCHEMA.ToString();
312+
var schema = new RegisteredSchema("topic-value", 1, 1, schemaStr, SchemaType.Avro, null);
313+
schema.RuleSet = new RuleSet(new List<Rule>(),
314+
new List<Rule>
315+
{
316+
new Rule("testCEL", RuleKind.Transform, RuleMode.Write, "CEL_FIELD", null, null,
317+
"typeName == 'STRING' ; value + '-suffix'", null, null, false)
318+
}
319+
);
320+
store[schemaStr] = 1;
321+
subjectStore["topic-value"] = new List<RegisteredSchema> { schema };
322+
var config = new AvroSerializerConfig
323+
{
324+
AutoRegisterSchemas = false,
325+
UseLatestVersion = true
326+
};
327+
RuleRegistry registry = new RuleRegistry();
328+
registry.RegisterOverride(new RuleOverride("CEL_FIELD", null, null, true));
329+
var serializer = new AvroSerializer<User>(schemaRegistryClient, config, registry);
330+
var deserializer = new AvroDeserializer<User>(schemaRegistryClient, null);
331+
332+
var user = new User
333+
{
334+
favorite_color = "blue",
335+
favorite_number = 100,
336+
name = "awesome"
337+
};
338+
339+
Headers headers = new Headers();
340+
var bytes = serializer.SerializeAsync(user, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
341+
var result = deserializer.DeserializeAsync(bytes, false, new SerializationContext(MessageComponentType.Value, testTopic, headers)).Result;
342+
343+
Assert.Equal("awesome", result.name);
344+
Assert.Equal("blue", result.favorite_color);
345+
Assert.Equal(user.favorite_number, result.favorite_number);
346+
}
347+
308348
[Fact]
309349
public void ISpecificRecordCELFieldCondition()
310350
{

0 commit comments

Comments
 (0)