#!/usr/bin/env python3
# Script for exporting KYC sessions.

"""Exporting KYC sessions via the NeuroVision REST API.

Authenticates with email/password to obtain a JWT, then exports sessions page
by page via POST /v1/kyc/sessions/filter. Records are written to disk in
chunks inside the ``<state-file stem>_chunks/`` directory, together with the
resume position, so an interrupted export can be continued from the same
point by re-running the same command.

Examples:
  # All sessions for the period
  python3 fetch_sessions.py --email admin@example.com --password secret \\
      --admin-user-id 4 --start-date 2025-12-01 --end-date 2026-03-10

  # Specific scenario, separate state file
  python3 fetch_sessions.py --email admin@example.com --password secret \\
      --admin-user-id 4 --start-date 2025-12-01 \\
      --schema-id db3c8340-8a44-11ef-86ed-a1c315ebff3c \\
      --state-file sessions_client.pkl

  # Filter by status / clientKey
  python3 fetch_sessions.py --email admin@example.com --password secret \\
      --start-date 2025-01-01 --status failed
  python3 fetch_sessions.py --email admin@example.com --password secret \\
      --start-date 2025-01-01 --client-key user@example.com"""

from __future__ import annotations

import argparse
import logging
import os
import pickle
import signal
import sys
import time
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterator

import requests

# --- Default constants ---------------------------------------------------

DEFAULT_API_BASE = "https://api.neuro-vision.ru"
# Only the stem of the state file is used: "<stem>_chunks/" holds chunks + state.
DEFAULT_STATE_FILE = "sessions.pkl"
DEFAULT_PAGE_SIZE = 100        # sessions requested per API page
DEFAULT_WINDOW_DAYS = 365      # window end = window start + this many days
DEFAULT_MAX_RETRIES = 5        # attempts per page request
DEFAULT_CHUNK_SIZE = 10_000    # records per chunk file

# HTTP timeouts, in seconds (passed to requests as ``timeout=``).
LOGIN_TIMEOUT = 30
FETCH_TIMEOUT = 60
# Requested JWT lifetime, sent as ``valid_days`` in the login request.
TOKEN_VALID_DAYS = 5
# How often (in pages) the resume position is persisted inside a window.
SAVE_STATE_EVERY_PAGES = 10

# Allowed values for the --status filter.
STATUSES = [
    "idle",
    "processing",
    "success",
    "failed",
    "exception",
    "suspicious",
    "expired",
]

log = logging.getLogger("fetch_sessions")


# --- CLI ----------------------------------------------------------------------

def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(
        description="Export KYC sessions via the REST API",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument("--email", required=True, help="Email for login")
    p.add_argument("--password", required=True, help="Password")
    p.add_argument("--admin-user-id", default="", help="Administrator ID (admin-user-id header)")
    p.add_argument("--api-base", default=DEFAULT_API_BASE, help="Base API URL")
    p.add_argument("--start-date", required=True, help="Start date (YYYY-MM-DD)")
    p.add_argument("--end-date", default=None, help="End date (YYYY-MM-DD), defaults to today")
    p.add_argument("--schema-id", default=None, help="Filter by schemaId")
    p.add_argument("--status", default=None, choices=STATUSES, help="Filter by session status")
    p.add_argument("--client-key", default=None, help="Filter by clientKey/clientUser")
    p.add_argument("--state-file", default=DEFAULT_STATE_FILE, help="State file")
    p.add_argument("--page-size", type=int, default=DEFAULT_PAGE_SIZE)
    p.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS)
    p.add_argument("--max-retries", type=int, default=DEFAULT_MAX_RETRIES)
    p.add_argument("--chunk-size", type=int, default=DEFAULT_CHUNK_SIZE)
    return p.parse_args()


def parse_date(value: str) -> date:
    """Parse a ``YYYY-MM-DD`` string into a date; exit with a message if malformed."""
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        sys.exit(f"Invalid date format: {value!r} (expected YYYY-MM-DD)")
    return parsed.date()


# --- Date windows -----------------------------------------------------------------

