#!/usr/bin/env python3
"""
Downloads images for a specific KYC session by sessionId.
Logs in with email/password, retrieves session data via REST API, and downloads images.
Arguments:
--email (required) Email for login
--password (required) Password
--session-id (required) Session ID
--type Image type: selfie, selfie_document, document, document_full,
liveness, face_selfie, face_doc, all (default: all)
--output-dir Folder for saving files (default: session_images)
--admin-user-id Administrator ID (if needed)
--api-base Base API URL (default: https://api.neuro-vision.ru)
Examples:
python3 download_session_images.py --email admin@example.com --password secret \
--session-id "bdd7ac90-1912-11f1-ab98-7b3b0ba9d41a" --type all
python3 download_session_images.py --email admin@example.com --password secret \
--session-id "bdd7ac90-1912-11f1-ab98-7b3b0ba9d41a" --type selfie --output-dir my_images
"""
import argparse
import os
import sys
import requests
DEFAULT_API_BASE = "https://api.neuro-vision.ru"
DEFAULT_OUTPUT_DIR = "session_images"
def login(api_base, email, password):
url = f"{api_base}/v1/user/login"
print(f"Authorization: {email}")
resp = requests.post(url, json={"email": email, "password": password, "valid_days": 5}, timeout=30)
if resp.status_code != 200:
sys.exit(f"Login error: HTTP {resp.status_code}\n{resp.text[:500]}")
data = resp.json()
token = data.get("token") or data.get("jwt") or data.get("accessToken")
if not token:
sys.exit(f"JWT not found. Keys: {list(data.keys())}")
return token
def parse_args():
p = argparse.ArgumentParser(description="Download images for a specific KYC session")
p.add_argument("--email", required=True)
p.add_argument("--password", required=True)
p.add_argument("--session-id", required=True, help="Session ID")
p.add_argument("--type", default="all",
choices=["selfie", "selfie_document", "document", "document_full",
"liveness", "face_selfie", "face_doc", "all"])
p.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
p.add_argument("--admin-user-id", default="")
p.add_argument("--api-base", default=DEFAULT_API_BASE)
return p.parse_args()
def fetch_session(api_base, token, admin_user_id, session_id):
"""Searches for a session by sessionId via REST API."""
headers = {"token": token, "content-type": "application/json"}
if admin_user_id:
headers["admin-user-id"] = admin_user_id
resp = requests.post(f"{api_base}/v1/kyc/sessions/filter", json={
"page": 1,
"pageSize": 100,
"filters": {
"session": session_id,
},
}, headers=headers, timeout=30)
if resp.status_code != 200:
sys.exit(f"API error: {resp.status_code}\n{resp.text[:500]}")
data = resp.json()
if data.get("status") != "ok":
sys.exit(f"API error: {data}")
sessions = data["results"]["sessions"]
for s in sessions:
if s.get("sessionId") == session_id:
return s
sys.exit(f"Session {session_id} not found")
def extract_urls(session, img_type):
"""Extracts URLs of the required type. Returns [(url, filename), ...]"""
urls = []
for r in session.get("results") or []:
rtype = r.get("type")
if img_type in ("selfie", "all") and rtype == "faces":
for check in r.get("checks") or []:
for item in check.get("items") or []:
for url in item.get("images") or []:
if url.endswith("/selfie"):
urls.append((url, "selfie.jpg"))
if img_type in ("selfie_document", "all") and rtype == "faces":
for check in r.get("checks") or []:
for item in check.get("items") or []:
for url in item.get("images") or []:
if url.endswith("/selfie_document"):
urls.append((url, "selfie_document.jpg"))
if img_type in ("document", "all") and rtype == "document":
for i, url in enumerate(r.get("images") or []):
urls.append((url, f"document_{i}.jpg"))
if img_type in ("document_full", "all") and rtype == "document":
for i, url in enumerate(r.get("imagesFull") or []):
urls.append((url, f"document_full_{i}.jpg"))
if img_type in ("liveness", "all") and rtype == "liveness":
for i, url in enumerate(r.get("faces") or []):
urls.append((url, f"liveness_{i}.jpg"))
if img_type in ("face_selfie", "all") and rtype == "selfie":
url = r.get("faceSelfie")
if url:
urls.append((url, "face_selfie.jpg"))
if img_type in ("face_doc", "all") and rtype == "selfie":
url = r.get("faceDoc")
if url:
urls.append((url, "face_doc.jpg"))
return urls
def main():
args = parse_args()
token = login(args.api_base, args.email, args.password)
print(f"Searching session: {args.session_id}")
session = fetch_session(args.api_base, token, args.admin_user_id, args.session_id)
print(f"Found: status={session.get('status')}, created={session.get('createdAt', '?')[:10]}")
urls = extract_urls(session, args.type)
print(f"Images to download: {len(urls)} (type={args.type})")
if not urls:
print("Nothing to download.")
return
out_dir = os.path.join(args.output_dir, args.session_id)
os.makedirs(out_dir, exist_ok=True)
headers = {"token": token}
if args.admin_user_id:
headers["admin-user-id"] = args.admin_user_id
http = requests.Session()
for url, filename in urls:
filepath = os.path.join(out_dir, filename)
resp = http.get(url, headers=headers, timeout=30)
if resp.status_code == 200:
with open(filepath, "wb") as f:
f.write(resp.content)
print(f" {filename}: {len(resp.content):,} bytes")
else:
print(f" {filename}: ERROR {resp.status_code}")
http.close()
print(f"\nSaved to: {out_dir}/")
if __name__ == "__main__":
main()