-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathBulkOperation.cs
102 lines (90 loc) · 3.85 KB
/
BulkOperation.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
using System;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace BulkOperations
{
public static class BulkOperation
{
public static void Run()
{
var stopwatch = new Stopwatch();
stopwatch.Start();
ConsoleLog.Write("Início processamento");
InsertAsync().Wait();
stopwatch.Stop();
ConsoleLog.Write($"Processamento concluído em {stopwatch.ElapsedTimeFmt()}");
}
public static async Task InsertAsync(CancellationToken cancellationToken = default(CancellationToken), SqlTransaction transaction = null)
{
using (var connection = new SqlConnection())
{
connection.Configure();
await connection.OpenAsync(cancellationToken).ConfigureAwait(true);
const int recordsToGenerate = 100_000;
ConsoleLog.Write($"Gerando {recordsToGenerate:n0} registros clientes");
var customers = Customer.Generate(recordsToGenerate);
ConsoleLog.Write("Inserindo registros no banco de dados");
using (var insertBulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction))
{
using (var customerReader = new ObjectDataReader<Customer>(customers.GetEnumerator()))
{
insertBulk.Configure("Customer");
await insertBulk.WriteToServerAsync(customerReader, cancellationToken).ConfigureAwait(false);
}
}
using (var command = new SqlCommand("", connection))
{
try
{
ConsoleLog.Write("Criando tabela temporária e copiando registros");
command.CommandText = BulkUpdate.CreateTempTable;
command.ExecuteNonQuery();
try
{
ConsoleLog.Write("Atualizando registros: Tabela temporária -> Customers");
command.CommandTimeout = 300;
command.CommandText = BulkUpdate.UpdateTable;
command.ExecuteNonQuery();
}
finally
{
ConsoleLog.Write("Excluindo tabela temporária");
command.CommandText = BulkUpdate.DropTempTable;
command.ExecuteNonQuery();
}
}
catch (Exception e)
{
ConsoleLog.Write(e.Message);
}
}
}
}
private static void Configure(this SqlBulkCopy bulk, string tableName)
{
bulk.MapEntity(tableName);
bulk.ConfigureOptions();
bulk.ConfigureLog();
}
private static void MapEntity(this SqlBulkCopy bulk, string tableName)
{
bulk.DestinationTableName = tableName;
bulk.ColumnMappings.Add(nameof(Customer.Id), "Id");
bulk.ColumnMappings.Add(nameof(Customer.FirstName), "FirstName");
bulk.ColumnMappings.Add(nameof(Customer.LastName), "LastName");
bulk.ColumnMappings.Add(nameof(Customer.DateOfBirth), "DateOfBirth");
}
private static void ConfigureOptions(this SqlBulkCopy bulk)
{
bulk.EnableStreaming = true;
bulk.BatchSize = 10_000;
bulk.NotifyAfter = 1_000;
}
private static void ConfigureLog(this SqlBulkCopy bulk)
{
bulk.SqlRowsCopied += (sender, e) => ConsoleLog.Write($"{DateTime.Now} RowsCopied: {e.RowsCopied:n0}");
}
}
}