def generate_windows(start: date, end: date, window_days: int) -> list[tuple[date, date]]:
    """Split the inclusive date range [start, end] into consecutive windows.

    Each window is a ``(window_start, window_end)`` pair with both ends
    inclusive and ``window_end = window_start + window_days``, so a window
    covers ``window_days + 1`` calendar days; the last window is clipped to
    *end*. Windows do not overlap. Returns an empty list when ``start > end``.

    Raises:
        ValueError: if ``window_days`` is negative (the window end would fall
            before its start and the loop would never terminate).
    """
    if window_days < 0:
        raise ValueError(f"window_days must be >= 0, got {window_days}")

    step = timedelta(days=window_days)
    one_day = timedelta(days=1)
    windows: list[tuple[date, date]] = []
    current = start
    while current <= end:
        window_end = min(current + step, end)
        windows.append((current, window_end))
        current = window_end + one_day
    return windows


def fmt_start(d: date) -> str:
    """Return the ``...T00:00:00.000Z`` timestamp for the start of day *d*."""
    return f"{d:%Y-%m-%d}T00:00:00.000Z"


def fmt_end(d: date) -> str:
    """Return the ``...T23:59:59.999Z`` timestamp for the last millisecond of day *d*."""
    return f"{d:%Y-%m-%d}T23:59:59.999Z"


# --- Chunked storage with resumption -------------------------------------

@dataclass
class ChunkedStorage:
    """Buffer sessions in memory and flush them to disk in fixed-size chunks.

    Chunks are pickled to ``<state_file minus .pkl>_chunks/chunk_NNNN.pkl``.
    The resume position (window index + page), the counters and the
    not-yet-flushed buffer are pickled to ``_state.pkl`` in the same
    directory. ``state_file`` itself is never written; only its name is
    used to derive ``chunk_dir``.
    """

    state_file: str  # used only to derive chunk_dir
    chunk_size: int  # records per chunk file; must be >= 1
    chunk_dir: Path = field(init=False)
    state_path: Path = field(init=False)
    buffer: list[Any] = field(default_factory=list)  # records not yet written to a chunk
    total_saved: int = 0  # records already written to chunk files
    chunk_idx: int = 0  # index of the next chunk file to write
    window_idx: int = 0  # resume position: index into the date-window list
    page: int = 1  # resume position: next page to fetch in that window

    def __post_init__(self) -> None:
        if self.chunk_size < 1:
            # add() would otherwise loop forever writing empty chunks.
            raise ValueError(f"chunk_size must be >= 1, got {self.chunk_size}")
        base = self.state_file.removesuffix(".pkl")
        self.chunk_dir = Path(f"{base}_chunks")
        # parents=True so a state file inside a not-yet-created directory works.
        self.chunk_dir.mkdir(parents=True, exist_ok=True)
        self.state_path = self.chunk_dir / "_state.pkl"

    @property
    def total_count(self) -> int:
        """Records stored so far: written chunks plus the in-memory buffer."""
        return self.total_saved + len(self.buffer)

    def load(self) -> bool:
        """Restore counters, buffer and resume position from ``state_path``.

        Returns:
            True if a state file was loaded, False if none exists (in which
            case the instance is left unchanged).
        """
        if not self.state_path.exists():
            return False
        # NOTE(security): pickle.load can execute arbitrary code; only resume
        # from state files written by this script.
        with open(self.state_path, "rb") as f:
            state = pickle.load(f)
        self.chunk_idx = state["chunk_idx"]
        self.total_saved = state["total_saved"]
        self.buffer = state["buffer"]
        self.window_idx = state["window_idx"]
        self.page = state["page"]
        log.info("State loaded: %s records (%s chunks + %s in buffer)",
                 f"{self.total_count:,}", self.chunk_idx, len(self.buffer))
        return True

    def add(self, sessions: list[Any]) -> None:
        """Append *sessions* to the buffer, writing out every full chunk."""
        self.buffer.extend(sessions)
        while len(self.buffer) >= self.chunk_size:
            self._save_chunk(self.buffer[:self.chunk_size])
            del self.buffer[:self.chunk_size]

    def flush(self) -> None:
        """Write any buffered records as a (possibly short) final chunk."""
        if self.buffer:
            self._save_chunk(self.buffer)
            self.buffer = []

    def _save_chunk(self, chunk: list[Any]) -> None:
        """Persist *chunk* as the next numbered chunk file and update counters."""
        path = self.chunk_dir / f"chunk_{self.chunk_idx:04d}.pkl"
        self._atomic_dump(chunk, path)
        self.total_saved += len(chunk)
        self.chunk_idx += 1
        log.info("  Chunk %s saved (%s records, total: %s)",
                 self.chunk_idx, f"{len(chunk):,}", f"{self.total_count:,}")

    def save_state(self, window_idx: int, page: int) -> None:
        """Record the resume position and persist it with counters and buffer."""
        self.window_idx, self.page = window_idx, page
        self._atomic_dump(
            {
                "chunk_idx": self.chunk_idx,
                "total_saved": self.total_saved,
                "buffer": self.buffer,
                "window_idx": window_idx,
                "page": page,
            },
            self.state_path,
        )

    @staticmethod
    def _atomic_dump(obj: Any, path: Path) -> None:
        """Pickle *obj* to *path* via a temp file + ``os.replace``.

        Readers never observe a partially written *path*.
        """
        tmp = path.with_suffix(path.suffix + ".tmp")
        with open(tmp, "wb") as f:
            pickle.dump(obj, f)
            f.flush()
            # Ensure the bytes reach the disk before the rename publishes them.
            os.fsync(f.fileno())
        os.replace(tmp, path)


