Skip to content

Commit be7898f

Browse files
authored
feat: Adds transactional outbox setup and trigger (#1)
* feat: Adds transactional outbox setup and trigger * docs: Adds README section about outbox trigger * test: Adds spec for outbox trigger * test: Completes basic spec for outbox * test: Gets specs back to 100% line/branch coverage
1 parent 327cd90 commit be7898f

File tree

3 files changed

+268
-21
lines changed

3 files changed

+268
-21
lines changed

README.rdoc

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,55 @@ function :: The name of the trigger function to call to log changes
265265
Note that it is probably a bad idea to use the same table argument
266266
to both +pgt_json_audit_log_setup+ and +pgt_json_audit_log+.
267267

268+
=== Transactional Outbox Events - pgt_outbox_setup and pgt_outbox_events
269+
270+
These methods setup an outbox table and write events to it when
271+
writes happen to the watched table.
272+
273+
==== pgt_outbox_setup
274+
275+
This creates an outbox table and a trigger function that will write
276+
event data to the outbox table. This returns the name of the
277+
trigger function created, which should be passed to
278+
+pgt_outbox_events+.
279+
280+
Arguments:
281+
table :: The name of the table storing the audit logs.
282+
283+
Options:
284+
function_name :: The name of the trigger function
285+
outbox_table :: The name for the outbox table. Defaults to table_outbox
286+
event_prefix :: The prefix to use for event_type, defaults to table_ (table_updated, table_created, table_deleted)
287+
boolean_completed_column :: If this is true, the :completed column will be boolean, otherwise it will be timestamptz
288+
uuid_primary_key :: Use a uuid type for the primary key of the outbox table
289+
uuid_function :: The pl/pgsql function name to use for generating a uuid pkey. defaults to :generate_uuid_v4
290+
function_opts :: Options to pass to +create_function+ when creating the trigger function.
291+
Column Name Options: (column type in parenthesis)
292+
created_column :: defaults to :created (timestamptz)
293+
updated_column :: defaults to :updated (timestamptz)
294+
attempts_column :: defaults to :attempts (Integer)
295+
attempted_column :: defaults to :attempted (timestamptz)
296+
completed_column :: defaults to :completed (Boolean or timestamptz, depending on :boolean_completed_column)
297+
event_type_column :: defaults to :event_type (String)
298+
last_error_column :: defaults to :last_error (String)
299+
data_before_column :: defaults to :data_before (jsonb)
300+
data_after_column :: defaults to :data_after (jsonb)
301+
metadata_column :: defaults to :metadata (jsonb)
302+
303+
==== pgt_outbox_events
304+
305+
This adds a trigger to the table that will store events in the outbox table
306+
when updates occur on the table (and match the filter).
307+
308+
Arguments:
309+
table :: The name of the table to audit
310+
function :: The name of the trigger function to call to log changes (usually returned from pgt_outbox_setup)
311+
312+
Options:
313+
events :: The events to care about. Defaults to [:updated, :deleted, :created] (all writes)
314+
trigger_name :: The name for the trigger
315+
when :: A filter for the trigger, where clause if you will
316+
268317
== Caveats
269318

270319
If you have defined counter or sum cache triggers using this library

lib/sequel/extensions/pg_triggers.rb

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,70 @@ def pgt_foreign_key_array(opts={})
322322
SQL
323323
end
324324

325+
def pgt_outbox_setup(table, opts={})
326+
function_name = opts.fetch(:function_name, "pgt_outbox_#{pgt_mangled_table_name(table)}")
327+
outbox_table = opts.fetch(:outbox_table, "#{table}_outbox")
328+
quoted_outbox = quote_schema_table(outbox_table)
329+
event_prefix = opts.fetch(:event_prefix, table)
330+
created_column = opts.fetch(:created_column, :created)
331+
updated_column = opts.fetch(:updated_column, :updated)
332+
event_type_column = opts.fetch(:event_type_column, :event_type)
333+
data_after_column = opts.fetch(:data_after_column, :data_after)
334+
data_before_column = opts.fetch(:data_before_column, :data_before)
335+
boolean_completed_column = opts.fetch(:boolean_completed_column, false)
336+
uuid_primary_key = opts.fetch(:uuid_primary_key, false)
337+
run 'CREATE EXTENSION IF NOT EXISTS "uuid-ossp"' if uuid_primary_key
338+
create_table(outbox_table) do
339+
if uuid_primary_key
340+
uuid_function = opts.fetch(:uuid_function, :uuid_generate_v4)
341+
uuid :id, default: Sequel.function(uuid_function), primary_key: true
342+
else
343+
primary_key :id
344+
end
345+
Integer opts.fetch(:attempts_column, :attempts), null: false, default: 0
346+
column created_column, :timestamptz
347+
column updated_column, :timestamptz
348+
column opts.fetch(:attempted_column, :attempted), :timestamptz
349+
if boolean_completed_column
350+
FalseClass opts.fetch(:completed_column, :completed), null: false, default: false
351+
else
352+
column opts.fetch(:completed_column, :completed), :timestamptz
353+
end
354+
String event_type_column, null: false
355+
String opts.fetch(:last_error_column, :last_error)
356+
jsonb data_before_column
357+
jsonb data_after_column
358+
jsonb opts.fetch(:metadata_column, :metadata)
359+
end
360+
pgt_created_at outbox_table, created_column
361+
pgt_updated_at outbox_table, updated_column
362+
create_function(function_name, (<<-SQL), {:language=>:plpgsql, :returns=>:trigger, :replace=>true}.merge(opts[:function_opts]||{}))
363+
BEGIN
364+
#{pgt_pg_trigger_depth_guard_clause(opts)}
365+
IF (TG_OP = 'INSERT') THEN
366+
INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_after_column}") VALUES
367+
('#{event_prefix}_created', to_jsonb(NEW));
368+
RETURN NEW;
369+
ELSIF (TG_OP = 'UPDATE') THEN
370+
INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_before_column}", "#{data_after_column}") VALUES
371+
('#{event_prefix}_updated', to_jsonb(OLD), to_jsonb(NEW));
372+
RETURN NEW;
373+
ELSIF (TG_OP = 'DELETE') THEN
374+
INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_before_column}") VALUES
375+
('#{event_prefix}_deleted', to_jsonb(OLD));
376+
RETURN OLD;
377+
END IF;
378+
END;
379+
SQL
380+
function_name
381+
end
382+
383+
def pgt_outbox_events(table, function, opts={})
384+
events = opts.fetch(:events, [:insert, :update, :delete])
385+
trigger_name = opts.fetch(:trigger_name, "pgt_outbox_#{pgt_mangled_table_name(table)}")
386+
create_trigger(table, trigger_name, function, events: events, replace: true, each_row: true, after: true, when: opts[:when])
387+
end
388+
325389
private
326390

