145 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			145 lines
		
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import pandas as pd
 | |
| 
 | |
| 
 | |
def initialise_SoC(bess):
    """Initialise the state of charge (SoC) of every BESS unit to 1.

    Args:
        bess: Mapping with a "units" sequence of per-unit dicts.

    Returns:
        The same ``bess`` object; each unit dict is mutated in place so that
        its "SoC" entry is 1 (i.e. the unit starts fully charged).
    """
    for unit in bess["units"]:
        unit["SoC"] = 1  # initially fully charged
    return bess
 | |
| 
 | |
| 
 | |
def initial_site_assignment(c, bess):
    """Initialise the site assignment for each BESS unit.

    Units are paired with sites by position: the k-th unit is given the name
    of the k-th site. Units beyond the number of sites are marked
    "Unassigned".

    Args:
        c: Configuration mapping; ``c["site_info"]["sites"]`` is a sequence of
            dicts that each carry a "name".
        bess: Mapping with a "units" sequence of per-unit dicts.

    Returns:
        The same ``bess`` object; each unit dict is mutated in place to carry
        a "site" entry.
    """
    sites = c["site_info"]["sites"]

    for k, unit in enumerate(bess["units"]):
        # Surplus units have no site of their own.
        unit["site"] = sites[k]["name"] if k < len(sites) else "Unassigned"

    return bess
 | |
| 
 | |
| 
 | |
def discharge_bess(bess, site_name, dt, discharge_power):
    """Discharge the BESS unit(s) assigned to a specific site.

    Args:
        bess: Mapping with a "units" sequence of per-unit dicts. Each unit
            that is discharged must carry "site", "SoC" and "capacity_kWh".
        site_name: Name of the site whose unit(s) should be discharged.
        dt: Duration of the step; divided by 3600 below, so it is treated as
            seconds.
        discharge_power: Power drawn during the step (kW per the original
            comment).

    Returns:
        The same ``bess`` object. Every unit assigned to ``site_name`` has its
        "current_load_kW" set to ``discharge_power`` and its "SoC" reduced by
        the discharged fraction of its capacity, floored at 0. Units marked
        "Unassigned" are never touched, even if ``site_name`` is "Unassigned".
    """
    # convert discharge power to discharge energy (kW to kWh)
    discharge_energy = discharge_power * dt / 3600

    for unit in bess["units"]:
        if unit["site"] == "Unassigned" or unit["site"] != site_name:
            continue

        new_soc = unit["SoC"] - discharge_energy / unit["capacity_kWh"]

        # update SoC and current load; SoC cannot drop below empty
        unit["current_load_kW"] = discharge_power
        unit["SoC"] = max(new_soc, 0)
    return bess
 | |
| 
 | |
| 
 | |
def predict_swap_time(bess_soc_for_cycle):
    """Predict the swap time for each BESS unit based on its SoC history.

    The prediction is a straight line through the first and last sample of
    each unit's history, extended to the point where SoC reaches 0.

    Args:
        bess_soc_for_cycle: Mapping of unit name to a DataFrame with
            "Timestamp" and "SoC" columns, ordered oldest first.
            NOTE(review): timestamps are assumed to be numeric; confirm
            against the caller.

    Returns:
        Dict mapping each unit name to the extrapolated timestamp at which
        its SoC reaches 0, or None when no estimate can be made: too few
        samples, no elapsed time between first and last sample, or an SoC
        that is not decreasing.
    """
    min2sec = 60
    # Minimum number of samples required before estimating. This equals
    # 2 minutes of history only if one sample is recorded per second.
    threshold = 2 * min2sec

    swap_times = {}
    for unit_name, df in bess_soc_for_cycle.items():
        swap_times[unit_name] = None

        if len(df) < threshold:
            continue

        first_time = df["Timestamp"].iloc[0]
        first_soc = df["SoC"].iloc[0]

        elapsed = df["Timestamp"].iloc[-1] - first_time
        if elapsed == 0:
            # No time span to measure a slope over.
            continue

        # slope of the SoC over time (linear extrapolation)
        m = (df["SoC"].iloc[-1] - first_soc) / elapsed

        # A flat or rising SoC never reaches 0 in the future; this form also
        # rejects NaN slopes.
        if not m < 0:
            continue

        # solve for the time when SoC reaches 0
        swap_times[unit_name] = (0 - first_soc) / m + first_time

    return swap_times
 | |
