Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions froide/bounce/apps.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
from typing import Any, Dict, Optional, Union

from django.apps import AppConfig
from django.utils.translation import gettext_lazy as _
Expand All @@ -18,7 +19,7 @@ def ready(self):
mail_middleware_registry.register(UnsubscribeReferenceMailMiddleware())


def cancel_user(sender, user=None, **kwargs):
def cancel_user(sender, user=None, **kwargs) -> None:
from .models import Bounce

if user is None:
Expand Down Expand Up @@ -49,13 +50,18 @@ def export_user_data(user):
)


EmailKwargs = Dict[str, Optional[Union[str, bool, Dict[str, str]]]]


class UnsubscribeReferenceMailMiddleware:
"""
Moves unsubscribe_reference from mail render context
to email sending kwargs
"""

def enhance_email_kwargs(self, mail_intent, context, email_kwargs):
def enhance_email_kwargs(
self, mail_intent: str, context: Dict[str, Any], email_kwargs: EmailKwargs
) -> Optional[Dict[str, str]]:
unsubscribe_reference = context.get("unsubscribe_reference")
if unsubscribe_reference is None:
return
Expand Down
9 changes: 7 additions & 2 deletions froide/bounce/models.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
from typing import Dict, Union

from django.db import models
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

from froide.account.models import User
from froide.helper.email_utils import BounceResult, DsnStatus

BounceInfo = Dict[str, Union[DsnStatus, bool, str]]


def convert_bounce_info(bounce_info):
def convert_bounce_info(bounce_info: BounceResult) -> BounceInfo:
d = dict(bounce_info._asdict())
d["timestamp"] = d["timestamp"].isoformat()
return d


class BounceManager(models.Manager):
def update_bounce(self, email, bounce_info):
def update_bounce(self, email: str, bounce_info: BounceResult):
email_lower = email.lower()
try:
bounce = Bounce.objects.get(email=email_lower)
Expand Down
12 changes: 6 additions & 6 deletions froide/bounce/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@
TEST_DATA_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "testdata"))


def p(path):
def p(path: str) -> str:
return os.path.join(TEST_DATA_ROOT, path)


@unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests")
class BounceTest(TestCase):
def setUp(self):
def setUp(self) -> None:
self.email = "nonexistant@example.org"

def test_bounce_address(self):
def test_bounce_address(self) -> None:
email = "Upper_Case@example.org"
bounce_address = make_bounce_address(email)
self.assertEqual(bounce_address, bounce_address.lower())
recovered_email, status = get_recipient_address_from_bounce(bounce_address)
self.assertEqual(recovered_email, email.lower())
self.assertTrue(status)

def test_bounce_parsing(self):
def test_bounce_parsing(self) -> None:
with open(p("bounce_001.txt"), "rb") as f:
email = parse_email(f)

Expand All @@ -52,7 +52,7 @@ def test_bounce_parsing(self):
self.assertIsNone(bounce.user)
self.assertEqual(len(bounce.bounces), 1)

def test_bounce_parsing_2(self):
def test_bounce_parsing_2(self) -> None:
with open(p("bounce_002.txt"), "rb") as f:
email = parse_email(f)

Expand All @@ -63,7 +63,7 @@ def test_bounce_parsing_2(self):
self.assertEqual(bounce_info.bounce_type, "hard")
self.assertEqual(bounce_info.status, (5, 1, 1))

def test_bounce_handling(self):
def test_bounce_handling(self) -> None:
def days_ago(days):
return (datetime.now() - timedelta(days=days)).isoformat()

Expand Down
64 changes: 38 additions & 26 deletions froide/bounce/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

"""
import base64
import datetime
import time
from contextlib import closing
from datetime import datetime, timedelta
from email.utils import parseaddr
from io import BytesIO
from typing import Dict, List, Optional, Tuple, Union
from urllib.parse import quote

from django.conf import settings
Expand All @@ -20,9 +21,10 @@
from django.utils import timezone
from django.utils.crypto import salted_hmac

from froide.helper.email_parsing import parse_email, parse_header_field
from froide.helper.email_parsing import ParsedEmail, parse_email, parse_header_field
from froide.helper.email_utils import (
BounceResult,
DsnStatus,
classify_bounce_status,
find_status_from_diagnostic,
get_mail_client,
Expand All @@ -42,25 +44,25 @@

MAX_BOUNCE_COUNT = 20
HARD_BOUNCE_COUNT = 3
HARD_BOUNCE_PERIOD = datetime.timedelta(seconds=3 * 7 * 24 * 60 * 60) # 3 weeks
HARD_BOUNCE_PERIOD = timedelta(seconds=3 * 7 * 24 * 60 * 60) # 3 weeks

SOFT_BOUNCE_COUNT = 5
SOFT_BOUNCE_PERIOD = datetime.timedelta(seconds=5 * 7 * 24 * 60 * 60) # 5 weeks
SOFT_BOUNCE_PERIOD = timedelta(seconds=5 * 7 * 24 * 60 * 60) # 5 weeks


def b32_encode(s):
def b32_encode(s: bytes) -> bytes:
return base64.b32encode(s).strip(b"=")


def base32_hmac(salt, value, key):
def base32_hmac(salt: str, value: str, key: str) -> str:
return b32_encode(salted_hmac(salt, value, key).digest()).decode()


def int_to_bytes(x):
def int_to_bytes(x: int) -> bytes:
return x.to_bytes((x.bit_length() + 7) // 8, "big")


def bytes_to_int(xbytes):
def bytes_to_int(xbytes: bytes) -> int:
return int.from_bytes(xbytes, "big")


Expand All @@ -69,13 +71,13 @@ class CustomTimestampSigner(TimestampSigner):
Signs in base32 so that only lower case characters are used.
"""