327391
# Add or replace a function that returns trigger to handle the action,

spec/sequel_postgresql_triggers_spec.rb

Lines changed: 155 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
require 'sequel_postgresql_triggers'
2626
else
2727
puts "Running specs with extension"
28-
DB.extension :pg_triggers
28+
DB.extension :pg_triggers
2929
end
3030
DB.extension :pg_array
3131

@@ -59,31 +59,31 @@
5959

6060
DB[:entries].insert(:id=>2, :account_id=>1)
6161
DB[:accounts].order(:id).select_map(:num_entries).must_equal [2, 0]
62-
62+
6363
DB[:entries].insert(:id=>3, :account_id=>nil)
6464
DB[:accounts].order(:id).select_map(:num_entries).must_equal [2, 0]
65-
65+
6666
DB[:entries].where(:id=>3).update(:account_id=>2)
6767
DB[:accounts].order(:id).select_map(:num_entries).must_equal [2, 1]
68-
68+
6969
DB[:entries].where(:id=>2).update(:account_id=>2)
7070
DB[:accounts].order(:id).select_map(:num_entries).must_equal [1, 2]
71-
71+
7272
DB[:entries].where(:id=>2).update(:account_id=>nil)
7373
DB[:accounts].order(:id).select_map(:num_entries).must_equal [1, 1]
74-
74+
7575
DB[:entries].where(:id=>2).update(:id=>4)
7676
DB[:accounts].order(:id).select_map(:num_entries).must_equal [1, 1]
77-
77+
7878
DB[:entries].where(:id=>4).update(:account_id=>2)
7979
DB[:accounts].order(:id).select_map(:num_entries).must_equal [1, 2]
80-
80+
8181
DB[:entries].where(:id=>4).update(:account_id=>nil)
8282
DB[:accounts].order(:id).select_map(:num_entries).must_equal [1, 1]
83-
83+
8484
DB[:entries].filter(:id=>4).delete
8585
DB[:accounts].order(:id).select_map(:num_entries).must_equal [1, 1]
86-
86+
8787
DB[:entries].delete
8888
DB[:accounts].order(:id).select_map(:num_entries).must_equal [0, 0]
8989
end
@@ -195,34 +195,34 @@
195195