| 
 | |
| 
 | |
def update_cycle_SoC(bess_data, bess_soc_for_cycle, timestamp):
    """Append the current SoC of every unit to its per-cycle history.

    Args:
        bess_data: Mapping with a "units" sequence of dicts carrying "name"
            and "SoC".
        bess_soc_for_cycle: Dict mapping unit name to a DataFrame with
            "Timestamp" and "SoC" columns. Units without an entry yet get a
            fresh history.
        timestamp: Value stored in the "Timestamp" column for this sample.

    Returns:
        The same ``bess_soc_for_cycle`` dict with one row added per unit.
        When a unit's SoC is 0 its history is restarted, so the new cycle
        begins with this sample. Rows are indexed 0..n-1.
    """
    for unit in bess_data["units"]:
        unit_name = unit["name"]
        sample = pd.DataFrame(
            [[timestamp, unit["SoC"]]],
            columns=["Timestamp", "SoC"],
        )

        history = bess_soc_for_cycle.get(unit_name)

        # Start a new cycle if SoC is 0 or there is nothing to extend yet.
        # Using the sample directly avoids concatenating onto an empty frame,
        # which pandas has deprecated.
        if unit["SoC"] == 0 or history is None or history.empty:
            bess_soc_for_cycle[unit_name] = sample
            continue

        bess_soc_for_cycle[unit_name] = pd.concat(
            [history, sample],
            axis=0,
            ignore_index=True,
        )

    return bess_soc_for_cycle
 | |
| 
 | |
| 
 | |
def arrange_swap(c, bess_data, bess_soc_for_cycle):
    """Swap depleted on-site BESS units with unassigned, fully charged ones.

    A unit needs swapping when it is assigned to a site and its SoC is below
    ``bess_data["buffer"]["min"]``. Each such unit is paired, in list order,
    with a spare unit (SoC == 1 and site "Unassigned"). The spare takes over
    the site and load, the depleted unit becomes "Unassigned" with zero load,
    and the two units trade positions in ``bess_data["units"]``.

    Depleted units that are already unassigned are ignored so they do not
    consume spares. If there are fewer spares than depleted units, the
    remaining depleted units are left untouched.

    Args:
        c: Configuration mapping (not used by this function).
        bess_data: Mapping with "units" (list of unit dicts) and
            ``["buffer"]["min"]`` (SoC threshold below which a swap is due).
        bess_soc_for_cycle: Per-unit SoC history; returned unchanged.

    Returns:
        Tuple ``(bess_data, bess_soc_for_cycle)``; ``bess_data`` is the same
        object, mutated in place.
    """
    units = bess_data["units"]
    min_soc = bess_data["buffer"]["min"]

    # identify on-site BESS units that need swapping
    units_needing_swap = [
        unit
        for unit in units
        if unit["site"] != "Unassigned" and unit["SoC"] < min_soc
    ]

    # identify BESS units that are unassigned and fully charged
    unassigned_fully_charged = [
        unit
        for unit in units
        if unit["SoC"] == 1 and unit["site"] == "Unassigned"
    ]

    # zip stops at the shorter list, so running out of spares is not an error
    for unit, new_unit in zip(units_needing_swap, unassigned_fully_charged):
        # hand the site and its existing load over to the fresh unit
        new_unit["site"] = unit["site"]
        new_unit["SoC"] = 1
        new_unit["current_load_kW"] = unit.get("current_load_kW", 0)

        # reset old unit
        unit["site"] = "Unassigned"
        unit["current_load_kW"] = 0

        # Locate both slots before overwriting either one; looking up the
        # second slot after the first assignment would find the wrong entry.
        index = next(i for i, d in enumerate(units) if d is unit)
        new_index = next(i for i, d in enumerate(units) if d is new_unit)
        units[index], units[new_index] = new_unit, unit

    return bess_data, bess_soc_for_cycle
 |