Skip to content

Ensemble Methods

Overview

Ensemble methods combine multiple machine learning models to create more robust and accurate predictions. FinRL's DRLEnsembleAgent implements dynamic model selection, choosing the best-performing algorithm for each market condition.

DRLEnsembleAgent Architecture

The ensemble approach works by:

  1. Training Multiple Models: A2C, PPO, DDPG, SAC, and TD3 are trained simultaneously
  2. Validation Performance: Each model is evaluated on a validation period
  3. Dynamic Selection: The best-performing model (highest Sharpe ratio) is selected for trading
  4. Rolling Windows: Models are retrained periodically as new data becomes available
graph TD
    A[Market Data] --> B[Split: Train/Validation/Test]
    B --> C[Train A2C]
    B --> D[Train PPO]  
    B --> E[Train DDPG]
    B --> F[Train SAC]
    B --> G[Train TD3]

    C --> H[Validate A2C]
    D --> I[Validate PPO]
    E --> J[Validate DDPG] 
    F --> K[Validate SAC]
    G --> L[Validate TD3]

    H --> M[Calculate Sharpe Ratios]
    I --> M
    J --> M
    K --> M
    L --> M

    M --> N[Select Best Model]
    N --> O[Trade with Best Model]
    O --> P[Retrain After Rebalance Window]

Basic Usage

Simple Ensemble Strategy

# Minimal end-to-end ensemble example.
# NOTE(review): assumes `processed_data` (preprocessed price DataFrame) and
# `env_kwargs` (trading-environment keyword arguments) were defined earlier
# in the workflow — confirm before running.
# NOTE(review): `data_split` is imported but not used in this snippet.
from finrl.agents.stablebaselines3.models import DRLEnsembleAgent
from finrl.meta.preprocessor.preprocessors import data_split

# Initialize ensemble agent
ensemble_agent = DRLEnsembleAgent(
    df=processed_data,
    train_period=("2020-01-01", "2021-01-01"),
    val_test_period=("2021-01-01", "2022-01-01"), 
    rebalance_window=63,        # Retrain every 63 days (quarterly)
    validation_window=63,       # Use 63 days for validation
    **env_kwargs
)

# Define model configurations.
# Each dict is passed straight through to the corresponding
# Stable-Baselines3 constructor as keyword arguments.
A2C_model_kwargs = {"n_steps": 10, "ent_coef": 0.005, "learning_rate": 0.0007}
PPO_model_kwargs = {"ent_coef": 0.01, "n_steps": 2048, "learning_rate": 0.00025}
DDPG_model_kwargs = {"buffer_size": 50000, "learning_rate": 0.0005, "batch_size": 128}
SAC_model_kwargs = {"ent_coef": "auto", "learning_rate": 0.0001}
TD3_model_kwargs = {"policy_delay": 2, "batch_size": 100}

# Training timesteps for each algorithm (keys are lower-case algorithm names)
timesteps_dict = {
    "a2c": 50000,
    "ppo": 50000, 
    "ddpg": 50000,
    "sac": 50000,
    "td3": 50000
}

# Run ensemble strategy: per rebalance window, trains and validates every
# model, then trades with the best validation performer.
df_summary = ensemble_agent.run_ensemble_strategy(
    A2C_model_kwargs=A2C_model_kwargs,
    PPO_model_kwargs=PPO_model_kwargs,
    DDPG_model_kwargs=DDPG_model_kwargs,
    SAC_model_kwargs=SAC_model_kwargs, 
    TD3_model_kwargs=TD3_model_kwargs,
    timesteps_dict=timesteps_dict
)

print("Ensemble Strategy Summary:")
print(df_summary)

Advanced Ensemble Configurations

1. Weighted Ensemble

Instead of selecting a single best model, combine predictions from multiple top performers:

