windmill-labs / write-script-python3

MUST use when writing Python scripts.

0 views
0 installs

Skill Content

---
name: write-script-python3
description: MUST use when writing Python scripts.
---

## CLI Commands

Place scripts in a folder. After writing, tell the user they can run:
- `wmill script generate-metadata` - Generate .script.yaml and .lock files
- `wmill sync push` - Deploy to Windmill

Do NOT run these commands yourself. Instead, inform the user that they should run them.

Use `wmill resource-type list --schema` to discover available resource types.

# Python

## Structure

The script must contain at least one function called `main`:

```python
def main(param1: str, param2: int):
    # Your code here
    return {"result": param1, "count": param2}
```

Do not call the main function. Libraries are installed automatically.

## Resource Types

On Windmill, credentials and configuration are stored in resources and passed as parameters to main.

You need to **redefine** the type of the resources that are needed before the main function as TypedDict:

```python
from typing import TypedDict

class postgresql(TypedDict):
    host: str
    port: int
    user: str
    password: str
    dbname: str

def main(db: postgresql):
    # db contains the database connection details
    pass
```

**Important rules:**

- The resource type name must be **IN LOWERCASE**
- Only include resource types if they are actually needed
- If an import conflicts with a resource type name, **rename the imported object, not the type name**
- Make sure to import TypedDict from typing **if you're using it**

## Imports

Libraries are installed automatically. Do not show installation instructions.

```python
import requests
import pandas as pd
from datetime import datetime
```

If an import name conflicts with a resource type:

```python
# Wrong - don't rename the type
import stripe as stripe_lib
class stripe_type(TypedDict): ...

# Correct - rename the import
import stripe as stripe_sdk
class stripe(TypedDict):
    api_key: str
```

## Windmill Client

Import the windmill client for platform interactions:

```python
import wmill
```

See the SDK documentation for available methods.

## Preprocessor Scripts

For preprocessor scripts, the function should be named `preprocessor` and receives an `event` parameter:

```python
from typing import TypedDict, Literal, Any

class Event(TypedDict):
    kind: Literal["webhook", "http", "websocket", "kafka", "email", "nats", "postgres", "sqs", "mqtt", "gcp"]
    body: Any
    headers: dict[str, str]
    query: dict[str, str]

def preprocessor(event: Event):
    # Transform the event into flow input parameters
    return {
        "param1": event["body"]["field1"],
        "param2": event["query"]["id"]
    }
```

## S3 Object Operations

Windmill provides built-in support for S3-compatible storage operations.

```python
import wmill

# Load file content from S3
content: bytes = wmill.load_s3_file(s3object)

# Load file as stream reader
reader: BufferedReader = wmill.load_s3_file_reader(s3object)

# Write file to S3
result: S3Object = wmill.write_s3_file(
    s3object,           # Target path (or None to auto-generate)
    file_content,       # bytes or BufferedReader
    s3_resource_path,   # Optional: specific S3 resource
    content_type,       # Optional: MIME type
    content_disposition # Optional: Content-Disposition header
)
```


# Python SDK (wmill)

Import: import wmill

def get_mocked_api() -> Optional[dict]

# Get the HTTP client instance.
# 
# Returns:
#     Configured httpx.Client for API requests
def get_client() -> httpx.Client

# Make an HTTP GET request to the Windmill API.
# 
# Args:
#     endpoint: API endpoint path
#     raise_for_status: Whether to raise an exception on HTTP errors
#     **kwargs: Additional arguments passed to httpx.get
# 
# Returns:
#     HTTP response object
def get(endpoint, raise_for_status = True, **kwargs) -> httpx.Response

# Make an HTTP POST request to the Windmill API.
# 
# Args:
#     endpoint: API endpoint path
#     raise_for_status: Whether to raise an exception on HTTP errors
#     **kwargs: Additional arguments passed to httpx.post
# 
# Returns:
#     HTTP response object
def post(endpoint, raise_for_status = True, **kwargs) -> httpx.Response

# Create a new authentication token.
# 
# Args:
#     duration: Token validity duration (default: 1 day)
# 
# Returns:
#     New authentication token string
def create_token(duration = dt.timedelta(days=1)) -> str

# Create a script job and return its job id.
# 
# .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
def run_script_async(path: str = None, hash_: str = None, args: dict = None, scheduled_in_secs: int = None) -> str

# Create a script job by path and return its job id.
def run_script_by_path_async(path: str, args: dict = None, scheduled_in_secs: int = None) -> str

