from datetime import datetime, timezone

from notte_sdk import NotteClient


def run(source_url: str):
    """Collect data every hour.

    Opens a Notte session, navigates to ``source_url`` and scrapes the page.

    Args:
        source_url: URL of the page to scrape.

    Returns:
        A dict with the scraped ``data`` and an ISO-8601 ``timestamp``.
        The timestamp is timezone-aware (UTC) so it stays unambiguous no
        matter which host or timezone the schedule executes in.
    """
    client = NotteClient()
    with client.Session() as session:
        session.execute(type="goto", url=source_url)
        data = session.scrape()
    # Schedules are expressed in UTC, so stamp the result in UTC as well
    # instead of the host's naive local time.
    return {"data": data, "timestamp": datetime.now(timezone.utc).isoformat()}
import logging
import os

import requests


def run():
    """Run the automation and report failures to a webhook.

    Returns:
        Whatever ``perform_automation()`` returns.

    Raises:
        Exception: Any error raised by the automation is re-raised after the
            notification attempt so the run is still marked as failed.
    """
    try:
        # Your automation
        return perform_automation()
    except Exception as e:
        # Send to Slack/Discord/Email
        webhook_url = os.getenv("WEBHOOK_URL", "")
        if webhook_url:
            try:
                requests.post(
                    webhook_url,
                    json={"text": f"Scheduled function failed: {e}", "function_id": "function_abc123"},
                    # Without a timeout a stalled webhook would hang the run.
                    timeout=10,
                )
            except requests.RequestException:
                # Notification is best-effort: never let a webhook problem
                # replace the original automation error.
                logging.getLogger(__name__).warning(
                    "Failed to send failure notification", exc_info=True
                )
        raise  # Re-raise to mark run as failed
from notte_sdk import NotteClient


def run():
    """Open a session whose idle timeout is sized for the schedule."""
    notte = NotteClient()
    # Set timeout appropriate for schedule
    session_options = {"idle_timeout_minutes": 10}
    with notte.Session(**session_options) as session:
        # Your automation
        pass
from notte_sdk import NotteClient


def run():
    """Check if all schedules are healthy.

    Returns:
        A list with one dict per listed function, holding its
        ``workflow_id`` and ``recent_failures`` (the number of returned runs
        whose status is ``"failed"``).
    """
    client = NotteClient()
    report = []
    for fn in client.functions.list().items:
        fn_id = fn.workflow_id
        # NOTE(review): no time window is applied here; "recent" means
        # whatever list_runs returns by default - confirm if a 24-hour
        # filter is intended.
        run_page = client.functions.list_runs(fn_id)
        failed_count = sum(1 for r in run_page.items if r.status == "failed")
        report.append({"workflow_id": fn_id, "recent_failures": failed_count})
    return report
def already_processed(date: str) -> bool:
    """Check if date has already been processed.

    Placeholder: always reports that the date has not been processed yet.
    """
    # TODO: Implement logic to check if date has already been processed
    return False


def process_data(date: str) -> str:
    """Process data for the given date.

    Placeholder: returns a fixed marker instead of real output.
    """
    # TODO: Implement logic to process data for the given date
    return "processed"


def mark_processed(date: str) -> None:
    """Mark date as processed.

    Placeholder: currently records nothing.
    """
    # TODO: Implement logic to mark date as processed
    return None


def run(date: str) -> dict:
    """Idempotent function - safe to run multiple times.

    Uses ``date`` as the unique identifier of a unit of work.

    Args:
        date: Identifier of the batch to process.

    Returns:
        ``{"status": "already_processed", "date": date}`` when the date was
        handled before; otherwise ``{"status": "processed", "date": date,
        "result": <output of process_data>}``.
    """
    # Skip if already processed
    if already_processed(date):
        return {"status": "already_processed", "date": date}

    # Process first, then mark, so a failed run is retried next time.
    result = process_data(date)
    mark_processed(date)
    return {"status": "processed", "date": date, "result": result}
Schedules run in UTC by default. Convert to your timezone:
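# Cron format: minute hour day-of-month month day-of-week
# If you want 9 AM EST (UTC-5), use:
# 0 14 * * * (9 AM + 5 hours = 14:00 UTC)
# If you want 9 AM PST (UTC-8), use:
# 0 17 * * * (9 AM + 8 hours = 17:00 UTC)
# Note: during daylight saving time the offsets are one hour smaller (EDT is UTC-4, PDT is UTC-7).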
Or handle the timezone inside the function:
timezone.py
from datetime import datetime

import pytz


def run():
    """Resolve the current time in New York before running the automation."""
    # Get current time in specific timezone
    new_york = pytz.timezone("America/New_York")
    now_local = datetime.now(tz=new_york)

    # Your automation
    pass