class WeightedEnsembleAgent(DRLEnsembleAgent):
    """Blend the actions of the best validation models instead of picking one."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Number of top-ranked models whose predictions are blended.
        self.top_k = 3

    def weighted_prediction(self, models, sharpe_ratios, state):
        """Return a softmax-weighted blend of the top models' actions.

        Parameters
        ----------
        models : list
            Trained models exposing ``.predict(state, deterministic=True)``.
        sharpe_ratios : list[float]
            Validation Sharpe ratio per model, aligned with ``models``.
        state : array-like
            Current environment observation.

        Returns
        -------
        tuple
            (blended action, softmax weights over the selected models)
        """
        # Indices of the top_k models ranked by Sharpe ratio.
        best = np.argsort(sharpe_ratios)[-self.top_k:]

        # Softmax over the winners' Sharpe ratios: better Sharpe, more weight.
        scores = np.array([sharpe_ratios[i] for i in best])
        blend_weights = np.exp(scores) / np.sum(np.exp(scores))

        # Deterministic action from each selected model (predict returns
        # (action, hidden_state); only the action is used here).
        actions = [models[i].predict(state, deterministic=True)[0] for i in best]

        # Convex combination of the candidate actions.
        return np.average(actions, axis=0, weights=blend_weights), blend_weights

# Usage
# NOTE(review): `ensemble_kwargs` must supply the remaining DRLEnsembleAgent
# constructor arguments (train/val periods, windows, env kwargs) — confirm.
weighted_agent = WeightedEnsembleAgent(
    df=processed_data,
    **ensemble_kwargs
)

2. Market Regime-Based Ensemble

Select models based on detected market regimes:

def detect_market_regime(prices, window=30):
    """Classify the current market regime from recent volatility and trend.

    Parameters
    ----------
    prices : pd.Series
        Price series; only the most recent observations drive the result.
    window : int, default 30
        Lookback for the rolling volatility estimate (the moving averages
        use fixed 10/30 windows).

    Returns
    -------
    str
        One of "bull"/"bear"/"sideways" combined with "volatile"/"calm",
        e.g. "bull_volatile" or "sideways_calm".
    """
    # Latest rolling standard deviation of simple returns.
    vol_now = prices.pct_change().rolling(window).std().iloc[-1]

    # Trend = relative gap between a fast (10) and a slow (30) moving average.
    fast_ma = prices.rolling(10).mean().iloc[-1]
    slow_ma = prices.rolling(30).mean().iloc[-1]
    trend_now = (fast_ma - slow_ma) / slow_ma

    # Wider trend bands in the high-volatility state so noise does not
    # flip the bull/bear call.  NaN comparisons are False, so too-short
    # inputs fall through to the "sideways_calm" answer.
    volatile = vol_now > 0.03
    band = 0.02 if volatile else 0.01
    suffix = "volatile" if volatile else "calm"

    if trend_now > band:
        direction = "bull"
    elif trend_now < -band:
        direction = "bear"
    else:
        direction = "sideways"

    return f"{direction}_{suffix}"

class RegimeBasedEnsemble:
    """Pick the trading algorithm suited to the prevailing market regime."""

    def __init__(self):
        # Regime -> algorithm mapping, derived from historical backtests.
        self.regime_models = {
            "bull_volatile": "sac",
            "bear_volatile": "td3",
            "sideways_volatile": "ppo",
            "bull_calm": "ddpg",
            "bear_calm": "a2c",
            "sideways_calm": "ppo",
        }

    def select_model(self, price_data, trained_models):
        """Return the trained model mapped to the regime seen in price_data.

        ``trained_models`` maps algorithm name (e.g. "sac") to a trained model.
        """
        regime = detect_market_regime(price_data)
        algo = self.regime_models[regime]
        print(f"Detected regime: {regime}, Selected model: {algo}")
        return trained_models[algo]

# Usage
# NOTE(review): `recent_prices` (price series) and `all_trained_models`
# (dict of algorithm name -> trained model) must be defined by the caller.
regime_ensemble = RegimeBasedEnsemble()
current_regime = detect_market_regime(recent_prices)
selected_model = regime_ensemble.select_model(recent_prices, all_trained_models)

3. Risk-Adjusted Ensemble

Select models based on risk-adjusted returns rather than just Sharpe ratio:

def calculate_risk_metrics(returns):
    """Compute return/risk statistics for a series of periodic returns.

    Parameters
    ----------
    returns : pd.Series
        Periodic (e.g. daily) simple returns.

    Returns
    -------
    dict
        Keys 'sharpe', 'sortino', 'calmar', 'max_drawdown', 'volatility'.
        Ratio metrics fall back to 0 when their denominator is zero (or
        NaN, since NaN comparisons are False); max_drawdown is <= 0.
    """
    avg = returns.mean()
    vol = returns.std()

    # Sortino penalizes only the dispersion of losing periods.
    losses = returns[returns < 0]
    downside = losses.std() if len(losses) > 0 else 0

    # Drawdown of the compounded equity curve versus its running peak.
    equity = (1 + returns).cumprod()
    running_peak = equity.expanding(min_periods=1).max()
    max_dd = ((equity - running_peak) / running_peak).min()

    return {
        'sharpe': avg / vol if vol > 0 else 0,
        'sortino': avg / downside if downside > 0 else 0,
        'calmar': avg / abs(max_dd) if max_dd != 0 else 0,
        'max_drawdown': max_dd,
        'volatility': vol,
    }

def risk_adjusted_scoring(performance_metrics):
    """Combine several risk metrics into one composite score per model.

    Parameters
    ----------
    performance_metrics : dict[str, list[float]]
        Maps metric name (e.g. 'sharpe', 'max_drawdown') to one value per
        model; all lists must be the same length.  Drawdowns are expected
        as negative numbers, the convention of ``calculate_risk_metrics``.

    Returns
    -------
    list[float]
        One composite score per model; higher is better.
    """
    # Relative importance of each metric.  Because max_drawdown is stored
    # as a negative number, a larger value (closer to zero) means a
    # shallower, better drawdown — so its weight is positive like the
    # others.  (The previous negative weight rewarded deeper drawdowns.)
    weights = {
        'sharpe': 0.4,
        'sortino': 0.3,
        'calmar': 0.2,
        'max_drawdown': 0.1,
    }

    if not performance_metrics:
        return []

    # Z-score-normalize each metric across models so that metrics on
    # different scales contribute comparably.
    normalized = {}
    for metric, values in performance_metrics.items():
        if len(values) > 1:
            mean_val = np.mean(values)
            std_val = np.std(values)
            normalized[metric] = [
                (v - mean_val) / std_val if std_val > 0 else 0 for v in values
            ]
        else:
            # Single model: no cross-sectional information to normalize.
            normalized[metric] = [0] * len(values)

    # Weighted sum of normalized metrics; metrics without a configured
    # weight (e.g. raw 'volatility') are ignored.
    n_models = len(next(iter(performance_metrics.values())))
    composite_scores = []
    for i in range(n_models):
        score = sum(
            weights[metric] * normalized[metric][i]
            for metric in normalized
            if metric in weights
        )
        composite_scores.append(score)

    return composite_scores

class RiskAdjustedEnsemble(DRLEnsembleAgent):
    """Ensemble with comprehensive risk-adjusted model selection"""

    def get_validation_score(self, iteration, model_name):
        """Override to use risk-adjusted scoring instead of just Sharpe"""

        # Get validation results
        # Reads the account-value CSV written during this model/iteration's
        # validation run.  NOTE(review): assumes the standard FinRL results
        # directory layout and an "account_value" column — confirm.
        df_total_value = pd.read_csv(f"results/account_value_validation_{model_name}_{iteration}.csv")
        daily_returns = df_total_value["account_value"].pct_change().dropna()

        # Calculate risk metrics
        risk_metrics = calculate_risk_metrics(daily_returns)

        # Create composite score
        # Fixed-weight blend: Sharpe 0.4, Sortino 0.3, Calmar 0.2, plus a
        # drawdown term 0.1 * (1 + max_drawdown); since max_drawdown is
        # negative, a shallower drawdown contributes a larger bonus.
        score = (risk_metrics['sharpe'] * 0.4 + 
                risk_metrics['sortino'] * 0.3 + 
                risk_metrics['calmar'] * 0.2 +
                (1 + risk_metrics['max_drawdown']) * 0.1)  # Max DD is negative

        return score

Performance Analysis and Monitoring

Ensemble Performance Tracking

def analyze_ensemble_performance(df_summary):
    """Summarize how the ensemble used its candidate models.

    Parameters
    ----------
    df_summary : pd.DataFrame
        Output of run_ensemble_strategy; must contain a 'Model Used'
        column, one '<Model> Sharpe' column per candidate, and
        'Val Start' / 'Val End' columns describing each validation window.

    Returns
    -------
    tuple
        (selection counts per model, mean validation Sharpe per selected model)
    """
    # How often each algorithm won the validation contest.
    model_counts = df_summary['Model Used'].value_counts()
    print("Model Selection Frequency:")
    for model, count in model_counts.items():
        percentage = (count / len(df_summary)) * 100
        print(f"  {model}: {count} times ({percentage:.1f}%)")

    # Mean validation Sharpe of each model over the periods it was selected.
    performance_by_model = {}
    for model in model_counts.index:
        model_rows = df_summary[df_summary['Model Used'] == model]
        performance_by_model[model] = model_rows[f'{model} Sharpe'].mean()

    print("\nAverage Sharpe Ratio by Selected Model:")
    for model, sharpe in performance_by_model.items():
        print(f"  {model}: {sharpe:.3f}")

    # Best period = the validation window whose best candidate Sharpe is
    # highest across all models.  (The previous implementation sliced
    # .loc between the A2C and TD3 argmax rows, an arbitrary label range
    # that could be empty or reversed and did not identify a best period.)
    sharpe_cols = [c for c in df_summary.columns if c.endswith('Sharpe')]
    if sharpe_cols and len(df_summary) > 0:
        best_idx = df_summary[sharpe_cols].max(axis=1).idxmax()
        best_row = df_summary.loc[best_idx]
        print(f"\nBest performing period: {best_row['Val Start']} to {best_row['Val End']}")

    return model_counts, performance_by_model

# Usage
# Prints the report and returns (selection counts, mean Sharpe per model).
model_frequency, model_performance = analyze_ensemble_performance(df_summary)

Real-time Ensemble Monitoring

class EnsembleMonitor:
    """Track realized returns and model switches, printing console alerts."""

    def __init__(self):
        # Rolling histories of per-period returns and the model that
        # produced each one (kept in lockstep).
        self.performance_history = []
        self.model_history = []
        # Thresholds that trigger console warnings:
        #   sharpe_decline    — alert when recent Sharpe falls below -0.5
        #   drawdown_limit    — alert when drawdown exceeds 15%
        #   model_instability — alert when 3+ distinct models appear in
        #                       the last 5 periods
        self.alert_thresholds = {
            'sharpe_decline': -0.5,
            'drawdown_limit': -0.15,
            'model_instability': 3,
        }

    def update_performance(self, current_return, selected_model):
        """Record one period's return and model, then evaluate alerts."""
        self.performance_history.append(current_return)
        self.model_history.append(selected_model)

        # Bound memory: only the most recent 30 periods are retained.
        while len(self.performance_history) > 30:
            del self.performance_history[0]
            del self.model_history[0]

        self.check_alerts()

    def check_alerts(self):
        """Print alerts for poor Sharpe, deep drawdown, or unstable selection."""
        history = self.performance_history
        if len(history) < 10:
            return  # not enough data for a meaningful signal

        # Sharpe over the last 10 periods (0 when returns are constant).
        window = np.asarray(history[-10:])
        spread = np.std(window)
        recent_sharpe = np.mean(window) / spread if spread > 0 else 0

        # Drawdown of the compounded equity curve over the whole history.
        equity = np.cumprod(1 + np.asarray(history))
        running_peak = np.maximum.accumulate(equity)
        current_drawdown = ((equity - running_peak) / running_peak)[-1]

        # Number of distinct models used over the last 5 periods.
        model_changes = len(set(self.model_history[-5:]))

        thresholds = self.alert_thresholds
        if recent_sharpe < thresholds['sharpe_decline']:
            print(f"⚠️ ALERT: Sharpe ratio declined to {recent_sharpe:.3f}")

        if current_drawdown < thresholds['drawdown_limit']:
            print(f"🚨 ALERT: Drawdown reached {current_drawdown:.1%}")

        if model_changes >= thresholds['model_instability']:
            print(f"⚡ ALERT: High model instability - {model_changes} changes in 5 periods")

