feat: add health monitoring endpoint
This commit is contained in:
1
apps/health/tests/__init__.py
Normal file
1
apps/health/tests/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
|
||||
205
apps/health/tests/test_checks.py
Normal file
205
apps/health/tests/test_checks.py
Normal file
@@ -0,0 +1,205 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import time
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
from django.db.utils import OperationalError
|
||||
|
||||
from apps.health import checks
|
||||
|
||||
|
||||
class SuccessfulCursor:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def execute(self, query):
|
||||
self.query = query
|
||||
|
||||
|
||||
class FailingCursor:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def execute(self, query):
|
||||
raise OperationalError("database unavailable")
|
||||
|
||||
|
||||
class FakeCache:
|
||||
def __init__(self, value_to_return=None):
|
||||
self.value_to_return = value_to_return
|
||||
self.stored = {}
|
||||
|
||||
def set(self, key, value, timeout=None):
|
||||
self.stored[key] = value
|
||||
|
||||
def get(self, key):
|
||||
if self.value_to_return is not None:
|
||||
return self.value_to_return
|
||||
return self.stored.get(key)
|
||||
|
||||
def delete(self, key):
|
||||
self.stored.pop(key, None)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_db_ok(monkeypatch):
|
||||
monkeypatch.setattr(checks.connection, "cursor", lambda: SuccessfulCursor())
|
||||
|
||||
result = checks.check_db()
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert "latency_ms" in result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_db_fail(monkeypatch):
|
||||
monkeypatch.setattr(checks.connection, "cursor", lambda: FailingCursor())
|
||||
|
||||
result = checks.check_db()
|
||||
|
||||
assert result == {"status": "fail", "detail": "database unavailable"}
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cache_ok(monkeypatch):
|
||||
monkeypatch.setattr(checks, "cache", FakeCache())
|
||||
|
||||
result = checks.check_cache()
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert "latency_ms" in result
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_cache_fail(monkeypatch):
|
||||
monkeypatch.setattr(checks, "cache", FakeCache(value_to_return="wrong-value"))
|
||||
|
||||
result = checks.check_cache()
|
||||
|
||||
assert result == {"status": "fail", "detail": "Cache probe returned unexpected value"}
|
||||
|
||||
|
||||
def test_celery_no_broker(monkeypatch):
|
||||
monkeypatch.delenv("CELERY_BROKER_URL", raising=False)
|
||||
|
||||
result = checks.check_celery()
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert "CELERY_BROKER_URL is unset" in result["detail"]
|
||||
|
||||
|
||||
def test_celery_no_kombu(monkeypatch):
|
||||
monkeypatch.setenv("CELERY_BROKER_URL", "redis://broker")
|
||||
|
||||
def raise_import_error(name):
|
||||
raise ImportError(name)
|
||||
|
||||
monkeypatch.setattr(importlib, "import_module", raise_import_error)
|
||||
|
||||
result = checks.check_celery()
|
||||
|
||||
assert result["status"] == "ok"
|
||||
assert "kombu is not installed" in result["detail"]
|
||||
|
||||
|
||||
def test_celery_ok(monkeypatch):
|
||||
monkeypatch.setenv("CELERY_BROKER_URL", "redis://broker")
|
||||
|
||||
class FakeBrokerConnection:
|
||||
def __init__(self, url, connect_timeout):
|
||||
self.url = url
|
||||
self.connect_timeout = connect_timeout
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def ensure_connection(self, max_retries):
|
||||
self.max_retries = max_retries
|
||||
|
||||
monkeypatch.setattr(importlib, "import_module", lambda name: SimpleNamespace(Connection=FakeBrokerConnection))
|
||||
|
||||
result = checks.check_celery()
|
||||
|
||||
assert result == {"status": "ok"}
|
||||
|
||||
|
||||
def test_celery_fail(monkeypatch):
|
||||
monkeypatch.setenv("CELERY_BROKER_URL", "redis://broker")
|
||||
|
||||
class BrokenBrokerConnection:
|
||||
def __init__(self, url, connect_timeout):
|
||||
self.url = url
|
||||
self.connect_timeout = connect_timeout
|
||||
|
||||
def __enter__(self):
|
||||
raise OSError("broker down")
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
monkeypatch.setattr(importlib, "import_module", lambda name: SimpleNamespace(Connection=BrokenBrokerConnection))
|
||||
|
||||
result = checks.check_celery()
|
||||
|
||||
assert result == {"status": "fail", "detail": "broker down"}
|
||||
|
||||
|
||||
def test_backup_no_env(monkeypatch):
|
||||
monkeypatch.delenv("BACKUP_STATUS_FILE", raising=False)
|
||||
|
||||
result = checks.check_backup()
|
||||
|
||||
assert result["status"] == "fail"
|
||||
assert "BACKUP_STATUS_FILE is unset" in result["detail"]
|
||||
|
||||
|
||||
def test_backup_missing_file(monkeypatch, tmp_path):
|
||||
status_file = tmp_path / "missing-backup-status"
|
||||
monkeypatch.setenv("BACKUP_STATUS_FILE", str(status_file))
|
||||
|
||||
result = checks.check_backup()
|
||||
|
||||
assert result == {"status": "fail", "detail": f"Backup status file not found: {status_file}"}
|
||||
|
||||
|
||||
def test_backup_fresh(monkeypatch, tmp_path):
|
||||
status_file = tmp_path / "backup-status"
|
||||
status_file.write_text(str(time.time() - 60), encoding="utf-8")
|
||||
monkeypatch.setenv("BACKUP_STATUS_FILE", str(status_file))
|
||||
|
||||
result = checks.check_backup()
|
||||
|
||||
assert result == {"status": "ok"}
|
||||
|
||||
|
||||
def test_backup_stale(monkeypatch, tmp_path):
|
||||
status_file = tmp_path / "backup-status"
|
||||
stale_timestamp = time.time() - (checks.BACKUP_MAX_AGE_SECONDS + 1)
|
||||
status_file.write_text(str(stale_timestamp), encoding="utf-8")
|
||||
monkeypatch.setenv("BACKUP_STATUS_FILE", str(status_file))
|
||||
|
||||
result = checks.check_backup()
|
||||
|
||||
assert result["status"] == "fail"
|
||||
assert "Last backup is" in result["detail"]
|
||||
|
||||
|
||||
def test_backup_invalid(monkeypatch, tmp_path):
|
||||
status_file = tmp_path / "backup-status"
|
||||
status_file.write_text("not-a-timestamp", encoding="utf-8")
|
||||
monkeypatch.setenv("BACKUP_STATUS_FILE", str(status_file))
|
||||
|
||||
result = checks.check_backup()
|
||||
|
||||
assert result == {"status": "fail", "detail": "Invalid backup status file"}
|
||||
103
apps/health/tests/test_views.py
Normal file
103
apps/health/tests/test_views.py
Normal file
@@ -0,0 +1,103 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _mock_checks(monkeypatch, **overrides):
|
||||
payloads = {
|
||||
"db": {"status": "ok", "latency_ms": 1.0},
|
||||
"cache": {"status": "ok", "latency_ms": 1.0},
|
||||
"celery": {"status": "ok"},
|
||||
"backup": {"status": "ok"},
|
||||
}
|
||||
payloads.update(overrides)
|
||||
|
||||
monkeypatch.setattr("apps.health.views.check_db", lambda: payloads["db"])
|
||||
monkeypatch.setattr("apps.health.views.check_cache", lambda: payloads["cache"])
|
||||
monkeypatch.setattr("apps.health.views.check_celery", lambda: payloads["celery"])
|
||||
monkeypatch.setattr("apps.health.views.check_backup", lambda: payloads["backup"])
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_healthy(client, monkeypatch):
|
||||
_mock_checks(monkeypatch)
|
||||
|
||||
response = client.get("/health/")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "ok"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_degraded_celery(client, monkeypatch):
|
||||
_mock_checks(monkeypatch, celery={"status": "fail", "detail": "broker down"})
|
||||
|
||||
response = client.get("/health/")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "degraded"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_degraded_backup(client, monkeypatch):
|
||||
_mock_checks(monkeypatch, backup={"status": "fail", "detail": "backup missing"})
|
||||
|
||||
response = client.get("/health/")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "degraded"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_unhealthy_db(client, monkeypatch):
|
||||
_mock_checks(monkeypatch, db={"status": "fail", "detail": "db down"})
|
||||
|
||||
response = client.get("/health/")
|
||||
|
||||
assert response.status_code == 503
|
||||
assert response.json()["status"] == "unhealthy"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_unhealthy_cache(client, monkeypatch):
|
||||
_mock_checks(monkeypatch, cache={"status": "fail", "detail": "cache down"})
|
||||
|
||||
response = client.get("/health/")
|
||||
|
||||
assert response.status_code == 503
|
||||
assert response.json()["status"] == "unhealthy"
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_response_shape(client, monkeypatch):
|
||||
_mock_checks(monkeypatch)
|
||||
|
||||
payload = client.get("/health/").json()
|
||||
|
||||
assert set(payload) == {"status", "version", "checks", "timestamp"}
|
||||
assert set(payload["version"]) == {"git_sha", "build"}
|
||||
assert set(payload["checks"]) == {"db", "cache", "celery", "backup"}
|
||||
assert re.fullmatch(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", payload["timestamp"])
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_version_fields(client, monkeypatch):
|
||||
_mock_checks(monkeypatch)
|
||||
monkeypatch.setenv("GIT_SHA", "59cc1c4")
|
||||
monkeypatch.setenv("BUILD_ID", "build-20260306-59cc1c4")
|
||||
|
||||
payload = client.get("/health/").json()
|
||||
|
||||
assert payload["version"]["git_sha"]
|
||||
assert payload["version"]["build"]
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_no_cache_headers(client, monkeypatch):
|
||||
_mock_checks(monkeypatch)
|
||||
|
||||
response = client.get("/health/")
|
||||
|
||||
assert "no-cache" in response["Cache-Control"]
|
||||
Reference in New Issue
Block a user