Skip to main content
Agents support multiple execution patterns - from simple blocking calls to advanced asynchronous monitoring. Choose the right pattern for your use case.

Execution Modes

Run (Blocking)

The simplest way to execute an agent - start and wait for completion:
run_blocking.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    # Cap the run at 10 steps; run() blocks until the agent finishes
    agent = client.Agent(session=session, max_steps=10)
    result = agent.run(task="Go to example.com and find the contact email")

    # The answer is printed either way; a failed run gets a "Failed: " prefix
    message = result.answer if result.success else f"Failed: {result.answer}"
    print(message)
When to use:
  • Simple scripts
  • Synchronous workflows
  • You don’t need to do other work while the agent runs

Start + Wait (Non-blocking)

Start the agent and wait for completion separately:
start_wait.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    agent = client.Agent(session=session)

    # Kick off the task; start() returns immediately instead of blocking
    agent.start(task="Complete this task")

    # Other work can be done here while the agent is running...

    # Block until the agent is done and collect its result
    result = agent.wait()
When to use:
  • You need to start multiple agents in parallel
  • You want to do other work while the agent runs
  • You need more control over execution

Async Execution

Run agents asynchronously with async/await:
async_agent.py
import asyncio

from notte_sdk import NotteClient

client = NotteClient()


async def run_agent_task():
    """Run a single agent inside a fresh session and return its result."""
    with client.Session() as session:
        return await client.Agent(session=session).arun(task="Extract data from the page")


# Drive the coroutine from synchronous code
result = asyncio.run(run_agent_task())
print(result.answer)
When to use:
  • Building async applications
  • Running multiple agents concurrently
  • Integrating with async frameworks (FastAPI, aiohttp)

Agent States

Agents transition through these states during execution:

Running

Agent is actively executing:
state_running.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    # The agent has to be created before it can be started
    agent = client.Agent(session=session)
    agent.start(task="Complete task")
    # Agent state: Running

Completed

Agent successfully finished the task:
state_completed.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    agent = client.Agent(session=session)
    # Start a task first so that wait() has a run to wait for
    agent.start(task="Complete task")

    result = agent.wait()
    if result.success:
        # Agent state: Completed
        print(result.answer)

Failed

Agent encountered an error:
state_failed.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    agent = client.Agent(session=session)
    # Start a task first so that wait() has a run to wait for
    agent.start(task="Complete task")

    result = agent.wait()
    if not result.success:
        # Agent state: Failed
        print(f"Error: {result.answer}")

Stopped (Max Steps)

Agent reached maximum step limit:
state_max_steps.py
from notte_sdk import NotteClient

client = NotteClient()

MAX_STEPS = 5  # one definition shared by the agent config and the check below

with client.Session() as session:
    agent = client.Agent(session=session, max_steps=MAX_STEPS)
    result = agent.run(task="Very complex task")

    # A run that succeeded was not cut short, even if it used every step
    if not result.success and len(result.steps) >= MAX_STEPS:
        # Agent hit max_steps limit
        print("Agent stopped: maximum steps reached")

Monitoring Progress

Status Checking

Check agent progress at any time:
status_checking.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    # The agent has to be created before it can be started
    agent = client.Agent(session=session)
    agent.start(task="Long running task")

    # Check status
    status = agent.status()

    print(f"Agent ID: {status.agent_id}")
    print(f"Current state: {status.status}")
    print(f"Steps completed: {len(status.steps)}")
    print(f"Success: {status.success}")

Live Log Streaming

Stream agent logs in real-time via WebSocket:
live_log_streaming.py
import asyncio

from notte_sdk import NotteClient

client = NotteClient()


async def monitor_agent():
    """Start an agent, stream its logs, then return its final status."""
    with client.Session() as session:
        # The agent has to be created before it can be started
        agent = client.Agent(session=session)
        agent.start(task="Complete task")

        # Stream logs as they happen
        await agent.watch_logs(log=True)

        # Get final status
        return agent.status()


asyncio.run(monitor_agent())

Polling Pattern

Check status periodically:
polling_pattern.py
import time

