- Same as listed here
- A working RabbitMQ Cluster with TLS
- The RabbitMQ Topology Operator to trust the CA
NOTE: We currently don't support the ssl_options.fail_if_no_peer_cert = true
option in the RabbitmqCluster's additional configuration.
Create a new namespace for the demo. All the commands are expected to be executed from the root of this repo.
kubectl apply -f samples/rabbitmq-mtls/100-namespace.yaml
or
kubectl create ns rabbitmq-mtls-sample
kubectl apply -f samples/rabbitmq-mtls/200-selfsigned-issuer-and-cert.yaml
We'll need this secret in the cert-manager namespace for the ClusterIssuer to work:
kubectl get secret ca-secret --namespace=rabbitmq-mtls-sample -oyaml | grep -v '^\s*namespace:\s' | kubectl apply --namespace=cert-manager -f -
We'll need this secret in the rabbitmq-system namespace too:
kubectl get secret ca-secret --namespace=rabbitmq-mtls-sample -oyaml | grep -v '^\s*namespace:\s' | kubectl apply --namespace=rabbitmq-system -f -
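To double-check that both copies landed where they're expected (this is just a sanity check, not part of the sample files):
kubectl get secret ca-secret --namespace=cert-manager
kubectl get secret ca-secret --namespace=rabbitmq-system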
Now let's create the certificate:
kubectl apply -f samples/rabbitmq-mtls/300-cluster-issuer-and-cert.yaml
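Before moving on, you can confirm that cert-manager has issued the certificates; the resource names come from the applied manifests, so adjust them if yours differ:
kubectl -n rabbitmq-mtls-sample get certificates
All of them should eventually report READY True.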
Create a RabbitMQ Cluster:
kubectl apply -f samples/rabbitmq-mtls/400-rabbitmq.yaml
or
kubectl apply -f - << EOF
apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
  name: rabbitmq
  namespace: rabbitmq-mtls-sample
spec:
  replicas: 1
  tls:
    caSecretName: ca-secret
    secretName: tls-secret
    disableNonTLSListeners: true
  rabbitmq:
    additionalConfig: |
      ssl_options.verify = verify_peer
EOF
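The cluster can take a little while to come up. Assuming the AllReplicasReady condition exposed by the RabbitMQ Cluster Operator, you can wait for it with:
kubectl -n rabbitmq-mtls-sample wait rabbitmqcluster/rabbitmq --for=condition=AllReplicasReady --timeout=5m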
For the RabbitMQ Topology Operator to recognize the CA, we need to patch its Deployment as explained here: RabbitMQ Topology Operator to trust the CA
TL;DR:
kubectl -n rabbitmq-system patch deployment messaging-topology-operator --patch "spec:
  template:
    spec:
      containers:
      - name: manager
        volumeMounts:
        - mountPath: /etc/ssl/certs/rabbitmq-ca.crt
          name: rabbitmq-ca
          subPath: ca.crt
      volumes:
      - name: rabbitmq-ca
        secret:
          defaultMode: 420
          secretName: ca-secret"
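The patch triggers a new rollout of the operator Deployment; wait for it to finish before creating any messaging topology objects:
kubectl -n rabbitmq-system rollout status deployment/messaging-topology-operator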
After these steps, you are ready to continue with the demos below:
This demo shows how to create a RabbitMQ Broker that uses mTLS for the communication with RabbitMQ instances. It demonstrates how failed events get sent to the Dead Letter Sink while successfully processed events do not.
- failer is a function which takes in a CloudEvent and responds with the HTTP response code specified in the message data. By default it responds with a 200, indicating that it processed the event successfully and that the event should be considered handled. So, to simulate a failure, we just send it a CloudEvent with a payload of 500.
- pingsource is a Knative source which sends a CloudEvent at pre-defined intervals.
- event-display is a tool that logs the CloudEvents it receives, formatted nicely.
The demo creates two PingSources and has them send an event once a minute. One of them sends an event that has responsecode set to 200 (event processed successfully) and one that has responsecode set to 500 (event processing failed).
The demo creates a Broker, using an external RabbitMQ instance, with a delivery configuration that specifies that failed events will be delivered to event-display.
The demo creates a Trigger that wires PingSource events to go to the failer.
kubectl apply -f samples/rabbitmq-mtls/broker-files/100-broker-config.yaml
or
kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1alpha1
kind: RabbitmqBrokerConfig
metadata:
  name: default-config
  namespace: rabbitmq-mtls-sample
spec:
  # vhost: your-rabbitmq-vhost
  rabbitmqClusterReference:
    namespace: rabbitmq-mtls-sample
    name: rabbitmq-secret-credentials
  queueType: quorum
EOF
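The RabbitmqBrokerConfig is a plain custom resource, so a simple get is enough to verify it was created (if the plural name differs in your installation, kubectl api-resources will show it):
kubectl -n rabbitmq-mtls-sample get rabbitmqbrokerconfigs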
kubectl apply -f samples/rabbitmq-mtls/broker-files/200-broker.yaml
or
kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  name: default
  namespace: rabbitmq-mtls-sample
  annotations:
    eventing.knative.dev/broker.class: RabbitMQBroker
spec:
  config:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: RabbitmqBrokerConfig
    name: default-config
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: event-display
        namespace: rabbitmq-mtls-sample
    retry: 5
EOF
Then create the Knative Serving Service which will receive any failed events.
kubectl apply -f samples/rabbitmq-mtls/500-sink.yaml
or
kubectl apply -f - << EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: rabbitmq-mtls-sample
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
EOF
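You can make sure the Knative Service came up before checking the Broker (ksvc is the short name for Knative Serving Services):
kubectl -n rabbitmq-mtls-sample get ksvc event-display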
Now the Broker will become ready; it might take a few seconds for the Service to get up and running.
kubectl -n rabbitmq-mtls-sample get brokers
NAME      URL                                                                     AGE     READY   REASON
default   http://default-broker-ingress.rabbitmq-mtls-sample.svc.cluster.local   2m39s   True
kubectl apply -f samples/rabbitmq-mtls/broker-files/300-ping-sources.yaml
or
kubectl apply -f - << EOF
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-source
  namespace: rabbitmq-mtls-sample
spec:
  schedule: "*/1 * * * *"
  data: '{"responsecode": 200}'
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: rabbitmq-mtls-sample
EOF
kubectl apply -f - << EOF
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-source-2
  namespace: rabbitmq-mtls-sample
spec:
  schedule: "*/1 * * * *"
  data: '{"responsecode": 500}'
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: rabbitmq-mtls-sample
EOF
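A quick way to confirm both sources were created and are ready:
kubectl -n rabbitmq-mtls-sample get pingsources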
kubectl apply -f samples/rabbitmq-mtls/broker-files/400-trigger.yaml
or
kubectl apply -f - << EOF
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: failer-trigger
  namespace: rabbitmq-mtls-sample
  annotations:
    # Value must be between 1 and 1000
    # With a value of 1 the RabbitMQ Trigger behaves as a FIFO queue
    # Values above 1 break message ordering guarantees and can be seen as more performance oriented.
    # rabbitmq.eventing.knative.dev/parallelism: "10"
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.sources.ping
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: failer
      namespace: rabbitmq-mtls-sample
EOF
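Once the failer Service exists (next step), the Trigger should report Ready; you can check it with:
kubectl -n rabbitmq-mtls-sample get triggers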
kubectl apply -f samples/rabbitmq-mtls/broker-files/500-failer.yaml
or
kubectl apply -f 'https://storage.googleapis.com/knative-nightly/eventing-rabbitmq/latest/failer.yaml' -n rabbitmq-mtls-sample
Look at the failer pod logs; you'll see it's receiving both 200 and 500 responses.
kubectl -n rabbitmq-mtls-sample -l='serving.knative.dev/service=failer' logs -c user-container
2020/10/06 10:35:00 using response code: 200
2020/10/06 10:35:00 using response code: 500
2020/10/06 10:35:00 using response code: 500
2020/10/06 10:35:00 using response code: 500
2020/10/06 10:35:01 using response code: 500
2020/10/06 10:35:01 using response code: 500
2020/10/06 10:35:03 using response code: 500
You can see there are both 200 and 500 events there. More importantly, the 200 event is only sent once to the failer since it's processed correctly, while the 500 event is sent a total of 6 times because we specified a retry of 5 (the original delivery, plus 5 retries, for a total of 6 log entries).
However, the event-display (the Dead Letter Sink) only sees the failed events with the response code set to 500.
kubectl -n rabbitmq-mtls-sample -l='serving.knative.dev/service=event-display' logs -c user-container
☁️ cloudevents.Event
Validation: valid
Context Attributes,
  specversion: 1.0
  type: dev.knative.sources.ping
  source: /apis/v1/namespaces/rabbitmq-mtls-sample/pingsources/ping-source-2
  id: 166e89ff-19c7-4e9a-a593-9ed30dca0d7d
  time: 2020-10-06T10:35:00.307531386Z
  datacontenttype: application/json
Data,
  {
    "responsecode": 500
  }
kubectl delete ns rabbitmq-mtls-sample
kubectl delete secret ca-secret -n rabbitmq-system
This demo will use a RabbitMQ Source to fetch messages from a RabbitMQ Exchange, convert them into CloudEvents, and send them to a Sink. The complete list of the Source's config parameters is shown here
- perf-test: RabbitMQ has a throughput testing tool, PerfTest, that is based on the Java client and can be configured to simulate basic to advanced workloads of messages sent to a RabbitMQ Cluster. More info about the perf-test testing tool can be found here
- event-display is a tool that logs the CloudEvents it receives, formatted nicely.
The demo creates a PerfTest and has it execute a loop where it sends 1 event per second for 30 seconds, and then no events for 30 seconds, to the RabbitMQ Cluster Exchange called eventing-rabbitmq-source, predeclared by the user.
The demo creates a Source to read messages from the eventing-rabbitmq-source Exchange and to send them to the event-display sink after the translation to CloudEvents.
This will create a Kubernetes Deployment which sends events to the RabbitMQ Cluster Exchange:
kubectl apply -f samples/rabbitmq-mtls/source-files/100-perf-test.yaml
Messages from the rabbitmq-perf-test will be published to the eventing-rabbitmq-source Exchange.
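To confirm the load generator is running, list the Deployments in the sample namespace (just a sanity check, not part of the sample files):
kubectl -n rabbitmq-mtls-sample get deployments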
Then create the Knative Serving Service which will receive translated events.
kubectl apply -f samples/rabbitmq-mtls/500-sink.yaml
or
kubectl apply -f - << EOF
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: event-display
  namespace: rabbitmq-mtls-sample
spec:
  template:
    spec:
      containers:
        - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
EOF
kubectl apply -f samples/rabbitmq-mtls/source-files/200-source.yaml
or
kubectl apply -f - << EOF
apiVersion: sources.knative.dev/v1alpha1
kind: RabbitmqSource
metadata:
  name: rabbitmq-source
  namespace: rabbitmq-mtls-sample
spec:
  rabbitmqClusterReference:
    namespace: rabbitmq-mtls-sample
    name: rabbitmq
  rabbitmqResourcesConfig:
    # vhost: your-rabbitmq-vhost
    exchangeName: "eventing-rabbitmq-source"
    queueName: "eventing-rabbitmq-source"
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display
      namespace: rabbitmq-mtls-sample
EOF
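The Source may take a moment to become ready; assuming the rabbitmqsources plural exposed by the Source CRD, you can watch its status with:
kubectl -n rabbitmq-mtls-sample get rabbitmqsources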
Check the event-display (the Sink) to see if it is receiving events. It may take a while for the Source to start sending events to the Sink, so be patient :p!
kubectl -n rabbitmq-mtls-sample -l='serving.knative.dev/service=event-display' logs -c user-container
☁️ cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.rabbitmq.event
  source: /apis/v1/namespaces/rabbitmq-mtls-sample/rabbitmqsources/rabbitmq-source
  subject: f147099d-c64d-41f7-b8eb-a2e53b228349
  id: f147099d-c64d-41f7-b8eb-a2e53b228349
  time: 2021-12-16T20:11:39.052276498Z
  datacontenttype: application/json
Data,
  {
    ...
    Random Data
    ...
  }
kubectl delete -f samples/rabbitmq-mtls/source-files/200-source.yaml
kubectl delete -f samples/rabbitmq-mtls/
kubectl delete secret ca-secret -n rabbitmq-system