
# Client Examples


## Example: Sample-cluster-client

Sample-cluster-client can be used for many types of relay events. Here is an example of how to run the cluster-client for MySQL relay events.

* First run the sample-mysql-relay server.

Then follow these steps to run a consumer of the MySQL relay events:

* Install the latest stable release of ZooKeeper from http://zookeeper.apache.org/releases.html.
* To start ZooKeeper you need a configuration file. Here is a sample; create it at $(Path_to_zookeeper)/conf/zoo.cfg:

```
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
```

This file can be called anything, but for the sake of this discussion call it conf/zoo.cfg.

* Now that you have created the configuration file, you can start ZooKeeper:

```
bin/zkServer.sh start
```
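As a quick sanity check, you can confirm the server is up before moving on. Both commands below are standard ZooKeeper tooling (`zkServer.sh status` ships with ZooKeeper, and `ruok` is one of its four-letter-word commands); the port assumes the clientPort from the sample zoo.cfg above:

```
bin/zkServer.sh status          # should report the server mode, e.g. standalone
echo ruok | nc localhost 2181   # should print "imok"
```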
* git clone aesop (skip if done already)
* Run mvn clean install in the sample-client-cluster directory. This might take a while to download Aesop, Trooper and their dependencies from the various Maven repositories.
* Start the client in the sample-client-cluster directory:

```
sh sample-client-cluster/distribution/sample-client-cluster/start.sh
```
* Now, on any MySQL event, you will see updates on the console in both the Relay and the Consumer Client, like:

Relay:

```
15:46:49.229 [binlog-parser-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.r.p.m.i.DefaultBinLogEventMapper - Mapped GenricRecord for schema Person : {"id": 76, "firstName": "greg", "lastName": "chappel", "birthDate": 486844200000, "deleted": "false"}
Event type:16
```

Client:

```
15:46:49.280 [callback-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.s.c.c.c.AbstractMySqlEventConsumer - Event : MysqlBinLogEventImpl [entityName : com.flipkart.aesop.events.ortest.person, eventType : DELETE, pKeyList : [id], keyValuePairs : {firstName=greg, lastName=chappel, deleted=false, id=40, birthDate=1985-06-06 00:00:00.0}
```
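To generate such an event, change a row in the database that the sample relay watches. Purely as a hypothetical illustration (the or_test.person table and its column names here are inferred from the log lines above; use whatever schema your sample-mysql-relay is actually configured against):

```
# hypothetical demo insert; table and columns inferred from the sample logs
mysql -u root or_test -e "INSERT INTO person (id, firstName, lastName, birthDate, deleted) VALUES (77, 'greg', 'chappel', '1985-06-06', 'false')"
```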

## Example: Sample-elastic-search-client-cluster-consumer

Sample-elastic-search-client-cluster-consumer implements Sample-Cluster-Client, consumes MySQL relay events, and upserts/deletes the corresponding documents in an ElasticSearch cluster. Follow these steps to set it up:

* First run the sample-mysql-relay server.

Then follow these steps to run a consumer of the MySQL relay events:

* Install the latest stable release of ZooKeeper from http://zookeeper.apache.org/releases.html.
* To start ZooKeeper you need a configuration file. Here is a sample; create it at $(Path_to_zookeeper)/conf/zoo.cfg:

```
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
```

This file can be called anything, but for the sake of this discussion call it conf/zoo.cfg.

* Now that you have created the configuration file, you can start ZooKeeper:

```
bin/zkServer.sh start
```
* Start your ElasticSearch cluster server. The latest version of ElasticSearch is available from the Elastic Search downloads page. The server can run in Multicast, Unicast or TransportClient configuration; configure the Sample-elastic-search-client-cluster-consumer accordingly.
* git clone aesop (skip if done already)
* Run mvn clean install in the sample-elastic-search-client-cluster-consumer directory. This might take a while to download Aesop, Trooper and their dependencies from the various Maven repositories.
* Configure the ElasticSearchClient as MulticastClient, UnicastClient or TransportClient, matching the server configuration, in sample-elastic-search-client-cluster-consumer/src/main/resources/external/spring-client-config.xml.

ElasticSearch server settings go in the sample-elastic-search-client-cluster-consumer/src/main/resources/config.infra.es.conf file. Demo configurations are provided for all modes; replace "localhost" with your cluster server IP.
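Before starting the client, it can help to confirm the ElasticSearch server is reachable. The cluster health endpoint is part of ElasticSearch's standard REST API; adjust the host and port to match your configuration:

```
# expects a JSON response with "status": "green" or "yellow"
curl 'http://localhost:9200/_cluster/health?pretty'
```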

* Start the client in the sample-elastic-search-client-cluster-consumer directory:

```
sh sample-elastic-search-client-cluster-consumer/distribution/start.sh
```
* Now, on any MySQL event, you will see updates on the console in both the Relay and the Consumer Client, like:

Relay:

```
15:46:49.229 [binlog-parser-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.r.p.m.i.DefaultBinLogEventMapper - Mapped GenricRecord for schema Person : {"id": 76, "firstName": "greg", "lastName": "chappel", "birthDate": 486844200000, "deleted": "false"}
Event type:16
```

Client:

```
15:07:12.014 [callback-1] INFO  c.l.d.c.consumer.LoggingConsumer.doStartDataEventSequence 344 c.l.d.c.consumer.LoggingConsumer - startDataEventSequence:{"sourceId":1,"sequence":4294968639}
15:07:12.017 [callback-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.e.AbstractEventConsumer - Event : AbstractEvent [fieldsMap={id=18, lastName=chappel, birthDate=1985-06-06 00:00:00.0, firstName=greg, deleted=false}, primaryKeysSet=[id], entityName=Person, namespaceName=or_test, eventType=UPSERT]
15:07:12.019 [callback-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.e.u.ElasticSearchUpsertDataLayer - Received Upsert Event. Event is AbstractEvent [fieldsMap={id=18, lastName=chappel, birthDate=1985-06-06 00:00:00.0, firstName=greg, deleted=false}, primaryKeysSet=[id], entityName=Person, namespaceName=or_test, eventType=UPSERT]
15:07:12.020 [callback-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.e.u.ElasticSearchUpsertDataLayer - Field Map Pair : {id=18, lastName=chappel, birthDate=1985-06-06 00:00:00.0, firstName=greg, deleted=false}
15:07:12.041 [callback-1] INFO  c.l.d.c.consumer.LoggingConsumer.endDataEventSequenceStats 517 c.l.d.c.consumer.LoggingConsumer - str: 1 updates => wt:14229.111;cb:27.218
events => bop=-1 eop=4294968639 Person=1 (24.854 ms)
15:07:12.043 [callback-1] INFO  c.l.d.c.consumer.LoggingConsumer.doCheckpoint 457 c.l.d.c.consumer.LoggingConsumer - Checkpoint:{"sourceId":1,"sequence":4294968639}
```

You can verify the contents in ElasticSearch using the REST API.
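For example, with a hypothetical query (the index name depends on your config.infra.es.conf, so this searches across all indices; URI search is standard ElasticSearch REST API):

```
# should return the upserted Person documents
curl 'http://localhost:9200/_search?q=firstName:greg&pretty'
```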

## Example: Sample-Kafka Client in 5 minutes

Follow these steps to set it up:

* First run the sample-mysql-relay server.

Then follow these steps to run a consumer of the MySQL relay events:

* Download the latest stable release of ZooKeeper from http://zookeeper.apache.org/releases.html.
* To start ZooKeeper you need a configuration file. Here is a sample; create it at $(Path_to_zookeeper)/conf/zoo.cfg:

```
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
```

This file can be called anything, but for the sake of this discussion call it conf/zoo.cfg.

* Now that you have created the configuration file, you can start ZooKeeper:

```
bin/zkServer.sh start
```

* git clone aesop (skip if done already)
* Run mvn clean install in the sample-kafka-client-cluster-consumer directory. This might take a while to download Aesop, Trooper and their dependencies from the various Maven repositories.
* Start the client in the sample-kafka-client-cluster-consumer directory:

```
sh sample-kafka-client-cluster-consumer/distribution/script/start.sh
```
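These steps assume a Kafka broker is already running for the consumer to publish to. If it is not, a typical way to start one from the Kafka root directory (assuming the stock config/server.properties) is:

```
# starts a single local broker using the bundled default configuration
bin/kafka-server-start.sh config/server.properties
```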
* Now, on any MySQL event, you will see updates on the console in both the Relay and the Consumer Client, like:

Relay:

```
15:46:49.229 [binlog-parser-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.r.p.m.i.DefaultBinLogEventMapper - Mapped GenricRecord for schema Person : {"id": 76, "firstName": "greg", "lastName": "chappel", "birthDate": 486844200000, "deleted": "false"}
Event type:16
```

Client:

```
15:07:12.014 [callback-1] INFO  c.l.d.c.consumer.LoggingConsumer.doStartDataEventSequence 344 c.l.d.c.consumer.LoggingConsumer - startDataEventSequence:{"sourceId":1,"sequence":4294968639}
15:07:12.017 [callback-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.e.AbstractEventConsumer - Event : AbstractEvent [fieldsMap={id=18, lastName=chappel, birthDate=1985-06-06 00:00:00.0, firstName=greg, deleted=false}, primaryKeysSet=[id], entityName=Person, namespaceName=or_test, eventType=UPSERT]
15:07:12.019 [callback-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.p.k.p.i.SyncKafkaEventProcessor - Received Upsert Event. Event is AbstractEvent [fieldsMap={id=18, lastName=chappel, birthDate=1985-06-06 00:00:00.0, firstName=greg, deleted=false}, primaryKeysSet=[id], entityName=Person, namespaceName=or_test, eventType=UPSERT]
15:07:12.020 [callback-1] INFO  o.t.p.c.impl.logging.SLF4jLogWrapper.info 59 c.f.a.p.k.p.i.SyncKafkaEventProcessor - Field Map Pair : {id=18, lastName=chappel, birthDate=1985-06-06 00:00:00.0, firstName=greg, deleted=false}
15:07:12.041 [callback-1] INFO  c.l.d.c.consumer.LoggingConsumer.endDataEventSequenceStats 517 c.l.d.c.consumer.LoggingConsumer - str: 1 updates => wt:14229.111;cb:27.218
events => bop=-1 eop=4294968639 Person=1 (24.854 ms)
15:07:12.043 [callback-1] INFO  c.l.d.c.consumer.LoggingConsumer.doCheckpoint 457 c.l.d.c.consumer.LoggingConsumer - Checkpoint:{"sourceId":1,"sequence":4294968639}
```

You can verify the contents in Kafka using the Kafka console consumer script from the Kafka root directory:

```
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```