# ISA444Bonus / app.py
# Hosting-page residue kept for provenance: "libbeyfox's picture" / "Update app.py" / "aed30f9 verified"
import pandas as pd
import gradio as gr
import tempfile
import os
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt
from statsforecast import StatsForecast
from statsforecast.models import (
HistoricAverage,
Naive,
SeasonalNaive,
WindowAverage,
SeasonalWindowAverage,
AutoETS,
AutoARIMA,
AutoCES,
AutoTheta,
DynamicOptimizedTheta,
MSTL
)
from utilsforecast.evaluation import evaluate
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
# Import for MLForecast
from mlforecast import MLForecast
from lightgbm import LGBMRegressor
# Foundation Models
# Both packages are optional: each *_AVAILABLE flag records whether the import
# succeeded so the rest of the app can enable or disable the matching option.
# `except Exception` (not a bare `except:`) keeps the best-effort behaviour for
# any import-time failure while letting KeyboardInterrupt/SystemExit propagate.
try:
    from chronos import ChronosPipeline
    import torch
    CHRONOS_AVAILABLE = True
except Exception:
    CHRONOS_AVAILABLE = False
try:
    from uni2ts.model.moirai import MoiraiForecast
    MOIRAI_AVAILABLE = True
except Exception:
    MOIRAI_AVAILABLE = False
# Approximate length, in days, of one period for the coarse calendar
# frequencies.  Start-anchored, end-anchored and legacy pandas aliases are all
# listed so the lookup does not depend on which spelling the caller uses.
_APPROX_DAYS_PER_PERIOD = {
    'MS': 30, 'M': 30, 'ME': 30,
    'QS': 90, 'Q': 90, 'QE': 90,
    'YS': 365, 'Y': 365, 'YE': 365, 'AS': 365, 'A': 365,
}


# Helper function to calculate date offset based on frequency and horizon
def calculate_date_offset(freq, horizon):
    """Return an approximate ``pd.Timedelta`` spanning ``horizon`` periods.

    Args:
        freq: Frequency code.  Recognised values are hourly (``'H'``/``'h'``),
            daily (``'D'``), business day (``'B'``), weekly (``'W'`` and the
            app's legacy ``'WS'``), monthly, quarterly and yearly aliases.
            Any other value is treated as one day per period.
        horizon: Number of periods to span.

    Returns:
        A ``pd.Timedelta``.  Months, quarters and years use fixed lengths of
        30, 90 and 365 days, and business days are scaled by 1.4 calendar
        days each, so the result is an approximation.
    """
    if freq in ('H', 'h'):
        return pd.Timedelta(hours=horizon)
    if freq == 'B':
        # 7 calendar days per 5 business days = 1.4; truncated to whole days.
        return pd.Timedelta(days=int(horizon * 1.4))
    if freq in ('W', 'WS'):
        return pd.Timedelta(weeks=horizon)
    # 'D' and unknown codes use the default of one day per period.
    days_per_period = _APPROX_DAYS_PER_PERIOD.get(freq, 1)
    return pd.Timedelta(days=horizon * days_per_period)
