EF批量插入太慢?那是你的姿势纰谬

admin 2个月前 (07-25) 科技 57 0

也许所有的程序员应该都接触过批量插入的场景,我也信赖任何的程序员都能写出可正常运行的批量插入的代码。但怎样实现一个高效、快速插入的批量插入功效呢?

由于每个人的事情履历,事情年限的差别,在实现这样的一个需求时,可能手艺选型各有差别,有直接天生insert语句的,有用EF的或者其他的orm框架的。实在不管是手写insert照样使用EF,最终交给数据库执行的照样insert语句。下面是EF批量插入的示例代码:

var list = new List<Student>();
for (int i = 0; i < 100; i++)
{
    list.Add(new Student { CreateTime = DateTime.Now, Name = "zjjjjjj" });
}

await _context.Students.AddRangeAsync(list);
await _context.SaveChangesAsync();

天生的剧本截图如下:

这种实现方式在数据量100以内时,耗时还算可以。但若是要批量导入的数据到达万级的时刻,那耗时简直是灾难。我测试的数据如下(测试数据库为mysql,详细设置不详):

数据量 耗时(s)
10 0.028
1w 3.929
10w 31.280

10w的数据已经耗时超过了30s,我没有勇气测试100w数据的耗时,有兴趣的可以自行测试下。

下面就应该进入正题了,对于较大数据量(1000以上)场景下的批量插入,各个数据库应该都提供了相关的解决方案,由于事情所限,现在笔者仅接触过mysql和mssql。

mysql的实现方案是LOAD DATA下令,此下令吸收一个csv文件,然后将文件上传到数据库服务器后,剖析数据后插入。幸亏MySqlConnector提供了相关的封装,不用咱们去熟悉那么庞大的下令参数。

mssql实现的方案是使用SqlBulkCopy类,不外此类仅吸收DataTable类型的数据,以是,在批量插入的时刻,需要将数据源转换成DataTable。

综上所示,不管是mysql,照样mssql,均需要将数据源转换成指定的花样才可以使用批量导入的功效,以是这一块的主要焦点就是转换数据源花样。mysql需要转换成csv,mssql需要转换成DataTable。下面就来一起看看详细的转换的方式。

以下代码是转换csv和DataTable相关方式:

namespace FL.DbBulk
{
    public static class Extension
    {
        /// <summary>
        /// 获取实体影射的表名
        /// </summary>
        /// <param name="type"></param>
        /// <returns></returns>
        public static string GetMappingName(this System.Type type)
        {
            var key = $"batch{type.FullName}";

            var tableName = CacheService.Get(key);
            if (string.IsNullOrEmpty(tableName))
            {
                var tableAttr = type.GetCustomAttribute<TableAttribute>();
                if (tableAttr != null)
                {
                    tableName = tableAttr.Name;
                }
                else
                {
                    tableName = type.Name;
                }
                CacheService.Add(key, tableName);
            }
            return tableName;
        }

        public static List<EntityInfo> GetMappingProperties(this System.Type type)
        {
            var key = $"ICH.King.DbBulk{type.Name}";
            var list = CacheService.Get<List<EntityInfo>>(key);
            if (list == null)
            {
                list = new List<EntityInfo>();
                foreach (var propertyInfo in type.GetProperties())
                {
                    if (!propertyInfo.PropertyType.IsValueType &&
                        propertyInfo.PropertyType.Name != "Nullable`1" && propertyInfo.PropertyType != typeof(string)) continue;
                    var temp = new EntityInfo();
                    temp.PropertyInfo = propertyInfo;
                    temp.FieldName = propertyInfo.Name;
                    var attr = propertyInfo.GetCustomAttribute<ColumnAttribute>();
                    if (attr != null)
                    {
                        temp.FieldName = attr.Name;
                    }
                    temp.GetMethod = propertyInfo.CreateGetter();
                    list.Add(temp);
                }
                CacheService.Add(key, list);
            }

            return list;
        }

        /// <summary>
        /// 建立cvs字符串
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="entities"></param>
        /// <param name="primaryKey"></param>
        /// <returns></returns>
        public static string CreateCsv<T>(this IEnumerable<T> entities, string primaryKey = "")
        {
            var sb = new StringBuilder();
            var properties = typeof(T).GetMappingProperties().ToArray();
            foreach (var entity in entities)
            {
                for (int i = 0; i < properties.Length; i++)
                {
                    var ele = properties[i];
                    if (i != 0) sb.Append(",");
                    var value = ele.Get(entity);
                    if (ele.PropertyInfo.PropertyType.Name == "Nullable`1")
                    {
                        if (ele.PropertyInfo.PropertyType.GenericTypeArguments[0] == typeof(DateTime))
                        {
                            if (value == null)
                            {
                                sb.Append("NULL");
                            }
                            else
                            {
                                sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
                            }
                            continue;
                        }
                    }

                    if (ele.PropertyInfo.PropertyType == typeof(DateTime))
                    {
                        sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));
                        continue;
                    }
                    //若是是主键&&string类型,且值不为空
                    if (ele.FieldName == primaryKey && ele.PropertyInfo.PropertyType == typeof(string))
                    {
                        sb.Append(Guid.NewGuid().ToString());
                        continue;
                    }
                    if (value == null)
                    {
                        continue;
                    }
                    if (ele.PropertyInfo.PropertyType == typeof(string))
                    {
                        var vStr = value.ToString();
                        if (vStr.Contains("\""))
                        {
                            vStr = vStr.Replace("\"", "\"\"");
                        }
                        if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n"))
                        {
                            vStr = $"\"{vStr}\"";
                        }
                        sb.Append(vStr);
                    }
                    else sb.Append(value);
                }
                sb.Append(IsWin() ? "\r\n" : "\n");
                //sb.AppendLine();
            }

