@@ -48,6 +48,12 @@ public override string ToString()
4848 }
4949 }
5050
51+ public enum QueryType
52+ {
53+ Insert ,
54+ Merge
55+ }
56+
5157 /// <typeparam name="T">A user-defined POCO that represents a row of the user's table</typeparam>
5258 internal class SqlAsyncCollector < T > : IAsyncCollector < T > , IDisposable
5359 {
@@ -224,6 +230,13 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
224230 throw ex ;
225231 }
226232
233+ IEnumerable < string > bracketedColumnNamesFromItem = GetColumnNamesFromItem ( rows . First ( ) )
234+ . Where ( prop => ! tableInfo . PrimaryKeys . Any ( k => k . IsIdentity && string . Equals ( k . Name , prop , StringComparison . Ordinal ) ) ) // Skip any identity columns, those should never be updated
235+ . Select ( prop => prop . AsBracketQuotedString ( ) ) ;
236+ var table = new SqlObject ( fullTableName ) ;
237+ string mergeOrInsertQuery = tableInfo . QueryType == QueryType . Insert ? TableInformation . GetInsertQuery ( table , bracketedColumnNamesFromItem ) :
238+ TableInformation . GetMergeQuery ( tableInfo . PrimaryKeys , table , bracketedColumnNamesFromItem ) ;
239+
227240 TelemetryInstance . TrackEvent ( TelemetryEventName . UpsertStart , props ) ;
228241 this . _logger . LogDebugWithThreadId ( "BEGIN UpsertRowsTransaction" ) ;
229242 var transactionSw = Stopwatch . StartNew ( ) ;
@@ -241,7 +254,7 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
241254 {
242255 batchCount ++ ;
243256 GenerateDataQueryForMerge ( tableInfo , batch , out string newDataQuery , out string rowData ) ;
244- command . CommandText = $ "{ newDataQuery } { tableInfo . Query } ;";
257+ command . CommandText = $ "{ newDataQuery } { mergeOrInsertQuery } ;";
245258 this . _logger . LogDebugWithThreadId ( $ "UpsertRowsTransactionBatch - Query={ command . CommandText } ") ;
246259 par . Value = rowData ;
247260 await command . ExecuteNonQueryAsync ( ) ;
@@ -340,12 +353,12 @@ private static void GenerateDataQueryForMerge(TableInformation table, IEnumerabl
340353 else
341354 {
342355 // SQL Server allows 900 bytes per primary key, so use that as a baseline
343- var combinedPrimaryKey = new StringBuilder ( 900 * table . PrimaryKeys . Count ( ) ) ;
356+ var combinedPrimaryKey = new StringBuilder ( 900 * table . PrimaryKeyProperties . Count ( ) ) ;
344357 // Look up primary key of T. Because we're going in the same order of properties every time,
345358 // we can assume that if two rows with the same primary key are in the list, they will collide
346- foreach ( PropertyInfo primaryKey in table . PrimaryKeys )
359+ foreach ( PropertyInfo primaryKeyProperty in table . PrimaryKeyProperties )
347360 {
348- object value = primaryKey . GetValue ( row ) ;
361+ object value = primaryKeyProperty . GetValue ( row ) ;
349362 // Identity columns are allowed to be optional, so just skip the key if it doesn't exist
350363 if ( value == null )
351364 {
@@ -379,7 +392,9 @@ private static void GenerateDataQueryForMerge(TableInformation table, IEnumerabl
379392
380393 public class TableInformation
381394 {
382- public IEnumerable < PropertyInfo > PrimaryKeys { get ; }
395+ public List < PrimaryKey > PrimaryKeys { get ; }
396+
397+ public IEnumerable < PropertyInfo > PrimaryKeyProperties { get ; }
383398
384399 /// <summary>
385400 /// All of the columns, along with their data types, for SQL to use to turn JSON into a table
@@ -392,10 +407,9 @@ public class TableInformation
392407 public IEnumerable < string > ColumnDefinitions => this . Columns . Select ( c => $ "{ c . Key } { c . Value } ") ;
393408
394409 /// <summary>
395- /// T-SQL merge or insert statement generated from primary keys
396- /// and column names for a specific table.
410+ /// Whether to use an insert query or merge query.
397411 /// </summary>
398- public string Query { get ; }
412+ public QueryType QueryType { get ; }
399413
400414 /// <summary>
401415 /// Whether at least one of the primary keys on this table is an identity column
@@ -407,11 +421,12 @@ public class TableInformation
407421 /// </summary>
408422 public JsonSerializerSettings JsonSerializerSettings { get ; }
409423
410- public TableInformation ( IEnumerable < PropertyInfo > primaryKeys , IDictionary < string , string > columns , string query , bool hasIdentityColumnPrimaryKeys )
424+ public TableInformation ( List < PrimaryKey > primaryKeys , IEnumerable < PropertyInfo > primaryKeyProperties , IDictionary < string , string > columns , QueryType queryType , bool hasIdentityColumnPrimaryKeys )
411425 {
412426 this . PrimaryKeys = primaryKeys ;
427+ this . PrimaryKeyProperties = primaryKeyProperties ;
413428 this . Columns = columns ;
414- this . Query = query ;
429+ this . QueryType = queryType ;
415430 this . HasIdentityColumnPrimaryKeys = hasIdentityColumnPrimaryKeys ;
416431
417432 // Convert datetime strings to ISO 8061 format to avoid potential errors on the server when converting into a datetime. This
@@ -625,24 +640,19 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
625640
626641 // If any identity columns or columns with default values aren't included in the object then we have to generate a basic insert since the merge statement expects all primary key
627642 // columns to exist. (the merge statement can handle nullable columns though if those exist)
628- bool usingInsertQuery = ( hasIdentityColumnPrimaryKeys || hasDefaultColumnPrimaryKeys ) && missingPrimaryKeysFromItem . Any ( ) ;
629-
630- IEnumerable < string > bracketedColumnNamesFromItem = objectColumnNames
631- . Where ( prop => ! primaryKeys . Any ( k => k . IsIdentity && string . Equals ( k . Name , prop , StringComparison . Ordinal ) ) ) // Skip any identity columns, those should never be updated
632- . Select ( prop => prop . AsBracketQuotedString ( ) ) ;
633- string query = usingInsertQuery ? GetInsertQuery ( table , bracketedColumnNamesFromItem ) : GetMergeQuery ( primaryKeys , table , bracketedColumnNamesFromItem ) ;
643+ QueryType queryType = ( hasIdentityColumnPrimaryKeys || hasDefaultColumnPrimaryKeys ) && missingPrimaryKeysFromItem . Any ( ) ? QueryType . Insert : QueryType . Merge ;
634644
635645 tableInfoSw . Stop ( ) ;
636646 var durations = new Dictionary < TelemetryMeasureName , double > ( )
637647 {
638648 { TelemetryMeasureName . GetColumnDefinitionsDurationMs , columnDefinitionsSw . ElapsedMilliseconds } ,
639649 { TelemetryMeasureName . GetPrimaryKeysDurationMs , primaryKeysSw . ElapsedMilliseconds }
640650 } ;
641- sqlConnProps . Add ( TelemetryPropertyName . QueryType , usingInsertQuery ? "insert" : "merge" ) ;
651+ sqlConnProps . Add ( TelemetryPropertyName . QueryType , queryType . ToString ( ) ) ;
642652 sqlConnProps . Add ( TelemetryPropertyName . HasIdentityColumn , hasIdentityColumnPrimaryKeys . ToString ( ) ) ;
643653 TelemetryInstance . TrackDuration ( TelemetryEventName . GetTableInfo , tableInfoSw . ElapsedMilliseconds , sqlConnProps , durations ) ;
644654 logger . LogDebugWithThreadId ( $ "END RetrieveTableInformationAsync Duration={ tableInfoSw . ElapsedMilliseconds } ms DB and Table: { sqlConnection . Database } .{ fullName } . Primary keys: [{ string . Join ( "," , primaryKeys . Select ( pk => pk . Name ) ) } ]. SQL Column and Definitions: [{ string . Join ( "," , columnDefinitionsFromSQL ) } ] Object columns: [{ string . Join ( "," , objectColumnNames ) } ]") ;
645- return new TableInformation ( primaryKeyProperties , columnDefinitionsFromSQL , query , hasIdentityColumnPrimaryKeys ) ;
655+ return new TableInformation ( primaryKeys , primaryKeyProperties , columnDefinitionsFromSQL , queryType , hasIdentityColumnPrimaryKeys ) ;
646656 }
647657 }
648658
0 commit comments