Skip to content

Commit 420d7e6

Browse files
committed
Documentation improvements
1 parent 87a8a10 commit 420d7e6

File tree

8 files changed

+154
-109
lines changed

8 files changed

+154
-109
lines changed

README.md

+1-55
Original file line numberDiff line numberDiff line change
@@ -26,58 +26,4 @@ end
2626

2727
## Getting started
2828

29-
First, you need to migrate your database to create the outbox. Create a
30-
migration with Ecto and add the following:
31-
32-
```elixir
33-
defmodule MyApp.Repo.Migrations.AddKafkaesque
34-
def up do
35-
Kafkaesque.Migrations.up()
36-
end
37-
38-
def down do
39-
Kafkaesque.Migrations.down()
40-
end
41-
end
42-
```
43-
44-
Then, you must define a publisher module, associated with the Repo you
45-
ran the Kafkaesque migrations on. For example:
46-
47-
```elixir
48-
defmodule MyApp.KafkaPublisher do
49-
use Kafkaesque, repo: MyApp.Repo
50-
51-
# Optional, sets the partition for the message. Should return an integer
52-
# Defaults to returning 0.
53-
def partition(_topic, _body) do
54-
Enum.random(0..1)
55-
end
56-
57-
# Optional, encodes the body of the message, should return a String.
58-
# Defaults to the identity function.
59-
def encode(body) do
60-
Jason.encode!(body)
61-
end
62-
end
63-
```
64-
65-
Finally, you should start a `Kafkaesque` instance in your supervision tree.
66-
For example:
67-
68-
```elixir
69-
defmodule MyApp.Application do
70-
def start(_type, _args) do
71-
children = [
72-
MyApp.Repo,
73-
{Kafkaesque, [repo: MyApp.Repo, client_opts: [client_id: :my_client, brokers: [{"localhost, 9092"}]]]},
74-
]
75-
end
76-
end
77-
```
78-
79-
Done! Now you can publish messages. Try it out with:
80-
81-
```elixir
82-
MyApp.KafkaPublisher.publish("topic", %{hello: :kafka})`
83-
```
29+
Check the [Getting started](https://hexdocs.pm/kafkaesque/getting-started.html) guide in Hexdocs.

guides/getting-started.md

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Getting started
2+
3+
First, you need to migrate your database to create the outbox. Create a
4+
migration with Ecto and add the following:
5+
6+
```elixir
7+
defmodule MyApp.Repo.Migrations.AddKafkaesque
8+
def up do
9+
Kafkaesque.Migrations.up()
10+
end
11+
12+
def down do
13+
Kafkaesque.Migrations.down()
14+
end
15+
end
16+
```
17+
18+
Then, you must define an outbox module, associated with the Repo you
19+
ran the Kafkaesque migrations on. For example:
20+
21+
```elixir
22+
defmodule MyApp.Outbox do
23+
use Kafkaesque, repo: MyApp.Repo
24+
25+
# Optional, sets the partition for the message. Should return an integer
26+
# Defaults to returning 0.
27+
def partition(_topic, _body) do
28+
Enum.random(0..1)
29+
end
30+
31+
# Optional, encodes the body of the message, should return a String.
32+
# Defaults to the identity function.
33+
def encode(body) do
34+
Jason.encode!(body)
35+
end
36+
end
37+
```
38+
39+
Finally, you should start a `Kafkaesque` instance in your supervision tree.
40+
For example:
41+
42+
```elixir
43+
defmodule MyApp.Application do
44+
def start(_type, _args) do
45+
children = [
46+
MyApp.Repo,
47+
{MyApp.Outbox, [client_opts: [client_id: :my_client, brokers: [{"localhost, 9092"}]]]},
48+
]
49+
end
50+
end
51+
```
52+
53+
Done! Now you can publish messages. Try it out with:
54+
55+
```elixir
56+
MyApp.Outbox.publish("topic", %{hello: :kafka})`
57+
```