# Create a script job by hash and return its job id.
def run_script_by_hash_async(hash_: str, args: dict = None, scheduled_in_secs: int = None) -> str

# Create a flow job and return its job id.
def run_flow_async(path: str, args: dict = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str

# Run script synchronously and return its result.
# 
# .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
def run_script(path: str = None, hash_: str = None, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any

# Run script by path synchronously and return its result.
def run_script_by_path(path: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any

# Run script by hash synchronously and return its result.
def run_script_by_hash(hash_: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any

# Run a script on the current worker without creating a job
def run_inline_script_preview(content: str, language: str, args: dict = None) -> Any

# Wait for a job to complete and return its result.
# 
# Args:
#     job_id: ID of the job to wait for
#     timeout: Maximum time to wait (seconds or timedelta)
#     verbose: Enable verbose logging
#     cleanup: Register cleanup handler to cancel job on exit
#     assert_result_is_not_none: Raise exception if result is None
# 
# Returns:
#     Job result when completed
# 
# Raises:
#     TimeoutError: If timeout is reached
#     Exception: If job fails
def wait_job(job_id, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False)

# Cancel a specific job by ID.
# 
# Args:
#     job_id: UUID of the job to cancel
#     reason: Optional reason for cancellation
# 
# Returns:
#     Response message from the cancel endpoint
def cancel_job(job_id: str, reason: str = None) -> str

# Cancel currently running executions of the same script.
def cancel_running() -> dict

# Get job details by ID.
# 
# Args:
#     job_id: UUID of the job
# 
# Returns:
#     Job details dictionary
def get_job(job_id: str) -> dict

# Get the root job ID for a flow hierarchy.
# 
# Args:
#     job_id: Job ID (defaults to current WM_JOB_ID)
# 
# Returns:
#     Root job ID
def get_root_job_id(job_id: str | None = None) -> dict

# Get an OIDC JWT token for authentication to external services.
# 
# Args:
#     audience: Token audience (e.g., "vault", "aws")
#     expires_in: Optional expiration time in seconds
# 
# Returns:
#     JWT token string
def get_id_token(audience: str, expires_in: int | None = None) -> str

# Get the status of a job.
# 
# Args:
#     job_id: UUID of the job
# 
# Returns:
#     Job status: "RUNNING", "WAITING", or "COMPLETED"
def get_job_status(job_id: str) -> JobStatus

# Get the result of a completed job.
# 
# Args:
#     job_id: UUID of the completed job
#     assert_result_is_not_none: Raise exception if result is None
# 
# Returns:
#     Job result
def get_result(job_id: str, assert_result_is_not_none: bool = True) -> Any

# Get a variable value by path.
# 
# Args:
#     path: Variable path in Windmill
# 
# Returns:
#     Variable value as string
def get_variable(path: str) -> str

# Set a variable value by path, creating it if it doesn't exist.
# 
# Args:
#     path: Variable path in Windmill
#     value: Variable value to set
#     is_secret: Whether the variable should be secret (default: False)
def set_variable(path: str, value: str, is_secret: bool = False) -> None

# Get a resource value by path.
# 
# Args:
#     path: Resource path in Windmill
#     none_if_undefined: Return None instead of raising if not found
#     interpolated: if variables and resources are fully unrolled
# 
# Returns:
#     Resource value dictionary or None
def get_resource(path: str, none_if_undefined: bool = False, interpolated: bool = True) -> dict | None

# Set a resource value by path, creating it if it doesn't exist.
# 
# Args:
#     value: Resource value to set
#     path: Resource path in Windmill
#     resource_type: Resource type for creation
def set_resource(value: Any, path: str, resource_type: str)

# List resources from Windmill workspace.
# 
# Args:
#     resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
#     page: Optional page number for pagination
#     per_page: Optional number of results per page
#     
# Returns:
#     List of resource dictionaries
def list_resources(resource_type: str = None, page: int = None, per_page: int = None) -> list[dict]

# Set the workflow state.
# 
# Args:
#     value: State value to set
#     path: Optional state resource path override.
def set_state(value: Any, path: str | None = None) -> None

# Get the workflow state.
# 
# Args:
#     path: Optional state resource path override.
# 
# Returns:
#     State value or None if not set
def get_state(path: str | None = None) -> Any

# Set job progress percentage (0-99).
# 
# Args:
#     value: Progress percentage
#     job_id: Job ID (defaults to current WM_JOB_ID)
def set_progress(value: int, job_id: Optional[str] = None)

# Get job progress percentage.
# 
# Args:
#     job_id: Job ID (defaults to current WM_JOB_ID)
# 
# Returns:
#     Progress value (0-100) or None if not set
def get_progress(job_id: Optional[str] = None) -> Any

# Set the user state of a flow at a given key
def set_flow_user_state(key: str, value: Any) -> None

# Get the user state of a flow at a given key
def get_flow_user_state(key: str) -> Any

# Get the Windmill server version.
# 
# Returns:
#     Version string
def version()

# Convenient helpers that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from DuckDB
def get_duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings | None

# Convenient helpers that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from Polars
def get_polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings

# Convenient helpers that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection using boto3
def get_boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings

# Load a file from the workspace s3 bucket and returns its content as bytes.
# 
# '''python
# from wmill import S3Object
# 
# s3_obj = S3Object(s3="/path/to/my_file.txt")
# my_obj_content = client.load_s3_file(s3_obj)
# file_content = my_obj_content.decode("utf-8")
# '''
def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None) -> bytes

# Load a file from the workspace s3 bucket and returns the bytes stream.
# 
# '''python
# from wmill import S3Object
# 
# s3_obj = S3Object(s3="/path/to/my_file.txt")
# with wmill.load_s3_file_reader(s3object, s3_resource_path) as file_reader:
#     print(file_reader.read())
# '''
def load_s3_file_reader(s3object: S3Object | str, s3_resource_path: str | None) -> BufferedReader

# Write a file to the workspace S3 bucket
# 
# '''python
# from wmill import S3Object
# 
# s3_obj = S3Object(s3="/path/to/my_file.txt")
# 
# # for an in memory bytes array:
# file_content = b'Hello Windmill!'
# client.write_s3_file(s3_obj, file_content)
# 
# # for a file:
# with open("my_file.txt", "rb") as my_file:
#     client.write_s3_file(s3_obj, my_file)
# '''
def write_s3_file(s3object: S3Object | str | None, file_content: BufferedReader | bytes, s3_resource_path: str | None, content_type: str | None = None, content_disposition: str | None = None) -> S3Object

# Permanently delete a file from the workspace S3 bucket.
# 
# '''python
# from wmill import S3Object
# 
# s3_obj = S3Object(s3="/path/to/my_file.txt")
# client.delete_s3_object(s3_obj)
# '''
def delete_s3_object(s3object: S3Object | str, s3_resource_path: str | None = None) -> None

# Sign S3 objects for use by anonymous users in public apps.
# 
# Args:
#     s3_objects: List of S3 objects to sign
# 
# Returns:
#     List of signed S3 objects
def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]

# Sign a single S3 object for use by anonymous users in public apps.
# 
# Args:
#     s3_object: S3 object to sign
# 
# Returns:
#     Signed S3 object
def sign_s3_object(s3_object: S3Object | str) -> S3Object

# Generate presigned public URLs for an array of S3 objects.
# If an S3 object is not signed yet, it will be signed first.
# 
# Args:
#     s3_objects: List of S3 objects to sign
#     base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)
# 
# Returns:
#     List of signed public URLs
# 
# Example:
#     >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
#     >>> urls = client.get_presigned_s3_public_urls(s3_objs)
def get_presigned_s3_public_urls(s3_objects: list[S3Object | str], base_url: str | None = None) -> list[str]

# Generate a presigned public URL for an S3 object.
# If the S3 object is not signed yet, it will be signed first.
# 
# Args:
#     s3_object: S3 object to sign
#     base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)
# 
# Returns:
#     Signed public URL
# 
# Example:
#     >>> s3_obj = S3Object(s3="/path/to/file.txt")
#     >>> url = client.get_presigned_s3_public_url(s3_obj)
def get_presigned_s3_public_url(s3_object: S3Object | str, base_url: str | None = None) -> str

# Get the current user information.
# 
# Returns:
#     User details dictionary
def whoami() -> dict

# Get the current user information (alias for whoami).
# 
# Returns:
#     User details dictionary
def user() -> dict

# Get the state resource path from environment.
# 
# Returns:
#     State path string
def state_path() -> str

# Get the workflow state.
# 
# Returns:
#     State value or None if not set
def state() -> Any

# Set the state in the shared folder using pickle
def set_shared_state_pickle(value: Any, path: str = 'state.pickle') -> None

# Get the state in the shared folder using pickle
def get_shared_state_pickle(path: str = 'state.pickle') -> Any

# Set the state in the shared folder using pickle
def set_shared_state(value: Any, path: str = 'state.json') -> None

# Get the state in the shared folder using pickle
def get_shared_state(path: str = 'state.json') -> None

# Get URLs needed for resuming a flow after suspension.
# 
# Args:
#     approver: Optional approver name
#     flow_level: If True, generate resume URLs for the parent flow instead of the
#         specific step. This allows pre-approvals that can be consumed by any later
#         suspend step in the same flow.
# 
# Returns:
#     Dictionary with approvalPage, resume, and cancel URLs
def get_resume_urls(approver: str = None, flow_level: bool = None) -> dict

# Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.
# 
# **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
# Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form
# 
# :param slack_resource_path: The path to the Slack resource in Windmill.
# :type slack_resource_path: str
# :param channel_id: The Slack channel ID where the approval request will be sent.
# :type channel_id: str
# :param message: Optional custom message to include in the Slack approval request.
# :type message: str, optional
# :param approver: Optional user ID or name of the approver for the request.
# :type approver: str, optional
# :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
# :type default_args_json: dict, optional
# :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
# :type dynamic_enums_json: dict, optional
# 
# :raises Exception: If the function is not called within a flow or flow preview.
# :raises Exception: If the required flow job or flow step environment variables are not set.
# 
# :return: None
# 
# **Usage Example:**
#     >>> client.request_interactive_slack_approval(
#     ...     slack_resource_path="/u/alex/my_slack_resource",
#     ...     channel_id="admins-slack-channel",
#     ...     message="Please approve this request",
#     ...     approver="approver123",
#     ...     default_args_json={"key1": "value1", "key2": 42},
#     ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
#     ... )
# 
# **Notes:**
# - This function must be executed within a Windmill flow or flow preview.
# - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
def request_interactive_slack_approval(slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None

# Get email from workspace username
# This method is particularly useful for apps that require the email address of the viewer.
# Indeed, in the viewer context WM_USERNAME is set to the username of the viewer but WM_EMAIL is set to the email of the creator of the app.
def username_to_email(username: str) -> str

# Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
def send_teams_message(conversation_id: str, text: str, success: bool = True, card_block: dict = None)

# Get a DataTable client for SQL queries.
# 
# Args:
#     name: Database name (default: "main")
# 
# Returns:
#     DataTableClient instance
def datatable(name: str = 'main')

# Get a DuckLake client for DuckDB queries.
# 
# Args:
#     name: Database name (default: "main")
# 
# Returns:
#     DucklakeClient instance
def ducklake(name: str = 'main')

def init_global_client(f)

def deprecate(in_favor_of: str)

# Get the current workspace ID.
# 
# Returns:
#     Workspace ID string
def get_workspace() -> str

def get_version() -> str

# Run a script synchronously by hash and return its result.
# 
# Args:
#     hash: Script hash
#     args: Script arguments
#     verbose: Enable verbose logging
#     assert_result_is_not_none: Raise exception if result is None
#     cleanup: Register cleanup handler to cancel job on exit
#     timeout: Maximum time to wait
# 
# Returns:
#     Script result
def run_script_sync(hash: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any

# Run a script synchronously by path and return its result.
# 
# Args:
#     path: Script path
#     args: Script arguments
#     verbose: Enable verbose logging
#     assert_result_is_not_none: Raise exception if result is None
#     cleanup: Register cleanup handler to cancel job on exit
#     timeout: Maximum time to wait
# 
# Returns:
#     Script result
def run_script_by_path_sync(path: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any

# Convenient helpers that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from DuckDB
def duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings

# Convenient helpers that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from Polars
def polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings

# Convenient helpers that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection using boto3
def boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings

# Get the state resource path from environment.
# 
# Returns:
#     State path string
def get_state_path() -> str

# Parse resource syntax from string.
def parse_resource_syntax(s: str) -> Optional[str]

# Parse S3 object from string or S3Object format.
def parse_s3_object(s3_object: S3Object | str) -> S3Object

# Parse variable syntax from string.
def parse_variable_syntax(s: str) -> Optional[str]

# Append a text to the result stream.
# 
# Args:
#     text: text to append to the result stream
def append_to_result_stream(text: str) -> None

# Stream to the result stream.
# 
# Args:
#     stream: stream to stream to the result stream
def stream_result(stream) -> None

# Execute a SQL query against the DataTable.
# 
# Args:
#     sql: SQL query string with $1, $2, etc. placeholders
#     *args: Positional arguments to bind to query placeholders
# 
# Returns:
#     SqlQuery instance for fetching results
def query(sql: str, *args) -> SqlQuery

# Execute query and fetch results.
# 
# Args:
#     result_collection: Optional result collection mode
# 
# Returns:
#     Query results
def fetch(result_collection: str | None = None)

# Execute query and fetch first row of results.
# 
# Returns:
#     First row of query results
def fetch_one()

# Execute query and fetch first row of results. Return result as a scalar value.
# 
# Returns:
#     First row of query result as a scalar value
def fetch_one_scalar()

# Execute query and don't return any results.
#         
def execute()

# DuckDB executor requires explicit argument types at declaration
# These types exist in both DuckDB and Postgres
# Check that the types exist if you plan to extend this function for other SQL engines.
def infer_sql_type(value) -> str

def parse_sql_client_name(name: str) -> tuple[str, Optional[str]]

# Decorator that marks a function as a workflow task.
# 
# Works in both WAC v1 (sync, HTTP-based dispatch) and WAC v2
# (async, checkpoint/replay) modes:
# 
# - **v2 (inside @workflow)**: dispatches as a checkpoint step.
# - **v1 (WM_JOB_ID set, no @workflow)**: dispatches via HTTP API.
# - **Standalone**: executes the function body directly.
# 
# Usage::
# 
#     @task
#     async def extract_data(url: str): ...
# 
#     @task(path="f/external_script", timeout=600, tag="gpu")
#     async def run_external(x: int): ...
def task(_func = None, path: Optional[str] = None, tag: Optional[str] = None, timeout: Optional[int] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)

# Create a task that dispatches to a separate Windmill script.
# 
# Usage::
# 
#     extract = task_script("f/data/extract", timeout=600)
# 
#     @workflow
#     async def main():
#         data = await extract(url="https://...")
def task_script(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)

# Create a task that dispatches to a separate Windmill flow.
# 
# Usage::
# 
#     pipeline = task_flow("f/etl/pipeline", priority=10)
# 
#     @workflow
#     async def main():
#         result = await pipeline(input=data)
def task_flow(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)

# Decorator marking an async function as a workflow-as-code entry point.
# 
# The function must be **deterministic**: given the same inputs it must call
# tasks in the same order on every replay. Branching on task results is fine
# (results are replayed from checkpoint), but branching on external state
# (current time, random values, external API calls) must use ``step()`` to
# checkpoint the value so replays see the same result.
def workflow(func)

# Execute ``fn`` inline and checkpoint the result.
# 
# On replay the cached value is returned without re-executing ``fn``.
# Use for lightweight deterministic operations (timestamps, random IDs,
# config reads) that should not incur the overhead of a child job.
async def step(name: str, fn)

# Server-side sleep — suspend the workflow for the given duration without holding a worker.
# 
# Inside a @workflow, the parent job suspends and auto-resumes after ``seconds``.
# Outside a workflow, falls back to ``asyncio.sleep``.
async def sleep(seconds: int)

# Suspend the workflow and wait for an external approval.
# 
# Use ``get_resume_urls()`` (wrapped in ``step()``) to obtain
# resume/cancel/approval URLs before calling this function.
# 
# Returns a dict with ``value`` (form data), ``approver``, and ``approved``.
# 
# Args:
#     timeout: Approval timeout in seconds (default 1800).
#     form: Optional form schema for the approval page.
#     self_approval: Whether the user who triggered the flow can approve it (default True).
# 
# Example::
# 
#     urls = await step("urls", lambda: get_resume_urls())
#     await step("notify", lambda: send_email(urls["approvalPage"]))
#     result = await wait_for_approval(timeout=3600)
async def wait_for_approval(timeout: int = 1800, form: dict | None = None, self_approval: bool = True) -> dict

# Process items in parallel with optional concurrency control.
# 
# Each item is processed by calling ``fn(item)``, which should be a @task.
# Items are dispatched in batches of ``concurrency`` (default: all at once).
# 
# Example::
# 
#     @task
#     async def process(item: str):
#         ...
# 
#     results = await parallel(items, process, concurrency=5)
async def parallel(items, fn, concurrency: Optional[int] = None)

# Commit Kafka offsets for a trigger with auto_commit disabled.
# 
# Args:
#     trigger_path: Path to the Kafka trigger (from event['wm_trigger']['trigger_path'])
#     topic: Kafka topic name (from event['topic'])
#     partition: Partition number (from event['partition'])
#     offset: Message offset to commit (from event['offset'])
def commit_kafka_offsets(trigger_path: str, topic: str, partition: int, offset: int) -> None