Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions chris_backend/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uuid
import io
import os
import pathlib

from django.db import models
from django.db.models.functions import Length
Expand Down Expand Up @@ -36,6 +37,83 @@ def validate_permission(permission):
return permission


def user_can_access_obj(obj, user):
"""
Return whether the given user is allowed to read-access the given storage
object (a ``ChrisFolder``, ``ChrisFile`` or ``ChrisLinkFile``).

This is the canonical owner/superuser/public/permission read-access rule.
The object is accessible if the user owns it, is the superuser 'chris', the
object is public, or the user has been granted any permission to it
(possibly through one of their groups).
"""
return (obj.owner == user or user.username == 'chris' or obj.public
or obj.has_user_permission(user))


class PathAccessError(Exception):
"""
Raised by ``validate_path_access`` when a user is not allowed to access a
storage path. The exception message is a human-readable reason suitable for
surfacing to API clients.
"""
pass


def validate_path_access(user, path):
"""
Check whether the given user is allowed to access the given storage path.

Enforces the structural path rules, the PUBLIC/SHARED link-target restriction
and the owner/public/permission access rule. This is the single source of
truth for path-access authorization.

Returns the normalized path on success.
Raises ``PathAccessError`` with a human-readable reason on failure.
"""
path = path.strip().strip('/')
path_parts = pathlib.Path(path).parts

if len(path_parts) < 2:
# trying to access a top-level folder or an unknown folder
raise PathAccessError(
f"This field may not reference a top-level folder path '{path}'.")

if path_parts[0] not in ('home', 'SERVICES', 'PIPELINES'):
raise PathAccessError(
f"This field may not reference an invalid path '{path}'.")

if len(path_parts) == 2 and path_parts[0] == 'home':
raise PathAccessError(
f"This field may not reference a home folder path '{path}'.")

if len(path_parts) == 3 and path_parts[0] == 'home' and path_parts[2] == 'feeds':
raise PathAccessError(
f"This field may not reference a home's feeds folder path '{path}'.")

try:
obj = ChrisFolder.objects.get(path=path)
except ChrisFolder.DoesNotExist: # path is not a folder
try:
obj = ChrisFile.objects.get(fname=path)
except ChrisFile.DoesNotExist: # path is not a file
try:
obj = ChrisLinkFile.objects.get(fname=path)

if obj.path in ('PUBLIC', 'SHARED'):
raise PathAccessError(
f"This field may not reference an invalid path '{path}'.")

except ChrisLinkFile.DoesNotExist: # path is not a link file
raise PathAccessError(
f"This field may not reference an invalid path '{path}'.")

if not user_can_access_obj(obj, user):
raise PathAccessError(
f"User does not have permission to access path '{path}'.")
return path


class AsyncDeletableModel(models.Model):
class DeletionStatus(models.TextChoices):
INACTIVE = 'inactive'
Expand Down
181 changes: 179 additions & 2 deletions chris_backend/core/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@

import io
import logging
import os
from unittest import mock

from django.test import TestCase
from django.test import TestCase, tag
from django.conf import settings
from django.contrib.auth.models import User

from core.models import ChrisFolder, ChrisFile
from core.models import (ChrisFolder, ChrisFile, ChrisLinkFile, PathAccessError,
validate_path_access, user_can_access_obj)
from core.storage import connect_storage
from userfiles.models import UserFile


class ModelTests(TestCase):
Expand Down Expand Up @@ -84,3 +89,175 @@ def test_move(self):
self.assertEqual(self.file.fname.name, new_path)
self.assertEqual(self.file.parent_folder.path, f'home/{self.username}/tests')
storage_manager_mock.delete_obj.assert_called_with(self.upload_path)


class UserCanAccessObjTests(ModelTests):
"""
Tests for the canonical ``user_can_access_obj`` read-access rule.
"""

def test_owner_chris_public_and_permission_rule(self):
"""
Test the owner/superuser/public/permission read-access rule.
"""
user = User.objects.get(username=self.username)
chris = User.objects.get(username='chris')
other = User.objects.create_user(username='other3', password='o-pass')

folder, _ = ChrisFolder.objects.get_or_create(
path='home/other3/uploads', owner=other)

# owner -> allowed
self.assertTrue(user_can_access_obj(folder, other))
# superuser 'chris' -> allowed
self.assertTrue(user_can_access_obj(folder, chris))
# unrelated user, not public, no permission -> denied
self.assertFalse(user_can_access_obj(folder, user))

