NSLSolver
Guides

Python

Synchronous and async clients with proper timeouts, CPM-bounded concurrency, and retry logic. Works with requests and aiohttp.

Setup

pip install requests        # sync
pip install aiohttp         # async (optional)
config.py
import os

# API key sent in the X-API-Key header of every request.
# Indexing os.environ raises KeyError at import time when NSL_API_KEY is unset.
API_KEY = os.environ["NSL_API_KEY"]
# Root URL that endpoint paths such as /solve and /balance are appended to.
BASE_URL = "https://api.nslsolver.com"
# Timeout, in seconds, passed to the HTTP client for each solve request.
SOLVE_TIMEOUT = 120  # 180 for Kasada

Synchronous client

solver.py
import requests
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT


class SolveError(Exception):
    """Failure reported for a solve request.

    Attributes:
        status: Status code supplied by the caller when raising.
        message: Error text supplied by the caller when raising.

    ``str(err)`` renders as ``"<status>: <message>"``.
    """

    def __init__(self, status: int, message: str):
        self.status, self.message = status, message
        rendered = f"{status}: {message}"
        super().__init__(rendered)


def solve(payload: dict) -> dict:
    """Submit one solve request and return the decoded JSON result.

    Args:
        payload: JSON-serialisable task description posted to ``/solve``.

    Returns:
        The response body as a dict. It is only returned when its
        ``success`` field is truthy.

    Raises:
        SolveError: If the body is not a JSON object, or its ``success``
            field is missing or falsy. Carries the HTTP status code and
            the error text.
        requests.RequestException: On connection failures and timeouts.
    """
    r = requests.post(
        f"{BASE_URL}/solve",
        headers={"X-API-Key": API_KEY},
        json=payload,
        timeout=SOLVE_TIMEOUT,
    )
    try:
        data = r.json()
    except ValueError as exc:
        # Proxies and gateways may answer with HTML or an empty body. Report
        # that with the status code instead of leaking a JSON decode error.
        raise SolveError(r.status_code, "response body is not valid JSON") from exc
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, string, null) has no
        # ``success`` field to inspect.
        raise SolveError(r.status_code, "unexpected response body")
    if not data.get("success"):
        raise SolveError(r.status_code, data.get("error", "unknown"))
    return data


# Turnstile example: submit one task, then print the token and what it cost.
result = solve(
    {"type": "turnstile", "site_key": "0x4AAAAAAAB...", "url": "https://example.com"}
)
print(result["token"], result["cost"])

Async with aiohttp

async_solver.py
import asyncio
import aiohttp
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT


async def solve(session: aiohttp.ClientSession, payload: dict) -> dict:
    """Submit one solve request on *session* and return the decoded JSON result.

    Args:
        session: Open aiohttp session used to send the request.
        payload: JSON-serialisable task description posted to ``/solve``.

    Returns:
        The response body as a dict. It is only returned when its
        ``success`` field is truthy.

    Raises:
        RuntimeError: If the body is not a JSON object, or its ``success``
            field is missing or falsy. The message starts with the HTTP
            status code.
    """
    async with session.post(
        f"{BASE_URL}/solve",
        headers={"X-API-Key": API_KEY},
        json=payload,
        timeout=aiohttp.ClientTimeout(total=SOLVE_TIMEOUT),
    ) as r:
        try:
            # content_type=None skips aiohttp's Content-Type check, so an
            # error page served as text/html fails in the JSON parser below
            # instead of raising ContentTypeError.
            data = await r.json(content_type=None)
        except ValueError as exc:
            raise RuntimeError(f"{r.status}: response body is not valid JSON") from exc
        if not isinstance(data, dict):
            # An empty body decodes to None; other non-object JSON is
            # equally unusable here.
            raise RuntimeError(f"{r.status}: unexpected response body")
        if not data.get("success"):
            raise RuntimeError(f"{r.status}: {data.get('error')}")
        return data


async def main():
    """Run 20 identical Turnstile solves concurrently and print each outcome.

    For every request the index is printed together with either the
    exception it raised or the first 32 characters of its token.
    """
    turnstile_task = {
        "type": "turnstile",
        "site_key": "0x4AAAAAAAB...",
        "url": "https://example.com",
    }
    async with aiohttp.ClientSession() as session:
        # return_exceptions=True keeps one failed solve from aborting the rest;
        # failures come back as exception objects in the result list.
        outcomes = await asyncio.gather(
            *(solve(session, turnstile_task) for _ in range(20)),
            return_exceptions=True,
        )
        for index, outcome in enumerate(outcomes):
            if isinstance(outcome, Exception):
                print(index, outcome)
            else:
                print(index, outcome["token"][:32])


# Script entry point: run main() to completion on a new event loop.
asyncio.run(main())