# Usage
monitor = EnsembleMonitor()

# During trading loop
# NOTE(review): `trading_periods`, `calculate_period_return`, and the
# agent's `get_current_model` method are assumed to exist in the live
# trading harness — confirm before running.
for period in trading_periods:
    current_return = calculate_period_return()
    selected_model = ensemble_agent.get_current_model()
    monitor.update_performance(current_return, selected_model)

Custom Ensemble Strategies

1. Sector-Based Ensemble

Different models for different asset classes:

class SectorEnsemble:
    """Route each asset to the ensemble algorithms suited to its sector."""

    def __init__(self):
        # Sector -> preferred algorithms for that sector's dynamics.
        self.sector_models = {
            'tech': ['sac', 'td3'],
            'finance': ['ppo', 'ddpg'],
            'crypto': ['sac', 'ppo'],
            'commodities': ['a2c', 'ddpg'],
        }

    def get_asset_sector(self, ticker):
        """Map a ticker symbol to one of the known sectors.

        Unrecognized tickers fall back to 'commodities'.
        """
        # Hard-coded membership lists for the illustrated universe.
        sector_membership = {
            'tech': ('AAPL', 'GOOGL', 'MSFT', 'NVDA'),
            'finance': ('JPM', 'BAC', 'GS', 'WFC'),
            'crypto': ('BTC-USD', 'ETH-USD', 'ADA-USD'),
        }
        for sector, members in sector_membership.items():
            if ticker in members:
                return sector
        return 'commodities'

    def select_models_for_portfolio(self, tickers):
        """Return {ticker: [algorithms]} for every asset in the portfolio."""
        return {
            ticker: self.sector_models[self.get_asset_sector(ticker)]
            for ticker in tickers
        }

