GITBOOK-1109: Deduplication Service + Reporting
Lalith Kota authored and gitbook-bot committed Oct 19, 2024
1 parent 2b85ce4 commit e584f56
Showing 10 changed files with 182 additions and 39 deletions.
12 changes: 8 additions & 4 deletions SUMMARY.md
@@ -11,6 +11,7 @@
* [Deduplication](social-registry/features/deduplication/README.md)
* [📔 User Guides](social-registry/features/deduplication/user-guides/README.md)
* [📔 Configure ID Deduplication, Deduplicate, and Save Duplicate Groups/Individuals](social-registry/features/deduplication/user-guides/configure-id-deduplication-deduplicate-and-save-duplicate-groups-individuals.md)
* [Deduplicator Service](social-registry/features/deduplication/deduplicator-service.md)
* [Lock and Unlock](social-registry/features/lock-and-unlock.md)
* [Enumerator](social-registry/features/enumerator/README.md)
* [Enumerator ID](social-registry/features/enumerator/enumerator-id.md)
@@ -58,7 +59,9 @@
* [📔 ID Authentication Process](social-registry/features/id-integration/id-authentication/user-guides/id-authentication-process.md)
* [📔 eSignet Client Creation](social-registry/features/id-integration/id-authentication/user-guides/esignet-client-creation.md)
* [Fayda ID Integration](social-registry/features/id-integration/fayda-id-integration.md)
* [Verifiable Credentials Issuance](social-registry/features/verifiable-credentials-issuance/README.md)
* [📔 User Guides](social-registry/features/verifiable-credentials-issuance/user-guides/README.md)
* [📔 Configure Inji to download Social Registry VCs](social-registry/features/verifiable-credentials-issuance/user-guides/configure-inji-to-download-social-registry-vcs.md)
* [Computed fields](social-registry/features/score-computation.md)
* [Record Revision History](social-registry/features/record-revision-history.md)
* [SPAR Integration for Account Info](social-registry/features/spar-integration-for-account-info.md)
@@ -388,9 +391,10 @@
* [Apache Superset](monitoring-and-reporting/apache-superset.md)
* [Reporting Framework](monitoring-and-reporting/reporting-framework/README.md)
* [📔 User Guides](monitoring-and-reporting/reporting-framework/user-guides/README.md)
* [📔 Connector Creation Guide](monitoring-and-reporting/reporting-framework/user-guides/connector-creation-guide.md)
* [📔 Dashboards Creation Guide](monitoring-and-reporting/reporting-framework/user-guides/dashboards-creation-guide.md)
* [📔 Installation & Troubleshooting](monitoring-and-reporting/reporting-framework/user-guides/installation-and-troubleshooting.md)
* [Page 1](monitoring-and-reporting/reporting-framework/user-guides/page-1.md)
* [Kafka Connect Transform Reference](monitoring-and-reporting/reporting-framework/kafka-connect-transform-reference.md)
* [System Logging](monitoring-and-reporting/logging.md)
* [System Health](monitoring-and-reporting/system-health.md)
monitoring-and-reporting/reporting-framework/kafka-connect-transform-reference.md
@@ -1,6 +1,6 @@
# Kafka Connect Transform Reference

This document is the configuration reference guide for the Kafka SMTs (Single Message Transforms) developed by OpenG2P, which can be used with [OpenSearch Sink Connectors](https://github.com/OpenG2P/openg2p-reporting).

The following is a list of some of the other transformations available on the OpenSearch Connectors, apart from the ones developed by OpenG2P:

@@ -27,28 +27,39 @@
<table><thead><tr><th width="210">Field name</th><th width="138">Field title</th><th>Description</th><th width="100">Default Value</th></tr></thead><tbody><tr><td>query.type</td><td>Query Type</td><td><p>This is the type of query made to retrieve new field values.</p><p>Supported values:</p><ul><li><code>es</code> (Elasticsearch based).</li></ul></td><td>es</td></tr><tr><td>input.fields</td><td>Input Fields</td><td><p>List of comma-separated fields that will be considered as input fields in the current record.</p><p>Nested input fields are supported, like: (where profile is json that contains name and birthdate fields)</p><pre class="language-json"><code class="lang-json">profile.name,profile.birthdate
</code></pre></td><td></td></tr><tr><td>output.fields</td><td>Output Fields</td><td>List of comma-separated fields to be added to this record.</td><td></td></tr><tr><td>input.default.values</td><td>Input Default Values</td><td>List of comma-separated values to give in place of the input fields when an input field is empty or null.<br>Length of this has to match that of <code>input.fields</code>.</td><td></td></tr><tr><td>es.index</td><td>ES Index</td><td>Elasticsearch(or OpenSearch) index to query for.</td><td></td></tr><tr><td>es.input.fields</td><td>ES Input Fields</td><td>List of comma-separated fields, to be queried on the ES index, each of which maps to the fields on <code>input.fields</code>.<br>Length of this has to match that of <code>input.fields</code>.</td><td></td></tr><tr><td>es.output.fields</td><td>ES Output Fields</td><td>List of comma-separated fields, to be retrieved from the ES query response document, each of which maps to the fields on <code>output.fields</code>. <br>Length of this has to match that of <code>output.fields</code>.</td><td></td></tr><tr><td>es.input.query.add.keyword</td><td>ES Input Query Add Keyword</td><td>Whether or not to add <code>.keyword</code> to the <code>es.input.fields</code> during the term query. Supported values: <code>true</code> / <code>false</code> .</td><td>false</td></tr><tr><td>es.security.enabled</td><td>ES Security Enabled</td><td>If this value is given as <code>true</code>, then Security is enabled on ES.</td><td></td></tr><tr><td>es.url</td><td>ES Url</td><td>Elasticsearch/OpenSearch base URL.</td><td></td></tr><tr><td>es.username</td><td>ES Username</td><td></td><td></td></tr><tr><td>es.password</td><td>ES Password</td><td></td><td></td></tr></tbody></table>
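
As an illustration, the following is a minimal sketch of how this transform might be wired into an OpenSearch sink connector configuration (only the transform-related keys are shown). The section heading for this table falls above this excerpt, so the class name `DynamicNewField` is an assumption based on the package convention of the transforms below; the field names `registrant_id` and `registrant_name` and the index `registrants` are likewise hypothetical.

```json
{
  "transforms": "newfield",
  "transforms.newfield.type": "org.openg2p.reporting.kafka.connect.DynamicNewField$Value",
  "transforms.newfield.query.type": "es",
  "transforms.newfield.input.fields": "registrant_id",
  "transforms.newfield.output.fields": "registrant_name",
  "transforms.newfield.es.url": "http://opensearch:9200",
  "transforms.newfield.es.index": "registrants",
  "transforms.newfield.es.input.fields": "id",
  "transforms.newfield.es.output.fields": "name",
  "transforms.newfield.es.input.query.add.keyword": "true",
  "transforms.newfield.es.security.enabled": "true",
  "transforms.newfield.es.username": "admin",
  "transforms.newfield.es.password": "<password>"
}
```

With this configuration, each record's `registrant_id` is matched against the `id` field of the `registrants` index, and the `name` from the matching document is added to the record as `registrant_name`.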

### DynamicNewFieldInsertBack

#### Class name:

* `org.openg2p.reporting.kafka.connect.DynamicNewFieldInsertBack$Key` - Applies transform only to the _Key_ of Kafka Connect Record.
* `org.openg2p.reporting.kafka.connect.DynamicNewFieldInsertBack$Value` - Applies transform only to the _Value_ of Kafka Connect Record.

#### Description:

* This transformation can be used to add additional data to documents of a different index.
* If the record matches the configured condition, the given value is evaluated and updated into the external document with the given ID.

#### Configuration:

<table><thead><tr><th width="210">Field name</th><th width="138">Field title</th><th>Description</th><th width="100">Default Value</th></tr></thead><tbody><tr><td>query.type</td><td>Query Type</td><td><p>This is the type of query made to retrieve new field values.</p><p>Supported values:</p><ul><li><code>es</code> (Elasticsearch based).</li></ul></td><td>es</td></tr><tr><td>id.expr</td><td>ID Jq Expression</td><td>Jq expression to evaluate the ID of the external document into which the data is supposed to be updated.</td><td></td></tr><tr><td>condition</td><td>Condition</td><td>Jq expression that evaluates to a boolean value which decides whether or not to update.</td><td></td></tr><tr><td>value</td><td>Value</td><td>Jq expression of the value, that evaluates to a JSON, that is to be updated into the external document.</td><td></td></tr><tr><td>es.index</td><td>ES Index</td><td>Elasticsearch(or OpenSearch) index to update into.</td><td></td></tr><tr><td>es.security.enabled</td><td>ES Security Enabled</td><td>If this value is given as <code>true</code>, then Security is enabled on ES.</td><td></td></tr><tr><td>es.url</td><td>ES Url</td><td>Elasticsearch/OpenSearch base URL.</td><td></td></tr><tr><td>es.username</td><td>ES Username</td><td></td><td></td></tr><tr><td>es.password</td><td>ES Password</td><td></td><td></td></tr></tbody></table>

### ApplyJq

#### Class name:

* `org.openg2p.reporting.kafka.connect.ApplyJq$Key` - Applies transform only to the _Key_ of Kafka Connect Record.
* `org.openg2p.reporting.kafka.connect.ApplyJq$Value` - Applies transform only to the _Value_ of Kafka Connect Record.

#### Description:

* This transformation applies the given Jq expression on the current record and replaces the current record with the result from Jq.
* This transformation can be used for operations like extracting, merging, removing, and/or renaming fields.
* For example:
* `"field": "payload",` : The `payload` field is extracted and the record is replaced with the extracted value.
* `"field": "payload.before",` : The `before` field inside `payload` field is extracted and the record is replaced with the extracted value.
* `"field": "source,payload.before,payload.after",` : The `source` field and the `before` and `after` fields inside the `payload` field are merged, and the record is replaced with the final merged value. Fields in `after` will be prioritized over fields in `before`, and `before` is prioritized over `source` and so on from the config list. Maps or arrays inside the above fields will be merged.
* `"field": "source.ts_ms->source_ts_ms,source.table->source_table",` : The `ts_ms` field in `source` will be extracted and renamed to `source_ts_ms`. The `table` field in `source` will be extracted and renamed to `source_table`. The final record contains only `source_ts_ms` and `source_table` fields.
* `"field": "source.ts_ms->source_ts_ms,payload.before,payload.after",` : The `source_ts_ms` field is added to the merged value of `before` and `after` fields (from `payload` ), and the record is replaced with the final merged value. If the `source_ts_ms` field is already present in `after` , it will be prioritised over the one from `source`.
* This transformation extends from the `ExtractField` transform by Apache.[https://kafka.apache.org/documentation/#org.apache.kafka.connect.transforms.ExtractField](https://kafka.apache.org/documentation/#org.apache.kafka.connect.transforms.ExtractField)
* `"expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",` : The expr field should contain a valid Jq expression.

#### Configuration:

<table><thead><tr><th>Field name</th><th>Field title</th><th width="268">Description</th><th>Default value</th></tr></thead><tbody><tr><td>expr</td><td>Expression</td><td>Jq expression to be applied.</td><td></td></tr><tr><td>behavior.on.error</td><td>Behavior on error</td><td><p>What to do when an error is encountered while applying the Jq expression. Possible values:</p><ul><li><code>halt</code> : Throws an exception upon encountering an error.</li><li><code>ignore</code> : Ignores any errors encountered.</li></ul></td><td>halt</td></tr></tbody></table>
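
For example, the expression from the bullet above could be configured on a sink connector as in the following sketch (only the transform-related keys are shown):

```json
{
  "transforms": "applyjq",
  "transforms.applyjq.type": "org.openg2p.reporting.kafka.connect.ApplyJq$Value",
  "transforms.applyjq.expr": ".payload.after + {source_ts_ms: .payload.source.ts_ms}",
  "transforms.applyjq.behavior.on.error": "halt"
}
```

This replaces each record's value with its `payload.after` object, augmented with a `source_ts_ms` field taken from `payload.source.ts_ms`; with `behavior.on.error` set to `halt`, any record on which the expression fails stops the task.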

### StringToJson

@@ -109,9 +120,28 @@

#### Configuration

<table><thead><tr><th width="210">Field name</th><th width="138">Field title</th><th>Description</th><th width="100">Default Value</th></tr></thead><tbody><tr><td>ts.order</td><td>Timestamp order</td><td>List of comma-separated fields to select output from. The output will be selected based on whichever field in the order is not null first. Nested fields are supported.</td><td></td></tr><tr><td>output.field</td><td>Output Field</td><td>Name of the output field into which the selected timestamp is put.</td><td><p></p><pre class="language-json"><code class="lang-json">@ts_generated
<table><thead><tr><th width="210">Field name</th><th width="138">Field title</th><th>Description</th><th width="100">Default Value</th></tr></thead><tbody><tr><td>ts.order</td><td>Timestamp order</td><td>List of comma-separated fields to select output from. The output will be selected based on whichever field in the order is not null first. Nested fields are supported.</td><td></td></tr><tr><td>output.field</td><td>Output Field</td><td>Name of the output field into which the selected timestamp is put.</td><td><pre class="language-json"><code class="lang-json">@ts_generated
</code></pre></td></tr></tbody></table>
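
The heading of this section falls outside this excerpt, so the class name in the sketch below is an assumption following the package convention of the other transforms (a timestamp-selector transform); the Debezium-style field paths `payload.after.write_date` and `payload.after.create_date` are likewise illustrative.

```json
{
  "transforms": "tsselect",
  "transforms.tsselect.type": "org.openg2p.reporting.kafka.connect.TimestampSelector$Value",
  "transforms.tsselect.ts.order": "payload.after.write_date,payload.after.create_date",
  "transforms.tsselect.output.field": "@ts_generated"
}
```

Here `@ts_generated` is set from `write_date` whenever that field is not null, falling back to `create_date` otherwise.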

### TriggerDeduplication

#### Class name:

* `org.openg2p.reporting.kafka.connect.TriggerDeduplication$Key` - Applies transform only to the _Key_ of Kafka Connect Record.
* `org.openg2p.reporting.kafka.connect.TriggerDeduplication$Value` - Applies transform only to the _Value_ of Kafka Connect Record.

#### Description:

* This transformation can be used to trigger deduplication when there is a change in any one of the configured fields.
* This transformation is best used before applying any other transformation.

#### Configuration

<table><thead><tr><th width="210">Field name</th><th width="138">Field title</th><th>Description</th><th width="100">Default Value</th></tr></thead><tbody><tr><td>deduplication.base.url</td><td>Base URL of Deduplicator Service</td><td></td><td></td></tr><tr><td>dedupe.config.name</td><td>Dedupe Config name</td><td>Name of config used for deduplication by deduplicator</td><td>default</td></tr><tr><td>id.expr</td><td>ID Jq Expression</td><td>Jq expression that evaluates the ID of the document that is to be deduplicated</td><td><pre><code>.payload.after.id
</code></pre></td></tr><tr><td>before.expr</td><td>Before Jq Expression</td><td>Jq expression that evaluates the before part of the change. (Used to compare fields with the after part of the change).</td><td><pre><code>.payload.before
</code></pre></td></tr><tr><td>after.expr</td><td>After Jq Expression</td><td>Jq expression that evaluates the after part of the change. (Used to compare fields with the before part of the change).</td><td><pre><code>.payload.after
</code></pre></td></tr><tr><td>wait.before.exec.secs</td><td>Wait before Exec (in secs)</td><td>Time to wait (in secs) before starting deduplication. Useful so that the transformations get applied and the record gets indexed into OpenSearch before deduplication runs.</td><td>10</td></tr></tbody></table>
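
A minimal sketch of this transform on a sink connector follows; the deduplicator's base URL is hypothetical, and the Jq expressions simply spell out the defaults from the table above.

```json
{
  "transforms": "triggerdedupe",
  "transforms.triggerdedupe.type": "org.openg2p.reporting.kafka.connect.TriggerDeduplication$Value",
  "transforms.triggerdedupe.deduplication.base.url": "http://deduplicator:8000",
  "transforms.triggerdedupe.dedupe.config.name": "default",
  "transforms.triggerdedupe.id.expr": ".payload.after.id",
  "transforms.triggerdedupe.before.expr": ".payload.before",
  "transforms.triggerdedupe.after.expr": ".payload.after",
  "transforms.triggerdedupe.wait.before.exec.secs": "10"
}
```

Since the transform works best before any other transformation, it should be listed first when chaining, e.g. `"transforms": "triggerdedupe,applyjq"`.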

## Source code

[https://github.com/OpenG2P/openg2p-reporting/tree/develop/opensearch-kafka-connector](https://github.com/OpenG2P/openg2p-reporting/tree/develop/opensearch-kafka-connector)