# Function to generate and return a plot for validation results
def create_forecast_plot(forecast_df, original_df, title="Forecasting Results", horizon=None, freq='D'):
    """Plot the actual series together with every forecast column.

    Args:
        forecast_df: DataFrame with ``unique_id`` and ``ds`` columns plus one
            column per forecast.  An optional ``cutoff`` column marks window
            boundaries.  An empty frame (no ``unique_id`` column) yields a
            figure with no series instead of raising ``KeyError``.
        original_df: DataFrame with ``unique_id``, ``ds`` and ``y`` columns
            holding the observed values.
        title: Figure title.
        horizon: Number of periods; together with ``freq`` it sets how far
            before the earliest cutoff the x-axis starts.  Only used when a
            cutoff was found.
        freq: Frequency code passed to ``calculate_date_offset``.

    Returns:
        The matplotlib ``Figure`` that was drawn.
    """
    fig, ax = plt.subplots(figsize=(12, 7))
    colors = plt.cm.tab10.colors

    # 'index' shows up when the caller passes ``frame.reset_index()`` of a
    # frame with a default index; it is a row counter, not a forecast.
    non_forecast_cols = {'unique_id', 'ds', 'cutoff', 'y', 'index'}
    forecast_cols = [col for col in forecast_df.columns if col not in non_forecast_cols]
    unique_ids = forecast_df['unique_id'].unique() if 'unique_id' in forecast_df.columns else []
    min_cutoff = None

    for unique_id in unique_ids:
        original_data = original_df[original_df['unique_id'] == unique_id]
        ax.plot(original_data['ds'], original_data['y'], 'k-', linewidth=2,
                label=f'{unique_id} (Actual)')

        forecast_data = forecast_df[forecast_df['unique_id'] == unique_id]
        if 'cutoff' in forecast_data.columns:
            cutoffs = pd.to_datetime(forecast_data['cutoff'].unique())
            if len(cutoffs) > 0:
                # Track the earliest cutoff across all series for the x-axis window.
                earliest_cutoff = cutoffs.min()
                if min_cutoff is None or earliest_cutoff < min_cutoff:
                    min_cutoff = earliest_cutoff
                for cutoff in cutoffs:
                    ax.axvline(x=cutoff, color='gray', linestyle='--', alpha=0.4)

        for j, col in enumerate(forecast_cols):
            model_name = col.replace('_', ' ').title()
            ax.plot(forecast_data['ds'], forecast_data[col],
                    color=colors[j % len(colors)],
                    linestyle='--',
                    linewidth=1.5,
                    label=f'{model_name}')

    ax.set_title(title, fontsize=16)
    ax.set_xlabel('Date', fontsize=12)
    ax.set_ylabel('Value', fontsize=12)
    ax.grid(True, alpha=0.3)
    # Skip the legend when nothing was drawn, otherwise matplotlib warns.
    if ax.get_legend_handles_labels()[0]:
        ax.legend(loc='upper center', bbox_to_anchor=(0.5, -0.15), ncol=3, fontsize=10)
    fig.tight_layout(rect=[0, 0.05, 1, 0.95])

    if min_cutoff is not None and horizon is not None:
        # Zoom to roughly one horizon of history before the first cutoff.
        date_offset = calculate_date_offset(freq, horizon)
        start_date = min_cutoff - date_offset
        max_date = forecast_df['ds'].max()
        ax.set_xlim(start_date, max_date)
        ax.annotate('Training | Test',
                    xy=(min_cutoff, ax.get_ylim()[0]),
                    xytext=(0, -40),
                    textcoords='offset points',
                    horizontalalignment='center',
                    fontsize=10)

    fig.autofmt_xdate()
    return fig
# Function to load and process uploaded CSV
def load_data(file):
    """Read an uploaded CSV and validate it for forecasting.

    Args:
        file: Anything ``pd.read_csv`` accepts (path or file-like object),
            or ``None`` when nothing was uploaded.

    Returns:
        A ``(df, message)`` tuple.  On success ``df`` is the frame sorted by
        ``unique_id`` then ``ds`` with ``ds`` parsed as datetimes and ``y``
        numeric.  On any failure ``df`` is ``None`` and ``message`` explains
        why; no exception escapes.
    """
    if file is None:
        return None, "Please upload a CSV file"
    try:
        df = pd.read_csv(file)

        required_cols = ['unique_id', 'ds', 'y']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            return None, f"Missing required columns: {', '.join(missing_cols)}"
        if df.empty:
            # A header-only file would otherwise be reported as loaded.
            return None, "The uploaded file contains no data rows"

        df['ds'] = pd.to_datetime(df['ds'])
        df = df.sort_values(['unique_id', 'ds']).reset_index(drop=True)

        # Check for NaN values
        if df['y'].isna().any():
            return None, "Data contains missing values in the 'y' column"

        # Values that cannot be parsed as numbers become NaN under coercion;
        # report them here rather than letting a model fail on them later.
        numeric_y = pd.to_numeric(df['y'], errors='coerce')
        if numeric_y.isna().any():
            return None, "Data contains non-numeric values in the 'y' column"
        df['y'] = numeric_y

        return df, "Data loaded successfully!"
    except Exception as e:
        return None, f"Error loading data: {str(e)}"
def _ensure_id_column(frame):
    """Return ``frame`` with ``unique_id`` as a regular column, not the index."""
    if 'unique_id' not in frame.columns and frame.index.name == 'unique_id':
        return frame.reset_index()
    return frame


def _is_interval_column(col):
    """Tell whether ``col`` is a prediction-interval bound such as ``Naive-lo-90``."""
    name = str(col)
    return '-lo-' in name or '-hi-' in name


