#!/usr/bin/env python3
"""Export KYC sessions via the NeuroVision REST API.
Authenticates with email/password to obtain a JWT, then exports sessions page by page via POST /v1/kyc/sessions/filter. Sessions are written to disk in fixed-size chunks; an interrupted export resumes from where it stopped (the resume state is kept in the chunk directory derived from --state-file).
Examples:
# All sessions for the period
python3 fetch_sessions.py --email admin@example.com --password secret \\
--admin-user-id 4 --start-date 2025-12-01 --end-date 2026-03-10
# Specific scenario, separate state file
python3 fetch_sessions.py --email admin@example.com --password secret \\
--admin-user-id 4 --start-date 2025-12-01 \\
--schema-id db3c8340-8a44-11ef-86ed-a1c315ebff3c \\
--state-file sessions_client.pkl
# Filter by status / clientKey
python3 fetch_sessions.py --email admin@example.com --password secret \\
--start-date 2025-01-01 --status failed
python3 fetch_sessions.py --email admin@example.com --password secret \\
--start-date 2025-01-01 --client-key user@example.com"""
from __future__ import annotations
import argparse
import logging
import os
import pickle
import signal
import sys
import time
from dataclasses import dataclass, field
from datetime import date, datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Iterator
import requests
# --- Default constants ---------------------------------------------------
# Remote endpoint and the base name used for local output.
DEFAULT_API_BASE = "https://api.neuro-vision.ru"
DEFAULT_STATE_FILE = "sessions.pkl"

# Defaults for the tunable CLI options (see parse_args).
DEFAULT_PAGE_SIZE = 100
DEFAULT_WINDOW_DAYS = 365
DEFAULT_MAX_RETRIES = 5
DEFAULT_CHUNK_SIZE = 10000

# HTTP timeouts in seconds, passed as ``timeout=`` to requests.
LOGIN_TIMEOUT = 30
FETCH_TIMEOUT = 60

# Token lifetime requested at login (sent as "valid_days").
TOKEN_VALID_DAYS = 5

# The resume position is persisted every this many pages inside a window.
SAVE_STATE_EVERY_PAGES = 10

# Session statuses accepted by --status.
STATUSES = [
    "idle",
    "processing",
    "success",
    "failed",
    "exception",
    "suspicious",
    "expired",
]

log = logging.getLogger(name="fetch_sessions")
# --- CLI ----------------------------------------------------------------------
def _int_at_least(value: str, minimum: int) -> int:
    """Convert *value* to an int and require it to be >= *minimum*.

    Raises argparse.ArgumentTypeError so argparse reports a clean usage error.
    """
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {value!r}") from None
    if number < minimum:
        raise argparse.ArgumentTypeError(f"must be >= {minimum}, got {number}")
    return number


def _positive_int(value: str) -> int:
    """argparse ``type=`` converter for integers >= 1."""
    return _int_at_least(value, 1)


def _non_negative_int(value: str) -> int:
    """argparse ``type=`` converter for integers >= 0."""
    return _int_at_least(value, 0)


