KafkaServer
Apache Kafka is at the heart of the Confluent Platform. It is one of the most popular open source distributed streaming platforms.
It has three key capabilities:
•It lets you publish and subscribe to streams of records
•It lets you store streams of records in a fault-tolerant way
•It lets you process streams of records
The Apache Kafka open source project consists of two key components:
•Kafka Brokers (open source). Kafka brokers form the messaging, data persistence and storage tier of Apache Kafka.
•Kafka Java Client APIs (open source)
Producer API (open source). Java Client that allows an application to publish a stream of records to one or more Kafka topics.
Consumer API (open source). Java Client that allows an application to subscribe to one or more topics and process the stream of records produced to them.
Streams API (open source). Allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming input streams into output streams. It has a very low barrier to entry, easy operationalization, and a high-level DSL for writing stream processing applications, making it the most convenient yet scalable option for processing and analyzing data that is backed by Kafka (a minimal sketch follows this list).
Connect API (open source). A component to stream data between Kafka and other data systems in a scalable and reliable way. It makes it simple to configure connectors to move data into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing. Connectors can also deliver data from Kafka topics into secondary indexes like Elasticsearch or into batch systems such as Hadoop for offline analysis.
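To give a feel for the Streams API mentioned above, here is a minimal word-count sketch. It is not part of the original walkthrough: the topic names text-input and word-counts and the application id wordcount-demo are made up, and it uses the StreamsBuilder API from Kafka 1.0+, which differs slightly from the KStreamBuilder API in the Kafka 0.10.x client bundled with Confluent 3.2.x.
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
public class WordCountStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-demo");       // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // PLAINTEXT listener from this page
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> lines = builder.stream("text-input");           // hypothetical input topic
        KTable<String, Long> counts = lines
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\s+")))  // split lines into words
                .groupBy((key, word) -> word)                                   // re-key by word
                .count();                                                        // count occurrences per word
        counts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));  // hypothetical output topic

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();                                                         // runs until the JVM is stopped
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}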
Kafka is a messaging system, like EMS or ActiveMQ.
Apache Kafka is a distributed publish-subscribe messaging system and a robust queue that can handle a high volume of data and enables you to pass messages from one endpoint to another.
Kafka is suitable for both offline and online message consumption. Kafka messages are persisted on the disk and replicated within the cluster to prevent data loss. Kafka is built on top of the ZooKeeper synchronization service. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.
Benefits
Following are a few benefits of Kafka:
•Reliability − Kafka is distributed, partitioned, replicated and fault tolerant.
•Scalability − The Kafka messaging system scales easily without downtime.
•Durability − Kafka uses a distributed commit log, which means messages are persisted on disk as quickly as possible, hence it is durable.
•Performance − Kafka has high throughput for both publishing and subscribing to messages. It maintains stable performance even when many TB of messages are stored.
Use Cases:
•Stream processing
•Log aggregation
•Metrics
Kafka uses topics for messaging from one system to another.
Topics:
•A topic is a category or feed name to which records are published.
•Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
•A topic has one or more partitions.
•Each partition is an ordered, immutable sequence of records that is continually appended to.
•Each record in a partition is assigned a sequential ID number called the offset that uniquely identifies the record within the partition.
•Kafka retains all records, whether or not they have been consumed, according to the configured retention policy.
Partitions:
The partitions of a topic are distributed over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions.
•For example, while creating a topic named T1, you might configure it to have three partitions. The server would create three log files, one for each of the T1 partitions. When a producer published a message to the topic, the producer would assign a partition ID to that message. The server would then append the message to the log file for that partition only.
•If you then started two consumers, the server might assign partitions 1 and 2 to the first consumer and partition 3 to the second consumer. Each consumer would read only from its assigned partitions. A small sketch of this key-to-partition idea follows.
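To make the partition-assignment step above concrete, here is a tiny sketch of the idea. It is not Kafka's actual implementation: the real default partitioner hashes keyed records with murmur2 and distributes records without a key across partitions, but the effect is the same, the key hash modulo the partition count decides where a record lands.
public class PartitionIdea {
    // Hypothetical helper: maps a record key to one of numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        // Mask off the sign bit so the hash is non-negative, then take it modulo the partition count.
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // like the three-partition T1 topic in the example above
        for (String key : new String[] {"order-1", "order-2", "order-3", "order-1"}) {
            // The same key always maps to the same partition, which preserves per-key ordering.
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}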
Start Kafka Server without SSL:
Update server.properties at CONFLUENT_HOME/etc/kafka:
listeners=PLAINTEXT://localhost:9092
zookeeper.connect=localhost:2181
Start Kafka using the command below:
nohup ./kafka-server-start /home/shamim/Confluent/confluent-3.2.2/etc/kafka/server.properties 2>&1 > /home/shamim/Confluent/kafkaserver.log &
Verify the status of Kafka in the log file /home/shamim/Confluent/kafkaserver.log; you should not see any errors.
Additionally, you can check whether Kafka has started by running ps -ef | grep kafka.
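Another quick sanity check, not part of the original notes, is to list the topics known to the broker via ZooKeeper (this mirrors the kafka-topics usage further down this page):
./kafka-topics --zookeeper localhost:2181 --list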
Start Kafka Server with SSL:
You can refer to the page on generating the keys in JKS format.
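If that page is not to hand, a minimal self-signed sketch with keytool looks roughly like this (the alias, file names and the password pwd are only placeholders mirroring the paths used on this page; a production setup would normally use CA-signed certificates):
# Generate the broker key pair inside a JKS keystore (self-signed for this sketch)
keytool -genkeypair -alias localhost -keyalg RSA -validity 365 -keystore kafka.server.keystore.jks -storepass pwd -keypass pwd -dname "CN=localhost"
# Export the broker certificate and import it into a truststore
keytool -exportcert -alias localhost -keystore kafka.server.keystore.jks -storepass pwd -file server-cert.cer
keytool -importcert -alias localhost -file server-cert.cer -keystore kafka.server.truststore.jks -storepass pwd -noprompt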
Update server.properties at CONFLUENT_HOME/etc/kafka:
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9095
ssl.client.auth=none
advertised.host.name=localhost
ssl.keystore.location=/home/shamim/Confluent/security/kafka.server.keystore.jks
ssl.keystore.password=pwd
ssl.key.password=pwd
ssl.truststore.location=/home/shamim/Confluent/security/kafka.server.truststore.jks
ssl.truststore.password=pwd
advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9095
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
zookeeper.connect=localhost:2181
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
Start Kafka using the command below:
nohup ./kafka-server-start /home/shamim/Confluent/confluent-3.2.2/etc/kafka/server.properties 2>&1 > /home/shamim/Confluent/kafkaserver.log &
Verify the status of Kafka in the log file /home/shamim/Confluent/kafkaserver.log; you should not see any errors.
You should also see the listener details in the log:
with addresses: PLAINTEXT -> EndPoint(localhost,9092,PLAINTEXT),SSL -> EndPoint(localhost,9095,SSL)
Additionally, you can check whether Kafka has started by running ps -ef | grep kafka.
Once the Kafka server is started, we need to produce and consume messages. Update producer.properties and consumer.properties with the content below:
bootstrap.servers=localhost:9095
security.protocol=SSL
ssl.truststore.location=/home/shamim/Confluent/security/kafka.server.truststore.jks
ssl.truststore.password=pwd
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.type=JKS
ssl.keystore.type=JKS
Now create a topic using the command below:
./kafka-topics --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1
Now start the producer and consumer:
./kafka-console-producer --broker-list localhost:9095 --producer.config ../etc/kafka/producer.properties --topic test1
./kafka-console-consumer --bootstrap-server localhost:9095 --consumer.config ../etc/kafka/consumer.properties --topic test1 --from-beginning
Below is sample Java code for producing a message over SSL:
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SslConfigs;
public class ProducerClass {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9095");          // SSL listener from server.properties
        props.put("max.block.ms", 3000);                           // fail fast if the broker is unreachable
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // SSL settings: use the *_CONFIG keys (the DEFAULT_* constants are default values, not config keys)
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "C:\\Users\\shamim\\Desktop\\Security\\kafka.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "pwd");
        props.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, "TLSv1.2,TLSv1.1,TLSv1");
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS");

        Producer<String, String> producer = new KafkaProducer<>(props);
        System.out.println("sending message...");

        // Send one record to the test1 topic and wait for the broker acknowledgement
        Future<RecordMetadata> result =
                producer.send(new ProducerRecord<String, String>("test1", "key", "test message"));
        System.out.println(result.get().offset());                 // offset assigned by the broker

        producer.close();
    }
}
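For completeness, a matching consumer sketch for the same SSL listener is below. It makes the same assumptions as the producer above (the Windows truststore path and the password pwd are placeholders), the group id test-group is made up, and it uses the poll(long) overload that matches the 0.10.x client shipped with Confluent 3.2.x (newer clients prefer poll(Duration)).
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;
public class ConsumerClass {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9095");    // SSL listener
        props.put("group.id", "test-group");                  // hypothetical consumer group
        props.put("auto.offset.reset", "earliest");           // read the topic from the beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "C:\\Users\\shamim\\Desktop\\Security\\kafka.client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "pwd");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test1"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Print where each record came from: partition and offset identify it uniquely
                    System.out.printf("partition=%d offset=%d key=%s value=%s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                }
            }
        }
    }
}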