def _holdout_split(df_basic, horizon):
    """Return the training part of each series with its last ``horizon`` rows removed."""
    parts = [
        df_basic[df_basic['unique_id'] == uid].iloc[:-horizon]
        for uid in df_basic['unique_id'].unique()
    ]
    return pd.concat(parts)


def _compute_metrics(validation_df, eval_cols):
    """Build the MAE / RMSE / MAPE table for each forecast column in ``eval_cols``."""
    metrics_list = []
    for col in eval_cols:
        if col not in validation_df.columns or validation_df[col].isna().all():
            continue
        y_true = validation_df['y'].values
        y_pred = validation_df[col].values
        # Score only rows where both the actual and the forecast are present.
        mask = ~(np.isnan(y_true) | np.isnan(y_pred))
        if mask.sum() == 0:
            continue
        y_true_clean = y_true[mask]
        y_pred_clean = y_pred[mask]
        metrics_list.append({
            'Model': col,
            'MAE': mean_absolute_error(y_true_clean, y_pred_clean),
            'RMSE': np.sqrt(mean_squared_error(y_true_clean, y_pred_clean)),
            'MAPE': mean_absolute_percentage_error(y_true_clean, y_pred_clean) * 100
        })
    return pd.DataFrame(metrics_list)


def _export_csv(frame):
    """Write ``frame`` to a persistent temporary CSV file and return its path."""
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv', newline='') as f:
        frame.to_csv(f, index=False)
    return f.name


