inclusionAI / add-workflow
Install for your project team
Run this command in your project directory to install the skill for your entire team:
mkdir -p .claude/skills/add-workflow && curl -L -o skill.zip "https://fastmcp.me/Skills/Download/3002" && unzip -o skill.zip -d .claude/skills/add-workflow && rm skill.zip
Project Skills
This skill will be saved in .claude/skills/add-workflow/ and checked into git. All team members will have access to it automatically.
Important: Please verify the skill by reviewing its instructions before using it.
Guide for adding a new RolloutWorkflow to AReaL. Use when user wants to create a new workflow.
0 views
0 installs
Skill Content
---
name: add-workflow
description: Guide for adding a new RolloutWorkflow to AReaL. Use when user wants to create a new workflow.
---
# Add Workflow
Add a new RolloutWorkflow implementation to AReaL.
## When to Use
This skill is triggered when:
- User asks "how do I add a workflow?"
- User wants to create a new RolloutWorkflow
- User mentions implementing a custom rollout
## Prerequisites
Before starting, ensure you understand:
- The workflow's purpose and requirements
- Input/output data format
- Reward function to use
## Step-by-Step Guide
### Step 1: Create Workflow File
Create `areal/workflow/<name>.py`:
```python
import uuid
from typing import Any, Callable
import torch
from areal.api.cli_args import GenerationHyperparameters
from areal.api.engine_api import InferenceEngine
from areal.api.io_struct import ModelRequest, ModelResponse
from areal.api.reward_api import AsyncRewardWrapper
from areal.api.workflow_api import RolloutWorkflow
from areal.utils import logging
logger = logging.getLogger("MyWorkflow")
class MyWorkflow(RolloutWorkflow):
"""Description of your workflow."""
def __init__(
self,
gconfig: GenerationHyperparameters,
tokenizer,
reward_fn: Callable,
):
self.gconfig = gconfig.new_with_stop_and_pad_token_ids(tokenizer)
self.tokenizer = tokenizer
self.async_reward_fn = AsyncRewardWrapper(reward_fn)
async def arun_episode(
self,
engine: InferenceEngine,
data: dict[str, Any],
) -> dict[str, torch.Tensor]:
"""Run a single episode. MUST be async and non-blocking."""
# 1. Prepare input_ids from data
input_ids = self.tokenizer.apply_chat_template(
data["messages"],
tokenize=True,
add_generation_prompt=True,
)
# 2. Build ModelRequest
req = ModelRequest(
rid=uuid.uuid4().hex,
input_ids=list(input_ids),
gconfig=self.gconfig.new(n_samples=1),
tokenizer=self.tokenizer,
)
# 3. Generate completion (async)
resp: ModelResponse = await engine.agenerate(req)
# 4. Compute reward (async)
prompt_str = self.tokenizer.decode(input_ids)
completion_str = self.tokenizer.decode(resp.output_tokens)
reward = await self.async_reward_fn(
prompt_str,
completion_str,
resp.input_tokens,
resp.output_tokens,
**data,
)
# 5. Return results in expected format
return {
"input_ids": torch.tensor(resp.input_tokens),
"output_ids": torch.tensor(resp.output_tokens),
"reward": torch.tensor(reward),
}
```
### Step 2: Register in __init__.py
Add to `areal/workflow/__init__.py`:
```python
from areal.workflow.<name> import MyWorkflow
__all__ = [
# ... existing exports
"MyWorkflow",
]
```
### Step 3: Update Entry Script
Update your training script to use the new workflow:
```python
trainer.train(
workflow="areal.workflow.<name>.MyWorkflow",
# ... other args
)
```
### Step 4: Add Tests
Create `tests/test_<name>_workflow.py`:
```python
import pytest
from areal.workflow.<name> import MyWorkflow
@pytest.mark.asyncio
async def test_workflow_basic():
# Test basic functionality
pass
```
## Reference Implementations
| Workflow | File | Description |
| ------------------ | ------------------------------- | -------------------------- |
| MultiTurnWorkflow | `areal/workflow/multi_turn.py` | Multi-turn conversation |
| RLVRWorkflow | `areal/workflow/rlvr.py` | RL with verifiable rewards |
| VisionRLVRWorkflow | `areal/workflow/vision_rlvr.py` | Vision + RLVR |
## Key Requirements
1. **Async**: `arun_episode` must be `async def` and non-blocking
1. **No sync I/O**: Use `aiofiles` for file operations
1. **Wrap rewards**: Use `AsyncRewardWrapper` for reward functions
1. **Tensor format**: Output tensors should be `[batch, seq_len, ...]`
1. **Use helpers**: `concat_padded_tensors` for combining outputs
## Common Mistakes
- ❌ Using `open()` instead of `aiofiles.open()`
- ❌ Forgetting to `await` async calls
- ❌ Not wrapping reward function with `AsyncRewardWrapper`
- ❌ Wrong tensor shape conventions
______________________________________________________________________
<!--
================================================================================
MAINTAINER GUIDE
================================================================================
Location: .claude/skills/add-workflow/SKILL.md
Invocation: /add-workflow <name>
## Purpose
Step-by-step guide for adding new RolloutWorkflow implementations.
## How to Update
### When Workflow API Changes
1. Update the code template in Step 1
2. Update the required imports
3. Update the method signature if changed
### When New Patterns Emerge
1. Add to "Reference Implementations" table
2. Update "Key Requirements" if new requirements added
================================================================================
-->