196196
DB[:entries].insert(:id=>2, :account_id=>1, :amount=>200)
197197
DB[:accounts].order(:id).select_map(:balance).must_equal [300, 0]
198-
198+
199199
DB[:entries].insert(:id=>3, :account_id=>nil, :amount=>500)
200200
DB[:accounts].order(:id).select_map(:balance).must_equal [300, 0]
201-
201+
202202
DB[:entries].where(:id=>3).update(:account_id=>2)
203203
DB[:accounts].order(:id).select_map(:balance).must_equal [300, 500]
204-
204+
205205
DB[:entries].exclude(:id=>2).update(:amount=>Sequel.*(:amount, 2))
206206
DB[:accounts].order(:id).select_map(:balance).must_equal [400, 1000]
207-
207+
208208
DB[:entries].where(:id=>2).update(:account_id=>2)
209209
DB[:accounts].order(:id).select_map(:balance).must_equal [200, 1200]
210-
210+
211211
DB[:entries].where(:id=>2).update(:account_id=>nil)
212212
DB[:accounts].order(:id).select_map(:balance).must_equal [200, 1000]
213-
213+
214214
DB[:entries].where(:id=>2).update(:id=>4)
215215
DB[:accounts].order(:id).select_map(:balance).must_equal [200, 1000]
216-
216+
217217
DB[:entries].where(:id=>4).update(:account_id=>2)
218218
DB[:accounts].order(:id).select_map(:balance).must_equal [200, 1200]
219-
219+
220220
DB[:entries].where(:id=>4).update(:account_id=>nil)
221221
DB[:accounts].order(:id).select_map(:balance).must_equal [200, 1000]
222-
222+
223223
DB[:entries].filter(:id=>4).delete
224224
DB[:accounts].order(:id).select_map(:balance).must_equal [200, 1000]
225-
225+
226226
DB[:entries].delete
227227
DB[:accounts].order(:id).select_map(:balance).must_equal [0, 0]
228228
end
@@ -748,7 +748,6 @@
748748
end
749749
end
750750

