Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 56 additions & 90 deletions pychromecast/socket_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,7 @@ def mdns_backoff(
except (OSError, NotConnected) as err:
self.connecting = True
if self.stop.is_set():
self.logger.error(
"[%s(%s):%s] Failed to connect: %s. aborting due to stop signal.",
self.fn or "",
self.host,
self.port,
err,
)
raise ChromecastConnectionError("Failed to connect") from err
raise InterruptLoop() from err

self._report_connection_status(
ConnectionStatus(
Expand Down Expand Up @@ -438,7 +431,6 @@ def mdns_backoff(
if tries:
tries -= 1

self.stop.set()
self.logger.error(
"[%s(%s):%s] Failed to connect. No retries.",
self.fn or "",
Expand Down Expand Up @@ -531,43 +523,76 @@ def run(self) -> None:
try:
self.initialize_connection()
except ChromecastConnectionError:
self._report_connection_status(
ConnectionStatus(
CONNECTION_STATUS_DISCONNECTED,
NetworkAddress(self.host, self.port),
None,
)
self.logger.debug(
"[%s(%s):%s] Connection error to host closing down",
self.fn or "",
self.host,
self.port,
exc_info=True,
)
self._cleanup()
return

except InterruptLoop:
self._cleanup()
return

self.heartbeat_controller.reset()
self.logger.debug("Thread started...")
while not self.stop.is_set():
try:
if self._run_once() == 1:
break
self._run_once()

except ChromecastConnectionError:
self.logger.debug(
"[%s(%s):%s] Connection error to host closing down",
self.fn or "",
self.host,
self.port,
exc_info=True,
)
break

except InterruptLoop:
break

except ChromecastConnectionClosed as exc:
self._force_recon = True
self.logger.debug(
"[%s(%s):%s] %s",
self.fn or "",
self.host,
self.port,
exc,
)

except Exception: # pylint: disable=broad-except
if self.stop.is_set():
self.logger.debug(
"[%s(%s):%s] Unhandled exception in worker thread while stopping",
self.fn or "",
self.host,
self.port,
exc_info=True,
)
break

self._force_recon = True
self.logger.exception(
"[%s(%s):%s] Unhandled exception in worker thread, attempting reconnect",
self.fn or "",
self.host,
self.port,
)
time.sleep(self.retry_wait)

self.logger.debug("Thread done...")
# Clean up
self._cleanup()

def _run_once(self) -> int:
def _run_once(self) -> None:
"""Receive from the socket and handle data."""
# pylint: disable=too-many-branches, too-many-statements, too-many-return-statements

try:
if not self._check_connection():
return 0
except ChromecastConnectionError:
return 1
self._check_connection()

# A connection has been established at this point by self._check_connection
assert self.socket is not None
Expand All @@ -577,65 +602,13 @@ def _run_once(self) -> int:
# the HeartbeatController from detecting a timeout
# The socketpair allow us to be interrupted on shutdown without waiting
# for the SELECT_TIMEOUT timout to expire
try:
ready = self.selector.select(SELECT_TIMEOUT)
except (ValueError, OSError) as exc:
self.logger.error(
"[%s(%s):%s] Error in select call: %s",
self.fn or "",
self.host,
self.port,
exc,
)
self._force_recon = True
return 0
ready = self.selector.select(SELECT_TIMEOUT)

can_read = {key for key, _ in ready}
# read message from chromecast
message = None
if self.remote_selector_key in can_read and not self._force_recon:
try:
message = self._read_message()
except InterruptLoop as exc:
if self.stop.is_set():
self.logger.info(
"[%s(%s):%s] Stopped while reading message, disconnecting.",
self.fn or "",
self.host,
self.port,
)
else:
self.logger.error(
"[%s(%s):%s] Interruption caught without being stopped: %s",
self.fn or "",
self.host,
self.port,
exc,
)
return 1
except ssl.SSLError as exc:
if exc.errno == ssl.SSL_ERROR_EOF:
if self.stop.is_set():
return 1
raise
except ChromecastConnectionClosed as exc:
self._force_recon = True
self.logger.debug(
"[%s(%s):%s] %s",
self.fn or "",
self.host,
self.port,
exc,
)
except socket.error as exc:
self._force_recon = True
self.logger.error(
"[%s(%s):%s] Error reading from socket: %s",
self.fn or "",
self.host,
self.port,
exc,
)
message = self._read_message()

if self.wakeup_selector_key in can_read:
# Clear the socket's buffer
Expand All @@ -644,10 +617,10 @@ def _run_once(self) -> int:
# If we are stopped after receiving a message we skip the message
# and tear down the connection
if self.stop.is_set():
return 1
raise InterruptLoop()

if not message:
return 0
return

# See if any handlers will accept this message
data = _dict_from_message_payload(message)
Expand All @@ -656,9 +629,7 @@ def _run_once(self) -> int:
if REQUEST_ID in data and data[REQUEST_ID] in self._request_callbacks:
self._request_callbacks.pop(data[REQUEST_ID])(True, data)

return 0

def _check_connection(self) -> bool:
def _check_connection(self) -> None:
"""
Checks if the connection is active, and if not reconnect

Expand Down Expand Up @@ -694,12 +665,7 @@ def _check_connection(self) -> bool:
CONNECTION_STATUS_LOST, NetworkAddress(self.host, self.port), None
)
)
try:
self.initialize_connection()
except ChromecastConnectionError:
self.stop.set()
return False
return True
self.initialize_connection()

def _route_message(self, message: CastMessage, data: dict) -> None:
"""Route message to any handlers on the message namespace"""
Expand Down