# public -> allowed
folder.public = True
folder.save()
self.assertTrue(user_can_access_obj(folder, user))

# explicitly shared with the user -> allowed
folder.public = False
folder.save()
folder.grant_user_permission(user, 'r')
self.assertTrue(user_can_access_obj(folder, user))


class ValidatePathAccessTests(ModelTests):
"""
Tests for the centralized ``validate_path_access`` path-access
authorization function.
"""

def test_rejects_structural_paths_with_exact_messages(self):
"""
Test whether validate_path_access rejects structurally invalid paths and
that the exact, distinct error messages are preserved.
"""
user = User.objects.get(username=self.username)

cases = [
('home',
"This field may not reference a top-level folder path 'home'."),
('NOTAROOT/x',
"This field may not reference an invalid path 'NOTAROOT/x'."),
(f'home/{self.username}',
f"This field may not reference a home folder path "
f"'home/{self.username}'."),
(f'home/{self.username}/feeds',
f"This field may not reference a home's feeds folder path "
f"'home/{self.username}/feeds'."),
]
for path, expected_msg in cases:
with self.assertRaises(PathAccessError) as cm:
validate_path_access(user, path)
self.assertEqual(str(cm.exception), expected_msg)

def test_rejects_nonexistent_path_with_exact_message(self):
"""
Test whether validate_path_access rejects a structurally valid path that
does not resolve to any folder/file/link file.
"""
user = User.objects.get(username=self.username)
with self.assertRaises(PathAccessError) as cm:
validate_path_access(user, 'home/someone/uploads')
self.assertEqual(
str(cm.exception),
"This field may not reference an invalid path 'home/someone/uploads'.")

def test_folder_owner_public_and_permission_rule(self):
"""
Test the owner/public/permission access rule and that a normalized path
is returned on success.
"""
user = User.objects.get(username=self.username)
own_folder, _ = ChrisFolder.objects.get_or_create(
path=f'home/{self.username}/uploads', owner=user)

# owner -> allowed; trailing slash is normalized away in the return value
self.assertEqual(validate_path_access(user, own_folder.path + '/'),
own_folder.path)

other = User.objects.create_user(username='other', password='other-pass')
other_folder, _ = ChrisFolder.objects.get_or_create(
path='home/other/uploads', owner=other)

# not owner / not public / no permission -> denied (permission message)
with self.assertRaises(PathAccessError) as cm:
validate_path_access(user, other_folder.path)
self.assertEqual(
str(cm.exception),
"User does not have permission to access path 'home/other/uploads'.")

# public -> allowed
other_folder.public = True
other_folder.save()
self.assertEqual(validate_path_access(user, other_folder.path),
other_folder.path)

# explicitly shared with the user -> allowed
other_folder.public = False
other_folder.save()
other_folder.grant_user_permission(user, 'r')
self.assertEqual(validate_path_access(user, other_folder.path),
other_folder.path)

def test_chris_superuser_bypass(self):
"""
Test whether the 'chris' superuser is allowed to access any path.
"""
chris = User.objects.get(username='chris')
other = User.objects.create_user(username='other2', password='o-pass')
other_folder, _ = ChrisFolder.objects.get_or_create(
path='home/other2/uploads', owner=other)
self.assertEqual(validate_path_access(chris, other_folder.path),
other_folder.path)


@tag('integration')
class ValidatePathAccessStorageTests(ModelTests):
"""
Storage-backed tests for ``validate_path_access`` covering the ChrisFile and
ChrisLinkFile resolution branches and the PUBLIC/SHARED link-target rule.
"""

def setUp(self):
super(ValidatePathAccessStorageTests, self).setUp()
self.storage_manager = connect_storage(settings)

def test_resolves_file_and_link_targets(self):
user = User.objects.get(username=self.username)
folder, _ = ChrisFolder.objects.get_or_create(
path=f'home/{self.username}/uploads', owner=user)

# ChrisFile branch -> owned by user -> allowed
fpath = f'home/{self.username}/uploads/f.txt'
with io.StringIO('x') as f:
self.storage_manager.upload_obj(fpath, f.read(),
content_type='text/plain')
uf = UserFile(owner=user, parent_folder=folder)
uf.fname.name = fpath
uf.save()
self.assertEqual(validate_path_access(user, fpath), fpath)

# ChrisLinkFile branch, target not PUBLIC/SHARED -> allowed
lf = ChrisLinkFile(path=f'home/{self.username}/uploads', owner=user,
parent_folder=folder)
lf.save(name='mylink')
self.assertEqual(validate_path_access(user, lf.fname.name),
lf.fname.name)

