Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions apps/base/zip_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os
import zipfile


class PathTraversalError(ValueError):
    """
    Error for a path that resolves outside the root it must stay under.

    Subclasses ValueError, so handlers written for ValueError also catch it.
    """


def _is_path_under_root(root_path, candidate):
try:
return os.path.commonpath([root_path, candidate]) == root_path
except ValueError:
return False


def safe_join_under_root(root, *relative_paths):
    """
    Join path segments under root and verify the result stays within root.

    Both root and the joined path are resolved with os.path.realpath before
    the containment check, so ".." segments, absolute segments and symlinks
    are all reflected in the path that gets checked.

    Returns the resolved path.

    Raises PathTraversalError when the resolved path falls outside root, or
    when the segments cannot be resolved because os.path raised ValueError.
    A TypeError from non-string segments is left to propagate unchanged.
    """
    root_path = os.path.realpath(root)
    try:
        candidate = os.path.realpath(os.path.join(root_path, *relative_paths))
    except ValueError as exc:
        # realpath can raise a plain ValueError for malformed segments
        # (e.g. an embedded NUL byte on some Python versions). Callers only
        # catch PathTraversalError, so report it through that type.
        raise PathTraversalError("Path contains invalid characters.") from exc
    if not _is_path_under_root(root_path, candidate):
        raise PathTraversalError("Path escapes extraction root.")
    return candidate


def safe_extract_zip_file(zip_ref, destination):
    """
    Extract zip archive members while preventing path traversal (zip slip).

    Every member name is checked against the resolved destination before
    anything is written, so an archive with one unsafe member is rejected
    as a whole rather than partially extracted.

    Raises zipfile.BadZipFile when a member would resolve outside
    destination; the underlying PathTraversalError is kept as its cause.
    """
    destination_path = os.path.realpath(destination)
    for member in zip_ref.namelist():
        try:
            safe_join_under_root(destination_path, member)
        except PathTraversalError as exc:
            raise zipfile.BadZipFile(
                "Zip archive contains unsafe file paths."
            ) from exc
    zip_ref.extractall(destination_path)
141 changes: 92 additions & 49 deletions apps/challenges/challenge_config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

