Скрипт для парсинга KYC-сессий

#!/usr/bin/env python3
"""
Скрипт для выгрузки KYC-сессий через REST API с авторизацией по логину/паролю.
Автоматически получает JWT токен, затем выгружает сессии через POST /v1/kyc/sessions/filter.

Аргументы:
  --email            (обязательный) Email для логина в NeuroVision
  --password         (обязательный) Пароль
  --admin-user-id    ID администратора клиента (заголовок admin-user-id). Пустой если не нужен
  --api-base         Базовый URL API (по умолч. https://api.neuro-vision.ru)
  --start-date       (обязательный) Дата начала выгрузки (YYYY-MM-DD)
  --end-date         Дата конца выгрузки (YYYY-MM-DD), по умолч. сегодня
  --schema-id        Фильтр по schemaId (если не указан — все сценарии)
  --status           Фильтр по статусу: idle, processing, success, failed, exception, suspicious, expired
  --client-key       Фильтр по clientKey или clientUser
  --state-file       Файл состояния для возобновления (по умолч. sessions.pkl)
  --page-size        Размер страницы запроса (по умолч. 100)
  --window-days      Размер окна дат в днях (по умолч. 30)
  --max-retries      Макс. число повторов при ошибке (по умолч. 5)
  --chunk-size       Размер чанка для сохранения на диск (по умолч. 10000)

Примеры:
  # Все сессии за период
  python3 fetch_sessions.py --email admin@example.com --password secret \
    --admin-user-id 4 --start-date 2025-12-01 --end-date 2026-03-10

  # Только конкретный сценарий
  python3 fetch_sessions.py --email admin@example.com --password secret \
    --admin-user-id 4 --start-date 2025-12-01 \
    --schema-id "db3c8340-8a44-11ef-86ed-a1c315ebff3c" \
    --state-file sessions_client.pkl

  # Фильтр по статусу
  python3 fetch_sessions.py --email admin@example.com --password secret \
    --start-date 2025-01-01 --status failed

  # Фильтр по clientKey
  python3 fetch_sessions.py --email admin@example.com --password secret \
    --start-date 2025-01-01 --client-key "user@example.com"
"""

import argparse
import os
import pickle
import signal
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path

import requests

# Значения по умолчанию
DEFAULT_API_BASE = "https://api.neuro-vision.ru"
DEFAULT_STATE_FILE = "sessions.pkl"
DEFAULT_PAGE_SIZE = 100
DEFAULT_WINDOW_DAYS = 365
DEFAULT_MAX_RETRIES = 5
DEFAULT_CHUNK_SIZE = 10000


def parse_args():
    p = argparse.ArgumentParser(description="Выгрузка KYC-сессий через REST API")
    p.add_argument("--email", required=True, help="Email для логина")
    p.add_argument("--password", required=True, help="Пароль")
    p.add_argument("--admin-user-id", default="", help="ID администратора (заголовок admin-user-id)")
    p.add_argument("--api-base", default=DEFAULT_API_BASE, help="Базовый URL API")
    p.add_argument("--start-date", required=True, help="Дата начала (YYYY-MM-DD)")
    p.add_argument("--end-date", default=None, help="Дата конца (YYYY-MM-DD), по умолч. сегодня")
    p.add_argument("--schema-id", default=None, help="Фильтр по schemaId")
    p.add_argument("--status", default=None,
                   choices=["idle", "processing", "success", "failed", "exception", "suspicious", "expired"],
                   help="Фильтр по статусу сессии")
    p.add_argument("--client-key", default=None, help="Фильтр по clientKey/clientUser")
    p.add_argument("--state-file", default=DEFAULT_STATE_FILE, help="Файл состояния")
    p.add_argument("--page-size", type=int, default=DEFAULT_PAGE_SIZE)
    p.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS)
    p.add_argument("--max-retries", type=int, default=DEFAULT_MAX_RETRIES)
    p.add_argument("--chunk-size", type=int, default=DEFAULT_CHUNK_SIZE)
    return p.parse_args()


