文章目錄
- 前言
- 一、安裝包
- 二、使用步驟
- 1.實現SignalR Hub服務:
- 2.實現CSV文件解析及數據導入服務
- 3.控制器
- 4.前端實現(vue)
- 三、關鍵技術點說明
- 總結
前言
本文介紹如何將CSV文件中的數據導入數據庫:使用CsvHelper解析CSV文件,使用SqlBulkCopy批量寫入數據,并通過SignalR Hub向前端實時推送導入進度。
一、安裝包
- CsvHelper
  Install-Package CsvHelper
- SqlBulkCopy
  Install-Package Microsoft.EntityFrameworkCore.SqlServer
注意:SqlBulkCopy 位於 Microsoft.Data.SqlClient 包中,該包會作為 Microsoft.EntityFrameworkCore.SqlServer 的依賴一併安裝。請根據自己項目的版本選擇合適的安裝包版本。
二、使用步驟
1.實現SignalR Hub服務:
-
MyHubService.cs
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Identity;
using Microsoft.AspNetCore.SignalR;
using Microsoft.EntityFrameworkCore;
using SignalRDemo.Data;
using SignalRDemo.Entity;
using SignalRDemo.Interfaces;
using System.Collections.Concurrent;
using System.Linq;
using static Microsoft.EntityFrameworkCore.DbLoggerCategory.Database;

namespace SignalRDemo.HubService
{
    /// <summary>
    /// SignalR hub that offers broadcast, private, group and administrator messaging,
    /// manages custom groups persisted through <see cref="MyDbContext"/>, and relays
    /// import progress to a single connection.
    /// </summary>
    public class MyHubService : Hub
    {
        private readonly MyDbContext myDbContext;
        private readonly UserManager<User> userManager;

        // In-memory cache of group information, keyed by group name, to avoid a database
        // round trip on every group operation.
        // NOTE(review): GroupInfo.MemberIds is a plain HashSet that JoinGroup/LeaveGroup mutate;
        // concurrent hub invocations on the same group are not synchronized here.
        private static readonly ConcurrentDictionary<string, GroupInfo> _groups =
            new ConcurrentDictionary<string, GroupInfo>();

        // Connection ID -> user ID map.
        // NOTE(review): nothing in this class adds entries to this map, so the cleanup branch in
        // OnDisconnectedAsync is currently never taken.
        private static readonly ConcurrentDictionary<string, string> _userConnections = new();

        // User IDs of administrators.
        private static readonly HashSet<string> _admins = new();

        private readonly IImportECDictService _importECDictService;

        public MyHubService(UserManager<User> userManager, MyDbContext myDbContext, IImportECDictService importECDictService)
        {
            this.userManager = userManager;
            this.myDbContext = myDbContext;
            _importECDictService = importECDictService;
        }

        /// <summary>
        /// Called when a client connects: joins the connection to every group the user is a
        /// member of in the database, then sends the connection ID back to the caller.
        /// </summary>
        public override async Task OnConnectedAsync()
        {
            // Join the connection to the groups the user already belongs to.
            var userGroups = await GetUserGroupsAsync(Convert.ToInt64(Context.UserIdentifier));
            foreach (var groupName in userGroups)
            {
                await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
            }

            // Proactively push the connection ID to the client; the upload API needs it.
            await Clients.Caller.SendAsync("ReceiveConnectionId", Context.ConnectionId);
            await base.OnConnectedAsync();
        }

        /// <summary>
        /// Called when a client disconnects: removes a tracked connection from the
        /// "AdminUsers" and "AllUsers" groups.
        /// </summary>
        /// <param name="exception">The error that closed the connection, if any.</param>
        public override async Task OnDisconnectedAsync(Exception? exception)
        {
            var connectionID = Context.ConnectionId;
            if (_userConnections.TryRemove(connectionID, out var userID))
            {
                if (_admins.Contains(userID))
                {
                    await Groups.RemoveFromGroupAsync(connectionID, "AdminUsers");
                }

                // Remove from the all-users group.
                await Groups.RemoveFromGroupAsync(connectionID, "AllUsers");
            }

            await base.OnDisconnectedAsync(exception);
        }