# Main forecasting function
def run_forecast(
    file, frequency, eval_strategy, horizon, step_size, num_windows,
    use_historical_avg, use_naive, use_seasonal_naive, seasonality,
    use_window_avg, window_size, use_seasonal_window_avg, seasonal_window_size,
    use_autoets, use_autoarima, use_autoces, use_autotheta,
    use_lgbm, use_chronos, use_moirai,
    future_horizon
):
    """
    Validate the selected models on the uploaded data and forecast the future.

    Args:
        file: Uploaded CSV, passed to ``load_data``.
        frequency: Frequency code handed to StatsForecast / MLForecast.
        eval_strategy: ``"Cross Validation"`` or anything else for a single
            fixed hold-out window.
        horizon: Validation horizon, used by both strategies.
        step_size, num_windows: Cross-validation settings (ignored otherwise).
        use_*: Flags selecting which models to run.
        seasonality, window_size, seasonal_window_size: Model parameters; each
            is converted with ``int`` only when a model that needs it is selected.
        use_moirai: Accepted for interface compatibility; no Moirai forecast
            is implemented in this function, so a note is added to the status.
        future_horizon: Number of periods for the final forecast.

    Returns:
        A 7-tuple ``(eval_metrics, validation_df, validation_plot, future_df,
        future_plot, export_files, message)``.  On any failure the first five
        items are ``None``, ``export_files`` is ``[]`` and ``message``
        carries the error text; exceptions are never propagated.

    Notes:
        At least one StatsForecast model must be selected; LightGBM and
        Chronos results are merged into the StatsForecast validation frame.
        The future forecast uses the StatsForecast models only.
    """
    try:
        # Load data
        df, message = load_data(file)
        if df is None:
            return None, None, None, None, None, [], message

        # Prepare data - only required columns for models without predictors
        df_basic = df[['unique_id', 'ds', 'y']].copy()
        h = int(horizon)
        notes = []

        # Basic models (no predictors needed)
        models = []
        if use_historical_avg:
            models.append(HistoricAverage())
        if use_naive:
            models.append(Naive())
        if use_seasonal_naive:
            models.append(SeasonalNaive(season_length=int(seasonality)))
        if use_window_avg:
            models.append(WindowAverage(window_size=int(window_size)))
        if use_seasonal_window_avg:
            models.append(SeasonalWindowAverage(season_length=int(seasonality), window_size=int(seasonal_window_size)))
        if use_autoets:
            models.append(AutoETS(season_length=int(seasonality)))
        if use_autoces:
            models.append(AutoCES(season_length=int(seasonality)))
        if use_autotheta:
            models.append(AutoTheta(season_length=int(seasonality)))

        # Models that can use predictors (currently run without predictors)
        models_need_predictors = []
        if use_autoarima:
            models_need_predictors.append(AutoARIMA(season_length=int(seasonality)))

        stat_models = models + models_need_predictors
        if not stat_models:
            return None, None, None, None, None, [], "No models selected"

        # Hold out the last `h` rows of every series.  This is needed by the
        # fixed-window path and by Chronos under either strategy; previously
        # it was undefined under Cross Validation, so Chronos always failed there.
        train_df = _holdout_split(df_basic, h)
        is_cv = eval_strategy == "Cross Validation"

        if is_cv:
            validation_results = []
            for group in (models, models_need_predictors):
                if not group:
                    continue
                sf = StatsForecast(models=group, freq=frequency, n_jobs=-1)
                cv_df = sf.cross_validation(
                    df=df_basic,
                    h=h,
                    step_size=int(step_size),
                    n_windows=int(num_windows)
                )
                validation_results.append(_ensure_id_column(cv_df))
            # Both groups share the same windows, so the frames are joined
            # side by side and the repeated id/date/target columns are dropped.
            validation_df = pd.concat(validation_results, axis=1)
            validation_df = validation_df.loc[:, ~validation_df.columns.duplicated()]
        else:  # Fixed Window
            sf = StatsForecast(models=stat_models, freq=frequency, n_jobs=-1)
            sf.fit(train_df)
            validation_df = _ensure_id_column(sf.predict(h=h, level=[90, 95]))

        # Add ML model forecasts if selected
        if use_lgbm:
            mlf = MLForecast(
                models={'LightGBM': LGBMRegressor(verbose=-1)},
                freq=frequency,
                lags=[1, 7, 14],
                num_threads=1
            )
            if is_cv:
                ml_cv = mlf.cross_validation(
                    df=df_basic,
                    h=h,
                    step_size=int(step_size),
                    n_windows=int(num_windows)
                )
                # Drop the duplicate target so the merge does not create
                # 'y_x' / 'y_y' columns that would then be scored as models.
                ml_cv = _ensure_id_column(ml_cv).drop(columns=['y'], errors='ignore')
                validation_df = validation_df.merge(ml_cv, on=['unique_id', 'ds', 'cutoff'], how='outer')
            else:
                mlf.fit(train_df)
                ml_pred = _ensure_id_column(mlf.predict(h=h))
                validation_df = validation_df.merge(ml_pred, on=['unique_id', 'ds'], how='outer')

        # Add foundation model forecasts
        if use_chronos and CHRONOS_AVAILABLE:
            try:
                pipeline = ChronosPipeline.from_pretrained(
                    "amazon/chronos-t5-tiny",
                    device_map="auto",
                    torch_dtype=torch.bfloat16,
                )
                # Step by one period of the chosen frequency; a fixed one-day
                # step produced wrong forecast dates for hourly data.
                step = pd.tseries.frequencies.to_offset(frequency)
                chronos_forecasts = []
                for uid in df_basic['unique_id'].unique():
                    uid_train = train_df[train_df['unique_id'] == uid]
                    context = torch.tensor(uid_train['y'].values)
                    forecast = pipeline.predict(context, prediction_length=h)
                    forecast_median = np.median(forecast[0].numpy(), axis=0)
                    chronos_forecasts.append(pd.DataFrame({
                        'unique_id': uid,
                        'ds': pd.date_range(
                            start=uid_train['ds'].max() + step,
                            periods=h,
                            freq=frequency
                        ),
                        'Chronos': forecast_median
                    }))
                chronos_df = pd.concat(chronos_forecasts)
                # Chronos covers only the final `h` rows of each series, so
                # under Cross Validation earlier rows stay NaN for this column.
                validation_df = validation_df.merge(chronos_df, on=['unique_id', 'ds'], how='outer')
            except Exception as e:
                # Best effort: keep the other models' results but tell the user.
                print(f"Chronos error: {e}")
                notes.append(f"Chronos forecast skipped: {e}")

        if use_moirai:
            notes.append("Moirai was selected but is not implemented in this app; it was skipped.")

        if 'y' not in validation_df.columns:
            # Merge with actual values
            validation_df = validation_df.merge(
                df_basic[['unique_id', 'ds', 'y']],
                on=['unique_id', 'ds'],
                how='left'
            )

        # Evaluate models: score point forecasts only, not id/date/target
        # columns, a stray 'index' column, or prediction-interval bounds.
        skip_cols = {'unique_id', 'ds', 'cutoff', 'y', 'index'}
        eval_cols = [
            col for col in validation_df.columns
            if col not in skip_cols and not _is_interval_column(col)
        ]
        eval_metrics = _compute_metrics(validation_df, eval_cols)

        # Create validation plot
        validation_plot = create_forecast_plot(
            validation_df,
            df_basic,
            "Validation Results",
            horizon,
            frequency
        )

        # Future forecast: refit the statistical models on the full history.
        sf_future = StatsForecast(models=stat_models, freq=frequency, n_jobs=-1)
        sf_future.fit(df_basic)
        future_df = _ensure_id_column(sf_future.predict(h=int(future_horizon), level=[90, 95]))

        # Create future forecast plot
        future_plot = create_forecast_plot(
            future_df,
            df_basic,
            "Future Forecast",
            future_horizon,
            frequency
        )

        # Export files (metrics, validation data, future forecast, in that order)
        export_files = [_export_csv(frame) for frame in (eval_metrics, validation_df, future_df)]

        status = " Forecasting completed successfully!"
        if notes:
            status += "\n" + "\n".join(notes)

        return (
            eval_metrics,
            validation_df,
            validation_plot,
            future_df,
            future_plot,
            export_files,
            status
        )
    except Exception as e:
        import traceback
        error_msg = f"Error: {str(e)}\n\n{traceback.format_exc()}"
        return None, None, None, None, None, [], error_msg