2. Time-Based Ensemble

Different models for different time periods:

class TimeBasedEnsemble:
    """Choose the trading algorithm according to the time of day."""

    def __init__(self):
        # Intraday period -> algorithm believed to fit its dynamics.
        self.time_models = {
            'market_open': 'sac',
            'mid_day': 'ppo',
            'market_close': 'td3',
            'overnight': 'ddpg',
        }

    def get_time_period(self, timestamp):
        """Bucket a timestamp into an intraday period by its hour.

        Hours 9-10 -> 'market_open', 11-14 -> 'mid_day',
        15-16 -> 'market_close', anything else -> 'overnight'.
        """
        h = timestamp.hour
        if 9 <= h <= 10:
            return 'market_open'
        if 11 <= h <= 14:
            return 'mid_day'
        if 15 <= h <= 16:
            return 'market_close'
        return 'overnight'

    def select_model_by_time(self, current_time, trained_models):
        """Return the trained model assigned to current_time's period."""
        period = self.get_time_period(current_time)
        return trained_models[self.time_models[period]]

Best Practices

1. Model Diversity

Ensure your ensemble includes diverse algorithms:

# Good diversity - different algorithm types
diverse_config = {
    'on_policy': ['a2c', 'ppo'],      # On-policy methods
    'off_policy': ['ddpg', 'sac', 'td3'],  # Off-policy methods
    'stochastic': ['ppo', 'sac'],     # Stochastic policies
    'deterministic': ['ddpg', 'td3']  # Deterministic policies
}