import requests
import yaml
from base.zip_utils import (
PathTraversalError,
safe_extract_zip_file,
safe_join_under_root,
)
from challenges.models import (
Challenge,
ChallengePhase,
Expand Down Expand Up @@ -95,9 +100,11 @@ def write_file(output_path, mode, file_content):

def extract_zip_file(file_path, mode, output_path):
    """
    Open the archive at file_path and extract it into output_path.

    Extraction goes only through safe_extract_zip_file (imported from
    base.zip_utils); there is no direct extractall call on the archive.
    The archive is closed whether or not extraction succeeds.

    Returns the ZipFile object, which is already closed on return.
    """
    zip_ref = zipfile.ZipFile(file_path, mode)
    try:
        safe_extract_zip_file(zip_ref, output_path)
        logger.info("Zip file extracted to {}".format(output_path))
    finally:
        zip_ref.close()
    return zip_ref
Comment thread
coderabbitai[bot] marked this conversation as resolved.


Expand Down Expand Up @@ -172,10 +179,15 @@ def is_challenge_config_yaml_html_field_valid(
is_valid {boolean} -- flag for field validation is success
message {string} -- error message if any
"""
value = join(base_location, yaml_file_data.get(key))
relative_path = yaml_file_data.get(key)
message = ""
is_valid = False
if value:
if relative_path:
try:
value = safe_join_under_root(base_location, relative_path)
except PathTraversalError:
message = "Invalid file path for {}.".format(key)
return is_valid, message
if not isfile(value):
message = "File at path {} not found. Please specify a valid file path".format(
key
Expand Down Expand Up @@ -293,7 +305,13 @@ def is_challenge_phase_split_mapping_valid(


def get_value_from_field(data, base_location, field_name):
file_path = join(base_location, data.get(field_name))
relative_path = data.get(field_name)
if not relative_path:
return None
try:
file_path = safe_join_under_root(base_location, relative_path)
except PathTraversalError:
return None
field_value = None
if file_path.endswith(".html") and isfile(file_path):
field_value = get_file_content(file_path, "rb").decode("utf-8")
Expand Down Expand Up @@ -464,8 +482,9 @@ def read_and_validate_yaml(self):

# YAML Read Error
try:
self.yaml_file_path = join(
self.base_location, self.unique_folder_name, self.yaml_file
self.yaml_file_path = safe_join_under_root(
join(self.base_location, self.unique_folder_name),
self.yaml_file,
)
self.yaml_file_data = read_yaml_file(self.yaml_file_path, "r")
return True
Expand All @@ -480,6 +499,10 @@ def read_and_validate_yaml(self):
).format(error_description, line_number, column_number)
self.error_messages.append(message)
return False
except PathTraversalError:
message = "Challenge configuration contains unsafe file paths."
self.error_messages.append(message)
return False

def _approved_config_locked(self):
return (
Expand Down Expand Up @@ -612,19 +635,21 @@ def validate_challenge_logo(self):
or image.endswith(".jpeg")
or image.endswith(".png")
):
self.challenge_image_path = join(
self.base_location,
self.unique_folder_name,
self.extracted_folder_name,
image,
)

if isfile(self.challenge_image_path):
self.challenge_image_file = ContentFile(
get_file_content(self.challenge_image_path, "rb"), image
self.challenge_image_file = None
try:
self.challenge_image_path = safe_join_under_root(
self.challenge_config_location, image
)
if isfile(self.challenge_image_path):
self.challenge_image_file = ContentFile(
get_file_content(self.challenge_image_path, "rb"),
image,
)
except PathTraversalError:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
logger.warning(
"Challenge logo path rejected due to path traversal risk: %s",
image,
)
else:
self.challenge_image_file = None
else:
self.challenge_image_file = None
self.files["challenge_image_file"] = self.challenge_image_file
Expand Down Expand Up @@ -737,26 +762,33 @@ def validate_evaluation_script_file(self):
)
self.error_messages.append(message)
else:
evaluation_script_path = join(
self.challenge_config_location, evaluation_script
)
# Check for evaluation script file in extracted zip folder
if isfile(evaluation_script_path):
self.challenge_evaluation_script_file = (
read_file_data_as_content_file(
evaluation_script_path,
"rb",
evaluation_script_path,
)
)
self.files["challenge_evaluation_script_file"] = (
self.challenge_evaluation_script_file
try:
evaluation_script_path = safe_join_under_root(
self.challenge_config_location, evaluation_script
)
else:
except PathTraversalError:
message = self.error_messages_dict.get(
"missing_evaluation_script"
)
self.error_messages.append(message)
else:
# Check for evaluation script file in extracted zip folder
if isfile(evaluation_script_path):
self.challenge_evaluation_script_file = (
read_file_data_as_content_file(
evaluation_script_path,
"rb",
evaluation_script_path,
)
)
self.files["challenge_evaluation_script_file"] = (
self.challenge_evaluation_script_file
)
else:
message = self.error_messages_dict.get(
"missing_evaluation_script"
)
self.error_messages.append(message)
else:
message = self.error_messages_dict.get(
"missing_evaluation_script_key"
Expand Down Expand Up @@ -900,25 +932,36 @@ def validate_challenge_phases(self, current_phase_config_ids):
self.error_messages.append(message)
test_annotation_file = data.get("test_annotation_file")
if test_annotation_file:
test_annotation_file_path = join(
self.challenge_config_location, test_annotation_file
)
if isfile(test_annotation_file_path):
challenge_test_annotation_file = (
read_file_data_as_content_file(
test_annotation_file_path,
"rb",
test_annotation_file_path,
)
try:
test_annotation_file_path = safe_join_under_root(
self.challenge_config_location, test_annotation_file
)
self.files["challenge_test_annotation_files"].append(
challenge_test_annotation_file
)
else:
except PathTraversalError:
message = self.error_messages_dict[
"no_test_annotation_file_found"
].format(data["name"])
self.error_messages.append(message)
self.files["challenge_test_annotation_files"].append(None)
else:
if isfile(test_annotation_file_path):
challenge_test_annotation_file = (
read_file_data_as_content_file(
test_annotation_file_path,
"rb",
test_annotation_file_path,
)
)
self.files["challenge_test_annotation_files"].append(
challenge_test_annotation_file
)
else:
message = self.error_messages_dict[
"no_test_annotation_file_found"
].format(data["name"])
self.error_messages.append(message)
self.files["challenge_test_annotation_files"].append(
None
)
else:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
test_annotation_file_path = None
self.files["challenge_test_annotation_files"].append(None)
Expand Down
Loading
Loading