def parse_args() -> argparse.Namespace:
    """Parse and validate the command-line options.

    Numeric options are range-checked up front. A ``--chunk-size`` below 1
    would make chunk flushing loop forever, and a negative ``--window-days``
    would make window generation loop forever.
    """
    p = argparse.ArgumentParser(
        description="Export KYC sessions via the REST API",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    p.add_argument("--email", required=True, help="Email for login")
    p.add_argument("--password", required=True, help="Password")
    p.add_argument("--admin-user-id", default="", help="Administrator ID (admin-user-id header)")
    p.add_argument("--api-base", default=DEFAULT_API_BASE, help="Base API URL")
    p.add_argument("--start-date", required=True, help="Start date (YYYY-MM-DD)")
    p.add_argument("--end-date", default=None, help="End date (YYYY-MM-DD), defaults to today")
    p.add_argument("--schema-id", default=None, help="Filter by schemaId")
    p.add_argument("--status", default=None, choices=STATUSES, help="Filter by session status")
    p.add_argument("--client-key", default=None, help="Filter by clientKey/clientUser")
    p.add_argument(
        "--state-file",
        default=DEFAULT_STATE_FILE,
        help="Output base name: chunks and resume state are stored in "
             "<name without .pkl>_chunks/",
    )
    p.add_argument("--page-size", type=_positive_int, default=DEFAULT_PAGE_SIZE,
                   help="Sessions requested per page")
    p.add_argument("--window-days", type=_non_negative_int, default=DEFAULT_WINDOW_DAYS,
                   help="Date window length; both ends are inclusive, so a window "
                        "covers window-days + 1 calendar days")
    p.add_argument("--max-retries", type=_positive_int, default=DEFAULT_MAX_RETRIES,
                   help="Attempts per page request before giving up")
    p.add_argument("--chunk-size", type=_positive_int, default=DEFAULT_CHUNK_SIZE,
                   help="Sessions per chunk file")
    return p.parse_args()
def parse_date(value: str) -> date:
    """Turn a ``YYYY-MM-DD`` string into a ``date``; abort the script if it does not parse."""
    try:
        parsed = datetime.strptime(value, "%Y-%m-%d")
    except ValueError:
        sys.exit(f"Invalid date format: {value!r} (expected YYYY-MM-DD)")
    return parsed.date()
# --- Date windows -----------------------------------------------------------------
def generate_windows(start: date, end: date, window_days: int) -> list[tuple[date, date]]:
    """Split the inclusive range [start, end] into consecutive, non-overlapping windows.

    Each window is ``(first_day, last_day)``, with both days inclusive. A full
    window therefore covers ``window_days + 1`` calendar days (``window_days=0``
    gives one-day windows). The last window is clipped to *end*. Returns an
    empty list when ``start > end``.

    Raises:
        ValueError: if *window_days* is negative. Before this check, a negative
            value made the loop never terminate.
    """
    if window_days < 0:
        raise ValueError(f"window_days must be >= 0, got {window_days}")
    span = timedelta(days=window_days)
    one_day = timedelta(days=1)
    windows: list[tuple[date, date]] = []
    current = start
    while current <= end:
        window_end = min(current + span, end)
        windows.append((current, window_end))
        # The next window starts the day after this one ends, so no day repeats.
        current = window_end + one_day
    return windows
def fmt_start(d: date) -> str:
    """Render *d* as a ``...T00:00:00.000Z`` timestamp marking the start of that day."""
    return f"{d:%Y-%m-%d}T00:00:00.000Z"
def fmt_end(d: date) -> str:
    """Render *d* as a ``...T23:59:59.999Z`` timestamp marking the last millisecond of that day."""
    return f"{d:%Y-%m-%d}T23:59:59.999Z"
# --- Chunked storage with resumption -------------------------------------
@dataclass
class ChunkedStorage:
    """Buffer sessions in memory and flush them to disk in fixed-size chunks.

    Files live in ``<state_file without .pkl>_chunks/``:

    * ``chunk_NNNN.pkl`` contains a pickled list of sessions. Each chunk holds
      ``chunk_size`` items, except the possibly shorter chunk written by ``flush``.
    * ``_state.pkl`` contains the resume state: the chunk counter, the number of
      sessions saved, the not-yet-flushed buffer and the pagination position
      (``window_idx``, ``page``).

    Every file is written to a temp file, fsync'ed and then moved into place
    with ``os.replace``. A crash mid-write therefore does not leave a
    half-written file under the final name.

    NOTE: the state does not record the dates or filters it was created with.
    Resuming with different CLI arguments reuses the stored position as-is.
    """

    state_file: str
    chunk_size: int
    chunk_dir: Path = field(init=False)
    state_path: Path = field(init=False)
    # Sessions received but not yet written to a chunk. Kept out of repr
    # because the list can be large.
    buffer: list[Any] = field(default_factory=list, repr=False)
    total_saved: int = 0  # sessions already written to chunk files
    chunk_idx: int = 0  # index of the next chunk_NNNN.pkl to write
    window_idx: int = 0  # resume position: index of the date window
    page: int = 1  # resume position: next page to fetch in that window

    def __post_init__(self) -> None:
        # chunk_size < 1 would make add() loop forever writing empty chunks.
        if self.chunk_size < 1:
            raise ValueError(f"chunk_size must be >= 1, got {self.chunk_size}")
        base = self.state_file.removesuffix(".pkl")
        self.chunk_dir = Path(f"{base}_chunks")
        # parents=True: --state-file may point into a directory that does not exist yet.
        self.chunk_dir.mkdir(parents=True, exist_ok=True)
        self.state_path = self.chunk_dir / "_state.pkl"

    @property
    def total_count(self) -> int:
        """Sessions collected so far: written to chunks plus still buffered."""
        return self.total_saved + len(self.buffer)

    def load(self) -> bool:
        """Restore counters, buffer and resume position from ``_state.pkl``.

        Returns:
            True if a state file existed and was loaded, False otherwise.

        The state file is unpickled. Only resume from state files you created
        yourself, because unpickling untrusted data can execute arbitrary code.
        """
        if not self.state_path.exists():
            return False
        with open(self.state_path, "rb") as f:
            state = pickle.load(f)
        self.chunk_idx = state["chunk_idx"]
        self.total_saved = state["total_saved"]
        self.buffer = state["buffer"]
        self.window_idx = state["window_idx"]
        self.page = state["page"]
        log.info("State loaded: %s records (%s chunks + %s in buffer)",
                 f"{self.total_count:,}", self.chunk_idx, len(self.buffer))
        return True

    def add(self, sessions: list[Any]) -> None:
        """Append *sessions* to the buffer and write out every full chunk."""
        self.buffer.extend(sessions)
        while len(self.buffer) >= self.chunk_size:
            self._save_chunk(self.chunk_size)

    def flush(self) -> None:
        """Write everything still buffered as one (possibly short) chunk."""
        if self.buffer:
            self._save_chunk(len(self.buffer))

    def _save_chunk(self, count: int) -> None:
        """Write the first *count* buffered sessions to the next chunk file.

        The sessions are removed from the buffer only after the file has been
        written. A failed write keeps them buffered, and the progress log then
        reports an accurate total instead of counting the chunk twice.
        """
        chunk = self.buffer[:count]
        path = self.chunk_dir / f"chunk_{self.chunk_idx:04d}.pkl"
        self._atomic_dump(chunk, path)
        del self.buffer[:count]
        self.total_saved += len(chunk)
        self.chunk_idx += 1
        log.info(" Chunk %s saved (%s records, total: %s)",
                 self.chunk_idx, f"{len(chunk):,}", f"{self.total_count:,}")

    def save_state(self, window_idx: int, page: int) -> None:
        """Persist counters, the current buffer and the resume position.

        Args:
            window_idx: index of the date window to resume in.
            page: next page to fetch within that window.
        """
        self.window_idx, self.page = window_idx, page
        self._atomic_dump(
            {
                "chunk_idx": self.chunk_idx,
                "total_saved": self.total_saved,
                "buffer": self.buffer,
                "window_idx": window_idx,
                "page": page,
            },
            self.state_path,
        )

    @staticmethod
    def _atomic_dump(obj: Any, path: Path) -> None:
        """Pickle *obj* to *path* via temp file + fsync + os.replace."""
        tmp = path.with_suffix(path.suffix + ".tmp")
        try:
            with open(tmp, "wb") as f:
                pickle.dump(obj, f)
                # Make sure the bytes are on disk before the rename publishes them.
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp, path)
        except BaseException:
            # Do not leave a stray partial .tmp file behind.
            tmp.unlink(missing_ok=True)
            raise
