Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions assume/common/forecast_algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,130 @@ def set_preloaded_forecast_by_name(
return preprocess_information[new_forecast_name]


def calculate_price_from_cleared_history(
current_forecast, preprocess_information, *args, **kwargs
):
"""Learn electricity price from actual market clearing history.

Extracts last 24 hours of cleared prices from unit outputs, calculates average,
trend, and hourly seasonality factors, then forecasts next 48 hours.

Args:
current_forecast (dict): Map of market_id to forecasted price series.
preprocess_information (dict): Contains 'unit' and timing information.
*args, **kwargs: Additional arguments (unit, current_time, etc.).

Returns:
dict: Updated forecast with adaptive prices learned from clearing history.
"""
unit = kwargs.get("unit")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this unit set when running this in the simulation (not in the test)


if unit is None or not hasattr(unit.outputs, "get"):
# Fallback: return current forecast if unit data not available
return current_forecast

try:
# Extract cleared prices from last 24 hours
cleared_prices = unit.outputs.get("energy_accepted_price", {})
if isinstance(cleared_prices, dict):
cleared_prices = pd.Series(cleared_prices)
elif not isinstance(cleared_prices, pd.Series):
cleared_prices = pd.Series(cleared_prices)

# Filter non-zero prices (actual cleared prices)
actual_cleared_prices = cleared_prices[cleared_prices > 0.0]

if len(actual_cleared_prices) < 2:
# Not enough data yet, return current forecast
return current_forecast

# Calculate statistics from recent prices
recent_prices = actual_cleared_prices.tail(24) # Last 24 hours
avg_price = recent_prices.mean()

# Calculate trend: compare recent half vs older half
mid_point = len(recent_prices) // 2
if mid_point > 0:
recent_half = recent_prices.iloc[mid_point:].mean()
older_half = recent_prices.iloc[:mid_point].mean()
trend = (recent_half - older_half) / max(
older_half, 1.0
) # Avoid division by zero
else:
trend = 0.0

# Hourly seasonality factors (typical EOM market pattern)
hourly_factors = {
0: 0.70, # Night: low price
1: 0.70,
2: 0.70,
3: 0.70,
4: 0.70,
5: 0.75,
6: 0.90,
7: 0.95,
8: 1.10, # Morning ramp
9: 1.20, # Peak demand starts
10: 1.25, # Peak
11: 1.25,
12: 1.20,
13: 1.15,
14: 1.10,
15: 1.15,
16: 1.25, # Afternoon peak
17: 1.30,
18: 1.25,
19: 1.15,
20: 1.05,
21: 0.95,
22: 0.80,
23: 0.70,
}

# Generate 48-hour forecast
forecast_hours = 48
forecasted_prices = []
current_hour_index = 0

for i in range(forecast_hours):
hour_of_day = (current_hour_index + i) % 24

# Decay trend over forecast horizon (gradual convergence to average)
decay_factor = 1.0 - (i / forecast_hours) * 0.5
hourly_component = avg_price * hourly_factors[hour_of_day]
trend_component = trend * avg_price * decay_factor

forecasted_price = hourly_component + trend_component
forecasted_price = max(5.0, forecasted_price) # Floor at €5/MWh

forecasted_prices.append(forecasted_price)

# Update forecast for EOM market (main electricity market)
if "EOM" in current_forecast:
if isinstance(current_forecast["EOM"], FastSeries):
# Update values in the FastSeries
updated_forecast = current_forecast["EOM"].copy()
updated_forecast[:forecast_hours] = forecasted_prices
current_forecast["EOM"] = updated_forecast
else:
# Handle regular pandas Series
updated_forecast = current_forecast["EOM"].copy()
updated_forecast.iloc[:forecast_hours] = forecasted_prices
current_forecast["EOM"] = updated_forecast

return current_forecast

except Exception as e:
# If anything goes wrong, return current forecast
log.warning(
f"Error in calculate_price_from_cleared_history: {e}. Using fallback."
)
return current_forecast


forecast_update_algorithms = {
"price_default": default_update,
"adaptive": calculate_price_from_cleared_history,
"residual_load_default": default_update,
"residual_load_set_preloaded": set_preloaded_forecast_by_name,
"congestion_signal_default": default_update,
Expand Down
95 changes: 91 additions & 4 deletions assume/common/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,11 @@ def update(self, *args, **kwargs):
*args,
**kwargs,
)
self.congestion_signal = self._dict_to_series(self.congestion_signal)
self.congestion_signal = (
self._dict_to_series(self.congestion_signal)
if isinstance(self.congestion_signal, dict)
else self._to_series(self.congestion_signal)
)

renewable_utilisation_update_algorithm_name = self.forecast_algorithms.get(
"update_renewable_utilisation", "renewable_utilisation_default"
Expand All @@ -677,19 +681,28 @@ def update(self, *args, **kwargs):
*args,
**kwargs,
)
self.renewable_utilisation_signal = self._dict_to_series(
self.renewable_utilisation_signal
self.renewable_utilisation_signal = (
self._dict_to_series(self.renewable_utilisation_signal)
if isinstance(self.renewable_utilisation_signal, dict)
else self._to_series(self.renewable_utilisation_signal)
)


class SteelplantForecaster(DsmUnitForecaster):
"""Forecaster for steelplant units.

Provides all DSM forecasts (see :class:`DsmUnitForecaster`) plus fuel prices.
Provides all DSM forecasts (see :class:`DsmUnitForecaster`) plus fuel prices and steel demand.
After initialization, DSM signals are copied to the unit and ``setup_model()`` is called.

Supports three operational strategies:
1. **Profile-guided**: If ``normalized_load_profile`` is provided, production follows the profile shape.
2. **Min-demand**: If hourly minimum demand (``steel_demand``) is provided, meets per-hour minimums.
3. **Cost-optimized**: If neither is provided, minimizes cost without shape constraints.

Attributes:
fuel_prices (dict[str, ForecastSeries]): Map of fuel type to forecasted fuel prices.
steel_demand (ForecastSeries): Per-timestep steel production demand (optional).
normalized_load_profile (ForecastSeries): Normalized profile to guide production shape (optional).
"""

def __init__(
Expand All @@ -704,6 +717,8 @@ def __init__(
congestion_signal: ForecastSeries = 0.0,
renewable_utilisation_signal: ForecastSeries = 0.0,
electricity_price: ForecastSeries = None,
steel_demand: ForecastSeries = None,
normalized_load_profile: ForecastSeries = None,
):
super().__init__(
index=index,
Expand All @@ -717,6 +732,14 @@ def __init__(
electricity_price=electricity_price,
)
self.fuel_prices = self._dict_to_series(fuel_prices)
self.steel_demand = (
self._to_series(steel_demand) if steel_demand is not None else None
)
self.normalized_load_profile = (
self._to_series(normalized_load_profile)
if normalized_load_profile is not None
else None
)

def get_price(self, fuel: str) -> FastSeries:
if fuel not in self.fuel_prices:
Expand All @@ -737,14 +760,78 @@ def initialize(
initializing_unit,
)

# Always set standard DSM signals
initializing_unit.electricity_price = self.electricity_price
initializing_unit.congestion_signal = self.congestion_signal
initializing_unit.renewable_utilisation_signal = (
self.renewable_utilisation_signal
)

# Get the unit's ID for dynamic attribute naming
unit_id = str(getattr(initializing_unit, "id", None))

# Set ID-prefixed attributes for operational strategy selection
# Strategy 1: Normalized load profile (if provided)
if self.normalized_load_profile is not None and unit_id:
profile_attr = f"{unit_id}_normalized_load_profile"
setattr(initializing_unit, profile_attr, self.normalized_load_profile)

# Strategy 2: Hourly minimum steel demand (if provided)
if self.steel_demand is not None and unit_id:
demand_attr = f"{unit_id}_steel_demand"
setattr(initializing_unit, demand_attr, self.steel_demand)

# Backward compatibility: also set non-prefixed attributes
if self.steel_demand is not None:
initializing_unit.steel_demand_per_timestep = self.steel_demand

if self.normalized_load_profile is not None:
initializing_unit.normalized_load_profile = self.normalized_load_profile

initializing_unit.setup_model()

def update(self, *args, **kwargs):
"""Update DSM-specific forecasts including adaptive electricity price learning.

Calls parent update for DSM signals (congestion, renewable utilisation),
then updates electricity price using the configured algorithm. If using
the adaptive price learning algorithm, clears prices are extracted from
the unit's outputs and used to forecast next period.

Args:
*args: Passed through to the underlying update algorithms.
**kwargs: Passed through to the underlying update algorithms, must include 'unit'.
"""
# Call parent DsmUnitForecaster.update() for DSM signals
super().update(*args, **kwargs)

# Update electricity price via configured algorithm
price_update_algorithm_name = self.forecast_algorithms.get(
"update_price", "price_default"
)
price_update_algorithm = self._registries["update"].get(
price_update_algorithm_name
)

if price_update_algorithm is not None:
# Call the price update algorithm (may be price_default or adaptive)
self.price = price_update_algorithm(
self.price,
self.preprocess_information.get("price", {}),
*args,
**kwargs,
)
self.price = self._dict_to_series(self.price)

# Sync electricity_price with updated price from EOM market
if "EOM" in self.price:
self.electricity_price = self.price["EOM"]

# Push updated price to the unit so it uses the latest forecast in optimization
unit = kwargs.get("unit")
if unit is not None:
unit.electricity_price = self.electricity_price


class SteamgenerationForecaster(DsmUnitForecaster):
"""Forecaster for steam generation units.
Expand Down
Loading
Loading