主要功能是處理地理空間柵格數據(TIFF文件)和災害點數據(CSV文件),統計不同分區內的災害點分布情況,並計算災害相對密度等統計指標。
TIFF文件:已經重分類后的文件
CSV文件:災害點經緯度坐標
可根據實際的分類數修改下面代碼
import sys
import numpy as np
from osgeo import gdal
import pandas as pd
from typing import Dict, List, Tuple


def read_tif_file(filepath: str) -> Tuple:
    """Read the first band of a TIFF and replace NoData cells with NaN.

    Returns:
        (projection, geotransform, band-1 float32 array, x_size, y_size,
        band_count)

    Raises:
        ValueError: if GDAL cannot open the file.
    """
    dataset = gdal.Open(filepath)
    if dataset is None:
        raise ValueError("無法打開TIFF文件")

    projection = dataset.GetProjection()
    transform = dataset.GetGeoTransform()
    x_size = dataset.RasterXSize
    y_size = dataset.RasterYSize
    bands = dataset.RasterCount

    # Only the first band is processed (adjust here if more are needed).
    band = dataset.GetRasterBand(1)
    nodata = band.GetNoDataValue()
    data_1 = band.ReadAsArray(0, 0, x_size, y_size).astype(np.float32)

    # Mask NoData as NaN so downstream statistics can filter it out.
    if nodata is not None:
        data_1[data_1 == nodata] = np.nan

    # GDAL datasets are released by dropping the reference.
    dataset = None
    return projection, transform, data_1, x_size, y_size, bands


def disaster_point_optimized(excel_filepath: str, tif_filepath: str) -> Tuple[List[List[int]], int]:
    """Convert disaster-point lon/lat (CSV columns 'JD'/'WD') to raster row/col.

    Returns:
        ([[row, col], ...], point_count); on any failure the error is printed
        and ([], 0) is returned (best-effort contract kept from the original).
    """
    try:
        disaster_df = pd.read_csv(excel_filepath, encoding='ISO-8859-1')
        required_cols = ['JD', 'WD']
        if not all(col in disaster_df.columns for col in required_cols):
            raise ValueError(f"CSV文件必須包含 {required_cols} 列")

        # Geotransform layout: (x_origin, x_res, _, y_origin, _, y_res),
        # with y_res negative for north-up rasters — hence abs() below.
        _, transform, _, x_size, y_size, _ = read_tif_file(tif_filepath)

        longitude = disaster_df['JD'].values
        latitude = disaster_df['WD'].values
        x_origin, x_res = transform[0], transform[1]
        y_origin, y_res = transform[3], abs(transform[5])

        # Column index grows eastward, row index grows southward.
        col_indices = ((longitude - x_origin) / x_res).astype(int)
        row_indices = ((y_origin - latitude) / y_res).astype(int)

        # Clamp points falling just outside the raster onto its edge.
        col_indices = np.clip(col_indices, 0, x_size - 1)
        row_indices = np.clip(row_indices, 0, y_size - 1)

        return [[int(r), int(c)] for r, c in zip(row_indices, col_indices)], len(row_indices)
    except Exception as e:
        # Best-effort: report the problem and return an empty result so the
        # caller can still produce (all-zero) statistics.
        print(f"處理出錯: {str(e)}")
        return [], 0


def count_partition_statistics(tif_filepath: str, csv_filepath: str) -> Dict[int, Dict[str, float]]:
    """Per-partition (values 1-5) pixel count, disaster count, relative density.

    relative_density = (partition's share of disaster points)
                     / (partition's share of valid pixels)
    """
    disaster_indices, _total_points = disaster_point_optimized(csv_filepath, tif_filepath)

    _, _, data_1, x_size, y_size, _ = read_tif_file(tif_filepath)

    # Replace NaN (NoData) with a sentinel so the int cast below is safe.
    valid_mask = ~np.isnan(data_1)
    data_1_int = np.full(data_1.shape, -9999, dtype=int)
    data_1_int[valid_mask] = data_1[valid_mask].astype(int)

    # Pixel counts for the valid partitions 1-5.
    valid_values = (data_1_int >= 1) & (data_1_int <= 5)
    unique_values, counts = np.unique(data_1_int[valid_values], return_counts=True)
    pixel_counts = dict(zip(unique_values, counts))

    # Disaster-point counts per partition.
    # FIX: the original row check started at 1 (silently dropping any point in
    # raster row 0) and the value check allowed 6, which inflated D_total with
    # a partition that is never reported — both skewed every relative density.
    disaster_counts = {}
    for row, col in disaster_indices:
        if 0 <= row < y_size and 0 <= col < x_size:
            value = data_1_int[row, col]
            if 1 <= value <= 5:  # only the reported partitions
                disaster_counts[value] = disaster_counts.get(value, 0) + 1

    S_total = sum(pixel_counts.values())
    D_total = sum(disaster_counts.values())

    result = {}
    for value in range(1, 6):  # guarantee an entry for each partition 1-5
        pc = pixel_counts.get(value, 0)
        dc = disaster_counts.get(value, 0)
        # Zero pixels or zero total points make the ratio undefined; report 0.
        if D_total == 0 or pc == 0:
            rd = 0.0
        else:
            rd = (dc / D_total) / (pc / S_total)
        result[value] = {
            'pixel_count': pc,
            'disaster_count': dc,
            'relative_density': round(rd, 4),
        }
    return result


if __name__ == "__main__":
    tif_path = r''  # TODO: path to the reclassified TIFF
    csv_path = r''  # TODO: path to the disaster-point CSV
    stats = count_partition_statistics(tif_path, csv_path)

    # Tab-separated report: partition / pixels / disaster points / density.
    print("分區值\t像元個數\t災害點個數\t相對密度")
    for value in sorted(stats.keys()):
        data = stats[value]
        print(f"{value}\t{data['pixel_count']}\t{data['disaster_count']}\t{data['relative_density']:.4f}")
效果:運行後按分區(1-5)以製表符分隔輸出每個分區的像元個數、災害點個數與相對密度(保留4位小數)。