
The Debezium prototype was developed in a short period of time, yet it successfully demonstrated the benefits of a stream-oriented approach to storing data for mobile apps. This page presents a summary of what the prototype accomplished.

Goals

The original goals for this prototype were as follows:

  • Scalable and fast data storage for mobile apps, with an emphasis on collections, entities, and subscriptions.
  • An API that is designed for MBaaS and mobile devices, especially data synchronization patterns and push notifications as defined by AeroGear.
  • Support multiple independent mobile apps or databases.
  • Support rolling upgrades while the system continues to run.
  • Prevent data loss even when faced with crashes and failures.
  • Assess usefulness of schemas, and investigate techniques to automatically learn the structure of stored entities.
  • Enable use within LiveOak and/or FeedHenry.
  • Plan for cloud deployment, and particularly OpenShift V3, identifying any limitations or roadblocks.

Approach

LiveOak already had support for storing entities within collections using CRUD interfaces and persistent storage with MongoDB and PostgreSQL. Extending support for other data sources, like Cassandra, via the same CRUD API is straightforward. However, one interesting but unexplored area for LiveOak was investigating how an MBaaS data service might be optimized to support mobile apps that synchronize locally-stored data with data stored within the data service. The AeroGear community developed several concepts that outlined how this might be accomplished, but the techniques placed a burden on the server to maintain significant state for each connected client. The challenge that AeroGear faced with their work highlights the difficulty of using a data service with a CRUD-based API.

This prototype effort attempted to find a simple API that worked well for data storage, data synchronization, and subscriptions/notifications. The prototype also investigated the impact this API had on the data storage techniques, taking into consideration the desire to scale to larger data sets and work efficiently with high degrees of concurrency. The prototype's architecture was also designed to enable rolling maintenance and upgrades without taking the entire system down, and to tolerate crashes with no data loss.

The prototype effort was likely to entail some areas involving low-risk, well-understood technologies or approaches; where necessary, those components were simplified to allow the project to focus upon areas with more unknowns and/or higher risk. For example, Debezium includes support for sending notifications to mobile devices. To save time, the project only stubbed out this functionality, since it could easily be implemented in the future by plugging in and reusing existing AeroGear Unified Push Server functionality.

Developing apps for mobile devices

Mobile devices operate under different constraints than traditional networked systems and often have limited or intermittent connectivity. These limitations stem from characteristics of the mobile devices and networks:

  1. The limited power available to mobile devices and the resulting optimizations of mobile operating systems place different requirements on mobile apps. For example, mobile apps might only run when the user is using the app, and might have very limited functionality at other times. App functionality normally available with wifi might be limited when using cell networks.
  2. Mobile devices can switch networks frequently, meaning the IP address of the device might also change. Sometimes devices will have no network connectivity whatsoever. This impacts caching, load balancing, authentication, session-management, and other capabilities of backend services.
  3. The quality of service of mobile networks can vary greatly. Bandwidth is greatly affected by distance from towers or wifi stations, and by the number of devices on a given network.
  4. Different cost structures. Providers may throttle, limit and cap bandwidth, and users may pay significantly for exceeding the data usage caps.
  5. Separate push notification systems that are optimized for mobile devices eliminate the need for apps to rely upon durable long-lived connections, and work even when the app is not currently running on a device. However, these push notification systems support notifications with very limited payloads and do not guarantee that all notifications will be delivered.

These limitations place different constraints on mobile app developers, and often result in mobile apps that behave very differently than desktop or web-based apps. Many apps will store at least some data locally so that the app remains functional even when there is limited or no connectivity. While this improves the user experience, it also requires the mobile app developer to explicitly synchronize data when connections can be established, and to respond when data in the remote data service is changed by other users and devices.

The environments, requirements, architectural styles, and technology used to develop mobile apps are different from those used for backend services, and this makes it difficult for a single developer to write and support both a mobile app and the custom backend services that support it. Mobile Backends as a Service (MBaaS) attempt to provide out-of-the-box backend services so that mobile app developers can focus upon their mobile apps without having to write entire backend services. Some MBaaS providers also offer data services that allow mobile apps to store data in the cloud without having to write much (if any) code to run in the service.

Data service API for mobile apps

Many MBaaS services offer CRUD APIs for accessing and updating data. CRUD APIs are easy to understand, have been used extensively in service-oriented architectures, and make it easy for various kinds of clients to generically access and update data. Unfortunately, these APIs don't apply nearly as well to mobile apps: they actually make it very difficult for mobile devices to efficiently access and update data, and they complicate data synchronization. And since making an update requires having the most recent version of the data, these APIs make it far more likely that updates from multiple apps will result in conflicts, which either have to be handled by the data service (usually very difficult to do generically) or addressed by the client (increasing the complexity of the apps).

Instead, Debezium's API embraces the fact that the mobile app has locally stored some of the entities, even if that data may be slightly stale. To make changes to shared data, a mobile app simply records the changes it (or the user) wants to make to that data. These changes are based upon the latest version of data the app has, and are then submitted as a patch to Debezium. Debezium then applies this patch to the latest master copy of the data that it has, and it returns the complete updated representation of the data. When the client receives this updated representation, it simply has to write it to its local store. Mobile apps can also request they be sent push notifications when entities are created, changed, or removed, making it possible for mobile apps to respond to changes made by the app installed on other devices.

Because a patch contains only the changes to be made, each patch in effect captures the intent of how the data should be changed in the backend service. Mobile apps can also ensure that related parts of the data are updated together, giving the mobile app fine-grained control over the consistency of data updates. Patches to a set of entities can be grouped together and submitted as a batch to make more efficient use of network resources.

Of course, Debezium may not always be able to apply a patch. For example, while disconnected, a mobile app might have recorded changes to an entity that was since removed by another device; any patch on the removed entity could not be applied. For this reason, when Debezium attempts to apply a patch it returns to the mobile app whether that patch was successfully applied (and if not, why not). In these cases, the mobile app can obtain the latest version of the entity, rebuild a new patch, and submit the patch again. But these failures should be far less frequent than with a traditional CRUD-based service.

Debezium also allows the client to add assertions to each patch. When the patch is submitted to Debezium, Debezium will first check each assertion against the latest data and only if all assertions are satisfied will the patch be applied. This gives even more control to the mobile app.
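To make this concrete, here is a minimal sketch, in plain Java, of what a patch with an assertion might look like. It uses JSON-Patch-style operations, where a "test" operation plays the role of the assertion described above; the operation names, field paths, and revision-number check are illustrative assumptions, and the prototype's actual patch format and driver API are not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch only: models a patch as a JSON-Patch-style list of operations,
 * where "test" operations stand in for the assertions described above.
 */
public class PatchSketch {

    static Map<String, Object> op(String opName, String path, Object value) {
        Map<String, Object> op = new HashMap<>();
        op.put("op", opName);
        op.put("path", path);
        op.put("value", value);
        return op;
    }

    public static void main(String[] args) {
        // The app last saw revision 4 of contact "contact-123"; it records only its intended changes.
        List<Map<String, Object>> patch = new ArrayList<>();
        patch.add(op("test", "/rev", 4));                          // assertion: apply only if still at rev 4
        patch.add(op("replace", "/phone/mobile", "+1-555-0100"));  // change a nested field
        patch.add(op("add", "/tags/-", "customer"));               // append to an array

        // The patch (not the whole entity) is what would be submitted to the data service,
        // grouped with other patches into a batch for efficiency.
        System.out.println(patch);
    }
}
```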

Stream-based

From Debezium's perspective, it is receiving a continuous stream of read requests and patches from mobile app clients. Debezium records these incoming requests in a durable, replicated, and persistent log. Once a request has been included in the log, it will always be processed. Typically this happens almost immediately, but even in the case of a catastrophic failure it will eventually be processed. Debezium is sequentially consistent: individual updates to entities are atomic (strongly consistent), while corresponding changes to caches or other derived information are processed asynchronously but in order.

Debezium uses a microservice to continuously consume these requests, process them, and write them out to another log (or stream). Other services then consume messages from these streams, process them, and generate output messages on another stream. The output stream of one service is an input stream to one or more other services. Thus, data flows from service to service not via message passing but via these persistent streams.
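The following is a minimal sketch of that consume-process-produce shape, written directly against the modern Apache Kafka client API rather than the Samza and Kafka versions the prototype actually used. The topic names entity-batches and entity-patches are taken from the description later on this page, and the "processing" step is deliberately trivial.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Sketch of the consume-process-produce shape of a Debezium-style microservice.
 * Not the prototype's code: it uses the modern Kafka client API, and the real
 * service would split each batch into individual entity requests.
 */
public class EntityBatchServiceSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "entity-batch-service");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            consumer.subscribe(Collections.singletonList("entity-batches"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // "Process" the message: here we simply forward each request downstream.
                    producer.send(new ProducerRecord<>("entity-patches", record.key(), record.value()));
                }
            }
        }
    }
}
```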

Rather than use a traditional message bus that passes in-memory messages between services, Debezium uses a durable, partitioned, replicated, distributed persistent log for each stream. These logs are written to disk very efficiently and replicated across multiple machines, ensuring that each log remains available even if failures occur. In essence, as long as at least one copy of each Kafka log remains available, there will be no data loss. (Strictly speaking, downstream logs generated from upstream logs could be lost and rebuilt by reprocessing the upstream logs.)

One very significant difference between Kafka logs and other message buses is that with Kafka, each Debezium microservice consumes its input stream at its own pace and can even rewind the stream if needed. That means that a microservice can crash or go down for maintenance and catch up when it comes back. It also means that upstream services are completely independent of downstream services, and they can continue processing even when downstream services go down. This makes Debezium more tolerant of failures and easier to upgrade while running.

Another important characteristic of using persistent logs is that each microservice can run in a separate process from the other services. That makes it easy for different microservices to depend upon different third-party services. It also makes it easy to maintain and upgrade individual services without affecting any of the other services. In fact, new versions of services could run alongside the older versions, which might be removed only when the new version has proven itself to be better. Running in separate processes is not a requirement, though, so it's certainly possible for multiple services to run within a single process.

Each microservice that produces a stream of output determines how that output is partitioned in the underlying log. Typically this is done by some kind of key (e.g., entity ID), ensuring that all messages with the same key end up in the same partition. And because each partition is totally ordered (there is no global ordering between partitions), the order of messages with the same key will always be preserved, even without transactions. Plus, Kafka can append to each partition without coordination, allowing the throughput of the system to scale linearly with the Kafka cluster size.
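A tiny sketch of why keying works: the partition is a deterministic function of the key, so every message for a given entity lands in the same (totally ordered) partition. Kafka's default partitioner actually hashes the key with murmur2; this example uses Java's hashCode() purely for illustration.

```java
/**
 * Minimal sketch of key-based partitioning: a deterministic mapping from key to
 * partition means all messages for the same entity id are appended to one partition
 * and therefore stay ordered relative to one another.
 */
public class KeyPartitioningSketch {

    static int partitionFor(String entityId, int numPartitions) {
        // Kafka's default partitioner uses murmur2; hashCode() is only for illustration.
        return Math.floorMod(entityId.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 8;
        // Two updates to the same entity always map to the same partition...
        System.out.println(partitionFor("contact-123", partitions));
        System.out.println(partitionFor("contact-123", partitions));
        // ...while a different entity may map elsewhere and be appended independently.
        System.out.println(partitionFor("contact-456", partitions));
    }
}
```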

Debezium is thus comprised of a set of durable, partitioned, replicated, distributed persistent logs (streams), plus a set of microservices that each consume one or more streams and output to one or more other streams. The streams and services are carefully designed so that all processing is atomic, consistent, isolated, durable, and properly ordered.

The Debezium prototype used the Samza distributed stream processing library to simplify the microservice implementations. Samza not only makes it very easy to work with Kafka logs, but it also provides a number of built-in features that simplified each microservice's management of internal state. Samza currently runs on [Apache Hadoop YARN](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html), making it easy to distribute and manage a cluster of microservice instances. Unfortunately, this made it very difficult to run on OpenShift (see below).
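For reference, a Samza-based microservice boils down to implementing the StreamTask interface. The sketch below is illustrative only: the stream names are assumptions, and the real services also used Samza's configuration and local state, which are omitted here.

```java
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Hedged sketch of a Samza StreamTask, the general shape the prototype's
 * microservices took. A real task would apply the incoming patch to its copy
 * of the entity and emit the updated representation; this one simply forwards.
 */
public class EntityPatchTaskSketch implements StreamTask {

    private static final SystemStream OUTPUT = new SystemStream("kafka", "partial-responses");

    @Override
    public void process(IncomingMessageEnvelope envelope,
                        MessageCollector collector,
                        TaskCoordinator coordinator) {
        Object key = envelope.getKey();        // e.g., the request or entity id
        Object patch = envelope.getMessage();  // the recorded patch or read request

        collector.send(new OutgoingMessageEnvelope(OUTPUT, key, patch));
    }
}
```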

Databases, entities and collections

A single Debezium installation organizes all data into databases. Typically, each mobile app would use a separate database, though it's possible that one database might be used by multiple apps, and that a single app might use multiple databases.

Each database is divided into collections, each of which contains entities with a similar meaning and structure but with unique identifiers. For example, an address book app might have a contacts collection. Each entity is represented as a JSON document with any number of fields, including nested documents. Not all fields must exist in all entities within a single collection, but the structure of most entities in one collection will be similar.

An application can store entities within a collection in separate zones. If the zone is not specified when the entity is created, a default zone is used. Mobile apps can create zone subscriptions to be notified when entities within that zone are created, updated, and/or deleted. The previously-mentioned address book mobile app might store all contacts for a given user within a unique zone, and it might subscribe to all creates, updates, and deletes within that zone. This way, a user can install the app on multiple devices, and all the apps are notified when the user uses one of the apps to add, change, or remove contacts.

Events and notifications

When an entity is created, updated, or deleted, Debezium figures out which apps (on which devices) have subscribed to that zone and based upon the subscription generates a push notification for each device/app combination. (Debezium does not currently send these notifications, but it'd be straightforward to create a new microservice that consumes these notification messages and sends them via AeroGear's Unified Push Server.)

Debezium distinguishes between events and notifications. A notification would be sent via mobile push notification services. These push notification services, however, limit the size and content of the actual push notifications, and do not guarantee that the push notifications will be delivered. In fact, when one of these services cannot deliver a push notification but then receives another push notification for that app/device combination, it will likely discard the earlier push notification. Therefore, Debezium uses these push notifications to simply signal to the target app that there are some number of events that the app has not yet seen. And Debezium captures and records the events, ensuring that all events for a particular app/device can be delivered. See the Subscriptions page for more details.
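A small sketch of this split, using plain Java as a stand-in for the real event storage and push integration: events are recorded completely and durably per app/device, while the push payload is nothing more than a hint that unseen events exist. The class, method, and field names are illustrative.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * Sketch of the event-vs-notification split described above (not the prototype's code):
 * every event is recorded per device, while the push notification carries only a hint,
 * since push services limit payloads and may drop or collapse notifications.
 */
public class EventsAndNotificationsSketch {

    private final Map<String, List<String>> eventsByDevice = new ConcurrentHashMap<>();

    /** Record an event for a device; this is the durable, complete record. */
    public void recordEvent(String deviceId, String event) {
        eventsByDevice.computeIfAbsent(deviceId, k -> new CopyOnWriteArrayList<>()).add(event);
    }

    /** Build the minimal "poke" payload a push service would deliver. */
    public String notificationPayload(String deviceId) {
        int unseen = eventsByDevice.getOrDefault(deviceId, List.of()).size();
        return "{\"unseenEvents\": " + unseen + "}";
    }

    /** When the app reconnects, it fetches the full events it has not yet seen. */
    public List<String> fetchEvents(String deviceId) {
        return eventsByDevice.getOrDefault(deviceId, List.of());
    }

    public static void main(String[] args) {
        EventsAndNotificationsSketch store = new EventsAndNotificationsSketch();
        store.recordEvent("device-1", "contact contact-123 updated");
        store.recordEvent("device-1", "contact contact-456 created");
        System.out.println(store.notificationPayload("device-1")); // tiny payload for the push service
        System.out.println(store.fetchEvents("device-1"));         // complete events fetched later
    }
}
```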

Schema learning

Debezium is able to monitor the managed entities and dynamically learn the schema of those entities. Developers may find this a very useful feature when they write new apps, since during development they can use their new apps to load a small sample of entities into their development database, and Debezium will automatically figure out the schema of those entities. As app development continues, the schema can be modified explicitly, or a developer can clean out a container and reload it with new data to have the database relearn the new structure.

Debezium implements this feature with several microservices that continuously process the recorded entity patches to figure out whether fields (at any level within the entity's JSON representation) are new, modified, or removed. It determines the best type for each field, even when the field contains values of different types in separate documents. The schema learning model also determines whether each field is required or optional by tracking whether every document contains the field. For example, if an entity is created without a particular field, that field will be considered optional. But if all other entities contain the field, and that entity is modified to contain the field, then the schema learning model will change the field in the schema to be required.
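The following sketch illustrates the core of that idea in plain Java. It is not the prototype's code: it observes whole (flattened) documents rather than patches, and it only tracks per-field presence counts and observed value types, which is enough to derive the required/optional distinction described above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative schema-learning sketch: for each field, count how many documents
 * contain it and which value types were seen. A field present in every document
 * so far is treated as required; otherwise it is optional.
 */
public class SchemaLearnerSketch {

    private long documentsSeen = 0;
    private final Map<String, Long> fieldCounts = new HashMap<>();
    private final Map<String, Set<String>> fieldTypes = new HashMap<>();

    public void observe(Map<String, Object> document) {
        documentsSeen++;
        for (Map.Entry<String, Object> field : document.entrySet()) {
            fieldCounts.merge(field.getKey(), 1L, Long::sum);
            fieldTypes.computeIfAbsent(field.getKey(), k -> new HashSet<>())
                      .add(field.getValue() == null ? "null" : field.getValue().getClass().getSimpleName());
        }
    }

    public void printSchema() {
        fieldCounts.forEach((field, count) -> {
            String presence = (count == documentsSeen) ? "required" : "optional";
            System.out.println(field + ": " + fieldTypes.get(field) + " (" + presence + ")");
        });
    }

    public static void main(String[] args) {
        SchemaLearnerSketch learner = new SchemaLearnerSketch();
        Map<String, Object> first = Map.of("name", "Sara", "phone", "+1-555-0100");
        Map<String, Object> second = Map.of("name", "Ben"); // no phone: phone becomes optional
        learner.observe(first);
        learner.observe(second);
        learner.printSchema();
    }
}
```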

The prototype shows that basic schemas can be automatically learned via a continuous processing approach, without analyzing the whole data set at once. However, even in a production system the automatically-generated schemas would likely need to be manually tweaked. Debezium exposes the schema for each database to the mobile apps, allowing the schemas to be patched just like entities.

Debezium doesn't currently do much with the schemas, but a schema is likely the most effective way to define the kinds of indexes that need to be maintained for the data set, and to document the structure of entities used by the mobile app. Automatically learning the schema eliminates much of the manual overhead normally placed upon developers, and allowing the generated schemas to be modified lets developers make the changes they need.

Blocking within an asynchronous architecture

To submit a batch of read requests or patches for processing, the Debezium driver submits the batch of requests to the entity-batches stream, which is then asynchronously processed by downstream services. However, the driver needs to return a response for the request, containing either the entities requested in read requests or the status of the patch application for each entity. Debezium has a specific partial-responses stream that is written to by various services and read by threads within the driver instances.

This very simple system allows the drivers to register response handlers for each request, to have a central thread that processes the partial-responses stream, and to invoke the client's handler functions when each part of the total results is available.

Note that the design of the Debezium driver API is such that the client-supplied handler is called for each partial response. For example, if a client submits a batch request to read 3 entities and supplies a handler function, the Debezium driver:

  1. creates a unique ID for the request,
  2. associates the client-supplied handler with that request ID,
  3. submits the batch request to the entity-batches stream, and
  4. returns.

The submitted batch request is then asynchronously processed by the entity-batch-service, which breaks it into 3 separate requests and records those requests on the entity-patches stream. The entity-storage-service asynchronously processes each of the requests in the entity-patches stream and for each will output the results (either the entity representation for a read request, or the updated entity for a patch) to the partial-results stream (keyed by the unique request ID).

The Debezium driver, meanwhile, has a thread that consumes the partial-results stream, looking only for those response messages with a request ID originating from that driver. For each applicable response message, it finds the client-supplied handler and invokes it with the response message.

This simple mechanism makes it possible for each driver in a cluster of Debezium drivers to obtain immediate feedback as each part of its requests is completed, and for the drivers to invoke the client-specified handler functions.
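A self-contained sketch of that driver-side dispatch mechanism is shown below. A BlockingQueue stands in for the partial-results stream so the example runs on its own; the real driver consumes a Kafka stream and filters for response messages carrying its own request IDs. All names here are illustrative, not the prototype's API.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Sketch of the driver's handler registry and its dispatch thread. */
public class PartialResponseDispatcherSketch {

    record PartialResponse(String requestId, String payload) {}

    private final Map<String, Consumer<PartialResponse>> handlers = new ConcurrentHashMap<>();
    private final BlockingQueue<PartialResponse> partialResponses = new LinkedBlockingQueue<>();

    /** Steps 1-4 from the list above: register the handler and return immediately. */
    public String submit(Consumer<PartialResponse> handler) {
        String requestId = UUID.randomUUID().toString();
        handlers.put(requestId, handler);
        // The real driver would also write the batch request to the entity-batches stream here.
        return requestId;
    }

    /** The driver's background thread: find the handler for each response and invoke it. */
    public void startDispatcher() {
        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    PartialResponse response = partialResponses.take();
                    Consumer<PartialResponse> handler = handlers.get(response.requestId());
                    if (handler != null) {
                        handler.accept(response);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dispatcher.setDaemon(true);
        dispatcher.start();
    }

    public static void main(String[] args) throws InterruptedException {
        PartialResponseDispatcherSketch driver = new PartialResponseDispatcherSketch();
        driver.startDispatcher();
        String requestId = driver.submit(r -> System.out.println("partial result: " + r.payload()));
        // Simulate the entity-storage-service writing one partial result for this request.
        driver.partialResponses.put(new PartialResponse(requestId, "entity 1 of 3"));
        Thread.sleep(200); // give the dispatcher thread a moment before the JVM exits
    }
}
```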

Use within LiveOak and FeedHenry

This Debezium prototype was implemented as a set of separate Java modules and standalone microservices. One of these modules is the Debezium driver, which is a simple Java class intended to be embedded into a web server such as LiveOak, WildFly, or Undertow.

For LiveOak a new module would adapt the Debezium driver into LiveOak resources so that LiveOak can expose a public RESTful API. This integration was determined to be low-risk and straightforward, so it was given a lower priority.

FeedHenry has a plugin architecture that would require creation of a JavaScript plugin to communicate with a backend server; that server could be the Debezium driver embedded within LiveOak or within a custom Undertow server. This was not completed due to time constraints.

Cloud and OpenShift

OpenShift V3 is an important Red Hat initiative, so it would be important that Debezium be easily deployed upon it. Kafka has apparently been deployed on Docker and Kubernetes, but Apache Samza is currently implemented to run on Apache Hadoop YARN. While it's possible to run YARN and Kubernetes together, doing so is quite complicated. If Debezium is continued, one option would be to replace Samza or enhance it to support OpenShift (see the Shortcomings section below).

Security and private/public collections

The original Debezium proposal suggested that it might be important to support collections that are publicly usable as well as collections that allow each user to see only their own data. This is actually very easy to do with the collections and zones design:

  • Private collections are merely collections where all user data is stored within a zone identified with the user's ID. The MBaaS service that uses Debezium can enforce this design and can ensure that user calls are always made within the user's zone.
  • Public collections are merely collections that allow any user to see the entities in any zone.
  • Hybrid collections store user-specific entities within user-specific zones, but shared entities in shared zones.

Benefits

This prototype effort yielded several benefits and positive lessons:

  1. A log- or stream-based architecture is very exciting, and makes it possible to have sequentially-consistent, replicated, and very durable data storage. It is a massive advantage for consumers to control where in the log they read, and is a big leap beyond traditional message-oriented approaches.
  2. A microservices-based approach makes it very easy to encapsulate logic within small, self-contained, separately managed, and isolated/independent services. Microservices can be easily composed to add functionality to the system, often with no impact on any of the other microservices. Each microservice is composed of a single thread, keeping them very simple. These microservices can be deployed as separate processes, or several different services can be packaged together for deployment to a single process. Additionally, each microservice can maintain its own state in a local or external store.
  3. Pairing microservices with persistent logs results in a very capable architecture with tremendous advantages, especially for high-volume processing use cases. This project showed that it is also very applicable for managing and storing data, even when that data is not within a traditional data storage system, and even without using transactions of any kind.
  4. Using partitioned logs and then clustering the microservices such that each partition is consumed by only one instance of a microservice makes it very easy to manage and partition state within the microservices.
  5. No transactions are required. The use of partitions, the assignment of each partition to a single instance of each microservice, and the single-threaded nature of each microservice mean all data can be processed atomically without any transactions.
  6. Asynchronous notifications can easily be handled within this architecture using a simple approach via dedicated streams for the actions of interest.
  7. It is easy to record data-related events and subscriptions, and to use the mobile push notification frameworks to notify mobile apps of the underlying changes.
  8. Schema learning is very feasible, and implementing it within this architecture eliminates the impact on other services while also keeping the learning service very simple.
  9. Adding new functionality is as simple as adding services that consume existing streams, and additional services have absolutely no impact on the performance of upstream services (or streams). Compare this with an interceptor framework, which is far more complex, requires ordering interceptors, and results in slower performance as larger numbers of interceptors are added.
  10. A proposed service may need an existing stream to be re-partitioned to suit the new service's design. This is very easily accomplished with a trivial microservice. Again, this shows how downstream services have no impact on upstream services.
  11. Monitoring of the services and components is easily accomplished with separate streams dedicated to metrics. There is very low overhead to writing to these streams, so it is often simpler to just have the services/components always write out the metrics. It is then up to the downstream services that consume these metric streams to process, aggregate, and analyze the resulting information. In fact, not all the metric information need be used. Plus, windowing and aggregation are very naturally and efficiently done with microservices in a stream-oriented architecture.

Shortcomings

This prototype effort had several shortcomings:

  1. Lack of query capability: This was determined to be relatively low risk, since one very straightforward approach would be to add a new service that stores data in an ElasticSearch cluster/instance. The particular indexes for each collection could be controlled by the schemas. (One of the existing services already consumes schema changes, so that is straightforward.) Using a separate ElasticSearch cluster/instance would also make it very easy for the driver(s) to simply forward queries to ElasticSearch via REST calls and then immediately return the results. On the other hand, routing queries and their results through the streams as messages would add complexity and latency.
  2. The inability to easily deploy Samza on OpenShift (see above). If Debezium is continued, Samza would have to be replaced or enhanced to support OpenShift. At this point, it's not clear which option is the least amount of work, but setting that aside, enhancing Samza to support OpenShift is probably the better approach.
  3. Deploying stateful pods on Kubernetes/OpenShift is apparently not as easy as deploying stateless containers and pods. Other than improving the ability for Kafka and Samza to run on top of Kubernetes/OpenShift (see below), perhaps the next best thing is to offer a series of Kubernetes/OpenShift pods:
     • A pod for ZooKeeper instances (if ZooKeeper cannot be replaced with etcd)
     • A pod for Kafka instances
     • A pod for Debezium services
  4. Even though a microservices and stream-based architecture makes it easier to work on each individual service and stream, as a whole it is more complex than a monolithic application. Therefore, deployment and management may be more complex without careful design and support. This may suggest that the infrastructure underlying the Debezium services and streams could be useful for other purposes and could be abstracted out. If that infrastructure is focused upon simplifying deployment and management, then it might be possible for Debezium and other stream-oriented microservice systems to be no more complicated to deploy and manage than monolithic systems. (In fact, if that infrastructure supported deployment as a monolithic system or as distributed microservices, then perhaps there might not be that much of a difference, and the same code could actually be deployed both ways.)

Next steps

The results of this short-lived prototype show that there is substantial promise in stream-oriented processing and in using logs. JBoss and Red Hat should consider using more of this technology.

There are several things that could happen next:

  1. Continue development of Debezium as a data service for mobile apps. This could be done either as a new OSS project or within the LiveOak efforts. Short-term goals would be to issue a release as soon as possible, and to work with the LiveOak and AeroGear teams to provide a working MBaaS data service offering the new API. Some technology used within the prototype might need to be re-evaluated (such as avoiding Samza), and other technology might be considered (such as ElasticSearch, Cassandra, etc.). Other enhancement ideas include:
     • Add metrics that describe how well each service is processing each message, and which requests resulted in an error in the service.
     • Re-evaluate using JSON for messages stored in the streams.
     • Continuous queries.
  2. Take the lessons learned about CQRS (separating writes from reads), event sourcing (recording all changes as input events), stream orientation, transaction logs, microservices, the patch-based API, and deconstructed databases to start a new project that makes it easy for developers to build custom document storage and asynchronous event-based processing services and workflows. Developers would do this by creating combinations of simple stream-consuming services that provide the necessary functionality. Some turn-key services would be provided, including request/patch storage, document storage and updates, continuous queries, subscriptions, event/subscription handling, artifact storage, mobile device records, push notifications, schema learning, etc. Custom services would implement a simple interface, and could use capabilities/components provided by the framework (e.g., key-value storage, range indexes, aggregation, windowing, etc.). If possible, some services could be implemented by merely customizing a provided implementation. Nevertheless, perhaps the most important feature would be to enable deploying these event- and stream-oriented workflows on a range of environments with little (if any) change, other than choosing and configuring the underlying environment to suit the requirements. For example, a workflow could be embedded within an app and deployed to an app server, using only local file storage. On the other hand, if the workflow needs to scale and distribute/replicate data, then the same workflow could be deployed as an OpenShift application that uses Kafka for replicated, durable, and fault-tolerant stream storage. Debezium (the mobile data service) could be implemented using this framework, but other kinds of data-storing applications (e.g., a schema evolution registry for Avro schemas) could just as easily be built without having to reimplement or integrate much of the same technology and components. Each deployment environment would combine a set of existing technology (like Vert.x, Fabric8, RocksDB, Docker, Kubernetes/OpenShift, Kafka, etc.) to provide the necessary foundation for the runtime requirements of the stream-based workflows. (Note: How different is this from Apache Spark Streaming? This would be data-storage-oriented, but perhaps it could be built on top of Spark Streaming? Apparently Spark runs well on top of Mesos, but it's not clear how well that fits into OpenShift plans.)
  3. Look for other uses of log-based and stream-based processing within JBoss and Red Hat, including:
     • Data virtualization and integration, including perhaps change data capture.
     • Monitoring, to better support much larger volumes of metrics data, reducing the monitoring overhead and footprint in the monitored services and increasing the flexibility in the analysis and reporting system.
     • Adding integrated support for Kafka (and message logs in general) to Vert.x, to abstract away the implementation of the logs from the framework.
     • Providing other new turn-key microservices (similar to KeyCloak) for use by enterprise apps and within Fuse and MBaaS (e.g., LiveOak and FeedHenry), such as: event storage (for use with push notifications), device management and storage, continuous queries, data subscriptions, artifact storage, and distributed listeners for asynchronous microservices (allowing clients to block until their requests are partially processed).
     • Collaborating with Apache Kafka to offer integrated support for Kubernetes/OpenShift. For example, perhaps consider using etcd rather than ZooKeeper for coordination.
     • Collaborating with Apache Samza to offer integrated support for Kubernetes/OpenShift. Because some services will own their own state and that state can be local, if those services fail in OpenShift (or other fabrics) it may take time to start up and synchronize an additional process. This may suggest a need to offer "warm" (or in-sync) replicas of services to use upon failure of the primary services.
  12. Collaborating with Apache Samza to offer integrated support for Kubernetes/OpenShift. Because some services will own their own state and that state can be local, if those services fail in OpenShift (or other fabrics) it may take time to start up and synchronize an additional process. This may suggest a need to offer "warm" (or in-sync) replicas of services to use upon failure of other primary services.