## ClusterLink Setup

### Kafka Cluster with Mutual TLS Authentication
In this example, the source cluster runs open-source Apache Kafka without any security, and the destination cluster runs Confluent Server in mutual TLS mode.

## Prerequisites
Set the tutorial directory for this tutorial under the directory where you downloaded
the tutorial files:

```
export TUTORIAL_HOME=$PWD/hybrid/clusterlink/non_confluent_source_cluster
```

Create the namespace for the destination cluster:

```
kubectl create ns destination
```

Deploy Confluent for Kubernetes (CFK) in cluster mode, so that one CFK instance can manage Confluent deployments in multiple namespaces. Here, CFK is deployed to the `default` namespace.

```
helm upgrade --install confluent-operator \
  confluentinc/confluent-for-kubernetes \
  --namespace default --set namespaced=false
```

### Source Cluster Deployment
```
kubectl -n destination apply -f $TUTORIAL_HOME/zk-kafka-source.yaml
```

#### Create a topic on the source cluster and produce some data

```
kubectl -n destination exec -it notcflt -- bash
/opt/kafka_2.13-3.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic demo
seq 1000 | /opt/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo
```

Keep the cluster ID of the source cluster (run this from inside the source broker pod):
```
grep cluster /tmp/kafka-logs/meta.properties | cut -d "=" -f 2
```

Use this value for the `clusterID` field (line 13) in the file `$TUTORIAL_HOME/clusterlink.yaml`.
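This copy step can be scripted. A minimal sketch, using throwaway sample files in place of the real `meta.properties` and `clusterlink.yaml` (the sample field layout is an assumption; point the paths at the real files in practice):

```shell
# Sketch: extract the source cluster ID and patch it into the ClusterLink spec.
# Sample files stand in for the real ones; Kafka cluster IDs are URL-safe
# base64, so they are safe to splice into the sed expression.
src=$(mktemp -d)
printf 'version=0\ncluster.id=AbCdEf123\n' > "$src/meta.properties"
printf 'spec:\n  clusterID: PLACEHOLDER\n' > "$src/clusterlink.yaml"

CLUSTER_ID=$(grep cluster.id "$src/meta.properties" | cut -d "=" -f 2)
sed -i "s/clusterID: .*/clusterID: ${CLUSTER_ID}/" "$src/clusterlink.yaml"
grep clusterID "$src/clusterlink.yaml"
```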

### Destination Cluster Deployment

#### Create required secrets

```
kubectl -n destination create secret generic credential \
  --from-file=plain-users.json=$TUTORIAL_HOME/creds-kafka-sasl-users.json \
  --from-file=plain.txt=$TUTORIAL_HOME/creds-client-kafka-sasl-user.txt \
  --from-file=basic.txt=$TUTORIAL_HOME/creds-basic-users.txt
```

Generate a CA pair to use in this tutorial:
```
openssl genrsa -out $TUTORIAL_HOME/ca-key.pem 2048
openssl req -new -key $TUTORIAL_HOME/ca-key.pem -x509 \
  -days 1000 \
  -out $TUTORIAL_HOME/ca.pem \
  -subj "/C=US/ST=CA/L=MountainView/O=Confluent/OU=Operator/CN=TestCA"
```
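Before loading the CA into a secret, it is worth a quick sanity check. A small sketch (it falls back to a throwaway CA in a temp directory when `TUTORIAL_HOME` is unset, so it can run standalone):

```shell
# Inspect the CA's subject and expiry, and confirm it verifies against itself.
dir="${TUTORIAL_HOME:-$(mktemp -d)}"
if [ ! -f "$dir/ca.pem" ]; then
  # Throwaway CA so the check can run standalone.
  openssl genrsa -out "$dir/ca-key.pem" 2048
  openssl req -new -key "$dir/ca-key.pem" -x509 -days 1000 \
    -out "$dir/ca.pem" -subj "/CN=TestCA"
fi
openssl x509 -in "$dir/ca.pem" -noout -subject -enddate
openssl verify -CAfile "$dir/ca.pem" "$dir/ca.pem"  # self-signed CA: OK
```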
Then provide the certificate authority as a Kubernetes secret named `ca-pair-sslcerts`:
```
kubectl -n destination create secret tls ca-pair-sslcerts \
  --cert=$TUTORIAL_HOME/ca.pem \
  --key=$TUTORIAL_HOME/ca-key.pem
```

Create the TLS secrets for the ZooKeeper and Kafka components, along with the REST and password-encoder credentials:

```
kubectl -n destination create secret generic destination-tls-zk1 \
  --from-file=fullchain.pem=$TUTORIAL_HOME/../../../assets/certs/component-certs/generated/zookeeper-server.pem \
  --from-file=cacerts.pem=$TUTORIAL_HOME/../../../assets/certs/component-certs/generated/cacerts.pem \
  --from-file=privkey.pem=$TUTORIAL_HOME/../../../assets/certs/component-certs/generated/zookeeper-server-key.pem

kubectl -n destination create secret generic destination-tls-group1 \
  --from-file=fullchain.pem=$TUTORIAL_HOME/../../../assets/certs/component-certs/generated/kafka-server.pem \
  --from-file=cacerts.pem=$TUTORIAL_HOME/../../../assets/certs/component-certs/generated/cacerts.pem \
  --from-file=privkey.pem=$TUTORIAL_HOME/../../../assets/certs/component-certs/generated/kafka-server-key.pem

kubectl -n destination create secret generic rest-credential \
  --from-file=basic.txt=$TUTORIAL_HOME/rest-credential.txt

kubectl -n destination create secret generic password-encoder-secret \
  --from-file=password-encoder.txt=$TUTORIAL_HOME/password-encoder-secret.txt
```
#### Deploy the destination ZooKeeper and Kafka cluster in namespace `destination`

```
kubectl -n destination apply -f $TUTORIAL_HOME/zk-kafka-destination.yaml
```

Once the Kafka cluster is in a running state, create a cluster link between the source and destination. The cluster link lives on the destination cluster.

```
kubectl -n destination get pods
```

#### Create the cluster link between source and destination
```
kubectl apply -f $TUTORIAL_HOME/clusterlink.yaml
kubectl -n destination get cl
```

### Run test

#### Open a new terminal and exec into the destination Kafka pod
```
kubectl -n destination exec kafka-0 -it -- bash
```

#### Create kafka.properties for the destination Kafka cluster

```
cat <<EOF > /tmp/kafka.properties
bootstrap.servers=kafka.destination.svc.cluster.local:9071
security.protocol=SSL
ssl.truststore.location=/mnt/sslcerts/truststore.p12
ssl.truststore.password=mystorepassword
ssl.keystore.location=/mnt/sslcerts/keystore.p12
ssl.keystore.password=mystorepassword
EOF
```

#### Validate that the topic was created in the destination Kafka cluster

```
kafka-topics --describe --topic demo --bootstrap-server kafka.destination.svc.cluster.local:9071 --command-config /tmp/kafka.properties
```

#### Consume from the destination Kafka cluster and confirm message delivery

```
kafka-console-consumer --from-beginning --topic demo --bootstrap-server kafka.destination.svc.cluster.local:9071 --consumer.config /tmp/kafka.properties
```

### Commands to use on the Confluent Server, if needed
```
kubectl -n destination exec kafka-0 -it -- bash
cat <<EOF > /tmp/kafka.properties
bootstrap.servers=kafka.destination.svc.cluster.local:9071
security.protocol=SSL
ssl.truststore.location=/mnt/sslcerts/truststore.p12
ssl.truststore.password=mystorepassword
ssl.keystore.location=/mnt/sslcerts/keystore.p12
ssl.keystore.password=mystorepassword
EOF
kafka-cluster-links --bootstrap-server kafka.destination.svc.cluster.local:9071 --command-config /tmp/kafka.properties --list
kafka-cluster-links --bootstrap-server kafka.destination.svc.cluster.local:9071 --command-config /tmp/kafka.properties --list --link clusterlink-cflt --include-topics
kafka-replica-status --bootstrap-server kafka.destination.svc.cluster.local:9071 --admin.config /tmp/kafka.properties --topics demo --include-mirror
kafka-mirrors --describe --topics demo --bootstrap-server kafka.destination.svc.cluster.local:9071 --command-config /tmp/kafka.properties
```

Check that the mirror topic is read-only:
```
kafka-console-producer --topic demo --bootstrap-server kafka.destination.svc.cluster.local:9071 --producer.config /tmp/kafka.properties
>test
>[2023-01-18 11:10:59,329] ERROR Error when sending message to topic demo with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.InvalidRequestException: Cannot append records to read-only mirror topic 'demo'
```
### Filter using topic filters

Create a set of topics (`filterdemo1` through `filterdemo5`):
```
kubectl -n destination exec -it notcflt -- bash
for i in {1..5}; do /opt/kafka_2.13-3.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic filterdemo$i; done
for i in {1..5}; do seq 1000 | /opt/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic filterdemo$i; done
```
Verify that there are messages in the source topic:
```
/opt/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --from-beginning --topic filterdemo5 --bootstrap-server localhost:9092
```
First, create a cluster link with `autoCreateTopics` disabled:

```
  mirrorTopicOptions:
    autoCreateTopics:
      enabled: false
      topicFilters:
        - filterType: INCLUDE
          name: filterdemo
          patternType: PREFIXED
    prefix: "dest-"
```
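With this filter, topics whose names start with `filterdemo` are selected, and (assuming the link's `prefix` applies to auto-created mirror topics) each lands on the destination with `dest-` prepended. A sketch of the expected name mapping:

```shell
# Expected source -> mirror topic names under the INCLUDE/PREFIXED filter above.
for t in filterdemo{1..5}; do
  echo "$t -> dest-$t"
done
```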

```
kubectl -n destination apply -f $TUTORIAL_HOME/clusterlinkfilter-disabled.yaml
kubectl -n destination get cl
```

Notice that no mirror topics are created.
Now change to `enabled: true` and wait up to 5 minutes (the [default metadata refresh interval](https://docs.confluent.io/platform/current/multi-dc-deployments/cluster-linking/topic-data-sharing.html#change-the-source-topics-partitions)):
```
kubectl -n destination apply -f $TUTORIAL_HOME/clusterlinkfilter-enabled.yaml
```
Check for the topics on the destination cluster:
```
kafka-topics --bootstrap-server kafka.destination.svc.cluster.local:9071 --command-config /tmp/kafka.properties --list
```
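Because the filter can take up to the metadata refresh interval to act, polling is more reliable than a fixed wait. A minimal retry helper (the commented `kafka-topics` usage is illustrative and assumes the properties file created earlier):

```shell
# Retry a command until it succeeds: up to $1 attempts, $2 seconds apart.
retry() {
  local attempts=$1 delay=$2 i
  shift 2
  for ((i = 1; i <= attempts; i++)); do
    "$@" && return 0
    sleep "$delay"
  done
  return 1
}

# Illustrative usage, run inside the broker pod:
# retry 30 10 bash -c 'kafka-topics --bootstrap-server kafka.destination.svc.cluster.local:9071 \
#   --command-config /tmp/kafka.properties --list | grep -q "^dest-filterdemo"'
```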
Once the topics are there, consume from the destination topics:
```
kafka-console-consumer --from-beginning --topic dest-filterdemo5 --bootstrap-server kafka.destination.svc.cluster.local:9071 --consumer.config /tmp/kafka.properties
```

## Filter and topic list together

### With prefix
If you're using `prefix`, you must include `sourceTopicName` as well.

```
  mirrorTopics:
    - name: merge-dest-atopic # the name must include the prefix; see https://docs.confluent.io/operator/current/co-link-clusters.html#create-a-mirror-topic
      sourceTopicName: atopic
```
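The naming constraint can be checked before applying the manifest. A trivial sketch (here `merge-dest-` is an assumed link prefix, matching the example above):

```shell
# Check that a mirror topic name equals <link prefix> + <sourceTopicName>.
prefix="merge-dest-"        # assumed cluster link prefix
name="merge-dest-atopic"    # mirror topic name from the spec
source_topic="atopic"       # sourceTopicName from the spec
if [ "$name" = "${prefix}${source_topic}" ]; then
  echo "ok: $name matches ${prefix}${source_topic}"
else
  echo "mismatch: expected ${prefix}${source_topic}, got $name" >&2
fi
```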

Create a set of topics (`newfilterdemo1` through `newfilterdemo5`) plus `atopic`:
```
kubectl -n destination exec -it notcflt -- bash
for i in {1..5}; do /opt/kafka_2.13-3.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic newfilterdemo$i; done
for i in {1..5}; do seq 1000 | /opt/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic newfilterdemo$i; done

/opt/kafka_2.13-3.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic atopic
seq 1000 | /opt/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic atopic
```
Verify that there are messages in the source topic:
```
/opt/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --from-beginning --topic newfilterdemo1 --bootstrap-server localhost:9092
```

Apply the cluster link:
```
kubectl -n destination apply -f $TUTORIAL_HOME/clusterlinkfilter-enabled-and-list-prefix.yaml
```
Check the topics on the destination cluster.

### Without prefix

If you're **not** using `prefix`, specify the mirror topic like so:

```
  mirrorTopics:
    - name: nopffilterdemo
```
Create a set of topics (`nopffilterdemo1` through `nopffilterdemo5`) plus `npfatopic`:
```
kubectl -n destination exec -it notcflt -- bash
for i in {1..5}; do /opt/kafka_2.13-3.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic nopffilterdemo$i; done
for i in {1..5}; do seq 1000 | /opt/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic nopffilterdemo$i; done

/opt/kafka_2.13-3.3.1/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic npfatopic
seq 1000 | /opt/kafka_2.13-3.3.1/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic npfatopic
```
Verify that there are messages in the source topic:
```
/opt/kafka_2.13-3.3.1/bin/kafka-console-consumer.sh --from-beginning --topic nopffilterdemo5 --bootstrap-server localhost:9092
```

Apply the cluster link:
```
kubectl -n destination apply -f $TUTORIAL_HOME/clusterlinkfilter-enabled-and-list-noprefix.yaml
```
Check the topics on the destination cluster.

### Tear down

```
kubectl -n destination delete -f $TUTORIAL_HOME/clusterlink.yaml
kubectl -n destination delete -f $TUTORIAL_HOME/clusterlinkfilter-enabled.yaml
kubectl -n destination delete -f $TUTORIAL_HOME/clusterlinkfilter-enabled-and-list-prefix.yaml
kubectl -n destination delete -f $TUTORIAL_HOME/clusterlinkfilter-enabled-and-list-noprefix.yaml
kubectl -n destination delete -f $TUTORIAL_HOME/zk-kafka-destination.yaml
kubectl -n destination delete secret password-encoder-secret
kubectl -n destination delete secret rest-credential
kubectl -n destination delete secret destination-tls-group1
kubectl -n destination delete secret destination-tls-zk1
kubectl -n destination delete secret ca-pair-sslcerts
kubectl -n destination delete secret credential
kubectl -n destination delete -f $TUTORIAL_HOME/zk-kafka-source.yaml
kubectl delete ns destination
helm -n default delete confluent-operator
```