lib/kafkaesque.ex

+39-22
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,31 @@
11
defmodule Kafkaesque do
22
@moduledoc """
3-
This module provides a message publishing API through `Kafkaesque.publish/3`.
3+
This module provides the main APIs for Kafkaesque
44
5-
It also allows you to `use` it and have a streamlined publishing function,
6-
possibly providing encodin function:
5+
### Introduction
76
7+
Kafkaesque is an outbox library built for PostgreSQL and designed, primarily,
8+
for Kafka, though usage with other software is possible and encouraged.
89
9-
defmodule MyApp.Kafka do
10-
use Kafkaesque, repo: MyApp.Repo
10+
- Transactional safety for messages: if they were created, they **will** be
11+
eventually published. They're only created if the transaction commits.
12+
- Ordering: messages are published sequentially for topic/partition combinations
13+
- Shutdown safety: has graceful shutdown and rescue for cases where it doesn't
14+
happen.
15+
- Observability: all operations publish telemetry events.
16+
- Garbage collection: outbox table is periodically cleaned.
17+
- Multi-node safe: safety powered by PostgreSQL.
1118
12-
# Optional, defaults to `body`
13-
def encode(body) do
14-
Jason.encode!(body)
15-
end
19+
For a comprehensive installation guide, check the [Getting started]("getting-started.md")
20+
guide.
1621
17-
def partition(_topic, %MyMessage{id: id}) do
18-
id
19-
end
20-
end
21-
22-
MyApp.Kafka.publish("my_topic", %{hello: :kafka})
23-
24-
`encode/1` should return a string, and defaults to the identity function.
22+
![Basic diagram](http://www.plantuml.com/plantuml/proxy?cache=no&src=https://raw.githubusercontent.com/v0idpwn/kafkaesque/master/diagrams/basic.iuml)
2523
26-
`partition/2` should return an integer, and defaults to 0. It will be used
27-
as the partition for the message.
24+
#### Note
2825
29-
See the documentation for `Kafkaesque.start_link/1` to learn about starting
30-
Kafkaesque in your application.
26+
For most cases, this module shouldn't be called directly. Instead, you want to
27+
`use` it to define an outbox for your application, and call the outbox, as you
28+
do with Ecto repos.
3129
"""
3230

3331
alias Kafkaesque.Message
@@ -60,7 +58,9 @@ defmodule Kafkaesque do
6058
end
6159

6260
@doc """
63-
Starts a Kafkaesque instance. Accepts the following opts:
61+
Starts a Kafkaesque instance.
62+
63+
Accepts the following opts:
6464
6565
- `:repo`: the repo where messages will be read from. Usually should be the
6666
same repo that you're writing to.
@@ -89,11 +89,16 @@ defmodule Kafkaesque do
8989
the table. Defaults to 72 hours.
9090
- `query_opts`: Options to pass to Ecto queries. Defaults to [log: false]
9191
"""
92+
@spec start_link(Keyword.t()) :: {:ok, pid()}
9293
def start_link(opts) do
9394
opts = Keyword.validate!(opts, [:repo] ++ @default_opts)
9495
Kafkaesque.Supervisor.start_link(opts)
9596
end
9697

98+
@doc """
99+
Child spec for a Kafkaesque instance
100+
"""
101+
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
97102
def child_spec(opts) do
98103
opts = Keyword.validate!(opts, [:repo] ++ @default_opts)
99104
Kafkaesque.Supervisor.child_spec(opts)
@@ -120,6 +125,18 @@ defmodule Kafkaesque do
120125
0
121126
end
122127

128+
@spec start_link(Keyword.t()) :: {:ok, pid}
129+
def start_link(opts) do
130+
opts = Keyword.merge(unquote(opts), opts)
131+
Kafkaesque.start_link(opts)
132+
end
133+
134+
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
135+
def child_spec(opts) do
136+
opts = Keyword.merge(unquote(opts), opts)
137+
Kafkaesque.child_spec(opts)
138+
end
139+
123140
defoverridable encode: 1, partition: 2
124141
end
125142
end