from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    agent = client.Agent(session=session)
    agent.start(task="Long task")

    # Prime the loop with a first status read, then re-read after each pause
    status = agent.status()
    while status.status != "closed":
        print(f"Progress: {len(status.steps)} steps completed")
        time.sleep(5)  # Poll interval: 5 seconds
        status = agent.status()

    print(f"Final result: {status.answer}")

Parallel Execution

Multiple Independent Agents

Run multiple agents simultaneously:
parallel_agents.py
import asyncio

from notte_sdk import NotteClient

client = NotteClient()


async def run_multiple_agents():
    """Run one agent per task concurrently and return the results in task order."""

    async def run_one(task_description):
        # The coroutine is awaited inside the `with`, so the session stays
        # open for the agent's whole run and is only closed afterwards.
        with client.Session() as session:
            agent = client.Agent(session=session)
            return await agent.arun(task=task_description)

    # Run all agents in parallel
    return await asyncio.gather(*(run_one(t) for t in ["Task 1", "Task 2", "Task 3"]))


results = asyncio.run(run_multiple_agents())
for i, result in enumerate(results, start=1):
    print(f"Agent {i}: {result.answer}")

Batch Execution

Use BatchRemoteAgent for parallel execution with strategies:
batch_execution.py
import asyncio

from notte_sdk import NotteClient
from notte_sdk.agents import BatchRemoteAgent

client = NotteClient()


async def main():
    """Run a batch of agents on one task and return the batch result."""
    with client.Session() as session:
        batch_agent = BatchRemoteAgent(session=session, max_steps=10, _client=client)

        # Run 3 agents in parallel, return first success
        return await batch_agent.run(task="Complete task", n_jobs=3, strategy="first_success")


# Keep the outcome so that it can be inspected after the event loop exits
result = asyncio.run(main())
See Batch Agents for details.

Stopping Agents

Manual Stop

Stop a running agent:
manual_stop.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    # The agent has to be created before it can be started
    agent = client.Agent(session=session)
    agent.start(task="Long task")

    # Do something...

    # Stop the agent
    agent.stop()
Note: You cannot stop an agent in the middle of a step - it must finish the current action first.

Timeout Pattern

Implement custom timeouts:
timeout_pattern.py
import asyncio

from notte_sdk import NotteClient


async def run_with_timeout(agent, timeout_seconds=60):
    """Run the agent's task, stopping the agent if it exceeds the timeout.

    Args:
        agent: Object exposing an awaitable ``arun(task=...)`` and a ``stop()``.
        timeout_seconds: Maximum time to wait for ``arun`` to finish.

    Returns:
        Whatever ``agent.arun`` returns when it finishes in time.

    Raises:
        TimeoutError: If the run does not finish within ``timeout_seconds``.
    """
    try:
        return await asyncio.wait_for(agent.arun(task="Complete task"), timeout=timeout_seconds)
    except asyncio.TimeoutError as exc:
        agent.stop()
        # Chain the original exception so the traceback shows where the wait expired
        raise TimeoutError(f"Agent exceeded {timeout_seconds}s timeout") from exc


client = NotteClient()

with client.Session() as session:
    # Give the agent at most 60 seconds to finish
    timed_run = run_with_timeout(client.Agent(session=session), timeout_seconds=60)
    result = asyncio.run(timed_run)

Response Structure

Agent responses contain execution details:
response_structure.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    result = client.Agent(session=session).run(task="Extract data")

    # Print each result property in turn
    for field in (
        "success",  # bool: Did agent succeed?
        "answer",  # str: Agent's response
        "steps",  # list: All steps taken
        "agent_id",  # str: Unique agent ID
        "session_id",  # str: Session used
    ):
        print(getattr(result, field))

Step Details

Inspect individual steps:
step_details.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    # The agent has to be created before it can be run
    agent = client.Agent(session=session)
    result = agent.run(task="Navigate and extract")

    # Number the steps from 1 for display
    for i, step in enumerate(result.steps, start=1):
        print(f"Step {i}:")
        print(f"  Action: {step.action}")
        print(f"  Success: {step.success}")
        print(f"  Message: {step.message}")

