Skip to content

Commit

Permalink
fix: update _async_find_best_time_to_charge and recommendations (#83)
Browse files Browse the repository at this point in the history
  • Loading branch information
woopstar authored Dec 23, 2024
1 parent 8ce2e84 commit dabc171
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 79 deletions.
Empty file.
206 changes: 128 additions & 78 deletions custom_components/hsem/custom_sensors/working_mode_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,11 +887,11 @@ async def _async_set_working_mode(self):
)
elif (
self._hourly_calculations.get(current_time_range, {}).get("recommendation")
== Recommendations.ForceBatteriesCharge.value
== Recommendations.BatteriesChargeGrid.value
):
tou_modes = DEFAULT_HSEM_TOU_MODES_FORCE_CHARGE
working_mode = WorkingModes.TimeOfUse.value
state = Recommendations.ForceBatteriesCharge.value
state = Recommendations.BatteriesChargeGrid.value
await async_logger(
self,
f"# Recommendation for {current_time_range} is to force charge the battery. Setting TOU Periods: {tou_modes} and Working Mode: {working_mode}",
Expand Down Expand Up @@ -1264,10 +1264,10 @@ async def _async_calculate_hourly_net_consumption(self):
)

async def _async_find_best_time_to_charge(self, start_hour=14, stop_hour=17):
# Get the current time
"""Find best time to charge based on prioritized conditions."""
now = datetime.now()

# Skip if the current hour is outside the specified range
# Skip if current hour is outside range
if now.hour >= stop_hour:
return

Expand All @@ -1289,7 +1289,7 @@ async def _async_find_best_time_to_charge(self, start_hour=14, stop_hour=17):
if self._hsem_batteries_remaining_charge <= 0:
return

# Calculate max charge per hour
# Calculate max charge per hour with conversion loss
conversion_loss_factor = 1 - (
convert_to_float(self._hsem_batteries_conversion_loss) / 100
)
Expand All @@ -1308,96 +1308,152 @@ async def _async_find_best_time_to_charge(self, start_hour=14, stop_hour=17):

await async_logger(
self,
f"Calculating best time to charge battery between {start_hour} and {stop_hour}",
f"Charging plan started. "
f"Time range: {start_hour} and {stop_hour}. "
f"Required charge: {round(self._hsem_batteries_remaining_charge, 2)} kWh. "
f"Rated capacity: {round(self._hsem_batteries_rated_capacity_max_state / 1000, 2)} kWh. "
f"Conversion loss: {round(self._hsem_batteries_conversion_loss, 2)}%. "
f"Max charge per hour: {round(max_charge_per_hour, 2)} kWh. ",
)

# Collect hours for analysis
charging_hours = []
# Collect all valid hours
available_hours = []
for hour_start in range(start_hour, stop_hour):
# Skip hours that have already passed
if hour_start < now.hour:
continue

hour_end = (hour_start + 1) % 24
time_range = f"{hour_start:02d}-{hour_end:02d}"

if time_range in self._hourly_calculations:
net_consumption = self._hourly_calculations[time_range].get(
"estimated_net_consumption"
)
import_price = self._hourly_calculations[time_range].get("import_price")
if time_range not in self._hourly_calculations:
continue

if net_consumption is None or import_price is None:
continue
data = self._hourly_calculations[time_range]
net_consumption = data.get("estimated_net_consumption")
import_price = data.get("import_price")

# Prioritize negative import price
if import_price < 0:
charging_hours.append(
(time_range, import_price, net_consumption, "negative_import")
)
# Use surplus power
elif net_consumption < 0 and (
convert_to_float(
self._hsem_huawei_solar_batteries_state_of_capacity_state
)
< convert_to_float(
self._hsem_huawei_solar_batteries_grid_charge_cutoff_soc_state
)
):
charging_hours.append(
(time_range, import_price, net_consumption, "surplus")
)
# Otherwise, consider import price
else:
charging_hours.append(
(time_range, import_price, net_consumption, "import")
)
if net_consumption is None or import_price is None:
continue

# Sort hours by priority
charging_hours.sort(
key=lambda x: (x[3] != "negative_import", x[3] != "surplus", x[1])
)
available_hours.append((time_range, import_price, net_consumption))

# Mark hours for charging
# First priority: Negative import prices
charged_energy = 0.0
for time_range, import_price, net_consumption, source in charging_hours:
if charged_energy >= self._hsem_batteries_remaining_charge:
remaining_charge = self._hsem_batteries_remaining_charge

for time_range, price, net_consumption in sorted(
available_hours, key=lambda x: x[1]
):
if price >= 0 or charged_energy >= remaining_charge:
break

