Schema Registry
Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Avro schemas. It stores a versioned history of all schemas, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting. It also provides serializers that plug into Kafka clients and handle schema storage and retrieval for Kafka messages sent in Avro format.
Schema Registry is a distributed storage layer for Avro Schemas which uses Kafka as its underlying storage mechanism. Some key design decisions:
It assigns a globally unique ID to each registered schema. Allocated IDs are guaranteed to be monotonically increasing but not necessarily consecutive.
Kafka provides the durable backend, and functions as a write-ahead changelog for the state of Schema Registry and the schemas it contains.
Schema Registry is designed to be distributed, with single-master architecture, and ZooKeeper/Kafka coordinates master election (based on the configuration).
Start Schema Registry without SSL (PLAINTEXT):
Add the properties below to CONFLUENT_HOME/etc/schema-registry/schema-registry.properties:
listeners=http://0.0.0.0:8081
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
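With these properties in place, Schema Registry can be started with the same schema-registry-start command shown in the SSL example below; a quick sketch of starting it and checking that the REST endpoint responds (this listener is plain HTTP on port 8081):
./schema-registry-start CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
curl http://localhost:8081/subjects   # returns [] on a fresh registry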
Start Schema Registry with SSL:
Add the properties below to CONFLUENT_HOME/etc/schema-registry/schema-registry.properties:
listeners=https://0.0.0.0:8085
kafkastore.connection.url=localhost:2181
kafkastore.topic=_schemas
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
bootstrap.servers=localhost:9095
kafkastore.security.protocol=SSL
security.protocol=SSL
kafkastore.ssl.truststore.location=/home/shamim/Confluent/security/kafka.server.truststore.jks
kafkastore.ssl.truststore.password=pwd
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.type=JKS
ssl.truststore.location=/home/shamim/Confluent/security/kafka.server.truststore.jks
ssl.truststore.password=pwd
ssl.keystore.password=pwd
ssl.keystore.location=/home/shamim/Confluent/security/kafka.server.keystore.jks
ssl.key.password=pwd
Run the command below:
./schema-registry-start /home/shamim/Confluent/confluent-3.2.2/etc/schema-registry/schema-registry.properties
After Schema Registry has started successfully, we can see the following lines in the log:
[2017-08-13 08:16:52,468] WARN Ignoring Kafka broker endpoint PLAINTEXT://localhost:9092 that does not match the setting for kafkastore.security.protocol=SSL (io.confluent.kafka.schemaregistry.storage.KafkaStore:301)
[2017-08-13 08:16:52,469] INFO Initializing KafkaStore with broker endpoints: SSL://localhost:9095 (io.confluent.kafka.schemaregistry.storage.KafkaStore:131)
[2017-08-13 08:16:52,486] WARN The replication factor of the schema topic _schemas is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers
[2017-08-13 08:16:54,303] INFO Started NetworkTrafficServerConnector@1ce61929{SSL-HTTP/1.1}{0.0.0.0:8085} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
Now we can use the REST API to list the registered subjects:
http://localhost:8085/subjects
We can now register schemas as shown below:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"type\": \"string\"}"}' http://localhost:8085/subjects/Kafka-key/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{ "schema": "{ \"type\": \"record\", \"name\": \"Person\", \"namespace\": \"com.ippontech.kafkatutorials\", \"fields\": [ { \"name\": \"firstName\", \"type\": \"string\" }, { \"name\": \"lastName\", \"type\": \"string\" }, { \"name\": \"birthDate\", \"type\": \"long\" } ]}" }' \
http://localhost:8081/subjects/persons-avro-value/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{ "schema": "{ \"type\": \"record\", \"name\": \"Person\", \"namespace\": \"com.ippontech.kafkatutorials\", \"fields\": [ { \"name\": \"id\", \"type\": \"string\" }, { \"name\": \"name\", \"type\": \"string\" } ]}" }' \
http://localhost:8085/subjects/persons-avro-value/versions
To fetch a schema by its global ID:
http://localhost:8085/schemas/ids/41
Why Schema Registry?
The consumer has its own schema, which could be different from the producer's.
The consumer schema is the schema the consumer expects the record/message to conform to. With the Schema Registry, a compatibility check is performed, and if the two schemas don't match but are compatible, the payload is transformed via Avro schema evolution.
Kafka records can have a Key and a Value and both can have a schema.
Schema Registry Operations
The Schema Registry can store schemas for keys and values of Kafka records.
It can also list schemas by subject.
It can list all versions of a subject (schema).
It can retrieve a schema by version or id.
It can get the latest version of a schema.
Importantly, the Schema Registry can check whether a schema is compatible with a certain version. There is a compatibility level setting (BACKWARD, FORWARD, FULL, NONE) for the Schema Registry as a whole and for an individual subject.
You can manage schemas via the Schema Registry's REST API, as sketched below.
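A sketch of these operations with curl, using the persons-avro-value subject registered earlier (adjust the host, port, and subject to your own setup):
# list all subjects
curl http://localhost:8085/subjects
# list all versions of a subject
curl http://localhost:8085/subjects/persons-avro-value/versions
# retrieve a schema by subject and version number, or the latest version
curl http://localhost:8085/subjects/persons-avro-value/versions/1
curl http://localhost:8085/subjects/persons-avro-value/versions/latest
# retrieve a schema by its global ID (41 in the earlier example)
curl http://localhost:8085/schemas/ids/41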
Schema Registry Schema Compatibility Settings
Backward compatibility means data written with an older schema is readable with a newer schema.
Forward compatibility means data written with a newer schema is readable with older schemas.
Full compatibility means a new version of a schema is both backward and forward compatible.
None disables compatibility checking and is not recommended. If you set the level to NONE, Schema Registry just stores the schema and it will not be validated for compatibility at all.
Schema Registry Config
Schema compatibility checking can be configured globally or per subject.
The compatibility value is one of the following:
NONE - don't check for schema compatibility
FORWARD - check that the last schema version is forward compatible with new schemas
BACKWARD (default) - make sure the new schema is backward compatible with the latest
FULL - make sure the new schema is both forward and backward compatible with the latest
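For example, the compatibility level can be read and changed via the /config endpoint; a minimal sketch, reusing the subject name from the earlier example:
# read the global compatibility level
curl http://localhost:8085/config
# set the global compatibility level to FULL
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "FULL"}' http://localhost:8085/config
# set the compatibility level for a single subject
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "BACKWARD"}' http://localhost:8085/config/persons-avro-value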
Schema Evolution
If an Avro schema is changed after data has been written to a store using an older version of that schema, then Avro might perform schema evolution when you try to read that data.
From the Kafka perspective, schema evolution happens only during deserialization at the consumer (read). If the consumer's schema is different from the producer's schema, the value or key is automatically modified during deserialization to conform to the consumer's reader schema, if possible.
Avro schema evolution is an automatic transformation between the consumer's schema version and the schema the producer wrote into the Kafka log. When the consumer schema is not identical to the producer schema used to serialize the Kafka record, a data transformation is performed on the Kafka record's key or value. If the schemas match, no transformation is needed.
Allowed Modifications During Schema Evolution
You can add a field with a default to a schema. You can remove a field that had a default value. You can change a field's order attribute. You can change a field's default value to another value, or add a default value to a field that did not have one. You can remove or add a field alias (keep in mind that this could break some consumers that depend on the alias). You can change a type to a union that contains the original type. If you make any of the above changes, then your schema can still use Avro's schema evolution when reading with an old schema.
Rules of the Road for Modifying Schemas
If you want to make your schema evolvable, follow these guidelines. Provide a default value for fields in your schema, as this allows you to delete the field later. Never change a field's data type. When adding a new field to your schema, provide a default value for the field. Don't rename an existing field (use aliases instead); you can add an alias.
Let’s use an example to talk about this. The following example is from our Avro tutorial.
Employee example Avro Schema
{"namespace": "com.cloudurable.phonebook",
"type": "record",
"name": "Employee",
"doc" : "Represents an Employee at a company",
"fields": [
{"name": "firstName", "type": "string", "doc": "The persons given name"},
{"name": "nickName", "type": ["null", "string"], "default" : null},
{"name": "lastName", "type": "string"},
{"name": "age", "type": "int", "default": -1},
{"name": "emails", "default":[], "type":{"type": "array", "items": "string"}},
{"name": "phoneNumber", "type":
[ "null",
{ "type": "record", "name": "PhoneNumber",
"fields": [
{"name": "areaCode", "type": "string"},
{"name": "countryCode", "type": "string", "default" : ""},
{"name": "prefix", "type": "string"},
{"name": "number", "type": "string"}
]
}
]
},
{"name":"status", "default" :"SALARY", "type": { "type": "enum", "name": "Status",
"symbols" : ["RETIRED", "SALARY", "HOURLY", "PART_TIME"]}
}
]
}
Avro Schema Evolution Scenario
Let’s say our Employee record did not have an age in version 1 of the schema and then later we decided to add an age field with a default value of -1. Now let’s say we have a Producer using version 2 of the schema with age, and a Consumer using version 1 with no age.
The Producer uses version 2 of the Employee schema, creates a com.cloudurable.phonebook.Employee record, sets the age field to 42, and sends it to the Kafka topic new-employees. The Consumer consumes records from new-employees using version 1 of the Employee schema. Since the Consumer is using version 1 of the schema, the age field gets removed during deserialization.
The same Consumer modifies some records and then writes them to a NoSQL store. When the Consumer does this, the age field is missing from the record that it writes to the NoSQL store. Another client using version 2 of the schema, which has the age, reads the record from the NoSQL store. The age field is missing from the record because the Consumer wrote it with version 1, so the client reads the record and the age is set to the default value of -1.
If you added the age and it was not optional, i.e., the age field did not have a default, then the Schema Registry could reject the schema, and the Producer could never add it to the Kafka log.
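You can test this before registering by calling the compatibility endpoint. A minimal sketch, assuming the value subject for the new-employees topic is new-employees-value (the default subject naming), that its latest version is the version 1 Employee schema without the age field, and that BACKWARD compatibility is in effect; the first check should report is_compatible true and the second false:
# compatible: the new age field has a default
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\", \"name\": \"Employee\", \"fields\": [{\"name\": \"firstName\", \"type\": \"string\"}, {\"name\": \"lastName\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\", \"default\": -1}]}"}' \
  http://localhost:8085/compatibility/subjects/new-employees-value/versions/latest
# incompatible under BACKWARD: the new age field has no default
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"record\", \"name\": \"Employee\", \"fields\": [{\"name\": \"firstName\", \"type\": \"string\"}, {\"name\": \"lastName\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
  http://localhost:8085/compatibility/subjects/new-employees-value/versions/latest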
Using the Schema Registry REST API
Recall that the Schema Registry allows you to manage schemas using the following operations:
Store schemas for keys and values of Kafka records
List schemas by subject
List all versions of a subject (schema)
Retrieve a schema by version
Retrieve a schema by ID
Retrieve the latest version of a schema
Perform compatibility checks
Set the compatibility level globally
Set the compatibility level per subject
Recall that all of this is available via a REST API with the Schema Registry.
Sample Demo:
Step 1: Create an Avro schema file (Person.avsc):
{ "type": "record", "name": "Person", "namespace": "com.test", "fields": [ { "name": "firstName", "type": "string" }, { "name": "lastName", "type": "string" }, { "name": "birthDate", "type": "long" }, { "name": "address", "type": "long" },{"name":"edu","type":["null", "string"],"default":null} ]}
Step 2: Use avro-tools to compile the schema file and generate a Java class:
java -jar avro-tools-1.7.7.jar compile schema AvroProject\Person.avsc <Target Folder>
This will generate a Java class. Copy the class into your Eclipse project.
Step 3: Write the producer and consumer code below.
Producer Avro:
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import com.test.Person;  // Person class generated from Person.avsc in Step 2
import java.util.Properties;
import java.util.stream.IntStream;
public class ProducerAvro {

    private static Producer<Long, Person> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "AvroProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                LongSerializer.class.getName());
        // Configure the KafkaAvroSerializer.
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                KafkaAvroSerializer.class.getName());
        // Schema Registry location.
        props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081");
        return new KafkaProducer<>(props);
    }

    private final static String TOPIC = "persons-avro";

    public static void main(String... args) {
        Producer<Long, Person> producer = createProducer();
        Person bob = Person.newBuilder().setFirstName("Wilson")
                .setLastName("Jones")
                .setAddress(5)
                .setBirthDate(66)
                .build();
        IntStream.range(1, 100).forEach(index -> {
            producer.send(new ProducerRecord<>(TOPIC, 1L * index, bob));
        });
        producer.flush();
        producer.close();
    }
}
Step 4: Consumer Code
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import com.test.Person;  // Person class generated from Person.avsc in Step 2
import java.util.Collections;
import java.util.Properties;
import java.util.stream.IntStream;
public class ConsumerAvro {

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC = "persons-avro";

    private static Consumer<Long, Person> createConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                LongDeserializer.class.getName());
        // Use the Kafka Avro Deserializer.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                KafkaAvroDeserializer.class.getName());
        // Use Specific Record or else you get Avro GenericRecord.
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
        // Schema Registry location (Schema Registry runs on 8081).
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                "http://localhost:8081");
        return new KafkaConsumer<>(props);
    }

    public static void main(String... args) {
        final Consumer<Long, Person> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(TOPIC));
        IntStream.range(1, 100).forEach(index -> {
            final ConsumerRecords<Long, Person> records = consumer.poll(1000);
            if (records.count() == 0) {
                System.out.println("None found");
            } else {
                records.forEach(record -> {
                    Person employeeRecord = record.value();
                    System.out.printf("%s %d %d %s \n", record.topic(),
                            record.partition(), record.offset(), employeeRecord);
                });
            }
        });
    }
}
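Before running the producer, the topic may need to be created if automatic topic creation is disabled on the broker; a sketch with the kafka-topics tool (on older Kafka/Confluent versions, pass --zookeeper localhost:2181 instead of --bootstrap-server):
kafka-topics --create --topic persons-avro --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1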
Run the code and you should see the consumed records printed as output.
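To double-check the records independently of the Java consumer, the Avro console consumer shipped with Confluent can read the same topic through the Schema Registry; a sketch, assuming the ports used above:
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic persons-avro --from-beginning --property schema.registry.url=http://localhost:8081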