布隆過濾器
https://pkg.go.dev/github.com/bits-and-blooms/bloom/v3
- 作用:
- 判斷一個元素是不是在集合中
- 工作原理:
- 一個位數組(bit array),初始全為0。
- 多個哈希函數,運算輸入,從而映射到位數組的不同位索引上,對應值改為1。
- 布隆過濾器是在redis外層的,對redis的請求先走布隆,布隆判斷查詢的數據是緩存命中的,那么走redis,否則攔截。通過這樣來處理緩存穿透問題。
一些值得注意的點
- 同一輸入用hash運算得來的位數組上的多個對應位置是可能相同的,即不同輸入,可能得到同一輸出。所以布隆過濾器有誤判的風險,不過用來處理緩存穿透是合適的。
- 假如輸入是“hello”,經過hash后對應0、1索引上的值變為1,現在又輸入“你好”,hash后是1、2索引上的值變為1,如果我要刪除hello,就會導致你好也被破壞。所以(基礎布隆過濾器)無法刪除元素。
- 輸入“hello”和“你好”經過hash后的對應位可能相同,這就是誤判的情況,如果實際緩存中只有“hello”那么查詢“你好”也會被引導到redis。
- 假如現在要查“hello”但是0、1上的預期值不為1,那么“hello”一定不在緩存。
- 總結:布隆過濾器可以判斷“可能存在”和“一定不存在”
實現細節梳理:
- 可以弄一個布隆預熱函數,運行時先從redis讀取所有緩存id運算好對應二進制數組的位置,這樣就相當于把當前所有的緩存的”特征值“都存到布隆過濾器了,(也可以開個定期觸發的協程,不斷調用)
package mainimport ("context""fmt""log""sync""time""github.com/bits-and-blooms/bloom/v3""github.com/go-redis/redis/v8"
)var (bloomFilter *bloom.BloomFiltercache sync.MapredisClient *redis.Client // Redis客戶端filterLock sync.Mutexctx = context.Background()
)func init() {// 初始化Redis連接redisClient = redis.NewClient(&redis.Options{Addr: "localhost:6379", // Redis地址Password: "", // 密碼DB: 0, // 數據庫})// 初始化布隆過濾器bloomFilter = bloom.NewWithEstimates(1_000_000, 0.001)// 從Redis預熱布隆過濾器if err := preheatBloomFilter(); err != nil {log.Fatalf("Failed to preheat bloom filter: %v", err)}
}// preheatBloomFilter 從Redis加載存在的key
func preheatBloomFilter() error {start := time.Now()log.Println("Starting bloom filter preheating...")// 1. 使用SCAN迭代所有product key(生產環境建議使用特定前綴)var cursor uint64var keys []stringfor {var err error// 假設product key的格式為 product:1001keys, cursor, err = redisClient.Scan(ctx, cursor, "product:*", 1000).Result()if err != nil {return fmt.Errorf("Redis SCAN failed: %w", err)}// 將找到的key添加到布隆過濾器for _, key := range keys {// 提取純ID(假設key格式為product:{id})id := key[8:] // 跳過"product:"前綴bloomFilter.AddString(id)}if cursor == 0 { // 迭代結束break}}// 2. 或者如果使用Set存儲所有ID(更推薦的方式)// 假設所有產品ID存儲在product:ids集合中ids, err := redisClient.SMembers(ctx, "product:ids").Result()if err != nil {return fmt.Errorf("Failed to get product IDs: %w", err)}for _, id := range ids {bloomFilter.AddString(id)}log.Printf("Bloom filter preheated. Total keys: %d, Duration: %v", len(ids)+len(keys), time.Since(start))return nil
}// 定期重建布隆過濾器(可選)
func startBloomFilterRebuildJob() {ticker := time.NewTicker(1 * time.Hour)go func() {for range ticker.C {filterLock.Lock()if err := preheatBloomFilter(); err != nil {log.Printf("Failed to rebuild bloom filter: %v", err)}filterLock.Unlock()}}()
}// getProduct 獲取商品信息(帶Redis緩存)
func getProduct(ctx context.Context, productID string) (string, error) {// 1. 布隆過濾器檢查if !bloomFilter.TestString(productID) {return "", fmt.Errorf("商品不存在")}// 2. 檢查Redis緩存cacheKey := "product:" + productIDval, err := redisClient.Get(ctx, cacheKey).Result()if err == nil {return val, nil}// 3. 查詢數據庫(這里演示直接返回)// 實際應該查詢真實數據庫,這里返回模擬數據productData := "商品詳情數據"// 4. 將新數據寫入Redisif err := redisClient.Set(ctx, cacheKey, productData, randomExpiration(30*time.Minute, 5*time.Minute)).Err(); err != nil {log.Printf("Failed to set Redis cache: %v", err)}// 5. 更新布隆過濾器(如果確認是新key)filterLock.Lock()bloomFilter.AddString(productID)filterLock.Unlock()return productData, nil
}// 生成隨機過期時間(防雪崩)
func randomExpiration(base, randomRange time.Duration) time.Duration {return base + time.Duration(rand.Int63n(int64(randomRange)))
}
代碼
bloom.go
package cacheimport ("context""errors""github.com/bits-and-blooms/bloom/v3""github.com/redis/go-redis/v9"pkgredis "shorturl/pkg/db/redis"
)// BloomFilter 布隆過濾器接口
type BloomFilter interface {// Add 添加元素到布隆過濾器Add(key string, value string) error// Exists 檢查元素是否可能存在于布隆過濾器中Exists(key string, value string) (bool, error)
}// RedisBloomFilter 基于Redis的布隆過濾器實現
type RedisBloomFilter struct {redisClient *redis.Clientdestroy func()// 布隆過濾器參數filter *bloom.BloomFilterkey string // 布隆過濾器在Redis中的鍵名
}// NewRedisBloomFilter 創建一個新的Redis布隆過濾器
func NewRedisBloomFilter(client *redis.Client, key string, expectedItems int, errorRate float64, destroy func()) BloomFilter {// 使用bits-and-blooms庫創建布隆過濾器filter := bloom.NewWithEstimates(uint(expectedItems), errorRate)return &RedisBloomFilter{redisClient: client,destroy: destroy,filter: filter,key: key,}
}// Add 添加元素到布隆過濾器
func (bf *RedisBloomFilter) Add(key string, value string) error {// 添加到內存中的布隆過濾器bf.filter.AddString(value)// 將布隆過濾器的位數組序列化并存儲到Redisbits, err := bf.filter.MarshalBinary()if err != nil {return err}// 存儲到Redisreturn bf.redisClient.Set(context.Background(), bf.key, bits, 0).Err()
}// Exists 檢查元素是否可能存在于布隆過濾器中
func (bf *RedisBloomFilter) Exists(key string, value string) (bool, error) {// 從Redis獲取布隆過濾器的位數組bits, err := bf.redisClient.Get(context.Background(), bf.key).Bytes()if err != nil {if errors.Is(err, redis.Nil) {// 如果布隆過濾器不存在,則元素一定不存在return false, nil}return false, err}// 反序列化布隆過濾器if err := bf.filter.UnmarshalBinary(bits); err != nil {return false, err}// 檢查元素是否可能存在return bf.filter.TestString(value), nil
}// BloomFilterFactory 布隆過濾器工廠接口
type BloomFilterFactory interface {// NewBloomFilter 創建一個新的布隆過濾器實例NewBloomFilter(key string, expectedItems int, errorRate float64) BloomFilter
}// RedisBloomFilterFactory 基于Redis的布隆過濾器工廠
type RedisBloomFilterFactory struct {redisPool pkgredis.RedisPool
}// NewRedisBloomFilterFactory 創建一個新的Redis布隆過濾器工廠
func NewRedisBloomFilterFactory(redisPool pkgredis.RedisPool) BloomFilterFactory {return &RedisBloomFilterFactory{redisPool: redisPool,}
}// NewBloomFilter 創建一個新的布隆過濾器實例
func (f *RedisBloomFilterFactory) NewBloomFilter(key string, expectedItems int, errorRate float64) BloomFilter {client := f.redisPool.Get()return NewRedisBloomFilter(client, key, expectedItems, errorRate, func() {f.redisPool.Put(client)})
}
bloom.go梳理和功能總結:
1. 核心功能
該文件實現了一個基于 Redis 的布隆過濾器(Bloom Filter),并提供了工廠模式來創建布隆過濾器實例。
2. 主要接口與結構
(1) BloomFilter 接口
定義了布隆過濾器的基本操作:
Add(key string, value string) error
:將元素添加到布隆過濾器。Exists(key string, value string) (bool, error)
:檢查元素是否可能存在于布隆過濾器中。
(2) RedisBloomFilter 結構
實現了 BloomFilter
接口,基于 Redis 存儲布隆過濾器的位數組:
- 字段:
redisClient *redis.Client
:Redis 客戶端。destroy func()
:釋放 Redis 連接的回調函數。filter *bloom.BloomFilter
:內存中的布隆過濾器實例。key string
:布隆過濾器在 Redis 中的鍵名。
- 方法:
Add
:將元素添加到內存中的布隆過濾器,并將位數組序列化后存儲到 Redis。Exists
:從 Redis 獲取布隆過濾器的位數組,反序列化后檢查元素是否存在。
(3) BloomFilterFactory 接口
定義了布隆過濾器工廠的基本操作:
NewBloomFilter(key string, expectedItems int, errorRate float64) BloomFilter
:創建一個新的布隆過濾器實例。
(4) RedisBloomFilterFactory 結構
實現了 BloomFilterFactory
接口,用于創建基于 Redis 的布隆過濾器實例:
- 字段:
redisPool pkgredis.RedisPool
:Redis 連接池。
- 方法:
NewBloomFilter
:從連接池獲取 Redis 客戶端,創建一個新的布隆過濾器實例,并在銷毀時釋放 Redis 連接。
3. 關鍵邏輯
(1) 布隆過濾器的初始化
-
使用
github.com/bits-and-blooms/bloom/v3
庫創建布隆過濾器實例:filter := bloom.NewWithEstimates(uint(expectedItems), errorRate)
-
參數說明:
expectedItems
:預計插入的元素數量。errorRate
:允許的誤報率。
(2) 元素的添加
-
將元素添加到內存中的布隆過濾器:
bf.filter.AddString(value)
-
將布隆過濾器的位數組序列化后存儲到 Redis:
bits, err := bf.filter.MarshalBinary() if err != nil {return err } return bf.redisClient.Set(context.Background(), bf.key, bits, 0).Err()
(3) 元素的存在性檢查
-
從 Redis 獲取布隆過濾器的位數組:
bits, err := bf.redisClient.Get(context.Background(), bf.key).Bytes()
-
如果 Redis 中不存在該鍵,則返回
false
表示元素一定不存在。 -
反序列化布隆過濾器并檢查元素是否存在:
if err := bf.filter.UnmarshalBinary(bits); err != nil {return false, err } return bf.filter.TestString(value), nil
(4) 工廠模式
- 工廠模式用于管理 Redis 連接池,確保每個布隆過濾器實例使用獨立的 Redis 連接,并在銷毀時釋放連接:
client := f.redisPool.Get() return NewRedisBloomFilter(client, key, expectedItems, errorRate, func() {f.redisPool.Put(client) })
4. 依賴庫
github.com/bits-and-blooms/bloom/v3
:布隆過濾器的核心實現。github.com/redis/go-redis/v9
:Redis 客戶端。shorturl/pkg/db/redis
:自定義的 Redis 連接池封裝。
https://github.com/0voice