        /// <summary>
        /// Loads all groups and their members from the database into the in-memory cache.
        /// Intended to be run at application start-up.
        /// </summary>
        /// <remarks>
        /// NOTE(review): public hub methods can be invoked by connected clients; confirm that
        /// exposing this one is intended.
        /// </remarks>
        public async Task InitializeGroupsAsync()
        {
            var groups = await myDbContext.Groups.Include(g => g.Members).ToListAsync();
            foreach (var group in groups)
            {
                var groupInfo = new GroupInfo
                {
                    GroupId = group.GroupId,
                    GroupName = group.GroupName,
                    MemberIds = group.Members.Select(m => m.UserId).ToHashSet()
                };
                _groups.TryAdd(group.GroupName, groupInfo);
            }
        }

        /// <summary>Returns the names of the groups the given user is a member of.</summary>
        private async Task<IEnumerable<string>> GetUserGroupsAsync(long userId)
        {
            return await myDbContext.GroupMembers
                .Where(gm => gm.UserId == userId)
                .Select(gm => gm.Group.GroupName)
                .ToListAsync();
        }

        /// <summary>
        /// Sends a message to all connected clients. Restricted to the "admin" role.
        /// </summary>
        /// <param name="user">Sender name forwarded to the clients.</param>
        /// <param name="content">Message text.</param>
        [Authorize(Roles = "admin")]
        public async Task SendMessageAsync(string user, string content)
        {
            await Clients.All.SendAsync("ReceiveMsg", user, content);
        }

        /// <summary>
        /// Sends a private message to one user.
        /// </summary>
        /// <param name="toUserName">User name of the recipient.</param>
        /// <param name="content">Message text.</param>
        /// <exception cref="HubException">The sender or the recipient cannot be resolved.</exception>
        public async Task SendPrivateMsgAsync(string toUserName, string content)
        {
            var senderUserID = Context.UserIdentifier;
            var senderUser = senderUserID is null ? null : await userManager.FindByIdAsync(senderUserID);
            if (senderUser is null)
            {
                throw new HubException("找不到發送者用戶");
            }

            // Guard against unknown recipients instead of failing with a NullReferenceException.
            var toUser = string.IsNullOrWhiteSpace(toUserName) ? null : await userManager.FindByNameAsync(toUserName);
            if (toUser is null)
            {
                throw new HubException("接收用戶不存在");
            }

            await Clients.User(toUser.Id.ToString()).SendAsync("ReceivePrivateMsg", senderUser.UserName, content);
        }

        /// <summary>
        /// Sends a message to every connection in the given group.
        /// </summary>
        /// <param name="groupName">Target group.</param>
        /// <param name="sender">Sender name forwarded to the clients.</param>
        /// <param name="content">Message text.</param>
        /// <remarks>The method name (including its spelling) is what clients invoke, so it is kept as is.</remarks>
        public async Task SendGroupMsgAsynnc(string groupName, string sender, string content)
        {
            await Clients.Group(groupName).SendAsync("ReceiveGroupMsg", sender, groupName, content);
        }

        /// <summary>
        /// Sends a message to the "AdminUsers" group.
        /// </summary>
        /// <param name="sender">Sender name forwarded to the clients.</param>
        /// <param name="content">Message text.</param>
        public async Task SendAdminMsgAsync(string sender, string content)
        {
            await Clients.Group("AdminUsers").SendAsync("ReceiveAdminMsg", sender, content);
        }

        /// <summary>
        /// Sends a message to every client except the caller.
        /// </summary>
        /// <param name="sender">Sender name forwarded to the clients.</param>
        /// <param name="content">Message text.</param>
        public async Task SendOthersMsg(string sender, string content)
        {
            await Clients.Others.SendAsync("ReceiveMsg", sender, content);
        }