# ChrisLinkFile whose target is PUBLIC -> rejected even though owned
lf2 = ChrisLinkFile(path='PUBLIC', owner=user, parent_folder=folder)
lf2.save(name='publink')
with self.assertRaises(PathAccessError) as cm:
validate_path_access(user, lf2.fname.name)
self.assertEqual(
str(cm.exception),
f"This field may not reference an invalid path '{lf2.fname.name}'.")

# delete files from storage
self.storage_manager.delete_path(f'home/{self.username}/uploads')
5 changes: 2 additions & 3 deletions chris_backend/filebrowser/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from django.db import models

from core.models import ChrisFolder
from core.models import ChrisFolder, user_can_access_obj


def get_folder_queryset(pk_dict, user=None):
Expand All @@ -18,8 +18,7 @@ def get_folder_queryset(pk_dict, user=None):
if not folder.public:
return ChrisFolder.objects.none()
else:
if not (folder.owner == user or user.username == 'chris' or folder.public
or folder.has_user_permission(user)):
if not user_can_access_obj(folder, user):
return ChrisFolder.objects.none()
return qs

Expand Down
3 changes: 2 additions & 1 deletion chris_backend/plugininstances/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
from plugins.fields import CPUField, MemoryField
from plugins.fields import MemoryInt, CPUInt
from workflows.models import Workflow
from plugininstances.enums import ACTIVE_STATUSES, INACTIVE_STATUSES, STATUS_CHOICES, REMOTE_CLEANUP_STATUS_CHOICES
from plugininstances.enums import (ACTIVE_STATUSES, INACTIVE_STATUSES, STATUS_CHOICES,
REMOTE_CLEANUP_STATUS_CHOICES)


logger = logging.getLogger(__name__)
Expand Down
49 changes: 6 additions & 43 deletions chris_backend/plugininstances/serializers.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@

import pathlib

from django.core.exceptions import ObjectDoesNotExist
from rest_framework import serializers
from rest_framework.reverse import reverse
from drf_spectacular.utils import OpenApiTypes, extend_schema_field

from collectionjson.fields import ItemLinkField
from core.models import ChrisFolder, ChrisFile, ChrisLinkFile
from core.models import PathAccessError, validate_path_access
from plugins.enums import TYPES
from plugins.models import Plugin

Expand Down Expand Up @@ -344,51 +342,16 @@ class Meta:

def validate_paths(user, string):
"""
Custom function to check whether a user is allowed to access the provided paths.
Custom function to check whether a user is allowed to access the provided
paths (a string of one or more paths separated by commas).
"""
path_list = [s.strip().strip('/') for s in string.split(',')]

for path in path_list:
path_parts = pathlib.Path(path).parts

if len(path_parts) < 2:
# trying to access a top-level folder or an unknown folder
raise serializers.ValidationError(
[f"This field may not reference a top-level folder path '{path}'."])

if path_parts[0] not in ('home', 'SERVICES', 'PIPELINES'):
raise serializers.ValidationError(
[f"This field may not reference an invalid path '{path}'."])

if len(path_parts) == 2 and path_parts[0] == 'home':
raise serializers.ValidationError(
[f"This field may not reference a home folder path '{path}'."])

if len(path_parts) == 3 and path_parts[0] == 'home' and path_parts[2] == 'feeds':
raise serializers.ValidationError(
[f"This field may not reference a home's feeds folder path '{path}'."])

try:
obj = ChrisFolder.objects.get(path=path)
except ChrisFolder.DoesNotExist: # path is not a folder
try:
obj = ChrisFile.objects.get(fname=path)
except ChrisFile.DoesNotExist: # path is not a file
try:
obj = ChrisLinkFile.objects.get(fname=path)

if obj.path in ('PUBLIC', 'SHARED'):
raise serializers.ValidationError(
[f"This field may not reference an invalid path '{path}'."])

except ChrisLinkFile.DoesNotExist: # path is not a link file
raise serializers.ValidationError(
[f"This field may not reference an invalid path '{path}'."])

if not (obj.owner == user or user.username == 'chris' or obj.public
or obj.has_user_permission(user)):
raise serializers.ValidationError(
[f"User does not have permission to access path '{path}'."])
validate_path_access(user, path)
except PathAccessError as e:
raise serializers.ValidationError([str(e)])

return ','.join(path_list)

Expand Down
Loading
Loading