44 lines
1.3 KiB
Python
44 lines
1.3 KiB
Python
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
|
|
import requests
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProviderSyncError(Exception):
|
|
pass
|
|
|
|
|
|
class ProviderSyncService:
|
|
def sync(self, subscription):
|
|
raise NotImplementedError
|
|
|
|
|
|
class ButtondownSyncService(ProviderSyncService):
|
|
endpoint = "https://api.buttondown.email/v1/subscribers"
|
|
|
|
def sync(self, subscription):
|
|
api_key = os.getenv("BUTTONDOWN_API_KEY", "")
|
|
if not api_key:
|
|
raise ProviderSyncError("BUTTONDOWN_API_KEY is not configured")
|
|
|
|
response = requests.post(
|
|
self.endpoint,
|
|
headers={"Authorization": f"Token {api_key}", "Content-Type": "application/json"},
|
|
json={"email": subscription.email},
|
|
timeout=10,
|
|
)
|
|
if response.status_code >= 400:
|
|
raise ProviderSyncError(f"Buttondown sync failed: {response.status_code}")
|
|
logger.info("Synced subscription %s to Buttondown", subscription.email)
|
|
|
|
|
|
def get_provider_service() -> ProviderSyncService:
|
|
provider = os.getenv("NEWSLETTER_PROVIDER", "buttondown").lower().strip()
|
|
if provider != "buttondown":
|
|
raise ProviderSyncError(f"Unsupported newsletter provider: {provider}")
|
|
return ButtondownSyncService()
|