|
import numpy as np |
|
from scipy.signal import savgol_filter |
|
import matplotlib.pyplot as plt |
|
from data_load import load_soil_data |
|
|
|
def apply_sg_filter(spectra, window_length=15, polyorder=2, deriv=0):
    """Smooth or differentiate spectra with a Savitzky-Golay filter.

    The filter runs along the last (wavelength) axis in one vectorized
    ``savgol_filter`` call rather than a Python loop over samples.

    Args:
        spectra: Array-like of shape (n_samples, n_wavelengths). A single
            1-D spectrum is accepted as well and is filtered directly.
        window_length: Length of the filter window; should be an odd integer.
        polyorder: Highest order of the fitted polynomial.
        deriv: Derivative order: 0 smooths, 1 gives the first derivative,
            2 gives the second derivative.

    Returns:
        A new array with the same shape as ``spectra`` holding the filtered
        values.
    """
    data = np.asarray(spectra)
    # axis=-1 filters each row independently, which matches filtering the
    # spectra one at a time.
    return savgol_filter(data, window_length, polyorder, deriv=deriv, axis=-1)
|
|
|
|
|
def apply_snv(spectra):
    """Apply the Standard Normal Variate (SNV) transform to each spectrum.

    Every spectrum is centered on its own mean and divided by its own
    population standard deviation (``ddof=0``), computed along the last axis.

    Args:
        spectra: Array-like of shape (n_samples, n_wavelengths). A single
            1-D spectrum is accepted as well.

    Returns:
        A new floating-point array with the same shape as ``spectra``.
        Floating-point inputs keep their dtype; other dtypes are converted
        to float64 so the scaled values are not truncated. A spectrum whose
        standard deviation is exactly zero maps to all zeros instead of
        NaN/inf.
    """
    data = np.asarray(spectra)
    if not np.issubdtype(data.dtype, np.floating):
        # Integer (or boolean) input would otherwise force an integer result.
        data = data.astype(np.float64)

    centered = data - data.mean(axis=-1, keepdims=True)
    std = data.std(axis=-1, keepdims=True)
    # A constant spectrum has zero spread; its centered values are already
    # zero, so dividing by 1 yields zeros and avoids a division by zero.
    safe_std = np.where(std == 0, 1, std).astype(data.dtype)
    return centered / safe_std
|
|
|
|
|
|
|
|
|
def process_spectra(spectra, method='Abs-SG0'):
    """Run one of the named preprocessing recipes on the spectra.

    Args:
        spectra: Input spectra, shape (n_samples, n_wavelengths).
        method: Name of the recipe. Supported values:
            'Abs-SG0'      SG smoothing
            'Abs-SG0-SNV'  SG smoothing followed by SNV
            'Abs-SG1'      SG first derivative
            'Abs-SG1-SNV'  SG first derivative followed by SNV
            'Abs-SG2'      SG second derivative
            'Abs-SG2-SNV'  SG second derivative followed by SNV

    Returns:
        The processed spectra.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """
    # (recipe name, SG derivative order, whether SNV follows the SG filter)
    recipes = (
        ('Abs-SG0', 0, False),
        ('Abs-SG0-SNV', 0, True),
        ('Abs-SG1', 1, False),
        ('Abs-SG1-SNV', 1, True),
        ('Abs-SG2', 2, False),
        ('Abs-SG2-SNV', 2, True),
    )
    for name, deriv_order, with_snv in recipes:
        if method == name:
            filtered = apply_sg_filter(spectra, deriv=deriv_order)
            if with_snv:
                return apply_snv(filtered)
            return filtered

    raise ValueError(f"Unsupported method: {method}")
