diff --git a/twitter/errors.py b/twitter/errors.py new file mode 100644 index 0000000..49059c8 --- /dev/null +++ b/twitter/errors.py @@ -0,0 +1,6 @@ +from httpx import Response + +class HttpResponseError(Exception): + def __init__(self, content: str, status_code: int) -> None: + self.content = content + self.status_code = status_code \ No newline at end of file diff --git a/twitter/scraper.py b/twitter/scraper.py index f35fa0f..cb1005c 100644 --- a/twitter/scraper.py +++ b/twitter/scraper.py @@ -9,10 +9,11 @@ import websockets from httpx import AsyncClient, Limits, ReadTimeout, URL from tqdm.asyncio import tqdm_asyncio - +from .errors import HttpResponseError from .constants import * from .login import login from .util import * +import time try: if get_ipython().__class__.__name__ == 'ZMQInteractiveShell': @@ -30,7 +31,6 @@ except ImportError as e: ... - class Scraper: def __init__(self, email: str = None, username: str = None, password: str = None, session: Client = None, **kwargs): self.save = kwargs.get('save', True) @@ -42,7 +42,7 @@ def __init__(self, email: str = None, username: str = None, password: str = None self.session = self._validate_session(email, username, password, session, **kwargs) self.rate_limits = {} - def users(self, screen_names: list[str], **kwargs) -> list[dict]: + async def users(self, screen_names: list[str], **kwargs) -> list[dict]: """ Get user data by screen names. @@ -50,9 +50,9 @@ def users(self, screen_names: list[str], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.UserByScreenName, screen_names, **kwargs) + return await self._run(Operation.UserByScreenName, screen_names, **kwargs) - def tweets_by_id(self, tweet_ids: list[int | str], **kwargs) -> list[dict]: + async def tweets_by_id(self, tweet_ids: list[int | str], **kwargs) -> list[dict]: """ Get tweet metadata by tweet ids. @@ -60,9 +60,9 @@ def tweets_by_id(self, tweet_ids: list[int | str], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet data as dicts """ - return self._run(Operation.TweetResultByRestId, tweet_ids, **kwargs) + return await self._run(Operation.TweetResultByRestId, tweet_ids, **kwargs) - def tweets_by_ids(self, tweet_ids: list[int | str], **kwargs) -> list[dict]: + async def tweets_by_ids(self, tweet_ids: list[int | str], **kwargs) -> list[dict]: """ Get tweet metadata by tweet ids. @@ -72,9 +72,9 @@ def tweets_by_ids(self, tweet_ids: list[int | str], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet data as dicts """ - return self._run(Operation.TweetResultsByRestIds, batch_ids(tweet_ids), **kwargs) + return await self._run(Operation.TweetResultsByRestIds, batch_ids(tweet_ids), **kwargs) - def tweets_details(self, tweet_ids: list[int], **kwargs) -> list[dict]: + async def tweets_details(self, tweet_ids: list[int], **kwargs) -> list[dict]: """ Get tweet data by tweet ids. @@ -84,9 +84,9 @@ def tweets_details(self, tweet_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet data as dicts """ - return self._run(Operation.TweetDetail, tweet_ids, **kwargs) + return await self._run(Operation.TweetDetail, tweet_ids, **kwargs) - def tweets(self, user_ids: list[int], **kwargs) -> list[dict]: + async def tweets(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get tweets by user ids. @@ -96,9 +96,9 @@ def tweets(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet data as dicts """ - return self._run(Operation.UserTweets, user_ids, **kwargs) + return await self._run(Operation.UserTweets, user_ids, **kwargs) - def tweets_and_replies(self, user_ids: list[int], **kwargs) -> list[dict]: + async def tweets_and_replies(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get tweets and replies by user ids. @@ -108,9 +108,9 @@ def tweets_and_replies(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet data as dicts """ - return self._run(Operation.UserTweetsAndReplies, user_ids, **kwargs) + return await self._run(Operation.UserTweetsAndReplies, user_ids, **kwargs) - def media(self, user_ids: list[int], **kwargs) -> list[dict]: + async def media(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get media by user ids. @@ -120,9 +120,9 @@ def media(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet data as dicts """ - return self._run(Operation.UserMedia, user_ids, **kwargs) + return await self._run(Operation.UserMedia, user_ids, **kwargs) - def likes(self, user_ids: list[int], **kwargs) -> list[dict]: + async def likes(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get likes by user ids. @@ -132,9 +132,9 @@ def likes(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet data as dicts """ - return self._run(Operation.Likes, user_ids, **kwargs) + return await self._run(Operation.Likes, user_ids, **kwargs) - def followers(self, user_ids: list[int], **kwargs) -> list[dict]: + async def followers(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get followers by user ids. @@ -144,9 +144,9 @@ def followers(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.Followers, user_ids, **kwargs) + return await self._run(Operation.Followers, user_ids, **kwargs) - def following(self, user_ids: list[int], **kwargs) -> list[dict]: + async def following(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get following by user ids. @@ -156,9 +156,9 @@ def following(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.Following, user_ids, **kwargs) + return await self._run(Operation.Following, user_ids, **kwargs) - def favoriters(self, tweet_ids: list[int], **kwargs) -> list[dict]: + async def favoriters(self, tweet_ids: list[int], **kwargs) -> list[dict]: """ Get favoriters by tweet ids. @@ -168,9 +168,9 @@ def favoriters(self, tweet_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.Favoriters, tweet_ids, **kwargs) + return await self._run(Operation.Favoriters, tweet_ids, **kwargs) - def retweeters(self, tweet_ids: list[int], **kwargs) -> list[dict]: + async def retweeters(self, tweet_ids: list[int], **kwargs) -> list[dict]: """ Get retweeters by tweet ids. @@ -180,9 +180,9 @@ def retweeters(self, tweet_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.Retweeters, tweet_ids, **kwargs) + return await self._run(Operation.Retweeters, tweet_ids, **kwargs) - def tweet_stats(self, user_ids: list[int], **kwargs) -> list[dict]: + async def tweet_stats(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get tweet statistics by user ids. @@ -190,9 +190,9 @@ def tweet_stats(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of tweet statistics as dicts """ - return self._run(Operation.TweetStats, user_ids, **kwargs) + return await self._run(Operation.TweetStats, user_ids, **kwargs) - def users_by_ids(self, user_ids: list[int], **kwargs) -> list[dict]: + async def users_by_ids(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get user data by user ids. @@ -202,9 +202,9 @@ def users_by_ids(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.UsersByRestIds, batch_ids(user_ids), **kwargs) + return await self._run(Operation.UsersByRestIds, batch_ids(user_ids), **kwargs) - def recommended_users(self, user_ids: list[int] = None, **kwargs) -> list[dict]: + async def recommended_users(self, user_ids: list[int] = None, **kwargs) -> list[dict]: """ Get recommended users by user ids, or general recommendations if no user ids are provided. @@ -216,9 +216,9 @@ def recommended_users(self, user_ids: list[int] = None, **kwargs) -> list[dict]: contexts = [{"context": orjson.dumps({"contextualUserId": x}).decode()} for x in user_ids] else: contexts = [{'context': None}] - return self._run(Operation.ConnectTabTimeline, contexts, **kwargs) + return await self._run(Operation.ConnectTabTimeline, contexts, **kwargs) - def profile_spotlights(self, screen_names: list[str], **kwargs) -> list[dict]: + async def profile_spotlights(self, screen_names: list[str], **kwargs) -> list[dict]: """ Get user data by screen names. @@ -229,9 +229,9 @@ def profile_spotlights(self, screen_names: list[str], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.ProfileSpotlightsQuery, screen_names, **kwargs) + return await self._run(Operation.ProfileSpotlightsQuery, screen_names, **kwargs) - def users_by_id(self, user_ids: list[int], **kwargs) -> list[dict]: + async def users_by_id(self, user_ids: list[int], **kwargs) -> list[dict]: """ Get user data by user ids. @@ -243,9 +243,9 @@ def users_by_id(self, user_ids: list[int], **kwargs) -> list[dict]: @param kwargs: optional keyword arguments @return: list of user data as dicts """ - return self._run(Operation.UserByRestId, user_ids, **kwargs) + return await self._run(Operation.UserByRestId, user_ids, **kwargs) - def download_media(self, ids: list[int], photos: bool = True, videos: bool = True, cards: bool = True, hq_img_variant: bool = True, video_thumb: bool = False, out: str = 'media', + async def download_media(self, ids: list[int], photos: bool = True, videos: bool = True, cards: bool = True, hq_img_variant: bool = True, video_thumb: bool = False, out: str = 'media', metadata_out: str = 'media.json', **kwargs) -> dict: """ Download and extract media metadata from Tweets @@ -332,10 +332,10 @@ async def get(client: AsyncClient, url: str): if cards: tmp.extend(parse_card_media(v['card'])) res.extend([(k, m) for m in tmp]) - asyncio.run(process(download(res, out))) + await process(download(res, out)) return media - def trends(self, utc: list[str] = None) -> dict: + async def trends(self, utc: list[str] = None) -> dict: """ Get trends for all UTC offsets @@ -364,7 +364,7 @@ async def process(): return await tqdm_asyncio.gather(*tasks, desc='Getting trends') return await asyncio.gather(*tasks) - trends = asyncio.run(process()) + trends = await process() out = self.out / 'raw' / 'trends' out.mkdir(parents=True, exist_ok=True) (out / f'{time.time_ns()}.json').write_text(orjson.dumps( @@ -372,7 +372,7 @@ async def process(): option=orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS).decode(), encoding='utf-8') return trends - def spaces(self, *, rooms: list[str] = None, search: list[dict] = None, audio: bool = False, chat: bool = False, + async def spaces(self, *, rooms: list[str] = None, search: list[dict] = None, audio: bool = False, chat: bool = False, **kwargs) -> list[dict]: """ Get Twitter spaces data @@ -388,17 +388,17 @@ def spaces(self, *, rooms: list[str] = None, search: list[dict] = None, audio: b @return: list of spaces data """ if rooms: - spaces = self._run(Operation.AudioSpaceById, rooms, **kwargs) + spaces = await self._run(Operation.AudioSpaceById, rooms, **kwargs) else: - res = self._run(Operation.AudioSpaceSearch, search, **kwargs) + res = await self._run(Operation.AudioSpaceSearch, search, **kwargs) search_results = set(find_key(res, 'rest_id')) - spaces = self._run(Operation.AudioSpaceById, search_results, **kwargs) + spaces = await self._run(Operation.AudioSpaceById, search_results, **kwargs) if audio or chat: - return self._get_space_data(spaces, audio, chat) + return await self._get_space_data(spaces, audio, chat) return spaces - def _get_space_data(self, spaces: list[dict], audio=True, chat=True): - streams = self._check_streams(spaces) + async def _get_space_data(self, spaces: list[dict], audio=True, chat=True): + streams = await self._check_streams(spaces) chat_data = None if chat: temp = [] # get necessary keys instead of passing large dicts @@ -411,7 +411,7 @@ def _get_space_data(self, spaces: list[dict], audio=True, chat=True): 'media_key': meta['media_key'], 'state': meta['state'], }) - chat_data = self._get_chat_data(temp) + chat_data = await self._get_chat_data(temp) if audio: temp = [] for stream in streams: @@ -421,7 +421,7 @@ def _get_space_data(self, spaces: list[dict], audio=True, chat=True): 'rest_id': stream['space']['data']['audioSpace']['metadata']['rest_id'], 'chunks': chunks, }) - self._download_audio(temp) + await self._download_audio(temp) return chat_data async def _get_stream(self, client: AsyncClient, media_key: str) -> dict | None: @@ -499,7 +499,7 @@ def _get_chunks(self, location: str) -> list[str]: if self.debug: self.logger.error(f'Failed to get chunks\n{e}') - def _get_chat_data(self, keys: list[dict]) -> list[dict]: + async def _get_chat_data(self, keys: list[dict]) -> list[dict]: async def get(c: AsyncClient, key: dict) -> dict: info = await self._init_chat(c, key['chat_token']) chat = await self._get_chat(c, info['endpoint'], info['access_token']) @@ -522,9 +522,9 @@ async def process(): return await tqdm_asyncio.gather(*tasks, desc='Downloading chat data') return await asyncio.gather(*tasks) - return asyncio.run(process()) + return await process() - def _download_audio(self, data: list[dict]) -> None: + async def _download_audio(self, data: list[dict]) -> None: async def get(s: AsyncClient, chunk: str, rest_id: str) -> tuple: r = await s.get(chunk) return rest_id, r @@ -541,7 +541,7 @@ async def process(data: list[dict]) -> list: return await tqdm_asyncio.gather(*tasks, desc='Downloading audio') return await asyncio.gather(*tasks) - chunks = asyncio.run(process(data)) + chunks = await process(data) streams = {} [streams.setdefault(_id, []).append(chunk) for _id, chunk in chunks] # ensure chunks are in correct order @@ -554,7 +554,7 @@ async def process(data: list[dict]) -> list: with open(out / f'{space_id}.aac', 'wb') as fp: [fp.write(c.content) for c in chunks] - def _check_streams(self, keys: list[dict]) -> list[dict]: + async def _check_streams(self, keys: list[dict]) -> list[dict]: async def get(c: AsyncClient, space: dict) -> dict: media_key = space['data']['audioSpace']['metadata']['media_key'] stream = await self._get_stream(c, media_key) @@ -567,9 +567,9 @@ async def process(): async with AsyncClient(limits=limits, headers=headers, cookies=cookies, timeout=20) as c: return await asyncio.gather(*(get(c, key) for key in keys)) - return asyncio.run(process()) + return await process() - def _run(self, operation: tuple[dict, str, str], queries: set | list[int | str | list | dict], **kwargs): + async def _run(self, operation: tuple[dict, str, str], queries: set | list[int | str | list | dict], **kwargs): keys, qid, name = operation # stay within rate-limits if (l := len(queries)) > MAX_ENDPOINT_LIMIT: @@ -578,27 +578,45 @@ def _run(self, operation: tuple[dict, str, str], queries: set | list[int | str | queries = list(queries)[:MAX_ENDPOINT_LIMIT] if all(isinstance(q, dict) for q in queries): - data = asyncio.run(self._process(operation, list(queries), **kwargs)) + data = await self._process(operation, list(queries), **kwargs) return get_json(data, **kwargs) # queries are of type set | list[int|str], need to convert to list[dict] _queries = [{k: q} for q in queries for k, v in keys.items()] - res = asyncio.run(self._process(operation, _queries, **kwargs)) + res = await self._process(operation, _queries, **kwargs) data = get_json(res, **kwargs) return data.pop() if kwargs.get('cursor') else flatten(data) async def _query(self, client: AsyncClient, operation: tuple, **kwargs) -> Response: + wait_restrict = kwargs.pop("wait_restrict", False) + keys, qid, name = operation + + for k in keys: + if k not in kwargs: + raise ValueError(f"Invalid args to query '{kwargs}', '{k}' field is needed") + params = { 'variables': Operation.default_variables | keys | kwargs, 'features': Operation.default_features, } + r = await client.get(f'https://twitter.com/i/api/graphql/{qid}/{name}', params=build_params(params)) try: self.rate_limits[name] = {k: int(v) for k, v in r.headers.items() if 'rate-limit' in k} + if name in self.rate_limits and self.rate_limits[name]['x-rate-limit-remaining'] == 0 and wait_restrict: + dur = self.rate_limits[name]['x-rate-limit-reset'] - time.time() + if dur > 0: + print(f"\nRestiricted by API for {dur} secs.") + await asyncio.sleep(dur) + kwargs["wait_restrict"] = wait_restrict + await self._query(client, operation, **kwargs) except Exception as e: - self.logger.debug(f'{e}') + self.logger.debug(f'Failed to set up rate limits {e}') + + if r.status_code != 200: + raise HttpResponseError(r.text, r.status_code) if self.debug: log(self.logger, self.debug, r) @@ -745,7 +763,7 @@ async def get(c: AsyncClient, space: dict) -> list[dict]: return await tqdm_asyncio.gather(*tasks, desc='Getting live transcripts') return await asyncio.gather(*tasks) - def space_live_transcript(self, room: str, frequency: int = 1): + async def space_live_transcript(self, room: str, frequency: int = 1): """ Log live transcript of a space @@ -754,15 +772,12 @@ def space_live_transcript(self, room: str, frequency: int = 1): @return: None """ - async def get(spaces: list[dict]): - client = init_session() - chats = await self._get_live_chats(client, spaces) - await asyncio.gather(*(self._space_listener(c, frequency) for c in chats)) - - spaces = self.spaces(rooms=[room]) - asyncio.run(get(spaces)) + spaces = await self.spaces(rooms=[room]) + client = init_session() + chats = await self._get_live_chats(client, spaces) + await asyncio.gather(*(self._space_listener(c, frequency) for c in chats)) - def spaces_live(self, rooms: list[str]): + async def spaces_live(self, rooms: list[str]): """ Capture live audio stream from spaces @@ -841,8 +856,8 @@ async def process(spaces: list[dict]): async with AsyncClient(limits=limits, headers=headers, cookies=cookies, timeout=20) as c: return await asyncio.gather(*(poll_space(c, space) for space in spaces)) - spaces = self.spaces(rooms=rooms) - return asyncio.run(process(spaces)) + spaces = await self.spaces(rooms=rooms) + return await process(spaces) def _init_logger(self, **kwargs) -> Logger: if kwargs.get('debug'): diff --git a/twitter/search.py b/twitter/search.py index 2291741..53c371e 100644 --- a/twitter/search.py +++ b/twitter/search.py @@ -42,10 +42,11 @@ def __init__(self, email: str = None, username: str = None, password: str = None self.logger = self._init_logger(**kwargs) self.session = self._validate_session(email, username, password, session, **kwargs) - def run(self, queries: list[dict], limit: int = math.inf, out: str = 'data/search_results', **kwargs): - out = Path(out) - out.mkdir(parents=True, exist_ok=True) - return asyncio.run(self.process(queries, limit, out, **kwargs)) + async def run(self, queries: list[dict], limit: int = math.inf, out: str = 'data/search_results', **kwargs): + if out is not None: + out = Path(out) + out.mkdir(parents=True, exist_ok=True) + return await self.process(queries, limit, out, **kwargs) async def process(self, queries: list[dict], limit: int, out: Path, **kwargs) -> list: async with AsyncClient(headers=get_headers(self.session)) as s: diff --git a/twitter/util.py b/twitter/util.py index 5fd0fbd..8442942 100644 --- a/twitter/util.py +++ b/twitter/util.py @@ -7,12 +7,15 @@ import aiofiles import orjson +from .errors import HttpResponseError from aiofiles.os import makedirs from httpx import Response, Client from textwrap import dedent from .constants import GREEN, MAGENTA, RED, RESET, MAX_GQL_CHAR_LIMIT, USER_AGENTS, ORANGE +import asyncio +DEFAULT_RESTRICT_WAIT = 100 def init_session(): client = Client(headers={ @@ -45,8 +48,13 @@ def batch_ids(ids: list[int | str], char_limit: int = MAX_GQL_CHAR_LIMIT) -> lis def build_params(params: dict) -> dict: return {k: orjson.dumps(v).decode() for k, v in params.items()} - async def save_json(r: Response, path: str | Path, name: str, **kwargs): + if r is None: + return + + if r.status_code != 200: + return + try: data = r.json() kwargs.pop('cursor', None) @@ -61,7 +69,7 @@ async def save_json(r: Response, path: str | Path, name: str, **kwargs): await fp.write(orjson.dumps(data)) except Exception as e: - print(f'Failed to save JSON data for {kwargs}\n{e}') + print(f'Failed to save JSON data for {kwargs}: {r}\n{e}') def flatten(seq: list | tuple) -> list: @@ -82,6 +90,8 @@ def get_json(res: list[Response], **kwargs) -> list: results = [] for r in temp: try: + if r is None: + continue data = r.json() if cursor: results.append([data, cursor])