45 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			45 lines
		
	
	
		
			1.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| def initialise_SoC(bess):
 | |
|     """Initialise the state of charge (SoC) for the BESS."""
 | |
|     for i in range(0, len(bess["units"])):  # initially fully charged
 | |
|         bess["units"][i]["SoC"] = 1
 | |
|     return bess
 | |
| 
 | |
| 
 | |
| def initial_site_assignment(c, bess):
 | |
|     """Initialise the site assignment for each BESS."""
 | |
|     k = 0
 | |
|     while k < len(c["site_info"]["sites"]):
 | |
|         bess["units"][k]["site"] = c["site_info"]["sites"][k]["name"]
 | |
|         k += 1
 | |
| 
 | |
|     if k < len(c["site_info"]["sites"]):
 | |
|         bess["units"][k]["site"] = "Unassigned"
 | |
|     return bess
 | |
| 
 | |
| 
 | |
| def discharge_bess(bess, site_name, dt, discharge_power):
 | |
|     # convert discharge power to discharge energy (kW to kWh)
 | |
|     discharge_energy = discharge_power * dt / 3600
 | |
| 
 | |
|     """Discharge the BESS for a specific site."""
 | |
|     for index, unit in enumerate(bess["units"]):
 | |
|         if unit["site"] == site_name:
 | |
|             new_soc = unit["SoC"] - (dt * discharge_energy) / unit["capacity_kWh"]
 | |
|             new_soc = 0 if new_soc < 0 else new_soc
 | |
|         else:
 | |
|             new_soc = unit["SoC"]
 | |
| 
 | |
|         # update SoC
 | |
|         bess["units"][index]["SoC"] = new_soc
 | |
|     return bess
 | |
| 
 | |
| 
 | |
| def predict_swap_time(bess_soc_for_cycle):
 | |
|     """Predict the swap time for each BESS unit based on its SoC history."""
 | |
|     swap_times = {}
 | |
|     for unit_name, df in bess_soc_for_cycle.items():
 | |
|         # Find the timestamp when SoC reaches 0
 | |
|         swap_time = df[df["SoC"] == 0]["Timestamp"].min()
 | |
|         swap_times[unit_name] = swap_time
 | |
|     return swap_times
 |