        /// <summary>
        /// Creates a custom group, stores it in the database and the cache, and makes the
        /// caller its first member.
        /// </summary>
        /// <param name="groupName">Name of the group to create.</param>
        public async Task CreateGroup(string groupName)
        {
            if (string.IsNullOrWhiteSpace(groupName))
            {
                await Clients.Caller.SendAsync("GroupCreationFailed", "組名不能為空");
                return;
            }

            long userId = Convert.ToInt64(Context.UserIdentifier);
            if (_groups.ContainsKey(groupName))
            {
                await Clients.Caller.SendAsync("GroupCreationFailed", "組已存在");
                return;
            }

            // Create the group and persist it.
            var group = new Group
            {
                GroupName = groupName,
                CreatedAt = DateTime.UtcNow,
                CreatorId = userId
            };
            myDbContext.Groups.Add(group);
            await myDbContext.SaveChangesAsync();

            // Add to the in-memory cache.
            var groupInfo = new GroupInfo
            {
                GroupId = group.GroupId,
                GroupName = groupName,
                MemberIds = new HashSet<long> { userId }
            };
            _groups.TryAdd(groupName, groupInfo);

            // The creator joins automatically: persist the membership and attach the current
            // connection to the SignalR group so group messages reach the creator immediately.
            await AddUserToGroup(groupName, userId);
            await Groups.AddToGroupAsync(Context.ConnectionId, groupName);

            await Clients.All.SendAsync("GroupCreated", groupName);
        }

        /// <summary>Persists a membership row for the user in the cached group.</summary>
        private async Task AddUserToGroup(string groupName, long userId)
        {
            var groupInfo = _groups[groupName];

            var groupMember = new GroupMember
            {
                GroupId = groupInfo.GroupId,
                UserId = userId,
                JoinedAt = DateTime.UtcNow
            };
            myDbContext.GroupMembers.Add(groupMember);
            await myDbContext.SaveChangesAsync();
        }

        /// <summary>
        /// Adds the caller to an existing custom group.
        /// </summary>
        /// <param name="groupName">Name of the group to join.</param>
        public async Task JoinGroup(string groupName)
        {
            var userId = Convert.ToInt64(Context.UserIdentifier);

            if (!_groups.TryGetValue(groupName, out var groupInfo))
            {
                await Clients.Caller.SendAsync("JoinGroupFailed", "組不存在");
                return;
            }

            if (groupInfo.MemberIds.Contains(userId))
            {
                await Clients.Caller.SendAsync("JoinGroupFailed", "您已在該組中");
                return;
            }

            // Persist the membership, then update the cache.
            await AddUserToGroup(groupName, userId);
            groupInfo.MemberIds.Add(userId);

            // Attach the connection to the SignalR group and announce the new member.
            await Groups.AddToGroupAsync(Context.ConnectionId, groupName);
            await Clients.Group(groupName).SendAsync("UserJoinedGroup", Context.User?.Identity?.Name, groupName);
        }

        /// <summary>
        /// Removes the caller from a custom group and deletes the group when it becomes empty.
        /// </summary>
        /// <param name="groupName">Name of the group to leave.</param>
        public async Task LeaveGroup(string groupName)
        {
            var userId = Convert.ToInt64(Context.UserIdentifier);

            if (!_groups.TryGetValue(groupName, out var groupInfo) || !groupInfo.MemberIds.Contains(userId))
            {
                await Clients.Caller.SendAsync("LeaveGroupFailed", "您不在該組中");
                return;
            }

            // Remove the membership from the database, then from the cache.
            await RemoveUserFromGroup(groupName, userId);
            groupInfo.MemberIds.Remove(userId);

            // Always detach the connection, including when the group is about to be deleted;
            // otherwise it would still receive messages if a group with this name is recreated.
            await Groups.RemoveFromGroupAsync(Context.ConnectionId, groupName);

            if (groupInfo.MemberIds.Count == 0)
            {
                await DeleteGroup(groupName);
            }
            else
            {
                await Clients.Group(groupName).SendAsync("UserLeftGroup", Context.User?.Identity?.Name, groupName);
            }
        }

        /// <summary>Deletes the user's membership row for the cached group, if one exists.</summary>
        private async Task RemoveUserFromGroup(string groupName, long userId)
        {
            var groupInfo = _groups[groupName];

            var groupMember = await myDbContext.GroupMembers
                .FirstOrDefaultAsync(gm => gm.GroupId == groupInfo.GroupId && gm.UserId == userId);
            if (groupMember != null)
            {
                myDbContext.GroupMembers.Remove(groupMember);
                await myDbContext.SaveChangesAsync();
            }
        }

        /// <summary>Removes the group from the cache and the database and notifies all clients.</summary>
        private async Task DeleteGroup(string groupName)
        {
            if (_groups.TryRemove(groupName, out var groupInfo))
            {
                var group = await myDbContext.Groups.FindAsync(groupInfo.GroupId);
                if (group != null)
                {
                    myDbContext.Groups.Remove(group);
                    await myDbContext.SaveChangesAsync();
                }

                await Clients.All.SendAsync("GroupDeleted", groupName);
            }
        }

