Implement newsletter double opt-in email flow and CSP nonce headers
This commit is contained in:
@@ -1,5 +1,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import secrets
|
||||
|
||||
from .consent import ConsentService
|
||||
|
||||
|
||||
@@ -10,3 +12,26 @@ class ConsentMiddleware:
|
||||
def __call__(self, request):
|
||||
request.consent = ConsentService.get_consent(request)
|
||||
return self.get_response(request)
|
||||
|
||||
|
||||
class SecurityHeadersMiddleware:
|
||||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
nonce = secrets.token_urlsafe(16)
|
||||
request.csp_nonce = nonce
|
||||
response = self.get_response(request)
|
||||
response["Content-Security-Policy"] = (
|
||||
f"default-src 'self'; "
|
||||
f"script-src 'self' 'nonce-{nonce}'; "
|
||||
"style-src 'self'; "
|
||||
"img-src 'self' data: blob:; "
|
||||
"font-src 'self'; "
|
||||
"connect-src 'self'; "
|
||||
"object-src 'none'; "
|
||||
"base-uri 'self'; "
|
||||
"frame-ancestors 'self'"
|
||||
)
|
||||
response["Permissions-Policy"] = "camera=(), microphone=(), geolocation=()"
|
||||
return response
|
||||
|
||||
24
apps/core/tests/test_security.py
Normal file
24
apps/core/tests/test_security.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_security_headers_present(client, home_page):
|
||||
resp = client.get("/")
|
||||
assert resp.status_code == 200
|
||||
assert "Content-Security-Policy" in resp
|
||||
assert "Permissions-Policy" in resp
|
||||
assert "unsafe-inline" not in resp["Content-Security-Policy"]
|
||||
assert "script-src" in resp["Content-Security-Policy"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_csp_nonce_applied_to_inline_script(client, home_page):
|
||||
resp = client.get("/")
|
||||
csp = resp["Content-Security-Policy"]
|
||||
match = re.search(r"nonce-([^' ;]+)", csp)
|
||||
assert match
|
||||
nonce = match.group(1)
|
||||
html = resp.content.decode()
|
||||
assert f'nonce="{nonce}"' in html
|
||||
@@ -1,6 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -15,9 +18,26 @@ class ProviderSyncService:
|
||||
|
||||
|
||||
class ButtondownSyncService(ProviderSyncService):
|
||||
endpoint = "https://api.buttondown.email/v1/subscribers"
|
||||
|
||||
def sync(self, subscription):
|
||||
logger.info("Synced subscription %s", subscription.email)
|
||||
api_key = os.getenv("BUTTONDOWN_API_KEY", "")
|
||||
if not api_key:
|
||||
raise ProviderSyncError("BUTTONDOWN_API_KEY is not configured")
|
||||
|
||||
response = requests.post(
|
||||
self.endpoint,
|
||||
headers={"Authorization": f"Token {api_key}", "Content-Type": "application/json"},
|
||||
json={"email": subscription.email},
|
||||
timeout=10,
|
||||
)
|
||||
if response.status_code >= 400:
|
||||
raise ProviderSyncError(f"Buttondown sync failed: {response.status_code}")
|
||||
logger.info("Synced subscription %s to Buttondown", subscription.email)
|
||||
|
||||
|
||||
def get_provider_service() -> ProviderSyncService:
|
||||
provider = os.getenv("NEWSLETTER_PROVIDER", "buttondown").lower().strip()
|
||||
if provider != "buttondown":
|
||||
raise ProviderSyncError(f"Unsupported newsletter provider: {provider}")
|
||||
return ButtondownSyncService()
|
||||
|
||||
@@ -12,6 +12,24 @@ def test_subscribe_ok(client):
|
||||
assert NewsletterSubscription.objects.filter(email="a@example.com").exists()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_subscribe_sends_confirmation_email(client, mailoutbox):
|
||||
resp = client.post("/newsletter/subscribe/", {"email": "new@example.com", "source": "nav"})
|
||||
assert resp.status_code == 200
|
||||
assert len(mailoutbox) == 1
|
||||
assert "Confirm your No Hype AI newsletter subscription" in mailoutbox[0].subject
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_duplicate_subscribe_returns_ok_without_extra_email(client, mailoutbox):
|
||||
client.post("/newsletter/subscribe/", {"email": "dupe@example.com", "source": "nav"})
|
||||
assert len(mailoutbox) == 1
|
||||
resp = client.post("/newsletter/subscribe/", {"email": "dupe@example.com", "source": "footer"})
|
||||
assert resp.status_code == 200
|
||||
assert resp.json()["status"] == "ok"
|
||||
assert len(mailoutbox) == 1
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_subscribe_invalid(client):
|
||||
resp = client.post("/newsletter/subscribe/", {"email": "bad"})
|
||||
|
||||
@@ -1,8 +1,13 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from django.core import signing
|
||||
from django.core.mail import EmailMultiAlternatives
|
||||
from django.http import Http404, JsonResponse
|
||||
from django.shortcuts import get_object_or_404, redirect
|
||||
from django.template.loader import render_to_string
|
||||
from django.urls import reverse
|
||||
from django.views import View
|
||||
|
||||
from apps.newsletter.forms import SubscriptionForm
|
||||
@@ -10,6 +15,27 @@ from apps.newsletter.models import NewsletterSubscription
|
||||
from apps.newsletter.services import ProviderSyncError, get_provider_service
|
||||
|
||||
CONFIRMATION_TOKEN_MAX_AGE_SECONDS = 60 * 60 * 24 * 2
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def confirmation_token(email: str) -> str:
|
||||
return signing.dumps(email, salt="newsletter-confirm")
|
||||
|
||||
|
||||
def send_confirmation_email(request, subscription: NewsletterSubscription) -> None:
|
||||
token = confirmation_token(subscription.email)
|
||||
confirm_url = request.build_absolute_uri(reverse("newsletter_confirm", args=[token]))
|
||||
context = {"confirmation_url": confirm_url, "subscription": subscription}
|
||||
subject = render_to_string("newsletter/email/confirmation_subject.txt", context).strip()
|
||||
text_body = render_to_string("newsletter/email/confirmation_body.txt", context)
|
||||
html_body = render_to_string("newsletter/email/confirmation_body.html", context)
|
||||
message = EmailMultiAlternatives(
|
||||
subject=subject,
|
||||
body=text_body,
|
||||
to=[subscription.email],
|
||||
)
|
||||
message.attach_alternative(html_body, "text/html")
|
||||
message.send()
|
||||
|
||||
|
||||
class SubscribeView(View):
|
||||
@@ -20,9 +46,14 @@ class SubscribeView(View):
|
||||
if form.cleaned_data.get("honeypot"):
|
||||
return JsonResponse({"status": "ok"})
|
||||
|
||||
email = form.cleaned_data["email"]
|
||||
email = form.cleaned_data["email"].lower().strip()
|
||||
source = form.cleaned_data.get("source") or "unknown"
|
||||
NewsletterSubscription.objects.get_or_create(email=email, defaults={"source": source})
|
||||
subscription, created = NewsletterSubscription.objects.get_or_create(
|
||||
email=email,
|
||||
defaults={"source": source},
|
||||
)
|
||||
if created and not subscription.confirmed:
|
||||
send_confirmation_email(request, subscription)
|
||||
return JsonResponse({"status": "ok"})
|
||||
|
||||
|
||||
@@ -42,10 +73,6 @@ class ConfirmView(View):
|
||||
service = get_provider_service()
|
||||
try:
|
||||
service.sync(subscription)
|
||||
except ProviderSyncError:
|
||||
pass
|
||||
except ProviderSyncError as exc:
|
||||
logger.exception("Newsletter provider sync failed: %s", exc)
|
||||
return redirect("/")
|
||||
|
||||
|
||||
def confirmation_token(email: str) -> str:
|
||||
return signing.dumps(email, salt="newsletter-confirm")
|
||||
|
||||
Reference in New Issue
Block a user