自曾列就别往下看 别折腾了
下面在sqlserver中演示
mysql 请google MySqlBulkLoader
oracle 请google OracleBulkCopy
表结构
DROP TABLE [dbo].[Product]
GO
CREATE TABLE [dbo].[Product] (
[Id] varchar(36) NOT NULL ,
[Name] varchar(255) NOT NULL ,
[Price] decimal(18,4) NOT NULL
)
GO
ALTER TABLE [dbo].[Product] ADD PRIMARY KEY ([Id])
GO
批量添加
public static void Insert<T>(string connectionString, List<T> dataList, string destinationTableName, int batchSize = )
{
DataTable dataTable = ConvertToDataTable(dataList);
Insert(connectionString, dataTable, destinationTableName, batchSize);
} public static void Insert(string connectionString, DataTable dataTable, string destinationTableName, int batchSize = )
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
if (connection.State != ConnectionState.Open)
{
connection.Open();
}
using (SqlTransaction transaction = connection.BeginTransaction())
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction))
{
bulkCopy.BatchSize = batchSize;
bulkCopy.DestinationTableName = destinationTableName;
try
{
bulkCopy.WriteToServer(dataTable);
transaction.Commit();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
transaction.Rollback();
}
}
}
}
}
批量添加测试代码
public static void Insert()
{
List<Product> products = new List<Product>();
for (int i = ; i < ; i++)
{
Product product = new Product
{
Id = Guid.NewGuid().ToString(),
Name = $"商品{i}",
Price = (decimal)i
};
products.Add(product);
}
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
Insert(SqLiteHelper.SqlServerConnection, products, "Product");
stopwatch.Stop();
Console.WriteLine("耗时:" + stopwatch.ElapsedMilliseconds);
}
批量更新
public static void Update<T>(string connectionString, List<T> list, string destinationTableName)
{
var dt = ConvertToDataTable(list);
using (SqlConnection connection = new SqlConnection(connectionString))
{
if (connection.State != ConnectionState.Open)
{
connection.Open();
}
using (SqlTransaction transaction = connection.BeginTransaction())
{
using (SqlCommand command = new SqlCommand(string.Empty, connection))
{
try
{
command.Transaction = transaction;
command.CommandText = "CREATE TABLE #TmpTable(Id varchar(36),Name varchar(255),Price decimal(18,4))";
command.ExecuteNonQuery();
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction))
{
bulkCopy.BulkCopyTimeout = ;
bulkCopy.DestinationTableName = "#TmpTable";
bulkCopy.WriteToServer(dt);
bulkCopy.Close();
}
command.CommandTimeout = ;
command.CommandText = "UPDATE T SET T.Name =Temp.Name FROM " + destinationTableName + " T INNER JOIN #TmpTable Temp ON T.Id=Temp.Id; DROP TABLE #TmpTable;";
command.ExecuteNonQuery();
transaction.Commit();
}
catch (Exception)
{
transaction.Rollback();
}
}
}
}
}
批量更新测试代码
public static List<string> GetList()
{
List<string> list = new List<string>();
using (SqlConnection conn = new SqlConnection(SqLiteHelper.SqlServerConnection))
{
using (SqlCommand command = new SqlCommand("SELECT TOP 5000 Id FROM Product", conn))
{
conn.Open();
var data = command.ExecuteReader();
while (data.Read())
{
list.Add(data["Id"].ToString());
}
}
} return list;
} public static void Update()
{
var list = GetList();
List<Product> products = new List<Product>();
for (int i = ; i < list.Count; i++)
{
Product product = new Product
{
Id = list[i],
Name = $"默认{i}",
Price = (decimal)i *
};
products.Add(product);
}
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
Update(SqLiteHelper.SqlServerConnection, products, "Product");
stopwatch.Stop();
Console.WriteLine("耗时:" + stopwatch.ElapsedMilliseconds);
}
List转DataTable
public static DataTable ConvertToDataTable<T>(IList<T> data)
{
PropertyDescriptorCollection properties = TypeDescriptor.GetProperties(typeof(T));
DataTable table = new DataTable();
foreach (PropertyDescriptor prop in properties)
{
table.Columns.Add(prop.Name, Nullable.GetUnderlyingType(prop.PropertyType) ?? prop.PropertyType);
}
foreach (T item in data)
{
DataRow row = table.NewRow();
foreach (PropertyDescriptor prop in properties)
{
row[prop.Name] = prop.GetValue(item) ?? DBNull.Value;
}
table.Rows.Add(row);
}
return table;
}
实体类
public class Product
{
public string Id { get; set; } public string Name { get; set; } public decimal Price { get; set; }
}
链接字符串配置
public class SqLiteHelper
{
public const string SqlServerConnection = "Data Source=IP;Initial Catalog=库名;uid=帐号;pwd=密码;MultipleActiveResultSets=True";
}
测试了一下 添加10W 差不多 10S左右
补充一个 多表操作
public static void Inserts(string connectionString, Dictionary<string, DataTable> dataTables, int batchSize = )
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
if (connection.State != ConnectionState.Open)
{
connection.Open();
}
using (SqlTransaction transaction = connection.BeginTransaction())
{
try
{
foreach (var item in dataTables)
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction))
{
bulkCopy.BatchSize = batchSize;
bulkCopy.DestinationTableName = item.Key;
bulkCopy.WriteToServer(item.Value);
}
}
transaction.Commit();
}
catch (Exception ex)
{ Console.WriteLine(ex.Message);
transaction.Rollback();
}
}
}
}
测试代码
public static void Inserts()
{
const int count = ;
List<Order> orders = new List<Order>();
List<Product> products = new List<Product>();
for (var i = ; i < count; i++)
{
Product product = new Product
{
Id = Guid.NewGuid().ToString(),
Name = $"商品{i}",
Price = i * 0.8M
};
products.Add(product);
Order order = new Order
{
Id = Guid.NewGuid().ToString(),
ProductId = product.Id,
Remake = "suggestions",
Status =
};
orders.Add(order);
}
var productsDataTable = Batch.ConvertToDataTable(products);
var ordersDataTable = Batch.ConvertToDataTable(orders);
Dictionary<string, DataTable> dataTables = new Dictionary<string, DataTable>
{ { "Product", productsDataTable},
{ "Orders",ordersDataTable}
}; Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
Inserts(SqLiteHelper.SqlServerConnection, dataTables);
stopwatch.Stop();
Console.WriteLine("耗时:" + stopwatch.ElapsedMilliseconds);
}
新增订单实体对象
public class Order
{
public string Id { get; set; } public string ProductId { get; set; } public int Status { get; set; } public string Remake { get; set; }
}
Order表结构
DROP TABLE [dbo].[Orders]
GO
CREATE TABLE [dbo].[Orders] (
[Id] varchar(36) NOT NULL ,
[ProductId] varchar(36) NOT NULL ,
[Status] int NOT NULL ,
[Remake] varchar(255) NOT NULL
) GO ALTER TABLE [dbo].[Orders] ADD PRIMARY KEY ([Id])
GO
批量删除也贴一个吧
public void BatchDelete<T>(List<T> idList)
{
var type = typeof(T);
var id = GetProperties(type);
var innerJoin = $"a.{id}=b.{id}";
var tempTableName = $"#TmpTable{type.Name}";
var dataTableName = BulkCopyRepositoryExtension.GetTableName(type);
var sqlConnection = (SqlConnection)_unit.Connection;
var sqlTransaction = (SqlTransaction)_unit.Transaction;
var sqlCommand = (SqlCommand)_unit.Command;
sqlCommand.CommandText = $"SELECT * INTO {tempTableName} FROM {dataTableName} WHERE 1 = 2";
sqlCommand.ExecuteNonQuery();
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.Default, sqlTransaction))
{
bulkCopy.DestinationTableName = tempTableName;
using (var reader = new ObjectReader(type, idList, BulkCopyRepositoryExtension.GetFields(type)))
{
bulkCopy.WriteToServer(reader);
}
}
sqlCommand.CommandText = $"DELETE a FROM {dataTableName} AS a INNER JOIN {tempTableName} AS b ON {innerJoin}; DROP TABLE {tempTableName};";
sqlCommand.ExecuteNonQuery();
}
批量删除关键代码已贴上 如需全部代码 QQ群 4816230869
为了满足关注粉丝的要求贴上说明
1、SqlBulkCopy类的构造方法
其中: conn表示一个SqlConnection对象
connStr表示数据库连接字符串
- SqlBulkCopy(conn)
- SqlBulkCopy(connStr)
- SqlBulkCopy(connStr, SqlBulkCopyOptions copyOptions)
- SqlBulkCopy(conn, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction)
其中还有几个陌生的对象:SqlBulkCopyOptions 和 SqlTransaction
1.1、SqlBulkCopyOptions类
这个类是一个枚举类型:
对象 | 值 | 备注 |
Default | 0 | |
KeepIdentity | 1 | 保留源标识值。 如果未指定,则由目标分配标识值。 |
CheckConstraints | 2 | 在插入数据的同时检查约束。 默认情况下,不检查约束。 |
TableLock | 4 | 在批量复制操作期间获取批量更新锁。 如果未指定,则使用行锁。 |
KeepNulls | 8 | 保留目标表中的空值,而不管默认值的设置如何。 如果未指定,则空值将由默认值替换(如果适用) |
FireTriggers | 16 | 指定后,会导致服务器为插入到数据库中的行激发插入触发器。 |
UseInternalTransaction | 32 | 如果已指定,则每一批批量复制操作将在事务中进行。 如果指示了此选项,并且为构造函数提供了 System.Data.SqlClient.SqlTransaction对象,则发生 System.ArgumentException(参数异常)。因为两个事务冲突了。 |
1.2、SqlTransaction类
这个类是事务类,是个密封类,实现了DbTransaction抽象类
2、SqlBulkCopy类的常用属性
属性名 | 功能 | 备注 |
BatchSize | 设置或获取每达到多少行就更新到服务器(也就是目标表) | 值为int, |
BulkCopyTimeout | 设置或获取超时时间 | 默认30秒,如果设置成0,将无限制等待, 值为int,单位为秒 |
DestinationTableName | 设置或获取服务器上的目标表的名称 | 也就是批量更新的目标表, 值为String类型 |
EnableStreaming | 设置或获取是否支持传输 IDataReader 对象的数据 | true为支持, 值为bool类型 |
NotifyAfter | 设置或获取在生成通知事件之前要处理的行数 | 默认为0, 值为int类型, |
ColumnMappings | 获取列映射定义数据源中的列和目标表中的列之间的映射关系 | 返回值为SqlBulkCopyColumnMappingCollection |
2.1、表中的SqlBulkCopyColumnMappingCollection类型是一个映射集合类,是目标表的列和源表的列的映射关系的集合。
这个类是一个密封类,不能被继承,实现了一个CollectionBase抽象类。
SqlBulkCopyColumnMappingCollection没有提供构造方法,我们也不需要去newat的对象,主要是使用它的几个重载的Add()方法
Add()有五个重载的方法:
- SqlBulkCopyColumnMapping Add(SqlBulkCopyColumnMapping bulkCopyColumnMapping);
- SqlBulkCopyColumnMapping Add(string sourceColumn, string destinationColumn);
- SqlBulkCopyColumnMapping Add(int sourceColumnIndex, string destinationColumn);
- SqlBulkCopyColumnMapping Add(string sourceColumn, int destinationColumnIndex);
- SqlBulkCopyColumnMapping Add(int sourceColumnIndex, int destinationColumnIndex);
其中四个方法是类似的,都是对应的列名或者列的位置。
第一个方法是添加一个已经构建好的SqlBulkCopyColumnMapping对象,
他也有集合常用的方法:
方法名 | 功能 | 备注 |
Clear(); | 清除集合中的映射关系 | |
Contains(SqlBulkCopyColumnMapping value); | 判断是否包含指定映射关系 | |
IndexOf(SqlBulkCopyColumnMapping value); | 返回指定映射关系的位置 | |
Remove(SqlBulkCopyColumnMapping value); | 移除指定映射关系 | |
RemoveAt(int index); | 移除指定位置的映射关系 | |
Insert(int index, SqlBulkCopyColumnMapping value); | 在指定位置插入映射关系 | |
CopyTo(SqlBulkCopyColumnMapping[] array, int index); | 从指定位置开始将映射关系复制到指定数组中 | index指定的集合中的位置, 而不是数组中的角标 |
3、SqlBulkCopy类的常用方法
- WriteToServer,这个方法重载了四次,功能是将数据写到目的表中。
WriteToServer(DataRow[] rows); | 将 DataRow 数组所有元素写到目标表中 |
WriteToServer(DataTable table); | 将 DataTable 所有行写到目标表中 |
WriteToServer(IDataReader reader); | 将指定的 IDataReader 对象中的数据写到目标表中 |
WriteToServer(DataTable table, DataRowState rowState); | 将 DataTable 中指定状态的所有行写到目标表中 |
【上表中的 DataRowState 状态行可以参考这篇博客DataTable的AcceptChanges()方法和DataRow的RowState属性】