        /// <summary>
        /// Sends an import progress update to one connection.
        /// </summary>
        /// <param name="connectionId">Connection that receives the update.</param>
        /// <param name="progress">Progress payload.</param>
        /// <remarks>
        /// NOTE(review): as a public hub method this can be invoked by any connected client
        /// for any connection ID; confirm that this is intended.
        /// </remarks>
        public async Task SendProgress(string connectionId, ImportProgress progress)
        {
            await Clients.Client(connectionId).SendAsync("ReceiveProgress", progress);
        }
    }
}
2.實現CSV文件解析及數據導入服務
-
IImportECDictService.cs
namespace SignalRDemo.Interfaces;

/// <summary>
/// Contract for importing an uploaded ECDict CSV file.
/// </summary>
public interface IImportECDictService
{
    /// <summary>
    /// Imports the content of <paramref name="file"/>.
    /// </summary>
    /// <param name="connectionId">
    /// ID of the SignalR connection associated with the import; the implementation in
    /// ImportECDictService sends its progress updates to this connection.
    /// </param>
    /// <param name="file">The uploaded CSV file.</param>
    /// <returns>A task that completes when the import has finished.</returns>
    Task ImportECDictAsync(string connectionId, IFormFile file);
}
-
ImportECDictService.cs
using SignalRDemo.Entity;
using SignalRDemo.Interfaces;
using System.Globalization;
using CsvHelper;
using CsvHelper.Configuration;
using Microsoft.AspNetCore.SignalR;
using SignalRDemo.HubService;
using Microsoft.Data.SqlClient;
using System.Data;

namespace SignalRDemo.Repositories
{
    /// <summary>
    /// Parses an uploaded ECDict CSV file with CsvHelper, bulk-inserts the rows into the
    /// "T_ECDictCSVs" table with SqlBulkCopy, and reports progress to one SignalR connection
    /// through the "ReceiveProgress" client method.
    /// </summary>
    public class ImportECDictService : IImportECDictService
    {
        // Columns copied to the database. The DataTable column name and the destination
        // column name are identical, and the order matches the values added per row.
        private static readonly string[] s_columnNames =
        {
            nameof(ECDictCSV.Word),
            nameof(ECDictCSV.Phonetic),
            nameof(ECDictCSV.Definition),
            nameof(ECDictCSV.Translation),
            nameof(ECDictCSV.Pos),
            nameof(ECDictCSV.Collins),
            nameof(ECDictCSV.Oxford),
            nameof(ECDictCSV.Tag),
            nameof(ECDictCSV.Bnc),
            nameof(ECDictCSV.Frg),
            nameof(ECDictCSV.Exchange),
            nameof(ECDictCSV.Detail),
            nameof(ECDictCSV.Audio),
        };

        private readonly IHubContext<MyHubService> _hubContext;
        private readonly IConfiguration _configuration;

        public ImportECDictService(IHubContext<MyHubService> hubContext, IConfiguration configuration)
        {
            _hubContext = hubContext;
            _configuration = configuration;
        }

        /// <summary>
        /// Parses the CSV file and imports all of its records into the database.
        /// </summary>
        /// <param name="connectionId">SignalR connection that receives progress updates.</param>
        /// <param name="file">The uploaded CSV file.</param>
        /// <exception cref="ArgumentNullException"><paramref name="file"/> is null.</exception>
        public async Task ImportECDictAsync(string connectionId, IFormFile file)
        {
            ArgumentNullException.ThrowIfNull(file);

            var progress = new ImportProgress { Status = "開始解析文件" };

            // 1. Parse the CSV file.
            var records = await ParseCsvFile(file, connectionId, progress);

            // 2. Bulk-insert into the database.
            await BulkInsertToDatabase(records, connectionId, progress);
        }