# --- HTTP Client --------------------------------------------------------------

class ApiClient:
    def __init__(self, api_base: str, max_retries: int):
        self.api_base = api_base.rstrip("/")
        self.max_retries = max_retries
        self.session = requests.Session()
        self.token: str | None = None
        self.admin_user_id = ""

    def close(self) -> None:
        self.session.close()

    def login(self, email: str, password: str) -> None:
        url = f"{self.api_base}/v1/user/login"
        log.info("Authentication: %s -> %s", email, url)
        resp = self.session.post(
            url,
            json={"email": email, "password": password, "valid_days": TOKEN_VALID_DAYS},
            timeout=LOGIN_TIMEOUT,
        )
        if resp.status_code != 200:
            sys.exit(f"Login error: HTTP {resp.status_code}\n{resp.text[:500]}")

        data = resp.json()
        token = data.get("token") or data.get("jwt") or data.get("accessToken")
        if not token:
            sys.exit(f"JWT not found in the response. Keys: {list(data.keys())}")

        self.token = token
        log.info("JWT received (valid for %s days)", TOKEN_VALID_DAYS)

    def fetch_page(self, range_start: str, range_end: str, page: int, page_size: int,
                   *, schema_id=None, status=None, client_key=None) -> dict | None:
        """Returns results or None on API error. Throws on network failure."""
        url = f"{self.api_base}/v1/kyc/sessions/filter"
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "token": self.token,
        }
        if self.admin_user_id:
            headers["admin-user-id"] = self.admin_user_id

        filters: dict[str, Any] = {
            "range": {"start": range_start, "end": range_end, "sortOrder": "DESC"},
            "groupByClientId": "all",
        }
        if schema_id:
            filters["schemaId"] = schema_id
        if status:
            filters["status"] = status
        if client_key:
            filters["clientKey"] = client_key

        payload = {"page": page, "pageSize": page_size, "filters": filters}

        for attempt in range(1, self.max_retries + 1):
            try:
                resp = self.session.post(url, json=payload, headers=headers, timeout=FETCH_TIMEOUT)
                if resp.status_code == 200:
                    body = resp.json()
                    if body.get("status") != "ok":
                        log.error("  API error: %s", body)
                        return None
                    return body["results"]
                log.warning("  HTTP %s: %s", resp.status_code, resp.text[:200])
            except requests.exceptions.RequestException as e:
                log.warning("  Network error (attempt %s/%s): %s", attempt, self.max_retries, e)

            delay = 2 ** (attempt - 1)
            log.info("  Retrying in %ss...", delay)
            time.sleep(delay)

        raise RuntimeError(f"Failed to fetch data after {self.max_retries} attempts")


# --- Graceful shutdown --------------------------------------------------------

class ShutdownFlag:
    """Turn SIGINT/SIGTERM into a cooperative stop request.

    Handlers are installed on construction (``signal.signal`` only works in
    the main thread). The first signal just sets ``requested`` so the main
    loop can stop after the current request; a second one calls ``sys.exit(1)``.
    """

    def __init__(self) -> None:
        self.requested = False
        for signum in (signal.SIGINT, signal.SIGTERM):
            signal.signal(signum, self._handle)

    def _handle(self, signum, frame) -> None:
        if not self.requested:
            self.requested = True
            log.info("Shutting down after the current request...")
            return
        log.warning("Second signal received — exiting immediately.")
        sys.exit(1)


# --- Main loop ------------------------------------------------------------

