Skip to content

Commit

Permalink
housekeeping (#19)
Browse files Browse the repository at this point in the history
* Update linter

* Fix linter errors

* Track tools in tools.go

* Improve github workflow scripts

* Run paramgen

* Fix missing renames

* Fix gci linter error

* Fix failing test github workflow

* Add install-tools script

* Remove unnecessary workflow

* Add missing docs to tls.enabled

* Make delivery.contentType default to application/json

* Move shared queue config into Config struct

* Join docker tls and non tls rabbitmq services

* Add pipeline.yml config example

* Remove unused sdk.Position data

* Add record structure docs

---------

Co-authored-by: Guillem <[email protected]>
Co-authored-by: Guillem <[email protected]>
  • Loading branch information
3 people authored May 29, 2024
1 parent 5f9e24e commit 7f97c97
Show file tree
Hide file tree
Showing 19 changed files with 418 additions and 279 deletions.
4 changes: 2 additions & 2 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ Fixes # (issue)

### Quick checks:

- [ ] There is no other [pull request](https://github.com/alarbada/conduit-connector-rabbitmq/pulls) for the same update/change.
- [ ] There is no other [pull request](https://github.com/conduitio-labs/conduit-connector-rabbitmq/pulls) for the same update/change.
- [ ] I have written unit tests.
- [ ] I have made sure that the PR is of reasonable size and can be easily reviewed.
- [ ] I have made sure that the PR is of reasonable size and can be easily reviewed.
24 changes: 24 additions & 0 deletions .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: validate-generated-files

on:
push:
branches: [ main ]
pull_request:

jobs:
validate-generated-files:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools generate
git diff
git diff --exit-code --numstat
106 changes: 58 additions & 48 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
run:
timeout: 5m

linters-settings:
gofmt:
simplify: false
govet:
check-shadowing: false
nolintlint:
allow-unused: false # report any unused nolint directives
require-explanation: true # require an explanation for nolint directives
Expand All @@ -23,84 +22,95 @@ issues:
linters:
- dogsled
- gosec
- revive
- gocognit
- errcheck
- forcetypeassert
- funlen
- goerr113
- dupl
- prealloc

linters:
# please, do not use `enable-all`: it's deprecated and will be removed soon.
# inverted configuration with `enable-all` and `disable` is not scalable during updates of golangci-lint
disable-all: true
enable:
- asasalint
- asciicheck
- bidichk
- bodyclose
- containedctx
- contextcheck
- decorder
# - depguard
- dogsled
- dupl
- dupword
- durationcheck
- errcheck
- errchkjson
- errname
# - errorlint
# - exhaustive
# - exhaustivestruct
- errorlint
- execinquery
- exhaustive
- exportloopref
# - forbidigo
# - forcetypeassert
# - funlen
# - gochecknoinits
- forcetypeassert
- funlen
- gci
- ginkgolinter
- gocheckcompilerdirectives
- gochecknoinits
- gocognit
- goconst
- gocritic
- gocyclo
# - cyclop # not interested in package complexities at the moment
# - godot
- godot
# - goerr113
- gofmt
# - gofumpt
- gofumpt
- goheader
- goimports
- revive
# - gomnd
- gomoddirectives
- gomodguard
- goprintffuncname
- gosec
- gosimple
- gosmopolitan
- govet
# - ifshort
- grouper
- importas
- ineffassign
# - importas
# - lll
# - misspell
- interfacebloat
# - ireturn # Doesn't have correct support for generic types https://github.com/butuzov/ireturn/issues/37
- loggercheck
- maintidx
- makezero
# - nakedret
# - nilerr
# - nilnil
# - nlreturn
- mirror
- misspell
- musttag
- nakedret
- nestif
- nilerr
- nilnil
- noctx
- nolintlint
# - paralleltest
- nosprintfhostport
- prealloc
- predeclared
- promlinter
- reassign
- revive
- rowserrcheck
- sqlclosecheck
- staticcheck
- stylecheck
- sqlclosecheck
# - tagliatelle
# - tenv
- tenv
- testableexamples
# - thelper
# - tparallel
- typecheck
- unconvert
# - unparam
- unparam
- unused
- usestdlibvars
- wastedassign
- whitespace
# - wrapcheck
# - wsl

# don't enable:
# - asciicheck
# - dupl
# - gochecknoglobals
# - gocognit
# - godox
# - goerr113
# - maligned
# - nestif
# - prealloc
# - testpackage
# - wsl
- wrapcheck
- zerologlint
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ builds:
env:
- CGO_ENABLED=0
ldflags:
- "-s -w -X 'github.com/alarbada/conduit-connector-rabbitmq.version={{ .Tag }}'"
- "-s -w -X 'github.com/conduitio-labs/conduit-connector-rabbitmq.version={{ .Tag }}'"
checksum:
name_template: checksums.txt
archives:
Expand Down
99 changes: 76 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
# Conduit Connector for RabbitMQ
The RabbitMQ connector is one of [Conduit](https://github.com/ConduitIO/conduit) builtin plugins. It provides both, a
source and a destination connector for [RabbitMQ](https://rabbitmq.com/).

The RabbitMQ connector is one of [Conduit](https://github.com/ConduitIO/conduit) builtin plugins. It provides both, a source and a destination connector for [RabbitMQ](https://rabbitmq.com/).

It uses the [AMQP 0-9-1 Model](https://www.rabbitmq.com/tutorials/amqp-concepts) to connect to RabbitMQ.


## What data does the OpenCDC record consist of?

| Field | Description |
|-------------------------|-----------------------------------------------------------------------------------------|
| `record.Position` | json object with the delivery tag and the queue name from where the record was read from.|
| `record.Operation` | currently fixed as "create". |
| `record.Metadata` | a string to string map, with keys prefixed as `rabbitmq.{DELIVERY_PROPERTY}`. |
| `record.Key` | the message id from the read message. |
| `record.Payload.Before` | <empty> |
| `record.Payload.After` | the message body |

## How to Build?

Expand All @@ -11,25 +25,15 @@ Run `make build` to compile the connector.
Execute `make test` to perform all non-tls tests. Execute `make test-tls` for the TLS tests. Both commands use docker files located at `test/docker-compose.yml` and `test/docker-compose-tls.yml` respectively.
Tests require docker-compose v2.

## Shared Configuration Parameters

Shared between source and destination connectors.
## Source Configuration Parameters

| Name | Description | Required | Default Value |
|------------------|-----------------------------------------------------|----------|---------------|
| Name | Description | Required | Default Value |
|------------------------|-----------------------------------------------------------------------------|----------|---------------|
| `url` | The RabbitMQ server's URL. | Yes | |
| `tls.enabled` | Flag to enable or disable TLS. | false | `false` |
| `tls.clientCert` | Path to the client certificate for TLS. | No | |
| `tls.clientKey` | Path to the client's key for TLS. | No | |
| `tls.caCert` | Path to the CA (Certificate Authority) certificate for TLS. | No | |

## Source Connector

The source connector extracts data from RabbitMQ and sends it to downstream systems via Conduit.

### Source Configuration Parameters

| Name | Description | Required | Default Value |
|------------------------|-----------------------------------------------------------------------------|----------|---------------|
| `queue.name` | The name of the RabbitMQ queue to consume messages from. | Yes | |
| `queue.durable` | Specifies whether the queue is durable. | No | `true` |
| `queue.autoDelete` | If the queue will auto-delete. | No | `false` |
Expand All @@ -41,14 +45,15 @@ The source connector extracts data from RabbitMQ and sends it to downstream syst
| `consumer.noLocal` | If the server should not deliver messages published by the same connection. | No | `false` |
| `consumer.noWait` | If the consumer should be declared without waiting for server confirmation. | No | `false` |

## Destination Connector
## Destination Configuration Parameters

The destination connector sends data from upstream systems to RabbitMQ via Conduit.

### Destination Configuration Parameters

| Name | Description | Required | Default Value |
|----------------------------|---------------------------------------------------------------------|----------|---------------|
| Name | Description | Required | Default Value |
|------------------------|-----------------------------------------------------------------------------|----------|---------------|
| `url` | The RabbitMQ server's URL. | Yes | |
| `tls.enabled` | Flag to enable or disable TLS. | false | `false` |
| `tls.clientCert` | Path to the client certificate for TLS. | No | |
| `tls.clientKey` | Path to the client's key for TLS. | No | |
| `tls.caCert` | Path to the CA (Certificate Authority) certificate for TLS. | No | |
| `queue.name` | The name of the RabbitMQ queue where messages will be published to. | Yes | |
| `queue.durable` | Specifies whether the queue is durable. | No | `true` |
| `queue.autoDelete` | If the queue will auto-delete. | No | `false` |
Expand All @@ -73,3 +78,51 @@ The destination connector sends data from upstream systems to RabbitMQ via Condu
| `exchange.internal` | If the exchange is internal. | No | `false` |
| `exchange.noWait` | If the exchange is declared without waiting for server reply. | No | `false` |
| `routingKey` | The routing key to use when publishing to an exchange. | No | |


## Example pipeline.yml file

Here's an example of a `pipeline.yml` file using `file to RabbitMQ` and `RabbitMQ to file` pipelines:

```yaml
version: 2.0
pipelines:
- id: file-to-rabbitmq
status: running
connectors:
- id: file.in
type: source
plugin: builtin:file
name: file-destination
settings:
path: ./file.in
- id: rabbitmq.out
type: destination
plugin: standalone:rabbitmq
name: rabbitmq-source
settings:
url: amqp://guest:guest@localhost:5672/
queue.name: demo-queue
sdk.record.format: template
sdk.record.format.options: '{{ printf "%s" .Payload.After }}'

- id: rabbitmq-to-file
status: running
connectors:
- id: rabbitmq.in
type: source
plugin: standalone:rabbitmq
name: rabbitmq-source
settings:
url: amqp://guest:guest@localhost:5672/
queue.name: demo-queue

- id: file.out
type: destination
plugin: builtin:file
name: file-destination
settings:
path: ./file.out
sdk.record.format: template
sdk.record.format.options: '{{ printf "%s" .Payload.After }}'
```
3 changes: 1 addition & 2 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
package main

import (
sdk "github.com/conduitio/conduit-connector-sdk"

rabbitmq "github.com/conduitio-labs/conduit-connector-rabbitmq"
sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
Expand Down
11 changes: 5 additions & 6 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type Config struct {
// URL is the RabbitMQ server URL
URL string `json:"url" validate:"required"`

TLS TLSConfig `json:"tls"`
TLS TLSConfig `json:"tls"`
Queue QueueConfig `json:"queue"`
}

type TLSConfig struct {
Expand Down Expand Up @@ -62,7 +63,7 @@ type QueueConfig struct {
NoWait bool `json:"noWait" default:"false"`
}

// to use with ampq.Channel Consume method
// to use with ampq.Channel Consume method.
type ConsumerConfig struct {
// Name is the name of the consumer
Name string `json:"name"`
Expand All @@ -83,7 +84,6 @@ type ConsumerConfig struct {
type SourceConfig struct {
Config

Queue QueueConfig `json:"queue"`
Consumer ConsumerConfig `json:"consumer"`
}

Expand Down Expand Up @@ -115,8 +115,8 @@ type DeliveryConfig struct {
// ContentEncoding specifies the encoding of the message content.
ContentEncoding string `json:"contentEncoding"`

// ContentType specifies the MIME type of the message content. Default is "text/plain".
ContentType string `json:"contentType" default:"text/plain"`
// ContentType specifies the MIME type of the message content. Defaults to "application/json".
ContentType string `json:"contentType" default:"application/json"`

// DeliveryMode indicates the message delivery mode. Non-persistent (1) or persistent (2). Default is 2 (persistent).
DeliveryMode uint8 `json:"deliveryMode" default:"2" validation:"inclusion=1|2"`
Expand Down Expand Up @@ -152,7 +152,6 @@ type DeliveryConfig struct {
type DestinationConfig struct {
Config

Queue QueueConfig `json:"queue"`
Delivery DeliveryConfig `json:"delivery"`
Exchange ExchangeConfig `json:"exchange"`

Expand Down
4 changes: 2 additions & 2 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (d *Destination) Write(ctx context.Context, records []sdk.Record) (int, err
return len(records), nil
}

func (d *Destination) Teardown(_ context.Context) error {
func (d *Destination) Teardown(ctx context.Context) error {
errs := make([]error, 0, 2)
if d.ch != nil {
if err := d.ch.Close(); err != nil {
Expand All @@ -172,7 +172,7 @@ func (d *Destination) Teardown(_ context.Context) error {
return err
}

sdk.Logger(context.Background()).Info().Msg("destination teardown complete")
sdk.Logger(ctx).Info().Msg("destination teardown complete")

return nil
}
Loading

0 comments on commit 7f97c97

Please sign in to comment.