Remove unused imports (urlencode, F) and fix import sort order in test_v2.py. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
217 lines
8.0 KiB
Python
217 lines
8.0 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
import requests as http_requests
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.core.cache import cache
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import IntegrityError
|
|
from django.db.models import Prefetch
|
|
from django.http import HttpResponse, JsonResponse
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.views import View
|
|
from django.views.decorators.http import require_GET, require_POST
|
|
|
|
from apps.blog.models import ArticlePage
|
|
from apps.comments.forms import CommentForm
|
|
from apps.comments.models import Comment, CommentReaction
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def client_ip_from_request(request) -> str:
    """Return the best-known client IP address for ``request``.

    ``REMOTE_ADDR`` is returned unless it is listed in
    ``settings.TRUSTED_PROXY_IPS`` and an ``X-Forwarded-For`` header is
    present. In that case the header is read from right to left, entries that
    are themselves trusted proxies are skipped, and the first remaining entry
    is returned.

    The left-most entry is deliberately not used: proxies commonly append to
    whatever ``X-Forwarded-For`` value the client sent, so the left side of
    the header can be chosen by the caller and would make rate-limit keys
    spoofable.

    Falls back to ``REMOTE_ADDR`` when the selected entry is not a valid
    IPv4/IPv6 literal or when every entry is a trusted proxy.
    """
    import ipaddress

    remote_addr = request.META.get("REMOTE_ADDR", "").strip()
    trusted_proxies = getattr(settings, "TRUSTED_PROXY_IPS", [])
    x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR", "")
    if remote_addr not in trusted_proxies or not x_forwarded_for:
        return remote_addr

    for hop in reversed(x_forwarded_for.split(",")):
        candidate = hop.strip()
        if not candidate or candidate in trusted_proxies:
            continue
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            # The closest untrusted hop is not an IP literal. Entries further
            # left are even less trustworthy, so stop here.
            return remote_addr
        return candidate
    return remote_addr
|
|
|
|
|
|
def _is_htmx(request) -> bool:
    """Report whether ``request`` carries the header ``HX-Request: true``."""
    hx_flag = request.headers.get("HX-Request")
    return hx_flag == "true"
|
|
|
|
|
|
def _add_vary_header(response):
    """Mark ``response`` as varying on ``HX-Request`` and return it.

    Uses Django's ``patch_vary_headers`` so that a ``Vary`` value already
    present on the response is extended rather than overwritten.
    """
    from django.utils.cache import patch_vary_headers

    patch_vary_headers(response, ["HX-Request"])
    return response
|
|
|
|
|
|
def _verify_turnstile(token: str, ip: str) -> bool:
    """Check a Turnstile token against Cloudflare's siteverify endpoint.

    Returns ``True`` only when the endpoint reports ``success`` and, if
    ``settings.TURNSTILE_EXPECTED_HOSTNAME`` is set, the reported hostname
    equals it. Returns ``False`` when no secret key is configured, when
    ``token`` is empty, when the response is not a JSON object, or when the
    HTTP call or JSON decoding raises (the exception is logged).
    """
    secret = getattr(settings, "TURNSTILE_SECRET_KEY", "")
    if not secret or not token:
        # Nothing to verify: avoid a blocking outbound request (up to the
        # 5 second timeout below) for submissions that carry no token.
        return False

    # Only the network round-trip and body decoding are expected to fail.
    try:
        resp = http_requests.post(
            "https://challenges.cloudflare.com/turnstile/v0/siteverify",
            data={"secret": secret, "response": token, "remoteip": ip},
            timeout=5,
        )
        result = resp.json()
    except Exception:
        logger.exception("Turnstile verification failed")
        return False

    if not isinstance(result, dict) or not result.get("success"):
        return False

    expected_hostname = getattr(settings, "TURNSTILE_EXPECTED_HOSTNAME", "")
    if expected_hostname and result.get("hostname") != expected_hostname:
        logger.warning("Turnstile hostname mismatch: %s", result.get("hostname"))
        return False
    return True
|
|
|
|
|
|
def _turnstile_enabled() -> bool:
    """Report whether ``settings.TURNSTILE_SECRET_KEY`` is set to a truthy value."""
    secret_key = getattr(settings, "TURNSTILE_SECRET_KEY", "")
    return True if secret_key else False