def login(api_base, email, password):
    """Логинится и возвращает JWT токен."""
    url = f"{api_base}/v1/user/login"
    print(f"Авторизация: {email} -> {url}")
    resp = requests.post(url, json={"email": email, "password": password, "valid_days": 5}, timeout=30)
    if resp.status_code != 200:
        sys.exit(f"Ошибка логина: HTTP {resp.status_code}n{resp.text[:500]}")

    data = resp.json()
    token = data.get("token") or data.get("jwt") or data.get("accessToken")
    if not token:
        sys.exit(f"JWT не найден в ответе. Ключи: {list(data.keys())}")

    print(f"JWT получен: {token[:40]}...")
    return token


def generate_windows(start_date, end_date, window_days):
    windows = []
    current = start_date
    while current < end_date:
        window_end = min(current + timedelta(days=window_days), end_date)
        windows.append((current, window_end))
        current = window_end + timedelta(days=1)
    return windows


def format_start(dt):
    return dt.strftime("%Y-%m-%dT00:00:00.000Z")


def format_end(dt):
    return dt.strftime("%Y-%m-%dT23:59:59.999Z")


class ChunkedStorage:
    def __init__(self, state_file, chunk_size):
        self.chunk_size = chunk_size
        base = state_file.replace(".pkl", "")
        self.chunk_dir = Path(base + "_chunks")
        self.chunk_dir.mkdir(exist_ok=True)
        self.state_path = self.chunk_dir / "_state.pkl"
        self.buffer = []
        self.total_saved = 0
        self.chunk_idx = 0
        self.window_idx = 0
        self.page = 1

    def load(self):
        if not self.state_path.exists():
            return False
        with open(self.state_path, "rb") as f:
            state = pickle.load(f)
        self.chunk_idx = state["chunk_idx"]
        self.total_saved = state["total_saved"]
        self.buffer = state["buffer"]
        self.window_idx = state["window_idx"]
        self.page = state["page"]
        total = self.total_saved + len(self.buffer)
        print(f"Состояние загружено: {total:,} записей ({self.chunk_idx} чанков + {len(self.buffer)} в буфере)")
        return True

    def add(self, sessions):
        self.buffer.extend(sessions)
        while len(self.buffer) >= self.chunk_size:
            chunk = self.buffer[:self.chunk_size]
            self.buffer = self.buffer[self.chunk_size:]
            self._save_chunk(chunk)

    def _save_chunk(self, chunk):
        chunk_path = self.chunk_dir / f"chunk_{self.chunk_idx:04d}.pkl"
        tmp_path = self.chunk_dir / f"chunk_{self.chunk_idx:04d}.pkl.tmp"
        with open(tmp_path, "wb") as f:
            pickle.dump(chunk, f)
        os.replace(str(tmp_path), str(chunk_path))
        self.total_saved += len(chunk)
        self.chunk_idx += 1
        print(f"  Чанк {self.chunk_idx} сохранён ({len(chunk):,} записей, всего: {self.total_count():,})")

    def save_state(self, window_idx, page):
        self.window_idx = window_idx
        self.page = page
        tmp = str(self.state_path) + ".tmp"
        with open(tmp, "wb") as f:
            pickle.dump({
                "chunk_idx": self.chunk_idx,
                "total_saved": self.total_saved,
                "buffer": self.buffer,
                "window_idx": window_idx,
                "page": page,
            }, f)
        os.replace(tmp, str(self.state_path))

    def flush(self):
        if self.buffer:
            chunk = self.buffer
            self.buffer = []
            self._save_chunk(chunk)

    def total_count(self):
        return self.total_saved + len(self.buffer)