# --- HTTP Client --------------------------------------------------------------
class ApiClient:
    """Thin client for the NeuroVision REST API: JWT login plus paged session filtering.

    A single ``requests.Session`` is reused for every call. ``login`` must
    succeed before ``fetch_page`` is used. When ``admin_user_id`` is non-empty,
    it is sent as the ``admin-user-id`` header on filter requests.
    """

    # 4xx responses that may succeed on a retry (request timeout, rate limiting).
    # Any other 4xx (bad request, invalid/expired token, forbidden, ...) will not
    # change by retrying, so fetch_page fails fast on it.
    _RETRYABLE_4XX = frozenset({408, 429})

    def __init__(self, api_base: str, max_retries: int):
        self.api_base = api_base.rstrip("/")
        self.max_retries = max_retries
        self.session = requests.Session()
        self.token: str | None = None
        self.admin_user_id = ""

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self.session.close()

    def login(self, email: str, password: str) -> None:
        """Log in with *email*/*password* and store the returned JWT in ``self.token``.

        Exits the process via ``sys.exit`` with a message if the login is
        rejected, the response is not a JSON object, or it carries no token.
        """
        url = f"{self.api_base}/v1/user/login"
        log.info("Authentication: %s -> %s", email, url)
        resp = self.session.post(
            url,
            json={"email": email, "password": password, "valid_days": TOKEN_VALID_DAYS},
            timeout=LOGIN_TIMEOUT,
        )
        if resp.status_code != 200:
            sys.exit(f"Login error: HTTP {resp.status_code}\n{resp.text[:500]}")
        try:
            data = resp.json()
        except ValueError:
            sys.exit(f"Login error: response is not JSON\n{resp.text[:500]}")
        if not isinstance(data, dict):
            sys.exit(f"Login error: unexpected response\n{resp.text[:500]}")
        # Try several candidate field names for the token.
        token = data.get("token") or data.get("jwt") or data.get("accessToken")
        if not token:
            sys.exit(f"JWT not found in the response. Keys: {list(data.keys())}")
        self.token = token
        log.info("JWT received (valid for %s days)", TOKEN_VALID_DAYS)

    def fetch_page(self, range_start: str, range_end: str, page: int, page_size: int,
                   *, schema_id=None, status=None, client_key=None) -> dict | None:
        """Fetch one page of sessions from POST /v1/kyc/sessions/filter.

        Args:
            range_start, range_end: timestamps bounding the ``range`` filter.
            page, page_size: pagination parameters sent to the API.
            schema_id, status, client_key: optional filters, sent only when truthy.

        Returns:
            The ``results`` member of a successful response. Returns None when
            the API answers HTTP 200 without ``"status": "ok"`` (an API-level
            error, which is logged).

        Raises:
            RuntimeError: immediately on a non-retryable 4xx response, or after
                ``max_retries`` failed attempts (network errors, 5xx, 408/429).
                Attempts are spaced by an exponential backoff (1s, 2s, 4s, ...),
                and there is no sleep after the final attempt.
        """
        url = f"{self.api_base}/v1/kyc/sessions/filter"
        headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "token": self.token,
        }
        if self.admin_user_id:
            headers["admin-user-id"] = self.admin_user_id
        filters: dict[str, Any] = {
            "range": {"start": range_start, "end": range_end, "sortOrder": "DESC"},
            "groupByClientId": "all",
        }
        if schema_id:
            filters["schemaId"] = schema_id
        if status:
            filters["status"] = status
        if client_key:
            filters["clientKey"] = client_key
        payload = {"page": page, "pageSize": page_size, "filters": filters}

        last_error: Exception | None = None
        for attempt in range(1, self.max_retries + 1):
            try:
                resp = self.session.post(url, json=payload, headers=headers, timeout=FETCH_TIMEOUT)
                if resp.status_code == 200:
                    body = resp.json()
                    if not isinstance(body, dict) or body.get("status") != "ok":
                        log.error(" API error: %s", body)
                        return None
                    return body["results"]
                log.warning(" HTTP %s: %s", resp.status_code, resp.text[:200])
            except requests.exceptions.RequestException as e:
                last_error = e
                log.warning(" Network error (attempt %s/%s): %s", attempt, self.max_retries, e)
            else:
                # Reached only for a non-200 response.
                last_error = None
                code = resp.status_code
                if 400 <= code < 500 and code not in self._RETRYABLE_4XX:
                    raise RuntimeError(f"Request rejected with HTTP {code}: {resp.text[:200]}")
            if attempt < self.max_retries:
                delay = 2 ** (attempt - 1)
                log.info(" Retrying in %ss...", delay)
                time.sleep(delay)
        raise RuntimeError(f"Failed to fetch data after {self.max_retries} attempts") from last_error
