wip bess swap time prediction

This commit is contained in:
Lucas Tan 2025-07-19 12:00:32 +01:00
parent 1b29cfdea8
commit 04cf0cdf29
2 changed files with 61 additions and 24 deletions

View File

@ -1,3 +1,6 @@
import pandas as pd
def initialise_SoC(bess): def initialise_SoC(bess):
"""Initialise the state of charge (SoC) for the BESS.""" """Initialise the state of charge (SoC) for the BESS."""
for i in range(0, len(bess["units"])): # initially fully charged for i in range(0, len(bess["units"])): # initially fully charged
@ -7,13 +10,14 @@ def initialise_SoC(bess):
def initial_site_assignment(c, bess): def initial_site_assignment(c, bess):
"""Initialise the site assignment for each BESS.""" """Initialise the site assignment for each BESS."""
k = 0
while k < len(c["site_info"]["sites"]):
bess["units"][k]["site"] = c["site_info"]["sites"][k]["name"]
k += 1
for k in range(0, len(bess["units"])):
# assign each BESS unit to a site
if k < len(c["site_info"]["sites"]): if k < len(c["site_info"]["sites"]):
bess["units"][k]["site"] = c["site_info"]["sites"][k]["name"]
else:
bess["units"][k]["site"] = "Unassigned" bess["units"][k]["site"] = "Unassigned"
return bess return bess
@ -23,10 +27,14 @@ def discharge_bess(bess, site_name, dt, discharge_power):
"""Discharge the BESS for a specific site.""" """Discharge the BESS for a specific site."""
for index, unit in enumerate(bess["units"]): for index, unit in enumerate(bess["units"]):
if unit["site"] == "Unassigned":
continue
if unit["site"] == site_name: if unit["site"] == site_name:
new_soc = unit["SoC"] - (dt * discharge_energy) / unit["capacity_kWh"] new_soc = unit["SoC"] - (dt * discharge_energy) / unit["capacity_kWh"]
new_soc = 0 if new_soc < 0 else new_soc new_soc = 0 if new_soc < 0 else new_soc
else: else:
# maintain SoC if not assigned to the site
new_soc = unit["SoC"] new_soc = unit["SoC"]
# update SoC # update SoC
@ -37,8 +45,49 @@ def discharge_bess(bess, site_name, dt, discharge_power):
def predict_swap_time(bess_soc_for_cycle): def predict_swap_time(bess_soc_for_cycle):
"""Predict the swap time for each BESS unit based on its SoC history.""" """Predict the swap time for each BESS unit based on its SoC history."""
swap_times = {} swap_times = {}
min2sec = 60
threshold = 2 * min2sec # 2 minutes in seconds
for unit_name, df in bess_soc_for_cycle.items(): for unit_name, df in bess_soc_for_cycle.items():
# Find the timestamp when SoC reaches 0 # need to be at least 1 min of operation to start estimation
swap_time = df[df["SoC"] == 0]["Timestamp"].min() if len(df) < threshold:
swap_times[unit_name] = None
continue
# linear extrapolation to estimate swap time
# calculate the slope of the SoC over time
m = (df["SoC"].iloc[-1] - df["SoC"].iloc[0]) / (
df["Timestamp"].iloc[-1] - df["Timestamp"].iloc[0]
)
if m == 0:
swap_times[unit_name] = None
continue
# solve for the time when SoC reaches 0
swap_time = (0 - df["SoC"].iloc[0]) / m + df["Timestamp"].iloc[0]
# assign to swap_times
swap_times[unit_name] = swap_time swap_times[unit_name] = swap_time
return swap_times return swap_times
def update_cycle_SoC(bess_data, bess_soc_for_cycle, timestamps):
init_df = pd.DataFrame(columns=["Timestamp", "SoC"])
# assign SoC for cycle
for unit in bess_data["units"]:
unit_name = unit["name"]
# reset df if SoC is 0. Start a new cycle
if unit["SoC"] == 0:
bess_soc_for_cycle[unit_name] = init_df
bess_soc_for_cycle[unit_name] = pd.concat(
[
bess_soc_for_cycle[unit_name],
pd.DataFrame(
[[timestamps[i], unit["SoC"]]],
columns=["Timestamp", "SoC"],
),
],
axis=0,
)

20
main.py
View File

@ -7,6 +7,7 @@ from Utilities.BESS import (
initial_site_assignment, initial_site_assignment,
discharge_bess, discharge_bess,
predict_swap_time, predict_swap_time,
update_cycle_SoC,
) )
import matplotlib.pyplot as pl import matplotlib.pyplot as pl
import pandas as pd import pandas as pd
@ -95,22 +96,9 @@ with ThreadPoolExecutor() as executor:
axis=0, axis=0,
) )
# assign SoC for cycle # update cycle SoC
for unit in bess_data["units"]: # this is for predicting swap times
unit_name = unit["name"] bess_soc_for_cycle = update_cycle_SoC(bess_data, bess_soc_for_cycle, timestamps)
# reset df if SoC is 0. Start a new cycle
if unit["SoC"] == 0:
bess_soc_for_cycle[unit_name] = init_df
bess_soc_for_cycle[unit_name] = pd.concat(
[
bess_soc_for_cycle[unit_name],
pd.DataFrame(
[[timestamps[i], unit["SoC"]]],
columns=["Timestamp", "SoC"],
),
],
axis=0,
)
# predict swap times # predict swap times
swap_times = predict_swap_time(bess_soc_for_cycle) swap_times = predict_swap_time(bess_soc_for_cycle)