Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions apps/base/zip_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import os
import zipfile


class PathTraversalError(ValueError):
"""Raised when a resolved path escapes its intended root directory."""


def safe_join_under_root(root, *relative_paths):
"""
Join path segments under root and verify the result stays within root.
"""
root_path = os.path.abspath(root)
candidate = os.path.abspath(os.path.join(root_path, *relative_paths))
if not (
candidate == root_path or candidate.startswith(root_path + os.sep)
):
raise PathTraversalError("Path escapes extraction root.")
return candidate
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated


def safe_extract_zip_file(zip_ref, destination):
"""
Extract zip archive members while preventing path traversal (zip slip).
"""
destination_path = os.path.abspath(destination)
for member in zip_ref.namelist():
member_path = os.path.abspath(os.path.join(destination_path, member))
if not (
member_path == destination_path
or member_path.startswith(destination_path + os.sep)
):
raise zipfile.BadZipFile("Zip archive contains unsafe file paths.")
zip_ref.extractall(destination_path)
135 changes: 86 additions & 49 deletions apps/challenges/challenge_config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

import requests
import yaml
from base.zip_utils import (
PathTraversalError,
safe_extract_zip_file,
safe_join_under_root,
)
from challenges.models import (
Challenge,
ChallengePhase,
Expand Down Expand Up @@ -95,9 +100,11 @@ def write_file(output_path, mode, file_content):

def extract_zip_file(file_path, mode, output_path):
zip_ref = zipfile.ZipFile(file_path, mode)
zip_ref.extractall(output_path)
logger.info("Zip file extracted to {}".format(output_path))
zip_ref.close()
try:
safe_extract_zip_file(zip_ref, output_path)
logger.info("Zip file extracted to {}".format(output_path))
finally:
zip_ref.close()
return zip_ref
Comment thread
coderabbitai[bot] marked this conversation as resolved.


Expand Down Expand Up @@ -172,10 +179,15 @@ def is_challenge_config_yaml_html_field_valid(
is_valid {boolean} -- flag for field validation is success
message {string} -- error message if any
"""
value = join(base_location, yaml_file_data.get(key))
relative_path = yaml_file_data.get(key)
message = ""
is_valid = False
if value:
if relative_path:
try:
value = safe_join_under_root(base_location, relative_path)
except PathTraversalError:
message = "Invalid file path for {}.".format(key)
return is_valid, message
if not isfile(value):
message = "File at path {} not found. Please specify a valid file path".format(
key
Expand Down Expand Up @@ -293,7 +305,13 @@ def is_challenge_phase_split_mapping_valid(


def get_value_from_field(data, base_location, field_name):
file_path = join(base_location, data.get(field_name))
relative_path = data.get(field_name)
if not relative_path:
return None
try:
file_path = safe_join_under_root(base_location, relative_path)
except PathTraversalError:
return None
field_value = None
if file_path.endswith(".html") and isfile(file_path):
field_value = get_file_content(file_path, "rb").decode("utf-8")
Expand Down Expand Up @@ -464,8 +482,9 @@ def read_and_validate_yaml(self):

# YAML Read Error
try:
self.yaml_file_path = join(
self.base_location, self.unique_folder_name, self.yaml_file
self.yaml_file_path = safe_join_under_root(
join(self.base_location, self.unique_folder_name),
self.yaml_file,
)
self.yaml_file_data = read_yaml_file(self.yaml_file_path, "r")
return True
Expand All @@ -480,6 +499,10 @@ def read_and_validate_yaml(self):
).format(error_description, line_number, column_number)
self.error_messages.append(message)
return False
except PathTraversalError:
message = "Challenge configuration contains unsafe file paths."
self.error_messages.append(message)
return False

def _approved_config_locked(self):
return (
Expand Down Expand Up @@ -612,19 +635,18 @@ def validate_challenge_logo(self):
or image.endswith(".jpeg")
or image.endswith(".png")
):
self.challenge_image_path = join(
self.base_location,
self.unique_folder_name,
self.extracted_folder_name,
image,
)

if isfile(self.challenge_image_path):
self.challenge_image_file = ContentFile(
get_file_content(self.challenge_image_path, "rb"), image
self.challenge_image_file = None
try:
self.challenge_image_path = safe_join_under_root(
self.challenge_config_location, image
)
else:
self.challenge_image_file = None
if isfile(self.challenge_image_path):
self.challenge_image_file = ContentFile(
get_file_content(self.challenge_image_path, "rb"),
image,
)
except PathTraversalError:
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
pass
else:
self.challenge_image_file = None
self.files["challenge_image_file"] = self.challenge_image_file
Expand Down Expand Up @@ -737,26 +759,33 @@ def validate_evaluation_script_file(self):
)
self.error_messages.append(message)
else:
evaluation_script_path = join(
self.challenge_config_location, evaluation_script
)
# Check for evaluation script file in extracted zip folder
if isfile(evaluation_script_path):
self.challenge_evaluation_script_file = (
read_file_data_as_content_file(
evaluation_script_path,
"rb",
evaluation_script_path,
)
)
self.files["challenge_evaluation_script_file"] = (
self.challenge_evaluation_script_file
try:
evaluation_script_path = safe_join_under_root(
self.challenge_config_location, evaluation_script
)
else:
except PathTraversalError:
message = self.error_messages_dict.get(
"missing_evaluation_script"
)
self.error_messages.append(message)
else:
# Check for evaluation script file in extracted zip folder
if isfile(evaluation_script_path):
self.challenge_evaluation_script_file = (
read_file_data_as_content_file(
evaluation_script_path,
"rb",
evaluation_script_path,
)
)
self.files["challenge_evaluation_script_file"] = (
self.challenge_evaluation_script_file
)
else:
message = self.error_messages_dict.get(
"missing_evaluation_script"
)
self.error_messages.append(message)
else:
message = self.error_messages_dict.get(
"missing_evaluation_script_key"
Expand Down Expand Up @@ -900,25 +929,33 @@ def validate_challenge_phases(self, current_phase_config_ids):
self.error_messages.append(message)
test_annotation_file = data.get("test_annotation_file")
if test_annotation_file:
test_annotation_file_path = join(
self.challenge_config_location, test_annotation_file
)
if isfile(test_annotation_file_path):
challenge_test_annotation_file = (
read_file_data_as_content_file(
test_annotation_file_path,
"rb",
test_annotation_file_path,
)
)
self.files["challenge_test_annotation_files"].append(
challenge_test_annotation_file
try:
test_annotation_file_path = safe_join_under_root(
self.challenge_config_location, test_annotation_file
)
else:
except PathTraversalError:
message = self.error_messages_dict[
"no_test_annotation_file_found"
].format(data["name"])
self.error_messages.append(message)
self.files["challenge_test_annotation_files"].append(None)
else:
if isfile(test_annotation_file_path):
challenge_test_annotation_file = (
read_file_data_as_content_file(
test_annotation_file_path,
"rb",
test_annotation_file_path,
)
)
self.files["challenge_test_annotation_files"].append(
challenge_test_annotation_file
)
else:
message = self.error_messages_dict[
"no_test_annotation_file_found"
].format(data["name"])
self.error_messages.append(message)
else:
Comment thread
coderabbitai[bot] marked this conversation as resolved.
test_annotation_file_path = None
self.files["challenge_test_annotation_files"].append(None)
Expand Down
Loading
Loading