# --- Graceful shutdown --------------------------------------------------------
class ShutdownFlag:
    """Two-stage shutdown switch driven by SIGINT and SIGTERM.

    Creating the object installs ``_handle`` for both signals. The first
    signal only sets ``requested`` to True, so the main loop can finish the
    in-flight request and save its position. A second signal calls
    ``sys.exit(1)`` from inside the handler.
    """

    def __init__(self) -> None:
        self.requested = False
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self._handle)

    def _handle(self, signum, frame) -> None:
        if not self.requested:
            self.requested = True
            log.info("Shutting down after the current request...")
            return
        log.warning("Second signal received — exiting immediately.")
        sys.exit(1)
# --- Main loop ------------------------------------------------------------
def run(args: argparse.Namespace) -> None:
    """Export every session matching *args*, resuming from saved state if present.

    The period [--start-date, --end-date] is split into date windows by
    ``generate_windows``. The end date defaults to today in UTC. Each window
    is paged through ``ApiClient.fetch_page``, and the sessions go to
    ``ChunkedStorage`` (paging and resume rules are in ``_export_windows``).

    Exits via ``sys.exit`` on invalid dates. Exits with status 1, after the
    resume state has been saved, when the export stopped because of an API
    error or an unexpected exception.
    """
    start_date = parse_date(args.start_date)
    end_date = parse_date(args.end_date) if args.end_date else datetime.now(timezone.utc).date()
    if start_date > end_date:
        sys.exit(f"start-date ({start_date}) is later than end-date ({end_date})")
    windows = generate_windows(start_date, end_date, args.window_days)
    log.info("Period: %s — %s (%s windows of %s days)",
             start_date, end_date, len(windows), args.window_days)

    client = ApiClient(args.api_base, args.max_retries)
    try:
        client.login(args.email, args.password)
        client.admin_user_id = args.admin_user_id
        failed = _export_windows(args, client, windows)
    finally:
        # Release the HTTP session even if login or the export raises/exits.
        client.close()
    if failed:
        sys.exit(1)