def signature(self, value):
def signature(self, value: str) -> str:
return base32_hmac(self.salt + "signer", value, self.key)

def timestamp(self):
def timestamp(self) -> str:
return base64.b32encode(int_to_bytes(int(time.time()))).decode("ascii")

def unsign(self, value, max_age=None):
def unsign(self, value: str, max_age: Optional[int] = None) -> str:
"""
Retrieve original value and check it wasn't signed more
than max_age seconds ago.
Expand All @@ -84,7 +86,7 @@ def unsign(self, value, max_age=None):
value, timestamp = result.rsplit(self.sep, 1)
timestamp = bytes_to_int(base64.b32decode(timestamp))
if max_age is not None:
if isinstance(max_age, datetime.timedelta):
if isinstance(max_age, timedelta):
max_age = max_age.total_seconds()
# Check timestamp is not older than max_age
age = time.time() - timestamp
Expand All @@ -93,12 +95,12 @@ def unsign(self, value, max_age=None):
return value


def make_bounce_address(email):
def make_bounce_address(email: str) -> str:
_, email = parseaddr(email)
return make_signed_address(email)


def make_unsubscribe_header(email, reference):
def make_unsubscribe_header(email: str, reference: str) -> str:
_, email = parseaddr(email)
unsub_email = make_unsubscribe_address(email)
return "<mailto:{email}?subject={subject}>".format(
Expand All @@ -109,11 +111,11 @@ def make_unsubscribe_header(email, reference):
)


def make_unsubscribe_address(email):
def make_unsubscribe_address(email: str) -> str:
return make_signed_address(email, email_format=UNSUBSCRIBE_FORMAT)


def make_signed_address(email, email_format=BOUNCE_FORMAT):
def make_signed_address(email: str, email_format: str = BOUNCE_FORMAT) -> str:
signer = CustomTimestampSigner(sep=SIGN_SEP)
email = email.lower()
# Sanitize address to convert unicode domains to punycode
Expand All @@ -139,7 +141,7 @@ def get_signing_methods(email, signature):
yield TimestampSigner, original


def get_recipient_address_from_bounce(bounce_email):
def get_recipient_address_from_bounce(bounce_email: str) -> Tuple[str, bool]:
return get_original_email_from_signed(bounce_email)


Expand All @@ -150,8 +152,8 @@ def get_recipient_address_from_unsubscribe(unsub_email):


def get_original_email_from_signed(
signed_email, email_format=BOUNCE_FORMAT, max_age=MAX_BOUNCE_AGE
):
signed_email: str, email_format: str = BOUNCE_FORMAT, max_age: int = MAX_BOUNCE_AGE
) -> Tuple[str, bool]:
head, tail = email_format.split("{token}")
# Cut off head and tail of bounce formatting
token = signed_email[len(head) : -len(tail)]
Expand Down Expand Up @@ -229,7 +231,7 @@ def process_bounce_mail(mail_bytes):
mail_managers("No bounce detected in bounce mailbox", email.subject)


def add_bounce_mail(email):
def add_bounce_mail(email: ParsedEmail) -> None:
recipient_list = set(
[get_recipient_address_from_bounce(addr) for name, addr in email.to]
)
Expand All @@ -243,7 +245,7 @@ def add_bounce_mail(email):
)


def update_bounce(email, recipient):
def update_bounce(email: ParsedEmail, recipient: str) -> None:
bounce = Bounce.objects.update_bounce(recipient, email.bounce_info)
should_deactivate = check_deactivation_condition(bounce)

Expand All @@ -256,9 +258,14 @@ def update_bounce(email, recipient):
)


def get_bounce_stats(bounces, bounce_type="hard", start_date=None):
Bounces = List[Dict[str, Union[DsnStatus, bool, str]]]


def get_bounce_stats(
bounces: Bounces, bounce_type: str = "hard", start_date: Optional[datetime] = None
) -> int:
filtered_bounces = [
datetime.datetime.strptime(b["timestamp"][:19], "%Y-%m-%dT%H:%M:%S")
datetime.strptime(b["timestamp"][:19], "%Y-%m-%dT%H:%M:%S")
for b in bounces
if b["bounce_type"] == bounce_type
]
Expand All @@ -268,15 +275,20 @@ def get_bounce_stats(bounces, bounce_type="hard", start_date=None):
return len(filtered_bounces)


def check_bounce_status(bounces, bounce_type, period, threshold):
start_date = datetime.datetime.now() - period
def check_bounce_status(
bounces: List[Dict[str, Union[DsnStatus, bool, str]]],
bounce_type: str,
period: timedelta,
threshold: int,
) -> bool:
start_date = datetime.now() - period
count = get_bounce_stats(bounces, bounce_type=bounce_type, start_date=start_date)
if count >= MAX_BOUNCE_COUNT:
return True
return count >= threshold


def check_deactivation_condition(bounce):
def check_deactivation_condition(bounce: Bounce) -> bool:
"""
Decide if current bounce state warrants deactivation
"""
Expand Down