Use this file to discover all available pages before exploring further.
Agents support multiple execution patterns - from simple blocking calls to advanced asynchronous monitoring. Choose the right pattern for your use case.
The simplest way to execute an agent - start and wait for completion:
run_blocking.py
from notte_sdk import NotteClientclient = NotteClient()with client.Session() as session: agent = client.Agent(session=session, max_steps=10) result = agent.run(task="Go to example.com and find the contact email") if result.success: print(result.answer) else: print(f"Failed: {result.answer}")
When to use:
Simple scripts
Synchronous workflows
You don’t need to do other work while the agent runs
Start the agent and wait for completion separately:
start_wait.py
from notte_sdk import NotteClientclient = NotteClient()with client.Session() as session: agent = client.Agent(session=session) # Start agent (returns immediately) agent.start(task="Complete this task") # Do other work here... # Wait for completion result = agent.wait()
import asynciofrom notte_sdk import NotteClientclient = NotteClient()async def run_agent_task(): with client.Session() as session: agent = client.Agent(session=session) agent.start(task="Extract data from the page") result = await agent.async_watch_logs_and_wait() return result# Run asyncresult = asyncio.run(run_agent_task())print(result.answer)
When to use:
Building async applications
Running multiple agents concurrently
Integrating with async frameworks (FastAPI, aiohttp)
from notte_sdk import NotteClientclient = NotteClient()with client.Session() as session: agent = client.Agent(session=session) result = agent.wait() if not result.success: # Agent state: Failed print(f"Error: {result.answer}")
def monitor_agent(): with client.Session() as session: agent = client.Agent(session=session) agent.start(task="Complete task") # Stream logs and get final status status = agent.watch_logs(log=True) if status is None: status = agent.status() return status
import asynciofrom notte_sdk import NotteClientclient = NotteClient()async def run_multiple_agents(): async def run_one(task_description: str): with client.Session() as session: agent = client.Agent(session=session) agent.start(task=task_description) return await agent.async_watch_logs_and_wait() # Run all agents in parallel (each with its own session) results = await asyncio.gather(*[run_one(t) for t in ["Task 1", "Task 2", "Task 3"]]) return resultsresults = asyncio.run(run_multiple_agents())for i, result in enumerate(results): print(f"Agent {i + 1}: {result.answer}")
import asynciofrom notte_sdk import NotteClientasync def run_with_timeout(agent, timeout_seconds=60): try: async with asyncio.timeout(timeout_seconds): agent.start(task="Complete task") result = await agent.async_watch_logs_and_wait() return result except TimeoutError as e: # agent.stop() is already called internally by async_watch_logs_and_wait on cancellation raise TimeoutError(f"Agent exceeded {timeout_seconds}s timeout") from eclient = NotteClient()with client.Session() as session: agent = client.Agent(session=session) # Run with 60 second timeout result = asyncio.run(run_with_timeout(agent, timeout_seconds=60))
with client.Session() as session: agent = client.Agent(session=session) result = agent.run(task="Complete task") if result.success: # Process successful result print(result.answer) else: # Handle failure logger.error(f"Agent failed: {result.answer}") # Fallback strategy
from notte_sdk import NotteClientclient = NotteClient()with client.Session() as session: with client.AgentFallback(session, task="Add item to cart") as agent_fb: # Try deterministic actions first session.execute(type="click", selector="#add-to-cart") session.execute(type="click", selector="#checkout") # Agent automatically handles any failuresif agent_fb.success: print("Task completed (possibly with agent help)")
import asynciofrom notte_sdk import NotteClientclient = NotteClient()async def main(): with client.Session() as session: agent = client.Agent(session=session) # Simple tasks: Use run() result = agent.run(task="Quick task") # Multiple agents: Use async (each with its own session) async def run_one(task: str): with client.Session() as s: a = client.Agent(session=s) a.start(task=task) return await a.async_watch_logs_and_wait() results = await asyncio.gather(run_one("Task 1"), run_one("Task 2")) # Long tasks with monitoring: Use start() + polling agent.start(task="Long task") done = False while not done: status = agent.status() # Update UI, log progress, etc. done = True # placeholderasyncio.run(main())
from notte_sdk import NotteClientclient = NotteClient()# Use context managers for automatic cleanupwith client.Session() as session: agent = client.Agent(session=session) result = agent.run(task="Task")# Session automatically closed