2. Validation Strategy

Use proper validation to avoid overfitting:

# Walk-forward validation
def walk_forward_validation(data, window_size=252, step_size=63):
    """Run walk-forward validation over `data`.

    Slides a training window of `window_size` rows forward in steps of
    `step_size` rows, validating on the block immediately following each
    training window.

    Parameters
    ----------
    data : pd.DataFrame or pd.Series
        Time-indexed data; `.index` is used to label each period.
    window_size : int, default 252
        Rows in each training window (~one trading year).
    step_size : int, default 63
        Rows per validation block and per forward step (~one quarter).

    Returns
    -------
    list[dict]
        One entry per step with a 'period' label and the 'performance'
        reported by validate_models.
    """
    validation_results = []

    # Each iteration trains on [start - window_size, start) and
    # validates on [start, start + step_size).
    for start in range(window_size, len(data) - step_size, step_size):
        train_slice = data[start - window_size:start]
        val_slice = data[start:start + step_size]

        validation_results.append({
            'period': f"{train_slice.index[0]} to {val_slice.index[-1]}",
            'performance': validate_models(train_slice, val_slice),
        })

    return validation_results

3. Resource Management

Manage computational resources effectively:

# Parallel training for ensemble
from multiprocessing import Pool
import concurrent.futures

def train_model_parallel(model_config):
    """Train a single model (for parallel execution)"""
    # model_config is a (model_name, params, env, timesteps) tuple so the
    # whole job can be pickled and shipped to a worker process.
    # NOTE(review): env must itself be picklable for ProcessPoolExecutor —
    # confirm for the environment class in use.
    model_name, params, env, timesteps = model_config

    agent = DRLAgent(env)
    model = agent.get_model(model_name, model_kwargs=params)
    trained_model = agent.train_model(model, f"{model_name}_parallel", timesteps)

    # Return the name alongside the model so the caller can rebuild a dict.
    return model_name, trained_model