        /// <summary>
        /// Reads every record of the CSV file into memory and reports the total count.
        /// </summary>
        /// <param name="file">The uploaded CSV file.</param>
        /// <param name="connectionId">SignalR connection that receives the progress update.</param>
        /// <param name="progress">Progress object; its total and status are updated here.</param>
        /// <returns>All records of the file, in file order.</returns>
        public async Task<List<ECDictCSV>> ParseCsvFile(IFormFile file, string connectionId, ImportProgress progress)
        {
            var records = new List<ECDictCSV>();
            var config = new CsvConfiguration(CultureInfo.InvariantCulture)
            {
                // Explicit field delimiter; change it here for files that use another separator.
                Delimiter = ",",
                HasHeaderRecord = true,
                // No handler is installed for missing fields or for header validation.
                MissingFieldFound = null,
                HeaderValidated = null
            };

            using (var stream = file.OpenReadStream())
            using (var reader = new StreamReader(stream))
            using (var csv = new CsvReader(reader, config))
            {
                // Enumerate the async stream directly; this needs no additional LINQ package.
                await foreach (var record in csv.GetRecordsAsync<ECDictCSV>())
                {
                    records.Add(record);
                }
            }

            progress.TotalRecords = records.Count;
            progress.Status = "文件解析完成,準備導入數據庫";
            await SendProgressAsync(connectionId, progress);

            return records;
        }

        /// <summary>
        /// Writes the records to the database in batches and reports progress after each batch.
        /// </summary>
        /// <param name="records">Records to insert.</param>
        /// <param name="connectionId">SignalR connection that receives progress updates.</param>
        /// <param name="progress">Progress object updated after every batch.</param>
        /// <exception cref="InvalidOperationException">The connection string is not configured.</exception>
        private async Task BulkInsertToDatabase(List<ECDictCSV> records, string connectionId, ImportProgress progress)
        {
            const int batchSize = 100;
            var totalBatches = (records.Count + batchSize - 1) / batchSize;

            var connect = _configuration.GetSection("ConnectionStrings").Get<ConnectionStrings>();
            if (string.IsNullOrWhiteSpace(connect?.DefaultConnection))
            {
                throw new InvalidOperationException("未配置數據庫連接字符串 ConnectionStrings:DefaultConnection");
            }

            using (var connection = new SqlConnection(connect.DefaultConnection))
            {
                await connection.OpenAsync();

                // The bulk-copy object and the staging table do not depend on the batch, so they
                // are created once and reused; only the rows change between batches.
                using var bulkCopy = CreateBulkCopy(connection);
                using var dataTable = CreateStagingTable();

                for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++)
                {
                    var start = batchIndex * batchSize;
                    var count = Math.Min(batchSize, records.Count - start);

                    dataTable.Clear();
                    for (var i = start; i < start + count; i++)
                    {
                        var record = records[i];
                        dataTable.Rows.Add(
                            record.Word, record.Phonetic, record.Definition, record.Translation,
                            record.Pos, record.Collins, record.Oxford, record.Tag, record.Bnc,
                            record.Frg, record.Exchange, record.Detail, record.Audio);
                    }

                    await bulkCopy.WriteToServerAsync(dataTable);

                    progress.ProcessedRecords += count;
                    progress.TotalRecords = records.Count;
                    progress.Status = $"正在導入數據:批次 {batchIndex + 1}/{totalBatches}";
                    await SendProgressAsync(connectionId, progress);
                }
            }

            // Import finished.
            progress.Status = "導入完成";
            await SendProgressAsync(connectionId, progress);
        }

        /// <summary>Creates a bulk-copy object targeting "T_ECDictCSVs" with all column mappings.</summary>
        private static SqlBulkCopy CreateBulkCopy(SqlConnection connection)
        {
            var bulkCopy = new SqlBulkCopy(connection)
            {
                DestinationTableName = "T_ECDictCSVs"
            };
            foreach (var column in s_columnNames)
            {
                bulkCopy.ColumnMappings.Add(column, column);
            }

            return bulkCopy;
        }

        /// <summary>Creates an empty staging table with one string column per mapped column.</summary>
        private static DataTable CreateStagingTable()
        {
            var dataTable = new DataTable();
            foreach (var column in s_columnNames)
            {
                dataTable.Columns.Add(column, typeof(string));
            }

            return dataTable;
        }

        /// <summary>Sends the current progress to the given connection.</summary>
        private Task SendProgressAsync(string connectionId, ImportProgress progress)
        {
            return _hubContext.Clients.Client(connectionId).SendAsync("ReceiveProgress", progress);
        }
    }
}
3.控制器
- ImportECDictController.cs
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using SignalRDemo.Interfaces;