Match your concurrency to your CPM

Don't run 1,000 parallel coroutines if your key's max_cpm is 600. Use a Semaphore to bound concurrency to roughly max_cpm * avg_solve_seconds / 60 (for example, 600 * 1.5 / 60 = 15 requests in flight) — see Rate limits.

Bounded concurrency

bounded.py
import asyncio, aiohttp
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT


async def fetch_limits(session) -> dict:
    """GET ``/balance`` with the API key and return the decoded JSON body.

    Args:
        session: Open aiohttp session used to send the request.
    """
    balance_url = f"{BASE_URL}/balance"
    auth_headers = {"X-API-Key": API_KEY}
    async with session.get(balance_url, headers=auth_headers) as response:
        body = await response.json()
    return body


async def main(jobs=None):
    """Solve every payload in *jobs* with concurrency bounded by the key's CPM.

    Args:
        jobs: Iterable of solve payloads (dicts posted to ``/solve``). When
            omitted, a module-level ``jobs`` is used if one exists, because
            the original snippet read that global. Otherwise nothing is
            submitted.

    Returns:
        List of decoded JSON responses, in the same order as *jobs*.
    """
    if jobs is None:
        jobs = globals().get("jobs", ())
    jobs = list(jobs)

    async with aiohttp.ClientSession() as session:
        limits = await fetch_limits(session)
        # Fall back to 600 when the field is absent, null or zero.
        cpm = limits.get("cpm_limit", 600) or 600
        avg_solve = 1.5  # tune to your mix
        # Requests in flight ~= request rate (cpm / 60 per second) multiplied
        # by how long each one stays open (avg_solve seconds); never below 1.
        max_parallel = max(1, int(cpm * avg_solve / 60))
        sem = asyncio.Semaphore(max_parallel)

        async def one(payload):
            # The semaphore is held for the whole request, so at most
            # max_parallel solves are open at any moment.
            async with sem:
                async with session.post(
                    f"{BASE_URL}/solve",
                    headers={"X-API-Key": API_KEY},
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=SOLVE_TIMEOUT),
                ) as r:
                    return await r.json()

        return await asyncio.gather(*(one(p) for p in jobs))

Retry with exponential backoff

Only retry on 429 (CPM limit) and 503 (transient), plus network-level failures such as timeouts. Everything else (4xx other than 429) is a coding error you should fix rather than retry.

retry.py
import time, requests
from config import API_KEY, BASE_URL, SOLVE_TIMEOUT

# Status codes worth retrying: 429 (CPM limit hit) and 503 (transient).
RETRY = {429, 503}
# Status codes that are never retried; solve_with_retry raises immediately on them.
FATAL = {400, 401, 402, 403}


def _error_message(response) -> str:
    """Return the ``error`` field of a failed response, or a fallback description."""
    try:
        body = response.json()
    except ValueError:
        # 503s are often produced by a proxy and carry HTML or no body at all.
        return "response body is not valid JSON"
    if isinstance(body, dict) and "error" in body:
        return str(body["error"])
    return "no error message in response"


def solve_with_retry(payload: dict, attempts: int = 4) -> dict:
    """POST *payload* to ``/solve``, retrying transient failures with backoff.

    Network errors and the status codes in ``RETRY`` are retried. The wait
    before each retry doubles (1 s, 2 s, 4 s, ...) and is capped at 8 s. No
    time is spent sleeping after the final attempt.

    Args:
        payload: JSON-serialisable task description.
        attempts: Maximum number of requests to send.

    Returns:
        The decoded JSON body of the first response with status 200.

    Raises:
        RuntimeError: On a status in ``FATAL``, on any other unexpected
            status, or when every attempt failed with a retryable error.
    """
    last = None
    for i in range(attempts):
        if i:
            # Back off before a retry only; attempt i waits min(2**(i-1), 8) s.
            time.sleep(min(2 ** (i - 1), 8))

        try:
            r = requests.post(
                f"{BASE_URL}/solve",
                headers={"X-API-Key": API_KEY},
                json=payload,
                timeout=SOLVE_TIMEOUT,
            )
        except requests.RequestException as e:
            last = e
            continue

        if r.status_code == 200:
            return r.json()

        # Parse the error defensively so a non-JSON 503 is still retried
        # instead of crashing with a decode error or KeyError.
        error = _error_message(r)
        if r.status_code in RETRY:
            last = error
            continue
        if r.status_code in FATAL:
            raise RuntimeError(f"non-retryable {r.status_code}: {error}")
        raise RuntimeError(f"unexpected {r.status_code}: {error}")

    # Chain the underlying network exception, when there is one, for debugging.
    cause = last if isinstance(last, Exception) else None
    raise RuntimeError(f"attempts exhausted; last error: {last}") from cause

On this page