lib/kafkaesque/kafka_client.ex

+4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
defmodule Kafkaesque.Client do
2+
@moduledoc """
3+
Behaviour for clients
4+
"""
5+
26
alias Kafkaesque.Message
37

48
@type client() :: term()

lib/kafkaesque/query.ex

+27-24
Original file line numberDiff line numberDiff line change
@@ -34,30 +34,33 @@ defmodule Kafkaesque.Query do
3434
|> order_by([m], m.id)
3535
|> limit(^demand)
3636

37-
repo.transaction(fn ->
38-
repo.query!("SELECT pg_advisory_xact_lock($1)", [@xact_lock_key], query_opts)
39-
40-
Message
41-
|> where([m], m.id in subquery(subset))
42-
|> select([m, _], m)
43-
|> update([m],
44-
set: [
45-
state: :publishing,
46-
attempted_at: fragment("CURRENT_TIMESTAMP"),
47-
attempted_by: ^inspect(node())
48-
],
49-
inc: [attempt: 1]
50-
)
51-
|> repo.update_all([], query_opts)
52-
|> case do
53-
{0, nil} ->
54-
{0, []}
55-
56-
{count, messages} ->
57-
sorted = Enum.sort(messages, fn m1, m2 -> m1.id <= m2.id end)
58-
{count, sorted}
59-
end
60-
end, query_opts)
37+
repo.transaction(
38+
fn ->
39+
repo.query!("SELECT pg_advisory_xact_lock($1)", [@xact_lock_key], query_opts)
40+
41+
Message
42+
|> where([m], m.id in subquery(subset))
43+
|> select([m, _], m)
44+
|> update([m],
45+
set: [
46+
state: :publishing,
47+
attempted_at: fragment("CURRENT_TIMESTAMP"),
48+
attempted_by: ^inspect(node())
49+
],
50+
inc: [attempt: 1]
51+
)
52+
|> repo.update_all([], query_opts)
53+
|> case do
54+
{0, nil} ->
55+
{0, []}
56+
57+
{count, messages} ->
58+
sorted = Enum.sort(messages, fn m1, m2 -> m1.id <= m2.id end)
59+
{count, sorted}
60+
end
61+
end,
62+
query_opts
63+
)
6164
end
6265

6366
@spec update_success_batch(Ecto.Repo.t(), [pos_integer()], Keyword.t()) :: :ok

lib/kafkaesque/rescuer.ex

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ defmodule Kafkaesque.Rescuer do
3232
interval_ms = Keyword.fetch!(opts, :rescuer_interval_ms)
3333
limit_ms = Keyword.fetch!(opts, :rescuer_limit_ms)
3434

35-
{:ok, %{repo: repo, query_opts: query_opts, interval_ms: interval_ms, limit_ms: limit_ms}, {:continue, :rescue}}
35+
{:ok, %{repo: repo, query_opts: query_opts, interval_ms: interval_ms, limit_ms: limit_ms},
36+
{:continue, :rescue}}
3637
end
3738

3839
@impl GenServer

mix.exs

+18-2
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
defmodule Kafkaesque.MixProject do
22
use Mix.Project
33

4-
@version "0.0.1"
4+
@version "1.0.0-rc.0"
55

66
def project do
77
[
88
app: :kafkaesque,
99
version: @version,
10-
elixir: "~> 1.11",
10+
elixir: "~> 1.13",
1111
start_permanent: Mix.env() == :prod,
1212
aliases: aliases(),
1313
elixirc_paths: elixirc_paths(Mix.env()),
1414
deps: deps(),
1515
description: description(),
1616
package: package(),
1717
name: "Kafkaesque",
18+
docs: docs(),
1819
source_url: "https://github.com/v0idpwn/kafkaesque"
1920
]
2021
end
@@ -39,6 +40,21 @@ defmodule Kafkaesque.MixProject do
3940
]
4041
end
4142

