#!/usr/bin/env python3
"""
Скрипт для выгрузки KYC-сессий через REST API с авторизацией по логину/паролю.
Автоматически получает JWT токен, затем выгружает сессии через POST /v1/kyc/sessions/filter.
Аргументы:
--email (обязательный) Email для логина в NeuroVision
--password (обязательный) Пароль
--admin-user-id ID администратора клиента (заголовок admin-user-id). Пустой если не нужен
--api-base Базовый URL API (по умолч. https://api.neuro-vision.ru)
--start-date (обязательный) Дата начала выгрузки (YYYY-MM-DD)
--end-date Дата конца выгрузки (YYYY-MM-DD), по умолч. сегодня
--schema-id Фильтр по schemaId (если не указан — все сценарии)
--status Фильтр по статусу: idle, processing, success, failed, exception, suspicious, expired
--client-key Фильтр по clientKey или clientUser
--state-file Файл состояния для возобновления (по умолч. sessions.pkl)
--page-size Размер страницы запроса (по умолч. 100)
--window-days Размер окна дат в днях (по умолч. 30)
--max-retries Макс. число повторов при ошибке (по умолч. 5)
--chunk-size Размер чанка для сохранения на диск (по умолч. 10000)
Примеры:
# Все сессии за период
python3 fetch_sessions.py --email admin@example.com --password secret \
--admin-user-id 4 --start-date 2025-12-01 --end-date 2026-03-10
# Только конкретный сценарий
python3 fetch_sessions.py --email admin@example.com --password secret \
--admin-user-id 4 --start-date 2025-12-01 \
--schema-id "db3c8340-8a44-11ef-86ed-a1c315ebff3c" \
--state-file sessions_client.pkl
# Фильтр по статусу
python3 fetch_sessions.py --email admin@example.com --password secret \
--start-date 2025-01-01 --status failed
# Фильтр по clientKey
python3 fetch_sessions.py --email admin@example.com --password secret \
--start-date 2025-01-01 --client-key "user@example.com"
"""
import argparse
import os
import pickle
import signal
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests
# Значения по умолчанию
DEFAULT_API_BASE = "https://api.neuro-vision.ru"
DEFAULT_STATE_FILE = "sessions.pkl"
DEFAULT_PAGE_SIZE = 100
DEFAULT_WINDOW_DAYS = 365
DEFAULT_MAX_RETRIES = 5
DEFAULT_CHUNK_SIZE = 10000
def parse_args():
p = argparse.ArgumentParser(description="Выгрузка KYC-сессий через REST API")
p.add_argument("--email", required=True, help="Email для логина")
p.add_argument("--password", required=True, help="Пароль")
p.add_argument("--admin-user-id", default="", help="ID администратора (заголовок admin-user-id)")
p.add_argument("--api-base", default=DEFAULT_API_BASE, help="Базовый URL API")
p.add_argument("--start-date", required=True, help="Дата начала (YYYY-MM-DD)")
p.add_argument("--end-date", default=None, help="Дата конца (YYYY-MM-DD), по умолч. сегодня")
p.add_argument("--schema-id", default=None, help="Фильтр по schemaId")
p.add_argument("--status", default=None,
choices=["idle", "processing", "success", "failed", "exception", "suspicious", "expired"],
help="Фильтр по статусу сессии")
p.add_argument("--client-key", default=None, help="Фильтр по clientKey/clientUser")
p.add_argument("--state-file", default=DEFAULT_STATE_FILE, help="Файл состояния")
p.add_argument("--page-size", type=int, default=DEFAULT_PAGE_SIZE)
p.add_argument("--window-days", type=int, default=DEFAULT_WINDOW_DAYS)
p.add_argument("--max-retries", type=int, default=DEFAULT_MAX_RETRIES)
p.add_argument("--chunk-size", type=int, default=DEFAULT_CHUNK_SIZE)
return p.parse_args()
def login(api_base, email, password):
"""Логинится и возвращает JWT токен."""
url = f"{api_base}/v1/user/login"
print(f"Авторизация: {email} -> {url}")
resp = requests.post(url, json={"email": email, "password": password, "valid_days": 5}, timeout=30)
if resp.status_code != 200:
sys.exit(f"Ошибка логина: HTTP {resp.status_code}n{resp.text[:500]}")
data = resp.json()
token = data.get("token") or data.get("jwt") or data.get("accessToken")
if not token:
sys.exit(f"JWT не найден в ответе. Ключи: {list(data.keys())}")
print(f"JWT получен: {token[:40]}...")
return token
def generate_windows(start_date, end_date, window_days):
windows = []
current = start_date
while current < end_date:
window_end = min(current + timedelta(days=window_days), end_date)
windows.append((current, window_end))
current = window_end + timedelta(days=1)
return windows
def format_start(dt):
return dt.strftime("%Y-%m-%dT00:00:00.000Z")
def format_end(dt):
return dt.strftime("%Y-%m-%dT23:59:59.999Z")
class ChunkedStorage:
def __init__(self, state_file, chunk_size):
self.chunk_size = chunk_size
base = state_file.replace(".pkl", "")
self.chunk_dir = Path(base + "_chunks")
self.chunk_dir.mkdir(exist_ok=True)
self.state_path = self.chunk_dir / "_state.pkl"
self.buffer = []
self.total_saved = 0
self.chunk_idx = 0
self.window_idx = 0
self.page = 1
def load(self):
if not self.state_path.exists():
return False
with open(self.state_path, "rb") as f:
state = pickle.load(f)
self.chunk_idx = state["chunk_idx"]
self.total_saved = state["total_saved"]
self.buffer = state["buffer"]
self.window_idx = state["window_idx"]
self.page = state["page"]
total = self.total_saved + len(self.buffer)
print(f"Состояние загружено: {total:,} записей ({self.chunk_idx} чанков + {len(self.buffer)} в буфере)")
return True
def add(self, sessions):
self.buffer.extend(sessions)
while len(self.buffer) >= self.chunk_size:
chunk = self.buffer[:self.chunk_size]
self.buffer = self.buffer[self.chunk_size:]
self._save_chunk(chunk)
def _save_chunk(self, chunk):
chunk_path = self.chunk_dir / f"chunk_{self.chunk_idx:04d}.pkl"
tmp_path = self.chunk_dir / f"chunk_{self.chunk_idx:04d}.pkl.tmp"
with open(tmp_path, "wb") as f:
pickle.dump(chunk, f)
os.replace(str(tmp_path), str(chunk_path))
self.total_saved += len(chunk)
self.chunk_idx += 1
print(f" Чанк {self.chunk_idx} сохранён ({len(chunk):,} записей, всего: {self.total_count():,})")
def save_state(self, window_idx, page):
self.window_idx = window_idx
self.page = page
tmp = str(self.state_path) + ".tmp"
with open(tmp, "wb") as f:
pickle.dump({
"chunk_idx": self.chunk_idx,
"total_saved": self.total_saved,
"buffer": self.buffer,
"window_idx": window_idx,
"page": page,
}, f)
os.replace(tmp, str(self.state_path))
def flush(self):
if self.buffer:
chunk = self.buffer
self.buffer = []
self._save_chunk(chunk)
def total_count(self):
return self.total_saved + len(self.buffer)
def fetch_page(http_session, url, token, admin_user_id, range_start, range_end,
page, page_size, max_retries, schema_id=None, status=None, client_key=None):
headers = {
"accept": "application/json",
"content-type": "application/json",
"token": token,
}
if admin_user_id:
headers["admin-user-id"] = admin_user_id
filters = {
"range": {
"start": range_start,
"end": range_end,
"sortOrder": "DESC",
},
"groupByClientId": "all",
}
if schema_id:
filters["schemaId"] = schema_id
if status:
filters["status"] = status
if client_key:
filters["clientKey"] = client_key
payload = {
"page": page,
"pageSize": page_size,
"filters": filters,
}
for attempt in range(1, max_retries + 1):
try:
resp = http_session.post(url, json=payload, headers=headers, timeout=60)
if resp.status_code == 200:
body = resp.json()
if body.get("status") != "ok":
print(f" API error: {body}")
return None
return body["results"]
print(f" HTTP {resp.status_code}: {resp.text[:200]}")
except requests.exceptions.RequestException as e:
print(f" Ошибка (попытка {attempt}/{max_retries}): {e}")
delay = 2 ** (attempt - 1)
print(f" Повтор через {delay}с...")
time.sleep(delay)
raise Exception(f"Не удалось получить данные после {max_retries} попыток")
def main():
args = parse_args()
# Авторизация
token = login(args.api_base, args.email, args.password)
filter_url = f"{args.api_base}/v1/kyc/sessions/filter"
start_date = datetime.strptime(args.start_date, "%Y-%m-%d").date()
end_date = (
datetime.strptime(args.end_date, "%Y-%m-%d").date()
if args.end_date
else datetime.now(timezone.utc).date()
)
windows = generate_windows(start_date, end_date, args.window_days)
print(f"Период: {start_date} — {end_date} ({len(windows)} окон по {args.window_days} дн.)")
storage = ChunkedStorage(args.state_file, args.chunk_size)
if storage.load():
window_idx = storage.window_idx
page = storage.page
else:
window_idx = 0
page = 1
shutdown_requested = False
def signal_handler(signum, frame):
nonlocal shutdown_requested
if shutdown_requested:
print("nПовторный сигнал — выход.")
sys.exit(1)
shutdown_requested = True
print("nЗавершаем после текущего запроса...")
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
http_session = requests.Session()
start_time = time.time()
records_at_start = storage.total_count()
try:
while window_idx < len(windows) and not shutdown_requested:
w_start, w_end = windows[window_idx]
range_start = format_start(w_start)
range_end = format_end(w_end)
if page == 1:
print(f"n[Окно {window_idx + 1}/{len(windows)}] {w_start} — {w_end}")
while not shutdown_requested:
result = fetch_page(
http_session, filter_url, token, args.admin_user_id,
range_start, range_end, page, args.page_size, args.max_retries,
schema_id=args.schema_id,
status=args.status,
client_key=args.client_key,
)
if result is None:
break
sessions = result.get("sessions", [])
page_info = result.get("pageInfo", {})
total_items = page_info.get("totalItems", "?")
total_pages = page_info.get("total", "?")
if not sessions:
print(f" Страница {page} пуста — окно завершено.")
break
storage.add(sessions)
elapsed = time.time() - start_time
new_records = storage.total_count() - records_at_start
speed = new_records / elapsed if elapsed > 0 else 0
print(
f" стр. {page}/{total_pages} | +{len(sessions)} | "
f"окно: {total_items} сессий | "
f"всего: {storage.total_count():,} | {speed:.0f} зап/с"
)
if page >= total_pages:
break
page += 1
if page % 10 == 0:
storage.save_state(window_idx, page)
window_idx += 1
page = 1
storage.save_state(window_idx, page)
except Exception as e:
print(f"nОШИБКА: {e}")
finally:
storage.flush()
storage.save_state(window_idx, page)
elapsed = time.time() - start_time
new_records = storage.total_count() - records_at_start
print(
f"nИтого: {storage.total_count():,} записей "
f"(+{new_records:,} за сессию, {elapsed:.0f}с)"
)
print(f"Чанки: {storage.chunk_dir}/")
http_session.close()
if __name__ == "__main__":
main()