🚦 差分隱私在運營指標:ABP 的 DP 計數器與噪聲預算
📚 目錄
- 🚦 差分隱私在運營指標:ABP 的 DP 計數器與噪聲預算
- 0. TL;DR 🚀
- 📈 一圖看懂(寫入→發布→預算→加噪)
- 1. 背景與邊界 🧭
- 2. DP 工程口徑 📐
- 3. 目標架構(ABP 集成)🏗?
- 4. 數據/配置模型 🗂?
- 5. 關鍵實現 🧩
- 5.1 接口統一(會計以窗口起點為主鍵)
- 5.2 隨機源與噪聲(開區間采樣 + 可緩存正態)
- 5.3 窗口對齊(上一窗,UTC)與預算鍵
- 5.4 Redis 會計(Lua 原子消費 + TTL 對齊“窗口剩余秒數” + 配置化上限)
- 5.5 發布 Worker(上一窗發布 + 冪等鎖 + 小樣本抑制 + 預算校驗 + 加噪)
- 5.6 比例/均值指標(分子/分母分別加噪 + 小樣本抑制)
- 6. 灰度與回滾 🎛?
- 7. 可視化與審計 📊
- ?? 窗口與 TTL 對齊示意
- 8. 評測腳本 🧪
- 9. 上線步驟 ?
- 10. 倉庫結構建議 🧱
- 11. 常見坑 🧨
0. TL;DR 🚀
- 👤 隱私單元(Privacy Unit)選擇 用戶/設備級,寫路徑先按隱私單元聚合并剪裁貢獻上界 K。
- ? 計數/求和 → Laplace(尺度
b = Δ/ε
);📈 比例/均值 → Gaussian(生產建議解析高斯校準 σ;示例以經典上界為保守缺省)。 - 🧩 后處理不降隱私:結果可做非負截斷、舍入、分桶、平滑等業務約束。
- 📒 預算賬本:按“租戶×指標×窗口開始時刻(UTC)”累計 ε/δ;Key 與窗口嚴格對齊;Lua 原子消費;TTL=窗口剩余秒數;超額硬阻斷。
- 🔒 發布冪等:周期任務在每個窗口結束時發布上一窗(UTC);用 Redis SET NX 建立
released
鎖,保證只發布一次(并可 Upsert 落庫)。 - 🧪 灰度與 A/B:按租戶/功能開關;以 MAPE / P95 誤差閾值放量;隨時回滾。
📈 一圖看懂(寫入→發布→預算→加噪)
1. 背景與邊界 🧭
- 🔐 與加密/RLS:加密/RLS 決定“誰能看真值”;差分隱私(DP)約束“公開后能泄露多少”。
- ? 適用:DAU/留存、功能調用次數、錯誤率、漏斗流量等群體指標;? 不用于強一致計費或逐個體對賬。
- ?? 風險與對策:合謀/跨窗差分/滑動窗反推 → 隱私預算、最小樣本門檻、發布限頻、審計追溯。
2. DP 工程口徑 📐
-
📏 敏感度 Δ:單個隱私單元在單窗口的最大貢獻(經 K 剪裁后)。
-
🔊 Laplace:
b = Δ/ε
;?? Gaussian:σ = f(ε, δ, Δ)
(生產建議“解析高斯”或接入 OpenDP 校準;示例用經典上界)。 -
🧮 組合/會計:
- 拉普拉斯:ε 線性加和(基礎組合)。
- 高斯:用 RDP 按 α 網格累加,再對給定 δ 反推
ε*(δ) = min_α {RDP(α) + ln(1/δ)/(α-1)}
,展示“已用/剩余”。
-
🧯 后處理不傷隱私:非負截斷、舍入、分桶、平滑等結果級操作不降低 DP 保證。
3. 目標架構(ABP 集成)🏗?
Abp.DpMetrics 模塊(應用層)
IDpCounter.Increment(tenant, metric, unitId, amount=1)
:寫路徑,按隱私單元聚合→剪裁 K→入庫。DpPublishWorker
:周期發布;按窗口結束發布上一窗(UTC);發布冪等鎖→預算校驗→加噪→后處理→審計落表。IPrivacyAccountant
(Redis/DB 實現):存(tenant, metric, windowStartUtc)
的 ε/δ 用量與上限;提供Lua 原子 TryConsume。IBudgetPolicyProvider
:預算上限(ε/δ cap)配置化,支持按租戶/指標/窗口粒度下發。IReleaseLog
:存噪聲參數、ε/δ、機制、真值哈希、seed 哈希、審批單。
組件關系圖
4. 數據/配置模型 🗂?
5. 關鍵實現 🧩
代碼為最小可運行模板,重點展示口徑與關鍵邊界處理;生產可替換為解析高斯校準、完善 RDP 會計、指標門檻配置中心化等。
5.1 接口統一(會計以窗口起點為主鍵)
public interface IDpCounter {Task IncrementAsync(string tenantId, string metric, string privacyUnitId, int amount = 1);
}public interface IPrivacyAccountant {Task<bool> TryConsumeAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window,double epsilon, double? delta);Task<(double usedEps, double? usedDelta, double capEps, double? capDelta)>GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window);
}public interface IBudgetPolicyProvider {(double capEps, double? capDel) GetCaps(string tenantId, string metric, TimeSpan window);
}public enum DpMechanism { Laplace, Gaussian }
5.2 隨機源與噪聲(開區間采樣 + 可緩存正態)
public interface IRandomSource { double NextUnit(); }public sealed class CryptoRandom : IRandomSource {private readonly System.Security.Cryptography.RandomNumberGenerator _rng= System.Security.Cryptography.RandomNumberGenerator.Create();public double NextUnit(){Span<byte> b = stackalloc byte[8]; _rng.GetBytes(b);ulong u = BitConverter.ToUInt64(b);return (u >> 11) * (1.0 / (1UL << 53)); // [0,1)}
}public static class RandUtil {public static double Open01(IRandomSource r) {double u; do { u = r.NextUnit(); } while (u <= double.Epsilon || u >= 1.0 - double.Epsilon);return u; // (0,1)}
}public sealed class GaussianSampler {private readonly IRandomSource _r; private bool _has; private double _cache;public GaussianSampler(IRandomSource r){ _r = r; }public double Next(){if (_has){ _has=false; return _cache; }var u1 = RandUtil.Open01(_r); var u2 = RandUtil.Open01(_r);var mag = Math.Sqrt(-2.0 * Math.Log(u1));var z0 = mag * Math.Cos(2.0 * Math.PI * u2);var z1 = mag * Math.Sin(2.0 * Math.PI * u2);_cache = z1; _has = true; return z0; // 產出兩個,緩存一個}
}public static class DpNoise {public static double Laplace(double x, double b, IRandomSource r){double u = RandUtil.Open01(r) - 0.5; // (-0.5,0.5)double noise = -b * Math.Sign(u) * Math.Log(1 - 2 * Math.Abs(u));return Math.Max(0, x + noise); // 后處理:非負}public static double ClassicGaussianSigma(double deltaF, double eps, double delta){return Math.Sqrt(2 * Math.Log(1.25 / delta)) * deltaF / eps; // 經典上界}
}
5.3 窗口對齊(上一窗,UTC)與預算鍵
static (DateTimeOffset Start, DateTimeOffset End) PrevWindowUtc(IClock clock, TimeSpan w){var now = clock.Now.ToUniversalTime();long sec = (long)w.TotalSeconds;long end = (now.ToUnixTimeSeconds() / sec) * sec; // 當前窗結束(UTC)long start = end - sec; // 上一窗 [start,end)return (DateTimeOffset.FromUnixTimeSeconds(start), DateTimeOffset.FromUnixTimeSeconds(end));
}
static string BudgetKey(string tenant, string metric, TimeSpan w, DateTimeOffset windowStartUtc){return $"dp:budget:{tenant}:{metric}:{(int)w.TotalSeconds}:{windowStartUtc.ToUnixTimeSeconds()}";
}
5.4 Redis 會計(Lua 原子消費 + TTL 對齊“窗口剩余秒數” + 配置化上限)
public sealed class RedisPrivacyAccountant : IPrivacyAccountant {private readonly IConnectionMultiplexer _redis; private readonly IBudgetPolicyProvider _caps;public RedisPrivacyAccountant(IConnectionMultiplexer redis, IBudgetPolicyProvider caps){ _redis = redis; _caps = caps; }private const string LUA = @"local key = KEYS[1]local epsUse = tonumber(ARGV[1])local delUse = (ARGV[2] ~= '' and tonumber(ARGV[2]) or nil)local capEps = tonumber(ARGV[3])local capDel = (ARGV[4] ~= '' and tonumber(ARGV[4]) or nil)local ttlSec = tonumber(ARGV[5])local used = redis.call('GET', key)local usedEps, usedDel = 0, 0if used thenlocal i = string.find(used, '|')usedEps = tonumber(string.sub(used, 1, i-1))usedDel = tonumber(string.sub(used, i+1))endif usedEps + epsUse > capEps then return 0 endif capDel and delUse and (usedDel + delUse > capDel) then return 0 endusedEps = usedEps + epsUseusedDel = usedDel + (delUse or 0)redis.call('SET', key, usedEps .. '|' .. usedDel)if ttlSec > 0 then redis.call('EXPIRE', key, ttlSec) endreturn 1";public async Task<bool> TryConsumeAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window, double epsilon, double? delta){var db = _redis.GetDatabase();var key = BudgetKey(tenantId, metric, window, windowStartUtc);var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);var nowUtc = DateTimeOffset.UtcNow;var we = windowStartUtc + window;int ttlSec = (int)Math.Max(1, (we - nowUtc).TotalSeconds); // TTL 對齊窗口結束var result = (long)await db.ScriptEvaluateAsync(LUA,new RedisKey[]{ key },new RedisValue[]{epsilon,delta.HasValue ? (RedisValue)delta.Value : RedisValue.EmptyString,capEps,capDel.HasValue ? (RedisValue)capDel.Value : RedisValue.EmptyString,ttlSec});return result == 1L;}public async Task<(double, double?, double, double?)> GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window){var db = _redis.GetDatabase();var key = BudgetKey(tenantId, metric, window, windowStartUtc);var s = await db.StringGetAsync(key);double usedEps = 0, usedDelRaw = 0;if (!s.IsNullOrEmpty){var parts = ((string)s!).Split('|');usedEps = double.Parse(parts[0], System.Globalization.CultureInfo.InvariantCulture);usedDelRaw= double.Parse(parts[1], System.Globalization.CultureInfo.InvariantCulture);}var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);double? usedDel = capDel.HasValue ? usedDelRaw : (double?)null;return (usedEps, usedDel, capEps, capDel);}var caps = _caps.GetCaps(tenantId, metric, window);return (usedEps, double.IsNaN(usedDel)? null : usedDel, caps.capEps, caps.capDel);}
}
5.5 發布 Worker(上一窗發布 + 冪等鎖 + 小樣本抑制 + 預算校驗 + 加噪)
public sealed class DpPublishWorker : AsyncPeriodicBackgroundWorkerBase, ITransientDependency {private readonly IClock _clock; private readonly IDpRepository _repo;private readonly IPrivacyAccountant _acct; private readonly IRandomSource _rng = new CryptoRandom();private readonly GaussianSampler _gau; private readonly IConnectionMultiplexer _redis;public DpPublishWorker(AbpTimer timer, IClock clock, IDpRepository repo, IPrivacyAccountant acct, IConnectionMultiplexer redis): base(timer){ _clock = clock; _repo = repo; _acct = acct; _gau = new GaussianSampler(_rng); _redis = redis; Timer.Period = 60_000; }protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext context){var ct = context.CancellationToken;foreach (var def in await _repo.ListMetricDefinitionsAsync()){ct.ThrowIfCancellationRequested();var (ws, we) = PrevWindowUtc(_clock, def.Window); // 發布上一窗(UTC)// 冪等發布鎖:只允許一個實例發布該窗var db = _redis.GetDatabase();int ttlSec = (int)Math.Max(1, (we - _clock.Now.ToUniversalTime()).TotalSeconds);var relKey = $"dp:released:{def.TenantId}:{def.Name}:{(int)def.Window.TotalSeconds}:{ws.ToUnixTimeSeconds()}";bool acquired = await db.StringSetAsync(relKey, "1", TimeSpan.FromSeconds(ttlSec), when: When.NotExists);if (!acquired) continue;var (trueCount, distinctUnits) = await _repo.AggregateAsync(def, ws, we);if (distinctUnits < def.MinDistinctUnits){ await _repo.LogSuppressedAsync(def, ws, we); continue; }var ok = await _acct.TryConsumeAsync(def.TenantId, def.Name, ws, def.Window, def.Epsilon, def.Delta);if (!ok){ await _repo.LogBudgetExceededAsync(def, ws, we); continue; }double noisy;if (def.Mechanism == DpMechanism.Laplace){var b = def.SensitivityDelta / def.Epsilon;noisy = DpNoise.Laplace(trueCount, b, _rng);} else {if (!def.Delta.HasValue) { await _repo.LogErrorAsync(def, ws, we, "Gaussian requires delta"); continue; }var sigma = DpNoise.ClassicGaussianSigma(def.SensitivityDelta, def.Epsilon, def.Delta.Value);noisy = Math.Max(0, trueCount + _gau.Next() * sigma);}await _repo.PublishAsync(def, ws, we, noisy, Hash64Stable(trueCount), def.Epsilon, def.Delta, def.Mechanism.ToString());}}
5.6 比例/均值指標(分子/分母分別加噪 + 小樣本抑制)
// 偽代碼:同窗同預算,或將 ε 在分子/分母間按重要性拆分
public async Task<double?> ReleaseRatioAsync(MetricDefinition numDef, MetricDefinition denDef, DateTimeOffset ws, DateTimeOffset we){var (num, nu) = await _repo.AggregateAsync(numDef, ws, we);var (den, du) = await _repo.AggregateAsync(denDef, ws, we);var minU = Math.Min(nu, du);if (minU < Math.Min(numDef.MinDistinctUnits, denDef.MinDistinctUnits)) return null; // 小樣本抑制// 分子與分母分別預算校驗與加噪(略,等同計數)var noisyNum = /* Laplace/Gaussian */ 0.0;var noisyDen = /* Laplace/Gaussian */ 1.0;if (noisyDen <= 0) return null; // 防爆炸var ratio = noisyNum / noisyDen;return Math.Clamp(ratio, 0, 1);
}
6. 灰度與回滾 🎛?
- 🔀 特性開關:按租戶啟用“DP化指標”。數值型特性可存“灰度比例”,由調用方按此值自定義分流;或在網關層做百分比分流。
- 🧪 A/B 驗證:監控 MAPE / P95;達標→放量;不達標→回滾到“內部僅真值”。
7. 可視化與審計 📊
- 📉 面板:ε/δ 消耗曲線、剩余額度、發布熱力圖、小樣本抑制/超額次數、失敗原因分布(超 ε/超 δ)。
- 🧾 審計:租戶、指標、窗口(UTC)、真值哈希、噪聲參數、ε/δ、機制、seedHash、審批單號。
- 🆘 運維:超額硬阻斷、速率限制、異常告警(深夜暴增、跨租戶)。
?? 窗口與 TTL 對齊示意
8. 評測腳本 🧪
# eval_dp_counter.py(Laplace 誤差曲線)
import numpy as npdef laplace_noise(b, size=1, rng=None):rng = rng or np.random.default_rng(2025)u = rng.random(size=size)u = np.clip(u, np.finfo(float).eps, 1-np.finfo(float).eps) - 0.5 # (-0.5,0.5)return -b * np.sign(u) * np.log(1 - 2 * np.abs(u))def eval_mape_p95(true_counts, epsilon, delta_f=1.0, trials=2000):b = delta_f / epsilonrows = []for n in true_counts:preds = np.clip(n + laplace_noise(b, trials), 0, None)mape = np.mean(np.abs(preds - n) / max(1, n))p95 = np.percentile(np.abs(preds - n), 95)rows.append((n, mape, p95))return rowsif __name__ == '__main__':Ns = [50, 100, 300, 1000, 3000, 10000]for eps in [0.3, 0.5, 1.0, 2.0]:print(eps, eval_mape_p95(Ns, eps))
經驗近似:Laplace 的
E(|噪聲|)≈b
,P95(|噪聲|)≈2.996·b
,可用于看板閾值與業務影響初判。
9. 上線步驟 ?
- 盤點指標 → 劃分“公開/內審”;
- 選隱私單元與剪裁 K;
- 為公開集合設預算(每窗 ε/δ 上限、最小樣本、發布頻率),由 IBudgetPolicyProvider 配置化下發;
- 接入
Abp.DpMetrics
,啟用 Redis & Background Worker; - 啟用發布冪等鎖 與 Lua 原子預算消費;
- 灰度:設置租戶特性與分流比例;
- A/B:看板跟蹤 MAPE/P95 與預算曲線,達標放量;
- 演練:預算見底/拒絕服務/回滾 SOP;
- 合規:導出審計日志與預算賬本快照(UTC)。
10. 倉庫結構建議 🧱
/src/Abp.DpMetrics/ // 模塊源碼(接口/機制/會計/Worker)
/src/Abp.DpMetrics.Tests/ // xUnit:Laplace 誤差、預算原子消費并發、窗口邊界、冪等鎖
/tools/eval/ // 評測腳本
/tools/dashboard/ // ε 賬本 & 誤差曲線儀表盤模板
/docker/docker-compose.yml // Redis(可選)
docker-compose(可選)
version: "3.9"
services:redis:image: redis:7-alpineports: ["6379:6379"]command: ["redis-server","--appendonly","yes"]
appsettings.json(片段)
{"Redis": { "Configuration": "localhost:6379" },"DistributedCache": { "KeyPrefix": "demo:dp:" }
}
11. 常見坑 🧨
- “解析高斯”誤稱:若未接入解析校準,應標注為“經典高斯上界”。
- 預算鍵未對齊窗:用 TTL 滑動計窗會與日報/小時看板錯位;務必以
windowStartUtc
為主鍵,TTL=窗口剩余秒數。 - 上一窗發布 & 冪等:避免按
floor(now,w)
導致跨點漏發/重復發;用released
鎖或 DB Upsert。 - 預算原子性:并發發布須用 Lua 原子或分布式鎖;推薦 Lua。
- 隨機源與開區間:避免
log(0)
邊界;Box–Muller 產出雙樣本需緩存。 - 比例/均值抑制:小樣本或分母過小應抑制結果;必要時加無偏修正。
- 審計哈希穩定:固定序列化精度/編碼,跨平臺一致。
- UTC 一致性:窗口、鍵、審計時間統一用 UTC,避免時區與夏令時影響。