            return sb.ToString();
        }

        public static bool IsWin()
        {
            return RuntimeInformation.IsOSPlatform(OSPlatform.Windows);
        }

        public static string CreateCsv(this DataTable table)
        {
            StringBuilder sb = new StringBuilder();
            DataColumn colum;
            foreach (DataRow row in table.Rows)
            {
                for (int i = 0; i < table.Columns.Count; i++)
                {
                    colum = table.Columns[i];
                    if (i != 0) sb.Append(",");
                    if (colum.DataType == typeof(string))
                    {
                        var vStr = row[colum].ToString();
                        if (vStr.Contains("\""))
                        {
                            vStr = vStr.Replace("\"", "\"\"");
                        }
                        if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n"))
                        {
                            vStr = $"\"{vStr}\"";
                        }
                        sb.Append(vStr);
                    }
                    else sb.Append(row[colum]);
                }
                sb.Append(IsWin() ? "\r\n" : "\n");
            }
            return sb.ToString();
        }


        public static DataTable ToDataTable<T>(this IEnumerable<T> list, string primaryKey = "")
        {
            var type = typeof(T);
            //获取实体映射的表名
            var mappingName = type.GetMappingName();
            var dt = new DataTable(mappingName);
            //获取实体映射的属性列表
            var columns = type.GetMappingProperties();
            dt.Columns.AddRange(columns.Select(x => new DataColumn(x.FieldName)).ToArray());
            foreach (var data in list)
            {
                var row = dt.NewRow();
                foreach (var entityInfo in columns)
                {
                    var value = entityInfo.Get(data);
                    if (primaryKey == entityInfo.FieldName && entityInfo.PropertyInfo.PropertyType == typeof(string))
                    {
                        row[entityInfo.FieldName] = value ?? Guid.NewGuid().ToString();
                    }
                    else
                    {
                        row[entityInfo.FieldName] = value;
                    }
                }
                dt.Rows.Add(row);
            }

            return dt;
        }

       
    }
}

转换成DataTable方式相对简朴,但这里我做了个优化下,当判断主键是string类型,且值为空时,会自动天生一个GUID,并给其赋值,这样做的目的是为了和EF原生的插入功效兼容。

天生Csv的相对对照贫苦,由于Csv是用逗号以及其他符号来区分每一行、每一列数据,但经常会存在要插入的数据包含了csv的特殊符号,这样情况下就需要做转义。另外,另有一个需要思量的问题,linux和windows默认的换行符是有区别的,windows的换行符为\r\n,而linux默认的是\n,以是在天生csv时,需要凭据差别的系统举行处置。

下面来看下详细怎么挪用相关的插入方式,首先看下mysql的,主要代码如下所示:

private async Task InsertCsvAsync(string csv, string tableName, List<string> columns)
{
    var fileName = Path.GetTempFileName();
    await File.WriteAllTextAsync(fileName, csv);
    var conn = _context.Database.GetDbConnection() as MySqlConnection;
    var loader = new MySqlBulkLoader(conn)
    {
        FileName = fileName,
        Local = true,
        LineTerminator = Extension.IsWin() ? "\r\n" : "\n",
        FieldTerminator = ",",
        TableName = tableName,
        FieldQuotationCharacter = '"',
        EscapeCharacter = '"',
        CharacterSet = "UTF8"
    };
    loader.Columns.AddRange(columns);
    await loader.LoadAsync();
}

在上述的代码中,首先建立一个临时文件,然后将其他数据源转换的csv内容写入到文件中,获取数据库毗邻,再然后建立MySqlBulkLoader类的实例,将相关参数举行复制后,还需要设置字段列表,最后执行LoadAsync下令。

下面是mssql的批量插入的焦点代码:

public async Task InsertAsync(DataTable table)
{
    if (table == null)
    {
        throw new ArgumentNullException();
    }

    if (string.IsNullOrEmpty(table.TableName))
    {
        throw new ArgumentNullException("DataTable的TableName属性不能为空");
    }
    var conn = (SqlConnection)_context.Database.GetDbConnection();
    await conn.OpenAsync();
    using (var bulk = new SqlBulkCopy(conn))
    {
        bulk.DestinationTableName = table.TableName;
        foreach (DataColumn column in table.Columns)
        {
            bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);
        }
        await bulk.WriteToServerAsync(table);
    }
}