|
|
|
|
|
|
|
|
|
def remove_wavelength_bands(spectra, wavelengths,
                            bands=((400, 499.5), (2450, 2499.5))):
    """Drop the spectral columns whose wavelength falls inside given bands.

    By default the 400-499.5 nm and 2450-2499.5 nm bands are removed.

    Args:
        spectra: Array-like of shape (n_samples, n_wavelengths).
        wavelengths: Array-like of n_wavelengths wavelength values, one per
            spectral column.
        bands: Iterable of ``(low, high)`` pairs. Both bounds are inclusive;
            every wavelength with ``low <= wavelength <= high`` is removed.

    Returns:
        A tuple ``(filtered_spectra, filtered_wavelengths)`` containing only
        the columns / wavelengths that lie outside every band.
    """
    spectra = np.asarray(spectra)
    wavelengths = np.asarray(wavelengths)

    # Accumulate the union of all bands that must be removed.
    inside_band = np.zeros(wavelengths.shape, dtype=bool)
    for low, high in bands:
        inside_band |= (wavelengths >= low) & (wavelengths <= high)

    keep = ~inside_band
    return spectra[:, keep], wavelengths[keep]
|
|
|
|
|
|
|
|
|
def downsample_spectra(spectra, wavelengths, bin_size):
    """Downsample spectra by averaging them inside fixed-width wavelength bins.

    Bins are half-open intervals ``[edge, edge + bin_size)`` starting at the
    smallest wavelength. Enough bins are created that the largest wavelength
    always falls inside the last one, so no sample is dropped. Bins that
    contain no wavelength (for example across a gap left by band removal)
    are omitted from the output rather than reported as all-zero columns.

    Args:
        spectra: Array-like of shape (n_samples, n_wavelengths).
        wavelengths: Array-like of n_wavelengths wavelength values; they do
            not have to be sorted.
        bin_size: Bin width, in the same unit as ``wavelengths``
            (e.g. 5, 10 or 15 nm). Must be positive.

    Returns:
        A tuple ``(downsampled_spectra, downsampled_wavelengths)``:
        a float64 array of shape (n_samples, n_occupied_bins) with the
        per-bin means, and the midpoints of the corresponding bins.

    Raises:
        ValueError: If ``bin_size`` is not positive.
    """
    if bin_size <= 0:
        raise ValueError(f"bin_size must be positive, got {bin_size}")

    spectra = np.asarray(spectra)
    wavelengths = np.asarray(wavelengths, dtype=float)
    if wavelengths.size == 0:
        # Nothing to bin: return empty outputs instead of failing on [0].
        return np.zeros((spectra.shape[0], 0)), np.zeros(0)

    start = wavelengths.min()
    span = wavelengths.max() - start
    # floor(span / bin_size) + 1 bins guarantees the last edge lies beyond
    # the largest wavelength, even when span is an exact multiple of bin_size.
    n_bins = int(np.floor(span / bin_size)) + 1
    edges = start + bin_size * np.arange(n_bins + 1)

    # side='right' reproduces the half-open rule edges[i] <= w < edges[i + 1].
    bin_ids = np.searchsorted(edges, wavelengths, side='right') - 1
    # Guard against floating-point rounding pushing a value past the ends.
    bin_ids = np.clip(bin_ids, 0, n_bins - 1)

    occupied = np.unique(bin_ids)
    downsampled_spectra = np.empty((spectra.shape[0], occupied.size))
    for column, bin_id in enumerate(occupied):
        downsampled_spectra[:, column] = spectra[:, bin_ids == bin_id].mean(axis=1)

    downsampled_wavelengths = (edges[occupied] + edges[occupied + 1]) / 2
    return downsampled_spectra, downsampled_wavelengths
|
|
|
|
|
|
|
|
|
|
|
def preprocess_with_downsampling(spectra, wavelengths, bin_size=5):
    """Remove the excluded wavelength bands, then downsample the remainder.

    This is a two-step pipeline: ``remove_wavelength_bands`` followed by
    ``downsample_spectra`` on its output.

    Args:
        spectra: Input spectra, shape (n_samples, n_wavelengths).
        wavelengths: Array of wavelength values.
        bin_size: Downsampling window size (5 nm, 10 nm or 15 nm).

    Returns:
        The processed spectra and their corresponding wavelength values.
    """
    # Step 1: drop the excluded bands; step 2: bin what is left.
    kept = remove_wavelength_bands(spectra, wavelengths)
    return downsample_spectra(*kept, bin_size)