namespace SignalRDemo.Controllers
{
    /// <summary>
    /// API endpoint for uploading an ECDict CSV file and importing it.
    /// </summary>
    [Route("api/[controller]/[action]")]
    [ApiController]
    public class ImportECDictController : ControllerBase
    {
        private readonly IImportECDictService _importECDictService;

        public ImportECDictController(IImportECDictService importECDictService)
            => _importECDictService = importECDictService;

        /// <summary>
        /// Validates the uploaded file and the "connectionId" query parameter, then runs the import.
        /// </summary>
        /// <param name="file">The uploaded CSV file.</param>
        /// <returns>
        /// 400 when the file is missing or empty, is not a ".csv" file, or no connection ID was
        /// supplied; 500 with the exception message when the import throws; otherwise 200.
        /// The import is awaited, so the response is sent after it has finished.
        /// </returns>
        [HttpPost]
        public async Task<IActionResult> UploadCsv(IFormFile file)
        {
            if (file is null || file.Length == 0)
            {
                return BadRequest("請選擇有效的 CSV 文件");
            }

            var extension = Path.GetExtension(file.FileName);
            var isCsv = extension.Equals(".csv", StringComparison.OrdinalIgnoreCase);
            if (!isCsv)
            {
                return BadRequest("文件必須是 CSV 格式");
            }

            // The client passes its SignalR connection ID in the query string.
            string connectionId = Request.Query["connectionId"].ToString();
            if (string.IsNullOrEmpty(connectionId))
            {
                return BadRequest("缺少connectionId參數");
            }

            try
            {
                await _importECDictService.ImportECDictAsync(connectionId, file);
            }
            catch (Exception ex)
            {
                return StatusCode(500, $"解析失敗:{ex.Message}");
            }

            return Ok("導入任務已啟動");
        }
    }
}
4.前端實現(vue)
-
示例:
<div class="card" v-if="state.isConnected">
  <input type="file" id="csvFile" accept=".csv" />
  <button @click="uploadFile" :disabled="state.ecdictStatus.includes('正在導入數據')">導入</button>
  <!-- Progress bar -->
  <progress :value="state.currentCount" :max="state.total" class="custom-progress" />
  <!-- Progress text -->
  <div class="progress-info">{{ state.currentCount }} / {{ state.total }} ({{ progressPercentage }}%)</div>
  <div id="status" style="color: #f87171;">{{ state.ecdictStatus }}</div>
</div>

<script>
import { reactive, computed, onMounted } from 'vue';
import * as signalR from '@microsoft/signalr';

