# ResNet1D / data_processing.py
# AISkywalker's picture
# Upload 65 files
# 3a85408 verified
import numpy as np
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt
from data_load import load_soil_data
def apply_sg_filter(spectra, window_length=15, polyorder=2, deriv=0):
    """
    Smooth or differentiate spectra with a Savitzky-Golay filter.

    The filter is applied along the last axis in a single vectorized call,
    so each spectrum (row) is filtered independently of the others.

    Parameters:
    - spectra: array-like of shape (n_samples, n_wavelengths); a single 1-D
      spectrum of shape (n_wavelengths,) is accepted as well
    - window_length: filter window length in points; must not exceed
      n_wavelengths (an odd value is expected)
    - polyorder: order of the fitted polynomial; must be smaller than
      window_length
    - deriv: derivative order; 0 smooths, 1 gives the first derivative,
      2 gives the second derivative

    Returns:
    - filtered spectra as an ndarray with the same shape as the input
    """
    spectra = np.asarray(spectra)
    # savgol_filter works along an arbitrary axis, so no Python-level loop
    # over the samples is needed.
    return savgol_filter(spectra, window_length, polyorder, deriv=deriv, axis=-1)
def apply_snv(spectra):
    """
    Apply the Standard Normal Variate (SNV) transform to each spectrum.

    Every spectrum is centred on its own mean and divided by its own
    standard deviation, both taken along the last (wavelength) axis.

    Parameters:
    - spectra: array-like of shape (n_samples, n_wavelengths)

    Returns:
    - SNV-transformed spectra with the same shape as the input. Floating
      point inputs keep their dtype; any other dtype is converted to
      float64 so the result is not truncated to integers.
    """
    spectra = np.asarray(spectra)
    if not np.issubdtype(spectra.dtype, np.floating):
        # An integer output buffer would silently truncate the scaled values.
        spectra = spectra.astype(np.float64)

    centre = spectra.mean(axis=-1, keepdims=True)
    scale = spectra.std(axis=-1, keepdims=True)
    # A spectrum with zero standard deviation would divide by zero and yield
    # NaN/inf; dividing by 1 instead maps it to all zeros.
    scale[scale == 0] = 1
    return (spectra - centre) / scale
def process_spectra(spectra, method='Abs-SG0'):
    """
    Run one of the named preprocessing recipes on the spectra.

    Parameters:
    - spectra: input spectra, shape (n_samples, n_wavelengths)
    - method: recipe name, one of:
        'Abs-SG0': SG smoothing
        'Abs-SG0-SNV': SG smoothing followed by SNV
        'Abs-SG1': SG first derivative
        'Abs-SG1-SNV': SG first derivative followed by SNV
        'Abs-SG2': SG second derivative
        'Abs-SG2-SNV': SG second derivative followed by SNV

    Returns:
    - the processed spectra

    Raises:
    - ValueError: if method is not one of the names listed above
    """
    # (recipe name, SG derivative order, whether SNV is applied afterwards)
    recipes = (
        ('Abs-SG0', 0, False),
        ('Abs-SG0-SNV', 0, True),
        ('Abs-SG1', 1, False),
        ('Abs-SG1-SNV', 1, True),
        ('Abs-SG2', 2, False),
        ('Abs-SG2-SNV', 2, True),
    )
    for name, deriv_order, with_snv in recipes:
        if method == name:
            filtered = apply_sg_filter(spectra, deriv=deriv_order)
            return apply_snv(filtered) if with_snv else filtered
    raise ValueError(f"Unsupported method: {method}")
def remove_wavelength_bands(spectra, wavelengths,
                            bands=((400, 499.5), (2450, 2499.5))):
    """
    Drop the spectral columns whose wavelength lies inside any given band.

    Parameters:
    - spectra: array-like of shape (n_samples, n_wavelengths)
    - wavelengths: array-like of n_wavelengths wavelength values, one per
      spectral column
    - bands: iterable of (lower, upper) pairs; both ends are inclusive.
      Defaults to 400-499.5 and 2450-2499.5, the two bands this function
      originally removed.

    Returns:
    - (filtered_spectra, filtered_wavelengths): the spectra with the banned
      columns removed, and the wavelengths that remain
    """
    spectra = np.asarray(spectra)
    wavelengths = np.asarray(wavelengths)

    # Accumulate everything that must go, then invert once to get the keep mask.
    drop = np.zeros(wavelengths.shape, dtype=bool)
    for lower, upper in bands:
        drop |= (wavelengths >= lower) & (wavelengths <= upper)
    keep = ~drop

    return spectra[:, keep], wavelengths[keep]