|
|
|
|
|
|
|
|
|
def plot_processed_spectra_with_range(original_spectra, wavelengths=None):
    """Plot the mean curve and min-max envelope for each processing method.

    A 2x3 grid of panels is drawn, one per preprocessing method, and the
    figure is displayed with ``plt.show()``.

    Args:
        original_spectra: Raw spectra, shape (n_samples, n_wavelengths).
        wavelengths: X-axis values; when None, column indices are used.
    """
    panel_methods = ('Abs-SG0', 'Abs-SG0-SNV', 'Abs-SG1',
                     'Abs-SG1-SNV', 'Abs-SG2', 'Abs-SG2-SNV')
    panel_letters = 'abcdef'

    x_values = wavelengths
    if x_values is None:
        x_values = np.arange(original_spectra.shape[1])

    _fig, grid = plt.subplots(2, 3, figsize=(18, 10))

    for letter, ax, method in zip(panel_letters, grid.ravel(), panel_methods):
        result = process_spectra(original_spectra, method)
        average = np.mean(result, axis=0)
        lower = np.min(result, axis=0)
        upper = np.max(result, axis=0)

        # Shaded envelope first, so the mean curve is drawn on top of it.
        ax.fill_between(x_values, lower, upper,
                        color='skyblue', alpha=0.3, label='Range')
        ax.plot(x_values, average, color='steelblue', label='Average Curve')

        ax.set_title(f'({letter}) {method}', loc='center', fontsize=12)
        ax.set_xlabel('Wavelength/nm', fontsize=10)
        ax.set_ylabel('Absorbance', fontsize=10)
        ax.legend()
        ax.grid(True)

    plt.tight_layout(h_pad=2.5, w_pad=3.0)
    plt.show()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Demo script: load the data, show the preprocessing variants, compare
    # downsampling widths, then run one full pipeline.
    target_columns = ['pH.in.CaCl2', 'pH.in.H2O', 'OC', 'CaCO3', 'N', 'P', 'K', 'CEC']
    X_train, _X_test, _y_train, _y_test, wavelengths = load_soil_data(
        'LUCAS.2009_abs.csv', target_columns)

    # Flatten everything after the sample axis so each row is one spectrum.
    n_train = X_train.shape[0]
    X_train_2d = X_train.reshape(n_train, -1)

    # --- 1. Compare the six SG / SNV preprocessing variants ---------------
    print("\n=== 光谱预处理结果 ===")
    plot_processed_spectra_with_range(X_train_2d, wavelengths)

    # --- 2. Band removal plus downsampling at several bin widths ----------
    print("\n=== 波段移除和降采样结果 ===")
    plt.figure(figsize=(15, 5))

    for panel, bin_size in enumerate((5, 10, 15), start=1):
        binned, binned_wavelengths = preprocess_with_downsampling(
            X_train_2d, wavelengths, bin_size)
        n_wavelengths = len(binned_wavelengths)

        print(f"\n使用 {bin_size}nm 降采样:")
        print(f"处理后的光谱形状: {binned.shape}")
        print(f"波长数量: {n_wavelengths}")

        # One panel per bin width: mean curve with a +/- one std band.
        plt.subplot(1, 3, panel)
        center = np.mean(binned, axis=0)
        spread = np.std(binned, axis=0)

        plt.plot(binned_wavelengths, center, 'b-', label=f'Mean ({bin_size}nm)')
        plt.fill_between(binned_wavelengths,
                         center - spread,
                         center + spread,
                         color='skyblue', alpha=0.2,
                         label='Standard Deviation Range')
        plt.title(f'Downsampling {bin_size}nm\n(Wavelengths: {n_wavelengths})')
        plt.xlabel('Wavelength (nm)')
        plt.ylabel('Absorbance')
        plt.legend()
        plt.grid(True)

    plt.tight_layout()
    plt.show()

    # --- 3. Full pipeline example: SG smoothing + SNV, then 10 nm bins ----
    print("\n=== 完整预处理流程示例 ===")
    processed_spectra = process_spectra(X_train_2d, method='Abs-SG0-SNV')
    final_spectra, final_wavelengths = preprocess_with_downsampling(
        processed_spectra, wavelengths, bin_size=10)

    print(f"最终处理后的数据形状: {final_spectra.shape}")
    print(f"最终波长数量: {len(final_wavelengths)}")
|
|