export default {
  setup() {
    const state = reactive({
      serverUrl: "https://localhost:7183/Hubs/MyHubService",
      connection: null,
      isConnected: false,
      isConnecting: false,
      isLoggingIn: false,
      connectionId: null,
      connectionStatus: "",
      total: 0,
      currentCount: 0,
      ecdictStatus: "",
      errorDetails: "",
      // Message history
      messages: [],
    });

    // Runs when the page is loaded.
    onMounted(async () => {
      //fetchRoles(); // fetch the role list as soon as the page loads
    });

    // Progress percentage; 0 while the total is unknown so the UI never shows "NaN%".
    const progressPercentage = computed(() => {
      if (!state.total) {
        return 0;
      }
      return Math.round((state.currentCount / state.total) * 100);
    });

    // Uploads the selected CSV file; progress arrives through the "ReceiveProgress" handler.
    const uploadFile = async () => {
      const fileInput = document.getElementById("csvFile");
      if (!fileInput.files || fileInput.files.length === 0) {
        alert("請選擇CSV文件");
        return;
      }

      // The server pushes progress to this connection ID, so it must be known before uploading.
      if (!state.isConnected || !state.connectionId) return;

      const file = fileInput.files[0];
      const formData = new FormData();
      formData.append("file", file);

      try {
        state.errorDetails = "";
        state.ecdictStatus = "正在導入數據";

        // Call the backend API (adjust the path to your deployment).
        const apiUrl = state.serverUrl.split('/Hubs/')[0] || 'https://localhost:7183';
        const response = await fetch(
          `${apiUrl}/api/ImportECDict/UploadCsv?connectionId=${encodeURIComponent(state.connectionId)}`,
          {
            method: "POST",
            headers: {
              // 'Authorization': `Bearer ${state.token}` // if authentication is required
            },
            body: formData
          }
        );

        if (!response.ok) {
          throw new Error(`導入失敗: ${response.status}`);
        }

        // The endpoint returns a plain string, so read it as text; response.json() can fail to parse it.
        await response.text();
      } catch (error) {
        console.error("導入數據失敗:", error);
        state.errorDetails = error.message;
        // Leave the "importing" status so the import button is enabled again.
        state.ecdictStatus = "導入失敗";
      }
    };

    // Creates and starts the SignalR connection.
    const initSignalRConnection = async (token) => {
      state.isConnecting = true;
      state.connectionStatus = "正在連接...";
      state.errorDetails = "";

      try {
        if (state.connection) {
          await state.connection.stop();
          state.connection = null;
        }

        // Create a new connection.
        state.connection = new signalR.HubConnectionBuilder()
          .withUrl(state.serverUrl, {
            accessTokenFactory: () => token,
            skipNegotiation: true,
            transport: signalR.HttpTransportType.WebSockets
          })
          .withAutomaticReconnect()
          .configureLogging(signalR.LogLevel.Information)
          .build();

        // Message handlers.
        // The server sends the connection ID right after the connection is established.
        state.connection.on("ReceiveConnectionId", (connectionId) => {
          state.connectionId = connectionId;
          console.log("從服務端獲取的連接ID:", connectionId);
        });

        // Import progress updates.
        state.connection.on("ReceiveProgress", (progress) => {
          console.debug("progress.ProcessedRecords ", progress.processedRecords);
          console.debug("progress.TotalRecords>0", progress.totalRecords);

          // Always show the latest status, including the final one for an empty file.
          state.ecdictStatus = progress.status;

          // Only accept counts that are consistent with each other.
          if (progress.totalRecords >= 0 &&
              progress.processedRecords >= 0 &&
              progress.processedRecords <= progress.totalRecords) {
            state.total = progress.totalRecords;
            state.currentCount = progress.processedRecords;
          } else {
            console.warn("無效的進度數據:", progress);
          }
        });

        // Connection state changes.
        state.connection.onreconnecting(() => {
          state.isConnected = false;
          state.connectionStatus = "連接丟失,正在重連...";
        });

        state.connection.onreconnected(async (connectionId) => {
          state.isConnected = true;
          state.connectionStatus = "已重新連接";
          // No ID is supplied here when negotiation is skipped; in that case the server
          // delivers the new one through "ReceiveConnectionId".
          if (connectionId) {
            state.connectionId = connectionId;
          }
        });

        state.connection.onclose(() => {
          state.isConnected = false;
          state.connectionStatus = "連接已關閉";
        });

        // Start the connection.
        await state.connection.start();
        alert("連接狀態:" + state.connection.state); // expected: "Connected"
        state.isConnected = true;
        state.isConnecting = false;
        // connection.connectionId is null when negotiation is skipped; never overwrite an ID
        // that "ReceiveConnectionId" has already delivered.
        state.connectionId = state.connection.connectionId ?? state.connectionId;
        state.connectionStatus = "已連接";
      } catch (error) {
        state.isConnected = false;
        state.isConnecting = false;
        state.connectionStatus = `連接失敗: ${error.message}`;
        state.errorDetails = error.toString();
      }
    };

    return {
      // Members omitted from this excerpt (other methods) are returned here as well.
      state,
      initSignalRConnection,
      uploadFile,
      progressPercentage
    };
  }
}
</script>
三、關鍵技術點說明
-
SqlBulkCopy 優化:
- 使用分批處理(示例代碼中每 100 條記錄一批,可按數據量調整 batchSize)
- 直接映射列(ColumnMappings),使 DataTable 的列與目標表的列按名稱一一對應,不依賴列的順序
- 使用異步方法避免阻塞線程
-
進度通知機制:
- 客戶端通過 SignalR 建立持久連接
- 服務端按批次更新進度并推送
- 前端實時更新進度條和狀態信息
-
錯誤處理:
- 捕獲并返回導入過程中的異常
- 示例代碼未使用事務:某一批次失敗時,此前已寫入的批次不會回滾;如需確保事務一致性,可在創建 SqlBulkCopy 時傳入 SqlTransaction
總結
通過以上步驟,可以實現簡單的CSV文件解析,批量導入數據庫,實時顯示進度的功能。