#!/usr/bin/env python3
"""
Скачивает картинки конкретной KYC-сессии по sessionId.
Авторизуется по логину/паролю, получает данные сессии через REST API, скачивает картинки.
Аргументы:
--email (обязательный) Email для логина
--password (обязательный) Пароль
--session-id (обязательный) ID сессии
--type Тип картинок: selfie, selfie_document, document, document_full,
liveness, face_selfie, face_doc, all (по умолч. all)
--output-dir Папка для сохранения (по умолч. session_images)
--admin-user-id ID администратора (если нужен)
--api-base Базовый URL API (по умолч. https://api.neuro-vision.ru)
Примеры:
python3 download_session_images.py --email admin@example.com --password secret \
--session-id "bdd7ac90-1912-11f1-ab98-7b3b0ba9d41a" --type all
python3 download_session_images.py --email admin@example.com --password secret \
--session-id "bdd7ac90-1912-11f1-ab98-7b3b0ba9d41a" --type selfie --output-dir my_images
"""
import argparse
import os
import sys
import requests
DEFAULT_API_BASE = "https://api.neuro-vision.ru"
DEFAULT_OUTPUT_DIR = "session_images"
def login(api_base, email, password):
url = f"{api_base}/v1/user/login"
print(f"Авторизация: {email}")
resp = requests.post(url, json={"email": email, "password": password, "valid_days": 5}, timeout=30)
if resp.status_code != 200:
sys.exit(f"Ошибка логина: HTTP {resp.status_code}n{resp.text[:500]}")
data = resp.json()
token = data.get("token") or data.get("jwt") or data.get("accessToken")
if not token:
sys.exit(f"JWT не найден. Ключи: {list(data.keys())}")
return token
def parse_args():
p = argparse.ArgumentParser(description="Скачивание картинок конкретной KYC-сессии")
p.add_argument("--email", required=True)
p.add_argument("--password", required=True)
p.add_argument("--session-id", required=True, help="ID сессии")
p.add_argument("--type", default="all",
choices=["selfie", "selfie_document", "document", "document_full",
"liveness", "face_selfie", "face_doc", "all"])
p.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
p.add_argument("--admin-user-id", default="")
p.add_argument("--api-base", default=DEFAULT_API_BASE)
return p.parse_args()
def fetch_session(api_base, token, admin_user_id, session_id):
"""Ищет сессию по sessionId через REST API."""
headers = {"token": token, "content-type": "application/json"}
if admin_user_id:
headers["admin-user-id"] = admin_user_id
resp = requests.post(f"{api_base}/v1/kyc/sessions/filter", json={
"page": 1,
"pageSize": 100,
"filters": {
"session": session_id,
},
}, headers=headers, timeout=30)
if resp.status_code != 200:
sys.exit(f"API error: {resp.status_code}n{resp.text[:500]}")
data = resp.json()
if data.get("status") != "ok":
sys.exit(f"API error: {data}")
sessions = data["results"]["sessions"]
for s in sessions:
if s.get("sessionId") == session_id:
return s
sys.exit(f"Сессия {session_id} не найдена")
def extract_urls(session, img_type):
"""Извлекает URL-ы нужного типа. Возвращает [(url, filename), ...]"""
urls = []
for r in session.get("results") or []:
rtype = r.get("type")
if img_type in ("selfie", "all") and rtype == "faces":
for check in r.get("checks") or []:
for item in check.get("items") or []:
for url in item.get("images") or []:
if url.endswith("/selfie"):
urls.append((url, "selfie.jpg"))
if img_type in ("selfie_document", "all") and rtype == "faces":
for check in r.get("checks") or []:
for item in check.get("items") or []:
for url in item.get("images") or []:
if url.endswith("/selfie_document"):
urls.append((url, "selfie_document.jpg"))
if img_type in ("document", "all") and rtype == "document":
for i, url in enumerate(r.get("images") or []):
urls.append((url, f"document_{i}.jpg"))
if img_type in ("document_full", "all") and rtype == "document":
for i, url in enumerate(r.get("imagesFull") or []):
urls.append((url, f"document_full_{i}.jpg"))
if img_type in ("liveness", "all") and rtype == "liveness":
for i, url in enumerate(r.get("faces") or []):
urls.append((url, f"liveness_{i}.jpg"))
if img_type in ("face_selfie", "all") and rtype == "selfie":
url = r.get("faceSelfie")
if url:
urls.append((url, "face_selfie.jpg"))
if img_type in ("face_doc", "all") and rtype == "selfie":
url = r.get("faceDoc")
if url:
urls.append((url, "face_doc.jpg"))
return urls
def main():
args = parse_args()
token = login(args.api_base, args.email, args.password)
print(f"Поиск сессии: {args.session_id}")
session = fetch_session(args.api_base, token, args.admin_user_id, args.session_id)
print(f"Найдена: status={session.get('status')}, created={session.get('createdAt', '?')[:10]}")
urls = extract_urls(session, args.type)
print(f"Картинок для скачивания: {len(urls)} (type={args.type})")
if not urls:
print("Нечего скачивать.")
return
out_dir = os.path.join(args.output_dir, args.session_id)
os.makedirs(out_dir, exist_ok=True)
headers = {"token": token}
if args.admin_user_id:
headers["admin-user-id"] = args.admin_user_id
http = requests.Session()
for url, filename in urls:
filepath = os.path.join(out_dir, filename)
resp = http.get(url, headers=headers, timeout=30)
if resp.status_code == 200:
with open(filepath, "wb") as f:
f.write(resp.content)
print(f" {filename}: {len(resp.content):,} bytes")
else:
print(f" {filename}: ОШИБКА {resp.status_code}")
http.close()
print(f"nСохранено в: {out_dir}/")
if __name__ == "__main__":
main()