def downsample_spectra(spectra, wavelengths, bin_size):
    """
    Downsample spectra by averaging all points that fall in the same
    fixed-width wavelength bin.

    Bin edges start at the smallest wavelength and advance by bin_size.
    Every bin is half-open, [lower, upper), except the last one, which also
    takes every wavelength at or above its lower edge. This keeps the
    largest wavelength from being dropped when it lands exactly on the
    final edge.

    Parameters:
    - spectra: array-like of shape (n_samples, n_wavelengths)
    - wavelengths: array-like of n_wavelengths wavelength values
    - bin_size: bin width in the same unit as wavelengths (e.g. 5, 10 or
      15 nm); must be positive

    Returns:
    - (downsampled_spectra, downsampled_wavelengths): an array of shape
      (n_samples, n_bins) with the per-bin means, and the n_bins bin
      centres. A bin that contains no wavelength keeps the value 0.

    Raises:
    - ValueError: if bin_size is not positive
    """
    if bin_size <= 0:
        raise ValueError(f"bin_size must be positive, got {bin_size}")

    spectra = np.asarray(spectra)
    wavelengths = np.asarray(wavelengths)
    if wavelengths.size == 0:
        # Nothing to bin: return empty results instead of failing on indexing.
        return np.zeros((spectra.shape[0], 0)), np.zeros(0)

    # min/max rather than first/last so the input order does not matter.
    start = wavelengths.min()
    stop = wavelengths.max()
    edges = np.arange(start, stop + bin_size, bin_size)
    if len(edges) < 2:
        # Zero-width range (e.g. a single wavelength) still needs one bin.
        edges = np.array([start, start + bin_size])

    n_bins = len(edges) - 1
    downsampled_spectra = np.zeros((spectra.shape[0], n_bins))
    last = n_bins - 1
    for i in range(n_bins):
        in_bin = wavelengths >= edges[i]
        if i < last:
            in_bin &= wavelengths < edges[i + 1]
        # The last bin has no upper bound: `stop` is the maximum wavelength,
        # so everything at or above its lower edge belongs to it.
        if np.any(in_bin):
            downsampled_spectra[:, i] = np.mean(spectra[:, in_bin], axis=1)

    # Each bin is represented by the midpoint of its two edges.
    downsampled_wavelengths = (edges[:-1] + edges[1:]) / 2
    return downsampled_spectra, downsampled_wavelengths
def preprocess_with_downsampling(spectra, wavelengths, bin_size=5):
    """
    Full preprocessing pipeline: band removal followed by downsampling.

    Parameters:
    - spectra: input spectra, shape (n_samples, n_wavelengths)
    - wavelengths: array of wavelength values
    - bin_size: downsampling window width (5 nm, 10 nm or 15 nm)

    Returns:
    - the processed spectra and their matching wavelength values
    """
    # Step 1: cut out the unwanted bands (remove_wavelength_bands defaults).
    kept_spectra, kept_wavelengths = remove_wavelength_bands(spectra, wavelengths)
    # Step 2: bin-average what remains.
    binned_spectra, binned_wavelengths = downsample_spectra(
        kept_spectra, kept_wavelengths, bin_size)
    return binned_spectra, binned_wavelengths
