Streams
Debezium uses several different streams to store various kinds of data. Streams are implemented via Kafka topics. Therefore, each stream is produced by at least one service, and most streams are consumed by at least one service. Each stream can be partitioned and each partition is persisted durably (via append-only log) and replicated.
- entity-batches
- entity-patches
- entity-updates
- schema-patches
- schema-updates
- schema-learning
- partial-responses
- complete-responses
- connections
- zone-changes
- changes-by-device
- device-notifications
Description: The stream containing requests that each operate upon a batch of entities within a database. Thus, the stream provides a durable record of each batch-level request as it is received by the driver.
Key: unique request ID
Partitioned by: unique request ID
Durability: ideally kept forever; otherwise time-based compacted
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
dbid | string | Unique database name. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
patches | array | The array of raw patch documents (no other metadata), one for each patch. |
Optional Fields:
Field | Type | Default | Description |
---|---|---|---|
learning | boolean | false | If true then the entity should be used by the schema learning algorithm. |
includeBefore | boolean | false | If true then any results should include the representation of the entity before it is updated. |
includeAfter | boolean | false | If true then any results should include the representation of the entity after it is updated. |
Sample Message: Here’s a sample batch request that contains a single patch to set three fields on a single entity in the “Contacts” collection.
{
"db": "my-db",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747086227,
"includeBefore": true,
"includeAfter": true,
"learning" : true,
"patches" : [
{
"collection": "Contacts",
"zone": "default",
"entity": "400cb092-00af-4e44-bdcb-1a0c536692bf",
"ops": [
{
"op": "add",
"path": "firstName",
"value": "Sally"
},
{
"op": "add",
"path": "lastName",
"value": "Anderson"
},
{
"op": "add",
"path": "phone/home",
"value": "1-222-555-1234"
}
]
}
]
}
Description: The stream containing requests that each operate upon a single entity within a database. This stream provides a durable record of all entity patch requests received by the driver.
Key: entity ID
Partitioned by: entity ID
Durability: time-based compacted; the content is derived from entity-batches
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
dbid | string | Unique database name. |
collection | string | The name of the collection (e.g., type of entity). |
zone | string | The name of the zone within the collection. |
entity | string | The unique identifier of the entity that is the target of the patch. |
parts | int | The total number of parts (e.g., patches) that make up the batch request. |
part | int | The 1-based part number for this patch within the batch. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
ops | array | The array of operation documents that make up this patch. |
Optional Fields:
Field | Type | Default | Description |
---|---|---|---|
learning | boolean | false | If true then the entity should be used by the schema learning algorithm. |
includeBefore | boolean | false | If true then any results should include the representation of the entity before it is updated. |
includeAfter | boolean | false | If true then any results should include the representation of the entity after it is updated. |
Sample Message: Here’s a sample patch request to set three fields on a single entity in the “Contacts” collection.
{
"db": "my-db",
"collection": "Contacts",
"zone": "default",
"entity": "400cb092-00af-4e44-bdcb-1a0c536692bf",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747086227,
"part": 1,
"parts": 1,
"includeBefore": true,
"includeAfter": true,
"learning" : true,
"ops": [
{
"op": "add",
"path": "firstName",
"value": "Sally"
},
{
"op": "add",
"path": "lastName",
"value": "Anderson"
},
{
"op": "add",
"path": "phone/home",
"value": "1-222-555-1234"
}
]
}
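Because the content of entity-patches is derived from entity-batches, the derivation can be sketched as a function that splits one batch request into one message per patch. This is only an illustration under stated assumptions: the function name `split_batch` and the `HEADER_FIELDS` tuple are hypothetical, and the sketch uses the `db` field name from the sample messages rather than `dbid` from the field tables.

```python
from typing import Any

# Header fields copied from the batch request onto every derived patch message.
# Assumption: "db" is used as in the sample messages (the tables call it "dbid").
HEADER_FIELDS = ("db", "clientid", "request", "user", "begun",
                 "learning", "includeBefore", "includeAfter")


def split_batch(batch: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
    """Return (key, message) pairs, one per patch, keyed by entity ID."""
    patches = batch["patches"]
    header = {name: batch[name] for name in HEADER_FIELDS if name in batch}
    messages = []
    for index, patch in enumerate(patches, start=1):
        message = dict(header)
        message.update(patch)            # collection, zone, entity, ops
        message["part"] = index          # 1-based part number within the batch
        message["parts"] = len(patches)  # total number of patches in the batch
        messages.append((patch["entity"], message))
    return messages
```

Each returned key is the entity ID, matching the key and partitioning of this stream, so all patches for one entity would land in the same partition.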
Description: The stream containing responses after each entity was updated. This stream provides a durable record of all entities that were changed.
Key: entity ID
Partitioned by: entity ID
Durability: time-based compacted; the content is derived ultimately from entity-batches
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
dbid | string | Unique database name. |
collection | string | The name of the collection (e.g., type of entity). |
zone | string | The name of the zone within the collection. |
entity | string | The unique identifier of the entity that is the target of the patch. |
parts | int | The total number of parts (e.g., patches) that make up the batch request. |
part | int | The 1-based part number for this patch within the batch. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
ended | long | The timestamp at which the request was completed |
ops | array | The array of operation documents that make up this patch. |
status | int | Patch status code specifies whether the patch was successful or, if failed, the reason for the failure. Possible values are: '1' for success, '2' if the identified entity did not exist, or '3' if the patch could not be applied. See also 'error' field. |
Optional Fields:
Field | Type | Default | Description |
---|---|---|---|
learning | boolean | false | If true then the entity should be used by the schema learning algorithm. |
includeBefore | boolean | false | If true then any results should include the representation of the entity before it is updated. |
includeAfter | boolean | false | If true then any results should include the representation of the entity after it is updated. |
error | string | null | Description of the failure. Included only if the "status" field value was not '1' (success). |
before | document | null | The complete representation of the entity before the patch was applied. Included only if the request’s "includeBefore" field value was previously "true". |
after | document | null | The complete representation of the entity after the patch was applied. Included only if the request’s "includeAfter" field value was previously "true". |
Sample Message: Here’s a sample message that shows a successful update of the given entity in the "Contacts" collection, in response to the entity patch request given as the sample for the entity-patches stream.
{
"db": "my-db",
"collection": "Contacts",
"zone": "default",
"entity": "400cb092-00af-4e44-bdcb-1a0c536692bf",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747086227,
"part": 1,
"parts": 1,
"ops": [
{
"op": "add",
"path": "firstName",
"value": "Sally"
},
{
"op": "add",
"path": "lastName",
"value": "Anderson"
},
{
"op": "add",
"path": "phone/home",
"value": "1-222-555-1234"
}
],
"status": 1,
"before": null,
"ended": 1415747086327,
"after": {
"firstName": "Sally",
"lastName": "Anderson",
"phone": {
"home": "1-222-555-1234"
}
}
}
There are several things to note. First, the only responses recorded in the `entity-updates` stream are those for which the corresponding patch was successfully applied. That means that all messages in this stream should have a `status` field value of "1" to denote successful patch application. Second, in this case the `before` field is `null` even though `includeBefore` was `true`; this means that the entity was created as part of the application of the patch. Third, because `includeAfter` was also `true`, the representation of the updated entity is also included in the response.
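How the `ops` in the sample produce the `after` document can be sketched as follows. This is a minimal illustration, not the service's actual patch logic: the function name `apply_add_ops` is hypothetical, only the `add` operation from the samples is handled, and paths are assumed to be slash-separated field names, as in `phone/home`.

```python
import copy
from typing import Any, Optional


def apply_add_ops(before: Optional[dict[str, Any]],
                  ops: list[dict[str, Any]]) -> dict[str, Any]:
    """Apply "add" operations and return the entity's "after" representation."""
    # A missing entity (before is None) is created by the patch.
    after = copy.deepcopy(before) if before is not None else {}
    for op in ops:
        if op["op"] != "add":
            raise ValueError(f"unsupported op in this sketch: {op['op']}")
        *parents, leaf = op["path"].split("/")
        target = after
        for name in parents:
            # Create nested documents on demand, e.g. "phone" for "phone/home".
            target = target.setdefault(name, {})
        target[leaf] = op["value"]
    return after
```

The input document is copied rather than modified, so the same call can supply both the `before` and `after` representations of a response.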
Description: The stream containing requests that each operate upon a single database schema. This stream provides a durable record of all schema patch requests received by the driver.
Key: database ID
Partitioned by: database ID
Durability: ideally kept forever; otherwise time-based compacted
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
dbid | string | Unique database name. |
parts | int | The total number of parts (e.g., patches) that make up the batch request. |
part | int | The 1-based part number for this patch within the batch. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
ops | array | The array of operation documents that make up this patch. |
Optional Fields:
Field | Type | Default | Description |
---|---|---|---|
includeBefore | boolean | false | If true then any results should include the representation of the entity before it is updated. |
includeAfter | boolean | false | If true then any results should include the representation of the entity after it is updated. |
Sample Message: Here’s a sample patch request to define a new field named `firstName` within the `Contacts` entity type of the `my-db` database.
{
"db": "my-db",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747089227,
"part": 1,
"parts": 1,
"includeBefore": true,
"includeAfter": true,
"ops": [
{
"op": "add",
"path": "Contacts/fields/firstName",
"value": {
"type" : "STRING",
"optional" : false
}
}
]
}
Description: The stream containing responses after each database schema was updated. This stream provides a durable record of all schema changes.
Key: database ID
Partitioned by: database ID
Durability: time-based compacted; the content is derived from schema-patches
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
dbid | string | Unique database name. |
parts | int | The total number of parts (e.g., patches) that make up the batch request. |
part | int | The 1-based part number for this patch within the batch. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
ended | long | The timestamp at which the request was completed |
ops | array | The array of operation documents that make up this patch. |
status | int | Patch status code specifies whether the patch was successful or, if failed, the reason for the failure. Possible values are: '1' for success, '2' if the identified entity did not exist, or '3' if the patch could not be applied. See also 'error' field. |
Optional Fields:
Field | Type | Default | Description |
---|---|---|---|
includeBefore | boolean | false | If true then any results should include the representation of the entity before it is updated. |
includeAfter | boolean | false | If true then any results should include the representation of the entity after it is updated. |
error | string | null | Description of the failure. Included only if the "status" field value was not '1' (success). |
before | document | null | The complete representation of the entity before the patch was applied. Included only if the request’s "includeBefore" field value was previously "true". |
after | document | null | The complete representation of the entity after the patch was applied. Included only if the request’s "includeAfter" field value was previously "true". |
Sample Message: Here’s a sample response to a patch request that defines a new field named `homePhone` within the `Contacts` entity type of the `my-db` database. Note that before the update the schema already defined the `Contacts` and `Groups` entity types.
{
"db": "my-db",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747089227,
"part": 1,
"parts": 1,
"includeBefore": true,
"includeAfter": true,
"ops": [
{
"op": "add",
"path": "Contacts/fields/homePhone",
"value": {
"type" : "STRING",
"optional" : true
}
}
],
"status": 1,
"ended": 1415747089327,
"before": {
"Contacts" : {
"fields" : {
"firstName" : {
"type" : "STRING",
"optional" : false
},
"lastName" : {
"type" : "STRING",
"optional" : false
}
}
},
"Groups" : {
"fields" : {
"groupName" : {
"type" : "STRING",
"optional" : false
}
}
}
},
"after": {
"Contacts" : {
"fields" : {
"firstName" : {
"type" : "STRING",
"optional" : false
},
"lastName" : {
"type" : "STRING",
"optional" : false
},
"homePhone" : {
"type" : "STRING",
"optional" : true
}
}
},
"Groups" : {
"fields" : {
"groupName" : {
"type" : "STRING",
"optional" : false
}
}
}
}
}
Description: The stream that contains a combination of entity update messages and entity type update messages. The latter are derived from the schema update messages: if an updated schema has 4 entity types, then this stream will contain 4 entity type update messages. This stream is partitioned by entity type, ensuring that all entities for a given entity type, as well as changes to that entity type, are always stored within the same partition.
Key: entity ID or database ID
Partitioned by: entity type (collection) of the entity and schema update
Durability: time-based compacted; the content is derived from schema-updates
Message format: JSON
- For schema updates, the structure outlined below.
- For entity updates, the same messages (and thus structure) as in entity-updates.

The entity type update messages have the same header fields as the schema-updates messages from which they are derived, but no `ops` fields are included and the `after` field contains only the representation of a single entity type.
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
dbid | string | Unique database name. |
parts | int | The total number of entity types in the schema. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
status | int | Always '1' signifying the entity type was successfully updated. |
after | document | The complete representation of the entity type after the patch was applied to the schema. |
Sample Message: Here’s a sample patch message that shows an updated `Contacts` entity type in the `my-db` database.
{
"db": "my-db",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747089837,
"part": 1,
"status": 1,
"after": {
"Contacts" : {
"fields" : {
"firstName" : {
"type" : "STRING",
"optional" : false
},
"lastName" : {
"type" : "STRING",
"optional" : false
},
"homePhone" : {
"type" : "STRING",
"optional" : true
}
}
}
}
}
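The derivation of entity type update messages from a schema update can be sketched as follows. The function name `entity_type_updates` and the `HEADER_FIELDS` tuple are hypothetical, the `db` field name follows the sample messages, and returning the entity type name as the partitioning value is an assumption based on the partitioning rule stated for this stream.

```python
from typing import Any

# Header fields carried over from the schema-updates message (assumed set,
# taken from the sample message for this stream).
HEADER_FIELDS = ("db", "clientid", "request", "user", "begun", "status")


def entity_type_updates(schema_update: dict[str, Any]) -> list[tuple[str, dict[str, Any]]]:
    """Return (entity type name, message) pairs derived from one schema update."""
    schema = schema_update["after"]
    header = {k: schema_update[k] for k in HEADER_FIELDS if k in schema_update}
    messages = []
    for index, (type_name, definition) in enumerate(schema.items(), start=1):
        message = dict(header)                      # no "ops" field is copied
        message["part"] = index                     # 1-based position of this entity type
        message["parts"] = len(schema)              # total entity types in the schema
        message["after"] = {type_name: definition}  # a single entity type only
        messages.append((type_name, message))
    return messages
```

Applied to the schema-updates sample, which defines `Contacts` and `Groups`, this would yield two messages, one per entity type.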
Description: The stream that contains the responses to all processed requests, including entity patches and schema patches. This stream provides a way for client drivers to get the responses as they are produced. This stream is partitioned by the unique identifier of the client driver that submitted the request; this way a single client driver instance only has to monitor the partition that contains its responses.
Key: unique request ID
Partitioned by: unique client ID
Durability: time-based compacted with age long enough for responses to be delivered
Message format: JSON with the structure written to entity-updates and schema-updates
Required Fields:
The following are the minimal fields that must appear in this stream's messages.
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
dbid | string | Unique database name. |
parts | int | The total number of parts (e.g., patches) that make up the original request. |
part | int | The 1-based number for this part. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
ended | long | The timestamp at which the request was completed |
status | int | Patch status code specifies whether the patch was successful or, if failed, the reason for the failure. Possible values are: '1' for success, '2' if the identified entity did not exist, or '3' if the patch could not be applied. See also 'error' field. |
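The reason partitioning by client ID lets a driver watch a single partition can be illustrated with a small sketch. The function `partition_for` is hypothetical and uses CRC-32 purely as a stable hash for illustration; it is not Kafka's own partitioner.

```python
import zlib


def partition_for(client_id: str, num_partitions: int) -> int:
    """Map a client ID to a partition using a stable, process-independent hash."""
    return zlib.crc32(client_id.encode("utf-8")) % num_partitions


# Every response for the same client maps to the same partition, whatever its
# request number, so the driver only needs to consume that one partition.
client = "c297cf20-18bd-40e2-827d-9b563ca28ccd"
my_partition = partition_for(client, 8)
```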
Description: The stream that contains a message with all partial responses for a given request. If a request has only one part, then the complete response will be identical to the partial response. If a request has multiple parts, then only after all parts have been completed will the aggregate message with all partial responses be written to this stream.
Key: unique request ID
Partitioned by: unique client ID
Durability: time-based compacted, long enough for the responses to be processed and read; derived entirely from partial-responses
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
request | long | A unique request number specific to the client. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
parts | int | The total number of parts (e.g., patches) that make up the batch request. |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
ended | long | The timestamp at which the request was completed |
responses | array | An array containing each of the partial response messages that make up this total response |
Sample Message: Here’s a sample complete response message for a request with two parts that each created a separate "Contacts" entity.
{
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747086227,
"parts": 2,
"responses": [
{
"db": "my-db",
"collection": "Contacts",
"zone": "default",
"entity": "400cb092-00af-4e44-bdcb-1a0c536692bf",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747086227,
"part": 1,
"parts": 2,
"ops": [
{
"op": "add",
"path": "firstName",
"value": "Sally"
},
{
"op": "add",
"path": "lastName",
"value": "Anderson"
},
{
"op": "add",
"path": "phone/home",
"value": "1-222-555-1234"
}
],
"status": 1,
"before": null,
"ended": 1415747086327,
"after": {
"firstName": "Sally",
"lastName": "Anderson",
"phone": {
"home": "1-222-555-1234"
}
}
},
{
"db": "my-db",
"collection": "Contacts",
"zone": "default",
"entity": "101c3c90-e304-4279-b6a0-020363c925c6",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"request": 2,
"user": "jsmith",
"begun": 1415747086227,
"part": 2,
"parts": 2,
"ops": [
{
"op": "add",
"path": "firstName",
"value": "William"
},
{
"op": "add",
"path": "lastName",
"value": "Johnson"
},
{
"op": "add",
"path": "homePhone",
"value": "1-222-555-9876"
}
],
"status": 1,
"ended": 1415747086328,
"before": null,
"after": {
"firstName": "William",
"lastName": "Johnson",
"homePhone": "1-222-555-9876"
}
}
]
}
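The aggregation rule described for this stream (emit the complete response only after all parts have been completed) can be sketched as a small in-memory aggregator. The class name `ResponseAggregator` is hypothetical. Two further assumptions: a request is identified by the pair of `clientid` and `request`, and the aggregate `ended` value is the latest `ended` among the partial responses.

```python
from typing import Any, Optional


class ResponseAggregator:
    """Collects partial responses until every part of a request has arrived."""

    def __init__(self) -> None:
        # (clientid, request) -> {part number: partial response}
        self._pending: dict[tuple[str, int], dict[int, dict[str, Any]]] = {}

    def add(self, partial: dict[str, Any]) -> Optional[dict[str, Any]]:
        """Record one partial response; return the complete response when ready."""
        request_id = (partial["clientid"], partial["request"])
        parts = self._pending.setdefault(request_id, {})
        parts[partial["part"]] = partial
        if len(parts) < partial["parts"]:
            return None  # still waiting for other parts
        del self._pending[request_id]
        responses = [parts[n] for n in sorted(parts)]  # order by part number
        return {
            "clientid": partial["clientid"],
            "request": partial["request"],
            "user": partial["user"],
            "begun": partial["begun"],
            "ended": max(r["ended"] for r in responses),  # assumption, see above
            "parts": partial["parts"],
            "responses": responses,
        }
```

For a request with a single part, the first call to `add` already returns the aggregate, which then wraps exactly one partial response.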
Description: The stream that is used to record which device and app are used when connecting to a database using a driver. This information can be used to monitor and generate statistics of which versions of the app are being used by users and devices. In the prototype, a message is recorded for each method invocation on a driver instance. This may be too frequent; instead the app could explicitly invoke a method in the driver to register the device & user upon installation and upgrade of the app (and possibly upgrade of the OS when the app discovers the OS version has changed).
Key: username
Partitioned by: username
Durability: time-based compacted, kept long enough to capture the desired statistics
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
clientid | string | Unique identifier of the client process, typically represented as a type 4 (random) UUID as a hexadecimal string |
dbid | string | Unique database name. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
device | string | Unique device token for this app on the device. Some mobile OSes do not allow apps to obtain the actual unique device identifier, and instead return a device token. Debezium uses this field to associate one or more devices with each user. |
appVersion | string | The application-specific version identifier that initiated the connection |
begun | long | The client-specific timestamp at which the request was submitted to the client. |
Sample Message: Here’s a sample connection message:
{
"db": "my-db",
"clientid": "c297cf20-18bd-40e2-827d-9b563ca28ccd",
"user": "jsmith",
"device": "7980f77c-2529-4978-9281-c44297c0f57e",
"appVersion": "1.1",
"begun": 1415747103842
}
Description: The stream that records by zone the entity updates that occur, and that records which users/devices are subscribed to these changes within zones. Note that the latter are represented as entities within the built-in `$zoneSubscription` collection of each database, although they are partitioned by the zone ID that is the subject of the subscription (rather than the zone ID of the `$zoneSubscription` entity).
Key: entity ID
Partitioned by: zone ID
Durability: time-based compacted; the content is derived from entity-updates
Message format: JSON with the same message structure as entity-updates.
Description: The stream that records for each device the summaries of the entities that changed within zones to which the device is subscribed. Each message is device-specific. For example, if two devices, "device1" and "device2" are both subscribed to the creation of new entities and deletion of entities within zone 'A', but only "device2" is subscribed to updates of entities within zone 'A', then when an entity in zone 'A' is created this stream will contain two messages: one representing the creation summary for "device1", and another representing the creation summary for "device2". However, if that same entity is updated, then this stream will contain only one message for "device2".
Key: device ID
Partitioned by: device ID
Durability: time-based compacted, kept long enough for the summaries to be processed and sent via push notifications
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
dbid | string | Unique database name. |
collection | string | The name of the collection (e.g., type of entity). |
zone | string | The name of the zone within the collection. |
entity | string | The unique identifier of the entity that is the target of the patch. |
notification | string | The unique identifier of this notification |
action | string | One of created, updated, or deleted |
ended | long | The timestamp at which the entity was changed |
Sample Message: Here’s a sample summary message:
{
"db": "my-db",
"collection": "Contacts",
"zone": "default",
"entity": "101c3c90-e304-4279-b6a0-020363c925c6",
"notification" : "c4b61f69-2180-4471-b00f-179be3c918c1",
"action": "created",
"ended": 1415747086328
}
Note that the device is actually not included in the message, but it is included as the key for the message and used for partitioning.
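The per-device fan-out in the "device1" and "device2" example can be sketched as follows. The function `device_summaries` and the shape of the subscription table are hypothetical, and generating the notification identifier as a random UUID is an assumption based on the sample message.

```python
import uuid
from typing import Any

# Hypothetical subscription table: device ID -> zone -> subscribed actions.
Subscriptions = dict[str, dict[str, set[str]]]


def device_summaries(change: dict[str, Any], action: str,
                     subscriptions: Subscriptions) -> list[tuple[str, dict[str, Any]]]:
    """Return (device ID, summary) pairs for every device subscribed to the action."""
    summaries = []
    for device, zones in subscriptions.items():
        if action not in zones.get(change["zone"], set()):
            continue  # this device did not subscribe to this action in this zone
        summary = {
            "db": change["db"],
            "collection": change["collection"],
            "zone": change["zone"],
            "entity": change["entity"],
            "notification": str(uuid.uuid4()),  # unique per device-specific message
            "action": action,
            "ended": change["ended"],
        }
        summaries.append((device, summary))  # the device is the key, not a field
    return summaries
```

With the subscriptions from the example, a `created` change in zone 'A' yields one summary per device, while an `updated` change yields a single summary keyed by "device2".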
Description: (Proposed) The stream records notification summaries for each device. Each message specifies the notifications that have yet to be acknowledged by the app on a particular device.
Key: device ID
Partitioned by: device ID
Durability: time-based compacted, kept long enough for the information to be processed (presumably by a service that sent info to a Unified Push Service).
Message format: JSON
Required Fields:
Field | Type | Description |
---|---|---|
dbid | string | Unique database name. |
user | string | Unique identifier of the user on behalf of which the client submitted the request. |
device | string | Unique device token for this app on the device. Some mobile OSes do not allow apps to obtain the actual unique device identifier, and instead return a device token. Debezium uses this field to associate one or more devices with each user. |
count | int | The number of unacknowledged notifications for this user on this device |
begun | long | The timestamp at which the first notification was added |
ended | long | The timestamp at which the last notification was added |
Sample Message: Here’s a sample notification summary message:
{
"db": "my-db",
"user": "jsmith",
"device": "7980f77c-2529-4978-9281-c44297c0f57e",
"count": 3,
"begun": 1415747103842,
"ended": 1415747104012
}
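Since this stream is only proposed, the following is a loose sketch of how such a summary might be computed from the timestamps at which unacknowledged notifications were added. The function `summarize_notifications` and its parameters are hypothetical.

```python
from typing import Any


def summarize_notifications(db: str, user: str, device: str,
                            added_at: list[int]) -> dict[str, Any]:
    """Build a summary of the unacknowledged notifications for one device."""
    if not added_at:
        raise ValueError("no unacknowledged notifications to summarize")
    return {
        "db": db,
        "user": user,
        "device": device,
        "count": len(added_at),  # number of unacknowledged notifications
        "begun": min(added_at),  # when the first notification was added
        "ended": max(added_at),  # when the last notification was added
    }
```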