from __future__ import annotations
import logging
import requests as http_requests
from django.conf import settings
from django.contrib import messages
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.db import IntegrityError
from django.db.models import Count, Prefetch
from django.http import HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.template.loader import render_to_string
from django.utils.cache import patch_vary_headers
from django.views import View
from django.views.decorators.http import require_GET, require_POST
from apps.blog.models import ArticlePage
from apps.comments.forms import CommentForm
from apps.comments.models import Comment, CommentReaction
logger = logging.getLogger(__name__)
def client_ip_from_request(request) -> str:
remote_addr = request.META.get("REMOTE_ADDR", "").strip()
trusted_proxies = getattr(settings, "TRUSTED_PROXY_IPS", [])
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR", "")
if remote_addr in trusted_proxies and x_forwarded_for:
return x_forwarded_for.split(",")[0].strip()
return remote_addr
def _is_htmx(request) -> bool:
return request.headers.get("HX-Request") == "true"
def _add_vary_header(response):
patch_vary_headers(response, ["HX-Request"])
return response
def _verify_turnstile(token: str, ip: str) -> bool:
secret = getattr(settings, "TURNSTILE_SECRET_KEY", "")
if not secret:
return False
try:
resp = http_requests.post(
"https://challenges.cloudflare.com/turnstile/v0/siteverify",
data={"secret": secret, "response": token, "remoteip": ip},
timeout=5,
)
result = resp.json()
if not result.get("success"):
return False
expected_hostname = getattr(settings, "TURNSTILE_EXPECTED_HOSTNAME", "")
if expected_hostname and result.get("hostname") != expected_hostname:
logger.warning("Turnstile hostname mismatch: %s", result.get("hostname"))
return False
return True
except Exception:
logger.exception("Turnstile verification failed")
return False
def _turnstile_enabled() -> bool:
return bool(getattr(settings, "TURNSTILE_SECRET_KEY", ""))
def _get_session_key(request) -> str:
session = getattr(request, "session", None)
return (session.session_key or "") if session else ""
def _turnstile_site_key():
return getattr(settings, "TURNSTILE_SITE_KEY", "")
def _annotate_reaction_counts(comments, session_key=""):
"""Hydrate each comment with reaction_counts dict and user_reacted set."""
comment_ids = [c.id for c in comments]
if not comment_ids:
return comments
counts_qs = (
CommentReaction.objects.filter(comment_id__in=comment_ids)
.values("comment_id", "reaction_type")
.annotate(count=Count("id"))
)
counts_map = {}
for row in counts_qs:
counts_map.setdefault(row["comment_id"], {"heart": 0, "plus_one": 0})
counts_map[row["comment_id"]][row["reaction_type"]] = row["count"]
user_map = {}
if session_key:
user_qs = CommentReaction.objects.filter(
comment_id__in=comment_ids, session_key=session_key
).values_list("comment_id", "reaction_type")
for cid, rtype in user_qs:
user_map.setdefault(cid, set()).add(rtype)
for comment in comments:
comment.reaction_counts = counts_map.get(comment.id, {"heart": 0, "plus_one": 0})
comment.user_reacted = user_map.get(comment.id, set())
return comments
def _comment_template_context(comment, article, request):
"""Build template context for a single comment partial."""
_annotate_reaction_counts([comment], _get_session_key(request))
return {
"comment": comment,
"page": article,
"turnstile_site_key": _turnstile_site_key(),
}
class CommentCreateView(View):
def _render_htmx_error(self, request, article, form):
"""Return error form partial for HTMX — swaps the form container itself."""
parent_id = request.POST.get("parent_id")
if parent_id:
parent = Comment.objects.filter(pk=parent_id, article=article).first()
ctx = {
"comment": parent, "page": article,
"turnstile_site_key": _turnstile_site_key(),
"reply_form_errors": form.errors,
}
return _add_vary_header(render(request, "comments/_reply_form.html", ctx))
ctx = {
"comment_form": form, "page": article,
"turnstile_site_key": _turnstile_site_key(),
}
return _add_vary_header(render(request, "comments/_comment_form.html", ctx))
def _render_htmx_success(self, request, article, comment):
"""Return fresh form + OOB-appended comment (if approved)."""
tsk = _turnstile_site_key()
oob_html = ""
if comment.is_approved:
ctx = _comment_template_context(comment, article, request)
if comment.parent_id:
comment_html = render_to_string("comments/_reply.html", ctx, request)
oob_html = (
f'
{comment_html}
'
)
else:
comment_html = render_to_string("comments/_comment.html", ctx, request)
oob_html = (
f''
f"{comment_html}
"
)
if comment.parent_id:
parent = Comment.objects.filter(pk=comment.parent_id, article=article).first()
msg = "Reply posted!" if comment.is_approved else "Your reply is awaiting moderation."
form_html = render_to_string("comments/_reply_form.html", {
"comment": parent, "page": article,
"turnstile_site_key": tsk, "reply_success_message": msg,
}, request)
else:
msg = (
"Comment posted!" if comment.is_approved
else "Your comment has been posted and is awaiting moderation."
)
form_html = render_to_string("comments/_comment_form.html", {
"page": article, "turnstile_site_key": tsk, "success_message": msg,
}, request)
resp = HttpResponse(form_html + oob_html)
return _add_vary_header(resp)
def post(self, request):
ip = client_ip_from_request(request)
key = f"comment-rate:{ip}"
count = cache.get(key, 0)
rate_limit = getattr(settings, "COMMENT_RATE_LIMIT_PER_MINUTE", 3)
if count >= rate_limit:
return HttpResponse(status=429)
cache.set(key, count + 1, timeout=60)
form = CommentForm(request.POST)
article = get_object_or_404(ArticlePage, pk=request.POST.get("article_id"))
if not article.comments_enabled:
return HttpResponse(status=404)
if form.is_valid():
if form.cleaned_data.get("honeypot"):
if _is_htmx(request):
return _add_vary_header(
render(request, "comments/_comment_success.html", {"message": "Comment posted!"})
)
return redirect(f"{article.url}?commented=1")
# Turnstile verification
turnstile_ok = False
if _turnstile_enabled():
token = request.POST.get("cf-turnstile-response", "")
turnstile_ok = _verify_turnstile(token, ip)
comment = form.save(commit=False)
comment.article = article
comment.is_approved = turnstile_ok
parent_id = form.cleaned_data.get("parent_id")
if parent_id:
comment.parent = Comment.objects.filter(pk=parent_id, article=article).first()
comment.ip_address = ip or None
try:
comment.full_clean()
except ValidationError:
form.add_error(None, "Reply depth exceeds the allowed limit")
if _is_htmx(request):
return self._render_htmx_error(request, article, form)
context = article.get_context(request)
context.update({"page": article, "comment_form": form})
return render(request, "blog/article_page.html", context, status=200)
comment.save()
if _is_htmx(request):
return self._render_htmx_success(request, article, comment)
messages.success(
request,
"Comment posted!" if comment.is_approved else "Your comment is awaiting moderation",
)
return redirect(f"{article.url}?commented=1")
if _is_htmx(request):
return self._render_htmx_error(request, article, form)
context = article.get_context(request)
context.update({"page": article, "comment_form": form})
return render(request, "blog/article_page.html", context, status=200)
@require_GET
def comment_poll(request, article_id):
"""Return comments newer than after_id for HTMX polling."""
article = get_object_or_404(ArticlePage, pk=article_id)
after_id = request.GET.get("after_id", "0")
try:
after_id = int(after_id)
except (ValueError, TypeError):
after_id = 0
approved_replies = Comment.objects.filter(is_approved=True).select_related("parent")
comments = list(
article.comments.filter(is_approved=True, parent__isnull=True, id__gt=after_id)
.prefetch_related(Prefetch("replies", queryset=approved_replies))
.order_by("created_at", "id")
)
_annotate_reaction_counts(comments, _get_session_key(request))
resp = render(request, "comments/_comment_list_inner.html", {
"approved_comments": comments,
"page": article,
"turnstile_site_key": _turnstile_site_key(),
})
return _add_vary_header(resp)
@require_POST
def comment_react(request, comment_id):
"""Toggle a reaction on a comment."""
ip = client_ip_from_request(request)
key = f"reaction-rate:{ip}"
count = cache.get(key, 0)
rate_limit = getattr(settings, "REACTION_RATE_LIMIT_PER_MINUTE", 20)
if count >= rate_limit:
return HttpResponse(status=429)
cache.set(key, count + 1, timeout=60)
comment = get_object_or_404(Comment, pk=comment_id, is_approved=True)
reaction_type = request.POST.get("reaction_type", "heart")
if reaction_type not in ("heart", "plus_one"):
return HttpResponse(status=400)
if not request.session.session_key:
request.session.create()
session_key = request.session.session_key
try:
existing = CommentReaction.objects.filter(
comment=comment, reaction_type=reaction_type, session_key=session_key
).first()
if existing:
existing.delete()
else:
CommentReaction.objects.create(
comment=comment, reaction_type=reaction_type, session_key=session_key
)
except IntegrityError:
pass
counts = {}
for rt in ("heart", "plus_one"):
counts[rt] = comment.reactions.filter(reaction_type=rt).count()
user_reacted = set(
comment.reactions.filter(session_key=session_key).values_list("reaction_type", flat=True)
)
if _is_htmx(request):
resp = render(request, "comments/_reactions.html", {
"comment": comment, "counts": counts, "user_reacted": user_reacted,
})
return _add_vary_header(resp)
return JsonResponse({"counts": counts, "user_reacted": list(user_reacted)})