diff --git a/apps/base/zip_utils.py b/apps/base/zip_utils.py new file mode 100644 index 0000000000..b968b52d98 --- /dev/null +++ b/apps/base/zip_utils.py @@ -0,0 +1,37 @@ +import os +import zipfile + + +class PathTraversalError(ValueError): + """Raised when a resolved path escapes its intended root directory.""" + + +def _is_path_under_root(root_path, candidate): + try: + return os.path.commonpath([root_path, candidate]) == root_path + except ValueError: + return False + + +def safe_join_under_root(root, *relative_paths): + """ + Join path segments under root and verify the result stays within root. + """ + root_path = os.path.realpath(root) + candidate = os.path.realpath(os.path.join(root_path, *relative_paths)) + if not _is_path_under_root(root_path, candidate): + raise PathTraversalError("Path escapes extraction root.") + return candidate + + +def safe_extract_zip_file(zip_ref, destination): + """ + Extract zip archive members while preventing path traversal (zip slip). + """ + destination_path = os.path.realpath(destination) + for member in zip_ref.namelist(): + try: + safe_join_under_root(destination_path, member) + except PathTraversalError: + raise zipfile.BadZipFile("Zip archive contains unsafe file paths.") + zip_ref.extractall(destination_path) diff --git a/apps/challenges/challenge_config_utils.py b/apps/challenges/challenge_config_utils.py index 07bae2cdbd..fee6e7e573 100644 --- a/apps/challenges/challenge_config_utils.py +++ b/apps/challenges/challenge_config_utils.py @@ -7,6 +7,11 @@ import requests import yaml +from base.zip_utils import ( + PathTraversalError, + safe_extract_zip_file, + safe_join_under_root, +) from challenges.models import ( Challenge, ChallengePhase, @@ -95,9 +100,11 @@ def write_file(output_path, mode, file_content): def extract_zip_file(file_path, mode, output_path): zip_ref = zipfile.ZipFile(file_path, mode) - zip_ref.extractall(output_path) - logger.info("Zip file extracted to {}".format(output_path)) - zip_ref.close() + try: + safe_extract_zip_file(zip_ref, output_path) + logger.info("Zip file extracted to {}".format(output_path)) + finally: + zip_ref.close() return zip_ref @@ -172,10 +179,15 @@ def is_challenge_config_yaml_html_field_valid( is_valid {boolean} -- flag for field validation is success message {string} -- error message if any """ - value = join(base_location, yaml_file_data.get(key)) + relative_path = yaml_file_data.get(key) message = "" is_valid = False - if value: + if relative_path: + try: + value = safe_join_under_root(base_location, relative_path) + except PathTraversalError: + message = "Invalid file path for {}.".format(key) + return is_valid, message if not isfile(value): message = "File at path {} not found. Please specify a valid file path".format( key @@ -293,7 +305,13 @@ def is_challenge_phase_split_mapping_valid( def get_value_from_field(data, base_location, field_name): - file_path = join(base_location, data.get(field_name)) + relative_path = data.get(field_name) + if not relative_path: + return None + try: + file_path = safe_join_under_root(base_location, relative_path) + except PathTraversalError: + return None field_value = None if file_path.endswith(".html") and isfile(file_path): field_value = get_file_content(file_path, "rb").decode("utf-8") @@ -464,8 +482,9 @@ def read_and_validate_yaml(self): # YAML Read Error try: - self.yaml_file_path = join( - self.base_location, self.unique_folder_name, self.yaml_file + self.yaml_file_path = safe_join_under_root( + join(self.base_location, self.unique_folder_name), + self.yaml_file, ) self.yaml_file_data = read_yaml_file(self.yaml_file_path, "r") return True @@ -480,6 +499,10 @@ def read_and_validate_yaml(self): ).format(error_description, line_number, column_number) self.error_messages.append(message) return False + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + self.error_messages.append(message) + return False def _approved_config_locked(self): return ( @@ -612,19 +635,21 @@ def validate_challenge_logo(self): or image.endswith(".jpeg") or image.endswith(".png") ): - self.challenge_image_path = join( - self.base_location, - self.unique_folder_name, - self.extracted_folder_name, - image, - ) - - if isfile(self.challenge_image_path): - self.challenge_image_file = ContentFile( - get_file_content(self.challenge_image_path, "rb"), image + self.challenge_image_file = None + try: + self.challenge_image_path = safe_join_under_root( + self.challenge_config_location, image + ) + if isfile(self.challenge_image_path): + self.challenge_image_file = ContentFile( + get_file_content(self.challenge_image_path, "rb"), + image, + ) + except PathTraversalError: + logger.warning( + "Challenge logo path rejected due to path traversal risk: %s", + image, ) - else: - self.challenge_image_file = None else: self.challenge_image_file = None self.files["challenge_image_file"] = self.challenge_image_file @@ -737,26 +762,33 @@ def validate_evaluation_script_file(self): ) self.error_messages.append(message) else: - evaluation_script_path = join( - self.challenge_config_location, evaluation_script - ) - # Check for evaluation script file in extracted zip folder - if isfile(evaluation_script_path): - self.challenge_evaluation_script_file = ( - read_file_data_as_content_file( - evaluation_script_path, - "rb", - evaluation_script_path, - ) - ) - self.files["challenge_evaluation_script_file"] = ( - self.challenge_evaluation_script_file + try: + evaluation_script_path = safe_join_under_root( + self.challenge_config_location, evaluation_script ) - else: + except PathTraversalError: message = self.error_messages_dict.get( "missing_evaluation_script" ) self.error_messages.append(message) + else: + # Check for evaluation script file in extracted zip folder + if isfile(evaluation_script_path): + self.challenge_evaluation_script_file = ( + read_file_data_as_content_file( + evaluation_script_path, + "rb", + evaluation_script_path, + ) + ) + self.files["challenge_evaluation_script_file"] = ( + self.challenge_evaluation_script_file + ) + else: + message = self.error_messages_dict.get( + "missing_evaluation_script" + ) + self.error_messages.append(message) else: message = self.error_messages_dict.get( "missing_evaluation_script_key" @@ -886,6 +918,7 @@ def validate_challenge_phases(self, current_phase_config_ids): phase_codenames = [] self.files["challenge_test_annotation_files"] = [] for data in challenge_phases_data: + test_annotation_file_path = None if "codename" not in data: self.error_messages.append( self.error_messages_dict["no_codename_for_challenge_phase"] @@ -900,25 +933,36 @@ def validate_challenge_phases(self, current_phase_config_ids): self.error_messages.append(message) test_annotation_file = data.get("test_annotation_file") if test_annotation_file: - test_annotation_file_path = join( - self.challenge_config_location, test_annotation_file - ) - if isfile(test_annotation_file_path): - challenge_test_annotation_file = ( - read_file_data_as_content_file( - test_annotation_file_path, - "rb", - test_annotation_file_path, - ) + try: + test_annotation_file_path = safe_join_under_root( + self.challenge_config_location, test_annotation_file ) - self.files["challenge_test_annotation_files"].append( - challenge_test_annotation_file - ) - else: + except PathTraversalError: message = self.error_messages_dict[ "no_test_annotation_file_found" ].format(data["name"]) self.error_messages.append(message) + self.files["challenge_test_annotation_files"].append(None) + else: + if isfile(test_annotation_file_path): + challenge_test_annotation_file = ( + read_file_data_as_content_file( + test_annotation_file_path, + "rb", + test_annotation_file_path, + ) + ) + self.files["challenge_test_annotation_files"].append( + challenge_test_annotation_file + ) + else: + message = self.error_messages_dict[ + "no_test_annotation_file_found" + ].format(data["name"]) + self.error_messages.append(message) + self.files["challenge_test_annotation_files"].append( + None + ) else: test_annotation_file_path = None self.files["challenge_test_annotation_files"].append(None) diff --git a/apps/challenges/views.py b/apps/challenges/views.py index 99d3fc20ef..3392d35900 100644 --- a/apps/challenges/views.py +++ b/apps/challenges/views.py @@ -25,6 +25,11 @@ paginated_queryset, send_slack_notification, ) +from base.zip_utils import ( + PathTraversalError, + safe_extract_zip_file, + safe_join_under_root, +) from challenges.challenge_config_utils import ( download_and_write_file, extract_zip_file, @@ -1449,8 +1454,12 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): # Extract zip file try: zip_ref = zipfile.ZipFile(CHALLENGE_ZIP_DOWNLOAD_LOCATION, "r") - zip_ref.extractall(join(BASE_LOCATION, unique_folder_name)) - zip_ref.close() + try: + safe_extract_zip_file( + zip_ref, join(BASE_LOCATION, unique_folder_name) + ) + finally: + zip_ref.close() except zipfile.BadZipfile: message = ( "The zip file contents cannot be extracted. " @@ -1486,10 +1495,15 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) try: - with open( - join(BASE_LOCATION, unique_folder_name, yaml_file), "r" - ) as stream: + yaml_file_path = safe_join_under_root( + join(BASE_LOCATION, unique_folder_name), yaml_file + ) + with open(yaml_file_path, "r") as stream: yaml_file_data = yaml.safe_load(stream) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) except (yaml.YAMLError, ScannerError) as exc: # To get the problem description if hasattr(exc, "problem"): @@ -1508,14 +1522,15 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): response_data = {"error": message} return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) + challenge_config_root = join( + BASE_LOCATION, unique_folder_name, extracted_folder_name + ) + # Check for evaluation script path in yaml file. try: evaluation_script = yaml_file_data["evaluation_script"] - evaluation_script_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - evaluation_script, + evaluation_script_path = safe_join_under_root( + challenge_config_root, evaluation_script ) except KeyError: message = ( @@ -1524,6 +1539,10 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): ) response_data = {"error": message} return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) # Check for evaluation script file in extracted zip folder. if isfile(evaluation_script_path): @@ -1553,12 +1572,16 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): for data in challenge_phases_data: test_annotation_file = data.get("test_annotation_file") if test_annotation_file: - test_annotation_file_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - test_annotation_file, - ) + try: + test_annotation_file_path = safe_join_under_root( + challenge_config_root, test_annotation_file + ) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response( + response_data, status=status.HTTP_406_NOT_ACCEPTABLE + ) if not isfile(test_annotation_file_path): message = ( "No test annotation file found in zip file" @@ -1677,9 +1700,16 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): or image.endswith(".jpeg") or image.endswith(".png") ): - challenge_image_path = join( - BASE_LOCATION, unique_folder_name, extracted_folder_name, image - ) + try: + challenge_image_path = safe_join_under_root( + challenge_config_root, image + ) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response( + response_data, status=status.HTTP_406_NOT_ACCEPTABLE + ) if isfile(challenge_image_path): challenge_image_file = ContentFile( get_file_content(challenge_image_path, "rb"), image @@ -1691,11 +1721,8 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): # check for challenge description file try: - challenge_description_file_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - yaml_file_data["description"], + challenge_description_file_path = safe_join_under_root( + challenge_config_root, yaml_file_data["description"] ) if challenge_description_file_path.endswith(".html") and isfile( challenge_description_file_path @@ -1712,14 +1739,15 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): ) response_data = {"error": message} return Response(response_data, status.HTTP_406_NOT_ACCEPTABLE) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) # check for evaluation details file try: - challenge_evaluation_details_file_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - yaml_file_data["evaluation_details"], + challenge_evaluation_details_file_path = safe_join_under_root( + challenge_config_root, yaml_file_data["evaluation_details"] ) if challenge_evaluation_details_file_path.endswith(".html") and isfile( @@ -1737,14 +1765,15 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): ) response_data = {"error": message} return Response(response_data, status.HTTP_406_NOT_ACCEPTABLE) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) # check for terms and conditions file try: - challenge_terms_and_cond_file_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - yaml_file_data["terms_and_conditions"], + challenge_terms_and_cond_file_path = safe_join_under_root( + challenge_config_root, yaml_file_data["terms_and_conditions"] ) if challenge_terms_and_cond_file_path.endswith(".html") and isfile( challenge_terms_and_cond_file_path @@ -1761,14 +1790,15 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): ) response_data = {"error": message} return Response(response_data, status.HTTP_406_NOT_ACCEPTABLE) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) # Check for submission guidelines file try: - submission_guidelines_file_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - yaml_file_data["submission_guidelines"], + submission_guidelines_file_path = safe_join_under_root( + challenge_config_root, yaml_file_data["submission_guidelines"] ) if submission_guidelines_file_path.endswith(".html") and isfile( submission_guidelines_file_path @@ -1785,6 +1815,10 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): ) response_data = {"error": message} return Response(response_data, status.HTTP_406_NOT_ACCEPTABLE) + except PathTraversalError: + message = "Challenge configuration contains unsafe file paths." + response_data = {"error": message} + return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) # Check for leaderboard schema in YAML file leaderboard_schema = yaml_file_data.get("leaderboard") @@ -1926,20 +1960,22 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): challenge_phase_ids = {} for data in challenge_phases_data: # Check for challenge phase description file - phase_description_file_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - data["description"], - ) - if phase_description_file_path.endswith(".html") and isfile( - phase_description_file_path - ): - data["description"] = get_file_content( - phase_description_file_path, "rb" - ).decode("utf-8") - else: - data["description"] = None + try: + phase_description_file_path = safe_join_under_root( + challenge_config_root, data["description"] + ) + if phase_description_file_path.endswith( + ".html" + ) and isfile(phase_description_file_path): + data["description"] = get_file_content( + phase_description_file_path, "rb" + ).decode("utf-8") + else: + data["description"] = None + except PathTraversalError as exc: + raise PathTraversalError( + "Challenge configuration contains unsafe file paths." + ) from exc data["slug"] = "{}-{}-{}".format( challenge.title.split(" ")[0].lower(), @@ -1948,12 +1984,14 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): )[:198] test_annotation_file = data.get("test_annotation_file") if test_annotation_file: - test_annotation_file_path = join( - BASE_LOCATION, - unique_folder_name, - extracted_folder_name, - test_annotation_file, - ) + try: + test_annotation_file_path = safe_join_under_root( + challenge_config_root, test_annotation_file + ) + except PathTraversalError as exc: + raise PathTraversalError( + "Challenge configuration contains unsafe file paths." + ) from exc if isfile(test_annotation_file_path): with open( test_annotation_file_path, "rb" @@ -2178,6 +2216,11 @@ def create_challenge_using_zip_file(request, challenge_host_team_pk): } return Response(response_data, status=status.HTTP_201_CREATED) + except PathTraversalError: + response_data = { + "error": "Challenge configuration contains unsafe file paths." + } + return Response(response_data, status=status.HTTP_406_NOT_ACCEPTABLE) except: # noqa: E722 try: if response_data: diff --git a/scripts/workers/remote_submission_worker.py b/scripts/workers/remote_submission_worker.py index e2aadbed11..fecd0e7007 100644 --- a/scripts/workers/remote_submission_worker.py +++ b/scripts/workers/remote_submission_worker.py @@ -134,9 +134,13 @@ def download_and_extract_zip_file(url, download_location, extract_location): with open(download_location, "wb") as f: f.write(response.content) # extract zip file + from base.zip_utils import safe_extract_zip_file + zip_ref = zipfile.ZipFile(download_location, "r") - zip_ref.extractall(extract_location) - zip_ref.close() + try: + safe_extract_zip_file(zip_ref, extract_location) + finally: + zip_ref.close() # delete zip file try: os.remove(download_location) diff --git a/scripts/workers/submission_worker.py b/scripts/workers/submission_worker.py index 1afb316351..e9e7274804 100644 --- a/scripts/workers/submission_worker.py +++ b/scripts/workers/submission_worker.py @@ -177,9 +177,13 @@ def extract_zip_file(download_location, extract_location): * `download_location`: Location of zip file * `extract_location`: Location of directory for extracted file """ + from base.zip_utils import safe_extract_zip_file + zip_ref = zipfile.ZipFile(download_location, "r") - zip_ref.extractall(extract_location) - zip_ref.close() + try: + safe_extract_zip_file(zip_ref, extract_location) + finally: + zip_ref.close() def delete_zip_file(download_location): diff --git a/tests/unit/base/test_zip_utils.py b/tests/unit/base/test_zip_utils.py new file mode 100644 index 0000000000..c9f70d02b4 --- /dev/null +++ b/tests/unit/base/test_zip_utils.py @@ -0,0 +1,67 @@ +import os +import tempfile +import unittest +import zipfile +from unittest.mock import patch + +from base.zip_utils import ( + PathTraversalError, + _is_path_under_root, + safe_extract_zip_file, + safe_join_under_root, +) + + +class SafeJoinUnderRootTest(unittest.TestCase): + def setUp(self): + self.root = tempfile.mkdtemp() + + def test_allows_paths_under_root(self): + resolved = safe_join_under_root(self.root, "configs", "challenge.yaml") + expected = os.path.realpath( + os.path.join(self.root, "configs", "challenge.yaml") + ) + self.assertEqual(resolved, expected) + + def test_rejects_parent_directory_traversal(self): + with self.assertRaises(PathTraversalError): + safe_join_under_root(self.root, "..", "etc", "passwd") + + @patch("base.zip_utils.os.path.commonpath", side_effect=ValueError) + def test_is_path_under_root_handles_commonpath_value_error( + self, mock_commonpath + ): + self.assertFalse(_is_path_under_root("/tmp/root", "/other/path")) + mock_commonpath.assert_called_once() + + +class SafeExtractZipFileTest(unittest.TestCase): + def setUp(self): + self.root = tempfile.mkdtemp() + + def test_extracts_valid_members(self): + zip_path = os.path.join(self.root, "valid.zip") + extract_dir = os.path.join(self.root, "extracted") + os.makedirs(extract_dir) + + with zipfile.ZipFile(zip_path, "w") as archive: + archive.writestr("challenge.txt", "ok") + + with zipfile.ZipFile(zip_path, "r") as archive: + safe_extract_zip_file(archive, extract_dir) + + self.assertTrue( + os.path.isfile(os.path.join(extract_dir, "challenge.txt")) + ) + + def test_rejects_unsafe_members(self): + zip_path = os.path.join(self.root, "unsafe.zip") + extract_dir = os.path.join(self.root, "unsafe-extracted") + os.makedirs(extract_dir) + + with zipfile.ZipFile(zip_path, "w") as archive: + archive.writestr("../evil.txt", "bad") + + with zipfile.ZipFile(zip_path, "r") as archive: + with self.assertRaises(zipfile.BadZipFile): + safe_extract_zip_file(archive, extract_dir) diff --git a/tests/unit/challenges/test_challenge_config_utils.py b/tests/unit/challenges/test_challenge_config_utils.py index 918ce492da..d1d4a2ce98 100644 --- a/tests/unit/challenges/test_challenge_config_utils.py +++ b/tests/unit/challenges/test_challenge_config_utils.py @@ -134,6 +134,15 @@ def test_file_not_html(self, mock_isfile): "File html_field is not a HTML file. Please specify a valid HTML file", ) + def test_unsafe_path_rejected(self): + yaml_file_data = {"html_field": "../../../etc/passwd.html"} + is_valid, message = is_challenge_config_yaml_html_field_valid( + yaml_file_data, "html_field", self.base_location + ) + + self.assertFalse(is_valid) + self.assertEqual(message, "Invalid file path for html_field.") + class TestIsChallengePhaseSplitMappingValid(unittest.TestCase): def test_invalid_dataset_split_id(self): @@ -280,6 +289,11 @@ def setUp(self): Mock() ) # Initialize if needed self.util.extracted_folder_name = "extracted_folder" + self.util.challenge_config_location = join( + self.base_location, + self.unique_folder_name, + self.extracted_folder_name, + ) self.util.error_messages = [] self.request.data = {"GITHUB_REPOSITORY": "some_repo"} @@ -317,6 +331,18 @@ def test_yaml_read_error(self, mock_read_yaml_file): self.util.error_messages, ) + def test_read_and_validate_yaml_unsafe_yaml_path(self): + self.util.yaml_file_count = 1 + self.util.yaml_file = "../../etc/passwd.yaml" + + result = self.util.read_and_validate_yaml() + + self.assertFalse(result) + self.assertIn( + "Challenge configuration contains unsafe file paths.", + self.util.error_messages, + ) + @mockpatch("challenges.challenge_config_utils.isfile") @mockpatch("challenges.challenge_config_utils.get_file_content") def test_validate_challenge_logo_valid_image( @@ -328,11 +354,11 @@ def test_validate_challenge_logo_valid_image( self.util.validate_challenge_logo() - expected_path = join( - self.base_location, - self.unique_folder_name, - self.extracted_folder_name, - "logo.png", + expected_path = os.path.realpath( + join( + self.util.challenge_config_location, + "logo.png", + ) ) self.assertEqual(self.util.challenge_image_path, expected_path) self.assertIsNotNone(self.util.challenge_image_file) @@ -358,6 +384,36 @@ def test_validate_challenge_logo_no_image_key(self): self.assertIsNone(self.util.challenge_image_file) self.assertIsNone(self.util.files["challenge_image_file"]) + @mockpatch("challenges.challenge_config_utils.logger") + def test_validate_challenge_logo_unsafe_path(self, mock_logger): + self.util.yaml_file_data = {"image": "../../../etc/passwd.png"} + + self.util.validate_challenge_logo() + + self.assertIsNone(self.util.challenge_image_file) + self.assertIsNone(self.util.files["challenge_image_file"]) + mock_logger.warning.assert_called_once() + + def test_validate_evaluation_script_file_unsafe_path(self): + self.util.yaml_file_data = { + "evaluation_script": "../../../etc/passwd.zip" + } + + self.util.validate_evaluation_script_file() + + self.assertIn( + "Evaluation script file is missing.", self.util.error_messages + ) + + def test_validate_challenge_description_unsafe_path(self): + self.util.yaml_file_data = {"description": "../../../etc/passwd.html"} + + self.util.validate_challenge_description() + + self.assertIn( + "Invalid file path for description.", self.util.error_messages + ) + @pytest.mark.django_db @mockpatch("challenges.challenge_config_utils.ValidateChallengeConfigUtil") def test_validate_challenge_config_util_invalid_yaml( @@ -869,6 +925,11 @@ def test_get_value_from_field_not_html(self, mock_isfile): val = get_value_from_field(data, "/tmp", "desc") self.assertIsNone(val) + def test_get_value_from_field_unsafe_path(self): + data = {"desc": "../../../etc/passwd.html"} + val = get_value_from_field(data, "/tmp", "desc") + self.assertIsNone(val) + @pytest.mark.django_db class TestValidateChallengeConfigUtil(unittest.TestCase): @@ -1154,6 +1215,29 @@ def test_no_test_annotation_file_found(self): self.util.error_messages[0], "No test annotation file found for phase 'Phase 1'.", ) + self.assertEqual( + self.util.files["challenge_test_annotation_files"], [None] + ) + + def test_unsafe_test_annotation_file_path(self): + self.util.yaml_file_data = { + "challenge_phases": [ + { + "codename": "phase1", + "name": "Phase 1", + "test_annotation_file": "../../../etc/passwd", + "id": 1, + } + ] + } + self.util.validate_challenge_phases([]) + self.assertEqual( + self.util.error_messages[0], + "No test annotation file found for phase 'Phase 1'.", + ) + self.assertEqual( + self.util.files["challenge_test_annotation_files"], [None] + ) def test_is_submission_public_restricted(self): self.util.yaml_file_data = { diff --git a/tests/unit/challenges/test_views.py b/tests/unit/challenges/test_views.py index d1c246ebe5..7a52635dc1 100644 --- a/tests/unit/challenges/test_views.py +++ b/tests/unit/challenges/test_views.py @@ -4,6 +4,7 @@ import json import os import shutil +import zipfile from datetime import timedelta from os.path import join from smtplib import SMTPException @@ -12,6 +13,7 @@ import mock import requests import responses +import yaml from allauth.account.models import EmailAddress from challenges.models import ( Challenge, @@ -5149,6 +5151,55 @@ def setUp(self): content_type="application/zip", ) + def _build_example_zip_with_yaml_updates(self, yaml_updates): + zip_path = join( + settings.BASE_DIR, "examples", "example1", "test_zip_file.zip" + ) + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_path, "r") as source_zip: + with zipfile.ZipFile( + zip_buffer, "w", zipfile.ZIP_DEFLATED + ) as destination_zip: + for name in source_zip.namelist(): + if name == "challenge_config.yaml": + challenge_config = yaml.safe_load( + source_zip.read(name) + ) + challenge_config.update(yaml_updates) + destination_zip.writestr( + name, + yaml.dump( + challenge_config, default_flow_style=False + ), + ) + else: + destination_zip.writestr(name, source_zip.read(name)) + zip_buffer.seek(0) + return zip_buffer.getvalue() + + def _post_modified_example_zip(self, yaml_updates): + self.url = reverse_lazy( + "challenges:create_challenge_using_zip_file", + kwargs={"challenge_host_team_pk": self.challenge_host_team.pk}, + ) + zip_bytes = self._build_example_zip_with_yaml_updates(yaml_updates) + with mock.patch("challenges.views.requests.get") as mock_get: + response = mock.Mock() + response.content = zip_bytes + response.status_code = 200 + mock_get.return_value = response + return self.client.post( + self.url, + { + "zip_configuration": SimpleUploadedFile( + "test_sample.zip", + zip_bytes, + content_type="application/zip", + ) + }, + format="multipart", + ) + @responses.activate def test_create_challenge_using_zip_file_when_zip_file_is_not_uploaded( self, @@ -5331,6 +5382,71 @@ def test_create_challenge_using_zip_file_success(self): self.assertEqual(Leaderboard.objects.count(), 2) self.assertEqual(ChallengePhaseSplit.objects.count(), 2) + @responses.activate + def test_create_challenge_using_zip_file_unsafe_evaluation_script(self): + responses.add(responses.POST, settings.SLACK_WEB_HOOK_URL, status=200) + response = self._post_modified_example_zip( + {"evaluation_script": "../../../etc/passwd.zip"} + ) + self.assertEqual(response.status_code, status.HTTP_406_NOT_ACCEPTABLE) + self.assertEqual( + response.data, + {"error": ("Challenge configuration contains unsafe file paths.")}, + ) + + @responses.activate + def test_create_challenge_using_zip_file_unsafe_test_annotation(self): + responses.add(responses.POST, settings.SLACK_WEB_HOOK_URL, status=200) + response = self._post_modified_example_zip( + { + "challenge_phases": [ + { + "id": 1, + "name": "Challenge Name of the challenge phase", + "description": "challenge_phase_description.html", + "leaderboard_public": True, + "is_public": True, + "start_date": "2017-06-09 20:00:00", + "end_date": "2017-06-19 20:00:00", + "test_annotation_file": "../../../etc/passwd", + "codename": "Challenge phase codename", + "max_submissions_per_day": 100, + "max_submissions": 1000, + "max_submissions_per_month": 1000, + } + ] + } + ) + self.assertEqual(response.status_code, status.HTTP_406_NOT_ACCEPTABLE) + self.assertEqual( + response.data, + {"error": ("Challenge configuration contains unsafe file paths.")}, + ) + + @responses.activate + def test_create_challenge_using_zip_file_unsafe_description_path(self): + responses.add(responses.POST, settings.SLACK_WEB_HOOK_URL, status=200) + response = self._post_modified_example_zip( + {"description": "../../../etc/passwd.html"} + ) + self.assertEqual(response.status_code, status.HTTP_406_NOT_ACCEPTABLE) + self.assertEqual( + response.data, + {"error": ("Challenge configuration contains unsafe file paths.")}, + ) + + @responses.activate + def test_create_challenge_using_zip_file_unsafe_image_path(self): + responses.add(responses.POST, settings.SLACK_WEB_HOOK_URL, status=200) + response = self._post_modified_example_zip( + {"image": "../../../etc/passwd.png"} + ) + self.assertEqual(response.status_code, status.HTTP_406_NOT_ACCEPTABLE) + self.assertEqual( + response.data, + {"error": ("Challenge configuration contains unsafe file paths.")}, + ) + class GetAllSubmissionsTest( BaseAPITestClass diff --git a/tests/unit/worker/test_remote_submission_worker.py b/tests/unit/worker/test_remote_submission_worker.py index 71ef725029..7dbc31d011 100644 --- a/tests/unit/worker/test_remote_submission_worker.py +++ b/tests/unit/worker/test_remote_submission_worker.py @@ -92,7 +92,9 @@ def test_successful_download_and_extraction( mock_open.assert_called_once_with("/path/to/download.zip", "wb") mock_open().write.assert_called_once_with(b"Test content") mock_zipfile.assert_called_once_with("/path/to/download.zip", "r") - mock_zipfile().extractall.assert_called_once_with("/path/to/extract/") + mock_zipfile().extractall.assert_called_once_with( + os.path.realpath("/path/to/extract/") + ) mock_remove.assert_called_once_with("/path/to/download.zip") @patch("requests.get") @@ -138,7 +140,9 @@ def test_file_removal_failure( ) mock_open().write.assert_called_once_with(b"Test content") - mock_zipfile().extractall.assert_called_once_with("/path/to/extract/") + mock_zipfile().extractall.assert_called_once_with( + os.path.realpath("/path/to/extract/") + ) class TestLoadChallenge(unittest.TestCase):