Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 91 additions & 4 deletions assume/common/forecaster.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,11 @@ def update(self, *args, **kwargs):
*args,
**kwargs,
)
self.congestion_signal = self._dict_to_series(self.congestion_signal)
self.congestion_signal = (
self._dict_to_series(self.congestion_signal)
if isinstance(self.congestion_signal, dict)
else self._to_series(self.congestion_signal)
)

renewable_utilisation_update_algorithm_name = self.forecast_algorithms.get(
"update_renewable_utilisation", "renewable_utilisation_default"
Expand All @@ -677,19 +681,28 @@ def update(self, *args, **kwargs):
*args,
**kwargs,
)
self.renewable_utilisation_signal = self._dict_to_series(
self.renewable_utilisation_signal
self.renewable_utilisation_signal = (
self._dict_to_series(self.renewable_utilisation_signal)
if isinstance(self.renewable_utilisation_signal, dict)
else self._to_series(self.renewable_utilisation_signal)
)


class SteelplantForecaster(DsmUnitForecaster):
"""Forecaster for steelplant units.

Provides all DSM forecasts (see :class:`DsmUnitForecaster`) plus fuel prices.
Provides all DSM forecasts (see :class:`DsmUnitForecaster`) plus fuel prices and steel demand.
After initialization, DSM signals are copied to the unit and ``setup_model()`` is called.

Supports three operational strategies:
1. **Profile-guided**: If ``normalized_load_profile`` is provided, production follows the profile shape.
2. **Min-demand**: If hourly minimum demand (``steel_demand``) is provided, meets per-hour minimums.
3. **Cost-optimized**: If neither is provided, minimizes cost without shape constraints.

Attributes:
fuel_prices (dict[str, ForecastSeries]): Map of fuel type to forecasted fuel prices.
steel_demand (ForecastSeries): Per-timestep steel production demand (optional).
normalized_load_profile (ForecastSeries): Normalized profile to guide production shape (optional).
"""

def __init__(
Expand All @@ -704,6 +717,8 @@ def __init__(
congestion_signal: ForecastSeries = 0.0,
renewable_utilisation_signal: ForecastSeries = 0.0,
electricity_price: ForecastSeries = None,
steel_demand: ForecastSeries = None,
normalized_load_profile: ForecastSeries = None,
):
super().__init__(
index=index,
Expand All @@ -717,6 +732,14 @@ def __init__(
electricity_price=electricity_price,
)
self.fuel_prices = self._dict_to_series(fuel_prices)
self.steel_demand = (
self._to_series(steel_demand) if steel_demand is not None else None
)
self.normalized_load_profile = (
self._to_series(normalized_load_profile)
if normalized_load_profile is not None
else None
)

def get_price(self, fuel: str) -> FastSeries:
if fuel not in self.fuel_prices:
Expand All @@ -737,14 +760,78 @@ def initialize(
initializing_unit,
)

# Always set standard DSM signals
initializing_unit.electricity_price = self.electricity_price
initializing_unit.congestion_signal = self.congestion_signal
initializing_unit.renewable_utilisation_signal = (
self.renewable_utilisation_signal
)

# Get the unit's ID for dynamic attribute naming
unit_id = str(getattr(initializing_unit, "id", None))

# Set ID-prefixed attributes for operational strategy selection
# Strategy 1: Normalized load profile (if provided)
if self.normalized_load_profile is not None and unit_id:
profile_attr = f"{unit_id}_normalized_load_profile"
setattr(initializing_unit, profile_attr, self.normalized_load_profile)

# Strategy 2: Hourly minimum steel demand (if provided)
if self.steel_demand is not None and unit_id:
demand_attr = f"{unit_id}_steel_demand"
setattr(initializing_unit, demand_attr, self.steel_demand)

# Backward compatibility: also set non-prefixed attributes
if self.steel_demand is not None:
initializing_unit.steel_demand_per_timestep = self.steel_demand

if self.normalized_load_profile is not None:
initializing_unit.normalized_load_profile = self.normalized_load_profile

initializing_unit.setup_model()

def update(self, *args, **kwargs):
"""Update DSM-specific forecasts including adaptive electricity price learning.

Calls parent update for DSM signals (congestion, renewable utilisation),
then updates electricity price using the configured algorithm. If using
the adaptive price learning algorithm, clears prices are extracted from
the unit's outputs and used to forecast next period.

Args:
*args: Passed through to the underlying update algorithms.
**kwargs: Passed through to the underlying update algorithms, must include 'unit'.
"""
# Call parent DsmUnitForecaster.update() for DSM signals
super().update(*args, **kwargs)

# Update electricity price via configured algorithm
price_update_algorithm_name = self.forecast_algorithms.get(
"update_price", "price_default"
)
price_update_algorithm = self._registries["update"].get(
price_update_algorithm_name
)

if price_update_algorithm is not None:
# Call the price update algorithm (may be price_default or adaptive)
self.price = price_update_algorithm(
self.price,
self.preprocess_information.get("price", {}),
*args,
**kwargs,
)
self.price = self._dict_to_series(self.price)

# Sync electricity_price with updated price from EOM market
if "EOM" in self.price:
self.electricity_price = self.price["EOM"]

# Push updated price to the unit so it uses the latest forecast in optimization
unit = kwargs.get("unit")
if unit is not None:
unit.electricity_price = self.electricity_price


class SteamgenerationForecaster(DsmUnitForecaster):
"""Forecaster for steam generation units.
Expand Down
87 changes: 80 additions & 7 deletions assume/scenario/loader_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ def load_dsm_units(
- The CSV file is expected to have columns such as 'name', 'technology', 'unit_type', and other operational parameters.
- The function assumes that the first non-null value in common and bidding columns is representative if multiple
entries exist for the same plant.
- Rolling-horizon optimisation settings (``horizon_mode``, ``look_ahead_horizon``, ``commit_horizon``,
``rolling_step``) are read as optional per-plant columns. They are assembled into the
``dsm_optimisation_config`` dict passed to the unit constructor.
- It is crucial that the input CSV file follows the expected structure for the function to process it correctly.
"""

Expand Down Expand Up @@ -196,13 +199,27 @@ def load_dsm_units(
"is_prosumer",
"congestion_threshold",
"peak_load_cap",
"load_profile_deviation",
]
# Filter the common columns to only include those that exist in the DataFrame
common_columns = [col for col in common_columns if col in dsm_units.columns]

# Get bidding columns dynamically
bidding_columns = [col for col in dsm_units.columns if col.startswith("bidding_")]

# Rolling-horizon optimisation columns (per-plant, optional). Filled on the first
# technology row of each plant; assembled into a dsm_optimisation_config dict below.
dsm_opt_columns = [
col
for col in [
"horizon_mode",
"look_ahead_horizon",
"commit_horizon",
"rolling_step",
]
if col in dsm_units.columns
]

# Initialize the dictionary to hold the final structured data
dsm_units_dict = {}

Expand All @@ -219,16 +236,31 @@ def load_dsm_units(
# Process each technology within the plant
components = {}
for tech, tech_data in group.groupby("technology"):
# Clean the technology-specific data: drop all-NaN columns and drop 'technology', common, and bidding columns
# Clean the technology-specific data: drop all-NaN columns and drop 'technology', common,
# bidding, and DSM optimisation columns
cleaned_data = tech_data.dropna(axis=1, how="all").drop(
columns=["technology"] + common_columns + bidding_columns,
columns=["technology"]
+ common_columns
+ bidding_columns
+ dsm_opt_columns,
errors="ignore",
)
# Ensure that there is at least one record before adding to components
if not cleaned_data.empty:
components[tech] = cleaned_data.to_dict(orient="records")[0]

dsm_unit["components"] = components

# Assemble per-plant rolling-horizon config from CSV columns (if any values present)
if dsm_opt_columns:
opt_cfg = {}
for col in dsm_opt_columns:
non_null_values = group[col].dropna()
if not non_null_values.empty:
opt_cfg[col] = non_null_values.iloc[0]
if opt_cfg:
dsm_unit["dsm_optimisation_config"] = opt_cfg

dsm_units_dict[name] = dsm_unit

# Convert the structured dictionary into a DataFrame
Expand Down Expand Up @@ -591,11 +623,7 @@ def load_config_and_create_forecaster(
# Initialize an empty dictionary to combine the DSM units
dsm_units = {}
for unit_type in ["industrial_dsm_units", "residential_dsm_units"]:
units = load_dsm_units(
path=path,
config=config,
file_name=unit_type,
)
units = load_dsm_units(path=path, config=config, file_name=unit_type)
if units is not None:
dsm_units.update(units)

Expand Down Expand Up @@ -746,13 +774,58 @@ def get_building_profile(column_name: str) -> pd.Series:
**extra_building_profiles,
)
if type == "steel_plant":
# Fetch ID-prefixed data for operational strategy selection
# Strategy 1: Normalized load profile (profile-guided operation)
normalized_profile = None
profile_col = f"{id}_normalized_load_profile"

steel_demand = None
demand_col = f"{id}_steel_demand"
if forecasts_df is not None:
if profile_col in forecasts_df.columns:
normalized_profile = forecasts_df[profile_col]
elif "normalized_load_profile" in forecasts_df.columns:
# Fallback: use generic column if ID-specific not found
normalized_profile = forecasts_df["normalized_load_profile"]

# Strategy 2: Hourly minimum steel demand (min-demand operation)
if demand_col in forecasts_df.columns:
steel_demand = forecasts_df[demand_col]
elif "steel_demand" in forecasts_df.columns:
# Fallback: use generic column if ID-specific not found
steel_demand = forecasts_df["steel_demand"]

# Resolve electricity price source: either algorithm or direct column
price_update_source = unit.get(
"forecast_electricity_price_update", None
)

initial_market_prices = {}
if price_update_source is not None:
price_forecast = (
forecasts_df.get(price_update_source, None)
if forecasts_df is not None
else None
)
if price_forecast is not None:
initial_market_prices["EOM"] = price_forecast
else:
unit_forecast_algorithms["update_price"] = (
Comment thread
Manish-Khanra marked this conversation as resolved.
Outdated
price_update_source
)

unit_forecasts[id] = SteelplantForecaster(
index=shared_unit_index,
availability=availability.get(
id, pd.Series(1.0, index, name=id)
),
market_prices=initial_market_prices
or unit.get("market_prices"),
forecast_algorithms=unit_forecast_algorithms,
forecast_registries=None,
fuel_prices=fuel_prices_df,
normalized_load_profile=normalized_profile,
steel_demand=steel_demand,
)
if type == "hydrogen_plant":
unit_forecasts[id] = HydrogenForecaster(
Expand Down
28 changes: 26 additions & 2 deletions assume/strategies/naive_strategies.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ class DsmEnergyOptimizationStrategy(MinMaxStrategy):
"""
A naive strategy of a Demand Side Management (DSM) unit. The bid volume is the optimal power requirement of
the unit at the start time of the product. The bid price is the marginal cost of the unit at the start time of the product.

For rolling-horizon configured units, this strategy triggers re-optimization at each market clearing round,
ensuring the unit optimizes for the next window using updated state and remaining demand.
"""

def calculate_bids(
Expand All @@ -169,8 +172,19 @@ def calculate_bids(
Orderbook: The bids consisting of the start time, end time, only hours, price and volume.
"""

# check if unit has opt_power_requirement attribute
if unit.optimisation_counter == 0:
if unit.horizon_mode == "rolling_horizon":
current_market_time = product_tuples[0][0]
# Hook to refresh runtime forecasts before re-optimising the next window.
# Currently the configured update algorithms default to no-ops; the hook is
# kept so price/forecast learning can be plugged in without touching strategies.
unit.forecaster.update(unit=unit)
did_reoptimize = unit._check_and_reoptimize_rolling_window(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be calling update_forecasts_if_needed instead.
This logic should then go into this update_forecasts_if_needed function.

But I think that we need to specify this further together with @reinecfi

current_market_time
)
if not did_reoptimize and unit.optimisation_counter == 0:
unit.determine_optimal_operation_with_flex()
unit.optimisation_counter = 1
elif unit.optimisation_counter == 0:
unit.determine_optimal_operation_with_flex()
unit.optimisation_counter = 1

Expand Down Expand Up @@ -201,6 +215,8 @@ class DsmEnergyNaiveRedispatchStrategy(MinMaxStrategy):
"""
A naive strategy of a Demand Side Management (DSM) unit that bids the available flexibility of the unit on the redispatch market.
The bid volume is the flexible power requirement of the unit at the start time of the product. The bid price is the marginal cost of the unit at the start time of the product.

For rolling-horizon configured units, this strategy triggers re-optimization at each market clearing round.
"""

def calculate_bids(
Expand All @@ -210,6 +226,14 @@ def calculate_bids(
product_tuples: list[Product],
**kwargs,
) -> Orderbook:
if unit.horizon_mode == "rolling_horizon":
current_market_time = product_tuples[0][0]
# Hook to refresh runtime forecasts before re-optimising the next window.
# Currently the configured update algorithms default to no-ops; the hook is
# kept so price/forecast learning can be plugged in without touching strategies.
unit.forecaster.update(unit=unit)
unit._check_and_reoptimize_rolling_window(current_market_time)
Comment thread
Manish-Khanra marked this conversation as resolved.

# calculate the optimal operation of the unit according to the objective function
unit.determine_optimal_operation_with_flex()

Expand Down
Loading
Loading