Redesign websocket#401
Conversation
| self, | ||
| callback: Callable[..., Any], | ||
| *, | ||
| on_error: Callable[[Exception], None] | None = None, |
There was a problem hiding this comment.
The on_error callback will allow users of the library to react on errors with the subscription, like changing the state of instances that rely on the subscription when there's an error.
| and live_data.get("power") is None | ||
| ): | ||
| live_data["power"] = 0 | ||
| def _add_extra_data(self, data: dict[str, Any]) -> dict[str, Any]: |
There was a problem hiding this comment.
I've broken out the extra data addition to a helper method to keep things better separated. I've not changed how or what extra data is added.
I recommend to review the PR with whitespace changes hidden, to make it clear that this part wasn't changed.
| KEEP_ALIVE_TIMEOUT = 90 | ||
| LOCK_CONNECT = asyncio.Lock() | ||
| MIN_RECONNECT_INTERVAL = 1 | ||
| MAX_RECONNECT_INTERVAL = 60 | ||
| PING_INTERVAL = 30 | ||
| PONG_TIMEOUT = 20 |
There was a problem hiding this comment.
Here are the keep alive and reconnect settings.
| raise WebsocketReconnectedError("Websocket reconnected") from err | ||
| raise WebsocketTransportError(err) from err | ||
|
|
||
| async def set_subscription_endpoint(self, url: str) -> None: |
There was a problem hiding this comment.
We now reconnect if the subscription endpoint changes. This wasn't done before. That's why I've changed to a coroutine function for setting the endpoint.
| with pytest.raises(WebsocketReconnectedError): | ||
| await anext(tibber_rt.subscribe(MagicMock(), on_error=None)) | ||
|
|
||
| await unblock_task |
That will only reconnect if we dont get an answer to the ping? |
|
Ok, I didn't know that. Is it enough to resubscribe the home if the Pulse is not responding? I think we can have a simple timer task per home that checks if it's more than 60 seconds since the last message, and resubscribes in that case, otherwise sleeps another 60 seconds. We use a single websocket connection that can have multiple home subscriptions running at the same time. |
|
I think that should be fine |
| ) -> None: | ||
| """Subscribe to Tibber.""" | ||
| try: | ||
| async for _data in self._tibber_control.realtime.subscribe( |
There was a problem hiding this comment.
| async for _data in self._tibber_control.realtime.subscribe( | |
| self._last_rt_data_received = time.time() | |
| async for _data in self._tibber_control.realtime.subscribe( |
Otherwise, I think we will never reconnect if we dont get the first data?
(Then we should rename _last_rt_data_received to something like _rt_last_activity ?)
There was a problem hiding this comment.
I think we can adjust the None check in _rt_subscription_timeout instead to achieve the same thing. 👍
| if not (data := await self._tibber_control.execute(REAL_TIME_CONSUMPTION_ENABLED % self._home_id)): | ||
| _LOGGER.error("Could not get the data.") | ||
| return | ||
| self.info["viewer"]["home"]["features"]["realTimeConsumptionEnabled"] = data["viewer"]["home"]["features"][ |
There was a problem hiding this comment.
This is fragile as it needs the info attribute to already been set with the data structure. I need to improve this.
TibberHomefeature from the realtime module and make the realtime module unaware ofTibberHome. TheTibberHomeis aware of the realtime feature but not the other way around. The realtime module should focus on the websocket connection and not handle specifics of the Tibber data features.reconnectingfeature of the gql client to automatically reconnect in the background when the connection is closed.This is a big PR. Let me know what I can do to help with the review.