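A Guide to Implementing Columnar Data Storage in C#
I. Overview of Columnar Storage
Columnar storage organizes data by column rather than by row: all values of one field are stored together. Compared with traditional row-oriented storage, it has advantages in the following scenarios:
- Analytical queries: aggregations, group-by statistics, and similar operations run more efficiently
- Higher compression ratio: the same data usually takes less space, because values within a column share a type and tend to repeat
- More efficient I/O: a query reads only the columns it references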
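The difference between the two layouts can be sketched with a few hypothetical records (the field names and values below are assumptions made up for illustration). The same data is held once as an array of rows and once as one array per column, and a sum over a single field is computed against each.

```csharp
using System;
using System.Linq;

// Row-oriented layout: one tuple per record, fields interleaved.
var rows = new (int Id, string Name, double Price)[]
{
    (1, "Item1", 10.5),
    (2, "Item2", 20.3),
    (3, "Item3", 30.7),
};

// Column-oriented layout: one array per field, values of a field stored contiguously.
int[] ids = rows.Select(r => r.Id).ToArray();
string[] names = rows.Select(r => r.Name).ToArray();
double[] prices = rows.Select(r => r.Price).ToArray();

// In the row layout, an aggregate over one field still walks every whole record.
double totalFromRows = 0;
foreach (var row in rows)
    totalFromRows += row.Price;

// In the column layout, the same aggregate touches a single array.
double totalFromColumn = prices.Sum();

Console.WriteLine($"rows: {totalFromRows}, column: {totalFromColumn}");
```

With three records the difference is invisible, but on disk the column layout means the bytes for `Id` and `Name` never have to be read when only `Price` is needed.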
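II. Approaches to Columnar Storage in C#
Option 1: Simple columnar storage with in-memory lists
using System;
using System.Collections.Generic;
using System.Linq;

public class ColumnarStorage<T>
{
    private readonly List<List<T>> _columns = new List<List<T>>();
    private readonly Dictionary<string, int> _columnIndexes = new Dictionary<string, int>();

    public void AddColumn(string columnName, IEnumerable<T> values)
    {
        if (_columnIndexes.ContainsKey(columnName))
            throw new ArgumentException($"Column '{columnName}' already exists");

        var column = values.ToList();
        // Every column must hold the same number of rows.
        if (_columns.Count > 0 && column.Count != _columns[0].Count)
            throw new ArgumentException("Column length doesn't match existing columns");

        _columns.Add(column);
        _columnIndexes[columnName] = _columns.Count - 1;
    }

    public void AddRow(IEnumerable<T> values)
    {
        // Materialize once: Count() and ElementAt() would re-enumerate the sequence.
        var row = values.ToList();
        if (row.Count != _columns.Count)
            throw new ArgumentException("Number of values doesn't match number of columns");

        for (int i = 0; i < row.Count; i++)
            _columns[i].Add(row[i]);
    }

    public IEnumerable<T> GetColumn(string columnName)
    {
        if (!_columnIndexes.TryGetValue(columnName, out int index))
            throw new ArgumentException($"Column '{columnName}' not found");
        return _columns[index];
    }

    public IEnumerable<T> GetRow(int rowIndex)
    {
        // Rebuilds a row by picking the same position from every column.
        return _columns.Select(c => c[rowIndex]);
    }

    // Other methods such as queries and aggregation...
}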
Usage example:
var storage = new ColumnarStorage<int>();
storage.AddColumn("ID", new List<int> { 1, 2, 3 });
storage.AddColumn("Value", new List<int> { 10, 20, 30 });

// Appending a row (not recommended: a columnar store usually defines its columns first and then fills them)
// storage.AddRow(new List<int> { 4, 40 });

// Query
var values = storage.GetColumn("Value"); // returns [10, 20, 30]
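Option 2: A dedicated columnar storage library, Parquet.NET
Install the NuGet package:
dotnet add package Parquet.Net
Implementation example:
using System;
using System.IO;
using System.Linq;
using Parquet;
using Parquet.Data;
using Parquet.Schema;

// Assumes Parquet.Net 4.x or later, which writes data column by column.
// Older 2.x releases exposed a row-based DataSet/Row API instead.

// Define the data fields
var schema = new ParquetSchema(
    new DataField<int>("id"),
    new DataField<string>("name"),
    new DataField<double>("value"));

// Create one DataColumn per field
var columns = new[]
{
    new DataColumn(schema.DataFields[0], new[] { 1, 2, 3 }),
    new DataColumn(schema.DataFields[1], new[] { "Item1", "Item2", "Item3" }),
    new DataColumn(schema.DataFields[2], new[] { 10.5, 20.3, 30.7 })
};

// Write the Parquet file
using (Stream stream = File.Create("data.parquet"))
using (ParquetWriter writer = await ParquetWriter.CreateAsync(schema, stream))
using (ParquetRowGroupWriter group = writer.CreateRowGroup())
{
    foreach (DataColumn column in columns)
        await group.WriteColumnAsync(column);
}

// Read the Parquet file (first row group only)
using (Stream stream = File.OpenRead("data.parquet"))
using (ParquetReader reader = await ParquetReader.CreateAsync(stream))
using (ParquetRowGroupReader group = reader.OpenRowGroupReader(0))
{
    foreach (DataField field in reader.Schema.GetDataFields())
    {
        DataColumn column = await group.ReadColumnAsync(field);
        Console.WriteLine($"{field.Name}: {string.Join(", ", column.Data.Cast<object>())}");
    }
}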
Option 3: In-memory columnar storage with Apache Arrow
Install the NuGet package:
dotnet add package Apache.Arrow
Implementation example:
using System;
using System.IO;
using Apache.Arrow;
using Apache.Arrow.Ipc;
using Apache.Arrow.Types;

// Build the columns
var idColumn = new Int32Array.Builder().Append(1).Append(2).Append(3).Build();
var nameColumn = new StringArray.Builder().Append("Item1").Append("Item2").Append("Item3").Build();
var valueColumn = new DoubleArray.Builder().Append(10.5).Append(20.3).Append(30.7).Build();

// Describe the schema and group the columns into a record batch
var schema = new Schema.Builder()
    .Field(f => f.Name("id").DataType(Int32Type.Default).Nullable(false))
    .Field(f => f.Name("name").DataType(StringType.Default).Nullable(false))
    .Field(f => f.Name("value").DataType(DoubleType.Default).Nullable(false))
    .Build();
var batch = new RecordBatch(schema, new IArrowArray[] { idColumn, nameColumn, valueColumn }, 3);

// Serialize in the Arrow IPC stream format
using (var stream = File.Create("data.arrow"))
using (var writer = new ArrowStreamWriter(stream, schema))
{
    await writer.WriteRecordBatchAsync(batch);
    await writer.WriteEndAsync();
}

// Read the Arrow data back
using (var stream = File.OpenRead("data.arrow"))
using (var reader = new ArrowStreamReader(stream))
{
    RecordBatch read = await reader.ReadNextRecordBatchAsync();
    var ids = (Int32Array)read.Column(0);
    var names = (StringArray)read.Column(1);
    var vals = (DoubleArray)read.Column(2);
    for (int i = 0; i < read.Length; i++)
        Console.WriteLine($"{ids.GetValue(i)} {names.GetString(i)} {vals.GetValue(i)}");
}
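III. Optimization Techniques for Columnar Storage
1. Data compression
Using Parquet's compression options:
// Assumes Parquet.Net 4.x or later (namespaces Parquet, Parquet.Data, Parquet.Schema)
var schema = new ParquetSchema(new DataField<int>("id"));
var ids = new DataColumn(schema.DataFields[0], new[] { 1, 2, 3 });

using (Stream stream = File.Create("compressed.parquet"))
using (ParquetWriter writer = await ParquetWriter.CreateAsync(schema, stream))
{
    writer.CompressionMethod = CompressionMethod.Snappy; // use Snappy compression
    using ParquetRowGroupWriter group = writer.CreateRowGroup();
    await group.WriteColumnAsync(ids);
}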
2. Column query optimization
Implementing a column index:
public class ColumnIndex<T> where T : notnull
{
    // Maps each distinct value to the row numbers where it occurs.
    private readonly Dictionary<T, List<int>> _index = new Dictionary<T, List<int>>();

    public void Build(IEnumerable<T> column)
    {
        int rowIndex = 0;
        foreach (var value in column)
        {
            if (!_index.TryGetValue(value, out var rows))
                _index[value] = rows = new List<int>();
            rows.Add(rowIndex++);
        }
    }

    public IEnumerable<int> Lookup(T value)
    {
        return _index.TryGetValue(value, out var rows) ? rows : Enumerable.Empty<int>();
    }
}
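To show what such an index buys at query time, here is a small standalone sketch of the same idea. The `city` and `sales` columns are hypothetical sample data, and the index is built inline with a plain dictionary rather than through the class above, so that the snippet runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sample columns: one city and one sales figure per row.
string[] city = { "Taipei", "Tokyo", "Taipei", "Osaka", "Tokyo", "Taipei" };
int[] sales = { 10, 20, 30, 40, 50, 60 };

// Build the index once: value -> row numbers where it occurs.
var index = new Dictionary<string, List<int>>();
for (int row = 0; row < city.Length; row++)
{
    if (!index.TryGetValue(city[row], out var rowsForValue))
        index[city[row]] = rowsForValue = new List<int>();
    rowsForValue.Add(row);
}

// An equality filter becomes a dictionary lookup instead of a column scan.
List<int> taipeiRows = index.TryGetValue("Taipei", out var hit) ? hit : new List<int>();
Console.WriteLine(string.Join(", ", taipeiRows)); // prints "0, 2, 5"

// The row numbers are then used to pick values out of another column.
int taipeiSales = taipeiRows.Sum(r => sales[r]);
Console.WriteLine(taipeiSales); // prints "100"
```

The index pays off only when it is built once and reused across many lookups; building it for a single query costs more than scanning the column.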
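3. Memory-mapped files
Using MemoryMappedFile to handle large files:
using System.IO;
using System.IO.MemoryMappedFiles;

using var mmf = MemoryMappedFile.CreateFromFile("large.parquet", FileMode.Open);
using var accessor = mmf.CreateViewAccessor();
// The accessor exposes the raw file bytes without loading the whole file.
// To reach one column's data, first locate its byte offsets from the Parquet
// footer metadata; the mapping itself does no decoding or decompression.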
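Decoding a real Parquet file through a raw mapping takes considerable work, so the following sketch uses a much simpler, hypothetical layout instead: a file that contains a single int32 column, 4 bytes per value, with no header. Under that assumption, any row range can be mapped and read directly by computing its byte offset.

```csharp
using System;
using System.IO;
using System.IO.MemoryMappedFiles;

// Hypothetical file layout for this sketch: one int32 column, no header.
string path = Path.Combine(Path.GetTempPath(), "value_column.bin");
const int RowCount = 1_000;

// Write the column: row i holds the value i * 2.
using (var output = new BinaryWriter(File.Create(path)))
{
    for (int i = 0; i < RowCount; i++)
        output.Write(i * 2);
}

// Map only rows 100..199 and sum them; the rest of the file is never touched.
const int FirstRow = 100, LastRow = 199;
long sum = 0;
using (var mmf = MemoryMappedFile.CreateFromFile(path, FileMode.Open))
using (var accessor = mmf.CreateViewAccessor(
    (long)FirstRow * sizeof(int),
    (long)(LastRow - FirstRow + 1) * sizeof(int)))
{
    for (int row = 0; row <= LastRow - FirstRow; row++)
        sum += accessor.ReadInt32((long)row * sizeof(int));
}

Console.WriteLine(sum);
File.Delete(path);
```

Fixed-width columns make the offset arithmetic trivial; variable-width values such as strings would need a separate offsets column to support the same kind of random access.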
IV. Complete Example: A Simple Columnar Database
using System;
using System.Collections.Generic;
using System.Linq;

public class ColumnarDatabase
{
    // Dictionary enumeration order is not guaranteed, so column order is tracked separately.
    private readonly List<string> _columnNames = new();
    private readonly Dictionary<string, List<object>> _columns = new();
    private readonly Dictionary<string, Type> _columnTypes = new();

    public void CreateTable(IEnumerable<string> columnNames, IEnumerable<Type> columnTypes)
    {
        var names = columnNames.ToList();
        var types = columnTypes.ToList();
        if (names.Count != types.Count)
            throw new ArgumentException("Column names and types count mismatch");

        for (int i = 0; i < names.Count; i++)
        {
            _columnNames.Add(names[i]);
            _columns[names[i]] = new List<object>();
            _columnTypes[names[i]] = types[i];
        }
    }

    public void InsertRow(IEnumerable<object> values)
    {
        var row = values.ToList();
        if (row.Count != _columnNames.Count)
            throw new ArgumentException("Values count doesn't match columns count");

        // Values are assigned to columns in declaration order.
        for (int i = 0; i < row.Count; i++)
            _columns[_columnNames[i]].Add(row[i]);
    }

    public IEnumerable<object> GetColumn(string columnName)
    {
        return _columns[columnName];
    }

    public IEnumerable<object[]> GetRowsWhere(string columnName, object value)
    {
        if (!_columns.TryGetValue(columnName, out var column)) yield break;

        // Scan only the filtered column; other columns are read for matching rows only.
        for (int i = 0; i < column.Count; i++)
        {
            if (Equals(column[i], value))
                yield return _columnNames.Select(name => _columns[name][i]).ToArray();
        }
    }

    // More methods...
}
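V. Performance Comparison Test
Test code (uses the ColumnarStorage<T> class from Option 1):
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

const int RowCount = 1_000_000;
var columnar = new ColumnarStorage<int>();
var rowBased = new List<int[]>(RowCount);

// Fill both stores with one million rows
var sw = Stopwatch.StartNew();
columnar.AddColumn("ID", Enumerable.Range(0, RowCount));
columnar.AddColumn("Value", Enumerable.Range(0, RowCount).Select(i => i * 2));
sw.Stop();
Console.WriteLine($"Columnar fill time: {sw.ElapsedMilliseconds}ms");

sw.Restart();
for (int i = 0; i < RowCount; i++)
    rowBased.Add(new[] { i, i * 2 });
sw.Stop();
Console.WriteLine($"Row-based fill time: {sw.ElapsedMilliseconds}ms");

// Query test: sum the second column. The result is consumed so that
// the lazy LINQ query actually runs inside the timed region.
sw.Restart();
long columnarSum = columnar.GetColumn("Value").Sum(v => (long)v);
sw.Stop();
Console.WriteLine($"Columnar query time: {sw.ElapsedMilliseconds}ms (sum = {columnarSum})");

sw.Restart();
long rowBasedSum = rowBased.Sum(r => (long)r[1]);
sw.Stop();
Console.WriteLine($"Row-based query time: {sw.ElapsedMilliseconds}ms (sum = {rowBasedSum})");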
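Expected results:
- Columnar storage should perform better when a query reads only specific columns
- Row-oriented storage may be faster on inserts, although an optimized columnar store can also insert quickly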
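VI. Practical Use Cases
- Data analytics platforms:
  - Processing terabyte-scale data
  - Efficient aggregate computation
  - Columnar compression that saves storage space
- Business intelligence tools:
  - Fast report generation
  - Optimization of complex queries
  - Support for multidimensional analysis
- Time-series data processing:
  - Efficient storage of time-series data
  - Fast aggregate computation
  - Support for downsampling queries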
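As an illustration of the downsampling queries mentioned for time-series data, the sketch below averages readings into fixed-width time buckets. It assumes a series stored as two parallel columns (timestamps and values), sorted by time and with non-negative timestamps. The `Downsample` helper and the sample data are hypothetical, not part of any library.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical series stored as two parallel columns:
// Unix timestamps in seconds and the reading taken at each one.
long[] timestamps = { 0, 20, 40, 60, 80, 100, 120, 140 };
double[] values = { 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0 };

// Averages the readings that fall into each bucket of bucketSeconds.
// Assumes ts is sorted ascending and contains no negative timestamps.
static List<(long BucketStart, double Average)> Downsample(
    long[] ts, double[] vals, long bucketSeconds)
{
    var result = new List<(long BucketStart, double Average)>();
    int i = 0;
    while (i < ts.Length)
    {
        long bucketStart = ts[i] / bucketSeconds * bucketSeconds;
        double sum = 0;
        int count = 0;
        // Consume every reading that belongs to the current bucket.
        while (i < ts.Length && ts[i] < bucketStart + bucketSeconds)
        {
            sum += vals[i];
            count++;
            i++;
        }
        result.Add((bucketStart, sum / count));
    }
    return result;
}

// One averaged point per minute.
foreach (var (bucketStart, average) in Downsample(timestamps, values, 60))
    Console.WriteLine($"{bucketStart}: {average}");
```

Because the timestamps and values sit in separate arrays, the loop reads both sequentially and never touches any other field a row-oriented record might carry.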
VII. Suggested Extensions
- Implement columnar compression:
  public byte[] CompressColumn<T>(List<T> column)
  {
      using var ms = new MemoryStream();
      using (var writer = new BinaryWriter(ms))
      {
          foreach (var item in column)
          {
              // Placeholder: this only serializes each value as a string.
              // Replace it with a real encoding (run-length, dictionary, delta, ...).
              writer.Write(item?.ToString() ?? string.Empty);
          }
      }
      return ms.ToArray();
  }
- Add index support:
  public class ColumnIndex
  {
      private readonly Dictionary<object, List<int>> _valueToRows = new();

      public void Build<T>(List<T> column) where T : notnull
      {
          for (int i = 0; i < column.Count; i++)
          {
              if (!_valueToRows.TryGetValue(column[i], out var rows))
                  _valueToRows[column[i]] = rows = new List<int>();
              rows.Add(i);
          }
      }

      // The class is not generic, so lookups take the value as object.
      public IEnumerable<int> GetRows(object value) =>
          _valueToRows.TryGetValue(value, out var rows) ? rows : Enumerable.Empty<int>();
  }
- Support columnar storage formats:
  - Parquet
  - ORC
  - Apache Arrow
  - A custom binary format
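For the columnar compression suggestion above, run-length encoding is one of the simplest schemes that benefits from column layout, since repeated values end up next to each other. The following is a minimal sketch of that technique, not the scheme used by Parquet or any other library. The helper names and the sample `status` column are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Collapses consecutive equal values into (value, run length) pairs.
static List<(T Value, int Count)> RunLengthEncode<T>(IReadOnlyList<T> column)
{
    var runs = new List<(T Value, int Count)>();
    var comparer = EqualityComparer<T>.Default;
    foreach (T value in column)
    {
        int last = runs.Count - 1;
        if (last >= 0 && comparer.Equals(runs[last].Value, value))
            runs[last] = (value, runs[last].Count + 1); // extend the current run
        else
            runs.Add((value, 1));                       // start a new run
    }
    return runs;
}

// Expands the runs back into the original column.
static List<T> RunLengthDecode<T>(IEnumerable<(T Value, int Count)> runs)
{
    var column = new List<T>();
    foreach (var (value, count) in runs)
        for (int i = 0; i < count; i++)
            column.Add(value);
    return column;
}

// Hypothetical sample column with long runs of repeated values.
string[] status = { "OK", "OK", "OK", "FAIL", "FAIL", "OK" };

var encoded = RunLengthEncode(status);
Console.WriteLine(string.Join(" ", encoded.Select(r => $"{r.Value}x{r.Count}"))); // prints "OKx3 FAILx2 OKx1"

var decoded = RunLengthDecode(encoded);
Console.WriteLine(decoded.SequenceEqual(status)); // prints "True"
```

Run-length encoding helps only when values repeat consecutively, so sorting the table by a low-cardinality column before encoding usually improves the ratio; for high-cardinality columns, dictionary or delta encoding is a better fit.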