import rasterio import numpy as np import geopandas as gpd from skimage.graph import route_through_array from shapely.geometry import LineString import os import pandas as pd # === 参数设置 === cost_raster_path = r"D:\desktop\数据分析\4.23驿道复原\加权总和.tif" stations_path = r"D:\desktop\数据分析\5.15边界模拟\边界点1644.shp" output_dir = r"D:\desktop\数据分析\4.25驿道复原" os.makedirs(output_dir, exist_ok=True) # === 1. 读取成本栅格(并提取仿射变换和 CRS) === with rasterio.open(cost_raster_path) as src: cost = src.read(1).astype(float) cost_transform = src.transform cost_crs = src.crs # 将 NaN 视为高阻 cost[np.isnan(cost)] = cost.max() * 10 # === 2. 读取驿站点 Shapefile 并统一 CRS === stations = gpd.read_file(stations_path).to_crs(cost_crs) # === 3. 为行列索引转换准备逆仿射变换 === inv_transform = ~cost_transform # === 4. 确定中心点和目标点 === field_name = 'name' center_name = '蔚州城' if field_name not in stations.columns: raise KeyError(f"字段 '{field_name}' 不存在,请确认字段名。") if center_name not in stations[field_name].values: raise ValueError(f"未找到中心点 '{center_name}',可选值:{stations[field_name].unique()}") center_geom = stations.loc[stations[field_name] == center_name, 'geometry'].iloc[0] # 地理坐标 cx, cy = center_geom.x, center_geom.y # 转为像素坐标 ccol, crow = inv_transform * (cx, cy) center_row, center_col = int(round(crow)), int(round(ccol)) targets = stations[stations[field_name] != center_name] network_lines = [] # === 5. 对每个目标点,计算路径 === for _, row in targets.iterrows(): name = row[field_name] gx, gy = row.geometry.x, row.geometry.y tcol, trow = inv_transform * (gx, gy) end_row, end_col = int(round(trow)), int(round(tcol)) # 检查范围 if not (0 <= center_row < cost.shape[0] and 0 <= center_col < cost.shape[1]): print(f"❌ 起点超出范围: {center_name} → 行列 ({center_row},{center_col})") break if not (0 <= end_row < cost.shape[0] and 0 <= end_col < cost.shape[1]): print(f"❌ 终点超出范围: {name} → 行列 ({end_row},{end_col})") continue try: indices, weight = route_through_array( cost, (center_row, center_col), (end_row, end_col), fully_connected=True ) indices = np.array(indices) # 像素坐标转地理坐标 path_coords = [cost_transform * (col, row) for row, col in indices] line = LineString(path_coords) # 生成 GeoDataFrame gdf = gpd.GeoDataFrame( { 'from': [center_name], 'to': [name], 'cost': [weight] }, geometry=[line], crs=cost_crs ) out_fp = os.path.join(output_dir, f"path_{center_name}_to_{name}.shp") gdf.to_file(out_fp) print(f"✅ 路径已生成: {center_name}→{name},保存为 {out_fp}") network_lines.append(gdf) except ValueError as e: print(f"❌ 无法计算路径 {center_name}→{name}:{e}") continue # === 6. 合并所有路径为完整网络 === if network_lines: full = gpd.GeoDataFrame(pd.concat(network_lines, ignore_index=True), crs=cost_crs) out_full = os.path.join(output_dir, "full_network.shp") full.to_file(out_full) print(f"🌐 已合并所有路径并保存为 {out_full}") else: print("⚠️ 未生成任何路径,网络图层为空。")