Skip to content

Commit

Permalink
Merge branch 'release/0.9.97'
Browse files Browse the repository at this point in the history
  • Loading branch information
jfallows committed Oct 7, 2024
2 parents e70a261 + 91dfe54 commit 3f67e32
Show file tree
Hide file tree
Showing 178 changed files with 2,269 additions and 359 deletions.
18 changes: 17 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,23 @@

## [Unreleased](https://github.com/aklivity/zilla/tree/HEAD)

[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.95...HEAD)
[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.96...HEAD)

**Implemented enhancements:**

- Support `jwt` guarded identity via custom token claim [\#1276](https://github.com/aklivity/zilla/issues/1276)
- Support `insert into` to seed `kafka` messages via `risingwave` binding [\#1274](https://github.com/aklivity/zilla/issues/1274)

**Merged pull requests:**

- `pgsql` DROP TOPIC command to KafkaDeleteTopicsBeginEx plus catalog unregister subject [\#1280](https://github.com/aklivity/zilla/pull/1280) ([akrambek](https://github.com/akrambek))
- external udf - python support [\#1278](https://github.com/aklivity/zilla/pull/1278) ([ankitk-me](https://github.com/ankitk-me))
- Support jwt guarded identity via custom token claim [\#1277](https://github.com/aklivity/zilla/pull/1277) ([akrambek](https://github.com/akrambek))
- Support insert into to seed kafka messages via risingwave binding [\#1275](https://github.com/aklivity/zilla/pull/1275) ([akrambek](https://github.com/akrambek))

## [0.9.96](https://github.com/aklivity/zilla/tree/0.9.96) (2024-10-01)

[Full Changelog](https://github.com/aklivity/zilla/compare/0.9.95...0.9.96)

**Implemented enhancements:**

Expand Down
2 changes: 1 addition & 1 deletion build/flyweight-maven-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>build</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion build/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cloud/docker-image/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>cloud</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cloud/helm-chart/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>cloud</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion cloud/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion conf/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>zilla</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-amqp.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql-kafka.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
public enum PgsqlKafkaCommandType
{
CREATE_TOPIC_COMMAND("CREATE TOPIC".getBytes()),
DROP_TOPIC_COMMAND("DROP TOPIC".getBytes()),
UNKNOWN_COMMAND("UNKNOWN".getBytes());

private final byte[] value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
public enum PgsqlKafkaCompletionCommand
{
CREATE_TOPIC_COMMAND("CREATE_TOPIC".getBytes()),
DROP_TOPIC_COMMAND("DROP_TOPIC".getBytes()),
UNKNOWN_COMMAND("UNKNOWN".getBytes());

private final byte[] value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import static io.aklivity.zilla.runtime.engine.buffer.BufferPool.NO_SLOT;
import static io.aklivity.zilla.runtime.engine.catalog.CatalogHandler.NO_VERSION_ID;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.requireNonNull;

import java.io.InputStreamReader;
Expand Down Expand Up @@ -64,6 +65,7 @@
import net.sf.jsqlparser.parser.CCJSqlParserManager;
import net.sf.jsqlparser.statement.Statement;
import net.sf.jsqlparser.statement.create.table.CreateTable;
import net.sf.jsqlparser.statement.drop.Drop;

public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
{
Expand Down Expand Up @@ -143,6 +145,7 @@ public final class PgsqlKafkaProxyFactory implements PgsqlKafkaStreamFactory
Object2ObjectHashMap<PgsqlKafkaCommandType, PgsqlDecoder> pgsqlDecoder =
new Object2ObjectHashMap<>();
pgsqlDecoder.put(PgsqlKafkaCommandType.CREATE_TOPIC_COMMAND, this::decodeCreateTopicCommand);
pgsqlDecoder.put(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND, this::decodeDropTopicCommand);
pgsqlDecoder.put(PgsqlKafkaCommandType.UNKNOWN_COMMAND, this::decodeUnknownCommand);
this.pgsqlDecoder = pgsqlDecoder;
}
Expand Down Expand Up @@ -232,6 +235,7 @@ private final class PgsqlProxy
private final String database;
private final PgsqlKafkaBindingConfig binding;
private final KafkaCreateTopicsProxy createTopicsProxy;
private final KafkaDeleteTopicsProxy deleteTopicsProxy;

private final IntArrayQueue queries;

Expand Down Expand Up @@ -281,6 +285,7 @@ private PgsqlProxy(
this.queries = new IntArrayQueue();

this.createTopicsProxy = new KafkaCreateTopicsProxy(routedId, resolvedId, this);
this.deleteTopicsProxy = new KafkaDeleteTopicsProxy(routedId, resolvedId, this);
}

private void onAppMessage(
Expand Down Expand Up @@ -488,7 +493,7 @@ private void onCommandCompleted(
doAppWindow(traceId, authorization);
}

public void onKafkaCreateTopicsBegin(
public void onKafkaBegin(
long traceId,
long authorization)
{
Expand Down Expand Up @@ -1018,7 +1023,81 @@ protected void onKafkaBegin(

if (!errorExits)
{
delegate.onKafkaCreateTopicsBegin(traceId, authorization);
delegate.onKafkaBegin(traceId, authorization);

doKafkaWindow(traceId, authorization);
doKafkaEnd(traceId, authorization);
}
else
{
delegate.cleanup(traceId, authorization);
}
}
}

private final class KafkaDeleteTopicsProxy extends KafkaProxy
{
private KafkaDeleteTopicsProxy(
long originId,
long routedId,
PgsqlProxy delegate)
{
super(originId, routedId, delegate);
}

private void doKafkaBegin(
long traceId,
long authorization,
List<String> topics)
{
initialSeq = delegate.initialSeq;
initialAck = delegate.initialAck;
initialMax = delegate.initialMax;
state = PgsqlKafkaState.openingInitial(state);

final KafkaBeginExFW kafkaBeginEx =
kafkaBeginExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.request(r -> r
.deleteTopics(c -> c
.names(ct ->
topics.forEach(t -> ct.item(i -> i.set(t, UTF_8))))
.timeout(config.kafkaTopicRequestTimeoutMs())))
.build();

kafka = newKafkaConsumer(this::onKafkaMessage, originId, routedId, initialId, initialSeq, initialAck, initialMax,
traceId, authorization, 0, kafkaBeginEx);
}

@Override
protected void onKafkaBegin(
BeginFW begin)
{
final long sequence = begin.sequence();
final long acknowledge = begin.acknowledge();
final long traceId = begin.traceId();
final long authorization = begin.authorization();
final OctetsFW extension = begin.extension();

assert acknowledge <= sequence;
assert sequence >= replySeq;
assert acknowledge >= replyAck;

replySeq = sequence;
replyAck = acknowledge;
state = PgsqlKafkaState.openingReply(state);

assert replyAck <= replySeq;

final ExtensionFW beginEx = extension.get(extensionRO::tryWrap);
final KafkaBeginExFW kafkaBeginEx =
beginEx != null && beginEx.typeId() == kafkaTypeId ? extension.get(kafkaBeginExRO::tryWrap) : null;

boolean errorExits = kafkaBeginEx.response().deleteTopics().topics().anyMatch(t -> t.error() != 0);

if (!errorExits)
{
delegate.onKafkaBegin(traceId, authorization);

doKafkaWindow(traceId, authorization);
doKafkaEnd(traceId, authorization);
Expand Down Expand Up @@ -1292,6 +1371,35 @@ else if (server.commandsProcessed == 0)
}
}

private void decodeDropTopicCommand(
PgsqlProxy server,
long traceId,
long authorization,
DirectBuffer buffer,
int offset,
int length)
{
if (server.commandsProcessed == 1)
{
server.onCommandCompleted(traceId, authorization, length, PgsqlKafkaCompletionCommand.DROP_TOPIC_COMMAND);
}
else if (server.commandsProcessed == 0)
{
final Drop drop = (Drop) parseStatement(buffer, offset, length);
final String topic = drop.getName().getName();

final PgsqlKafkaBindingConfig binding = server.binding;
final String subjectKey = String.format("%s.%s-key", server.database, topic);
final String subjectValue = String.format("%s.%s-value", server.database, topic);

binding.catalog.unregister(subjectKey);
binding.catalog.unregister(subjectValue);

final KafkaDeleteTopicsProxy deleteTopicsProxy = server.deleteTopicsProxy;
deleteTopicsProxy.doKafkaBegin(traceId, authorization, List.of("%s.%s".formatted(server.database, topic)));
}
}

private void decodeUnknownCommand(
PgsqlProxy server,
long traceId,
Expand Down Expand Up @@ -1351,6 +1459,13 @@ private Statement parseStatement(
sql = sql.replace("CREATE TOPIC", "CREATE TABLE");
statement = parserManager.parse(new StringReader(sql));
}
if (decodeCommandType(buffer, offset, length).
equals(PgsqlKafkaCommandType.DROP_TOPIC_COMMAND))
{
String sql = buffer.getStringWithoutLengthUtf8(offset, length);
sql = sql.replace("DROP TOPIC", "DROP TABLE");
statement = parserManager.parse(new StringReader(sql));
}
else
{
inputStream.wrap(buffer, offset, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,15 @@ public void shouldCreateTopic() throws Exception
{
k3po.finish();
}

@Test
@Configuration("proxy.yaml")
@Specification({
"${pgsql}/drop.topic/client",
"${kafka}/drop.topic/server"
})
public void shouldDropTopic() throws Exception
{
k3po.finish();
}
}
2 changes: 1 addition & 1 deletion incubator/binding-pgsql.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-pgsql/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion incubator/binding-risingwave.spec/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<parent>
<groupId>io.aklivity.zilla</groupId>
<artifactId>incubator</artifactId>
<version>0.9.96</version>
<version>0.9.97</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ bindings:
options:
udf:
- server: http://localhost:8815
- server: http://localhost:8816
language: python
exit: app1
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,16 @@
"title": "Language",
"type": "string",
"default": "java",
"const": "java"
"enum":
[
"java",
"python"
]
}
},
"additionalProperties": false
},
"minItems": 1,
"maxItems": 1
"minItems": 1
}
},
"additionalProperties": false
Expand Down
Loading

0 comments on commit 3f67e32

Please sign in to comment.