diff --git a/README.rdoc b/README.rdoc index 465b1b9..c485108 100644 --- a/README.rdoc +++ b/README.rdoc @@ -265,6 +265,55 @@ function :: The name of the trigger function to call to log changes Note that it is probably a bad idea to use the same table argument to both +pgt_json_audit_log_setup+ and +pgt_json_audit_log+. +=== Transactional Outbox Events - pgt_outbox_setup and pgt_outbox_events + +These methods setup an outbox table and write events to it when +writes happen to the watched table. + +==== pgt_outbox_setup + +This creates an outbox table and a trigger function that will write +event data to the outbox table. This returns the name of the +trigger function created, which should be passed to ++pgt_outbox_events+. + +Arguments: +table :: The name of the table storing the audit logs. + +Options: +function_name :: The name of the trigger function +outbox_table :: The name for the outbox table. Defaults to table_outbox +event_prefix :: The prefix to use for event_type, defaults to table_ (table_updated, table_created, table_deleted) +boolean_completed_column :: If this is true, the :completed column will be boolean, otherwise it will be Time +uuid_primary_key :: Use a uuid type for the primary key of the outbox table +uuid_function :: The pl/pgsql function name to use for generating a uuid pkey. defaults to :uuid_generate_v4 +function_opts :: Options to pass to +create_function+ when creating the trigger function. +Column Name Options: (column type in parenthesis) +created_column :: defaults to :created (Time) +updated_column :: defaults to :updated (Time) +attempts_column :: defaults to :attempts (Integer) +attempted_column :: defaults to :attempted (Time) +completed_column :: defaults to :completed (Boolean or Time, depending on :boolean_completed_column) +event_type_column :: defaults to :event_type (String) +last_error_column :: defaults to :last_error (String) +data_before_column :: defaults to :data_before (jsonb) +data_after_column :: defaults to :data_after (jsonb) +metadata_column :: defaults to :metadata (jsonb) + +==== pgt_outbox_events + +This adds a trigger to the table that will store events in the outbox table +when updates occur on the table (and match the filter). + +Arguments: +table :: The name of the table to audit +function :: The name of the trigger function to call to log changes (usually returned from pgt_outbox_setup) + +Options: +events :: The events to care about. Defaults to [:updated, :deleted, :created] (all writes) +trigger_name :: The name for the trigger +when :: A filter for the trigger, where clause if you will + == Caveats If you have defined counter or sum cache triggers using this library diff --git a/lib/sequel/extensions/pg_triggers.rb b/lib/sequel/extensions/pg_triggers.rb index dd46953..a636236 100644 --- a/lib/sequel/extensions/pg_triggers.rb +++ b/lib/sequel/extensions/pg_triggers.rb @@ -322,6 +322,78 @@ def pgt_foreign_key_array(opts={}) SQL end + def pgt_outbox_setup(table, opts={}) + function_name = opts.fetch(:function_name, "pgt_outbox_#{pgt_mangled_table_name(table)}") + outbox_table = opts.fetch(:outbox_table, "#{table}_outbox") + quoted_outbox = quote_schema_table(outbox_table) + event_prefix = opts.fetch(:event_prefix, table) + created_column = opts.fetch(:created_column, :created) + updated_column = opts.fetch(:updated_column, :updated) + completed_column = opts.fetch(:completed_column, :completed) + attempts_column = opts.fetch(:attempts_column, :attempts) + attempted_column = opts.fetch(:attempted_column, :attempted) + event_type_column = opts.fetch(:event_type_column, :event_type) + last_error_column = opts.fetch(:last_error_column, :last_error) + data_after_column = opts.fetch(:data_after_column, :data_after) + data_before_column = opts.fetch(:data_before_column, :data_before) + metadata_column = opts.fetch(:metadata_column, :metadata) + boolean_completed_column = opts.fetch(:boolean_completed_column, false) + uuid_primary_key = opts.fetch(:uuid_primary_key, false) + run 'CREATE EXTENSION IF NOT EXISTS "uuid-ossp"' if uuid_primary_key + create_table(outbox_table) do + if uuid_primary_key + uuid_function = opts.fetch(:uuid_function, :uuid_generate_v4) + uuid :id, default: Sequel.function(uuid_function), primary_key: true + else + primary_key :id + end + Integer attempts_column, null: false, default: 0 + Time created_column + Time updated_column + Time attempted_column + if boolean_completed_column + FalseClass completed_column, null: false, default: false + else + Time completed_column + end + String event_type_column, null: false + String last_error_column + jsonb data_before_column + jsonb data_after_column + jsonb metadata_column + index Sequel.asc(created_column) + index Sequel.desc(attempted_column) + end + pgt_created_at outbox_table, created_column + pgt_updated_at outbox_table, updated_column + function_opts = { language: :plpgsql, returns: :trigger, replace: true }.merge(opts.fetch(:function_opts, {})) + create_function(function_name, <<-SQL, function_opts) + BEGIN + #{pgt_pg_trigger_depth_guard_clause(opts)} + IF (TG_OP = 'INSERT') THEN + INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_after_column}") VALUES + ('#{event_prefix}_created', to_jsonb(NEW)); + RETURN NEW; + ELSIF (TG_OP = 'UPDATE') THEN + INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_before_column}", "#{data_after_column}") VALUES + ('#{event_prefix}_updated', to_jsonb(OLD), to_jsonb(NEW)); + RETURN NEW; + ELSIF (TG_OP = 'DELETE') THEN + INSERT INTO #{quoted_outbox} ("#{event_type_column}", "#{data_before_column}") VALUES + ('#{event_prefix}_deleted', to_jsonb(OLD)); + RETURN OLD; + END IF; + END; + SQL + function_name + end + + def pgt_outbox_events(table, function, opts={}) + events = opts.fetch(:events, [:insert, :update, :delete]) + trigger_name = opts.fetch(:trigger_name, "pgt_outbox_#{pgt_mangled_table_name(table)}") + create_trigger(table, trigger_name, function, events: events, replace: true, each_row: true, after: true, when: opts[:when]) + end + private # Add or replace a function that returns trigger to handle the action, diff --git a/spec/sequel_postgresql_triggers_spec.rb b/spec/sequel_postgresql_triggers_spec.rb index bd05a50..f71cdd7 100644 --- a/spec/sequel_postgresql_triggers_spec.rb +++ b/spec/sequel_postgresql_triggers_spec.rb @@ -788,3 +788,138 @@ h.must_equal(:schema=>"public", :table=>"accounts", :action=>"DELETE", :prior=>{"a"=>3, "id"=>2}) end end if DB.server_version >= 90400 + +describe "Basic PostgreSQL Transactional Outbox" do + before do + DB.extension :pg_json + DB.create_table(:accounts){integer :id; String :s} + function_name = DB.pgt_outbox_setup(:accounts, :function_name=>:spgt_outbox_events) + DB.pgt_outbox_events(:accounts, function_name) + @logs = DB[:accounts_outbox].reverse(:created) + end + + after do + DB.drop_table(:accounts, :accounts_outbox) + DB.drop_function(:spgt_outbox_events) + end + + it "should store outbox events for writes on main table" do + @logs.first.must_be_nil + + ds = DB[:accounts] + ds.insert(id: 1, s: 'string') + ds.all.must_equal [{id: 1, s: 'string'}] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.must_equal(id: 1, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil) + + ds.where(id: 1).update(s: 'string2') + ds.all.must_equal [{id: 1, s: 'string2'}] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.must_equal(id: 2, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil) + + ds.delete + ds.all.must_equal [] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.must_equal(id: 3, attempts: 0, attempted: nil, completed: nil, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil) + end +end if DB.server_version >= 90400 + +describe "PostgreSQL Transactional Outbox With UUID Pkey" do + before do + DB.extension :pg_json + DB.create_table(:accounts){integer :id; String :s} + function_name = DB.pgt_outbox_setup(:accounts, uuid_primary_key: true, function_name: :spgt_outbox_events) + DB.pgt_outbox_events(:accounts, function_name) + @logs = DB[:accounts_outbox].reverse(:created) + end + + after do + DB.drop_table(:accounts, :accounts_outbox) + DB.drop_function(:spgt_outbox_events) + end + + it "should store outbox events for writes on main table" do + @logs.first.must_be_nil + + ds = DB[:accounts] + ds.insert(id: 1, s: 'string') + ds.all.must_equal [{id: 1, s: 'string'}] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + id = h.delete(:id) + id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/) + h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil) + + ds.where(id: 1).update(s: 'string2') + ds.all.must_equal [{id: 1, s: 'string2'}] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + id = h.delete(:id) + id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/) + h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil) + + ds.delete + ds.all.must_equal [] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + id = h.delete(:id) + id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/) + h.must_equal(attempts: 0, attempted: nil, completed: nil, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil) + end +end if DB.server_version >= 90400 + +describe "PostgreSQL Transactional Outbox With Boolean :completed field" do + before do + DB.extension :pg_json + DB.create_table(:accounts){integer :id; String :s} + function_name = DB.pgt_outbox_setup(:accounts, uuid_primary_key: true, boolean_completed_column: true, function_name: :spgt_outbox_events) + DB.pgt_outbox_events(:accounts, function_name) + @logs = DB[:accounts_outbox].reverse(:created) + end + + after do + DB.drop_table(:accounts, :accounts_outbox) + DB.drop_function(:spgt_outbox_events) + end + + it "should store outbox events for writes on main table" do + @logs.first.must_be_nil + + ds = DB[:accounts] + ds.insert(id: 1, s: 'string') + ds.all.must_equal [{id: 1, s: 'string'}] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + id = h.delete(:id) + id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/) + h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_created", last_error: nil, data_before: nil, data_after: {"s" => "string", "id" => 1}, metadata: nil) + + ds.where(id: 1).update(s: 'string2') + ds.all.must_equal [{id: 1, s: 'string2'}] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + id = h.delete(:id) + id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/) + h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_updated", last_error: nil, data_before: {"s" => "string", "id" => 1}, data_after: {"s" => "string2", "id" => 1}, metadata: nil) + + ds.delete + ds.all.must_equal [] + h = @logs.first + h.delete(:created).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + h.delete(:updated).to_i.must_be_close_to(10, DB.get(Sequel::CURRENT_TIMESTAMP).to_i) + id = h.delete(:id) + id.must_match(/\A\h{8}-\h{4}-\h{4}-\h{4}-\h{12}\z/) + h.must_equal(attempts: 0, attempted: nil, completed: false, event_type: "accounts_deleted", last_error: nil, data_before: {"s" => "string2", "id" => 1}, data_after: nil, metadata: nil) + end +end if DB.server_version >= 90400