Skip to content

Support insert_all and upsert_all using MERGE #1315

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Unreleased

#### Added

- [#1315](https://github.com/rails-sqlserver/activerecord-sqlserver-adapter/pull/1315) Add support for `insert_all` and `upsert_all`

## v8.0.4

#### Fixed
Expand Down
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,22 @@ ActiveRecord::ConnectionAdapters::SQLServerAdapter.showplan_option = 'SHOWPLAN_X
```
**NOTE:** The method we utilize to make SHOWPLANs work is very brittle to complex SQL. There is no getting around this as we have to deconstruct an already prepared statement for the sp_executesql method. If you find that explain breaks your app, simple disable it. Do not open a github issue unless you have a patch. Please [consult the Rails guides](http://guides.rubyonrails.org/active_record_querying.html#running-explain) for more info.

#### `insert_all` / `upsert_all` support

`insert_all` and `upsert_all` on other database system like MySQL, SQlite or PostgreSQL use a clause with their `INSERT` statement to either skip duplicates (`ON DUPLICATE KEY IGNORE`) or to update the existing record (`ON DUPLICATE KEY UPDATE`). Microsoft SQL Server does not offer these clauses, so the support for these two options is implemented slightly different.

Behind the scenes, we execute a `MERGE` query, which joins your data that you want to insert or update into the table existing on the server. The emphasis here is "JOINING", so we also need to remove any duplicates that might make the `JOIN` operation fail, e.g. something like this:

```ruby
Book.insert_all [
{ id: 200, author_id: 8, name: "Refactoring" },
{ id: 200, author_id: 8, name: "Refactoring" }
]
```

The removal of duplicates happens during the SQL query.

Because of this implementation, if you pass `on_duplicate` to `upsert_all`, make sure to assign your value to `target.[column_name]` (e.g. `target.status = GREATEST(target.status, 1)`). To access the values that you want to upsert, use `source.[column_name]`.

## New Rails Applications

Expand Down
157 changes: 145 additions & 12 deletions lib/active_record/connection_adapters/sqlserver/database_statements.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,18 +143,54 @@ def default_insert_value(column)
private :default_insert_value

def build_insert_sql(insert) # :nodoc:
sql = +"INSERT #{insert.into}"

if returning = insert.send(:insert_all).returning
returning_sql = if returning.is_a?(String)
returning
else
Array(returning).map { |column| "INSERTED.#{quote_column_name(column)}" }.join(", ")
end
sql << " OUTPUT #{returning_sql}"
# Use regular insert if not skipping/updating duplicates.
return build_sql_for_regular_insert(insert:) unless insert.skip_duplicates? || insert.update_duplicates?

insert_all = insert.send(:insert_all)
columns_with_uniqueness_constraints = get_columns_with_uniqueness_constraints(insert_all:, insert:)

# If we do not have any columns that might have conflicting values just execute a regular insert, else use merge.
if columns_with_uniqueness_constraints.flatten.empty?
build_sql_for_regular_insert(insert:)
else
build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:)
end
end


def build_sql_for_merge_insert(insert:, insert_all:, columns_with_uniqueness_constraints:) # :nodoc:
sql = <<~SQL
MERGE INTO #{insert.model.quoted_table_name} WITH (UPDLOCK, HOLDLOCK) AS target
USING (
SELECT *
FROM (
SELECT #{insert.send(:columns_list)}, #{partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)}
FROM (#{insert.values_list})
AS t1 (#{insert.send(:columns_list)})
) AS ranked_source
WHERE #{is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)}
) AS source
ON (#{joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)})
SQL

if insert.update_duplicates?
sql << " WHEN MATCHED THEN UPDATE SET "

if insert.raw_update_sql?
sql << insert.raw_update_sql
else
if insert.record_timestamps?
sql << build_sql_for_recording_timestamps_when_updating(insert:)
end

sql << insert.updatable_columns.map { |column| "target.#{quote_column_name(column)}=source.#{quote_column_name(column)}" }.join(",")
end
end
sql << " WHEN NOT MATCHED BY TARGET THEN"
sql << " INSERT (#{insert.send(:columns_list)}) VALUES (#{insert_all.keys_including_timestamps.map { |column| "source.#{quote_column_name(column)}" }.join(", ")})"
sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))
sql << ";"

sql << " #{insert.values_list}"
sql
end

Expand Down Expand Up @@ -406,11 +442,18 @@ def query_requires_identity_insert?(sql)
raw_table_name = get_raw_table_name(sql)
id_column = identity_columns(raw_table_name).first

id_column && sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i ? SQLServer::Utils.extract_identifiers(raw_table_name).quoted : false
if id_column && (
sql =~ /^\s*(INSERT|EXEC sp_executesql N'INSERT)[^(]+\([^)]*\b(#{id_column.name})\b,?[^)]*\)/i ||
sql =~ /^\s*MERGE INTO.+THEN INSERT \([^)]*\b(#{id_column.name})\b,?[^)]*\)/im
)
SQLServer::Utils.extract_identifiers(raw_table_name).quoted
else
false
end
end

def insert_sql?(sql)
!(sql =~ /\A\s*(INSERT|EXEC sp_executesql N'INSERT)/i).nil?
!(sql =~ /\A\s*(INSERT|EXEC sp_executesql N'INSERT|MERGE INTO.+THEN INSERT)/im).nil?
end

def identity_columns(table_name)
Expand Down Expand Up @@ -455,6 +498,96 @@ def internal_raw_execute(sql, raw_connection, perform_do: false)

perform_do ? result.do : result
end

# === SQLServer Specific (insert_all / upsert_all support) ===================== #
def build_sql_for_returning(insert:, insert_all:)
return "" unless insert_all.returning

returning_values_sql = if insert_all.returning.is_a?(String)
insert_all.returning
else
Array(insert_all.returning).map do |attribute|
if insert.model.attribute_alias?(attribute)
"INSERTED.#{quote_column_name(insert.model.attribute_alias(attribute))} AS #{quote_column_name(attribute)}"
else
"INSERTED.#{quote_column_name(attribute)}"
end
end.join(",")
end

" OUTPUT #{returning_values_sql}"
end
private :build_sql_for_returning

def get_columns_with_uniqueness_constraints(insert_all:, insert:)
if (unique_by = insert_all.unique_by)
[unique_by.columns]
else
# Compare against every unique constraint (primary key included).
# Discard constraints that are not fully included on insert.keys. Prevents invalid queries.
# Example: ignore unique index for columns ["name"] if insert keys is ["description"]
(insert_all.send(:unique_indexes).map(&:columns) + [insert_all.primary_keys]).select do |columns|
columns.to_set.subset?(insert.keys)
end
end
end
private :get_columns_with_uniqueness_constraints

def build_sql_for_regular_insert(insert:)
sql = "INSERT #{insert.into}"
sql << build_sql_for_returning(insert:, insert_all: insert.send(:insert_all))
sql << " #{insert.values_list}"
sql
end
private :build_sql_for_regular_insert

# why is the "PARTITION BY" clause needed?
# in every DBMS system, insert_all / upsert_all is usually implemented with INSERT, that allows to define what happens
# when duplicates are found (SKIP OR UPDATE)
# by default rows are considered to be unique by every unique index on the table
# but since we have to use MERGE in MSSQL, which in return is a JOIN, we have to perform the "de-duplication" ourselves
# otherwise the "JOIN" clause would complain about non-unique values and being unable to JOIN the two tables
# this works easiest by using PARTITION and make sure that any record
# we are trying to insert is "the first one seen across all the potential columns with uniqueness constraints"
def partition_by_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)
columns_with_uniqueness_constraints.map.with_index do |group_of_columns_with_uniqueness_constraints, index|
<<~PARTITION_BY
ROW_NUMBER() OVER (
PARTITION BY #{group_of_columns_with_uniqueness_constraints.map { |column| quote_column_name(column) }.join(",")}
ORDER BY #{group_of_columns_with_uniqueness_constraints.map { |column| "#{quote_column_name(column)} DESC" }.join(",")}
) AS rn_#{index}
PARTITION_BY
end.join(", ")
end
private :partition_by_columns_with_uniqueness_constraints

def is_first_record_across_all_uniqueness_constraints(columns_with_uniqueness_constraints:)
columns_with_uniqueness_constraints.map.with_index do |group_of_columns_with_uniqueness_constraints, index|
"rn_#{index} = 1"
end.join(" AND ")
end
private :is_first_record_across_all_uniqueness_constraints

def joining_on_columns_with_uniqueness_constraints(columns_with_uniqueness_constraints:)
columns_with_uniqueness_constraints.map do |columns|
columns.map do |column|
"target.#{quote_column_name(column)} = source.#{quote_column_name(column)}"
end.join(" AND ")
end.join(") OR (")
end
private :joining_on_columns_with_uniqueness_constraints

# normally, generating the CASE SQL is done entirely by Rails
# and you would just hook into "touch_model_timestamps_unless" to add your database-specific instructions
# however, since we need to have "target." for the assignment, we also generate the CASE switch ourselves
def build_sql_for_recording_timestamps_when_updating(insert:)
insert.model.timestamp_attributes_for_update_in_model.filter_map do |column_name|
if insert.send(:touch_timestamp_attribute?, column_name)
"target.#{quote_column_name(column_name)}=CASE WHEN (#{insert.updatable_columns.map { |column| "(COALESCE(target.#{quote_column_name(column)}, 'NULL') = COALESCE(source.#{quote_column_name(column)}, 'NULL'))" }.join(" AND ")}) THEN target.#{quote_column_name(column_name)} ELSE #{high_precision_current_timestamp} END,"
end
end.join
end
private :build_sql_for_recording_timestamps_when_updating
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,8 @@ def get_raw_table_name(sql)
.match(/\s*([^(]*)/i)[0]
elsif s.match?(/^\s*UPDATE\s+.*/i)
s.match(/UPDATE\s+([^\(\s]+)\s*/i)[1]
elsif s.match?(/^\s*MERGE INTO.*/i)
s.match(/^\s*MERGE\s+INTO\s+(\[?[a-z_ -]+\]?\.?\[?[a-z_ -]+\]?)\s+(AS|WITH|USING)/i)[1]
else
s.match(/FROM[\s|\(]+((\[[^\(\]]+\])|[^\(\s]+)\s*/i)[1]
end.strip
Expand Down
4 changes: 2 additions & 2 deletions lib/active_record/connection_adapters/sqlserver_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,11 @@ def supports_insert_returning?
end

def supports_insert_on_duplicate_skip?
false
true
end

def supports_insert_on_duplicate_update?
false
true
end

def supports_insert_conflict_target?
Expand Down
12 changes: 11 additions & 1 deletion test/cases/adapter_test_sqlserver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
fixtures :tasks

let(:basic_insert_sql) { "INSERT INTO [funny_jokes] ([name]) VALUES('Knock knock')" }
let(:basic_merge_sql) { "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name]" }
let(:basic_update_sql) { "UPDATE [customers] SET [address_street] = NULL WHERE [id] = 2" }
let(:basic_select_sql) { "SELECT * FROM [customers] WHERE ([customers].[id] = 1)" }

Expand Down Expand Up @@ -93,6 +94,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase

it "return unquoted table name object from basic INSERT UPDATE and SELECT statements" do
assert_equal "funny_jokes", connection.send(:get_table_name, basic_insert_sql)
assert_equal "ships", connection.send(:get_table_name, basic_merge_sql)
assert_equal "customers", connection.send(:get_table_name, basic_update_sql)
assert_equal "customers", connection.send(:get_table_name, basic_select_sql)
end
Expand Down Expand Up @@ -213,6 +215,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
@identity_insert_sql_unquoted_sp = "EXEC sp_executesql N'INSERT INTO funny_jokes (id, name) VALUES (@0, @1)', N'@0 int, @1 nvarchar(255)', @0 = 420, @1 = N'Knock knock'"
@identity_insert_sql_unordered_sp = "EXEC sp_executesql N'INSERT INTO [funny_jokes] ([name],[id]) VALUES (@0, @1)', N'@0 nvarchar(255), @1 int', @0 = N'Knock knock', @1 = 420"

@identity_merge_sql = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [id], [name], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([id], [name]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([id], [name]) VALUES (source.[id], source.[name]) OUTPUT INSERTED.[id]"
@identity_merge_sql_unquoted = "MERGE INTO ships WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT id, name, ROW_NUMBER() OVER ( PARTITION BY id ORDER BY id DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 (id, name) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.id = source.id) WHEN MATCHED THEN UPDATE SET target.name = source.name WHEN NOT MATCHED BY TARGET THEN INSERT (id, name) VALUES (source.id, source.name) OUTPUT INSERTED.id"
@identity_merge_sql_unordered = "MERGE INTO [ships] WITH (UPDLOCK, HOLDLOCK) AS target USING ( SELECT * FROM ( SELECT [name], [id], ROW_NUMBER() OVER ( PARTITION BY [id] ORDER BY [id] DESC ) AS rn_0 FROM ( VALUES (101, N'RSS Sir David Attenborough') ) AS t1 ([name], [id]) ) AS ranked_source WHERE rn_0 = 1 ) AS source ON (target.[id] = source.[id]) WHEN MATCHED THEN UPDATE SET target.[name] = source.[name] WHEN NOT MATCHED BY TARGET THEN INSERT ([name], [id]) VALUES (source.[name], source.[id]) OUTPUT INSERTED.[id]"

@identity_insert_sql_non_dbo = "INSERT INTO [test].[aliens] ([id],[name]) VALUES(420,'Mork')"
@identity_insert_sql_non_dbo_unquoted = "INSERT INTO test.aliens ([id],[name]) VALUES(420,'Mork')"
@identity_insert_sql_non_dbo_unordered = "INSERT INTO [test].[aliens] ([name],[id]) VALUES('Mork',420)"
Expand All @@ -229,6 +235,10 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unquoted_sp)
assert_equal "[funny_jokes]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_unordered_sp)

assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql)
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unquoted)
assert_equal "[ships]", connection.send(:query_requires_identity_insert?, @identity_merge_sql_unordered)

assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo)
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unquoted)
assert_equal "[test].[aliens]", connection.send(:query_requires_identity_insert?, @identity_insert_sql_non_dbo_unordered)
Expand All @@ -238,7 +248,7 @@ class AdapterTestSQLServer < ActiveRecord::TestCase
end

it "return false to #query_requires_identity_insert? for normal SQL" do
[basic_insert_sql, basic_update_sql, basic_select_sql].each do |sql|
[basic_insert_sql, basic_merge_sql, basic_update_sql, basic_select_sql].each do |sql|
assert !connection.send(:query_requires_identity_insert?, sql), "SQL was #{sql}"
end
end
Expand Down
Loading