def _export_windows(args: argparse.Namespace, client: ApiClient,
                    windows: list[tuple[date, date]]) -> bool:
    """Page through *windows* into chunked storage. Returns True if the export failed.

    A window is marked done (``window_idx`` advanced) only when it was
    exhausted, i.e. an empty page or the last page reported by ``pageInfo``
    was reached. On SIGINT/SIGTERM or an API error, the position of the next
    unfetched page is kept and saved, so the next run continues inside the
    same window instead of skipping the rest of it.
    """
    storage = ChunkedStorage(args.state_file, args.chunk_size)
    if not storage.load():
        storage.window_idx, storage.page = 0, 1

    shutdown = ShutdownFlag()
    start_time = time.time()
    records_at_start = storage.total_count
    window_idx, page = storage.window_idx, storage.page
    failed = False
    try:
        while window_idx < len(windows) and not shutdown.requested:
            w_start, w_end = windows[window_idx]
            range_start, range_end = fmt_start(w_start), fmt_end(w_end)
            if page == 1:
                log.info("[Window %s/%s] %s — %s", window_idx + 1, len(windows), w_start, w_end)
            window_done = False
            while not shutdown.requested:
                result = client.fetch_page(
                    range_start, range_end, page, args.page_size,
                    schema_id=args.schema_id, status=args.status, client_key=args.client_key,
                )
                if result is None:
                    # fetch_page already logged the API error. Stop here rather than
                    # skipping the rest of the window, so a rerun retries this page.
                    log.error("Stopping at window %s/%s, page %s; rerun to resume.",
                              window_idx + 1, len(windows), page)
                    failed = True
                    break
                sessions = result.get("sessions", [])
                page_info = result.get("pageInfo", {})
                total_items = page_info.get("totalItems", "?")
                total_pages = page_info.get("total", 0) or 0
                if not sessions:
                    log.info(" Page %s is empty — window completed.", page)
                    window_done = True
                    break
                storage.add(sessions)
                elapsed = time.time() - start_time
                new_records = storage.total_count - records_at_start
                speed = new_records / elapsed if elapsed > 0 else 0
                log.info(" page %s/%s | +%s | window: %s sessions | total: %s | %.0f rec/s",
                         page, total_pages or "?", len(sessions), total_items,
                         f"{storage.total_count:,}", speed)
                if total_pages and page >= total_pages:
                    window_done = True
                    break
                page += 1
                if page % SAVE_STATE_EVERY_PAGES == 0:
                    storage.save_state(window_idx, page)
            if not window_done:
                # Interrupted or failed mid-window: keep (window_idx, page) pointing
                # at the next unfetched page instead of marking the window complete.
                break
            window_idx += 1
            page = 1
            storage.save_state(window_idx, page)
    except Exception as e:
        failed = True
        log.exception("ERROR: %s", e)
    finally:
        storage.flush()
        storage.save_state(window_idx, page)
        elapsed = time.time() - start_time
        new_records = storage.total_count - records_at_start
        log.info("Total: %s records (+%s during this run, %.0fs)",
                 f"{storage.total_count:,}", f"{new_records:,}", elapsed)
        log.info("Chunks: %s/", storage.chunk_dir)
    return failed
def main() -> None:
    """Script entry point: configure INFO-level logging to stderr, then run the export."""
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%H:%M:%S",
        level=logging.INFO,
    )
    cli_args = parse_args()
    run(cli_args)
# Run the exporter only when executed as a script; importing the module has no side effects.
if __name__ == "__main__":
    main()