From 089078a9ea25d03e5f7093da5db2edc5c10caf86 Mon Sep 17 00:00:00 2001 From: Lucas Tan Date: Mon, 28 Jul 2025 18:18:09 +0100 Subject: [PATCH] battery swap method wip --- Utilities/BESS.py | 56 +++++++++++++++++++++++++++++++++++++++-------- dashboard.py | 2 +- main.py | 8 ++++++- 3 files changed, 55 insertions(+), 11 deletions(-) diff --git a/Utilities/BESS.py b/Utilities/BESS.py index 96b7505..4b6d14e 100644 --- a/Utilities/BESS.py +++ b/Utilities/BESS.py @@ -95,12 +95,50 @@ def update_cycle_SoC(bess_data, bess_soc_for_cycle, timestamp): return bess_soc_for_cycle -def arrange_swap(bess_data, c): - for unit in bess_data["units"]: - if unit["SoC"] < c["bess"]["buffer"]: - # find for unassigned BESS unit with SOC at 100% - for candidate in bess_data["units"]: - if candidate["SoC"] == 1 and candidate["site"] == "Unassigned": - # assign the candidate to the site - candidate["site"] = unit["site"] - break +def arrange_swap(c, bess_data, bess_soc_for_cycle): + # identify BESS units that need swapping + units_needing_swap = [ + unit for unit in bess_data["units"] if unit["SoC"] < bess_data["buffer"]["min"] + ] + + if not units_needing_swap: + return bess_data, bess_soc_for_cycle + + # identify BESS units that are unassigned and fully charged + unassigned_fully_charged = [ + unit + for unit in bess_data["units"] + if unit["SoC"] == 1 and unit["site"] == "Unassigned" + ] + + if not unassigned_fully_charged: + return bess_data, bess_soc_for_cycle + + # assign unassigned fully charged units to units needing swap + for unit in units_needing_swap: + # take the first unassigned fully charged unit + new_unit = unassigned_fully_charged.pop(0) + # assign it to the site of the unit needing swap + new_unit["site"] = unit["site"] + # reset SoC to 1 (fully charged) + new_unit["SoC"] = 1 + # set current load to existing load + new_unit["current_load_kW"] = unit["current_load_kW"] + + # reset old unit + unit["site"] = "Unassigned" # mark the old unit as unassigned + unit["current_load_kW"] = 0 # reset current load + + # update the BESS data + # search for the index of the unit needing swap and replace it with the new unit + index = next( + i for i, d in enumerate(bess_data["units"]) if d["name"] == unit["name"] + ) + bess_data["units"][index] = new_unit + # search for index of new unit, and replace with old unit + new_index = next( + i for i, d in enumerate(bess_data["units"]) if d["name"] == new_unit["name"] + ) + bess_data["units"][new_index] = unit + + return bess_data, bess_soc_for_cycle diff --git a/dashboard.py b/dashboard.py index 61c71be..acf7b68 100644 --- a/dashboard.py +++ b/dashboard.py @@ -104,7 +104,7 @@ if st.session_state.running: ) # display BESS data, SoC, Load Consumption show_table() - time.sleep(3) + time.sleep(1) st.rerun() else: show_table() diff --git a/main.py b/main.py index 613bef4..0ab37da 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,7 @@ import yaml from Utilities.Time import get_start_time from Utilities.LoadProfile import get_load_profiles from Utilities.BESS import ( + arrange_swap, initialise_SoC, initial_site_assignment, discharge_bess, @@ -46,7 +47,7 @@ sim_lock = threading.Lock() # initialise BESS def _init_state(): - global bess_data, bess_soc_since_start, bess_soc_for_cycle, cumulative_load_profiles, load_profiles_since_start + global bess_data, bess_soc_since_start, bess_soc_for_cycle, cumulative_load_profiles, load_profiles_since_start, init_df bd = initialise_SoC(bess_data.copy()) bd = initial_site_assignment(c, bd) @@ -109,6 +110,11 @@ def simulation_loop(): ) swap_times = predict_swap_time(bess_soc_for_cycle) + # trigger swap if needed + bess_data, bess_soc_for_cycle = arrange_swap( + c, bess_data, bess_soc_for_cycle + ) + # integrate newly fetched profiles if is_running_in_async and future.done(): load_profiles = future.result()