Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions storage_file/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@
"security/storage_file.xml",
"data/ir_cron.xml",
"data/storage_backend.xml",
"wizards/swap_backend.xml",
],
}
98 changes: 98 additions & 0 deletions storage_file/models/storage_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,19 @@ def write(self, vals):
"File can not be updated, remove it and create a new one"
)
)
if "backend_id" in vals and not self.env.context.get(
"storage_file_swap_backend"
):
new_backend = self.env["storage.backend"].browse(vals["backend_id"])
to_swap = self.filtered(
lambda r: r.backend_id and r.backend_id != new_backend
)
if to_swap:
to_swap._swap_backend(new_backend)
remaining = self - to_swap
if remaining:
return super(StorageFile, remaining).write(vals)
return True
return super().write(vals)

@api.depends("file_size")
Expand Down Expand Up @@ -220,6 +233,91 @@ def _clean_storage_file(self, batch_size=1000):
done=done, remaining=0 if done <= batch_size else len(ids) - done
)

def _swap_backend(self, new_backend):
"""Swap files to ``new_backend``.

For each record:

- read binary data from the current backend storage,
- upload it to the new backend (re-computing relative_path using the
new backend filename strategy),
- update ``backend_id`` and ``relative_path`` on the record,
- delete the old file from the previous backend.

Files already on ``new_backend`` are skipped.

:return: dict with ``moved`` (list of names) and ``failed`` (list of
error descriptions).
"""
if not new_backend:
raise UserError(self.env._("A destination storage is required."))
new_backend = new_backend.sudo()
if not new_backend.filename_strategy:
raise UserError(
self.env._(
"The filename strategy is empty for the backend %s.\n"
"Please configure it.",
new_backend.name,
)
)
moved = []
failed = []
for record in self.sudo():
if not record.exists():
failed.append(f"ID {record.id}: record no longer exists")
continue
if record.backend_id == new_backend:
continue
old_backend = record.backend_id
old_relative_path = record.relative_path
try:
if not old_relative_path:
record.with_context(
storage_file_swap_backend=True
).backend_id = new_backend
moved.append(f"{record.name} (ID {record.id})")
continue
bin_data = old_backend.get(old_relative_path, binary=True)
# Same logic as _build_relative_path but using the target
# backend strategy (backend_id not yet reassigned).
strategy = new_backend.filename_strategy
if strategy == "hash":
new_relative_path = record.checksum[:2] + "/" + record.checksum
else:
new_relative_path = record.slug
new_backend.add(
new_relative_path,
bin_data,
mimetype=record.mimetype,
binary=True,
)
record.with_context(storage_file_swap_backend=True).write(
{
"backend_id": new_backend.id,
"relative_path": new_relative_path,
}
)
try:
old_backend.delete(old_relative_path)
except Exception as exc:
_logger.warning(
"Swapped %s but failed to delete old file %s from %s: %s",
record.name,
old_relative_path,
old_backend.name,
exc,
)
moved.append(f"{record.name} (ID {record.id})")
except Exception as exc:
failed.append(f"{record.name} (ID {record.id}): {exc}")
_logger.exception(
"Failed to swap file %s (ID %d) to backend %s",
record.name,
record.id,
new_backend.name,
)
return {"moved": moved, "failed": failed}