|
|
|
|
|
|
class CommentCreateView(View):
    """Accept a comment submission for an article.

    Handles both regular form posts (redirect / full page re-render) and
    HTMX posts (partial templates with a ``Vary: HX-Request`` header).
    """

    def _render_article_with_errors(self, request, article, form):
        """Re-render the comment form together with its validation errors.

        HTMX requests receive only the form partial with status 422; other
        requests receive the full article page with status 200.
        """
        site_key = getattr(settings, "TURNSTILE_SITE_KEY", "")
        if _is_htmx(request):
            ctx = {
                "comment_form": form,
                "page": article,
                "turnstile_site_key": site_key,
            }
            resp = render(request, "comments/_comment_form.html", ctx, status=422)
            return _add_vary_header(resp)

        context = article.get_context(request)
        context["page"] = article
        context["comment_form"] = form
        context["turnstile_site_key"] = site_key
        return render(request, "blog/article_page.html", context, status=200)

    @staticmethod
    def _rate_limit_exceeded(key, limit, window=60):
        """Record one hit for ``key`` and report whether ``limit`` is exceeded.

        ``cache.add`` creates the counter only when it is missing, so the
        ``window`` (seconds) starts at the first hit and is not restarted by
        later ones. ``cache.incr`` bumps the counter in one cache operation
        (atomic on backends that support it, e.g. memcached), which avoids the
        lost updates of a separate get-then-set under concurrent requests.
        """
        cache.add(key, 0, timeout=window)
        try:
            hits = cache.incr(key)
        except ValueError:
            # incr() raises ValueError for a missing key: the counter expired
            # between add() and incr(), or the backend stores nothing.
            cache.set(key, 1, timeout=window)
            hits = 1
        return hits > limit

    def post(self, request):
        """Validate and store a submitted comment.

        Responses:
            429 when the per-IP rate limit is exceeded.
            404 when ``article_id`` is missing, not an integer, unknown, or
                the article has comments disabled.
            Form re-render (see ``_render_article_with_errors``) when the
                form or the model validation fails.
            Otherwise an HTMX partial or a redirect back to the article.
        """
        ip = client_ip_from_request(request)
        rate_limit = getattr(settings, "COMMENT_RATE_LIMIT_PER_MINUTE", 3)
        if self._rate_limit_exceeded(f"comment-rate:{ip}", rate_limit):
            return HttpResponse(status=429)

        # Parse the id up front: handing a non-numeric string to the ORM
        # raises ValueError, which would surface as a 500 instead of a 404.
        try:
            article_id = int(request.POST.get("article_id", ""))
        except (TypeError, ValueError):
            return HttpResponse(status=404)
        article = get_object_or_404(ArticlePage, pk=article_id)
        if not article.comments_enabled:
            return HttpResponse(status=404)

        form = CommentForm(request.POST)
        if not form.is_valid():
            return self._render_article_with_errors(request, article, form)

        if form.cleaned_data.get("honeypot"):
            # A filled honeypot field gets the ordinary success response,
            # but nothing is saved.
            if _is_htmx(request):
                return _add_vary_header(
                    render(request, "comments/_comment_success.html", {"message": "Comment posted!"})
                )
            return redirect(f"{article.url}?commented=1")

        # Turnstile verification. When Turnstile is not configured the
        # comment is never auto-approved and goes to moderation.
        turnstile_ok = False
        if _turnstile_enabled():
            token = request.POST.get("cf-turnstile-response", "")
            turnstile_ok = _verify_turnstile(token, ip)

        comment = form.save(commit=False)
        comment.article = article
        comment.is_approved = turnstile_ok
        parent_id = form.cleaned_data.get("parent_id")
        if parent_id:
            # Only a parent on the same article is accepted; an unknown id
            # leaves ``parent`` as None.
            comment.parent = Comment.objects.filter(pk=parent_id, article=article).first()
        comment.ip_address = ip or None
        try:
            comment.full_clean()
        except ValidationError as exc:
            # The user-facing message assumes a reply-depth failure; log the
            # real validation error so other causes can be diagnosed.
            logger.info("Comment failed model validation: %s", exc)
            form.add_error(None, "Reply depth exceeds the allowed limit")
            return self._render_article_with_errors(request, article, form)
        comment.save()

        if _is_htmx(request):
            if comment.is_approved:
                resp = render(request, "comments/_comment.html", {
                    "comment": comment,
                    "page": article,
                    "turnstile_site_key": getattr(settings, "TURNSTILE_SITE_KEY", ""),
                })
            else:
                resp = render(request, "comments/_comment_success.html", {
                    "message": "Your comment has been posted and is awaiting moderation.",
                })
            return _add_vary_header(resp)

        messages.success(
            request,
            "Comment posted!" if comment.is_approved else "Your comment is awaiting moderation",
        )
        return redirect(f"{article.url}?commented=1")
