88 lines
3.3 KiB
Python
88 lines
3.3 KiB
Python
import time
|
|
from datetime import datetime, timedelta
|
|
import numpy as np
|
|
import matplotlib.pyplot as pl
|
|
|
|
|
|
def get_start_time():
    """Return local midnight of the current day as whole epoch seconds.

    The wall-clock "now" is truncated to 00:00:00.000000 of the same
    calendar day before being converted, so every call made on the same
    day yields the same integer.
    """
    midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    return int(midnight.timestamp())
|
|
|
|
|
|
def generate_timestrings(start_time_str, end_time_str, dt):
    """Generate evenly spaced "HH:MM" labels between two times of day.

    Args:
        start_time_str: First time of day, formatted as "%H:%M" (e.g. "19:00").
        end_time_str: Last time of day, in the same format. It is included
            in the result when it lands exactly on a step.
        dt: Step between consecutive entries, in seconds. Must be positive.

    Returns:
        A list of "%H:%M" strings. The list is empty when the end time
        precedes the start time. Because seconds are dropped by the
        format, steps shorter than a minute produce repeated labels.

    Raises:
        ValueError: If ``dt`` is not positive, or if either time string
            does not match "%H:%M".
    """
    # A zero or negative step would never move past end_time, so the loop
    # below would not terminate normally; reject it up front.
    if dt <= 0:
        raise ValueError(
            "dt must be a positive number of seconds, got {!r}".format(dt)
        )

    step = timedelta(seconds=dt)
    current_time = datetime.strptime(start_time_str, "%H:%M")
    end_time = datetime.strptime(end_time_str, "%H:%M")

    timestrings = []
    while current_time <= end_time:
        timestrings.append(current_time.strftime("%H:%M"))
        current_time += step
    return timestrings
|
|
|
|
|
|
def index_peak_times(timestamps, peak_times, peak_durations):
    """Label each timestamp with the peak period it falls into.

    Every peak is anchored to the local calendar date of ``timestamps[0]``
    and covers the closed interval ``[start, start + duration]``.

    Args:
        timestamps: Sequence (list or NumPy array) of epoch seconds.
        peak_times: Peak start times as "%H:%M" strings.
        peak_durations: Peak lengths in minutes, paired with ``peak_times``
            by position.

    Returns:
        An integer NumPy array with one entry per timestamp: 0 when the
        timestamp lies outside every peak, otherwise the 1-based position
        of the peak in ``peak_times``. Where peaks overlap, the later one
        in the list wins. An empty input yields an empty array.

    Raises:
        ValueError: If a peak time string does not match "%H:%M".
    """
    ts = np.asarray(timestamps)
    peak_indices = np.zeros(len(ts), dtype=int)
    if len(ts) == 0:
        return peak_indices

    # .item() hands datetime a plain Python number instead of a NumPy scalar.
    start_datetime = datetime.fromtimestamp(ts[0].item())

    for peak_no, (time_str, duration) in enumerate(
        zip(peak_times, peak_durations), start=1
    ):
        time_obj = datetime.strptime(time_str, "%H:%M")
        full_datetime = start_datetime.replace(
            hour=time_obj.hour, minute=time_obj.minute, second=0, microsecond=0
        )
        peak_start = int(full_datetime.timestamp())
        peak_end = peak_start + duration * 60  # minutes -> seconds

        # Range test rather than exact matching against a grid stepped from
        # peak_start: a peak that does not begin exactly on one of the
        # timestamps still marks the samples that fall inside it.
        in_peak = (ts >= peak_start) & (ts <= peak_end)
        peak_indices[in_peak] = peak_no

    return peak_indices
|
|
|
|
|
|
def index_operating_hours(timestamps, operating_hours):
    """Flag the timestamps that fall inside the operating window.

    ``operating_hours`` is a mapping whose "start" and "end" entries are
    "%H:%M" strings. Both are placed on the local calendar date of
    ``timestamps[0]`` and converted to whole epoch seconds. The result is
    an integer NumPy array with 1 for every timestamp between the two
    bounds (inclusive) and 0 elsewhere.
    """
    opening = datetime.strptime(operating_hours["start"], "%H:%M").time()
    closing = datetime.strptime(operating_hours["end"], "%H:%M").time()

    # The calendar day is taken from the first timestamp.
    day = datetime.fromtimestamp(timestamps[0]).date()
    window_start = int(datetime.combine(day, opening).timestamp())
    window_end = int(datetime.combine(day, closing).timestamp())

    flags = [1 if window_start <= ts <= window_end else 0 for ts in timestamps]
    return np.array(flags, dtype=int)
|
|
|
|
|
|
def check_is_weekday(batch_start_time):
    """Return True when the batch start falls on Monday through Friday.

    ``batch_start_time`` is given in seconds since the epoch and is
    interpreted in local time.
    """
    # weekday() counts Monday as 0, so 5 and 6 are Saturday and Sunday.
    return datetime.fromtimestamp(batch_start_time).weekday() < 5
|