from __future__ import annotations import logging import requests as http_requests from django.conf import settings from django.core.cache import cache from django.core.exceptions import ValidationError from django.db import IntegrityError from django.db.models import Count, Prefetch from django.http import HttpResponse, JsonResponse from django.shortcuts import get_object_or_404, redirect, render from django.template.loader import render_to_string from django.utils.cache import patch_vary_headers from django.views import View from django.views.decorators.http import require_GET, require_POST from apps.blog.models import ArticlePage from apps.comments.forms import CommentForm from apps.comments.models import Comment, CommentReaction logger = logging.getLogger(__name__) def client_ip_from_request(request) -> str: remote_addr = request.META.get("REMOTE_ADDR", "").strip() trusted_proxies = getattr(settings, "TRUSTED_PROXY_IPS", []) x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR", "") if remote_addr in trusted_proxies and x_forwarded_for: return x_forwarded_for.split(",")[0].strip() return remote_addr def _is_htmx(request) -> bool: return request.headers.get("HX-Request") == "true" def _add_vary_header(response): patch_vary_headers(response, ["HX-Request"]) return response def _comment_redirect(article: ArticlePage, *, approved: bool): state = "approved" if approved else "pending" return redirect(f"{article.url}?commented={state}") def _verify_turnstile(token: str, ip: str) -> bool: secret = getattr(settings, "TURNSTILE_SECRET_KEY", "") if not secret: return False try: resp = http_requests.post( "https://challenges.cloudflare.com/turnstile/v0/siteverify", data={"secret": secret, "response": token, "remoteip": ip}, timeout=5, ) result = resp.json() if not result.get("success"): return False expected_hostname = getattr(settings, "TURNSTILE_EXPECTED_HOSTNAME", "") if expected_hostname and result.get("hostname") != expected_hostname: logger.warning("Turnstile hostname mismatch: %s", result.get("hostname")) return False return True except Exception: logger.exception("Turnstile verification failed") return False def _turnstile_enabled() -> bool: return bool(getattr(settings, "TURNSTILE_SECRET_KEY", "")) def _get_session_key(request) -> str: session = getattr(request, "session", None) return (session.session_key or "") if session else "" def _turnstile_site_key(): return getattr(settings, "TURNSTILE_SITE_KEY", "") def _annotate_reaction_counts(comments, session_key=""): """Hydrate each comment with reaction_counts dict and user_reacted set.""" comment_ids = [c.id for c in comments] if not comment_ids: return comments counts_qs = ( CommentReaction.objects.filter(comment_id__in=comment_ids) .values("comment_id", "reaction_type") .annotate(count=Count("id")) ) counts_map = {} for row in counts_qs: counts_map.setdefault(row["comment_id"], {"heart": 0, "plus_one": 0}) counts_map[row["comment_id"]][row["reaction_type"]] = row["count"] user_map = {} if session_key: user_qs = CommentReaction.objects.filter( comment_id__in=comment_ids, session_key=session_key ).values_list("comment_id", "reaction_type") for cid, rtype in user_qs: user_map.setdefault(cid, set()).add(rtype) for comment in comments: comment.reaction_counts = counts_map.get(comment.id, {"heart": 0, "plus_one": 0}) comment.user_reacted = user_map.get(comment.id, set()) return comments def _comment_template_context(comment, article, request): """Build template context for a single comment partial.""" _annotate_reaction_counts([comment], _get_session_key(request)) return { "comment": comment, "page": article, "turnstile_site_key": _turnstile_site_key(), } class CommentCreateView(View): def _render_htmx_error(self, request, article, form): """Return error form partial for HTMX — swaps the form container itself.""" raw_parent_id = request.POST.get("parent_id") if raw_parent_id: try: parent_id = int(raw_parent_id) except (ValueError, TypeError): parent_id = None parent = Comment.objects.filter(pk=parent_id, article=article).first() if parent_id else None if parent: ctx = { "comment": parent, "page": article, "turnstile_site_key": _turnstile_site_key(), "reply_form_errors": form.errors, "reply_form": form, } return _add_vary_header(render(request, "comments/_reply_form.html", ctx)) ctx = { "comment_form": form, "page": article, "turnstile_site_key": _turnstile_site_key(), } return _add_vary_header(render(request, "comments/_comment_form.html", ctx)) def _render_htmx_success(self, request, article, comment): """Return fresh form + OOB-appended comment (if approved).""" tsk = _turnstile_site_key() oob_parts = [] if comment.is_approved: ctx = _comment_template_context(comment, article, request) if comment.parent_id: # _reply.html expects 'reply' context key reply_ctx = ctx.copy() reply_ctx["reply"] = reply_ctx.pop("comment") comment_html = render_to_string("comments/_reply.html", reply_ctx, request) oob_parts.append( f'
{comment_html}
' ) else: comment_html = render_to_string("comments/_comment.html", ctx, request) oob_parts.append(f'
{comment_html}
') # Ensure stale empty-state copy is removed when the first approved comment appears. oob_parts.append('
') if comment.parent_id: parent = Comment.objects.filter(pk=comment.parent_id, article=article).first() msg = "Reply posted!" if comment.is_approved else "Your reply is awaiting moderation." form_html = render_to_string("comments/_reply_form.html", { "comment": parent, "page": article, "turnstile_site_key": tsk, "reply_success_message": msg, }, request) else: msg = ( "Comment posted!" if comment.is_approved else "Your comment has been posted and is awaiting moderation." ) form_html = render_to_string("comments/_comment_form.html", { "page": article, "turnstile_site_key": tsk, "success_message": msg, }, request) resp = HttpResponse(form_html + "".join(oob_parts)) return _add_vary_header(resp) def post(self, request): ip = client_ip_from_request(request) key = f"comment-rate:{ip}" count = cache.get(key, 0) rate_limit = getattr(settings, "COMMENT_RATE_LIMIT_PER_MINUTE", 3) if count >= rate_limit: return HttpResponse(status=429) cache.set(key, count + 1, timeout=60) form = CommentForm(request.POST) article = get_object_or_404(ArticlePage, pk=request.POST.get("article_id")) if not article.comments_enabled: return HttpResponse(status=404) if form.is_valid(): if form.cleaned_data.get("honeypot"): if _is_htmx(request): return _add_vary_header( render(request, "comments/_comment_success.html", {"message": "Comment posted!"}) ) return _comment_redirect(article, approved=True) # Turnstile verification turnstile_ok = False if _turnstile_enabled(): token = request.POST.get("cf-turnstile-response", "") turnstile_ok = _verify_turnstile(token, ip) comment = form.save(commit=False) comment.article = article comment.is_approved = turnstile_ok parent_id = form.cleaned_data.get("parent_id") if parent_id: comment.parent = Comment.objects.filter(pk=parent_id, article=article).first() comment.ip_address = ip or None try: comment.full_clean() except ValidationError: form.add_error(None, "Reply depth exceeds the allowed limit") if _is_htmx(request): return self._render_htmx_error(request, article, form) context = article.get_context(request) context.update({"page": article, "comment_form": form}) return render(request, "blog/article_page.html", context, status=200) comment.save() if _is_htmx(request): return self._render_htmx_success(request, article, comment) return _comment_redirect(article, approved=comment.is_approved) if _is_htmx(request): return self._render_htmx_error(request, article, form) context = article.get_context(request) context.update({"page": article, "comment_form": form}) return render(request, "blog/article_page.html", context, status=200) @require_GET def comment_poll(request, article_id): """Return comments newer than after_id for HTMX polling.""" article = get_object_or_404(ArticlePage, pk=article_id) after_id = request.GET.get("after_id", "0") try: after_id = int(after_id) except (ValueError, TypeError): after_id = 0 approved_replies = Comment.objects.filter(is_approved=True).select_related("parent") comments = list( article.comments.filter(is_approved=True, parent__isnull=True, id__gt=after_id) .prefetch_related(Prefetch("replies", queryset=approved_replies)) .order_by("created_at", "id") ) _annotate_reaction_counts(comments, _get_session_key(request)) resp = render(request, "comments/_comment_list_inner.html", { "approved_comments": comments, "page": article, "turnstile_site_key": _turnstile_site_key(), }) return _add_vary_header(resp) @require_POST def comment_react(request, comment_id): """Toggle a reaction on a comment.""" ip = client_ip_from_request(request) key = f"reaction-rate:{ip}" count = cache.get(key, 0) rate_limit = getattr(settings, "REACTION_RATE_LIMIT_PER_MINUTE", 20) if count >= rate_limit: return HttpResponse(status=429) cache.set(key, count + 1, timeout=60) comment = get_object_or_404(Comment, pk=comment_id, is_approved=True) reaction_type = request.POST.get("reaction_type", "heart") if reaction_type not in ("heart", "plus_one"): return HttpResponse(status=400) if not request.session.session_key: request.session.create() session_key = request.session.session_key try: existing = CommentReaction.objects.filter( comment=comment, reaction_type=reaction_type, session_key=session_key ).first() if existing: existing.delete() else: CommentReaction.objects.create( comment=comment, reaction_type=reaction_type, session_key=session_key ) except IntegrityError: pass counts = {} for rt in ("heart", "plus_one"): counts[rt] = comment.reactions.filter(reaction_type=rt).count() user_reacted = set( comment.reactions.filter(session_key=session_key).values_list("reaction_type", flat=True) ) if _is_htmx(request): resp = render(request, "comments/_reactions.html", { "comment": comment, "counts": counts, "user_reacted": user_reacted, }) return _add_vary_header(resp) return JsonResponse({"counts": counts, "user_reacted": list(user_reacted)})