"""Push newsletter subscriptions to the configured external provider."""

from __future__ import annotations

import logging
import os

import requests

logger = logging.getLogger(__name__)


class ProviderSyncError(Exception):
    """Raised when a subscription cannot be synced to the provider.

    Covers missing configuration, an unsupported provider name, transport
    failures while calling the provider, and HTTP error responses.
    """


class ProviderSyncService:
    """Base interface for provider-specific sync implementations."""

    def sync(self, subscription):
        """Push *subscription* to the provider; subclasses must override."""
        raise NotImplementedError


class ButtondownSyncService(ProviderSyncService):
    """Sync implementation that posts subscribers to the Buttondown API."""

    endpoint = "https://api.buttondown.email/v1/subscribers"

    def sync(self, subscription):
        """Create a subscriber on Buttondown for ``subscription.email``.

        Args:
            subscription: Object exposing an ``email`` attribute; only that
                attribute is read here.

        Raises:
            ProviderSyncError: If ``BUTTONDOWN_API_KEY`` is unset or empty,
                if the HTTP request itself fails (connection error, timeout,
                ...), or if the response status code is 400 or higher.
        """
        api_key = os.getenv("BUTTONDOWN_API_KEY", "")
        if not api_key:
            raise ProviderSyncError("BUTTONDOWN_API_KEY is not configured")

        try:
            response = requests.post(
                self.endpoint,
                headers={
                    "Authorization": f"Token {api_key}",
                    "Content-Type": "application/json",
                },
                json={"email": subscription.email},
                timeout=10,
            )
        except requests.RequestException as exc:
            # Surface transport failures through the module's own error type
            # so callers need a single ``except ProviderSyncError``; the
            # original exception stays reachable via ``__cause__``.
            raise ProviderSyncError("Buttondown sync request failed") from exc

        if response.status_code >= 400:
            raise ProviderSyncError(
                f"Buttondown sync failed: {response.status_code}"
            )

        logger.info("Synced subscription %s to Buttondown", subscription.email)


def get_provider_service() -> ProviderSyncService:
    """Return the sync service selected by ``NEWSLETTER_PROVIDER``.

    The variable defaults to ``"buttondown"`` when unset and is compared
    case-insensitively with surrounding whitespace ignored.

    Raises:
        ProviderSyncError: If the configured provider is not ``buttondown``.
    """
    provider = os.getenv("NEWSLETTER_PROVIDER", "buttondown").lower().strip()
    if provider != "buttondown":
        raise ProviderSyncError(f"Unsupported newsletter provider: {provider}")
    return ButtondownSyncService()