@api.model
def get_from_slug_name_with_id(self, slug_name_with_id):
"""
Expand Down
1 change: 1 addition & 0 deletions storage_file/readme/CONTRIBUTORS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Sebastien Beau \<<sebastien.beau@akretion.com>\>
- Raphaël Reverdy \<<raphael.reverdy@akretion.com>\>
- Vo Hong Thien \<<thienvh@trobz.com>\>
- Simone Orsi \<<simone.orsi@camptocamp.com>\>
1 change: 1 addition & 0 deletions storage_file/security/ir.model.access.csv
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
access_storage_file_edit,storage_file edit,model_storage_file,base.group_system,1,1,1,1
access_storage_file_read_public,storage_file public read,model_storage_file,base.group_user,1,0,0,0
access_storage_file_replace,storage_file_replace public,model_storage_file_replace,base.group_user,1,1,1,1
access_storage_file_swap_backend,storage_file_swap_backend admin,model_storage_file_swap_backend,base.group_system,1,1,1,1
1 change: 1 addition & 0 deletions storage_file/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from . import test_storage_file
from . import test_swap_backend
206 changes: 206 additions & 0 deletions storage_file/tests/test_swap_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
# Copyright 2026 Camptocamp SA
# @author Simone Orsi <simone.orsi@camptocamp.com>
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).

import base64
from unittest import mock

from odoo.exceptions import UserError
from odoo.tests import Form

from odoo.addons.component.tests.common import TransactionComponentCase


class TestSwapBackend(TransactionComponentCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.backend_a = cls.env.ref("storage_backend.default_storage_backend")
cls.backend_b = cls.backend_a.copy(
{
"name": "Second Backend",
"directory_path": "backend_b",
}
)
cls.backend_b.filename_strategy = "name_with_id"

def _create_storage_file(self, name="my_file.txt", data=b"hello", backend=None):
return self.env["storage.file"].create(
{
"name": name,
"backend_id": (backend or self.backend_a).id,
"data": base64.b64encode(data),
}
)

# -- model-level swap -------------------------------------------------

def test_swap_uploads_to_new_backend_and_updates_record(self):
stfile = self._create_storage_file(data=b"payload")

result = stfile._swap_backend(self.backend_b)

self.assertEqual(stfile.backend_id, self.backend_b)
self.assertEqual(stfile.relative_path, f"my_file-{stfile.id}.txt")
self.assertEqual(base64.b64decode(stfile.data), b"payload")
self.assertIn(stfile.name, result["moved"][0])

def test_swap_deletes_old_file(self):
stfile = self._create_storage_file(data=b"payload")
old_relative_path = stfile.relative_path
stfile._swap_backend(self.backend_b)
with self.assertRaises(FileNotFoundError):
self.backend_a.sudo().get(old_relative_path, binary=True)

def test_swap_skips_records_already_on_destination(self):
stfile = self._create_storage_file(backend=self.backend_b)
with mock.patch.object(
type(self.env["storage.backend"]), "delete"
) as mocked_delete:
result = stfile._swap_backend(self.backend_b)
mocked_delete.assert_not_called()
self.assertEqual(stfile.backend_id, self.backend_b)
self.assertEqual(result["moved"], [])
self.assertEqual(result["failed"], [])

def test_swap_requires_destination(self):
stfile = self._create_storage_file()
with self.assertRaisesRegex(UserError, "A destination storage is required"):
stfile._swap_backend(self.env["storage.backend"])

def test_swap_requires_destination_filename_strategy(self):
self.backend_b.filename_strategy = False
stfile = self._create_storage_file()
with self.assertRaisesRegex(UserError, "The filename strategy is empty"):
stfile._swap_backend(self.backend_b)

def test_swap_failure_reports_in_failed(self):
"""Upload failure is caught and reported in the failed list."""
stfile = self._create_storage_file(data=b"payload")
old_relative_path = stfile.relative_path
with mock.patch.object(
type(self.env["storage.backend"]),
"add",
side_effect=RuntimeError("boom"),
):
with self.assertLogs(
"odoo.addons.storage_file.models.storage_file", level="ERROR"
) as log_cm:
result = stfile._swap_backend(self.backend_b)
self.assertEqual(len(result["failed"]), 1)
self.assertIn("boom", result["failed"][0])
self.assertTrue(
any("Failed to swap file" in msg and "boom" in msg for msg in log_cm.output)
)
# Old file still physically present.
self.assertEqual(
self.backend_a.sudo().get(old_relative_path, binary=True), b"payload"
)

def test_swap_swallows_old_backend_delete_error(self):
stfile = self._create_storage_file(data=b"payload")
with mock.patch.object(
type(self.env["storage.backend"]),
"delete",
side_effect=RuntimeError("boom"),
):
with self.assertLogs(
"odoo.addons.storage_file.models.storage_file", level="WARNING"
) as log_cm:
result = stfile._swap_backend(self.backend_b)
self.assertTrue(
any("failed to delete" in msg and "boom" in msg for msg in log_cm.output)
)
# File still counts as moved
self.assertEqual(len(result["moved"]), 1)

# -- wizard ----------------------------------------------------------------

def test_wizard_default_get_single_backend(self):
stfile1 = self._create_storage_file(name="f1.txt")
stfile2 = self._create_storage_file(name="f2.txt")
wiz = (
self.env["storage.file.swap.backend"]
.with_context(
active_model="storage.file",
active_ids=[stfile1.id, stfile2.id],
)
.create({})
)
self.assertEqual(wiz.source_backend_id, self.backend_a)
self.assertEqual(wiz.file_ids, stfile1 + stfile2)

def test_wizard_default_get_rejects_mixed_backends(self):
stfile1 = self._create_storage_file(name="f1.txt")
stfile2 = self._create_storage_file(name="f2.txt", backend=self.backend_b)
with self.assertRaisesRegex(
UserError,
"All selected records must belong to the same source storage backend",
):
self.env["storage.file.swap.backend"].with_context(
active_model="storage.file",
active_ids=[stfile1.id, stfile2.id],
).create({})

def test_wizard_apply_swaps_files(self):
stfile = self._create_storage_file(data=b"payload")
wiz = (
self.env["storage.file.swap.backend"]
.with_context(
active_model="storage.file",
active_ids=stfile.ids,
)
.create({"dest_backend_id": self.backend_b.id})
)
wiz.action_apply()
self.assertEqual(stfile.backend_id, self.backend_b)

def test_wizard_apply_rejects_same_backend(self):
stfile = self._create_storage_file()
wiz = (
self.env["storage.file.swap.backend"]
.with_context(
active_model="storage.file",
active_ids=stfile.ids,
)
.create({})
)
wiz.dest_backend_id = wiz.source_backend_id
with self.assertRaisesRegex(
UserError, "Destination storage must differ from source"
):
wiz.action_apply()

def test_wizard_form_loads_with_source_backend(self):
"""The form view loads and pre-fills source_backend_id."""
stfile = self._create_storage_file()
view = "storage_file.storage_file_swap_backend_view_form"
with Form(
self.env["storage.file.swap.backend"].with_context(
active_model="storage.file",
active_ids=stfile.ids,
),
view=view,
) as wiz_form:
self.assertEqual(wiz_form.source_backend_id, self.backend_a)
wiz_form.dest_backend_id = self.backend_b

# -- write triggers swap ------------------------------------------------

def test_write_backend_id_triggers_swap(self):
"""Writing backend_id on storage.file triggers the full swap."""
stfile = self._create_storage_file(data=b"payload")
old_path = stfile.relative_path
stfile.backend_id = self.backend_b
self.assertEqual(stfile.backend_id, self.backend_b)
self.assertEqual(base64.b64decode(stfile.data), b"payload")
with self.assertRaises(FileNotFoundError):
self.backend_a.sudo().get(old_path, binary=True)

def test_write_backend_id_noop_if_same(self):
"""Writing same backend_id does nothing special."""
stfile = self._create_storage_file(data=b"payload")
old_path = stfile.relative_path
stfile.backend_id = self.backend_a
self.assertEqual(stfile.backend_id, self.backend_a)
self.assertEqual(stfile.relative_path, old_path)
1 change: 1 addition & 0 deletions storage_file/wizards/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
from . import swap_backend
from . import replace_file
Loading
Loading