Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
"""Untitled20.ipynb | |
Automatically generated by Colaboratory. | |
Original file is located at | |
https://colab.research.google.com/drive/1XZbCNfIzuxHNNECK_uGluXC65NH9yulc | |
""" | |
def greet(name): | |
return "Hello " + name + "!" | |
greet("World") | |
import gradio | |
import pandas as pd | |
import numpy as np | |
from sklearn.decomposition import PCA | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.pipeline import Pipeline | |
import multiprocessing as mp | |
#catboost | |
from catboost import Pool, CatBoostRegressor | |
modelos_cargados = [] | |
for i in range(3): | |
model = CatBoostRegressor() | |
model.load_model(f'./model_{i}.cbm') | |
modelos_cargados.append(model) | |
def load_npz_file(filepath, | |
masked = True, | |
pad_mask = True): | |
'''load in numpy zipped files. Use masked =True to mask masked values (pad with 0's)''' | |
with np.load(filepath) as npz: | |
arr = np.ma.MaskedArray(**npz) | |
if masked == True: | |
if pad_mask : # pad masked pixels with 0's to preserve shape | |
mask = arr.mask | |
return np.where(mask==True,0,arr.data) | |
return arr | |
return arr.data | |
def load_and_reshape(filepath): | |
'''load and reshape array''' | |
#load array | |
arr = load_npz_file(filepath, | |
masked=False, | |
pad_mask=False) | |
depth,height,width = arr.shape | |
# reshape to depth last format | |
arr = arr.reshape((height,width,depth)) | |
#scale values | |
# arr = arr / scaling_values | |
#resize | |
# arr = cv2.resize(arr,CFG.img_size) | |
return arr | |
def get_array_properties(arr): | |
'''get reduced properties for array with shape (h,w,channels==150)''' | |
#area of array | |
area_arr = arr[:,:,0].size | |
#max min range | |
arr_max = arr.max(axis=(0,1)) | |
arr_range = arr_max - arr.min(axis=(0,1)) | |
#central tendencies | |
mean_arr = arr.mean(axis=(0,1)) | |
std_arr = arr.std(axis=(0,1)) | |
median_arr = np.median(arr,axis=(0,1)) | |
#first 25 % | |
q1 = np.percentile(a=arr,q=25,axis=(0,1)) | |
#last 25 % | |
q3 = np.percentile(a=arr,q=75,axis=(0,1)) | |
#iqr | |
iqr = q3 - q1 | |
#first 10 | |
d1 = np.percentile(a=arr,q=10,axis=(0,1)) | |
#last 10 | |
d10 = np.percentile(a=arr,q=90,axis=(0,1)) | |
return np.array((area_arr,*mean_arr,*std_arr,*median_arr,*q1,*q3,*arr_max,*arr_range,*d1,*d10,*iqr)) | |
def get_agg_properties(filepath): | |
arr = load_and_reshape(filepath) | |
# properties of each band(range of each band) | |
properties = get_array_properties(arr) | |
return properties | |
array_cols = ['array_area', | |
*[f'mean_{i}' for i in range(1,151)], | |
*[f'std_{i}' for i in range(1,151)], | |
*[f'med_{i}' for i in range(1,151)], | |
*[f'q1_{i}' for i in range(1,151)], | |
*[f'q3_{i}' for i in range(1,151)], | |
*[f'max_{i}' for i in range(1,151)], | |
*[f'range_{i}' for i in range(1,151)], | |
*[f'D1_{i}' for i in range(1,151)], | |
*[f'D10_{i}' for i in range(1,151)], | |
*[f'IQR_{i}' for i in range(1,151)]] | |
print(array_cols) | |
def pca_on_band(df, band_num, n_components=2): | |
""" | |
get pca features for a particular band | |
""" | |
pca_pipe = Pipeline(steps=[('standard_scaler', StandardScaler()), | |
('pca', PCA(n_components=min(n_components, df.shape[0])))]) | |
band_cols = [col for col in df.columns if str(band_num) in col] | |
# Si solo hay una muestra, no realizar PCA y en su lugar devolver la muestra después del escalado | |
if df.shape[0] == 1: | |
scaler = StandardScaler() | |
scaled_features = scaler.fit_transform(df[band_cols]) | |
return pd.DataFrame(scaled_features, | |
columns=[f'B{band_num}_PC{i+1}' for i in range(scaled_features.shape[1])]) | |
pca_pipe.fit(df[band_cols]) | |
features = pca_pipe.transform(df[band_cols]) | |
return pd.DataFrame(features, | |
columns=[f'B{band_num}_PC{i+1}' for i in range(n_components)]) | |
def get_pca_dataset(df): | |
all_df = [] | |
for band in range(1,151): | |
band_pca = pca_on_band(df,band) | |
all_df.append(band_pca) | |
return pd.concat(objs=all_df, axis=1, join='outer', ignore_index=False) | |
derived_cols = ['array_area',*[f'q1_{i}' for i in range(1,151)],*[f'q3_{i}' for i in range(1,151)]] | |
def predecir_desde_archivo_npz(ruta_archivo_npz, modelos, array_cols, derived_cols): | |
""" | |
Carga un archivo .npz, procesa los datos y utiliza los modelos para predecir los valores. | |
:param ruta_archivo_npz: String con la ruta al archivo .npz. | |
:param modelos: Lista de modelos entrenados para hacer las predicciones. | |
:param array_cols: Columnas esperadas después de obtener las propiedades agregadas. | |
:param derived_cols: Columnas derivadas que se usan junto con PCA para la entrada del modelo. | |
:return: Predicción para el archivo dado. | |
""" | |
# Cargar y procesar los datos del archivo .npz | |
propiedades_agregadas = get_agg_properties(ruta_archivo_npz) | |
datos_df = pd.DataFrame([propiedades_agregadas], columns=array_cols) | |
print(datos_df) | |
# Aplicar PCA a los datos procesados | |
pca_datos = get_pca_dataset(datos_df) | |
# Combinar con las columnas derivadas | |
datos_finales = pca_datos.merge(datos_df[derived_cols], left_index=True, right_index=True) | |
# Realizar predicciones con los modelos | |
predicciones = [modelo.predict(datos_finales) for modelo in modelos] | |
predicciones = np.array(predicciones).reshape(len(modelos), -1) | |
# Calcular la mediana de las predicciones | |
mediana_predicciones = np.median(predicciones, axis=0) | |
return mediana_predicciones | |
# Aquí asumimos que `array_cols` y `derived_cols` ya están definidos en tu entorno como se ve en tu código. | |
# También asumimos que los modelos ya están entrenados y contenidos en la lista `modelos`. | |
ruta_archivo_npz = "./1.npz" # Sustituir con la ruta real al archivo .npz | |
prediccion = predecir_desde_archivo_npz(ruta_archivo_npz, modelos_cargados, array_cols, derived_cols) | |
if len(prediccion) == 4: | |
fosforo_predicho, potasio_predicho, magnesio_predicho, pH_predicho = prediccion | |
print(f"Fósforo Predicho: {fosforo_predicho}") | |
print(f"Potasio Predicho: {potasio_predicho}") | |
print(f"Magnesio Predicho: {magnesio_predicho}") | |
print(f"pH Predicho: {pH_predicho}") | |
else: | |
print("La predicción no contiene el número esperado de componentes.") | |
import gradio as gr | |
# Asegúrate de que las funciones de predicción y carga de modelos estén definidas aquí o estén siendo importadas correctamente. | |
# Supongamos que la función 'predecir_desde_archivo_npz' está definida correctamente y funciona. | |
# También asumimos que 'modelos_cargados' es una lista de modelos CatBoost ya cargados. | |
def predecir_desde_archivo_npz_interface(archivo): | |
# Gradio pasa el archivo cargado como un objeto temporal, que puedes leer directamente | |
datos = archivo | |
# Asumimos que tus funciones de procesamiento esperan recibir un array numpy y devuelven las predicciones como un array | |
predicciones = predecir_desde_archivo_npz(datos, modelos_cargados, array_cols, derived_cols) | |
return { | |
'Fósforo (P)': float(predicciones[0]), | |
'Potasio (K)': float(predicciones[1]), | |
'Magnesio (Mg)': float(predicciones[2]), | |
'pH': float(predicciones[3]) | |
} | |
demo = gr.Interface( | |
fn=predecir_desde_archivo_npz_interface, | |
inputs=gr.File(label="Sube tu archivo NPZ",file_types = [".npz"] | |
), | |
outputs=gr.JSON(label="Predicciones", ) | |
) | |
demo.launch( | |
share=True | |
) |