Error Handling

Graceful Degradation

Handle failures gracefully:
graceful_degradation.py
import logging

from notte_sdk import NotteClient

logger = logging.getLogger(__name__)

client = NotteClient()

with client.Session() as session:
    # The agent has to be created before it can be run
    agent = client.Agent(session=session)
    result = agent.run(task="Complete task")

    if result.success:
        # Process successful result
        process_data(result.answer)  # placeholder: supply your own handler
    else:
        # Handle failure
        logger.error("Agent failed: %s", result.answer)

        # Fallback strategy
        fallback_approach()  # placeholder: supply your own fallback

Retry Pattern

Retry failed agents:
agent_retry.py
import time

from notte_sdk import NotteClient

client = NotteClient()

MAX_RETRIES = 3

for attempt in range(MAX_RETRIES):
    # Each attempt gets a fresh session, closed before any backoff sleep
    with client.Session() as session:
        agent = client.Agent(session=session)
        result = agent.run(task="Complete task")

    if result.success:
        print(f"Success: {result.answer}")
        break

    if attempt < MAX_RETRIES - 1:
        # Another attempt follows: announce it and back off first
        print(f"Attempt {attempt + 1} failed, retrying...")
        time.sleep(2**attempt)  # Exponential backoff
    else:
        # Final attempt: nothing left to retry, so do not sleep
        print(f"Attempt {attempt + 1} failed")
else:
    # Reached only when no attempt hit `break`, i.e. every attempt failed
    raise RuntimeError("Agent failed after all retries")

Agent Fallback

Use AgentFallback for automatic error recovery:
agent_fallback_example.py
from notte_sdk import NotteClient

client = NotteClient()

# Open the session and the fallback scope together; they unwind in reverse order
with client.Session() as session, client.AgentFallback(session, task="Add item to cart") as agent_fb:
    # Scripted actions run first
    session.execute(type="click", selector="#add-to-cart")
    session.execute(type="click", selector="#checkout")
    # Any failure above is left to the fallback agent

if agent_fb.success:
    print("Task completed (possibly with agent help)")
See Agent Fallback for details.

Best Practices

1. Use Appropriate Execution Mode

bp_execution_mode.py
import asyncio

from notte_sdk import NotteClient

client = NotteClient()


async def main():
    """Show the three execution modes side by side within a single session."""
    with client.Session() as session:
        agent = client.Agent(session=session)

        # Simple tasks: Use run()
        # NOTE(review): run() is the blocking mode; called from a coroutine it
        # presumably holds up the event loop until it returns - confirm.
        result = agent.run(task="Quick task")

        # Multiple agents: Use async
        # Both agents share this one session and are awaited together.
        agent1 = client.Agent(session=session)
        agent2 = client.Agent(session=session)
        results = await asyncio.gather(agent1.arun(task="Task 1"), agent2.arun(task="Task 2"))

        # Long tasks with monitoring: Use start() + polling
        # NOTE(review): this reuses the agent that already ran above - verify
        # that the SDK allows starting the same agent a second time.
        agent.start(task="Long task")
        done = False
        while not done:
            status = agent.status()
            # Update UI, log progress, etc.
            done = True  # placeholder: as written the loop body runs exactly once


asyncio.run(main())

2. Always Check Success

bp_check_success.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    result = client.Agent(session=session).run(task="Critical task")

    if result.success:
        # Safe to proceed
        process_result(result)
    else:
        # Don't proceed if agent failed
        raise RuntimeError(f"Critical task failed: {result.answer}")

3. Set Appropriate Step Limits

bp_step_limits.py
from notte_sdk import NotteClient

client = NotteClient()

with client.Session() as session:
    # Size the step budget to the task: 10 steps for one expected to need 5-8
    agent = client.Agent(session=session, max_steps=10)

4. Clean Up Resources

bp_cleanup.py
from notte_sdk import NotteClient

client = NotteClient()

# Let the context manager close the session when the block exits
with client.Session() as session:
    result = client.Agent(session=session).run(task="Task")
# Session automatically closed

Next Steps