+
+
diff --git a/dashboard/src/views/authentication/authForms/stages/AuthStageAccount.vue b/dashboard/src/views/authentication/authForms/stages/AuthStageAccount.vue
new file mode 100644
index 0000000000..2464525394
--- /dev/null
+++ b/dashboard/src/views/authentication/authForms/stages/AuthStageAccount.vue
@@ -0,0 +1,72 @@
+
+
+
+ emit('update:username', value)"
+ @keyup.enter="onSubmit"
+ >
+
+ emit('update:password', value)"
+ @keyup.enter="onSubmit"
+ >
+
+
+ {{ t('defaultHint') }}
+
+
+
+ {{ t('login') }}
+
+
diff --git a/dashboard/src/views/authentication/authForms/stages/AuthStageRecovery.vue b/dashboard/src/views/authentication/authForms/stages/AuthStageRecovery.vue
new file mode 100644
index 0000000000..376b0664eb
--- /dev/null
+++ b/dashboard/src/views/authentication/authForms/stages/AuthStageRecovery.vue
@@ -0,0 +1,80 @@
+
+
+
+
+
+
+
+ {{ t('recovery.totpDisableWarning') }}
+
+
+
+ onCodeInput(value)"
+ @keyup.enter="onSubmit"
+ >
+
+
+
+ {{ t('recovery.submit') }}
+
+
diff --git a/dashboard/src/views/authentication/authForms/stages/AuthStageTotp.vue b/dashboard/src/views/authentication/authForms/stages/AuthStageTotp.vue
new file mode 100644
index 0000000000..ae11926d6f
--- /dev/null
+++ b/dashboard/src/views/authentication/authForms/stages/AuthStageTotp.vue
@@ -0,0 +1,80 @@
+
+
+
+
+
+
+ {{ t('totp.verify') }}
+
+
+ emit('update:code', value)"
+ @keyup.enter="onSubmit"
+ >
+
+ emit('update:trustDevice', !!value)"
+ >
+
+
+
+ {{ t('recovery.useRecoveryCode') }}
+
+
+
+
+ {{ t('totp.verify') }}
+
+
diff --git a/pyproject.toml b/pyproject.toml
index 16055d72fa..606b059343 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -65,6 +65,7 @@ dependencies = [
"pysocks>=1.7.1",
"packaging>=24.2",
"python-ripgrep==0.0.8",
+ "pyotp>=2.9.0",
]
[dependency-groups]
diff --git a/requirements.txt b/requirements.txt
index e667c67559..36ed53fb57 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -54,3 +54,4 @@ shipyard-neo-sdk>=0.2.0
packaging>=24.2
qrcode>=8.2
python-ripgrep==0.0.8
+pyotp>=2.9.0
diff --git a/tests/test_dashboard.py b/tests/test_dashboard.py
index 86bdf455ef..bb0b9db412 100644
--- a/tests/test_dashboard.py
+++ b/tests/test_dashboard.py
@@ -12,11 +12,13 @@
from types import SimpleNamespace
from urllib.parse import parse_qs, urlsplit, urlunsplit
+import pyotp
import pytest
import pytest_asyncio
from quart import Quart, jsonify
from werkzeug.datastructures import FileStorage
+import astrbot.dashboard.server as dashboard_server
from astrbot.core import LogBroker
from astrbot.core.core_lifecycle import AstrBotCoreLifecycle
from astrbot.core.db.sqlite import SQLiteDatabase
@@ -28,6 +30,10 @@
verify_dashboard_password,
)
from astrbot.core.utils.pip_installer import PipInstallError
+from astrbot.core.utils.totp import (
+ TOTP_TRUSTED_DEVICE_COOKIE_NAME,
+ generate_recovery_code,
+)
from astrbot.dashboard.password_state import (
get_dashboard_password_hash,
is_password_change_required,
@@ -356,6 +362,449 @@ async def test_auth_login_secure_cookie_override(
assert "SameSite=Strict" in jwt_cookie_header
+@pytest.mark.asyncio
+async def test_auth_rate_limit_uses_client_ip_bucket_across_paths(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+ monkeypatch: pytest.MonkeyPatch,
+):
+ monkeypatch.setenv("ASTRBOT_TEST_MODE", "false")
+ dashboard_server._rate_limiters.clear()
+ original_value = core_lifecycle_td.astrbot_config["dashboard"].get(
+ "trust_proxy_headers", False
+ )
+ core_lifecycle_td.astrbot_config["dashboard"]["trust_proxy_headers"] = True
+
+ try:
+ test_client = app.test_client()
+ headers = {"X-Forwarded-For": "198.51.100.10"}
+ await test_client.post(
+ "/api/auth/login",
+ json={"username": "wrong", "password": "wrong"},
+ headers=headers,
+ )
+ await test_client.post("/api/auth/totp/setup", json={}, headers=headers)
+
+ assert len(dashboard_server._rate_limiters) == 1
+ assert "198.51.100.10" in dashboard_server._rate_limiters
+ finally:
+ core_lifecycle_td.astrbot_config["dashboard"]["trust_proxy_headers"] = (
+ original_value
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_rate_limit_separates_different_client_ips(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+ monkeypatch: pytest.MonkeyPatch,
+):
+ monkeypatch.setenv("ASTRBOT_TEST_MODE", "false")
+ dashboard_server._rate_limiters.clear()
+ original_value = core_lifecycle_td.astrbot_config["dashboard"].get(
+ "trust_proxy_headers", False
+ )
+ core_lifecycle_td.astrbot_config["dashboard"]["trust_proxy_headers"] = True
+
+ try:
+ test_client = app.test_client()
+ await test_client.post(
+ "/api/auth/login",
+ json={"username": "wrong", "password": "wrong"},
+ headers={"X-Forwarded-For": "198.51.100.10"},
+ )
+ await test_client.post(
+ "/api/auth/login",
+ json={"username": "wrong", "password": "wrong"},
+ headers={"X-Forwarded-For": "198.51.100.11"},
+ )
+
+ assert len(dashboard_server._rate_limiters) == 2
+ assert "198.51.100.10" in dashboard_server._rate_limiters
+ assert "198.51.100.11" in dashboard_server._rate_limiters
+ finally:
+ core_lifecycle_td.astrbot_config["dashboard"]["trust_proxy_headers"] = (
+ original_value
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_rate_limit_ignores_proxy_headers_by_default(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+ monkeypatch: pytest.MonkeyPatch,
+):
+ monkeypatch.setenv("ASTRBOT_TEST_MODE", "false")
+ dashboard_server._rate_limiters.clear()
+ original_value = core_lifecycle_td.astrbot_config["dashboard"].get(
+ "trust_proxy_headers", False
+ )
+ core_lifecycle_td.astrbot_config["dashboard"]["trust_proxy_headers"] = False
+
+ try:
+ test_client = app.test_client()
+ await test_client.post(
+ "/api/auth/login",
+ json={"username": "wrong", "password": "wrong"},
+ headers={"X-Forwarded-For": "198.51.100.20"},
+ )
+ await test_client.post(
+ "/api/auth/login",
+ json={"username": "wrong", "password": "wrong"},
+ headers={"X-Forwarded-For": "198.51.100.21"},
+ )
+
+ assert len(dashboard_server._rate_limiters) == 1
+ assert "198.51.100.20" not in dashboard_server._rate_limiters
+ assert "198.51.100.21" not in dashboard_server._rate_limiters
+ finally:
+ core_lifecycle_td.astrbot_config["dashboard"]["trust_proxy_headers"] = (
+ original_value
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_login_requires_totp_when_enabled_and_not_trusted(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ _, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ response = await test_client.post(
+ "/api/auth/login",
+ json={
+ "username": core_lifecycle_td.astrbot_config["dashboard"]["username"],
+ "password": _resolve_dashboard_password(core_lifecycle_td),
+ },
+ )
+ data = await response.get_json()
+ assert response.status_code == 401
+ assert data["status"] == "error"
+ assert data["data"]["totp_required"] is True
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_login_accepts_valid_totp_code(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ _, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ response = await test_client.post(
+ "/api/auth/login",
+ json={
+ "username": core_lifecycle_td.astrbot_config["dashboard"]["username"],
+ "password": _resolve_dashboard_password(core_lifecycle_td),
+ "code": pyotp.TOTP(secret).now(),
+ },
+ )
+ data = await response.get_json()
+ assert data["status"] == "ok"
+ assert "token" in data["data"]
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_login_rejects_invalid_totp_code(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ _, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ valid_code = pyotp.TOTP(secret).now()
+ invalid_code = str((int(valid_code) + 1) % 1_000_000).zfill(6)
+ response = await test_client.post(
+ "/api/auth/login",
+ json={
+ "username": core_lifecycle_td.astrbot_config["dashboard"]["username"],
+ "password": _resolve_dashboard_password(core_lifecycle_td),
+ "code": invalid_code,
+ },
+ )
+ data = await response.get_json()
+ assert response.status_code == 401
+ assert data["status"] == "error"
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_login_with_recovery_code_disables_totp(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ recovery_code, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ response = await test_client.post(
+ "/api/auth/login",
+ json={
+ "username": core_lifecycle_td.astrbot_config["dashboard"]["username"],
+ "password": _resolve_dashboard_password(core_lifecycle_td),
+ "code": recovery_code,
+ },
+ )
+ data = await response.get_json()
+ assert data["status"] == "ok"
+ assert core_lifecycle_td.astrbot_config["dashboard"]["totp"] == {
+ "enable": False,
+ "secret": "",
+ "recovery_code_hash": "",
+ }
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_login_sets_trusted_device_cookie_when_flag_true(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ _, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ response = await test_client.post(
+ "/api/auth/login",
+ json={
+ "username": core_lifecycle_td.astrbot_config["dashboard"]["username"],
+ "password": _resolve_dashboard_password(core_lifecycle_td),
+ "code": pyotp.TOTP(secret).now(),
+ "trust_device_flag": True,
+ },
+ )
+ data = await response.get_json()
+ assert data["status"] == "ok"
+ set_cookie_headers = response.headers.getlist("Set-Cookie")
+ trusted_cookie_header = next(
+ (
+ value
+ for value in set_cookie_headers
+ if TOTP_TRUSTED_DEVICE_COOKIE_NAME in value
+ ),
+ "",
+ )
+ assert trusted_cookie_header
+ assert "HttpOnly" in trusted_cookie_header
+ assert "SameSite=Strict" in trusted_cookie_header
+ assert "Path=/api/auth" in trusted_cookie_header
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_login_skips_totp_when_trusted_cookie_valid(
+ app: Quart,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ _, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ first_login = await test_client.post(
+ "/api/auth/login",
+ json={
+ "username": core_lifecycle_td.astrbot_config["dashboard"]["username"],
+ "password": _resolve_dashboard_password(core_lifecycle_td),
+ "code": pyotp.TOTP(secret).now(),
+ "trust_device_flag": True,
+ },
+ )
+ first_data = await first_login.get_json()
+ assert first_data["status"] == "ok"
+
+ second_login = await test_client.post(
+ "/api/auth/login",
+ json={
+ "username": core_lifecycle_td.astrbot_config["dashboard"]["username"],
+ "password": _resolve_dashboard_password(core_lifecycle_td),
+ },
+ )
+ second_data = await second_login.get_json()
+ assert second_login.status_code == 200
+ assert second_data["status"] == "ok"
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_totp_disable_by_totp_code(
+ app: Quart,
+ authenticated_header: dict,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ _, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ response = await test_client.post(
+ "/api/auth/totp/disable",
+ headers=authenticated_header,
+ json={"code": pyotp.TOTP(secret).now()},
+ )
+ data = await response.get_json()
+ assert data["status"] == "ok"
+ assert core_lifecycle_td.astrbot_config["dashboard"]["totp"] == {
+ "enable": False,
+ "secret": "",
+ "recovery_code_hash": "",
+ }
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
+@pytest.mark.asyncio
+async def test_auth_totp_verify_setup_with_valid_code_returns_recovery_code(
+ app: Quart,
+ authenticated_header: dict,
+):
+ test_client = app.test_client()
+ secret = pyotp.random_base32()
+ response = await test_client.post(
+ "/api/auth/totp/verify-setup",
+ headers=authenticated_header,
+ json={"secret": secret, "code": pyotp.TOTP(secret).now()},
+ )
+ data = await response.get_json()
+ assert data["status"] == "ok"
+ assert isinstance(data["data"]["recovery_code"], str)
+ assert isinstance(data["data"]["recovery_code_hash"], str)
+ assert data["data"]["recovery_code"]
+ assert data["data"]["recovery_code_hash"]
+
+
+@pytest.mark.asyncio
+async def test_auth_totp_disable_by_recovery_code(
+ app: Quart,
+ authenticated_header: dict,
+ core_lifecycle_td: AstrBotCoreLifecycle,
+):
+ original_dashboard_config = copy.deepcopy(
+ core_lifecycle_td.astrbot_config["dashboard"]
+ )
+ test_client = app.test_client()
+ recovery_code, recovery_code_hash = generate_recovery_code()
+ secret = pyotp.random_base32()
+
+ try:
+ core_lifecycle_td.astrbot_config["dashboard"]["totp"] = {
+ "enable": True,
+ "secret": secret,
+ "recovery_code_hash": recovery_code_hash,
+ }
+ response = await test_client.post(
+ "/api/auth/totp/disable",
+ headers=authenticated_header,
+ json={"code": recovery_code},
+ )
+ data = await response.get_json()
+ assert data["status"] == "ok"
+ assert core_lifecycle_td.astrbot_config["dashboard"]["totp"] == {
+ "enable": False,
+ "secret": "",
+ "recovery_code_hash": "",
+ }
+ finally:
+ await _restore_dashboard_password_state(
+ core_lifecycle_td,
+ original_dashboard_config,
+ )
+
+
@pytest.mark.asyncio
async def test_legacy_md5_dashboard_password_keeps_legacy_auth_until_edit(
app: Quart,
diff --git a/tests/unit/test_totp_utils.py b/tests/unit/test_totp_utils.py
new file mode 100644
index 0000000000..2a8ae83c04
--- /dev/null
+++ b/tests/unit/test_totp_utils.py
@@ -0,0 +1,98 @@
+import pyotp
+import pytest
+
+from astrbot.core.db.sqlite import SQLiteDatabase
+from astrbot.core.utils.totp import (
+ consume_totp_code,
+ generate_recovery_code,
+ is_totp_enabled,
+ is_totp_trusted_device_valid,
+ issue_totp_trusted_device,
+ verify_recovery_code,
+)
+
+
+@pytest.mark.parametrize(
+ ("totp_config", "expected"),
+ [
+ ({}, False),
+ ({"enable": False, "secret": "abc", "recovery_code_hash": "hash"}, False),
+ ({"enable": True, "secret": "", "recovery_code_hash": "hash"}, False),
+ ({"enable": True, "secret": "abc", "recovery_code_hash": ""}, False),
+ ({"enable": True, "secret": "abc", "recovery_code_hash": "hash"}, True),
+ ],
+)
+def test_is_totp_enabled_requires_enable_secret_and_recovery_hash(
+ totp_config: dict,
+ expected: bool,
+):
+ config = {"dashboard": {"totp": totp_config}}
+ assert is_totp_enabled(config) is expected
+
+
+@pytest.mark.asyncio
+async def test_consume_totp_code_prevents_replay_same_timecode():
+ secret = pyotp.random_base32()
+ code = pyotp.TOTP(secret).now()
+ assert await consume_totp_code(secret, code) is True
+ assert await consume_totp_code(secret, code) is False
+
+
+def test_generate_and_verify_recovery_code_roundtrip():
+ recovery_code, recovery_code_hash = generate_recovery_code()
+ config = {"dashboard": {"totp": {"recovery_code_hash": recovery_code_hash}}}
+ assert verify_recovery_code(config, recovery_code) is True
+
+
+def test_verify_recovery_code_rejects_malformed_or_wrong_length():
+ recovery_code, recovery_code_hash = generate_recovery_code()
+ config = {"dashboard": {"totp": {"recovery_code_hash": recovery_code_hash}}}
+ assert verify_recovery_code(config, "abc") is False
+ assert verify_recovery_code(config, recovery_code[:-1]) is False
+
+
+@pytest.mark.asyncio
+async def test_issue_and_validate_trusted_device_token(tmp_path):
+ db = SQLiteDatabase(str(tmp_path / "trusted-device.db"))
+ config = {
+ "dashboard": {
+ "jwt_secret": "test-jwt-secret",
+ "totp": {
+ "enable": True,
+ "secret": pyotp.random_base32(),
+ "recovery_code_hash": "hash",
+ },
+ }
+ }
+ try:
+ token = await issue_totp_trusted_device(config, db)
+ assert isinstance(token, str) and token
+ assert await is_totp_trusted_device_valid(config, db, token) is True
+ finally:
+ await db.engine.dispose()
+
+
+@pytest.mark.asyncio
+async def test_trusted_device_invalid_after_totp_secret_change(tmp_path):
+ db = SQLiteDatabase(str(tmp_path / "trusted-device.db"))
+ old_secret = pyotp.random_base32()
+ new_secret = pyotp.random_base32()
+ config = {
+ "dashboard": {
+ "jwt_secret": "test-jwt-secret",
+ "totp": {
+ "enable": True,
+ "secret": old_secret,
+ "recovery_code_hash": "hash",
+ },
+ }
+ }
+ try:
+ token = await issue_totp_trusted_device(config, db)
+ assert isinstance(token, str) and token
+ assert await is_totp_trusted_device_valid(config, db, token) is True
+
+ config["dashboard"]["totp"]["secret"] = new_secret
+ assert await is_totp_trusted_device_valid(config, db, token) is False
+ finally:
+ await db.engine.dispose()