Tasks
In TigerFlow, a task represents a unit of work to be applied to individual files, e.g., transcribing an audio file. Tasks are user-defined Python modules that subclass one of several built-in abstract task classes. These abstract classes provide methods that automatically convert user modules into "task servers". Run on its own, a task server can bulk-process files; connected together, tasks form a data processing pipeline.
Note
TigerFlow tasks are designed for embarrassingly parallel, one-to-one file processing, where each input file is transformed into a single output file independently of all other input files.
TigerFlow supports three types of tasks:
- LocalTask: Runs synchronous operations on a login/head node
- LocalAsyncTask: Runs asynchronous operations on a login/head node
- SlurmTask: Runs parallel operations across compute nodes via Slurm
Hello, Tasks!
To define a task, subclass one of the above abstract types and implement the following methods:
| Method | Required | Description |
|---|---|---|
| setup | No | Initializes shared context used across multiple files (e.g., loading a model) |
| run | Yes | Contains the processing logic applied to each file |
| teardown | No | Performs cleanup operations for graceful shutdown (e.g., closing a database connection) |
Then, simply call the inherited cli() method to turn the module into a runnable CLI application.
For instance, here's a simple "Hello, World!" local task:
from tigerflow.tasks import LocalTask


class HelloWorld(LocalTask):
    @staticmethod
    def setup(context):
        context.greeting = "Hello"
        print("Setup executed successfully!")

    @staticmethod
    def run(context, input_file, output_file):
        with open(input_file, "r") as fi:
            content = fi.read()

        new_content = context.greeting + ", " + content.upper()

        with open(output_file, "w") as fo:
            fo.write(new_content)

    @staticmethod
    def teardown(context):
        print("Teardown executed successfully!")


HelloWorld.cli()
where:
- context is a namespace for storing and accessing common, reusable data/objects (e.g., DB connection)
- input_file is a path to the input file to be processed
- output_file is a path to the output file to be generated
Warning
In the run method, context is read-only and will raise an error if modified.
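To make the read-only behavior concrete, here is a minimal sketch of how a writable namespace can be exposed through a read-only view. The `FrozenContext` class is purely hypothetical and is not TigerFlow's actual implementation; it only illustrates the pattern of writing in setup and reading in run.

```python
from types import SimpleNamespace


class FrozenContext:
    """Hypothetical read-only view over a namespace (not TigerFlow's class)."""

    def __init__(self, namespace):
        # Bypass our own __setattr__ so the wrapped namespace can be stored once.
        object.__setattr__(self, "_namespace", namespace)

    def __getattr__(self, name):
        # Only called when normal lookup fails, so delegate to the namespace.
        return getattr(self._namespace, name)

    def __setattr__(self, name, value):
        raise AttributeError(f"context is read-only; cannot set {name!r}")


# "setup" phase: the context is still writable.
context = SimpleNamespace()
context.greeting = "Hello"

# "run" phase: the same data is exposed through a read-only view.
frozen = FrozenContext(context)
print(frozen.greeting)  # Reads still work

try:
    frozen.greeting = "Goodbye"
except AttributeError as exc:
    print(f"Rejected: {exc}")
```

A practical consequence is that anything the run logic needs from context should be assigned during setup.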
With HelloWorld.cli(), this module becomes a runnable CLI application and we can check its details by running:
python hello.py --help
Usage: hello.py [OPTIONS]
Run the task as a CLI application
╭─ Options ───────────────────────────────────────────────────────────────────╮
│ * --input-dir PATH Input directory to read data [required] │
│ * --input-ext TEXT Input file extension [required] │
│ * --output-dir PATH Output directory to store results [required] │
│ * --output-ext TEXT Output file extension [required] │
│ --help Show this message and exit. │
╰─────────────────────────────────────────────────────────────────────────────╯
We can then run the task as follows:
python hello.py --input-dir path/to/data/ --input-ext .txt --output-dir path/to/results/ --output-ext .txt
2025-09-11 10:54:23 | INFO | Setting up task
Setup executed successfully!
2025-09-11 10:54:23 | INFO | Task setup complete
2025-09-11 10:54:23 | INFO | Starting processing: 5.txt
2025-09-11 10:54:23 | INFO | Successfully processed: 5.txt
2025-09-11 10:54:23 | INFO | Starting processing: 4.txt
2025-09-11 10:54:23 | INFO | Successfully processed: 4.txt
...
2025-09-11 10:54:23 | INFO | Starting processing: 1.txt
2025-09-11 10:54:23 | INFO | Successfully processed: 1.txt
^C2025-09-11 10:54:30 | WARNING | Received signal 2, initiating shutdown
2025-09-11 10:54:30 | INFO | Shutting down task
Teardown executed successfully!
2025-09-11 10:54:30 | INFO | Task shutdown complete
The task processes every .txt file in input-dir and writes the result to output-dir
using the same filename stem and the specified output extension (also .txt in this case).
For example, path/to/data/4.txt produces path/to/results/4.txt.
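The naming rule described above can be sketched with pathlib. The helper name `expected_output_path` is an assumption introduced here for illustration and is not part of TigerFlow.

```python
from pathlib import Path


def expected_output_path(input_file, output_dir, output_ext):
    """Hypothetical helper: same stem as the input, new directory and extension."""
    return Path(output_dir) / (Path(input_file).stem + output_ext)


# Same extension in and out, as in the HelloWorld run above.
print(expected_output_path("path/to/data/4.txt", "path/to/results/", ".txt"))

# A different output extension only changes the suffix, not the stem.
print(expected_output_path("path/to/data/4.txt", "path/to/results/", ".json"))
```

Because the mapping depends only on the stem, two input files that share a stem would map to the same output path under this sketch.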
Error Files
If a task encounters an error, TigerFlow generates an error output file, e.g., 4.err instead of 4.txt. This file contains specific error messages to assist with debugging.
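After a run, it can be handy to collect the failures in one place. The sketch below assumes that error files land in the output directory with a .err extension and the input file's stem, as the 4.err example suggests. The helper names and the demo error message are made up for illustration.

```python
import tempfile
from pathlib import Path


def find_failed_stems(output_dir, error_ext=".err"):
    """Return the sorted stems of files that have an error file in output_dir."""
    return sorted(p.stem for p in Path(output_dir).glob(f"*{error_ext}"))


def read_error_messages(output_dir, error_ext=".err"):
    """Map each failed stem to the text of its error file."""
    return {
        stem: (Path(output_dir) / f"{stem}{error_ext}").read_text().strip()
        for stem in find_failed_stems(output_dir, error_ext)
    }


# Demo on a throwaway directory that mimics one success and one failure.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "1.txt").write_text("HELLO, WORLD")
    (Path(tmp) / "4.err").write_text("ValueError: example failure\n")
    failed_demo = find_failed_stems(tmp)
    messages_demo = read_error_messages(tmp)

print(failed_demo)    # ['4']
print(messages_demo)  # {'4': 'ValueError: example failure'}
```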
Examples
Say we want to implement a workflow that involves the following tasks:
- Transcribe video files using an open-source model (e.g., Whisper)
- Embed the transcription files using an external API service (e.g., Voyage AI)
- Ingest the embeddings into a single-writer database (e.g., DuckDB)
We can create each task as shown below.
Follow Along
You can follow along with the examples using the code and data provided here. Videos have been substituted with audio files due to intellectual property constraints and storage limitations, but the task logic remains otherwise identical.
Transcribing Video Files (SlurmTask)
We implement the transcription step as a Slurm task because it involves compute-intensive work and we want to process files in parallel.
import whisper

from tigerflow.tasks import SlurmTask


class Transcribe(SlurmTask):
    @staticmethod
    def setup(context):
        context.model = whisper.load_model("/home/sp8538/.cache/whisper/medium.pt")
        print("Model loaded successfully")

    @staticmethod
    def run(context, input_file, output_file):
        result = context.model.transcribe(str(input_file))
        print(f"Transcription ran successfully for {input_file}")

        with open(output_file, "w") as f:
            f.write(result["text"])


Transcribe.cli()
As shown, the task is defined such that:
- The model is loaded once during setup and stored in context
- This pre-loaded model is then accessed from context to transcribe each file
Calling Transcribe.cli() turns this module into a runnable CLI application:
python transcribe.py --help
Usage: transcribe.py [OPTIONS]
Run the task as a CLI application
╭─ Options ──────────────────────────────────────────────────────────────────────────────╮
│ * --input-dir PATH Input directory to read data [required] │
│ * --input-ext TEXT Input file extension [required] │
│ * --output-dir PATH Output directory to store results [required] │
│ * --output-ext TEXT Output file extension [required] │
│ * --max-workers INTEGER Max number of workers for autoscaling [required] │
│ * --cpus INTEGER Number of CPUs per worker [required] │
│ * --memory TEXT Memory per worker [required] │
│ * --time TEXT Wall time per worker [required] │
│ --gpus INTEGER Number of GPUs per worker │
│ --sbatch-option TEXT Additional Slurm option for workers (repeatable) │
│ --setup-command TEXT Shell command to run before the task starts │
│ (repeatable) │
│ --task-name TEXT Task name [default: Transcribe] │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────╯
We can then run the task as follows:
python transcribe.py \
--input-dir path/to/data/ \
--input-ext .mp4 \
--output-dir path/to/results/ \
--output-ext .txt \
--max-workers 3 \
--cpus 1 \
--memory "12G" \
--time "02:00:00" \
--gpus 1 \
--sbatch-option "--mail-user=sp8538@princeton.edu" \
--setup-command "module purge" \
--setup-command "module load anaconda3/2024.6" \
--setup-command "conda activate tiktok"
2025-09-16 10:53:44 | INFO | Submitted task with Slurm job ID 690468
2025-09-16 10:53:44 | INFO | Status changed: INACTIVE -> PENDING (Reason: (None))
2025-09-16 10:54:04 | INFO | Status changed: PENDING (Reason: (None)) -> ACTIVE (0 workers)
2025-09-16 10:59:55 | INFO | Status changed: ACTIVE (0 workers) -> ACTIVE (1 workers)
2025-09-16 11:00:45 | INFO | 4 processed files
2025-09-16 11:00:55 | INFO | Status changed: ACTIVE (1 workers) -> ACTIVE (3 workers)
2025-09-16 11:00:55 | INFO | 1 processed files
2025-09-16 11:01:05 | INFO | 6 processed files
...
2025-09-16 11:03:08 | INFO | 2 processed files
2025-09-16 11:04:08 | INFO | 1 processed files
2025-09-16 11:04:58 | INFO | Status changed: ACTIVE (3 workers) -> ACTIVE (1 workers)
2025-09-16 11:05:58 | INFO | Status changed: ACTIVE (1 workers) -> ACTIVE (0 workers)
^C2025-09-16 11:06:40 | WARNING | Received signal 2, initiating shutdown
2025-09-16 11:06:40 | INFO | Shutting down task
2025-09-16 11:06:40 | ERROR | Status changed: ACTIVE (0 workers) -> INACTIVE (Reason: CANCELLED+)
2025-09-16 11:06:41 | INFO | Task shutdown complete
The resources specified above, including time, apply to each individual worker. Workers can be
spun up and down dynamically in response to incoming workloads, so it is recommended to allocate
only the minimal necessary resources per worker. For example, setting the worker time to a smaller
value like 2 hours (instead of 12 hours) can reduce scheduling delays, as longer Slurm job requests
often result in longer queue times.
Embedding Text Files (LocalAsyncTask)
We implement the embedding step as a local asynchronous task because it involves I/O-bound work (i.e., making external API requests) that can be performed concurrently.
import asyncio
import os

import aiofiles
import aiohttp

from tigerflow.tasks import LocalAsyncTask


class Embed(LocalAsyncTask):
    @staticmethod
    async def setup(context):
        context.url = "https://api.voyageai.com/v1/embeddings"
        context.headers = {
            "Authorization": f"Bearer {os.environ['VOYAGE_API_KEY']}",
            "Content-Type": "application/json",
        }
        context.session = aiohttp.ClientSession()
        print("Session created successfully!")

    @staticmethod
    async def run(context, input_file, output_file):
        async with aiofiles.open(input_file, "r") as f:
            text = await f.read()

        async with context.session.post(
            context.url,
            headers=context.headers,
            json={
                "input": text.strip(),
                "model": "voyage-3.5",
                "input_type": "document",
            },
        ) as resp:
            resp.raise_for_status()  # Raise error if unsuccessful
            result = await resp.text()  # Raw JSON
            await asyncio.sleep(1)  # For API rate limit

        async with aiofiles.open(output_file, "w") as f:
            await f.write(result)

    @staticmethod
    async def teardown(context):
        await context.session.close()
        print("Session closed successfully!")


Embed.cli()
As shown, the task is defined such that it:
- Initializes reusable resources (e.g., HTTP session) and stores them in context
- Utilizes these resources from context to send a request to the external API for each input file
- Cleans up resources (e.g., HTTP session) at the end to ensure a graceful shutdown
Notice that LocalAsyncTask requires all operations to adhere to Python's async/await syntax.
For example, file reading and writing should be performed using aiofiles, since standard file I/O
would block the event loop and prevent concurrent file processing. Similarly, LocalAsyncTask should
not include compute-intensive logic, as this would also block the event loop and goes against its
intended use. For compute-heavy tasks, consider using SlurmTask instead.
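The effect of blocking the event loop can be seen in a small standalone experiment that does not use TigerFlow. Here time.sleep stands in for any blocking call (standard file I/O or compute-heavy work), while asyncio.sleep stands in for a properly awaited operation. The function names are illustrative only.

```python
import asyncio
import time


async def blocking_job():
    time.sleep(0.2)  # Blocks the event loop: no other coroutine can run meanwhile


async def cooperative_job():
    await asyncio.sleep(0.2)  # Yields to the event loop while waiting


async def timed(job, n=3):
    """Run n copies of job concurrently and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(job() for _ in range(n)))
    return time.perf_counter() - start


blocking_elapsed = asyncio.run(timed(blocking_job))
cooperative_elapsed = asyncio.run(timed(cooperative_job))

print(f"blocking:    {blocking_elapsed:.2f}s")
print(f"cooperative: {cooperative_elapsed:.2f}s")
```

The three blocking jobs run one after another, so their waits add up, whereas the three cooperative jobs overlap and finish in roughly the time of a single wait.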
API Rate Limits
API services often enforce rate limits (e.g., 2000 requests per minute).
To comply with these limits, we can use asyncio.sleep() within the run
logic (as shown above), in combination with the --concurrency-limit option
(see below), which controls the maximum number of files processed concurrently.
For example, if each API request takes less than a second and the service allows up to 2000 requests per minute, we can:
- Use asyncio.sleep(1) in the run logic to ensure each request takes at least one second
- Set --concurrency-limit to 30 to ensure no more than 30 requests are processed concurrently
Together, these measures effectively cap the request rate at 1800 requests per minute, keeping it safely within the limit.
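The arithmetic behind this cap can be written out as a short sketch. It assumes each concurrency slot handles one request at a time and that every request takes at least the enforced minimum duration. The function names are hypothetical.

```python
import math


def max_requests_per_minute(concurrency_limit, min_seconds_per_request):
    """Upper bound on throughput when each slot handles one request at a time."""
    return concurrency_limit * 60 / min_seconds_per_request


def largest_safe_concurrency(rate_limit_per_minute, min_seconds_per_request):
    """Largest concurrency limit whose upper bound stays within the rate limit."""
    return math.floor(rate_limit_per_minute * min_seconds_per_request / 60)


print(max_requests_per_minute(30, 1))     # 1800.0
print(largest_safe_concurrency(2000, 1))  # 33
```

With a one-second minimum per request, a limit of 2000 requests per minute would tolerate up to 33 concurrent requests, so choosing 30 leaves some headroom.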
Calling Embed.cli() turns this module into a runnable CLI application:
python embed.py --help
Usage: embed.py [OPTIONS]
Run the task as a CLI application
╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ * --input-dir PATH Input directory to read data [required] │
│ * --input-ext TEXT Input file extension [required] │
│ * --output-dir PATH Output directory to store results [required] │
│ * --output-ext TEXT Output file extension [required] │
│ * --concurrency-limit INTEGER Maximum number of coroutines that may run concurrently at any given │
│ time (excess coroutines are queued until capacity becomes available) │
│ [required] │
│ --task-name TEXT Task name [default: Embed] │
│ --help Show this message and exit. │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
We can then run the task as follows:
python embed.py \
--input-dir path/to/data/ \
--input-ext .txt \
--output-dir path/to/results/ \
--output-ext .json \
--concurrency-limit 30
2025-09-19 10:28:32 | INFO | Setting up task
Session created successfully!
2025-09-19 10:28:32 | INFO | Task setup complete
2025-09-19 10:28:32 | INFO | Starting processing: 7501870786337115434.txt
2025-09-19 10:28:32 | INFO | Starting processing: 7501870786941127967.txt
2025-09-19 10:28:32 | INFO | Starting processing: 7501870783862541576.txt
...
2025-09-19 10:28:34 | INFO | Starting processing: 7501869901028592927.txt
2025-09-19 10:28:34 | INFO | Successfully processed: 7501871089715268906.txt
2025-09-19 10:28:34 | INFO | Starting processing: 7501870443775692078.txt
2025-09-19 10:28:34 | INFO | Successfully processed: 7501863546456771870.txt
2025-09-19 10:28:34 | INFO | Starting processing: 7501869899782901022.txt
2025-09-19 10:28:34 | INFO | Successfully processed: 7501870775461317906.txt
...
2025-09-19 10:28:36 | INFO | Successfully processed: 7501870542656474398.txt
2025-09-19 10:28:36 | INFO | Successfully processed: 7501870861306318126.txt
2025-09-19 10:28:36 | INFO | Successfully processed: 7501870700089707807.txt
^C2025-09-19 10:28:40 | WARNING | Received signal 2, initiating shutdown
2025-09-19 10:28:40 | INFO | Shutting down task
Session closed successfully!
2025-09-19 10:28:40 | INFO | Task shutdown complete
Ingesting Text Embeddings (LocalTask)
We implement the ingestion step as a local synchronous task because our target database (DuckDB) only supports writes from a single process.
import json

import duckdb

from tigerflow.tasks import LocalTask


class Ingest(LocalTask):
    @staticmethod
    def setup(context):
        db_path = "/home/sp8538/tiktok/pipeline/tigerflow/demo/results/test.db"

        conn = duckdb.connect(db_path)  # Creates file if not existing
        print(f"Successfully connected to {db_path}")

        conn.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                id UBIGINT,
                embedding FLOAT[1024],
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)

        context.conn = conn

    @staticmethod
    def run(context, input_file, output_file):
        with open(input_file, "r") as f:
            content = json.load(f)

        embedding = content["data"][0]["embedding"]

        context.conn.execute(
            "INSERT INTO embeddings (id, embedding) VALUES (?, ?)",
            (input_file.stem, embedding),
        )

    @staticmethod
    def teardown(context):
        context.conn.close()
        print("DB connection closed")


Ingest.cli()
As shown, the task is defined such that:
- A database connection is created once during setup and stored in context
- This database connection is then accessed from context to ingest each file
- The database connection is closed at the end to ensure a graceful shutdown
Calling Ingest.cli() turns this module into a runnable CLI application:
python ingest.py --help
Usage: ingest.py [OPTIONS]
Run the task as a CLI application
╭─ Options ────────────────────────────────────────────────────────────────────╮
│ * --input-dir PATH Input directory to read data [required] │
│ * --input-ext TEXT Input file extension [required] │
│ * --output-dir PATH Output directory to store results [required] │
│ * --output-ext TEXT Output file extension [required] │
│ --task-name TEXT Task name [default: Ingest] │
│ --help Show this message and exit. │
╰──────────────────────────────────────────────────────────────────────────────╯
We can then run the task as follows:
python ingest.py \
--input-dir path/to/data/ \
--input-ext .json \
--output-dir path/to/results/ \
--output-ext .out
2025-09-19 13:02:16 | INFO | Setting up task
Successfully connected to /home/sp8538/tiktok/pipeline/tigerflow/demo/results/test.db
2025-09-19 13:02:16 | INFO | Task setup complete
2025-09-19 13:02:16 | INFO | Starting processing: 7501869531975929119.json
2025-09-19 13:02:17 | INFO | Successfully processed: 7501869531975929119.json
2025-09-19 13:02:17 | INFO | Starting processing: 7501869470705782062.json
2025-09-19 13:02:17 | INFO | Successfully processed: 7501869470705782062.json
2025-09-19 13:02:17 | INFO | Starting processing: 7501871439457373470.json
...
2025-09-19 13:02:18 | INFO | Successfully processed: 7501870861306318126.json
2025-09-19 13:02:18 | INFO | Starting processing: 7501870700089707807.json
2025-09-19 13:02:18 | INFO | Successfully processed: 7501870700089707807.json
^C2025-09-19 13:02:22 | WARNING | Received signal 2, initiating shutdown
2025-09-19 13:02:22 | INFO | Shutting down task
DB connection closed
2025-09-19 13:02:22 | INFO | Task shutdown complete
Output Files
Note that we specify --output-dir and --output-ext even though the task’s run logic
does not write to output_file. This is necessary because TigerFlow creates an empty
"placeholder" file even when no content is written. This placeholder indicates that the file
was processed successfully according to the user-provided run logic, even if no concrete
output was produced.
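Since every successfully processed input leaves a file in the output directory, even an empty one, progress can be estimated by comparing stems across the two directories. The sketch below is a hypothetical helper, not a TigerFlow feature, and it assumes the stem-based naming described earlier. Inputs that failed would also be reported here, since they have no output file with the output extension.

```python
import tempfile
from pathlib import Path


def unprocessed_inputs(input_dir, input_ext, output_dir, output_ext):
    """Hypothetical helper: stems of inputs with no matching output file yet."""
    done = {p.stem for p in Path(output_dir).glob(f"*{output_ext}")}
    return sorted(
        p.stem for p in Path(input_dir).glob(f"*{input_ext}") if p.stem not in done
    )


# Demo on throwaway directories: two inputs, one empty placeholder output.
with tempfile.TemporaryDirectory() as in_dir, tempfile.TemporaryDirectory() as out_dir:
    (Path(in_dir) / "a.json").write_text("{}")
    (Path(in_dir) / "b.json").write_text("{}")
    (Path(out_dir) / "a.out").touch()  # Empty placeholder, as in the Ingest task
    pending_demo = unprocessed_inputs(in_dir, ".json", out_dir, ".out")

print(pending_demo)  # ['b']
```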