|
|
|
|
|
|
@require_GET
def comment_poll(request, article_id):
    """Render approved top-level comments whose id is above ``after_id``.

    ``after_id`` is read from the query string; a missing or non-integer
    value is treated as 0. Each comment's approved replies are prefetched,
    and the result is rendered with the ``comments/_comment_list_inner.html``
    partial for HTMX polling.
    """
    article = get_object_or_404(ArticlePage, pk=article_id)

    try:
        last_seen_id = int(request.GET.get("after_id", "0"))
    except (ValueError, TypeError):
        last_seen_id = 0

    visible_replies = Comment.objects.filter(is_approved=True).select_related("parent")
    replies_prefetch = Prefetch("replies", queryset=visible_replies)
    new_comments = article.comments.filter(
        is_approved=True, parent__isnull=True, id__gt=last_seen_id
    )
    new_comments = new_comments.prefetch_related(replies_prefetch).order_by("created_at", "id")

    context = {
        "approved_comments": new_comments,
        "page": article,
        "turnstile_site_key": getattr(settings, "TURNSTILE_SITE_KEY", ""),
    }
    return _add_vary_header(render(request, "comments/_comment_list_inner.html", context))
|
|
|
|
|
|
@require_POST
def comment_react(request, comment_id):
    """Toggle a reaction on an approved comment for the current session.

    Responses:
        429 when the per-IP reaction rate limit is exceeded.
        404 when the comment does not exist or is not approved.
        400 when ``reaction_type`` is not one of the supported values.
        Otherwise the ``comments/_reactions.html`` partial for HTMX requests,
        or JSON with ``counts`` and ``user_reacted`` for other requests.
    """
    reaction_types = ("heart", "plus_one")

    ip = client_ip_from_request(request)
    key = f"reaction-rate:{ip}"
    rate_limit = getattr(settings, "REACTION_RATE_LIMIT_PER_MINUTE", 20)
    # add() creates the counter only if it is missing, so the 60 second
    # window starts at the first hit; incr() bumps it in one cache operation
    # instead of a racy get-then-set.
    cache.add(key, 0, timeout=60)
    try:
        hits = cache.incr(key)
    except ValueError:
        # incr() raises ValueError for a missing key (expired in between, or
        # a backend that stores nothing); start a fresh window.
        cache.set(key, 1, timeout=60)
        hits = 1
    if hits > rate_limit:
        return HttpResponse(status=429)

    comment = get_object_or_404(Comment, pk=comment_id, is_approved=True)
    reaction_type = request.POST.get("reaction_type", "heart")
    if reaction_type not in reaction_types:
        return HttpResponse(status=400)

    # Reactions are keyed by session, so make sure one exists.
    if not request.session.session_key:
        request.session.create()
    session_key = request.session.session_key

    try:
        existing = CommentReaction.objects.filter(
            comment=comment, reaction_type=reaction_type, session_key=session_key
        ).first()
        if existing:
            existing.delete()
        else:
            CommentReaction.objects.create(
                comment=comment, reaction_type=reaction_type, session_key=session_key
            )
    except IntegrityError:
        # Deliberate best-effort: presumably a uniqueness constraint fired
        # because a concurrent request stored the same reaction first
        # (confirm against the model). The current state is reported below.
        pass

    counts = {
        rt: comment.reactions.filter(reaction_type=rt).count()
        for rt in reaction_types
    }
    user_reacted = set(
        comment.reactions.filter(session_key=session_key).values_list("reaction_type", flat=True)
    )

    if _is_htmx(request):
        resp = render(request, "comments/_reactions.html", {
            "comment": comment, "counts": counts, "user_reacted": user_reacted,
        })
        return _add_vary_header(resp)

    return JsonResponse({"counts": counts, "user_reacted": list(user_reacted)})
|