# Gradio Interface
# NOTE(review): the layout nesting below was reconstructed from the statement
# order because the source had lost its indentation; confirm the grouping
# (in particular where the submit row sits) against the deployed app.
with gr.Blocks(title="Duke Energy Forecasting App") as app:
    gr.Markdown("""
# Time Series Forecasting
Upload your time series data and select models to generate forecasts.
Supports StatsForecast, MLForecast, and Foundation Models (Chronos, Moirai).
""")
    with gr.Row():
        # Left column: data upload plus all configuration inputs.
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload CSV File", file_types=['.csv'])
            with gr.Accordion("Forecast Configuration", open=True):
                frequency = gr.Dropdown(
                    choices=[
                        ("Hourly", "H"),
                        ("Daily", "D"),
                        ("Business Day", "B"),
                        # "W" is the pandas weekly alias (anchored on Sunday);
                        # the previous value "WS" is not a valid pandas
                        # frequency string.
                        ("Weekly", "W"),
                        ("Monthly", "MS"),
                        ("Quarterly", "QS"),
                        ("Yearly", "YS")
                    ],
                    label="Data Frequency",
                    value="D"
                )
                eval_strategy = gr.Radio(
                    choices=["Fixed Window", "Cross Validation"],
                    label="Evaluation Strategy",
                    value="Cross Validation"
                )
                with gr.Group(visible=True) as fixed_window_box:
                    gr.Markdown("### Fixed Window Settings")
                    horizon = gr.Slider(1, 100, value=10, step=1, label="Validation Horizon")
                with gr.Group(visible=True) as cv_box:
                    gr.Markdown("### Cross Validation Settings")
                    with gr.Row():
                        step_size = gr.Slider(1, 50, value=10, step=1, label="Step Size")
                        num_windows = gr.Slider(1, 20, value=5, step=1, label="Number of Windows")
                with gr.Group():
                    gr.Markdown("### Future Forecast Settings")
                    future_horizon = gr.Slider(1, 100, value=10, step=1, label="Future Forecast Horizon")
            with gr.Accordion("Model Configuration", open=True):
                with gr.Tabs():
                    with gr.TabItem("Statistical Models"):
                        gr.Markdown("## Basic Models")
                        with gr.Row():
                            use_historical_avg = gr.Checkbox(label="Historical Average", value=True)
                            use_naive = gr.Checkbox(label="Naive", value=True)
                        with gr.Group():
                            gr.Markdown("### Seasonality Configuration")
                            seasonality = gr.Number(label="Seasonality Period", value=7)
                        gr.Markdown("### Seasonal Models")
                        use_seasonal_naive = gr.Checkbox(label="Seasonal Naive", value=True)
                        gr.Markdown("### Window-based Models")
                        with gr.Row():
                            use_window_avg = gr.Checkbox(label="Window Average", value=False)
                            window_size = gr.Number(label="Window Size", value=10)
                        with gr.Row():
                            use_seasonal_window_avg = gr.Checkbox(label="Seasonal Window Average", value=False)
                            seasonal_window_size = gr.Number(label="Seasonal Window Size", value=2)
                        gr.Markdown("### Advanced Models")
                        with gr.Row():
                            use_autoets = gr.Checkbox(label="AutoETS", value=False)
                            use_autoarima = gr.Checkbox(label="AutoARIMA", value=False)
                        with gr.Row():
                            use_autoces = gr.Checkbox(label="AutoCES", value=False)
                            use_autotheta = gr.Checkbox(label="AutoTheta", value=False)
                    with gr.TabItem("Machine Learning"):
                        gr.Markdown("## Gradient Boosting Models")
                        use_lgbm = gr.Checkbox(label="LightGBM", value=True)
                    with gr.TabItem("Foundation Models"):
                        gr.Markdown("## State-of-the-Art Foundation Models")
                        with gr.Row():
                            # Each checkbox is locked when its package failed to import.
                            use_chronos = gr.Checkbox(
                                label="Chronos (Amazon)",
                                value=CHRONOS_AVAILABLE,
                                interactive=CHRONOS_AVAILABLE
                            )
                            use_moirai = gr.Checkbox(
                                label="Moirai (Salesforce)",
                                value=False,
                                interactive=MOIRAI_AVAILABLE
                            )
                        if not CHRONOS_AVAILABLE:
                            gr.Markdown(" Chronos not available. Install: `pip install chronos-forecasting`")
                        if not MOIRAI_AVAILABLE:
                            gr.Markdown(" Moirai not available. Install: `pip install uni2ts`")
        # Right column: status message and result tabs.
        with gr.Column(scale=3):
            message_output = gr.Textbox(label="Status Message")
            with gr.Tabs():
                with gr.TabItem("Validation Results"):
                    eval_output = gr.Dataframe(label="Evaluation Metrics")
                    validation_plot = gr.Plot(label="Validation Plot")
                    validation_output = gr.Dataframe(label="Validation Data", visible=False)
                    with gr.Row():
                        show_data_btn = gr.Button("Show Validation Data")
                        hide_data_btn = gr.Button("Hide Validation Data", visible=False)
                with gr.TabItem("Future Forecast"):
                    forecast_plot = gr.Plot(label="Future Forecast Plot")
                    forecast_output = gr.Dataframe(label="Future Forecast Data", visible=False)
                    with gr.Row():
                        show_forecast_btn = gr.Button("Show Forecast Data")
                        hide_forecast_btn = gr.Button("Hide Forecast Data", visible=False)
                with gr.TabItem("Export Results"):
                    export_files = gr.Files(label="Download Results")
    with gr.Row():
        submit_btn = gr.Button("Run Validation and Forecast", variant="primary", size="lg")

    # Event handlers
    def update_eval_boxes(strategy):
        """Return visibility updates for (fixed_window_box, cv_box)."""
        # The horizon slider lives in fixed_window_box but is sent to
        # run_forecast for both strategies, so that group stays visible;
        # hiding it under Cross Validation left the horizon uneditable.
        return (
            gr.update(visible=True),
            gr.update(visible=strategy == "Cross Validation")
        )

    eval_strategy.change(
        fn=update_eval_boxes,
        inputs=[eval_strategy],
        outputs=[fixed_window_box, cv_box]
    )

    def show_data():
        """Reveal a data table and swap its Show button for the Hide button."""
        return gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)

    def hide_data():
        """Hide a data table and swap its Hide button for the Show button."""
        return gr.update(visible=False), gr.update(visible=False), gr.update(visible=True)

    # Outputs are ordered (table, hide button, show button) to match the
    # tuples returned by show_data / hide_data.
    show_data_btn.click(fn=show_data, outputs=[validation_output, hide_data_btn, show_data_btn])
    hide_data_btn.click(fn=hide_data, outputs=[validation_output, hide_data_btn, show_data_btn])
    show_forecast_btn.click(fn=show_data, outputs=[forecast_output, hide_forecast_btn, show_forecast_btn])
    hide_forecast_btn.click(fn=hide_data, outputs=[forecast_output, hide_forecast_btn, show_forecast_btn])

    # The input order must match run_forecast's positional parameters and the
    # output order must match its 7-item return tuple.
    submit_btn.click(
        fn=run_forecast,
        inputs=[
            file_input, frequency, eval_strategy, horizon, step_size, num_windows,
            use_historical_avg, use_naive, use_seasonal_naive, seasonality,
            use_window_avg, window_size, use_seasonal_window_avg, seasonal_window_size,
            use_autoets, use_autoarima, use_autoces, use_autotheta,
            use_lgbm, use_chronos, use_moirai,
            future_horizon
        ],
        outputs=[
            eval_output,
            validation_output,
            validation_plot,
            forecast_output,
            forecast_plot,
            export_files,
            message_output
        ]
    )

if __name__ == "__main__":
    app.launch(share=True)