def plot_processed_spectra_with_range(original_spectra, wavelengths=None):
    """
    Plot every preprocessing method in a 2x3 grid, showing the mean
    spectrum together with the min-max envelope across samples.

    Parameters:
    - original_spectra: raw spectra, shape (n_samples, n_wavelengths)
    - wavelengths: x-axis values; when None the column indices are used
    """
    methods = ['Abs-SG0', 'Abs-SG0-SNV', 'Abs-SG1',
               'Abs-SG1-SNV', 'Abs-SG2', 'Abs-SG2-SNV']
    if wavelengths is None:
        wavelengths = np.arange(original_spectra.shape[1])

    fig, axes = plt.subplots(2, 3, figsize=(18, 10))  # 2 rows x 3 columns
    panels = axes.ravel()
    for panel_idx, (ax, method) in enumerate(zip(panels, methods)):
        processed = process_spectra(original_spectra, method)
        average = np.mean(processed, axis=0)
        lowest = np.min(processed, axis=0)
        highest = np.max(processed, axis=0)

        # Envelope first, so the mean curve is drawn on top of it.
        ax.fill_between(wavelengths, lowest, highest,
                        color='skyblue', alpha=0.3, label='Range')
        ax.plot(wavelengths, average, color='steelblue', label='Average Curve')

        # Panels are lettered (a), (b), (c), ... in method order.
        ax.set_title(f'({chr(97 + panel_idx)}) {method}', loc='center', fontsize=12)
        ax.set_xlabel('Wavelength/nm', fontsize=10)
        ax.set_ylabel('Absorbance', fontsize=10)
        ax.legend()
        ax.grid(True)

    plt.tight_layout(h_pad=2.5, w_pad=3.0)
    plt.show()
# Example usage: runs only when this file is executed as a script.
if __name__ == '__main__':
    # 1. Load the data. load_soil_data returns the train/test splits of the
    #    spectra and targets plus the wavelength axis; only X_train and
    #    wavelengths are used below.
    file_path = 'LUCAS.2009_abs.csv'
    target_columns = ['pH.in.CaCl2', 'pH.in.H2O', 'OC', 'CaCO3', 'N', 'P', 'K', 'CEC']
    X_train, X_test, y_train, y_test ,wavelengths= load_soil_data(file_path, target_columns)
    # 2. Flatten each training sample so the data is 2-D (n_samples, -1).
    X_train_2d = X_train.reshape(X_train.shape[0], -1)
    # 3. Show the result of every spectral preprocessing method on the raw data.
    print("\n=== 光谱预处理结果 ===")
    plot_processed_spectra_with_range(X_train_2d, wavelengths)
    # 4. Remove the unwanted bands and downsample at several bin widths.
    print("\n=== 波段移除和降采样结果 ===")
    bin_sizes = [5, 10, 15]  # downsampling window widths to compare
    # One new figure holds a subplot per bin width.
    plt.figure(figsize=(15, 5))
    for i, bin_size in enumerate(bin_sizes):
        # Band removal + downsampling for this bin width.
        processed_spectra, processed_wavelengths = preprocess_with_downsampling(
            X_train_2d, wavelengths, bin_size)
        # Report the resulting shapes.
        print(f"\n使用 {bin_size}nm 降采样:")
        print(f"处理后的光谱形状: {processed_spectra.shape}")
        print(f"波长数量: {len(processed_wavelengths)}")
        # Plot the mean spectrum with a +/- one standard deviation band.
        plt.subplot(1, 3, i+1)
        mean_curve = np.mean(processed_spectra, axis=0)
        std_curve = np.std(processed_spectra, axis=0)
        plt.plot(processed_wavelengths, mean_curve, 'b-', label=f'Mean ({bin_size}nm)')
        plt.fill_between(processed_wavelengths,
                         mean_curve - std_curve,
                         mean_curve + std_curve,
                         color='skyblue', alpha=0.2, label='Standard Deviation Range')
        plt.title(f'Downsampling {bin_size}nm\n(Wavelengths: {len(processed_wavelengths)})')
        plt.xlabel('Wavelength (nm)')
        plt.ylabel('Absorbance')
        plt.legend()
        plt.grid(True)
    plt.tight_layout()
    plt.show()
    # 5. Demonstrate the complete preprocessing pipeline.
    print("\n=== 完整预处理流程示例 ===")
    # Spectral preprocessing first (SG smoothing followed by SNV) ...
    processed_spectra = process_spectra(X_train_2d, method='Abs-SG0-SNV')
    # ... then band removal and 10 nm downsampling.
    final_spectra, final_wavelengths = preprocess_with_downsampling(
        processed_spectra, wavelengths, bin_size=10)
    print(f"最终处理后的数据形状: {final_spectra.shape}")
    print(f"最终波长数量: {len(final_wavelengths)}")