diff --git a/pychromecast/__init__.py b/pychromecast/__init__.py index ff46f0e9e..f3b29fcbb 100644 --- a/pychromecast/__init__.py +++ b/pychromecast/__init__.py @@ -439,9 +439,7 @@ def new_cast_status(self, status: CastStatus) -> None: if status: self.status_event.set() - def start_app( - self, app_id: str, force_launch: bool = False, timeout: float = REQUEST_TIMEOUT - ) -> None: + async def start_app(self, app_id: str, force_launch: bool = False, timeout: float = REQUEST_TIMEOUT) -> None: """Start an app on the Chromecast.""" self.logger.info("Starting app %s", app_id) response_handler = WaitResponse(timeout, f"start app {app_id}") @@ -450,9 +448,9 @@ def start_app( force_launch=force_launch, callback_function=response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() - def quit_app(self, timeout: float = REQUEST_TIMEOUT) -> None: + async def quit_app(self, timeout: float = REQUEST_TIMEOUT) -> None: """Tells the Chromecast to quit current app_id.""" self.logger.info("Quitting current app") @@ -460,9 +458,9 @@ def quit_app(self, timeout: float = REQUEST_TIMEOUT) -> None: self.socket_client.receiver_controller.stop_app( callback_function=response_handler.callback ) - response_handler.wait_response() + await response_handler.wait_response() - def volume_up(self, delta: float = 0.1, timeout: float = REQUEST_TIMEOUT) -> float: + async def volume_up(self, delta: float = 0.1, timeout: float = REQUEST_TIMEOUT) -> float: """Increment volume by 0.1 (or delta) unless it is already maxed. Returns the new volume. """ @@ -470,9 +468,9 @@ def volume_up(self, delta: float = 0.1, timeout: float = REQUEST_TIMEOUT) -> flo raise ValueError(f"volume delta must be greater than zero, not {delta}") if not self.status: raise NotConnected - return self.set_volume(self.status.volume_level + delta, timeout=timeout) + return await self.set_volume(self.status.volume_level + delta, timeout=timeout) - def volume_down( + async def volume_down( self, delta: float = 0.1, timeout: float = REQUEST_TIMEOUT ) -> float: """Decrement the volume by 0.1 (or delta) unless it is already 0. @@ -482,9 +480,9 @@ def volume_down( raise ValueError(f"volume delta must be greater than zero, not {delta}") if not self.status: raise NotConnected - return self.set_volume(self.status.volume_level - delta, timeout=timeout) + return await self.set_volume(self.status.volume_level - delta, timeout=timeout) - def wait(self, timeout: float | None = None) -> None: + async def connect(self, timeout: float | None = None) -> None: """ Waits until the cast device is ready for communication. The device is ready as soon a status message has been received. @@ -498,47 +496,20 @@ def wait(self, timeout: float | None = None) -> None: operation in seconds (or fractions thereof). Or None to block forever. """ - if not self.socket_client.is_alive(): - self.socket_client.start() - ready = self.status_event.wait(timeout=timeout) - if not ready: - raise RequestTimeout("wait", cast(float, timeout)) + if timeout: + self.socket_client.timeout = timeout - def disconnect(self, timeout: float | None = None) -> None: - """ - Disconnects the chromecast and waits for it to terminate. + if not self.socket_client.connected: + await self.socket_client.connect() - :param timeout: A floating point number specifying a timeout for the - operation in seconds (or fractions thereof). Or None - to block forever. Set to 0 to not block. + def disconnect(self) -> None: """ - self.socket_client.disconnect() - self.join(timeout=timeout) - - def join(self, timeout: float | None = None) -> None: + Disconnects the chromecast. """ - Blocks the thread of the caller until the chromecast connection is - stopped. - - :param timeout: a floating point number specifying a timeout for the - operation in seconds (or fractions thereof). Or None - to block forever. - """ - self.socket_client.join(timeout=timeout) - if self.socket_client.is_alive(): - raise TimeoutError("join", timeout) - - def start(self) -> None: - """ - Start the chromecast connection's worker thread. - """ - self.socket_client.start() + self.socket_client.disconnect() def __del__(self) -> None: - try: - self.socket_client.stop.set() - except AttributeError: - pass + self.disconnect() def __repr__(self) -> str: return ( diff --git a/pychromecast/controllers/media.py b/pychromecast/controllers/media.py index 5db53789d..7df056be3 100644 --- a/pychromecast/controllers/media.py +++ b/pychromecast/controllers/media.py @@ -547,7 +547,7 @@ def _send_start_play_media( # pylint: disable=too-many-locals self.send_message(msg, inc_session_id=True, callback_function=callback_function) - def quick_play(self, *, media_id: str, timeout: float, **kwargs: Any) -> None: + async def quick_play(self, *, media_id: str, timeout: float, **kwargs: Any) -> None: """Quick Play""" media_type = kwargs.pop("media_type", "video/mp4") @@ -556,7 +556,7 @@ def quick_play(self, *, media_id: str, timeout: float, **kwargs: Any) -> None: self.play_media( media_id, media_type, **kwargs, callback_function=response_handler.callback ) - response_handler.wait_response() + await response_handler.wait_response() class MediaController(BaseMediaPlayer): @@ -623,35 +623,35 @@ def _send_command( command, callback_function=callback_function, inc_session_id=True ) - def play(self, timeout: float = 10.0) -> None: + async def play(self, timeout: float = 10.0) -> None: """Send the PLAY command.""" response_handler = WaitResponse(timeout, "play") self._send_command({MESSAGE_TYPE: TYPE_PLAY}, response_handler.callback) - response_handler.wait_response() + await response_handler.wait_response() - def pause(self, timeout: float = 10.0) -> None: + async def pause(self, timeout: float = 10.0) -> None: """Send the PAUSE command.""" response_handler = WaitResponse(timeout, "pause") self._send_command({MESSAGE_TYPE: TYPE_PAUSE}, response_handler.callback) - response_handler.wait_response() + await response_handler.wait_response() - def stop(self, timeout: float = 10.0) -> None: + async def stop(self, timeout: float = 10.0) -> None: """Send the STOP command.""" response_handler = WaitResponse(timeout, "stop") self._send_command({MESSAGE_TYPE: TYPE_STOP}, response_handler.callback) - response_handler.wait_response() + await response_handler.wait_response() - def rewind(self, timeout: float = 10.0) -> None: + async def rewind(self, timeout: float = 10.0) -> None: """Starts playing the media from the beginning.""" - self.seek(0, timeout) + await self.seek(0, timeout) - def skip(self, timeout: float = 10.0) -> None: + async def skip(self, timeout: float = 10.0) -> None: """Skips rest of the media. Values less then -5 behaved flaky.""" if not self.status.duration or self.status.duration < 5: return - self.seek(int(self.status.duration) - 5, timeout) + await self.seek(int(self.status.duration) - 5, timeout) - def seek(self, position: float, timeout: float = 10.0) -> None: + async def seek(self, position: float, timeout: float = 10.0) -> None: """Seek the media to a specific location.""" response_handler = WaitResponse(timeout, f"seek {position}") self._send_command( @@ -662,9 +662,9 @@ def seek(self, position: float, timeout: float = 10.0) -> None: }, response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() - def set_playback_rate(self, playback_rate: float, timeout: float = 10.0) -> None: + async def set_playback_rate(self, playback_rate: float, timeout: float = 10.0) -> None: """Set the playback rate. 1.0 is regular time, 0.5 is slow motion.""" response_handler = WaitResponse(timeout, "set playback rate") self._send_command( @@ -674,55 +674,41 @@ def set_playback_rate(self, playback_rate: float, timeout: float = 10.0) -> None }, response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() - def queue_next(self, timeout: float = 10.0) -> None: + async def queue_next(self, timeout: float = 10.0) -> None: """Send the QUEUE_NEXT command.""" response_handler = WaitResponse(timeout, "queue next") self._send_command( {MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": 1}, response_handler.callback ) - response_handler.wait_response() + await response_handler.wait_response() - def queue_prev(self, timeout: float = 10.0) -> None: + async def queue_prev(self, timeout: float = 10.0) -> None: """Send the QUEUE_PREV command.""" response_handler = WaitResponse(timeout, "queue prev") self._send_command( {MESSAGE_TYPE: TYPE_QUEUE_UPDATE, "jump": -1}, response_handler.callback ) - response_handler.wait_response() + await response_handler.wait_response() - def enable_subtitle(self, track_id: int, timeout: float = 10.0) -> None: + async def enable_subtitle(self, track_id: int, timeout: float = 10.0) -> None: """Enable specific text track.""" response_handler = WaitResponse(timeout, "enable subtitle") self._send_command( {MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": [track_id]}, response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() - def disable_subtitle(self, timeout: float = 10.0) -> None: + async def disable_subtitle(self, timeout: float = 10.0) -> None: """Disable subtitle.""" response_handler = WaitResponse(timeout, "disable subtitle") self._send_command( {MESSAGE_TYPE: TYPE_EDIT_TRACKS_INFO, "activeTrackIds": []}, response_handler.callback, ) - response_handler.wait_response() - - def block_until_active(self, timeout: float | None = None) -> None: - """ - Blocks thread until the media controller session is active on the - chromecast. The media controller only accepts playback control - commands when a media session is active. - - If a session is already active then the method returns immediately. - - :param timeout: a floating point number specifying a timeout for the - operation in seconds (or fractions thereof). Or None - to block forever. - """ - self.session_active_event.wait(timeout=timeout) + await response_handler.wait_response() def _process_media_status(self, data: dict) -> None: """Processes a STATUS message.""" diff --git a/pychromecast/controllers/receiver.py b/pychromecast/controllers/receiver.py index c847690c5..cfe3e38ec 100644 --- a/pychromecast/controllers/receiver.py +++ b/pychromecast/controllers/receiver.py @@ -241,7 +241,7 @@ def stop_app( callback_function=callback_function, ) - def set_volume(self, volume: float, timeout: float = REQUEST_TIMEOUT) -> float: + async def set_volume(self, volume: float, timeout: float = REQUEST_TIMEOUT) -> float: """Allows to set volume. Should be value between 0..1. Returns the new volume. @@ -253,17 +253,17 @@ def set_volume(self, volume: float, timeout: float = REQUEST_TIMEOUT) -> float: {MESSAGE_TYPE: "SET_VOLUME", "volume": {"level": volume}}, callback_function=response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() return volume - def set_volume_muted(self, muted: bool, timeout: float = REQUEST_TIMEOUT) -> None: + async def set_volume_muted(self, muted: bool, timeout: float = REQUEST_TIMEOUT) -> None: """Allows to mute volume.""" response_handler = WaitResponse(timeout, "mute volume") self.send_message( {MESSAGE_TYPE: "SET_VOLUME", "volume": {"muted": muted}}, callback_function=response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() @staticmethod def _parse_status(data: dict, cast_type: str) -> CastStatus: diff --git a/pychromecast/controllers/supla.py b/pychromecast/controllers/supla.py index e99454c78..57863d4e9 100644 --- a/pychromecast/controllers/supla.py +++ b/pychromecast/controllers/supla.py @@ -55,7 +55,7 @@ def play_media( no_add_request_id=True, ) - def quick_play( + async def quick_play( self, *, media_id: str, timeout: float, is_live: bool = False, **kwargs: Any ) -> None: """Quick Play""" @@ -66,4 +66,4 @@ def quick_play( **kwargs, callback_function=response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() diff --git a/pychromecast/controllers/yleareena.py b/pychromecast/controllers/yleareena.py index e695ced8f..32eefd158 100644 --- a/pychromecast/controllers/yleareena.py +++ b/pychromecast/controllers/yleareena.py @@ -56,7 +56,7 @@ def play_areena_media( # pylint: disable=too-many-locals self.send_message(msg, inc_session_id=True, callback_function=callback_function) - def quick_play( + async def quick_play( self, *, media_id: str, @@ -74,4 +74,4 @@ def quick_play( **kwargs, callback_function=response_handler.callback, ) - response_handler.wait_response() + await response_handler.wait_response() diff --git a/pychromecast/response_handler.py b/pychromecast/response_handler.py index 35bc74904..a5c25217a 100644 --- a/pychromecast/response_handler.py +++ b/pychromecast/response_handler.py @@ -2,9 +2,10 @@ from __future__ import annotations -from collections.abc import Callable +import asyncio +import contextlib import logging -import threading +from collections.abc import Callable from typing import Protocol from .error import RequestFailed, RequestTimeout @@ -37,7 +38,7 @@ class WaitResponse: def __init__(self, timeout: float, request: str) -> None: """Initialize.""" - self._event = threading.Event() + self._event = asyncio.Event() self._request = request self._timeout = timeout @@ -47,9 +48,12 @@ def callback(self, msg_sent: bool, response: dict | None) -> None: self.msg_sent = msg_sent self._event.set() - def wait_response(self) -> None: + async def wait_response(self) -> None: """Wait for the request to finish.""" - request_completed = self._event.wait(self._timeout) + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(self._event.wait(), self._timeout) + + request_completed = self._event.is_set() if not request_completed: raise RequestTimeout(self._request, self._timeout) diff --git a/pychromecast/socket_client.py b/pychromecast/socket_client.py index 492451858..1b102c2e8 100644 --- a/pychromecast/socket_client.py +++ b/pychromecast/socket_client.py @@ -10,17 +10,16 @@ from __future__ import annotations import abc -import errno +import asyncio import json import logging -import selectors -import socket import ssl -import threading +import struct import time +from asyncio import BaseTransport, CancelledError, Task, Transport from collections import defaultdict from dataclasses import dataclass -from struct import pack, unpack +from struct import pack import zeroconf @@ -134,7 +133,7 @@ def new_connection_status(self, status: ConnectionStatus) -> None: # pylint: disable-next=too-many-instance-attributes -class SocketClient(threading.Thread, CastStatusListener): +class SocketClient(asyncio.Protocol, CastStatusListener): """ Class to interact with a Chromecast through a socket. @@ -175,8 +174,6 @@ def __init__( ) -> None: super().__init__() - self.daemon = True - self.logger = logging.getLogger(__name__) self._force_recon = False @@ -193,9 +190,10 @@ def __init__( self.port = 8009 self.source_id = "sender-0" - self.stop = threading.Event() - # socketpair used to interrupt the worker thread - self.socketpair = socket.socketpair() + self._transport: Transport | None = None + self._loop = asyncio.get_event_loop() + self._connected = False + self._connection_daemon_task: Task | None = None self.app_namespaces: list[str] = [] self.destination_id: str | None = None @@ -206,12 +204,6 @@ def __init__( self.connecting = True self.first_connection = True - self.socket: socket.socket | ssl.SSLSocket | None = None - self.selector = selectors.DefaultSelector() - self.wakeup_selector_key = self.selector.register( - self.socketpair[0], selectors.EVENT_READ - ) - self.remote_selector_key: selectors.SelectorKey | None = None # dict mapping namespace on Controller objects self._handlers: dict[str, set[BaseController]] = defaultdict(set) @@ -228,17 +220,91 @@ def __init__( self.receiver_controller.register_status_listener(self) - def initialize_connection( # pylint:disable=too-many-statements, too-many-branches + def connection_made(self, transport: BaseTransport): + # peername = transport.get_extra_info('peername') + # print('Connection from {}'.format(peername)) + self._transport = transport + self._connected = True + self.logger.debug("[%s(%s):%s] Connection made", self.fn or "", self.host, self.port) + self.heartbeat_controller.ping() + + + def connection_lost(self, exc): + self.logger.warning("[%s(%s):%s] Connection lost with the server, reconnect... (%s)", + self.fn or "", self.host, self.port, exc) + if self._transport: + self._transport.close() + self._transport = None + self._report_connection_status( + ConnectionStatus( + CONNECTION_STATUS_LOST, + NetworkAddress(self.host, self.port), + None, + ) + ) + + def data_received(self, data: bytes): + if len(data) == 0: + self._transport.close() + self._transport = None + self._report_connection_status( + ConnectionStatus( + CONNECTION_STATUS_LOST, + NetworkAddress(self.host, self.port), + None, + ) + ) + return + + # first 4 bytes is Big-Endian payload length + read_len = struct.unpack(">I", data[0:4])[0] + data = data[4:] + data = data[:read_len] + message = CastMessage() + message.ParseFromString(data) + if not message: + return + data = _dict_from_message_payload(message) + asyncio.create_task(self._handle_response(message, data)) + + async def _handle_response(self, message: CastMessage, data: dict): + try: + self._route_message(message, data) + except Exception as ex: + self.logger.error( + "[%s(%s):%s] Error handling response message :%s", self.fn or "", self.host, self.port, ex + ) + if REQUEST_ID in data and data[REQUEST_ID] in self._request_callbacks: + self._request_callbacks.pop(data[REQUEST_ID])(True, data) + + async def _connection_task(self): + context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + self._transport, protocol = await self._loop.create_connection( + lambda: self, + host=self.host, + port=self.port, + ssl=context, + ssl_handshake_timeout=self.timeout, + ssl_shutdown_timeout=self.timeout, + ) + + async def initialize_connection( # pylint:disable=too-many-statements, too-many-branches self, ) -> None: """Initialize a socket to a Chromecast, retrying as necessary.""" tries = self.tries - if self.socket is not None: - self.selector.unregister(self.socket) - self.socket.close() - self.socket = None - self.remote_selector_key = None + if self._transport: + return + + if self._connection_daemon_task: + try: + self._connection_daemon_task.cancel() + except CancelledError: + pass + self._connection_daemon_task = None # Make sure nobody is blocking. for callback_function in self._request_callbacks.values(): @@ -267,36 +333,23 @@ def mdns_backoff( retry["delay"] = min(retry["delay"] * 2, 300) retries[service] = retry - while not self.stop.is_set() and ( - tries is None or tries > 0 - ): # pylint:disable=too-many-nested-blocks + while tries is None or tries > 0: # pylint:disable=too-many-nested-blocks # Prune retries dict - retries = { - key: retries[key] - for key in self.services.copy() - if (key is not None and key in retries) - } - + retries = {key: retries[key] for key in self.services.copy() if (key is not None and key in retries)} for service in self.services.copy(): now = time.time() - retry = retries.get( - service, {"delay": self.retry_wait, "next_retry": now} + retry = retries.get(service, {"delay": self.retry_wait, "next_retry": now}) + self.logger.debug( + "[%s(%s):%s] Connection try %s", + self.host, + self.port, + service, + retry, ) if now < retry["next_retry"]: continue + try: - if self.socket is not None: - # If we retry connecting, we need to clean up the socket again - self.selector.unregister(self.socket) - self.socket.close() - self.socket = None - self.remote_selector_key = None - - self.socket = new_socket() - self.remote_selector_key = self.selector.register( - self.socket, selectors.EVENT_READ - ) - self.socket.settimeout(self.timeout) self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_CONNECTING, @@ -307,9 +360,7 @@ def mdns_backoff( # Resolve the service name. host = None port = None - host, port, service_info = get_host_from_service( - service, self.zconf - ) + host, port, service_info = get_host_from_service(service, self.zconf) if host and port: if service_info: try: @@ -356,11 +407,21 @@ def mdns_backoff( self.host, self.port, ) - self.socket.connect((self.host, self.port)) - context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - self.socket = context.wrap_socket(self.socket) + + await asyncio.wait_for(self._connection_task(), timeout=self.timeout) + self.logger.debug( + "[%s(%s):%s] Resolved service %s to %s:%s", + self.fn or "", + self.host, + self.port, + service, + host, + port, + ) + self.heartbeat_controller.reset() + if self._connection_daemon_task is None: + self._connection_daemon_task = asyncio.create_task(self._connection_deamon()) + self.connecting = False self._force_recon = False self._report_connection_status( @@ -395,17 +456,14 @@ def mdns_backoff( # if another thread tries - and fails - to send a message before the # calls to receiver_controller and heartbeat_controller. except (OSError, NotConnected) as err: + self.logger.info( + "[%s(%s):%s] Connection error %s", + self.fn or "", + self.host, + self.port, + err, + ) self.connecting = True - if self.stop.is_set(): - self.logger.error( - "[%s(%s):%s] Failed to connect: %s. aborting due to stop signal.", - self.fn or "", - self.host, - self.port, - err, - ) - raise ChromecastConnectionError("Failed to connect") from err - self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_FAILED, @@ -439,7 +497,6 @@ def mdns_backoff( if tries: tries -= 1 - self.stop.set() self.logger.error( "[%s(%s):%s] Failed to connect. No retries.", self.fn or "", @@ -450,13 +507,22 @@ def mdns_backoff( def disconnect(self) -> None: """Disconnect socket connection to Chromecast device""" - self.stop.set() - try: - # Write to the socket to interrupt the worker thread - self.socketpair[1].send(b"x") - except socket.error: - # The socketpair may already be closed during shutdown, ignore it - pass + # self.stop.set() + if self._transport: + self._transport.close() + self._transport = None + self._connected = False + if self._connection_daemon_task: + try: + self._connection_daemon_task.cancel() + except CancelledError: + pass + self._connection_daemon_task = None + + @property + def connected(self) -> bool: + """Connection status.""" + return self._connected def register_handler(self, handler: BaseController) -> None: """Register a new namespace handler.""" @@ -466,10 +532,7 @@ def register_handler(self, handler: BaseController) -> None: def unregister_handler(self, handler: BaseController) -> None: """Register a new namespace handler.""" - if ( - handler.namespace in self._handlers - and handler in self._handlers[handler.namespace] - ): + if handler.namespace in self._handlers and handler in self._handlers[handler.namespace]: self._handlers[handler.namespace].remove(handler) handler.unregistered() @@ -511,26 +574,18 @@ def _gen_request_id(self) -> int: return self._request_id - @property - def is_connected(self) -> bool: - """ - Returns True if the client is connected, False if it is stopped - (or trying to connect). - """ - return not self.connecting - @property def is_stopped(self) -> bool: """ Returns True if the connection has been stopped, False if it is running. """ - return self.stop.is_set() + return self._transport is None - def run(self) -> None: + async def connect(self) -> None: """Connect to the cast and start polling the socket.""" try: - self.initialize_connection() + await self.initialize_connection() except ChromecastConnectionError: self._report_connection_status( ConnectionStatus( @@ -541,116 +596,15 @@ def run(self) -> None: ) return - self.heartbeat_controller.reset() - self.logger.debug("Thread started...") - while not self.stop.is_set(): - try: - if self._run_once() == 1: - break - except Exception: # pylint: disable=broad-except - self._force_recon = True - self.logger.exception( - "[%s(%s):%s] Unhandled exception in worker thread, attempting reconnect", - self.fn or "", - self.host, - self.port, - ) - - self.logger.debug("Thread done...") - # Clean up - self._cleanup() + self.logger.debug("[%s(%s):%s] Connection established", self.fn or "", self.host, self.port) - def _run_once(self) -> int: - """Receive from the socket and handle data.""" - # pylint: disable=too-many-branches, too-many-statements, too-many-return-statements + async def _connection_deamon(self): + """Checks connection every 10 seconds and reconnects if lost.""" + while True: + await asyncio.sleep(10) + await self._check_connection() - try: - if not self._check_connection(): - return 0 - except ChromecastConnectionError: - return 1 - - # A connection has been established at this point by self._check_connection - assert self.socket is not None - - # Poll the socket and the socketpair, with a timeout of SELECT_TIMEOUT - # The timeout ensures we call _check_connection often enough to avoid - # the HeartbeatController from detecting a timeout - # The socketpair allow us to be interrupted on shutdown without waiting - # for the SELECT_TIMEOUT timout to expire - try: - ready = self.selector.select(SELECT_TIMEOUT) - except (ValueError, OSError) as exc: - self.logger.error( - "[%s(%s):%s] Error in select call: %s", - self.fn or "", - self.host, - self.port, - exc, - ) - self._force_recon = True - return 0 - - can_read = {key for key, _ in ready} - # read message from chromecast - message = None - if self.remote_selector_key in can_read and not self._force_recon: - try: - message = self._read_message() - except InterruptLoop as exc: - if self.stop.is_set(): - self.logger.info( - "[%s(%s):%s] Stopped while reading message, disconnecting.", - self.fn or "", - self.host, - self.port, - ) - else: - self.logger.error( - "[%s(%s):%s] Interruption caught without being stopped: %s", - self.fn or "", - self.host, - self.port, - exc, - ) - return 1 - except ssl.SSLError as exc: - if exc.errno == ssl.SSL_ERROR_EOF: - if self.stop.is_set(): - return 1 - raise - except socket.error as exc: - self._force_recon = True - self.logger.error( - "[%s(%s):%s] Error reading from socket: %s", - self.fn or "", - self.host, - self.port, - exc, - ) - - if self.wakeup_selector_key in can_read: - # Clear the socket's buffer - self.socketpair[0].recv(128) - - # If we are stopped after receiving a message we skip the message - # and tear down the connection - if self.stop.is_set(): - return 1 - - if not message: - return 0 - - # See if any handlers will accept this message - data = _dict_from_message_payload(message) - self._route_message(message, data) - - if REQUEST_ID in data and data[REQUEST_ID] in self._request_callbacks: - self._request_callbacks.pop(data[REQUEST_ID])(True, data) - - return 0 - - def _check_connection(self) -> bool: + async def _check_connection(self) -> bool: """ Checks if the connection is active, and if not reconnect @@ -682,14 +636,21 @@ def _check_connection(self) -> bool: for channel in self._open_channels: self.disconnect_channel(channel) self._report_connection_status( - ConnectionStatus( - CONNECTION_STATUS_LOST, NetworkAddress(self.host, self.port), None - ) + ConnectionStatus(CONNECTION_STATUS_LOST, NetworkAddress(self.host, self.port), None) ) + if self._transport: + self._transport.close() + self._transport = None + try: - self.initialize_connection() + await self.initialize_connection() except ChromecastConnectionError: - self.stop.set() + self.logger.error( + "[%s(%s):%s] Chromecast Connection error", + self.fn or "", + self.host, + self.port, + ) return False return True @@ -698,10 +659,7 @@ def _route_message(self, message: CastMessage, data: dict) -> None: # route message to handlers if message.namespace in self._handlers: # debug messages - if ( - message.namespace != NS_HEARTBEAT - or self.heartbeat_controller.logger.isEnabledFor(logging.DEBUG) - ): + if message.namespace != NS_HEARTBEAT or self.heartbeat_controller.logger.isEnabledFor(logging.DEBUG): self.logger.debug( "[%s(%s):%s] Received: %s", self.fn or "", @@ -726,10 +684,7 @@ def _route_message(self, message: CastMessage, data: dict) -> None: ) except Exception: # pylint: disable=broad-except self.logger.exception( - ( - "[%s(%s):%s] Exception caught while sending message to " - "controller %s: %s" - ), + ("[%s(%s):%s] Exception caught while sending message to " "controller %s: %s"), self.fn or "", self.host, self.port, @@ -761,13 +716,6 @@ def _cleanup(self) -> None: except Exception: # pylint: disable=broad-except pass - if self.socket is not None: - try: - self.socket.close() - except Exception: # pylint: disable=broad-except - self.logger.exception( - "[%s(%s):%s] _cleanup", self.fn or "", self.host, self.port - ) self._report_connection_status( ConnectionStatus( CONNECTION_STATUS_DISCONNECTED, @@ -775,11 +723,6 @@ def _cleanup(self) -> None: None, ) ) - - self.socketpair[0].close() - self.socketpair[1].close() - self.selector.close() - self.connecting = True def _report_connection_status(self, status: ConnectionStatus) -> None: @@ -804,46 +747,6 @@ def _report_connection_status(self, status: ConnectionStatus) -> None: self.port, ) - def _read_bytes_from_socket(self, msglen: int) -> bytes: - """Read bytes from the socket.""" - # It is a programming error if this is called when we don't have a socket - assert self.socket is not None - - chunks = [] - bytes_recd = 0 - while bytes_recd < msglen: - if self.stop.is_set(): - raise InterruptLoop("Stopped while reading from socket") - try: - chunk = self.socket.recv(min(msglen - bytes_recd, 2048)) - if chunk == b"": - raise socket.error("socket connection broken") - chunks.append(chunk) - bytes_recd += len(chunk) - except TimeoutError: - self.logger.debug( - "[%s(%s):%s] timeout in : _read_bytes_from_socket", - self.fn or "", - self.host, - self.port, - ) - continue - return b"".join(chunks) - - def _read_message(self) -> CastMessage: - """Reads a message from the socket and converts it to a message.""" - # first 4 bytes is Big-Endian payload length - payload_info = self._read_bytes_from_socket(4) - read_len = unpack(">I", payload_info)[0] - - # now read the payload - payload = self._read_bytes_from_socket(read_len) - - message = CastMessage() - message.ParseFromString(payload) - - return message - # pylint: disable=too-many-arguments, too-many-branches def send_message( self, @@ -865,6 +768,7 @@ def send_message( # If channel is not open yet, connect to it. self._ensure_channel_connected(destination_id) + request_id = None if not no_add_request_id: request_id = self._gen_request_id() data[REQUEST_ID] = request_id @@ -885,10 +789,7 @@ def send_message( be_size = pack(">I", msg.ByteSize()) # Log all messages except heartbeat - if ( - msg.namespace != NS_HEARTBEAT - or self.heartbeat_controller.logger.isEnabledFor(logging.DEBUG) - ): + if msg.namespace != NS_HEARTBEAT or self.heartbeat_controller.logger.isEnabledFor(logging.DEBUG): self.logger.debug( "[%s(%s):%s] Sending: %s", self.fn or "", @@ -897,22 +798,21 @@ def send_message( _message_to_string(msg, data), ) - if not force and self.stop.is_set(): + if not force and self._transport is None: if callback_function: callback_function(False, None) - raise PyChromecastStopped("Socket client's thread is stopped.") + raise PyChromecastStopped("Socket client's is stopped.") if not self.connecting and not self._force_recon: # We have a socket - assert self.socket is not None - + assert self._transport is not None try: if callback_function: if not no_add_request_id: self._request_callbacks[request_id] = callback_function else: callback_function(True, None) - self.socket.sendall(be_size + msg.SerializeToString()) - except socket.error as exc: + self._transport.write(be_size + msg.SerializeToString()) + except Exception as exc: if callback_function: callback_function(False, None) if not no_add_request_id: @@ -970,9 +870,7 @@ def send_app_message( if self.destination_id is None: if callback_function: callback_function(False, None) - raise NotConnected( - "Attempting send a message when destination_id is not set" - ) + raise NotConnected("Attempting send a message when destination_id is not set") return self.send_message( self.destination_id, @@ -1027,9 +925,7 @@ def disconnect_channel(self, destination_id: str) -> None: except NotConnected: pass except Exception: # pylint: disable=broad-except - self.logger.exception( - "[%s(%s):%s] Exception", self.fn or "", self.host, self.port - ) + self.logger.exception("[%s(%s):%s] Exception", self.fn or "", self.host, self.port) self._open_channels.remove(destination_id) @@ -1075,29 +971,3 @@ def receive_message(self, message: CastMessage, data: dict) -> bool: return True return False - - -def new_socket() -> socket.socket: - """ - Create a new socket with OS-specific parameters - - Try to set SO_REUSEPORT for BSD-flavored systems if it's an option. - Catches errors if not. - """ - new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - new_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - - try: - # noinspection PyUnresolvedReferences - reuseport = socket.SO_REUSEPORT - except AttributeError: - pass - else: - try: - new_sock.setsockopt(socket.SOL_SOCKET, reuseport, 1) - except (OSError, socket.error) as err: - # OSError on python 3, socket.error on python 2 - if err.errno != errno.ENOPROTOOPT: - raise - - return new_sock diff --git a/pyproject.toml b/pyproject.toml index 88d67fe18..b8af27e8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "PyChromecast" -version = "14.0.7" +version = "15.0.0" license = {text = "MIT"} description = "Python module to talk to Google Chromecast." readme = "README.rst" diff --git a/requirements.txt b/requirements.txt index 8887ef5d0..71d855d9b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ casttube==0.2.1 -protobuf==5.29.3 -zeroconf==0.146.5 +protobuf==5.29.4 +zeroconf==0.146.5 \ No newline at end of file