From 926295139b9c65c51eccaf26c664a6a2926f6d39 Mon Sep 17 00:00:00 2001
From: CauchYoung <2024302072042@whu.edu.cn>
Date: Tue, 14 Apr 2026 06:29:31 +0800
Subject: [PATCH] fix(youtube): support youtu.be and shorts URLs

Route both accepts() and convert() through a single _extract_video_id()
helper so that youtu.be/<id>, youtube.com/shorts/<id> and
youtube.com/embed/<id> are recognized in addition to
youtube.com/watch?v=<id>.

The helper matches on the parsed hostname (so an explicit port does not
defeat the match), returns None instead of an empty string when the ID
segment is empty, and treats URLs that urlparse rejects as unsupported
rather than raising.
---
 .../converters/_youtube_converter.py          | 42 ++++++++++++++---
 .../tests/test_youtube_converter.py           | 49 +++++++++++++++++++
 2 files changed, 85 insertions(+), 6 deletions(-)
 create mode 100644 packages/markitdown/tests/test_youtube_converter.py

diff --git a/packages/markitdown/src/markitdown/converters/_youtube_converter.py b/packages/markitdown/src/markitdown/converters/_youtube_converter.py
index c96e8f4f6..ff28a2601 100644
--- a/packages/markitdown/src/markitdown/converters/_youtube_converter.py
+++ b/packages/markitdown/src/markitdown/converters/_youtube_converter.py
@@ -53,8 +53,8 @@ def accepts(
         url = unquote(url)
         url = url.replace(r"\?", "?").replace(r"\=", "=")
 
-        if not url.startswith("https://www.youtube.com/watch?"):
-            # Not a YouTube URL
+        if self._extract_video_id(url) is None:
+            # Not a supported YouTube URL
             return False
 
         if extension in ACCEPTED_FILE_EXTENSIONS:
@@ -147,10 +147,8 @@ def convert(
         if IS_YOUTUBE_TRANSCRIPT_CAPABLE:
             ytt_api = YouTubeTranscriptApi()
             transcript_text = ""
-            parsed_url = urlparse(stream_info.url)  # type: ignore
-            params = parse_qs(parsed_url.query)  # type: ignore
-            if "v" in params and params["v"][0]:
-                video_id = str(params["v"][0])
+            video_id = self._extract_video_id(stream_info.url or "")
+            if video_id:
                 transcript_list = ytt_api.list(video_id)
                 languages = ["en"]
                 for transcript in transcript_list:
@@ -196,6 +194,38 @@ def convert(
             title=title,
         )
 
+    def _extract_video_id(self, url: str) -> Union[str, None]:
+        """Return the video ID embedded in a supported YouTube URL.
+
+        Recognized forms are youtu.be/<id>, youtube.com/watch?v=<id>,
+        youtube.com/shorts/<id> and youtube.com/embed/<id>. Returns None
+        for any other URL, including malformed ones.
+        """
+        try:
+            parsed_url = urlparse(url)
+        except ValueError:
+            # urlparse rejects some malformed netlocs (e.g. bad IPv6 literals).
+            return None
+
+        # hostname (unlike netloc) excludes any port or userinfo and is
+        # already lower-cased.
+        host = parsed_url.hostname or ""
+        segments = parsed_url.path.strip("/").split("/")
+
+        if host in {"youtu.be", "www.youtu.be"}:
+            return segments[0] or None
+
+        if host in {"youtube.com", "www.youtube.com", "m.youtube.com"}:
+            if segments == ["watch"]:
+                params = parse_qs(parsed_url.query)
+                video_id = params.get("v", [None])[0]
+                return str(video_id) if video_id else None
+            if segments[0] in {"shorts", "embed"} and len(segments) > 1:
+                # "shorts//x" yields an empty segment, which is not an ID.
+                return segments[1] or None
+
+        return None
+
     def _get(
         self,
         metadata: Dict[str, str],
diff --git a/packages/markitdown/tests/test_youtube_converter.py b/packages/markitdown/tests/test_youtube_converter.py
new file mode 100644
index 000000000..7dc151949
--- /dev/null
+++ b/packages/markitdown/tests/test_youtube_converter.py
@@ -0,0 +1,49 @@
+import io
+
+from markitdown import StreamInfo
+from markitdown.converters._youtube_converter import YouTubeConverter
+
+
+def _stream_info(url: str) -> StreamInfo:
+    return StreamInfo(url=url, mimetype="text/html", extension=".html")
+
+
+def test_accepts_youtube_short_urls() -> None:
+    converter = YouTubeConverter()
+
+    assert converter.accepts(io.BytesIO(b""), _stream_info("https://youtu.be/dQw4w9WgXcQ"))
+    assert converter.accepts(
+        io.BytesIO(b""), _stream_info("https://www.youtube.com/shorts/dQw4w9WgXcQ")
+    )
+    assert converter.accepts(
+        io.BytesIO(b""), _stream_info("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
+    )
+
+
+def test_extract_video_id_from_supported_youtube_urls() -> None:
+    converter = YouTubeConverter()
+
+    assert (
+        converter._extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
+        == "dQw4w9WgXcQ"
+    )
+    assert converter._extract_video_id("https://youtu.be/dQw4w9WgXcQ?t=42") == "dQw4w9WgXcQ"
+    assert (
+        converter._extract_video_id("https://www.youtube.com/shorts/dQw4w9WgXcQ")
+        == "dQw4w9WgXcQ"
+    )
+    assert converter._extract_video_id("https://example.com/watch?v=dQw4w9WgXcQ") is None
+
+
+def test_extract_video_id_edge_cases() -> None:
+    converter = YouTubeConverter()
+
+    # An explicit port must not defeat host matching.
+    assert (
+        converter._extract_video_id("https://www.youtube.com:443/watch?v=dQw4w9WgXcQ")
+        == "dQw4w9WgXcQ"
+    )
+    # Missing or empty IDs and malformed URLs are rejected, not raised on.
+    assert converter._extract_video_id("https://www.youtube.com/watch") is None
+    assert converter._extract_video_id("https://www.youtube.com/shorts//x") is None
+    assert converter._extract_video_id("http://[::1") is None