以上方式相对简朴,在此不做更多注释。

至此,mysql和mssql批量的导入的方案已经先容完毕,但可能就会有人说了,这跟EF似乎也没什么关系呀。
实在若是你有仔细看的话,或许能发现,我在代码中使用了一个名为_context字段,此字段实在就是EF的DbContext的实例。但文章内容到此时也没有完全的和EF连系,下面就来先容下若何更优雅的将此功效集成到EF中。

在.net core中,接入EF的时刻实在已经指定了使用的数据库类型,实例代码如下:

 services.AddDbContext<MyDbContext>(opt => opt.UseMySql("server=10.0.0.146;Database=demo;Uid=root;Pwd=123456;Port=3306;AllowLoadLocalInfile=true"))

既然以及指定了数据库类型,那么在挪用批量插入的时刻,应该就不需要让挪用者判断是使用mysql的方式,照样mssql的方式。详细怎么设计呢?且耐心往下看。

首先划分界说接口ISqlBulk,IMysqlBulk,ISqlServerBulk代码如下:

namespace FL.DbBulk
{
    public interface ISqlBulk
    {
        /// <summary>
        /// 批量导入数据
        /// </summary>
        /// <param name="table">数据源</param>
        void Insert(DataTable table);
        /// <summary>
        /// 批量导入数据
        /// </summary>
        /// <param name="table">数据源</param>
        Task InsertAsync(DataTable table);

        void Insert<T>(IEnumerable<T> enumerable) where T : class;
        Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class;
    }
}

IMysqlBulk,ISqlServerBulk接口继续ISqlBulk,代码如下:

namespace FL.DbBulk
{
    public interface IMysqlBulk : ISqlBulk
    {
        Task InsertAsync<T>(string csvPath, string tableName = "") where T : class;
    }
}
namespace FL.DbBulk
{
    public interface ISqlServerBulk:ISqlBulk
    {
        
    }
}

然后建立ISqlBulk实现类:

namespace FL.DbBulk
{
    public class SqlBulk : ISqlBulk
    {
        private ISqlBulk _bulk;
        public SqlBulk(DbContext context, IServiceProvider provider)
        {
            if (context.Database.IsMySql())
            {
                _bulk = provider.GetService<IMysqlBulk>();
            }
            else if (context.Database.IsSqlServer())
            {
                _bulk = provider.GetService<ISqlServerBulk>();
            }
        }

        public void Insert(DataTable table)
        {
            _bulk.Insert(table);
        }

        public async Task InsertAsync(DataTable table)
        {
            await _bulk.InsertAsync(table);
        }

        public void Insert<T>(IEnumerable<T> enumerable) where T : class
        {
            _bulk.Insert(enumerable);
        }

        public async Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class
        {
            await _bulk.InsertAsync(enumerable);
        }

    }
}

在SqlBulk的组织函数中,通过context.Database的扩展方式判断数据库的类型,然后再获取响应的接口的实例。再然后就是实现IMysqlBulk和ISqlServerBulk的实现类。上文已经把焦点代码贴出,再此为了篇幅,就不贴完整代码了。

再然后,就是提供一个注入services的方式,代码如下:

namespace Microsoft.Extensions.DependencyInjection
{
    public static class ServiceCollectionExtension
    {
        public static IServiceCollection AddBatchDB<T>(this IServiceCollection services) where  T:DbContext
        {
            services.TryAddScoped<IMysqlBulk, MysqlBulk>();
            services.TryAddScoped<ISqlServerBulk, SqlServerBulk>();
            services.TryAddScoped<ISqlBulk, SqlBulk>();
            services.AddScoped<DbContext, T>();
            return services;
        }
    }
}

有了以上代码,我们就可以通过在Startup中很利便的启用批量插入的功效了。

最后,贴出两种插入方式对比的测试数据:

数据量 EF默认耗时(s) ISqlBulk耗时(s)
10 0.028 0.030
1w 3.929 1.581
10w 31.280 15.408

以上测试数据均是使用同一个mysql数据库,差别设置以及网络环境下,测试的数据会有差异,有兴趣的可以自己试试。

至此,本人内容已完毕。

最后,贴出git地址,若是思绪或代码可以帮到你,迎接点赞,点star
https://github.com/fuluteam/FL.DbBulk.git

福禄ICH·架构组 福尔斯,

欧博allbet客户端

欢迎进入欧博allbet客户端(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。

Sunbet声明:该文看法仅代表作者自己,与本平台无关。转载请注明:EF批量插入太慢?那是你的姿势纰谬

标签列表

    文章归档

      站点信息

      • 文章总数:465
      • 页面总数:0
      • 分类总数:8
      • 标签总数:804
      • 评论总数:0
      • 浏览总数:13467