def fetch_page(http_session, url, token, admin_user_id, range_start, range_end,
               page, page_size, max_retries, schema_id=None, status=None, client_key=None):
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "token": token,
    }
    if admin_user_id:
        headers["admin-user-id"] = admin_user_id

    filters = {
        "range": {
            "start": range_start,
            "end": range_end,
            "sortOrder": "DESC",
        },
        "groupByClientId": "all",
    }
    if schema_id:
        filters["schemaId"] = schema_id
    if status:
        filters["status"] = status
    if client_key:
        filters["clientKey"] = client_key

    payload = {
        "page": page,
        "pageSize": page_size,
        "filters": filters,
    }

    for attempt in range(1, max_retries + 1):
        try:
            resp = http_session.post(url, json=payload, headers=headers, timeout=60)
            if resp.status_code == 200:
                body = resp.json()
                if body.get("status") != "ok":
                    print(f"  API error: {body}")
                    return None
                return body["results"]
            print(f"  HTTP {resp.status_code}: {resp.text[:200]}")
        except requests.exceptions.RequestException as e:
            print(f"  Ошибка (попытка {attempt}/{max_retries}): {e}")

        delay = 2 ** (attempt - 1)
        print(f"  Повтор через {delay}с...")
        time.sleep(delay)

    raise Exception(f"Не удалось получить данные после {max_retries} попыток")


def main():
    args = parse_args()

    # Авторизация
    token = login(args.api_base, args.email, args.password)
    filter_url = f"{args.api_base}/v1/kyc/sessions/filter"

    start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
    end_date = (
        datetime.strptime(args.end_date, "%Y-%m-%d").date()
        if args.end_date
        else datetime.now(timezone.utc).date()
    )

    windows = generate_windows(start_date, end_date, args.window_days)
    print(f"Период: {start_date} — {end_date} ({len(windows)} окон по {args.window_days} дн.)")

    storage = ChunkedStorage(args.state_file, args.chunk_size)
    if storage.load():
        window_idx = storage.window_idx
        page = storage.page
    else:
        window_idx = 0
        page = 1

    shutdown_requested = False

    def signal_handler(signum, frame):
        nonlocal shutdown_requested
        if shutdown_requested:
            print("nПовторный сигнал — выход.")
            sys.exit(1)
        shutdown_requested = True
        print("nЗавершаем после текущего запроса...")

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    http_session = requests.Session()
    start_time = time.time()
    records_at_start = storage.total_count()

    try:
        while window_idx < len(windows) and not shutdown_requested:
            w_start, w_end = windows[window_idx]
            range_start = format_start(w_start)
            range_end = format_end(w_end)

            if page == 1:
                print(f"n[Окно {window_idx + 1}/{len(windows)}] {w_start} — {w_end}")

            while not shutdown_requested:
                result = fetch_page(
                    http_session, filter_url, token, args.admin_user_id,
                    range_start, range_end, page, args.page_size, args.max_retries,
                    schema_id=args.schema_id,
                    status=args.status,
                    client_key=args.client_key,
                )

                if result is None:
                    break

                sessions = result.get("sessions", [])
                page_info = result.get("pageInfo", {})
                total_items = page_info.get("totalItems", "?")
                total_pages = page_info.get("total", "?")

                if not sessions:
                    print(f"  Страница {page} пуста — окно завершено.")
                    break

                storage.add(sessions)
                elapsed = time.time() - start_time
                new_records = storage.total_count() - records_at_start
                speed = new_records / elapsed if elapsed > 0 else 0

                print(
                    f"  стр. {page}/{total_pages} | +{len(sessions)} | "
                    f"окно: {total_items} сессий | "
                    f"всего: {storage.total_count():,} | {speed:.0f} зап/с"
                )

                if page >= total_pages:
                    break

                page += 1

                if page % 10 == 0:
                    storage.save_state(window_idx, page)

            window_idx += 1
            page = 1
            storage.save_state(window_idx, page)

    except Exception as e:
        print(f"nОШИБКА: {e}")
    finally:
        storage.flush()
        storage.save_state(window_idx, page)
        elapsed = time.time() - start_time
        new_records = storage.total_count() - records_at_start
        print(
            f"nИтого: {storage.total_count():,} записей "
            f"(+{new_records:,} за сессию, {elapsed:.0f}с)"
        )
        print(f"Чанки: {storage.chunk_dir}/")
        http_session.close()


if __name__ == "__main__":
    main()