# ======= Imports (unchanged) =======
import geopandas as gpd
import pandas as pd
import numpy as np
import rasterio
from rasterstats import point_query
from shapely.geometry import mapping
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    accuracy_score, roc_auc_score, roc_curve, auc, r2_score, make_scorer
)
from sklearn.inspection import permutation_importance
from imblearn.over_sampling import SMOTE
from scipy.ndimage import generic_filter
import xgboost as xgb
import shap
import matplotlib.pyplot as plt
import seaborn as sns
import os
import time
import warnings

warnings.filterwarnings('ignore')

# ======================= Paths =======================
shapefile_path = r"D:\desktop\数据分析\point.shp"
dem_path = r"D:\desktop\数据分析\5.9随机森林\dem.tif"
slope_path = r"D:\desktop\数据分析\5.9随机森林\slop.tif"
aspect_path = r"D:\desktop\数据分析\5.9随机森林\aspect.tif"
beacon_path = r"D:\desktop\数据分析\5.9随机森林\烽火台.shp"
fort_path = r"D:\desktop\数据分析\聚落与驿递铺讯点.shp"
road_path = r"D:\desktop\数据分析\4.25驿道复原\full_network.shp"
river_path = r"D:\desktop\数据分析\river shp.shp"
out_dir = r"D:\desktop\数据分析\5.13数据结果"
os.makedirs(out_dir, exist_ok=True)

# ======================= Load & preprocess =======================
data = gpd.read_file(shapefile_path)
dem = rasterio.open(dem_path)
slope = rasterio.open(slope_path)
aspect = rasterio.open(aspect_path)
beacons = gpd.read_file(beacon_path).to_crs(dem.crs)
forts = gpd.read_file(fort_path).to_crs(dem.crs)
roads = gpd.read_file(road_path).to_crs(dem.crs)
rivers = gpd.read_file(river_path).to_crs(dem.crs)
if data.crs != dem.crs:
    data = data.to_crs(dem.crs)

# Distance costs: per-point distance to the nearest geometry of each layer
# (on GeoPandas >= 1.0, .union_all() replaces the deprecated .unary_union)
data['dist_beacon'] = data.geometry.distance(beacons.unary_union)
data['dist_fort'] = data.geometry.distance(forts.unary_union)
data['dist_road'] = data.geometry.distance(roads.unary_union)
data['dist_river'] = data.geometry.distance(rivers.unary_union)

# Raster values sampled at each point
data['elevation'] = point_query(data, dem_path, interpolate='nearest')
data['slope'] = point_query(data, slope_path, interpolate='nearest')
data['aspect'] = point_query(data, aspect_path, interpolate='nearest')
data['sin_aspect'] = np.sin(np.deg2rad(data['aspect']))
data['cos_aspect'] = np.cos(np.deg2rad(data['aspect']))

# Terrain-derived indices (TPI, TRI, local relief) from a moving window
def calc_terrain_indices(raster, win_size=3):
    arr = raster.read(1).astype(float)
    arr[arr == raster.nodata] = np.nan

    def tpi(w):
        # Topographic Position Index: centre cell minus mean of its neighbours
        return w[len(w)//2] - np.nanmean(np.delete(w, len(w)//2))

    def tri(w):
        # Terrain Ruggedness Index: root of summed squared differences from
        # the centre cell, normalized by the neighbour count
        return np.sqrt(np.nansum((w - w[len(w)//2])**2)) / (len(w) - 1)

    def rugged(w):
        # Local relief: max minus min within the window
        return np.nanmax(w) - np.nanmin(w)

    return (
        generic_filter(arr, tpi, size=win_size),
        generic_filter(arr, tri, size=win_size),
        generic_filter(arr, rugged, size=win_size)
    )

tpi_arr, tri_arr, rugged_arr = calc_terrain_indices(dem)
for arr, name in zip([tpi_arr, tri_arr, rugged_arr], ['tpi', 'tri', 'rugged']):
    tmp = os.path.join(out_dir, f'{name}_temp.tif')
    prof = dem.profile.copy()
    prof.update(dtype=rasterio.float32, count=1)
    with rasterio.open(tmp, 'w', **prof) as dst:
        dst.write(arr.astype(rasterio.float32), 1)
    data[name] = point_query(data, tmp, interpolate='nearest')
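
# --- Added illustration (a minimal sketch, not part of the pipeline) --------
# generic_filter hands each window to the kernel as a flattened 1-D array, so
# for a 3x3 window the centre cell is w[len(w)//2] == w[4]. A toy check of the
# tpi kernel above: centre minus the mean of the eight neighbours.
_w = np.arange(9, dtype=float)  # stand-in for one flattened 3x3 window
_tpi = _w[len(_w)//2] - np.nanmean(np.delete(_w, len(_w)//2))
assert _tpi == 0.0              # 4 - mean([0..8] without 4) = 4 - 4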
# Features and label
data['x'] = data.geometry.x
data['y'] = data.geometry.y
features = [
    'elevation', 'slope', 'aspect', 'sin_aspect', 'cos_aspect',
    'tpi', 'tri', 'rugged', 'dist_river',
    'dist_beacon', 'dist_fort', 'dist_road', 'x', 'y'
]
X = data[features]
y = data['label']

# Impute missing values and standardize
# NOTE: fitting on the full dataset lets test-set statistics leak into the
# imputer/scaler; for a stricter evaluation, fit these on X_train only.
X = SimpleImputer(strategy='mean').fit_transform(X)
X = StandardScaler().fit_transform(X)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42
)

# ======================= Train the three models =======================
# Logistic regression
logm = LogisticRegression(max_iter=1000).fit(X_train, y_train)

# Random forest
rf = RandomForestClassifier(n_estimators=100, random_state=42).fit(X_train, y_train)

# XGBoost on SMOTE-resampled training data
X_res, y_res = SMOTE(random_state=42).fit_resample(X_train, y_train)
xgb_model = xgb.XGBClassifier(n_estimators=100, eval_metric='logloss', random_state=42)
xgb_model.fit(X_res, y_res)

# Print performance
for name, model in [('Logistic', logm), ('RF', rf), ('XGBoost', xgb_model)]:
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    print(f"{name} -> Acc: {accuracy_score(y_test, y_pred):.4f}, "
          f"AUC: {roc_auc_score(y_test, y_prob):.4f}")

# ======================= Feature contributions =======================
# 1. Logistic regression coefficients
df_lr = pd.DataFrame({
    'Driving factor': features,
    'coef': logm.coef_[0]
})
df_lr['abs_coef'] = df_lr['coef'].abs()
print("\nLogistic regression factor contributions (sorted by |coef|):")
print(df_lr.sort_values('abs_coef', ascending=False).reset_index(drop=True))

# 2. Random forest permutation importance
perm = permutation_importance(rf, X_test, y_test, n_repeats=10,
                              random_state=42, n_jobs=-1)
df_rf = pd.DataFrame({
    'Driving factor': features,
    'RF_importance': perm.importances_mean
})
print("\nRandom forest factor importance (permutation, descending):")
print(df_rf.sort_values('RF_importance', ascending=False).reset_index(drop=True))

# 3. XGBoost built-in feature_importances_
df_xgb = pd.DataFrame({
    'Driving factor': features,
    'XGB_importance': xgb_model.feature_importances_
})
print("\nXGBoost factor importance (feature_importances_, descending):")
print(df_xgb.sort_values('XGB_importance', ascending=False).reset_index(drop=True))

# ======================= Three-model comparison bar chart =======================
# Merge and normalize each column to percentages
df_all = df_lr[['Driving factor', 'abs_coef']].rename(columns={'abs_coef': 'Logistic'})
df_all = df_all.merge(df_rf, on='Driving factor')
df_all = df_all.merge(df_xgb, on='Driving factor')
for col in ['Logistic', 'RF_importance', 'XGB_importance']:
    df_all[col] = df_all[col] / df_all[col].sum() * 100
df_all = df_all.rename(columns={
    'RF_importance': 'Random Forest',
    'XGB_importance': 'XGBoost'
}).sort_values('XGBoost', ascending=False).set_index('Driving factor')

# pandas .plot creates its own figure, so pass figsize directly rather than
# calling plt.figure() first (which would leave an empty figure behind)
ax = df_all[['Logistic', 'Random Forest', 'XGBoost']].plot(
    kind='bar', width=0.8, figsize=(12, 6)
)
ax.set_title('Cost-factor importance across the three models')
ax.set_ylabel('Normalized importance (%)')
plt.xticks(rotation=45, ha='right')
ax.grid(axis='y')
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'model_comparison_bar.png'))
plt.close()

# ======================= SHAP analysis & full dependence plots =======================
explainer = shap.TreeExplainer(xgb_model)
shap_vals = explainer.shap_values(X_res)

# One dependence plot per factor (interaction partner chosen automatically)
n = len(features)
cols = 4
rows = int(np.ceil(n / cols))
fig, axes = plt.subplots(rows, cols, figsize=(cols * 4, rows * 4))
for i, feat in enumerate(features):
    ax = axes.flatten()[i]
    shap.dependence_plot(feat, shap_vals, X_res, feature_names=features,
                         interaction_index='auto', show=False, ax=ax)
    ax.set_title(feat)
# Remove the unused empty subplots
for j in range(n, rows * cols):
    fig.delaxes(axes.flatten()[j])
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'shap_all_dependence.png'))
plt.close()
print("\nAll SHAP dependence plots saved to:",
      os.path.join(out_dir, 'shap_all_dependence.png'))
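
# --- Added sketch: ROC curves for the three fitted models -------------------
# Uses roc_curve/auc from the import block above. The output file name
# 'roc_comparison.png' is an assumption, not from the original script.
plt.figure(figsize=(6, 6))
for name, model in [('Logistic', logm), ('RF', rf), ('XGBoost', xgb_model)]:
    fpr, tpr, _ = roc_curve(y_test, model.predict_proba(X_test)[:, 1])
    plt.plot(fpr, tpr, label=f'{name} (AUC = {auc(fpr, tpr):.3f})')
plt.plot([0, 1], [0, 1], 'k--', lw=0.8)  # chance diagonal
plt.xlabel('False positive rate')
plt.ylabel('True positive rate')
plt.legend()
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'roc_comparison.png'))
plt.close()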
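
# --- Added sketch: global SHAP summary (beeswarm) ----------------------------
# shap.summary_plot gives one global ranking to complement the per-factor
# dependence plots; the file name 'shap_summary.png' is an assumption.
shap.summary_plot(shap_vals, X_res, feature_names=features, show=False)
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'shap_summary.png'))
plt.close()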