# ======= Imports (unchanged) =======
import os
import warnings

import geopandas as gpd
import pandas as pd
import numpy as np
import rasterio
from rasterio.features import rasterize
from rasterstats import point_query
from shapely.geometry import mapping
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_validate
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, roc_auc_score, r2_score, make_scorer
from sklearn.inspection import permutation_importance
from imblearn.over_sampling import SMOTE
from scipy.ndimage import generic_filter
import xgboost as xgb
import shap
import matplotlib.pyplot as plt
import seaborn as sns

warnings.filterwarnings('ignore')  # silence library deprecation/runtime warnings

# ======================= Path settings =======================
shapefile_path = r"D:\desktop\数据分析\point.shp"
dem_path = r"D:\desktop\数据分析\5.9随机森林\dem.tif"
slope_path = r"D:\desktop\数据分析\5.9随机森林\slop.tif"
aspect_path = r"D:\desktop\数据分析\5.9随机森林\aspect.tif"
beacon_path = r"D:\desktop\数据分析\5.9随机森林\烽火台.shp"
fort_path = r"D:\desktop\数据分析\聚落与驿递铺讯点.shp"
road_path = r"D:\desktop\数据分析\4.25驿道复原\full_network.shp"
river_path = r"D:\desktop\数据分析\river shp.shp"
out_dir = r"D:\desktop\数据分析\5.9随机森林\输出结果"
os.makedirs(out_dir, exist_ok=True)

# ======================= Load data =======================
data = gpd.read_file(shapefile_path)
dem = rasterio.open(dem_path)
slope = rasterio.open(slope_path)
aspect = rasterio.open(aspect_path)
# Reproject all vector layers to the DEM's CRS so distances share the same units
beacons = gpd.read_file(beacon_path).to_crs(dem.crs)
forts = gpd.read_file(fort_path).to_crs(dem.crs)
roads = gpd.read_file(road_path).to_crs(dem.crs)
rivers = gpd.read_file(river_path).to_crs(dem.crs)
if data.crs != dem.crs:
    data = data.to_crs(dem.crs)

# ======================= Distance-cost extraction =======================
# Distance from each point to the nearest beacon / fort / road (merged geometries)
data['dist_beacon'] = data.geometry.distance(beacons.unary_union)
data['dist_fort'] = data.geometry.distance(forts.unary_union)
data['dist_road'] = data.geometry.distance(roads.unary_union)

# ======================= Raster point-value extraction =======================
data['elevation'] = point_query(data, dem_path, interpolate='nearest')
data['slope'] = point_query(data, slope_path, interpolate='nearest')
data['aspect'] = point_query(data, aspect_path, interpolate='nearest')
# Encode aspect (a circular variable, in degrees) as sine/cosine components
data['sin_aspect'] = np.sin(np.deg2rad(data['aspect']))
data['cos_aspect'] = np.cos(np.deg2rad(data['aspect']))

# ======================= Derived terrain indices =======================
def calc_terrain_indices(raster, win_size=3):
    """Compute TPI, TRI and ruggedness over a moving window of the DEM."""
    arr = raster.read(1).astype(float)
    if raster.nodata is not None:
        arr[arr == raster.nodata] = np.nan

    # generic_filter passes each window as a flattened 1-D array, so
    # w[len(w)//2] is the centre cell of the win_size x win_size neighbourhood.
    def tpi(w):     # centre minus the mean of its neighbours
        return w[len(w) // 2] - np.nanmean(np.delete(w, len(w) // 2))

    def tri(w):     # root of summed squared differences, scaled by window size
        return np.sqrt(np.nansum((w - w[len(w) // 2]) ** 2)) / (len(w) - 1)

    def rugged(w):  # local relief: max minus min within the window
        return np.nanmax(w) - np.nanmin(w)

    tpi_arr = generic_filter(arr, tpi, size=win_size)
    tri_arr = generic_filter(arr, tri, size=win_size)
    rugged_arr = generic_filter(arr, rugged, size=win_size)
    return tpi_arr, tri_arr, rugged_arr

tpi_arr, tri_arr, rugged_arr = calc_terrain_indices(dem)
# Write each index to a temporary GeoTIFF and sample it at the point locations
for arr, name in zip([tpi_arr, tri_arr, rugged_arr], ['tpi', 'tri', 'rugged']):
    temp_path = os.path.join(out_dir, f'{name}_temp.tif')
    profile = dem.profile.copy()
    profile.update(dtype=rasterio.float32)
    with rasterio.open(temp_path, 'w', **profile) as dst:
        dst.write(arr.astype(rasterio.float32), 1)
    data[name] = point_query(data, temp_path, interpolate='nearest')
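# --- Optional sanity check for the window functions above (a minimal sketch; the
# --- toy array below is invented and not part of the original data). It confirms
# --- that generic_filter hands each 3x3 window to the callback as a flattened
# --- array whose centre element is w[len(w)//2].
_demo = np.array([[1., 1., 1.],
                  [1., 5., 1.],
                  [1., 1., 1.]])
_demo_tpi = generic_filter(
    _demo,
    lambda w: w[len(w) // 2] - np.nanmean(np.delete(w, len(w) // 2)),
    size=3)
assert np.isclose(_demo_tpi[1, 1], 4.0)  # centre sits 4 units above its neighbours' mean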
# ======================= River cost raster =======================
# Burn a 50-unit buffer (in the DEM's CRS units) around the rivers into a 0/1
# raster aligned with the DEM, then sample it at the point locations.
river_buffer = rivers.buffer(50)
shapes = ((mapping(geom), 1) for geom in river_buffer)
river_raster = rasterize(shapes=shapes, out_shape=dem.shape,
                         transform=dem.transform, fill=0, dtype='uint8')
river_temp = os.path.join(out_dir, 'river_temp.tif')
profile = dem.profile.copy()
# The DEM's nodata value may fall outside the uint8 range, so drop it for this mask
profile.update(dtype=rasterio.uint8, nodata=None)
with rasterio.open(river_temp, 'w', **profile) as dst:
    dst.write(river_raster, 1)
data['river_cost'] = point_query(data, river_temp, interpolate='nearest')

# ======================= Features and labels =======================
data['x'] = data.geometry.x
data['y'] = data.geometry.y
features = ['elevation', 'slope', 'aspect', 'sin_aspect', 'cos_aspect',
            'tpi', 'tri', 'rugged', 'river_cost',
            'dist_beacon', 'dist_fort', 'dist_road', 'x', 'y']
X = data[features]
y = data['label']
X = SimpleImputer(strategy='mean').fit_transform(X)
X = StandardScaler().fit_transform(X)

# ======================= Logistic & RandomForest =======================
X_train_lr, X_test_lr, y_train_lr, y_test_lr = train_test_split(
    X, y, test_size=0.3, stratify=y, random_state=42)
logm = LogisticRegression(max_iter=1000).fit(X_train_lr, y_train_lr)
print("Logistic AUC:", roc_auc_score(y_test_lr, logm.predict_proba(X_test_lr)[:, 1]))
rf = RandomForestClassifier(n_estimators=100, random_state=42).fit(X_train_lr, y_train_lr)
print("RF AUC:", roc_auc_score(y_test_lr, rf.predict_proba(X_test_lr)[:, 1]))

# Permutation importance: with a (negative) MSE scorer the importance equals the
# increase in MSE when a feature is permuted, matching the 'Increase_MSE' label below.
perm = permutation_importance(rf, X_test_lr, y_test_lr, n_repeats=10,
                              scoring='neg_mean_squared_error',
                              random_state=42, n_jobs=-1)
# Long format (one row per feature and repeat) so the box plot shows the spread across repeats
fi_df = pd.DataFrame(perm.importances.T, columns=features).melt(
    var_name='Driving factor', value_name='Increase_MSE')

# ======================= SMOTE + XGBoost =======================
# Oversample the minority class, then evaluate XGBoost with stratified 5-fold CV
X_res, y_res = SMOTE(random_state=42).fit_resample(X, y)
cv = StratifiedKFold(5, shuffle=True, random_state=42)
xgb_model = xgb.XGBClassifier(n_estimators=100, use_label_encoder=False, eval_metric='logloss')
cv_acc = np.mean([
    accuracy_score(y_res[test],
                   xgb_model.fit(X_res[train], y_res[train]).predict(X_res[test]))
    for train, test in cv.split(X_res, y_res)
])
print("XGB CV acc:", cv_acc)
X_tr, X_te, y_tr, y_te = train_test_split(X_res, y_res, stratify=y_res,
                                          test_size=0.3, random_state=42)
xgb_model.fit(X_tr, y_tr)
print("XGB AUC:", roc_auc_score(y_te, xgb_model.predict_proba(X_te)[:, 1]))

# SHAP data: main effects, interaction effects and the pairwise interaction matrix
explainer = shap.TreeExplainer(xgb_model)
shap_vals = explainer.shap_values(X_tr)
main_eff = np.abs(shap_vals).mean(axis=0)
inter_vals = explainer.shap_interaction_values(X_tr)
inter_mean = np.abs(inter_vals).mean(axis=0)
inter_eff = inter_mean.sum(axis=1) - main_eff
shap_contrib_df = pd.DataFrame({'Driving factor': features,
                                'Main': main_eff,
                                'Interaction': inter_eff})
shap_int_df = pd.DataFrame(inter_mean, index=features, columns=features).stack().reset_index()
shap_int_df.columns = ['feat1', 'feat2', 'mean_interaction']
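# Optional: persist the importance and SHAP tables alongside the figures
# (a small addition, not in the original script; the CSV file names are assumptions).
fi_df.to_csv(os.path.join(out_dir, 'rf_permutation_importance.csv'), index=False)
shap_contrib_df.to_csv(os.path.join(out_dir, 'shap_main_vs_interaction.csv'), index=False)
shap_int_df.to_csv(os.path.join(out_dir, 'shap_interaction_pairs.csv'), index=False)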
# ======================= Plot 1: RF importance & R2 =======================
# Cross-validated R² (computed on the hard class predictions) as the number of
# top-ranked factors grows.
perf_rows = []
sorted_feats = (fi_df.groupby('Driving factor')['Increase_MSE']
                .mean().sort_values(ascending=False).index.tolist())
for n in range(3, len(features) + 1):
    top = sorted_feats[:n]
    scores = cross_validate(RandomForestClassifier(n_estimators=100, random_state=42),
                            X_tr[:, [features.index(f) for f in top]], y_tr,
                            cv=cv, return_train_score=True, scoring=make_scorer(r2_score))
    for tr, ts in zip(scores['train_score'], scores['test_score']):
        perf_rows.append({'n_factors': n, 'R2': tr, 'Dataset': 'Train'})
        perf_rows.append({'n_factors': n, 'R2': ts, 'Dataset': 'Test'})
perf_df = pd.DataFrame(perf_rows)

fig, axes = plt.subplots(2, 2, figsize=(12, 10))
sns.boxplot(data=fi_df, y='Driving factor', x='Increase_MSE',
            order=fi_df.groupby('Driving factor')['Increase_MSE'].median().sort_values().index,
            ax=axes[0, 0])
axes[0, 0].set_title('(1) Increase in MSE by Driving Factor')
sns.boxplot(data=perf_df, x='n_factors', y='R2', hue='Dataset', ax=axes[0, 1])
axes[0, 1].set_title('(2) R² vs Number of Factors')
axes[0, 1].legend(loc='lower right')
# Bottom row of the 2x2 panel left empty
axes[1, 0].axis('off')
axes[1, 1].axis('off')
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'rf_perf_panel.png'))
plt.close()

# ======================= Plot 2: SHAP main vs interaction =======================
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
pivot = shap_int_df.pivot(index='feat1', columns='feat2', values='mean_interaction')
sns.heatmap(pivot, cmap='RdBu_r', square=True,
            cbar_kws={'label': 'mean interaction'}, ax=axes[0])
axes[0].set_title('(1) SHAP Interaction Mean')
shap_contrib_df.sort_values('Main', ascending=False).plot.bar(
    x='Driving factor', y=['Main', 'Interaction'],
    color=['salmon', 'skyblue'], ax=axes[1])
axes[1].set_title('(2) Main vs Interaction Contribution')
axes[1].set_ylabel('mean(SHAP)')
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'shap_contrib_panel.png'))
plt.close()

# ======================= Plot 3: SHAP dependence scatter =======================
dep_pairs = [('tpi', 'tri'), ('tpi', 'rugged'), ('slope', 'tri'), ('slope', 'rugged'),
             ('elevation', 'tpi'), ('elevation', 'rugged'), ('dist_road', 'river_cost')]
fig, axes = plt.subplots(3, 3, figsize=(12, 10))
for i, (f, inter) in enumerate(dep_pairs):
    ax = axes.flat[i]
    shap.dependence_plot(f, shap_vals, X_tr, feature_names=features,
                         interaction_index=inter, show=False, ax=ax)
    ax.set_title(f'{f} × {inter}')
# Drop the unused axes of the 3x3 grid
for j in range(len(dep_pairs), 9):
    fig.delaxes(axes.flat[j])
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'shap_dep_scatter.png'))
plt.close()
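# ======================= Optional: SHAP summary & cleanup =======================
# Not part of the original script: a hedged extra that saves shap's built-in
# mean-|SHAP| bar chart (the output file name is an assumption) and then releases
# the raster handles opened at the top.
shap.summary_plot(shap_vals, X_tr, feature_names=features, plot_type='bar', show=False)
plt.tight_layout()
plt.savefig(os.path.join(out_dir, 'shap_summary_bar.png'))
plt.close()

dem.close()
slope.close()
aspect.close()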