EF Core Bulk Writes and "Soft Real-Time" Consistency: Choosing Between ExecuteUpdate / COPY / SqlBulkCopy, and Transaction Trade-offs
📚 Table of Contents
- EF Core Bulk Writes and "Soft Real-Time" Consistency: Choosing Between ExecuteUpdate / COPY / SqlBulkCopy, and Transaction Trade-offs
- 1. Terminology and Goals 🧭
- 2. Technology Selection Overview 🧰
- 3. Modeling and Hotspot Isolation 🌋
- 4. Write Pipeline: Backpressure + Micro-Batching 🛣️
- 5. Three Bulk-Write Approaches and Engineering Details 🔧
  - 5.1 Set-Based Update/Delete: `ExecuteUpdate / ExecuteDelete`
  - 5.2 PostgreSQL: `COPY BINARY` (Npgsql) 🐘🚀
  - 5.3 SQL Server: `SqlBulkCopy` (regular + streaming) 🧱
- 6. Transactions and Isolation Levels 🧪
- 7. Consistency Strategies (a Business-Facing Choice)
- 8. Read/Write Consistency and Caching 🧩
- 9. Observability and Alerting (Minimum Set) 📈🔔
- 10. Experiments and Benchmarks 🧪🧪
  - 10.1 Micro-Benchmarks (BenchmarkDotNet)
  - 10.2 Load Testing (k6): **uuidv4()**
- 11. Failure and Rollback Strategies 🛡️
- 12. Code and Configuration Checklist 🧱
  - 12.1 Model
  - 12.2 DbContext (shared for PG/SQL Server)
  - 12.3 Bulk Implementations (Core Methods)
  - 12.4 Web API Ingest Endpoint (Rate Limiting / Payload Limits / Validation)
  - 12.5 appsettings (Key Parameters)
- 13. Decision Tree for Technology Selection 🌳
TL;DR
- Insert-heavy: PostgreSQL → COPY (Binary); SQL Server → SqlBulkCopy.
- Update/delete-heavy: prefer ExecuteUpdate / ExecuteDelete (set-based DML, no entity loading required).
- Upsert: PostgreSQL → `INSERT ... ON CONFLICT DO UPDATE`; SQL Server → `MERGE` (small batches plus a unique constraint, use with caution).
- Consistency: for high throughput, prefer a bounded channel + micro-batches, flushed on a dual threshold (time window + row count); the read side promises a soft real-time SLO and supports backfill.

Key points:
- `COPY FROM` fires table triggers and check constraints (it does not fire rules); `timestamptz` accepts UTC only.
- `SqlBulkCopy` by default fires no triggers and checks no constraints; enable them explicitly via options. For very large batches, switch to streaming.
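For the PostgreSQL upsert route, here is a minimal sketch using EF Core's raw SQL API (a sketch under assumptions: the `metrics` table and `(tenant_id, id)` unique key defined later in this article; `item` is one incoming `WriteItem`):

```csharp
// Idempotent upsert: insert, or on unique-key conflict overwrite the existing row.
// ExecuteSqlInterpolatedAsync parameterizes the interpolated values automatically.
var affected = await db.Database.ExecuteSqlInterpolatedAsync($@"
    INSERT INTO metrics (tenant_id, id, ts, value)
    VALUES ({item.TenantId}, {item.Id}, {item.Ts}, {item.Value})
    ON CONFLICT (tenant_id, id)
    DO UPDATE SET ts = EXCLUDED.ts, value = EXCLUDED.value", ct);
// affected: row count, worth logging for auditing
```

For many rows, a single multi-row `INSERT ... ON CONFLICT` statement, or COPY into a staging table followed by one upsert statement, avoids per-row round trips.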
1. Terminology and Goals 🧭
- Bulk write: commit many DML rows in one shot, maximizing throughput and reducing round-trip/log/WAL overhead.
- Soft real-time consistency: the read model may lag by tens of milliseconds to a few seconds, but with an observable upper bound (SLO) and backfill (idempotent compensation/replay).
- Goals: maximize write throughput (without overwhelming the database), keep read latency bounded, make failures recoverable, and keep the whole pipeline observable.
2. Technology Selection Overview 🧰

Scenario | First choice | Alternative | Key points
---|---|---|---
Bulk insert (PG) | COPY (Binary) | Multi-row INSERT / ON CONFLICT | Best throughput; COPY FROM fires table triggers and check constraints (not rules).
Bulk insert (SQL Server) | SqlBulkCopy | Multi-row INSERT / MERGE | Fires no triggers and checks no constraints by default; enable with `FireTriggers` / `CheckConstraints`; `TableLock` available.
Bulk update/delete | ExecuteUpdate / ExecuteDelete | MERGE / hand-written SQL | Set-based DML: single statement, no entities loaded.
Upsert | PG ON CONFLICT DO UPDATE | SQL Server MERGE | Needs a unique constraint / idempotency key; MERGE has a long defect history — keep batches small and test thoroughly before production.
Read-your-writes | Small synchronous batches + version watermark | — | Limited throughput; suited to strongly consistent UIs.
Soft real-time (recommended) | Bounded channel + micro-batch flush | Outbox/events → read store | Bounded catch-up time (SLO); provide refresh/notification and a watermark.

Data path (layering + batch trigger conditions):
3. Modeling and Hotspot Isolation 🌋
- Idempotency key: e.g. a unique constraint on `(tenant_id, business_id)` to support upsert and replay; upsert logs should record affected row counts and conflicting keys for auditing.
- Partitioning/sharding: spread write hotspots across partitions/indexes; SQL Server can use partitioned tables with a suitable `FILLFACTOR`, PostgreSQL can use declarative partitioning.
- Index strategy: before a large import, consider temporarily disabling or dropping secondary nonclustered indexes and rebuilding them afterwards; or set a lower `FILLFACTOR` to reduce page splits.
- Task/queue table (PG): `FOR UPDATE SKIP LOCKED` lets multiple consumers pull work without blocking one another — a good fit for a soft real-time pipeline.
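The queue-table pattern can be sketched with Npgsql (a sketch under assumptions: a hypothetical `jobs` table with `id`, `status`, and `payload` columns, and an already-open `conn`/`tx`; adapt names to your schema):

```csharp
// Claim up to @limit pending jobs without blocking other consumers:
// rows locked by another worker are skipped rather than waited on.
const string ClaimSql = @"
    UPDATE jobs SET status = 'processing'
    WHERE id IN (
        SELECT id FROM jobs
        WHERE status = 'pending'
        ORDER BY id
        LIMIT @limit
        FOR UPDATE SKIP LOCKED)
    RETURNING id, payload;";

await using var cmd = new NpgsqlCommand(ClaimSql, conn, tx);
cmd.Parameters.AddWithValue("limit", 100);
await using var reader = await cmd.ExecuteReaderAsync(ct);
while (await reader.ReadAsync(ct))
{
    var id = reader.GetInt64(0);
    var payload = reader.GetString(1);
    // process the claimed job...
}
```

Each worker runs this in its own transaction; two workers never claim the same row, and neither blocks the other.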
4. Write Pipeline: Backpressure + Micro-Batching 🛣️
Key ingredients: a bounded channel (full → wait/drop/degrade), event-driven consumption (`WaitToReadAsync`), dual thresholds (row count / time), failure guards with a dead-letter sink, and idempotent replay.

```csharp
// Program.cs
builder.Services.AddHostedService<BulkWriter>();
builder.Services.AddSingleton(Channel.CreateBounded<WriteItem>(
    new BoundedChannelOptions(50_000)
    {
        FullMode = BoundedChannelFullMode.Wait, // DropOldest/DropNewest/DropWrite as needed
        SingleReader = true,
        SingleWriter = false
    }));

// (Optional) DbContext pooling to reduce allocation overhead
builder.Services.AddDbContextPool<AppDbContext>(o => /* options */);
```

```csharp
public sealed class BulkWriter : BackgroundService
{
    private readonly Channel<WriteItem> _ch;
    private readonly IServiceProvider _sp;
    private const int MaxBatch = 5000;
    private static readonly TimeSpan MaxWait = TimeSpan.FromMilliseconds(100);

    public BulkWriter(Channel<WriteItem> ch, IServiceProvider sp) { _ch = ch; _sp = sp; }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var buffer = new List<WriteItem>(MaxBatch);
        var lastFlush = System.Diagnostics.Stopwatch.StartNew();
        while (await _ch.Reader.WaitToReadAsync(ct))
        {
            while (_ch.Reader.TryRead(out var item))
            {
                buffer.Add(item);
                if (buffer.Count >= MaxBatch) break;
            }
            // Note: the time threshold is only evaluated when items arrive;
            // add a periodic timer if strict MaxWait flushing is required.
            if (buffer.Count >= MaxBatch || lastFlush.Elapsed >= MaxWait)
            {
                await FlushSafeAsync(buffer, ct);
                buffer.Clear();
                lastFlush.Restart();
            }
        }
        // drain
        if (buffer.Count > 0) await FlushSafeAsync(buffer, ct);
    }

    private async Task FlushSafeAsync(List<WriteItem> batch, CancellationToken ct)
    {
        if (batch.Count == 0) return;
        try { await FlushAsync(batch, ct); }
        catch (Exception ex)
        {
            // TODO: metrics/logging
            await DeadLetterSink.WriteAsync(batch, ex, ct); // -> dead letters
        }
    }

    private async Task FlushAsync(List<WriteItem> batch, CancellationToken ct)
    {
        using var scope = _sp.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
        if (db.Database.IsNpgsql())
            await BulkImpl.PostgresCopyAsync(db, batch, ct);
        else if (db.Database.IsSqlServer())
        {
            if (batch.Count >= 100_000)
                await BulkImpl.SqlServerBulkCopyStreamAsync(db, batch, ct); // very large batch: streaming
            else
                await BulkImpl.SqlServerBulkCopyAsync(db, batch, ct); // regular: DataTable
        }
    }
}
```
Sequence (backpressure / Accepted / batch flush):
5. Three Bulk-Write Approaches and Engineering Details 🔧
5.1 Set-Based Update/Delete: ExecuteUpdate / ExecuteDelete

```csharp
var cutoff = DateTimeOffset.UtcNow.AddHours(-1);
var affected = await db.Orders
    .Where(o => o.TenantId == tenant
             && o.Status == OrderStatus.Pending
             && o.UpdatedAt < cutoff)
    .ExecuteUpdateAsync(s => s
        .SetProperty(o => o.Status, _ => OrderStatus.Closed)
        .SetProperty(o => o.UpdatedAt, _ => DateTimeOffset.UtcNow), ct);
// Log `affected` for auditing. Note: this path bypasses optimistic concurrency tokens/row versions.
```
5.2 PostgreSQL: COPY BINARY (Npgsql) 🐘🚀

```csharp
await using var conn = (NpgsqlConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

await using var wr = conn.BeginBinaryImport(
    "COPY public.metrics (tenant_id, id, ts, value) FROM STDIN (FORMAT BINARY)");
foreach (var r in rows)
{
    wr.StartRow();
    wr.Write(r.TenantId, NpgsqlDbType.Text);
    wr.Write(r.Id, NpgsqlDbType.Uuid);
    // timestamptz accepts UTC only (Offset = 0)
    wr.Write(r.Ts.ToUniversalTime(), NpgsqlDbType.TimestampTz);
    wr.Write(r.Value, NpgsqlDbType.Double);
}
await wr.CompleteAsync(ct); // without Complete/Dispose, the import is cancelled and rolled back
```

To run several COPY operations in one transaction, call `BeginTransaction()` on the connection, perform the COPYs, then `Commit()` once at the end.
5.3 SQL Server: SqlBulkCopy (regular + streaming) 🧱

Regular (DataTable) — quick to reproduce:

```csharp
await using var conn = (SqlConnection)db.Database.GetDbConnection();
if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);
await using var tx = await conn.BeginTransactionAsync(ct);

using var bulk = new SqlBulkCopy(conn,
    SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.TableLock,
    (SqlTransaction)tx)
{
    DestinationTableName = "[dbo].[metrics]",
    BatchSize = 5000,
    BulkCopyTimeout = 120
};

var table = new DataTable();
table.Columns.Add("tenant_id", typeof(string));
table.Columns.Add("id", typeof(Guid));
table.Columns.Add("ts", typeof(DateTimeOffset)); // -> datetimeoffset
table.Columns.Add("value", typeof(double));
foreach (var r in rows) table.Rows.Add(r.TenantId, r.Id, r.Ts, r.Value);

bulk.ColumnMappings.Add("tenant_id", "tenant_id");
bulk.ColumnMappings.Add("id", "id");
bulk.ColumnMappings.Add("ts", "ts");
bulk.ColumnMappings.Add("value", "value");
bulk.NotifyAfter = 5000;
bulk.SqlRowsCopied += (_, e) => Console.WriteLine($"Copied: {e.RowsCopied}");

await bulk.WriteToServerAsync(table, ct);
await tx.CommitAsync(ct);
```
Very large batches (streaming, low memory) — example switch threshold `>= 100_000`:

```csharp
public static async Task SqlServerBulkCopyStreamAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
{
    var conn = (SqlConnection)db.Database.GetDbConnection();
    if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

    using var bulk = new SqlBulkCopy(conn)
    {
        DestinationTableName = "[dbo].[metrics]",
        BatchSize = 5000,
        BulkCopyTimeout = 120
    };
    // Map source member names to destination columns.
    bulk.ColumnMappings.Add("TenantId", "tenant_id");
    bulk.ColumnMappings.Add("Id", "id");
    bulk.ColumnMappings.Add("Ts", "ts");
    bulk.ColumnMappings.Add("Value", "value");

    // SqlBulkCopy streams from an IDataReader/DbDataReader; it has no overload taking an
    // IEnumerator<SqlDataRecord> (that pattern belongs to table-valued parameters).
    // FastMember's ObjectReader (NuGet: FastMember) adapts the list lazily, without a DataTable.
    using var reader = ObjectReader.Create(rows, "TenantId", "Id", "Ts", "Value");
    await bulk.WriteToServerAsync(reader, ct);
}
```
Notes: with default `SqlBulkCopyOptions`, no triggers fire and no constraints are checked; enable them explicitly if you need behavior equivalent to regular DML. If the table has an identity column whose source values must be kept, use `KeepIdentity`.
6. Transactions and Isolation Levels 🧪
- Many small commits vs. one big commit: small batches reduce lock hold time and rollback cost; one large batch has higher throughput but a larger blast radius on failure.
- Isolation levels:
  - SQL Server: enable RCSI (`READ_COMMITTED_SNAPSHOT ON`) so reads under read committed use row versions, reducing reader/writer conflicts.
  - PostgreSQL: for task/queue tables, use `FOR UPDATE SKIP LOCKED` to avoid workers contending for the same row.
- Idempotency: unique key + upsert guarantees "effectively once".
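Enabling RCSI is a one-time database-level setting. A sketch (database name `app` is an assumption; `WITH ROLLBACK IMMEDIATE` kicks out open transactions, so run it in a maintenance window):

```csharp
// Run once against SQL Server; switching the setting needs (brief) exclusive access.
await db.Database.ExecuteSqlRawAsync(
    "ALTER DATABASE [app] SET READ_COMMITTED_SNAPSHOT ON WITH ROLLBACK IMMEDIATE");
```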
7. Consistency Strategies (a Business-Facing Choice)

Strategy | Description | Read-your-writes | Throughput | Implementation
---|---|---|---|---
Strong synchronous | API waits until the write lands | Strong | Medium | Small synchronous batches
Soft real-time (recommended) | API ACKs fast, background batch writes, bounded catch-up time (SLO) | Configurable | High | Bounded channel + micro-batches + watermark
Eventual | Read store built asynchronously | Weak | Highest | Outbox/Event → read store

Consistency strategy map (with p99 SLA):
8. Read/Write Consistency and Caching 🧩
- After a write, keep a short-lived write-side cache (30–120 s TTL) keyed by `(tenant, key, version)`.
- After a batch update, publish an invalidation event or a watermark (e.g. `last_applied_offset`) so the frontend can tell whether it has caught up.
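The watermark idea reduces to a pure comparison. A minimal sketch (names are illustrative; `last_applied_offset` would be maintained by the flush path):

```csharp
// A client that wrote at offset W has "read its own write" once the
// read side's last applied offset has reached W.
public readonly record struct Watermark(long LastAppliedOffset)
{
    public bool HasCaughtUpTo(long writeOffset) => LastAppliedOffset >= writeOffset;
}

// Cache key that includes the version, so stale entries miss naturally
// instead of needing explicit invalidation:
public static string CacheKey(string tenant, string key, long version)
    => $"{tenant}:{key}:v{version}";
```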
9. Observability and Alerting (Minimum Set) 📈🔔
- Write pipeline: enqueue rate, queue depth, high/low watermarks, batch size, batch duration p95/p99, failure/retry rate.
- Database: lock waits, WAL/log growth, checkpoint duration, index bloat.
- SLO: enqueue → readable p99 ≤ 2 s; alert when breached for 5 consecutive minutes.
- Implementation: wrap Prometheus/OpenTelemetry metrics in one place (collect `SqlRowsCopied`, EF `LogTo`, queue depth, and so on).
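A minimal sketch of those counters with `System.Diagnostics.Metrics` (instrument names are illustrative; wire the `Meter` to whatever OpenTelemetry/Prometheus exporter you use):

```csharp
using System.Diagnostics.Metrics;

public static class BulkMetrics
{
    private static readonly Meter Meter = new("App.BulkWriter");

    public static readonly Counter<long> Enqueued =
        Meter.CreateCounter<long>("bulk.enqueued", unit: "items");
    public static readonly Histogram<double> BatchDurationMs =
        Meter.CreateHistogram<double>("bulk.batch.duration", unit: "ms");
    public static readonly Histogram<long> BatchSize =
        Meter.CreateHistogram<long>("bulk.batch.size", unit: "items");
}

// In the flush path:
// var sw = Stopwatch.StartNew();
// await FlushAsync(batch, ct);
// BulkMetrics.BatchDurationMs.Record(sw.Elapsed.TotalMilliseconds);
// BulkMetrics.BatchSize.Record(batch.Count);
```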
10. Experiments and Benchmarks 🧪🧪
10.1 Micro-Benchmarks (BenchmarkDotNet)

```csharp
[MemoryDiagnoser]
public class BulkBench
{
    private readonly List<WriteItem> _batch = DataGen.Generate(5000);
    private AppDbContext _db = default!;

    [GlobalSetup]
    public void Setup() => _db = DbFactory.Create();

    [Benchmark] public Task PG_Copy() => BulkImpl.PostgresCopyAsync(_db, _batch, CancellationToken.None);
    [Benchmark] public Task SQL_BulkCopy() => BulkImpl.SqlServerBulkCopyAsync(_db, _batch, CancellationToken.None);
    [Benchmark] public Task EF_ExecuteUpdate() => BulkImpl.ExecuteUpdateAsync(_db, CancellationToken.None);
}
```
10.2 Load Testing (k6): uuidv4()

```javascript
import http from 'k6/http';
import { sleep } from 'k6';
import { uuidv4 } from 'https://jslib.k6.io/k6-utils/1.4.0/index.js';

export const options = { vus: 50, duration: '2m' };

function genItems(n) {
  const items = [];
  for (let i = 0; i < n; i++) {
    items.push({
      tenantId: 't1',
      id: uuidv4(),
      ts: new Date().toISOString(),
      value: Math.random()
    });
  }
  return items;
}

export default function () {
  const payload = JSON.stringify({ tenantId: 't1', items: genItems(100) });
  http.post('http://localhost:5000/api/ingest', payload, { headers: { 'Content-Type': 'application/json' } });
  sleep(0.1);
}
```
11. Failure and Rollback Strategies 🛡️
- Bisect failed batches: split a failed batch in half repeatedly to isolate the bad record (or bad subset); send bad data to a dead-letter store with the failure reason and a payload hash.
- Retries: exponential backoff; beyond the limit, move to dead letters; an end-of-day idempotent compensation script replays by unique key.
- Auditing: a batch table recording `batch_id`, `count`, `failures`, `dur_ms`, and timestamps.

Prepare a `dead_letters` table (batch ID, exception summary, payload hash, first/last seen time, retry count).
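The bisect step can be sketched independently of the database (a sketch: `flush` stands for any all-or-nothing batch write; rows that still fail alone are returned as dead letters):

```csharp
using System.Linq;

public static class BatchBisect
{
    // Recursively halve a failed batch until single failing rows are isolated.
    // Cost is roughly O(f * log n) extra flush attempts for f bad rows.
    public static async Task<List<T>> FlushOrBisectAsync<T>(
        IReadOnlyList<T> batch,
        Func<IReadOnlyList<T>, Task> flush)
    {
        var deadLetters = new List<T>();
        try
        {
            await flush(batch);
        }
        catch
        {
            if (batch.Count == 1)
            {
                deadLetters.Add(batch[0]); // isolated bad record
            }
            else
            {
                int mid = batch.Count / 2;
                deadLetters.AddRange(await FlushOrBisectAsync(batch.Take(mid).ToList(), flush));
                deadLetters.AddRange(await FlushOrBisectAsync(batch.Skip(mid).ToList(), flush));
            }
        }
        return deadLetters;
    }
}
```

Note that each sub-flush must be its own transaction; the good halves commit while the bad rows are drained into the dead-letter table.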
12. Code and Configuration Checklist 🧱
12.1 Model

```csharp
public record WriteItem(string TenantId, Guid Id, DateTimeOffset Ts, double Value);

public class Metric
{
    public string TenantId { get; set; } = default!;
    public Guid Id { get; set; }
    public DateTimeOffset Ts { get; set; }
    public double Value { get; set; }
}
```
12.2 DbContext (shared for PG/SQL Server)

```csharp
public class AppDbContext : DbContext
{
    public DbSet<Metric> Metrics => Set<Metric>();
    public AppDbContext(DbContextOptions<AppDbContext> options) : base(options) { }

    protected override void OnModelCreating(ModelBuilder b)
    {
        b.Entity<Metric>(e =>
        {
            e.ToTable("metrics");
            e.HasKey(x => new { x.TenantId, x.Id }); // idempotency key
            e.Property(x => x.Ts).HasColumnName("ts");
            e.HasIndex(x => x.Ts);
        });
    }
}
```
12.3 Bulk Implementations (Core Methods)

```csharp
public static class BulkImpl
{
    public static async Task PostgresCopyAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
    {
        var conn = (NpgsqlConnection)db.Database.GetDbConnection();
        if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

        await using var wr = conn.BeginBinaryImport(
            "COPY public.metrics (tenant_id, id, ts, value) FROM STDIN (FORMAT BINARY)");
        foreach (var r in rows)
        {
            wr.StartRow();
            wr.Write(r.TenantId, NpgsqlDbType.Text);
            wr.Write(r.Id, NpgsqlDbType.Uuid);
            wr.Write(r.Ts.ToUniversalTime(), NpgsqlDbType.TimestampTz); // timestamptz: UTC only
            wr.Write(r.Value, NpgsqlDbType.Double);
        }
        await wr.CompleteAsync(ct); // without Complete, the import rolls back
    }

    // Regular mode: DataTable
    public static async Task SqlServerBulkCopyAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
    {
        var table = new DataTable();
        table.Columns.Add("tenant_id", typeof(string));
        table.Columns.Add("id", typeof(Guid));
        table.Columns.Add("ts", typeof(DateTimeOffset));
        table.Columns.Add("value", typeof(double));
        foreach (var r in rows) table.Rows.Add(r.TenantId, r.Id, r.Ts, r.Value);

        var conn = (SqlConnection)db.Database.GetDbConnection();
        if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

        using var bulk = new SqlBulkCopy(conn,
            SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.TableLock,
            null)
        {
            DestinationTableName = "[dbo].[metrics]",
            BatchSize = 5000,
            BulkCopyTimeout = 120
        };
        bulk.ColumnMappings.Add("tenant_id", "tenant_id");
        bulk.ColumnMappings.Add("id", "id");
        bulk.ColumnMappings.Add("ts", "ts");
        bulk.ColumnMappings.Add("value", "value");
        bulk.NotifyAfter = 5000;
        bulk.SqlRowsCopied += (_, e) => Console.WriteLine($"Copied: {e.RowsCopied}");
        await bulk.WriteToServerAsync(table, ct);
    }

    // Streaming mode: low memory. SqlBulkCopy consumes an IDataReader/DbDataReader,
    // not an IEnumerator<SqlDataRecord> (that pattern belongs to table-valued parameters);
    // FastMember's ObjectReader (NuGet: FastMember) adapts the list lazily.
    public static async Task SqlServerBulkCopyStreamAsync(AppDbContext db, List<WriteItem> rows, CancellationToken ct)
    {
        var conn = (SqlConnection)db.Database.GetDbConnection();
        if (conn.State != ConnectionState.Open) await conn.OpenAsync(ct);

        using var bulk = new SqlBulkCopy(conn)
        {
            DestinationTableName = "[dbo].[metrics]",
            BatchSize = 5000,
            BulkCopyTimeout = 120
        };
        bulk.ColumnMappings.Add("TenantId", "tenant_id");
        bulk.ColumnMappings.Add("Id", "id");
        bulk.ColumnMappings.Add("Ts", "ts");
        bulk.ColumnMappings.Add("Value", "value");

        using var reader = ObjectReader.Create(rows, "TenantId", "Id", "Ts", "Value");
        await bulk.WriteToServerAsync(reader, ct);
    }

    public static async Task ExecuteUpdateAsync(AppDbContext db, CancellationToken ct)
    {
        var cutoff = DateTimeOffset.UtcNow.AddHours(-1);
        var rows = await db.Metrics
            .Where(m => m.Ts < cutoff)
            .ExecuteUpdateAsync(s => s.SetProperty(m => m.Value, m => m.Value * 0.99), ct);
        // rows -> audit
    }
}
```
12.4 Web API Ingest Endpoint (Rate Limiting / Payload Limits / Validation)

```csharp
[ApiController, Route("api/ingest")]
public class IngestController : ControllerBase
{
    private readonly Channel<WriteItem> _ch;
    public IngestController(Channel<WriteItem> ch) => _ch = ch;

    [HttpPost]
    [RequestSizeLimit(5 * 1024 * 1024)] // 5 MB: guards against oversized payloads; raise as needed
    public async Task<IActionResult> Post([FromBody] IngestRequest req, CancellationToken ct)
    {
        if (req.Items is null || req.Items.Count == 0 || req.Items.Count > 10_000)
            return BadRequest("Items count out of range");

        foreach (var it in req.Items)
        {
            var ok = await _ch.Writer.WaitToWriteAsync(ct) && _ch.Writer.TryWrite(it);
            if (!ok) return StatusCode(StatusCodes.Status429TooManyRequests); // backpressure
        }
        return Accepted(); // soft real-time fast ACK
    }
}

public record IngestRequest(string TenantId, List<WriteItem> Items);
```
12.5 appsettings (Key Parameters)

```json
{
  "Bulk": {
    "MaxBatchSize": 5000,
    "MaxBatchIntervalMs": 100,
    "ChannelCapacity": 50000,
    "Retry": { "MaxAttempts": 3, "BaseDelayMs": 100 }
  },
  "ConnectionStrings": {
    "Pg": "Host=localhost;Username=postgres;Password=postgres;Database=app;",
    "Sql": "Server=localhost,1433;User Id=sa;Password=Pass@word1;Encrypt=False;TrustServerCertificate=True"
  }
}
```