def train_ensemble_parallel(model_configs):
    """Train every configured model concurrently in worker processes.

    Parameters
    ----------
    model_configs : iterable
        One (model_name, params, env, timesteps) tuple per model, as
        consumed by train_model_parallel.

    Returns
    -------
    dict
        Maps model name to its trained model instance.
    """
    trained_models = {}

    # One worker per algorithm; results are collected as they complete
    # rather than in submission order.
    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as pool:
        pending = [pool.submit(train_model_parallel, cfg) for cfg in model_configs]

        for done in concurrent.futures.as_completed(pending):
            name, model = done.result()
            trained_models[name] = model

    return trained_models

4. Performance Comparison

Compare ensemble vs individual models:

def compare_strategies(individual_results, ensemble_results):
    """Print a performance comparison of each single model vs the ensemble.

    Parameters
    ----------
    individual_results : dict
        Maps model name to {'returns': ..., 'portfolio_values': ...}.
    ensemble_results : dict
        The same structure for the ensemble strategy.

    Side effects: prints Sharpe/MaxDD per strategy and a two-sample t-test
    of the ensemble's returns against each individual model's returns.
    """
    print("Performance Comparison:")
    print("-" * 50)

    def _report(label, results):
        # Shared row formatting for one strategy.
        sharpe = calculate_sharpe(results['returns'])
        max_dd = calculate_max_drawdown(results['portfolio_values'])
        print(f"{label:8s}: Sharpe={sharpe:.3f}, MaxDD={max_dd:.1%}")

    # Individual model rows first, then the ensemble row.
    for model, results in individual_results.items():
        _report(model, results)
    _report('Ensemble', ensemble_results)

    # Two-sided t-test: do ensemble returns differ from each model's?
    from scipy import stats
    ensemble_returns = ensemble_results['returns']

    for model, results in individual_results.items():
        t_stat, p_value = stats.ttest_ind(ensemble_returns, results['returns'])
        if p_value < 0.01:
            significance = "***"
        elif p_value < 0.05:
            significance = "**"
        elif p_value < 0.1:
            significance = "*"
        else:
            significance = ""
        print(f"Ensemble vs {model}: p-value = {p_value:.4f} {significance}")

Ensemble methods in FinRL provide a robust approach to trading that can adapt to changing market conditions and reduce the risk of model-specific failures. By combining multiple algorithms and using intelligent selection criteria, you can build more reliable and profitable trading systems.