def run(args: argparse.Namespace) -> None:
    """Export all sessions matching *args*, resuming from saved state if present.

    Splits [start-date, end-date] into windows (generate_windows), pages
    through each one with ApiClient.fetch_page and hands every page to
    ChunkedStorage. The resume position (window index + next page to fetch)
    is saved every SAVE_STATE_EVERY_PAGES pages, after each completed window
    and on exit.

    A window is marked complete only when the API reports its last page or
    returns an empty page. On a shutdown signal, an API-level error or an
    unexpected exception, the current position is kept, so re-running the
    same command continues with the first page that was not stored.

    Raises:
        SystemExit: with status 1 if the export stopped because of an error.
    """
    start_date = parse_date(args.start_date)
    end_date = parse_date(args.end_date) if args.end_date else datetime.now(timezone.utc).date()
    if start_date > end_date:
        sys.exit(f"start-date ({start_date}) is later than end-date ({end_date})")

    windows = generate_windows(start_date, end_date, args.window_days)
    log.info("Period: %s — %s (%s windows of %s days)",
             start_date, end_date, len(windows), args.window_days)

    client = ApiClient(args.api_base, args.max_retries)
    client.login(args.email, args.password)
    client.admin_user_id = args.admin_user_id

    storage = ChunkedStorage(args.state_file, args.chunk_size)
    if not storage.load():
        storage.window_idx, storage.page = 0, 1

    shutdown = ShutdownFlag()
    start_time = time.time()
    records_at_start = storage.total_count

    window_idx, page = storage.window_idx, storage.page
    failed = False

    try:
        while window_idx < len(windows) and not shutdown.requested:
            w_start, w_end = windows[window_idx]
            range_start, range_end = fmt_start(w_start), fmt_end(w_end)

            if page == 1:
                log.info("[Window %s/%s] %s — %s", window_idx + 1, len(windows), w_start, w_end)

            window_complete = False
            while not shutdown.requested:
                result = client.fetch_page(
                    range_start, range_end, page, args.page_size,
                    schema_id=args.schema_id, status=args.status, client_key=args.client_key,
                )
                if result is None:
                    # API-level error (already logged by fetch_page). Do not
                    # move on to the next window: that would silently drop the
                    # rest of this one. Keep the position for the next run.
                    log.error("Stopping at window %s, page %s; rerun to resume.",
                              window_idx + 1, page)
                    failed = True
                    break

                sessions = result.get("sessions", [])
                page_info = result.get("pageInfo", {})
                total_items = page_info.get("totalItems", "?")
                total_pages = page_info.get("total", 0) or 0

                if not sessions:
                    log.info("  Page %s is empty — window completed.", page)
                    window_complete = True
                    break

                storage.add(sessions)

                elapsed = time.time() - start_time
                new_records = storage.total_count - records_at_start
                speed = new_records / elapsed if elapsed > 0 else 0
                log.info("  page %s/%s | +%s | window: %s sessions | total: %s | %.0f rec/s",
                         page, total_pages or "?", len(sessions), total_items,
                         f"{storage.total_count:,}", speed)

                if total_pages and page >= total_pages:
                    window_complete = True
                    break

                page += 1
                if page % SAVE_STATE_EVERY_PAGES == 0:
                    storage.save_state(window_idx, page)

            if not window_complete:
                # Interrupted mid-window (shutdown signal or API error):
                # (window_idx, page) already points at the next page to fetch.
                break

            window_idx += 1
            page = 1
            storage.save_state(window_idx, page)

    except Exception as e:
        log.exception("ERROR: %s", e)
        failed = True
    finally:
        storage.flush()
        storage.save_state(window_idx, page)
        elapsed = time.time() - start_time
        new_records = storage.total_count - records_at_start
        log.info("Total: %s records (+%s during this run, %.0fs)",
                 f"{storage.total_count:,}", f"{new_records:,}", elapsed)
        log.info("Chunks: %s/", storage.chunk_dir)
        client.close()

    if shutdown.requested and window_idx < len(windows):
        log.info("Export interrupted; rerun the same command to resume.")
    if failed:
        # Make an incomplete export visible to callers/schedulers.
        sys.exit(1)


def main() -> None:
    """CLI entry point: configure logging, parse the arguments and run the export."""
    log_config = {
        "level": logging.INFO,
        "format": "%(asctime)s %(levelname)s %(message)s",
        "datefmt": "%H:%M:%S",
    }
    logging.basicConfig(**log_config)
    cli_args = parse_args()
    run(cli_args)


if __name__ == "__main__":
    main()