Guides
Python
Synchronous and async clients with proper timeouts, CPM-bounded concurrency, and retry logic. Works with requests and aiohttp.
Setup
pip install requests # sync
pip install aiohttp # async (optional)import os
API_KEY = os.environ["NSL_API_KEY"]
BASE_URL = "https://api.nslsolver.com"
SOLVE_TIMEOUT = 120 # 180 for KasadaSynchronous client
import requests
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT
class SolveError(Exception):
def __init__(self, status: int, message: str):
super().__init__(f"{status}: {message}")
self.status = status
self.message = message
def solve(payload: dict) -> dict:
r = requests.post(
f"{BASE_URL}/solve",
headers={"X-API-Key": API_KEY},
json=payload,
timeout=SOLVE_TIMEOUT,
)
data = r.json()
if not data.get("success"):
raise SolveError(r.status_code, data.get("error", "unknown"))
return data
# Turnstile
result = solve({
"type": "turnstile",
"site_key": "0x4AAAAAAAB...",
"url": "https://example.com",
})
print(result["token"], result["cost"])Async with aiohttp
import asyncio
import aiohttp
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT
async def solve(session: aiohttp.ClientSession, payload: dict) -> dict:
async with session.post(
f"{BASE_URL}/solve",
headers={"X-API-Key": API_KEY},
json=payload,
timeout=aiohttp.ClientTimeout(total=SOLVE_TIMEOUT),
) as r:
data = await r.json()
if not data.get("success"):
raise RuntimeError(f"{r.status}: {data.get('error')}")
return data
async def main():
async with aiohttp.ClientSession() as session:
payload = {
"type": "turnstile",
"site_key": "0x4AAAAAAAB...",
"url": "https://example.com",
}
tasks = [solve(session, payload) for _ in range(20)]
results = await asyncio.gather(*tasks, return_exceptions=True)
for i, r in enumerate(results):
print(i, r if isinstance(r, Exception) else r["token"][:32])
asyncio.run(main())Match your concurrency to your CPM
Don't run 1,000 parallel coroutines if your key is max_cpm = 600. Use a Semaphore to bound concurrency to roughly max_cpm * avg_solve_seconds / 60 — see Rate limits.
Bounded concurrency
import asyncio, aiohttp
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT
async def fetch_limits(session) -> dict:
async with session.get(
f"{BASE_URL}/balance",
headers={"X-API-Key": API_KEY},
) as r:
return await r.json()
async def main():
async with aiohttp.ClientSession() as session:
limits = await fetch_limits(session)
cpm = limits.get("cpm_limit", 600) or 600
avg_solve = 1.5 # tune to your mix
max_parallel = max(1, int(cpm * avg_solve / 60))
sem = asyncio.Semaphore(max_parallel)
async def one(payload):
async with sem:
async with session.post(
f"{BASE_URL}/solve",
headers={"X-API-Key": API_KEY},
json=payload,
timeout=aiohttp.ClientTimeout(total=SOLVE_TIMEOUT),
) as r:
return await r.json()
results = await asyncio.gather(*[one(p) for p in jobs])Retry with exponential backoff
Only retry on 429 (CPM) and 503 (transient). Everything else (4xx aside from 429) is a coding error you should fix instead of retry.
import time, requests
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT
RETRY = {429, 503}
FATAL = {400, 401, 402, 403}
def solve_with_retry(payload: dict, attempts: int = 4) -> dict:
last = None
for i in range(attempts):
try:
r = requests.post(
f"{BASE_URL}/solve",
headers={"X-API-Key": API_KEY},
json=payload,
timeout=SOLVE_TIMEOUT,
)
except requests.RequestException as e:
last = e
time.sleep(min(2 ** i, 8))
continue
if r.status_code == 200:
return r.json()
body = r.json()
if r.status_code in FATAL:
raise RuntimeError(f"non-retryable {r.status_code}: {body['error']}")
if r.status_code in RETRY:
last = body["error"]
time.sleep(min(2 ** i, 8))
continue
raise RuntimeError(f"unexpected {r.status_code}: {body['error']}")
raise RuntimeError(f"attempts exhausted; last error: {last}")