remaining_charge_needed = (
self._hsem_batteries_remaining_charge - charged_energy
energy_to_charge = min(
max_charge_per_hour, remaining_charge - charged_energy
)

# Adjust energy to charge based on surplus power (net_consumption)
available_surplus = abs(net_consumption) if net_consumption < 0 else 0
max_available_energy = min(
max_charge_per_hour, remaining_charge_needed + available_surplus
)
if energy_to_charge > 0:
self._hourly_calculations[time_range][
"recommendation"
] = Recommendations.BatteriesChargeGrid.value
self._hourly_calculations[time_range][
"batteries_charged"
] = energy_to_charge
charged_energy += energy_to_charge

await async_logger(
self,
f"Hour: {time_range}. "
f"Charging from grid due to negative import price. "
f"Import Price: {price}"
f"Energy charged: {round(energy_to_charge, 2)} kWh. "
f"Total energy charged: {round(charged_energy, 2)} kWh. ",
)

# Deduct surplus from the actual charge needed
actual_energy_to_charge = max(0, max_available_energy - available_surplus)
# Second priority: Solar surplus (negative net consumption)
if charged_energy < remaining_charge:
solar_hours = [(t, p, nc) for t, p, nc in available_hours if nc < 0]
solar_hours.sort(
key=lambda x: x[2]
) # Sort by most negative net consumption

for time_range, price, net_consumption in solar_hours:
if charged_energy >= remaining_charge:
break

available_solar = abs(net_consumption)
energy_to_charge = min(
max_charge_per_hour,
remaining_charge - charged_energy,
available_solar,
)

# Mark hour for charging
self._mark_hour_for_charging(time_range, actual_energy_to_charge, source)
charged_energy += actual_energy_to_charge + available_surplus
if energy_to_charge > 0:
self._hourly_calculations[time_range][
"recommendation"
] = Recommendations.BatteriesChargeSolar.value
self._hourly_calculations[time_range][
"batteries_charged"
] = energy_to_charge
charged_energy += energy_to_charge

await async_logger(
self,
f"Marked hour {time_range} for charging using {source}. "
f"Surplus Used: {round(available_surplus, 2)} kWh. "
f"Energy Charged: {round(actual_energy_to_charge, 2)} kWh. "
f"Remaining Charge Needed: {round(remaining_charge_needed, 2)} kWh. "
f"Total Charged: {round(charged_energy, 2)} kWh. ",
)
await async_logger(
self,
f"Hour: {time_range}. "
f"Charging from solar. "
f"Energy charged: {round(energy_to_charge, 2)} kWh. "
f"Total energy charged: {round(charged_energy, 2)} kWh. "
f"Available Solar: {round(available_solar, 2)} kWh. "
f"Net Consumption: {round(net_consumption, 2)} kWh. ",
)

# Calculate total solar surplus after the charging hours
solar_surplus = await self._async_calculate_solar_surplus(charging_hours)
# Third priority: Cheapest remaining hours considering partial solar contribution
if charged_energy < remaining_charge:
remaining_hours = [(t, p, nc) for t, p, nc in available_hours]
remaining_hours.sort(key=lambda x: x[1]) # Sort by price

# Adjust the AC charge cutoff
await self._async_adjust_ac_charge_cutoff_soc(charged_energy, solar_surplus)
for time_range, price, net_consumption in remaining_hours:
if charged_energy >= remaining_charge:
break

_LOGGER.debug(
f"Updated hourly calculations with charging plan: {self._hourly_calculations}"
available_solar = abs(net_consumption) if net_consumption < 0 else 0
grid_energy_needed = min(
max_charge_per_hour - available_solar,
remaining_charge - charged_energy - available_solar,
)

energy_to_charge = available_solar + grid_energy_needed

if energy_to_charge > 0:
self._hourly_calculations[time_range][
"recommendation"
] = Recommendations.BatteriesChargeGrid.value
self._hourly_calculations[time_range][
"batteries_charged"
] = energy_to_charge
charged_energy += energy_to_charge

await async_logger(
self,
f"Hour: {time_range}. "
f"Charging from grid. "
f"Energy charged: {round(energy_to_charge, 2)} kWh. "
f"Total energy charged: {round(charged_energy, 2)} kWh. "
f"Available Solar: {round(available_solar, 2)} kWh. "
f"Net Consumption: {round(net_consumption, 2)} kWh. "
f"Import Price: {price}",
)

# Calculate solar surplus and adjust cutoff
solar_surplus = await self._async_calculate_solar_surplus(
[(t, 0, 0, "") for t, _, _ in available_hours]
)
await self._async_adjust_ac_charge_cutoff_soc(charged_energy, solar_surplus)

await async_logger(
self,
f"Charging plan completed. "
f"Required Charge: {round(self._hsem_batteries_remaining_charge, 2)} kWh. "
f"Total energy charged: {round(charged_energy, 2)} kWh. ",
)

async def _async_calculate_solar_surplus(self, charging_hours):
Expand Down Expand Up @@ -1476,12 +1532,6 @@ async def _async_adjust_ac_charge_cutoff_soc(self, charged_energy, solar_surplus
f"(Solar Surplus: {solar_surplus} kWh, Max Capacity: {max_battery_capacity_kwh} kWh)",
)

def _mark_hour_for_charging(self, time_range, energy_to_charge, source):
self._hourly_calculations[time_range][
"recommendation"
] = Recommendations.ForceBatteriesCharge.value
self._hourly_calculations[time_range]["batteries_charged"] = energy_to_charge

async def _async_optimization_strategy(self):
"""Calculate the optimization strategy for each hour of the day."""

Expand All @@ -1502,7 +1552,7 @@ async def _async_optimization_strategy(self):
data["recommendation"] = Recommendations.FullyFedToGrid.value

# Maximize Self Consumption
elif net_consumption < -1:
elif net_consumption < 0:
data["recommendation"] = Recommendations.MaximizeSelfConsumption.value

# Between 17 and 21 we always want to maximize self consumption
Expand Down
3 changes: 2 additions & 1 deletion custom_components/hsem/utils/recommendations.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class Recommendations(Enum):
TimeOfUse = "time_of_use_luna2000"
MaximizeSelfConsumption = "maximise_self_consumption"
FullyFedToGrid = "fully_fed_to_grid"
ForceBatteriesCharge = "force_batteries_charge"
BatteriesChargeSolar = "batteries_charge_solar"
BatteriesChargeGrid = "batteries_charge_grid"
ForceBatteriesDischarge = "force_batteries_discharge"
EVSmartCharging = "ev_smart_charging"
ForceExport = "force_export"
Expand Down

0 comments on commit dabc171

Please sign in to comment.