43+
defp docs do
44+
[
45+
main: "Kafkaesque",
46+
source_ref: "#v{@version}",
47+
extra_section: "GUIDES",
48+
extras: extras()
49+
]
50+
end
51+
52+
defp extras do
53+
[
54+
"guides/getting-started.md"
55+
]
56+
end
57+
4258
defp description do
4359
"Transactional outbox for kafka"
4460
end

mix.lock

+6-5
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55
"db_connection": {:hex, :db_connection, "2.4.3", "3b9aac9f27347ec65b271847e6baeb4443d8474289bd18c1d6f4de655b70c94d", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c127c15b0fa6cfb32eed07465e05da6c815b032508d4ed7c116122871df73c12"},
66
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
77
"dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
8-
"earmark_parser": {:hex, :earmark_parser, "1.4.12", "b245e875ec0a311a342320da0551da407d9d2b65d98f7a9597ae078615af3449", [:mix], [], "hexpm", "711e2cc4d64abb7d566d43f54b78f7dc129308a63bc103fbd88550d2174b3160"},
8+
"earmark_parser": {:hex, :earmark_parser, "1.4.31", "a93921cdc6b9b869f519213d5bc79d9e218ba768d7270d46fdcf1c01bacff9e2", [:mix], [], "hexpm", "317d367ee0335ef037a87e46c91a2269fef6306413f731e8ec11fc45a7efd059"},
99
"ecto": {:hex, :ecto, "3.9.5", "9f0aa7ae44a1577b651c98791c6988cd1b69b21bc724e3fd67090b97f7604263", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d4f3115d8cbacdc0bfa4b742865459fb1371d0715515842a1fb17fe31920b74c"},
1010
"ecto_sql": {:hex, :ecto_sql, "3.9.2", "34227501abe92dba10d9c3495ab6770e75e79b836d114c41108a4bf2ce200ad5", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.9.2", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6.0", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16.0 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1eb5eeb4358fdbcd42eac11c1fbd87e3affd7904e639d77903c1358b2abd3f70"},
1111
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
12-
"ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
12+
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
1313
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
1414
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
1515
"kafka_protocol": {:hex, :kafka_protocol, "4.1.0", "53fac8866969484f783bff204bd4e41e62a97ce9753c83f802a08d5bfc0e0c4c", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "61cb8b80199bf95122cf8073e0f4c0ad62f82515b4d44c54f946a5972c3f5fa5"},
16-
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
17-
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
18-
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
16+
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
17+
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
18+
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
19+
"nimble_parsec": {:hex, :nimble_parsec, "1.3.0", "9e18a119d9efc3370a3ef2a937bf0b24c088d9c4bf0ba9d7c3751d49d347d035", [:mix], [], "hexpm", "7977f183127a7cbe9346981e2f480dc04c55ffddaef746bd58debd566070eef8"},
1920
"postgrex": {:hex, :postgrex, "0.16.5", "fcc4035cc90e23933c5d69a9cd686e329469446ef7abba2cf70f08e2c4b69810", [:mix], [{:connection, "~> 1.1", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "edead639dc6e882618c01d8fc891214c481ab9a3788dfe38dd5e37fd1d5fb2e8"},
2021
"snappyer": {:hex, :snappyer, "1.2.8", "201ce9067a33c71a6a5087c0c3a49a010b17112d461e6df696c722dcb6d0934a", [:rebar3], [], "hexpm", "35518e79a28548b56d8fd6aee2f565f12f51c2d3d053f9cfa817c83be88c4f3d"},
2122
"supervisor3": {:hex, :supervisor3, "1.1.11", "d81cdec31d102fde407423e1d05b569572850deebed86b951d5233c387cba80b", [:rebar3], [], "hexpm", "e6c2dedbcabcba24995a218aca12db5e208b80d3252692b22ef0f1a266104b50"},

0 commit comments

Comments
 (0)