751-
752751
describe "PostgreSQL JSON Audit Logging" do
753752
before do
754753
DB.extension :pg_json
@@ -788,3 +787,138 @@
788787
h.must_equal(:schema=>"public", :table=>"accounts", :action=>"DELETE", :prior=>{"a"=>3, "id"=>2})
789788
end
790789
end if DB.server_version >= 90400
790+
791+
describe "Basic PostgreSQL Transactional Outbox" do
792+
before do
793+
DB.extension :pg_json
794+
DB.create_table(:accounts){integer :id; String :s}
795+
function_name = DB.pgt_outbox_setup(:accounts, :function_name=>:spgt_outbox_events)
796+
DB.pgt_outbox_events(:accounts, function_name)
797+
@logs = DB[:accounts_outbox].reverse(:created)
798+
end
799+
800+
after do
801+
DB.drop_table(:accounts, :accounts_outbox)
802+
DB.drop_function(:spgt_outbox_events)
803+
end
804+
805+
it "should store outbox events for writes on main table" do
806+
@logs.first.must_be_nil
807+
808+
ds = DB[:accounts]
809+
ds.insert(id: 1, s: 'string')
810+
ds.all.must_equal [{id: 1, s: 'string'}]
811+
h = @logs.first
812+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
813+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
814+
h.must_equal(id: 1, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil)
815+
816+
ds.where(id: 1).update(s: 'string2')
817+
ds.all.must_equal [{id: 1, s: 'string2'}]
818+
h = @logs.first
819+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
820+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
821+
h.must_equal(id: 2, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil)
822+
823+
ds.delete
824+
ds.all.must_equal []
825+
h = @logs.first
826+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
827+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
828+
h.must_equal(id: 3, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil)
829+
end
830+
end if DB.server_version >= 90400
831+
832+
describe "PostgreSQL Transactional Outbox With UUID Pkey" do
833+
before do
834+
DB.extension :pg_json
835+
DB.create_table(:accounts){integer :id; String :s}
836+
function_name = DB.pgt_outbox_setup(:accounts, uuid_primary_key: true, function_name: :spgt_outbox_events)
837+
DB.pgt_outbox_events(:accounts, function_name)
838+
@logs = DB[:accounts_outbox].reverse(:created)
839+
end
840+
841+
after do
842+
DB.drop_table(:accounts, :accounts_outbox)
843+
DB.drop_function(:spgt_outbox_events)
844+
end
845+
846+
it "should store outbox events for writes on main table" do
847+
@logs.first.must_be_nil
848+
849+
ds = DB[:accounts]
850+
ds.insert(id: 1, s: 'string')
851+
ds.all.must_equal [{id: 1, s: 'string'}]
852+
h = @logs.first
853+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
854+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
855+
id = h.delete(:id)
856+
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
857+
h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil)
858+
859+
ds.where(id: 1).update(s: 'string2')
860+
ds.all.must_equal [{id: 1, s: 'string2'}]
861+
h = @logs.first
862+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
863+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
864+
id = h.delete(:id)
865+
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
866+
h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil)
867+
868+
ds.delete
869+
ds.all.must_equal []
870+
h = @logs.first
871+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
872+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
873+
id = h.delete(:id)
874+
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
875+
h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil)
876+
end
877+
end if DB.server_version >= 90400
878+
879+
describe "PostgreSQL Transactional Outbox With UUID Pkey" do
880+
before do
881+
DB.extension :pg_json
882+
DB.create_table(:accounts){integer :id; String :s}
883+
function_name = DB.pgt_outbox_setup(:accounts, uuid_primary_key: true, boolean_completed_column: true, function_name: :spgt_outbox_events)
884+
DB.pgt_outbox_events(:accounts, function_name)
885+
@logs = DB[:accounts_outbox].reverse(:created)
886+
end
887+
888+
after do
889+
DB.drop_table(:accounts, :accounts_outbox)
890+
DB.drop_function(:spgt_outbox_events)
891+
end
892+
893+
it "should store outbox events for writes on main table" do
894+
@logs.first.must_be_nil
895+
896+
ds = DB[:accounts]
897+
ds.insert(id: 1, s: 'string')
898+
ds.all.must_equal [{id: 1, s: 'string'}]
899+
h = @logs.first
900+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
901+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
902+
id = h.delete(:id)
903+
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
904+
h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil)
905+
906+
ds.where(id: 1).update(s: 'string2')
907+
ds.all.must_equal [{id: 1, s: 'string2'}]
908+
h = @logs.first
909+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
910+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
911+
id = h.delete(:id)
912+
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
913+
h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil)
914+
915+
ds.delete
916+
ds.all.must_equal []
917+
h = @logs.first
918+
h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
919+
h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i)
920+
id = h.delete(:id)
921+
id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/)
922+
h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil)
923+
end
924+
end if DB.server_version >= 90400

0 commit comments

Comments
 (0)