windmill-labs / write-script-python3
Install for your project team
Run this command in your project directory to install the skill for your entire team:
mkdir -p .claude/skills/write-script-python3 && curl -L -o skill.zip "https://fastmcp.me/Skills/Download/3757" && unzip -o skill.zip -d .claude/skills/write-script-python3 && rm skill.zip
Project Skills
This skill will be saved in .claude/skills/write-script-python3/ and checked into git. All team members will have access to it automatically.
Important: Please verify the skill by reviewing its instructions before using it.
MUST use when writing Python scripts.
Skill Content
---
name: write-script-python3
description: MUST use when writing Python scripts.
---
## CLI Commands
Place scripts in a folder. After writing, tell the user they can run:
- `wmill script generate-metadata` - Generate .script.yaml and .lock files
- `wmill sync push` - Deploy to Windmill
Do NOT run these commands yourself. Instead, inform the user that they should run them.
Use `wmill resource-type list --schema` to discover available resource types.
# Python
## Structure
The script must contain at least one function called `main`:
```python
def main(param1: str, param2: int):
    # Your code here
    return {"result": param1, "count": param2}
```
Do not call the main function. Libraries are installed automatically.
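As a slightly fuller illustration of the structure above, here is a minimal sketch of a `main` with type hints and default values. The parameter names (`name`, `repeat`, `suffix`) are hypothetical, and the assumption that Windmill derives the script's inputs from this signature is not stated in this document.

```python
from typing import Optional


def main(name: str, repeat: int = 1, suffix: Optional[str] = None):
    # Build the greeting once, then repeat it `repeat` times.
    greeting = f"Hello, {name}" + (suffix or "")
    # Return a JSON-serializable value; the script never calls main itself.
    return {"lines": [greeting] * repeat, "count": repeat}
```

Note that the file ends after the function definition: there is no `if __name__ == "__main__"` block and no call to `main`.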
## Resource Types
On Windmill, credentials and configuration are stored in resources and passed as parameters to main.
Before the main function, **redefine** each resource type the script needs as a TypedDict:
```python
from typing import TypedDict


class postgresql(TypedDict):
    host: str
    port: int
    user: str
    password: str
    dbname: str


def main(db: postgresql):
    # db contains the database connection details
    pass
```
**Important rules:**
- The resource type name must be **IN LOWERCASE**
- Only include resource types if they are actually needed
- If an import conflicts with a resource type name, **rename the imported object, not the type name**
- Make sure to import TypedDict from typing **if you're using it**
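The rules above can be sketched with a script that actually reads the resource fields. This is an illustrative example only: the `table` parameter and the returned keys are hypothetical, and a real script would pass the assembled URL to a database driver instead of just measuring it.

```python
from typing import TypedDict
from urllib.parse import quote


class postgresql(TypedDict):
    host: str
    port: int
    user: str
    password: str
    dbname: str


def main(db: postgresql, table: str = "events"):
    # Percent-encode the password so special characters cannot break the URL.
    password = quote(db["password"], safe="")
    dsn = f"postgresql://{db['user']}:{password}@{db['host']}:{db['port']}/{db['dbname']}"
    # A real script would hand `dsn` to a driver here; the secret is never returned.
    return {
        "target": f"{db['host']}:{db['port']}/{db['dbname']}",
        "table": table,
        "dsn_length": len(dsn),
    }
```

The class name stays lowercase (`postgresql`) so that it matches the resource type name.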
## Imports
Libraries are installed automatically. Do not show installation instructions.
```python
import requests
import pandas as pd
from datetime import datetime
```
If an import name conflicts with a resource type:
```python
from typing import TypedDict

# Wrong - don't rename the type
import stripe as stripe_lib
class stripe_type(TypedDict): ...

# Correct - rename the import
import stripe as stripe_sdk
class stripe(TypedDict):
    api_key: str
```
## Windmill Client
Import the windmill client for platform interactions:
```python
import wmill
```
See the SDK documentation for available methods.
## Preprocessor Scripts
For preprocessor scripts, the function should be named `preprocessor` and receives an `event` parameter:
```python
from typing import TypedDict, Literal, Any
class Event(TypedDict):
    kind: Literal["webhook", "http", "websocket", "kafka", "email", "nats", "postgres", "sqs", "mqtt", "gcp"]
    body: Any
    headers: dict[str, str]
    query: dict[str, str]


def preprocessor(event: Event):
    # Transform the event into flow input parameters
    return {
        "param1": event["body"]["field1"],
        "param2": event["query"]["id"]
    }
```
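A preprocessor may receive events from several trigger kinds, so a more defensive variant can be useful. The sketch below assumes, without confirmation from this document, that webhook and HTTP events carry a JSON object in `body` and that other kinds may omit `query`. The pass-through behaviour for other kinds is a hypothetical design choice.

```python
from typing import Any, TypedDict


class Event(TypedDict, total=False):
    kind: str
    body: Any
    headers: dict[str, str]
    query: dict[str, str]


def preprocessor(event: Event):
    # Missing keys are treated as empty so the function never raises KeyError.
    body = event.get("body") or {}
    query = event.get("query") or {}
    if event.get("kind") in ("webhook", "http") and isinstance(body, dict):
        return {"param1": body.get("field1"), "param2": query.get("id")}
    # Other trigger kinds: pass the raw body through under a single parameter.
    return {"param1": body, "param2": None}
```

The returned keys must match the parameter names of the script or flow that consumes them.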
## S3 Object Operations
Windmill provides built-in support for S3-compatible storage operations.
```python
from io import BufferedReader

import wmill
from wmill import S3Object

# Load file content from S3
content: bytes = wmill.load_s3_file(s3object)

# Load file as stream reader
reader: BufferedReader = wmill.load_s3_file_reader(s3object)

# Write file to S3
result: S3Object = wmill.write_s3_file(
    s3object,             # Target path (or None to auto-generate)
    file_content,         # bytes or BufferedReader
    s3_resource_path,     # Optional: specific S3 resource
    content_type,         # Optional: MIME type
    content_disposition   # Optional: Content-Disposition header
)
```
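To show how these calls might fit into a complete script, here is a minimal sketch that serializes rows to CSV and writes them with `wmill.write_s3_file`. The helper `rows_to_csv` is hypothetical, and passing `None` for the S3 resource path to mean "workspace default storage" is an assumption. The guarded import only exists so the pure helper can be exercised outside a Windmill worker.

```python
import csv
import io

try:
    import wmill  # provided on Windmill workers
except ImportError:  # lets the pure helper run locally without the SDK
    wmill = None


def rows_to_csv(rows: list[dict]) -> bytes:
    # Serialize a list of uniform dicts to UTF-8 CSV bytes.
    if not rows:
        return b""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=list(rows[0].keys()), lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue().encode("utf-8")


def main(rows: list[dict]):
    # Argument order follows the call shown above:
    # target (None to auto-generate), content, S3 resource path, content type.
    return wmill.write_s3_file(None, rows_to_csv(rows), None, "text/csv")
```

Keeping the serialization in its own function means it can be checked without touching object storage.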
# Python SDK (wmill)
Import: `import wmill`
def get_mocked_api() -> Optional[dict]
# Get the HTTP client instance.
#
# Returns:
# Configured httpx.Client for API requests
def get_client() -> httpx.Client
# Make an HTTP GET request to the Windmill API.
#
# Args:
# endpoint: API endpoint path
# raise_for_status: Whether to raise an exception on HTTP errors
# **kwargs: Additional arguments passed to httpx.get
#
# Returns:
# HTTP response object
def get(endpoint, raise_for_status = True, **kwargs) -> httpx.Response
# Make an HTTP POST request to the Windmill API.
#
# Args:
# endpoint: API endpoint path
# raise_for_status: Whether to raise an exception on HTTP errors
# **kwargs: Additional arguments passed to httpx.post
#
# Returns:
# HTTP response object
def post(endpoint, raise_for_status = True, **kwargs) -> httpx.Response
# Create a new authentication token.
#
# Args:
# duration: Token validity duration (default: 1 day)
#
# Returns:
# New authentication token string
def create_token(duration = dt.timedelta(days=1)) -> str
# Create a script job and return its job id.
#
# .. deprecated:: Use run_script_by_path_async or run_script_by_hash_async instead.
def run_script_async(path: str = None, hash_: str = None, args: dict = None, scheduled_in_secs: int = None) -> str
# Create a script job by path and return its job id.
def run_script_by_path_async(path: str, args: dict = None, scheduled_in_secs: int = None) -> str
# Create a script job by hash and return its job id.
def run_script_by_hash_async(hash_: str, args: dict = None, scheduled_in_secs: int = None) -> str
# Create a flow job and return its job id.
def run_flow_async(path: str, args: dict = None, scheduled_in_secs: int = None, do_not_track_in_parent: bool = True) -> str
# Run script synchronously and return its result.
#
# .. deprecated:: Use run_script_by_path or run_script_by_hash instead.
def run_script(path: str = None, hash_: str = None, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any
# Run script by path synchronously and return its result.
def run_script_by_path(path: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any
# Run script by hash synchronously and return its result.
def run_script_by_hash(hash_: str, args: dict = None, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False) -> Any
# Run a script on the current worker without creating a job
def run_inline_script_preview(content: str, language: str, args: dict = None) -> Any
# Wait for a job to complete and return its result.
#
# Args:
# job_id: ID of the job to wait for
# timeout: Maximum time to wait (seconds or timedelta)
# verbose: Enable verbose logging
# cleanup: Register cleanup handler to cancel job on exit
# assert_result_is_not_none: Raise exception if result is None
#
# Returns:
# Job result when completed
#
# Raises:
# TimeoutError: If timeout is reached
# Exception: If job fails
def wait_job(job_id, timeout: dt.timedelta | int | float | None = None, verbose: bool = False, cleanup: bool = True, assert_result_is_not_none: bool = False)
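The asynchronous run functions and `wait_job` can be combined as in the sketch below. It takes the SDK as a parameter (`sdk`, a hypothetical name; pass the `wmill` module in a real script) so the logic can be tried against a stand-in object. The helper name `run_and_wait` is likewise an assumption for illustration.

```python
def run_and_wait(sdk, path: str, args: dict, timeout_s: int = 60):
    # Queue the child script and keep its job id for later inspection.
    job_id = sdk.run_script_by_path_async(path=path, args=args)
    # Block until the job completes; per the notes above this raises on timeout or failure.
    result = sdk.wait_job(job_id, timeout=timeout_s)
    return {"job_id": job_id, "result": result}
```

Inside a Windmill script this would be called as `run_and_wait(wmill, "f/examples/child", {"x": 1})`, where the script path is a placeholder.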
# Cancel a specific job by ID.
#
# Args:
# job_id: UUID of the job to cancel
# reason: Optional reason for cancellation
#
# Returns:
# Response message from the cancel endpoint
def cancel_job(job_id: str, reason: str = None) -> str
# Cancel currently running executions of the same script.
def cancel_running() -> dict
# Get job details by ID.
#
# Args:
# job_id: UUID of the job
#
# Returns:
# Job details dictionary
def get_job(job_id: str) -> dict
# Get the root job ID for a flow hierarchy.
#
# Args:
# job_id: Job ID (defaults to current WM_JOB_ID)
#
# Returns:
# Root job ID
def get_root_job_id(job_id: str | None = None) -> dict
# Get an OIDC JWT token for authentication to external services.
#
# Args:
# audience: Token audience (e.g., "vault", "aws")
# expires_in: Optional expiration time in seconds
#
# Returns:
# JWT token string
def get_id_token(audience: str, expires_in: int | None = None) -> str
# Get the status of a job.
#
# Args:
# job_id: UUID of the job
#
# Returns:
# Job status: "RUNNING", "WAITING", or "COMPLETED"
def get_job_status(job_id: str) -> JobStatus
# Get the result of a completed job.
#
# Args:
# job_id: UUID of the completed job
# assert_result_is_not_none: Raise exception if result is None
#
# Returns:
# Job result
def get_result(job_id: str, assert_result_is_not_none: bool = True) -> Any
# Get a variable value by path.
#
# Args:
# path: Variable path in Windmill
#
# Returns:
# Variable value as string
def get_variable(path: str) -> str
# Set a variable value by path, creating it if it doesn't exist.
#
# Args:
# path: Variable path in Windmill
# value: Variable value to set
# is_secret: Whether the variable should be secret (default: False)
def set_variable(path: str, value: str, is_secret: bool = False) -> None
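Because variable values are strings, numeric data has to be converted on the way in and out. The following sketch shows one way to do that. `bump_counter` and the `sdk` parameter are hypothetical names (pass the `wmill` module in a real script), and the example assumes the variable already exists.

```python
def bump_counter(sdk, path: str) -> int:
    # get_variable returns a string; treat an empty value as zero.
    current = int(sdk.get_variable(path) or "0")
    updated = current + 1
    # set_variable also expects a string value.
    sdk.set_variable(path, str(updated))
    return updated
```

This read-then-write sequence is not atomic, so concurrent jobs updating the same variable could overwrite each other.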
# Get a resource value by path.
#
# Args:
# path: Resource path in Windmill
# none_if_undefined: Return None instead of raising if not found
# interpolated: Whether nested variable and resource references are fully resolved
#
# Returns:
# Resource value dictionary or None
def get_resource(path: str, none_if_undefined: bool = False, interpolated: bool = True) -> dict | None
# Set a resource value by path, creating it if it doesn't exist.
#
# Args:
# value: Resource value to set
# path: Resource path in Windmill
# resource_type: Resource type for creation
def set_resource(value: Any, path: str, resource_type: str)
# List resources from Windmill workspace.
#
# Args:
# resource_type: Optional resource type to filter by (e.g., "postgresql", "mysql", "s3")
# page: Optional page number for pagination
# per_page: Optional number of results per page
#
# Returns:
# List of resource dictionaries
def list_resources(resource_type: str = None, page: int = None, per_page: int = None) -> list[dict]
# Set the workflow state.
#
# Args:
# value: State value to set
# path: Optional state resource path override.
def set_state(value: Any, path: str | None = None) -> None
# Get the workflow state.
#
# Args:
# path: Optional state resource path override.
#
# Returns:
# State value or None if not set
def get_state(path: str | None = None) -> Any
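A common use of state is remembering a cursor between runs. The sketch below assumes items carry a numeric `id` field (hypothetical) and takes the SDK as a parameter named `sdk`; in a real script that would be the `wmill` module.

```python
def unseen_items(sdk, items: list[dict]) -> list[dict]:
    # get_state returns None when nothing has been stored yet, so start from 0.
    last_seen = sdk.get_state() or 0
    fresh = [item for item in items if item["id"] > last_seen]
    if fresh:
        # Persist the highest id so the next run skips everything up to it.
        sdk.set_state(max(item["id"] for item in fresh))
    return fresh
```

On the first run every item is returned; on later runs only items with a larger `id` than the stored cursor come back.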
# Set job progress percentage (0-99).
#
# Args:
# value: Progress percentage
# job_id: Job ID (defaults to current WM_JOB_ID)
def set_progress(value: int, job_id: Optional[str] = None)
# Get job progress percentage.
#
# Args:
# job_id: Job ID (defaults to current WM_JOB_ID)
#
# Returns:
# Progress value (0-100) or None if not set
def get_progress(job_id: Optional[str] = None) -> Any
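Progress reporting fits naturally into a processing loop. This is a minimal sketch: `process_all`, `handle`, and the `sdk` parameter are hypothetical names, and the cap at 99 simply follows the 0-99 range documented for `set_progress`.

```python
def process_all(sdk, items: list, handle) -> int:
    total = len(items)
    for index, item in enumerate(items, start=1):
        handle(item)
        # Integer percentage of completed items, capped at the documented maximum of 99.
        sdk.set_progress(min(99, index * 100 // total))
    return total
```

With an empty list the loop body never runs, so no progress is reported and no division takes place.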
# Set the user state of a flow at a given key
def set_flow_user_state(key: str, value: Any) -> None
# Get the user state of a flow at a given key
def get_flow_user_state(key: str) -> Any
# Get the Windmill server version.
#
# Returns:
# Version string
def version()
# Convenience helper that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from DuckDB
def get_duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings | None
# Convenience helper that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from Polars
def get_polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings
# Convenience helper that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection using boto3
def get_boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings
# Load a file from the workspace S3 bucket and return its content as bytes.
#
# '''python
# from wmill import S3Object
#
# s3_obj = S3Object(s3="/path/to/my_file.txt")
# my_obj_content = wmill.load_s3_file(s3_obj)
# file_content = my_obj_content.decode("utf-8")
# '''
def load_s3_file(s3object: S3Object | str, s3_resource_path: str | None) -> bytes
# Load a file from the workspace S3 bucket and return it as a byte stream.
#
# '''python
# from wmill import S3Object
#
# s3_obj = S3Object(s3="/path/to/my_file.txt")
# with wmill.load_s3_file_reader(s3_obj, s3_resource_path) as file_reader:
#     print(file_reader.read())
# '''
def load_s3_file_reader(s3object: S3Object | str, s3_resource_path: str | None) -> BufferedReader
# Write a file to the workspace S3 bucket.
#
# '''python
# from wmill import S3Object
#
# s3_obj = S3Object(s3="/path/to/my_file.txt")
#
# # for an in-memory bytes array:
# file_content = b'Hello Windmill!'
# wmill.write_s3_file(s3_obj, file_content)
#
# # for a file:
# with open("my_file.txt", "rb") as my_file:
#     wmill.write_s3_file(s3_obj, my_file)
# '''
def write_s3_file(s3object: S3Object | str | None, file_content: BufferedReader | bytes, s3_resource_path: str | None, content_type: str | None = None, content_disposition: str | None = None) -> S3Object
# Permanently delete a file from the workspace S3 bucket.
#
# '''python
# from wmill import S3Object
#
# s3_obj = S3Object(s3="/path/to/my_file.txt")
# wmill.delete_s3_object(s3_obj)
# '''
def delete_s3_object(s3object: S3Object | str, s3_resource_path: str | None = None) -> None
# Sign S3 objects for use by anonymous users in public apps.
#
# Args:
# s3_objects: List of S3 objects to sign
#
# Returns:
# List of signed S3 objects
def sign_s3_objects(s3_objects: list[S3Object | str]) -> list[S3Object]
# Sign a single S3 object for use by anonymous users in public apps.
#
# Args:
# s3_object: S3 object to sign
#
# Returns:
# Signed S3 object
def sign_s3_object(s3_object: S3Object | str) -> S3Object
# Generate presigned public URLs for an array of S3 objects.
# If an S3 object is not signed yet, it will be signed first.
#
# Args:
# s3_objects: List of S3 objects to sign
# base_url: Optional base URL for the presigned URLs (defaults to WM_BASE_URL)
#
# Returns:
# List of signed public URLs
#
# Example:
# >>> s3_objs = [S3Object(s3="/path/to/file1.txt"), S3Object(s3="/path/to/file2.txt")]
# >>> urls = wmill.get_presigned_s3_public_urls(s3_objs)
def get_presigned_s3_public_urls(s3_objects: list[S3Object | str], base_url: str | None = None) -> list[str]
# Generate a presigned public URL for an S3 object.
# If the S3 object is not signed yet, it will be signed first.
#
# Args:
# s3_object: S3 object to sign
# base_url: Optional base URL for the presigned URL (defaults to WM_BASE_URL)
#
# Returns:
# Signed public URL
#
# Example:
# >>> s3_obj = S3Object(s3="/path/to/file.txt")
# >>> url = wmill.get_presigned_s3_public_url(s3_obj)
def get_presigned_s3_public_url(s3_object: S3Object | str, base_url: str | None = None) -> str
# Get the current user information.
#
# Returns:
# User details dictionary
def whoami() -> dict
# Get the current user information (alias for whoami).
#
# Returns:
# User details dictionary
def user() -> dict
# Get the state resource path from environment.
#
# Returns:
# State path string
def state_path() -> str
# Get the workflow state.
#
# Returns:
# State value or None if not set
def state() -> Any
# Set the state in the shared folder using pickle
def set_shared_state_pickle(value: Any, path: str = 'state.pickle') -> None
# Get the state in the shared folder using pickle
def get_shared_state_pickle(path: str = 'state.pickle') -> Any
# Set the state in the shared folder using JSON
def set_shared_state(value: Any, path: str = 'state.json') -> None
# Get the state in the shared folder using JSON
def get_shared_state(path: str = 'state.json') -> None
# Get URLs needed for resuming a flow after suspension.
#
# Args:
# approver: Optional approver name
# flow_level: If True, generate resume URLs for the parent flow instead of the
# specific step. This allows pre-approvals that can be consumed by any later
# suspend step in the same flow.
#
# Returns:
# Dictionary with approvalPage, resume, and cancel URLs
def get_resume_urls(approver: str = None, flow_level: bool = None) -> dict
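The dictionary returned by `get_resume_urls` can be turned into a notification body, for example. In this sketch the function name `approval_message`, the message wording, and the `sdk` parameter (the `wmill` module in a real script) are all hypothetical. Only the three keys come from the description above.

```python
def approval_message(sdk, approver: str) -> str:
    urls = sdk.get_resume_urls(approver=approver)
    # The three keys below are the ones listed in the Returns section above.
    return (
        f"Approval requested from {approver}.\n"
        f"Review: {urls['approvalPage']}\n"
        f"Approve: {urls['resume']}\n"
        f"Reject: {urls['cancel']}"
    )
```

The resulting text could then be sent through whichever messaging resource the flow already uses.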
# Sends an interactive approval request via Slack, allowing optional customization of the message, approver, and form fields.
#
# **[Enterprise Edition Only]** To include form fields in the Slack approval request, use the "Advanced -> Suspend -> Form" functionality.
# Learn more at: https://www.windmill.dev/docs/flows/flow_approval#form
#
# :param slack_resource_path: The path to the Slack resource in Windmill.
# :type slack_resource_path: str
# :param channel_id: The Slack channel ID where the approval request will be sent.
# :type channel_id: str
# :param message: Optional custom message to include in the Slack approval request.
# :type message: str, optional
# :param approver: Optional user ID or name of the approver for the request.
# :type approver: str, optional
# :param default_args_json: Optional dictionary defining or overriding the default arguments for form fields.
# :type default_args_json: dict, optional
# :param dynamic_enums_json: Optional dictionary overriding the enum default values of enum form fields.
# :type dynamic_enums_json: dict, optional
#
# :raises Exception: If the function is not called within a flow or flow preview.
# :raises Exception: If the required flow job or flow step environment variables are not set.
#
# :return: None
#
# **Usage Example:**
# >>> wmill.request_interactive_slack_approval(
# ...     slack_resource_path="/u/alex/my_slack_resource",
# ...     channel_id="admins-slack-channel",
# ...     message="Please approve this request",
# ...     approver="approver123",
# ...     default_args_json={"key1": "value1", "key2": 42},
# ...     dynamic_enums_json={"foo": ["choice1", "choice2"], "bar": ["optionA", "optionB"]},
# ... )
#
# **Notes:**
# - This function must be executed within a Windmill flow or flow preview.
# - The function checks for required environment variables (`WM_FLOW_JOB_ID`, `WM_FLOW_STEP_ID`) to ensure it is run in the appropriate context.
def request_interactive_slack_approval(slack_resource_path: str, channel_id: str, message: str = None, approver: str = None, default_args_json: dict = None, dynamic_enums_json: dict = None) -> None
# Get the email address for a workspace username.
# This is particularly useful for apps that need the email address of the viewer:
# in the viewer context, WM_USERNAME is set to the viewer's username, but WM_EMAIL is set to the email of the app's creator.
def username_to_email(username: str) -> str
# Send a message to a Microsoft Teams conversation with conversation_id, where success is used to style the message
def send_teams_message(conversation_id: str, text: str, success: bool = True, card_block: dict = None)
# Get a DataTable client for SQL queries.
#
# Args:
# name: Database name (default: "main")
#
# Returns:
# DataTableClient instance
def datatable(name: str = 'main')
# Get a DuckLake client for DuckDB queries.
#
# Args:
# name: Database name (default: "main")
#
# Returns:
# DucklakeClient instance
def ducklake(name: str = 'main')
def init_global_client(f)
def deprecate(in_favor_of: str)
# Get the current workspace ID.
#
# Returns:
# Workspace ID string
def get_workspace() -> str
def get_version() -> str
# Run a script synchronously by hash and return its result.
#
# Args:
# hash: Script hash
# args: Script arguments
# verbose: Enable verbose logging
# assert_result_is_not_none: Raise exception if result is None
# cleanup: Register cleanup handler to cancel job on exit
# timeout: Maximum time to wait
#
# Returns:
# Script result
def run_script_sync(hash: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any
# Run a script synchronously by path and return its result.
#
# Args:
# path: Script path
# args: Script arguments
# verbose: Enable verbose logging
# assert_result_is_not_none: Raise exception if result is None
# cleanup: Register cleanup handler to cancel job on exit
# timeout: Maximum time to wait
#
# Returns:
# Script result
def run_script_by_path_sync(path: str, args: Dict[str, Any] = None, verbose: bool = False, assert_result_is_not_none: bool = True, cleanup: bool = True, timeout: dt.timedelta = None) -> Any
# Convenience helper that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from DuckDB
def duckdb_connection_settings(s3_resource_path: str = '') -> DuckDbConnectionSettings
# Convenience helper that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection from Polars
def polars_connection_settings(s3_resource_path: str = '') -> PolarsConnectionSettings
# Convenience helper that takes an S3 resource as input and returns the settings necessary to
# initiate an S3 connection using boto3
def boto3_connection_settings(s3_resource_path: str = '') -> Boto3ConnectionSettings
# Get the state resource path from environment.
#
# Returns:
# State path string
def get_state_path() -> str
# Parse resource syntax from string.
def parse_resource_syntax(s: str) -> Optional[str]
# Parse S3 object from string or S3Object format.
def parse_s3_object(s3_object: S3Object | str) -> S3Object
# Parse variable syntax from string.
def parse_variable_syntax(s: str) -> Optional[str]
# Append text to the result stream.
#
# Args:
# text: text to append to the result stream
def append_to_result_stream(text: str) -> None
# Forward the content of a stream to the result stream.
#
# Args:
# stream: Source stream whose content is appended to the result stream
def stream_result(stream) -> None
# Execute a SQL query against the DataTable.
#
# Args:
# sql: SQL query string with $1, $2, etc. placeholders
# *args: Positional arguments to bind to query placeholders
#
# Returns:
# SqlQuery instance for fetching results
def query(sql: str, *args) -> SqlQuery
# Execute query and fetch results.
#
# Args:
# result_collection: Optional result collection mode
#
# Returns:
# Query results
def fetch(result_collection: str | None = None)
# Execute query and fetch first row of results.
#
# Returns:
# First row of query results
def fetch_one()
# Execute query and fetch first row of results. Return result as a scalar value.
#
# Returns:
# First row of query result as a scalar value
def fetch_one_scalar()
# Execute query and don't return any results.
#
def execute()
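The query helpers above appear to chain: `datatable()` returns a client, `query()` returns a `SqlQuery`, and the fetch methods run it. The sketch below assumes that `query` is a method of the DataTable client and `fetch_one_scalar` a method of `SqlQuery`, which the flat listing does not state explicitly. The `users` table, its `logins` column, and the `sdk` parameter are hypothetical.

```python
def active_user_count(sdk, min_logins: int):
    # Default database name is "main" according to the datatable() description.
    table = sdk.datatable()
    # $1 is bound to the first positional argument after the SQL string.
    query = table.query("SELECT count(*) FROM users WHERE logins >= $1", min_logins)
    # Return the first row as a single scalar value.
    return query.fetch_one_scalar()
```

Binding values through placeholders rather than string formatting keeps user input out of the SQL text.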
# DuckDB executor requires explicit argument types at declaration
# These types exist in both DuckDB and Postgres
# Check that the types exist if you plan to extend this function for other SQL engines.
def infer_sql_type(value) -> str
def parse_sql_client_name(name: str) -> tuple[str, Optional[str]]
# Decorator that marks a function as a workflow task.
#
# Works in both WAC v1 (sync, HTTP-based dispatch) and WAC v2
# (async, checkpoint/replay) modes:
#
# - **v2 (inside @workflow)**: dispatches as a checkpoint step.
# - **v1 (WM_JOB_ID set, no @workflow)**: dispatches via HTTP API.
# - **Standalone**: executes the function body directly.
#
# Usage::
#
# @task
# async def extract_data(url: str): ...
#
# @task(path="f/external_script", timeout=600, tag="gpu")
# async def run_external(x: int): ...
def task(_func = None, path: Optional[str] = None, tag: Optional[str] = None, timeout: Optional[int] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)
# Create a task that dispatches to a separate Windmill script.
#
# Usage::
#
# extract = task_script("f/data/extract", timeout=600)
#
# @workflow
# async def main():
#     data = await extract(url="https://...")
def task_script(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)
# Create a task that dispatches to a separate Windmill flow.
#
# Usage::
#
# pipeline = task_flow("f/etl/pipeline", priority=10)
#
# @workflow
# async def main():
#     result = await pipeline(input=data)
def task_flow(path: str, timeout: Optional[int] = None, tag: Optional[str] = None, cache_ttl: Optional[int] = None, priority: Optional[int] = None, concurrency_limit: Optional[int] = None, concurrency_key: Optional[str] = None, concurrency_time_window_s: Optional[int] = None)
# Decorator marking an async function as a workflow-as-code entry point.
#
# The function must be **deterministic**: given the same inputs it must call
# tasks in the same order on every replay. Branching on task results is fine
# (results are replayed from checkpoint), but branching on external state
# (current time, random values, external API calls) must use ``step()`` to
# checkpoint the value so replays see the same result.
def workflow(func)
# Execute ``fn`` inline and checkpoint the result.
#
# On replay the cached value is returned without re-executing ``fn``.
# Use for lightweight deterministic operations (timestamps, random IDs,
# config reads) that should not incur the overhead of a child job.
async def step(name: str, fn)
# Server-side sleep: suspend the workflow for the given duration without holding a worker.
#
# Inside a @workflow, the parent job suspends and auto-resumes after ``seconds``.
# Outside a workflow, falls back to ``asyncio.sleep``.
async def sleep(seconds: int)
# Suspend the workflow and wait for an external approval.
#
# Use ``get_resume_urls()`` (wrapped in ``step()``) to obtain
# resume/cancel/approval URLs before calling this function.
#
# Returns a dict with ``value`` (form data), ``approver``, and ``approved``.
#
# Args:
# timeout: Approval timeout in seconds (default 1800).
# form: Optional form schema for the approval page.
# self_approval: Whether the user who triggered the flow can approve it (default True).
#
# Example::
#
# urls = await step("urls", lambda: get_resume_urls())
# await step("notify", lambda: send_email(urls["approvalPage"]))
# result = await wait_for_approval(timeout=3600)
async def wait_for_approval(timeout: int = 1800, form: dict | None = None, self_approval: bool = True) -> dict
# Process items in parallel with optional concurrency control.
#
# Each item is processed by calling ``fn(item)``, which should be a @task.
# Items are dispatched in batches of ``concurrency`` (default: all at once).
#
# Example::
#
# @task
# async def process(item: str):
#     ...
#
# results = await parallel(items, process, concurrency=5)
async def parallel(items, fn, concurrency: Optional[int] = None)
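The batching behaviour described for `parallel` can be pictured with plain `asyncio`. The function below, `parallel_sketch`, is a hypothetical stand-in written for illustration. It is not the SDK's implementation and performs no checkpointing or job dispatch.

```python
import asyncio
from typing import Optional


async def parallel_sketch(items, fn, concurrency: Optional[int] = None):
    items = list(items)
    # With no limit, the whole list forms a single batch.
    size = concurrency or len(items) or 1
    results = []
    for start in range(0, len(items), size):
        batch = items[start:start + size]
        # Run one batch concurrently and wait for all of it before starting the next.
        results.extend(await asyncio.gather(*(fn(item) for item in batch)))
    return results
```

Results come back in input order because `asyncio.gather` preserves the order of its awaitables.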
# Commit Kafka offsets for a trigger with auto_commit disabled.
#
# Args:
# trigger_path: Path to the Kafka trigger (from event['wm_trigger']['trigger_path'])
# topic: Kafka topic name (from event['topic'])
# partition: Partition number (from event['partition'])
# offset: Message offset to commit (from event['offset'])
def commit_kafka_offsets(trigger_path: str, topic: str, partition: int, offset: int) -> None
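The argument sources named above map directly onto the event payload. The sketch below assumes the event is a dictionary with exactly those fields. `commit_event` and the `sdk` parameter (the `wmill` module in a real script) are hypothetical names.

```python
def commit_event(sdk, event: dict) -> None:
    # Each argument is read from the event field named in the Args section above.
    sdk.commit_kafka_offsets(
        trigger_path=event["wm_trigger"]["trigger_path"],
        topic=event["topic"],
        partition=event["partition"],
        offset=event["offset"],
    )
```

Calling this only after the message has been processed successfully means a failed run leaves the offset uncommitted.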