
Tasks

In TigerFlow, a task represents a unit of work applied to individual files, e.g., transcribing an audio file. Tasks are user-defined Python modules that subclass one of several built-in abstract tasks. These abstract classes provide methods that automatically convert user modules into "task servers". Run individually, a task server bulk-processes files; connected together, task servers form a data processing pipeline.

Note

TigerFlow tasks are designed for embarrassingly parallel, one-to-one file processing, where each input file is transformed into a single output file independently of all other input files.

TigerFlow supports three types of tasks:

  • LocalTask: Runs synchronous operations on a login/head node
  • LocalAsyncTask: Runs asynchronous operations on a login/head node
  • SlurmTask: Runs parallel operations across compute nodes via Slurm

Hello, Tasks!

To define a task, subclass one of the above abstract types and implement the following methods:

Method     Required   Description
setup      No         Initializes shared context used across multiple files (e.g., loading a model)
run        Yes        Contains the processing logic applied to each file
teardown   No         Performs cleanup operations for graceful shutdown (e.g., closing a database connection)

Then, simply call the inherited cli() method to turn the module into a runnable CLI application.

For instance, here's a simple "Hello, World!" local task:

hello.py
from pathlib import Path

from tigerflow.tasks import LocalTask
from tigerflow.utils import SetupContext


class HelloWorld(LocalTask):
    @staticmethod
    def setup(context: SetupContext):
        context.greeting = "Hello"
        print("Setup executed successfully!")

    @staticmethod
    def run(context: SetupContext, input_file: Path, output_file: Path):
        with open(input_file, "r") as f:
            content = f.read()

        new_content = context.greeting + ", " + content.upper()

        with open(output_file, "w") as f:
            f.write(new_content)

    @staticmethod
    def teardown(context: SetupContext):
        print("Teardown executed successfully!")


HelloWorld.cli()

where:

  • context is a namespace to store and access any common, reusable data/objects (e.g., DB connection)
  • input_file is a path to the input file to be processed
  • output_file is a path to the output file to be generated

Warning

In the run method, context is read-only and will raise an error if modified.
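To make the guarantee concrete, here is a hypothetical sketch of how a read-only attribute namespace can be enforced in Python. This illustrates the pattern only; it is not TigerFlow's actual SetupContext implementation:

```python
class ReadOnlyContext:
    """Illustrative read-only namespace (not TigerFlow's SetupContext)."""

    def __init__(self, **attrs):
        # Bypass our own __setattr__ to store the underlying data
        object.__setattr__(self, "_attrs", dict(attrs))

    def __getattr__(self, name):
        # Called only for attributes not found by normal lookup
        try:
            return self._attrs[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        raise AttributeError("context is read-only inside run()")


ctx = ReadOnlyContext(greeting="Hello")
print(ctx.greeting)  # Hello

try:
    ctx.greeting = "Hi"  # any write raises
except AttributeError as e:
    print(e)  # context is read-only inside run()
```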

With HelloWorld.cli(), this module becomes a runnable CLI application and we can check its details by running:

python hello.py --help
Usage: hello.py [OPTIONS]

Run the task as a CLI application

╭─ Options ───────────────────────────────────────────────────────────────────╮
│ *  --input-dir         PATH  Input directory to read data [required]        │
│ *  --input-ext         TEXT  Input file extension [required]                │
│ *  --output-dir        PATH  Output directory to store results [required]   │
│ *  --output-ext        TEXT  Output file extension [required]               │
│    --help                    Show this message and exit.                    │
╰─────────────────────────────────────────────────────────────────────────────╯

We can then run the task as follows:

python hello.py --input-dir path/to/data/ --input-ext .txt --output-dir path/to/results/ --output-ext .txt
2025-09-11 10:54:23 | INFO     | Setting up task
Setup executed successfully!
2025-09-11 10:54:23 | INFO     | Task setup complete
2025-09-11 10:54:23 | INFO     | Starting processing: 5.txt
2025-09-11 10:54:23 | INFO     | Successfully processed: 5.txt
2025-09-11 10:54:23 | INFO     | Starting processing: 4.txt
2025-09-11 10:54:23 | INFO     | Successfully processed: 4.txt
...
2025-09-11 10:54:23 | INFO     | Starting processing: 1.txt
2025-09-11 10:54:23 | INFO     | Successfully processed: 1.txt
^C2025-09-11 10:54:30 | WARNING  | Received signal 2, initiating shutdown
2025-09-11 10:54:30 | INFO     | Shutting down task
Teardown executed successfully!
2025-09-11 10:54:30 | INFO     | Task shutdown complete

The task processes every .txt file in input-dir and writes the result to output-dir using the same filename stem and the specified output extension (also .txt in this case). For example, path/to/data/4.txt produces path/to/results/4.txt.
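The stem-plus-extension naming rule can be expressed with pathlib. This is an illustration of the convention, not TigerFlow internals (output_path is a hypothetical helper):

```python
from pathlib import Path


def output_path(input_file: Path, output_dir: Path, output_ext: str) -> Path:
    # Hypothetical helper: keep the filename stem, swap directory and extension
    return output_dir / (input_file.stem + output_ext)


result = output_path(Path("path/to/data/4.txt"), Path("path/to/results"), ".txt")
print(result)  # path/to/results/4.txt
```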

Error Files

If a task encounters an error, TigerFlow generates an error output file, e.g., 4.err instead of 4.txt. This file contains specific error messages to assist with debugging.

Lazy Imports

When working with heavy dependencies like whisper, duckdb, or aiohttp, it is best to import them inside your setup, run, or teardown methods rather than at the top of the task module. This "lazy loading" pattern defers imports until code actually executes:

# import whisper  ❌ Runs every time the CLI is invoked

from tigerflow.tasks import SlurmTask
from tigerflow.utils import SetupContext


class Transcribe(SlurmTask):
    @staticmethod
    def setup(context: SetupContext):
        import whisper  # ✅ Imported only when setup runs

        context.model = whisper.load_model("/home/sp8538/.cache/whisper/medium.pt")

Why does this matter?

  • For SlurmTask, lazy imports are essential. Task modules are serialized and shipped to cluster workers, so module-level imports would require dependencies on the head node where they are not needed — and may fail entirely if packages are only installed on workers.

  • For LocalTask and LocalAsyncTask, lazy imports are a good practice that reduces startup time and memory footprint, especially when validating task modules.

Custom Parameters

Tasks can define custom CLI parameters using an inner Params class. These parameters are automatically added to the CLI and made available via context in the run method.

greet.py
from pathlib import Path
from typing import Annotated

import typer

from tigerflow.tasks import LocalTask
from tigerflow.utils import SetupContext


class Greet(LocalTask):
    class Params:
        greeting: Annotated[
            str,
            typer.Option(help="Greeting to prepend"),
        ] = "Hello"
        uppercase: Annotated[
            bool,
            typer.Option("--uppercase", help="Convert to uppercase"),
        ] = False

    @staticmethod
    def run(context: SetupContext, input_file: Path, output_file: Path):
        with open(input_file) as f:
            content = f.read()

        if context.uppercase:
            content = content.upper()

        result = f"{context.greeting}, {content}"

        with open(output_file, "w") as f:
            f.write(result)


Greet.cli()

The Params class supports:

  • Type annotations: Use standard Python types (str, int, bool, etc.)
  • Default values: Parameters with defaults become optional CLI arguments
  • Typer options: Use Annotated[type, typer.Option(...)] to add help text and customize CLI behavior

Parameters are accessible via context using their attribute names. For example, context.greeting and context.uppercase in the example above.

python greet.py --help
Usage: greet.py [OPTIONS]

Run the task as a CLI application

╭─ Options ────────────────────────────────────────────────────────────────────╮
│ *  --input-dir         PATH  Input directory to read data [required]         │
│ *  --input-ext         TEXT  Input file extension [required]                 │
│ *  --output-dir        PATH  Output directory to store results [required]    │
│ *  --output-ext        TEXT  Output file extension [required]                │
│    --greeting          TEXT  Greeting to prepend [default: Hello]            │
│    --uppercase               Convert to uppercase                            │
│    --task-name         TEXT  Task name [default: Greet]                      │
│    --help                    Show this message and exit.                     │
╰──────────────────────────────────────────────────────────────────────────────╯

We can then run the task with custom parameters:

python greet.py \
  --input-dir path/to/data/ \
  --input-ext .txt \
  --output-dir path/to/results/ \
  --output-ext .txt \
  --greeting "Hi there" \
  --uppercase

Parameter Names

Parameter names with underscores (e.g., my_param) are converted to hyphenated CLI flags (e.g., --my-param). In context, use the original underscore form: context.my_param.
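The conversion is a plain underscore-to-hyphen substitution, as in this sketch (to_cli_flag is a hypothetical helper, not part of TigerFlow's API):

```python
def to_cli_flag(param_name: str) -> str:
    # Hypothetical helper: underscores become hyphens in the generated CLI flag
    return "--" + param_name.replace("_", "-")


print(to_cli_flag("my_param"))  # --my-param
```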

Reserved Parameter Names

Custom parameter names must not conflict with base CLI parameters. Reserved names include input_dir, output_dir, input_ext, output_ext, and task-specific options like max_workers, cpus, memory, time, gpus, and concurrency_limit.

Library Tasks

Library tasks are reusable, pre-packaged tasks that can be run directly or referenced in pipeline configurations. TigerFlow supports two types of library tasks:

  • Built-in tasks: Shipped with TigerFlow in tigerflow.library
  • Installed tasks: Third-party tasks installed via Python packages

Discovering Tasks

Use the tigerflow tasks CLI commands to discover available tasks:

tigerflow tasks list
Built-in tasks:
  echo - Echo task - copies input to output with optional transformations.
tigerflow tasks info echo
Task: echo
Source: built-in
Module: tigerflow.library.echo

Description:
Echo task - copies input to output with optional transformations.

Parameters for Echo:
  --prefix:
  --suffix:
  --uppercase: False

Running Library Tasks

Library tasks can be run directly as Python modules:

python -m tigerflow.library.echo \
  --input-dir ./input \
  --output-dir ./output \
  --input-ext .txt \
  --output-ext .txt \
  --prefix ">>> "

Creating Installable Tasks

To distribute your tasks as an installable Python package, register them using entry points. In your pyproject.toml:

[project.entry-points."tigerflow.tasks"]
my-task = "mypackage.tasks.mytask:MyTask"

The entry point value can be either:

  • A module path: mypackage.tasks.mytask
  • A module path with class name: mypackage.tasks.mytask:MyTask

Once installed, the task appears in tigerflow tasks list under "Installed tasks" and can be referenced by name in pipeline configurations.

Examples

Say we want to implement a workflow that involves the following tasks:

  1. Transcribe video files using an open-source model (e.g., Whisper)
  2. Embed the transcription files using an external API service (e.g., Voyage AI)
  3. Ingest the embeddings into a single-writer database (e.g., DuckDB)

We can create each task as shown below.

Follow Along

You can follow along with the examples using the code and data provided here. Videos have been substituted with audio files due to intellectual property constraints and storage limitations, but the task logic remains otherwise identical.

Transcribing Video Files (SlurmTask)

We implement the transcription step as a Slurm task because it involves compute-intensive work and we want to process files in parallel.

transcribe.py
from pathlib import Path
from typing import Annotated

import typer

from tigerflow.tasks import SlurmTask
from tigerflow.utils import SetupContext


class Transcribe(SlurmTask):
    class Params:
        model_file: Annotated[
            Path,
            typer.Option(help="Path to the Whisper model file"),
        ]

    @staticmethod
    def setup(context: SetupContext):
        import whisper

        context.model = whisper.load_model(str(context.model_file))
        print("Model loaded successfully")

    @staticmethod
    def run(context: SetupContext, input_file: Path, output_file: Path):
        result = context.model.transcribe(str(input_file))
        print(f"Transcription ran successfully for {input_file}")

        with open(output_file, "w") as f:
            f.write(result["text"])


Transcribe.cli()

As shown, the task is defined such that:

  • The model is loaded once during setup and stored in context
  • This pre-loaded model is then accessed from context to transcribe each file

Calling Transcribe.cli() turns this module into a runnable CLI application:

python transcribe.py --help
Usage: transcribe.py [OPTIONS]

Run the task as a CLI application

╭─ Options ──────────────────────────────────────────────────────────────────────────────╮
│ *  --input-dir             PATH     Input directory to read data [required]            │
│ *  --input-ext             TEXT     Input file extension [required]                    │
│ *  --output-dir            PATH     Output directory to store results [required]       │
│ *  --output-ext            TEXT     Output file extension [required]                   │
│ *  --max-workers           INTEGER  Max number of workers for autoscaling [required]   │
│ *  --cpus                  INTEGER  Number of CPUs per worker [required]               │
│ *  --memory                TEXT     Memory per worker [required]                       │
│ *  --time                  TEXT     Wall time per worker [required]                    │
│    --gpus                  INTEGER  Number of GPUs per worker                          │
│    --sbatch-option         TEXT     Additional Slurm option for workers (repeatable)   │
│    --setup-command         TEXT     Shell command to run before the task starts        │
│                                     (repeatable)                                       │
│    --model-file            PATH     Path to the Whisper model file                     │
│    --task-name             TEXT     Task name [default: Transcribe]                    │
│    --help                           Show this message and exit.                        │
╰────────────────────────────────────────────────────────────────────────────────────────╯

We can then run the task as follows:

python transcribe.py \
--input-dir path/to/data/ \
--input-ext .mp4 \
--output-dir path/to/results/ \
--output-ext .txt \
--max-workers 3 \
--cpus 1 \
--memory "12G" \
--time "02:00:00" \
--gpus 1 \
--model-file "/home/sp8538/.cache/whisper/medium.pt" \
--sbatch-option "--mail-user=sp8538@princeton.edu" \
--setup-command "module purge" \
--setup-command "module load anaconda3/2024.6" \
--setup-command "conda activate tiktok"
2025-09-16 10:53:44 | INFO     | Submitted task with Slurm job ID 690468
2025-09-16 10:53:44 | INFO     | Status changed: INACTIVE -> PENDING (Reason: (None))
2025-09-16 10:54:04 | INFO     | Status changed: PENDING (Reason: (None)) -> ACTIVE (0 workers)
2025-09-16 10:59:55 | INFO     | Status changed: ACTIVE (0 workers) -> ACTIVE (1 workers)
2025-09-16 11:00:45 | INFO     | 4 processed files
2025-09-16 11:00:55 | INFO     | Status changed: ACTIVE (1 workers) -> ACTIVE (3 workers)
2025-09-16 11:00:55 | INFO     | 1 processed files
2025-09-16 11:01:05 | INFO     | 6 processed files
...
2025-09-16 11:03:08 | INFO     | 2 processed files
2025-09-16 11:04:08 | INFO     | 1 processed files
2025-09-16 11:04:58 | INFO     | Status changed: ACTIVE (3 workers) -> ACTIVE (1 workers)
2025-09-16 11:05:58 | INFO     | Status changed: ACTIVE (1 workers) -> ACTIVE (0 workers)
^C2025-09-16 11:06:40 | WARNING  | Received signal 2, initiating shutdown
2025-09-16 11:06:40 | INFO     | Shutting down task
2025-09-16 11:06:40 | ERROR    | Status changed: ACTIVE (0 workers) -> INACTIVE (Reason: CANCELLED+)
2025-09-16 11:06:41 | INFO     | Task shutdown complete

The resources specified above, including time, apply to each individual worker. Workers can be spun up and down dynamically in response to incoming workloads, so it is recommended to allocate only the minimal necessary resources per worker. For example, setting the worker time to a smaller value like 2 hours (instead of 12 hours) can reduce scheduling delays, as longer Slurm job requests often result in longer queue times.

Embedding Text Files (LocalAsyncTask)

We implement the embedding step as a local asynchronous task because it involves I/O-bound work (i.e., making external API requests) that can be performed concurrently.

embed.py
import asyncio
from pathlib import Path
from typing import Annotated

import typer

from tigerflow.tasks import LocalAsyncTask
from tigerflow.utils import SetupContext


class Embed(LocalAsyncTask):
    class Params:
        model: Annotated[
            str,
            typer.Option(help="Embedding model name"),
        ] = "voyage-3.5"

    @staticmethod
    async def setup(context: SetupContext):
        import os

        import aiohttp

        context.url = "https://api.voyageai.com/v1/embeddings"
        context.headers = {
            "Authorization": f"Bearer {os.environ['VOYAGE_API_KEY']}",
            "Content-Type": "application/json",
        }
        context.session = aiohttp.ClientSession()
        print("Session created successfully!")

    @staticmethod
    async def run(context: SetupContext, input_file: Path, output_file: Path):
        import aiofiles

        async with aiofiles.open(input_file, "r") as f:
            text = await f.read()

        async with context.session.post(
            context.url,
            headers=context.headers,
            json={
                "input": text.strip(),
                "model": context.model,
                "input_type": "document",
            },
        ) as resp:
            resp.raise_for_status()  # Raise error if unsuccessful
            result = await resp.text()  # Raw JSON
            await asyncio.sleep(1)  # For API rate limit

        async with aiofiles.open(output_file, "w") as f:
            await f.write(result)

    @staticmethod
    async def teardown(context: SetupContext):
        await context.session.close()
        print("Session closed successfully!")


Embed.cli()

As shown, the task is defined such that it:

  • Initializes reusable resources (e.g., HTTP session) and stores them in context
  • Utilizes these resources from context to send a request to the external API for each input file
  • Cleans up resources (e.g., HTTP session) at the end to ensure a graceful shutdown

Notice that LocalAsyncTask requires all operations to adhere to Python's async/await syntax. For example, file reading and writing should be performed with aiofiles, since standard file I/O would block the event loop and prevent concurrent file processing. Similarly, LocalAsyncTask should not contain compute-intensive logic, which would likewise block the event loop and defeat its intended use. For compute-heavy tasks, use SlurmTask instead.
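The cost of blocking the event loop is easy to demonstrate. This standalone sketch compares asyncio.sleep (which yields control) with time.sleep (which does not) across two concurrent coroutines; the exact timings are illustrative:

```python
import asyncio
import time


async def nonblocking():
    await asyncio.sleep(0.2)  # suspends this coroutine, freeing the event loop


async def blocking():
    time.sleep(0.2)  # holds the event loop; nothing else can run meanwhile


async def timed(factory):
    start = time.perf_counter()
    await asyncio.gather(factory(), factory())
    return time.perf_counter() - start


async def main():
    concurrent = await timed(nonblocking)  # both sleeps overlap: ~0.2 s
    serial = await timed(blocking)         # sleeps run back to back: ~0.4 s
    return concurrent, serial


concurrent, serial = asyncio.run(main())
print(f"asyncio.sleep: {concurrent:.2f}s, time.sleep: {serial:.2f}s")
```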

API Rate Limits

API services often enforce rate limits (e.g., 2000 requests per minute). To comply with these limits, we can use asyncio.sleep() within the run logic (as shown above), in combination with the --concurrency-limit option (see below), which controls the maximum number of files processed concurrently.

For example, if each API request takes less than a second and the service allows up to 2000 requests per minute, we can:

  • Use asyncio.sleep(1) in the run logic to ensure each request takes at least one second
  • Set --concurrency-limit to 30 to ensure no more than 30 requests are processed concurrently

Together, these measures effectively cap the request rate at 1800 requests per minute, keeping it safely within the limit.
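The same interplay can be sketched with an asyncio.Semaphore, scaled down so it runs quickly (CONCURRENCY_LIMIT, MIN_SECONDS, and fake_request are illustrative stand-ins, not TigerFlow code):

```python
import asyncio
import time

CONCURRENCY_LIMIT = 5  # scaled-down stand-in for --concurrency-limit 30
MIN_SECONDS = 0.1      # scaled-down stand-in for asyncio.sleep(1)


async def fake_request(sem: asyncio.Semaphore):
    async with sem:                       # at most CONCURRENCY_LIMIT in flight
        await asyncio.sleep(MIN_SECONDS)  # each "request" takes >= MIN_SECONDS


async def run_all(n: int) -> float:
    sem = asyncio.Semaphore(CONCURRENCY_LIMIT)
    start = time.perf_counter()
    await asyncio.gather(*(fake_request(sem) for _ in range(n)))
    return time.perf_counter() - start


elapsed = asyncio.run(run_all(20))
# 20 requests / 5 concurrent = 4 waves of >= 0.1 s each, so elapsed >= 0.4 s,
# i.e., the rate is capped at 5 / 0.1 s = 50 requests per second
print(f"20 requests in {elapsed:.2f}s")
```

With the real values (limit 30, one-second sleep), the same arithmetic caps the rate at 30 requests per second, i.e., 1800 per minute.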

Calling Embed.cli() turns this module into a runnable CLI application:

python embed.py --help
Usage: embed.py [OPTIONS]

Run the task as a CLI application

╭─ Options ──────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ *  --input-dir                PATH     Input directory to read data [required]                                 │
│ *  --input-ext                TEXT     Input file extension [required]                                         │
│ *  --output-dir               PATH     Output directory to store results [required]                            │
│ *  --output-ext               TEXT     Output file extension [required]                                        │
│ *  --concurrency-limit        INTEGER  Maximum number of coroutines that may run concurrently at any given     │
│                                        time (excess coroutines are queued until capacity becomes available)    │
│                                        [required]                                                              │
│    --model                    TEXT     Embedding model name [default: voyage-3.5]                              │
│    --task-name                TEXT     Task name [default: Embed]                                              │
│    --help                              Show this message and exit.                                             │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

We can then run the task as follows:

python embed.py \
--input-dir path/to/data/ \
--input-ext .txt \
--output-dir path/to/results/ \
--output-ext .json \
--concurrency-limit 30 \
--model "voyage-4-lite"
2025-09-19 10:28:32 | INFO     | Setting up task
Session created successfully!
2025-09-19 10:28:32 | INFO     | Task setup complete
2025-09-19 10:28:32 | INFO     | Starting processing: 7501870786337115434.txt
2025-09-19 10:28:32 | INFO     | Starting processing: 7501870786941127967.txt
2025-09-19 10:28:32 | INFO     | Starting processing: 7501870783862541576.txt
...
2025-09-19 10:28:34 | INFO     | Starting processing: 7501869901028592927.txt
2025-09-19 10:28:34 | INFO     | Successfully processed: 7501871089715268906.txt
2025-09-19 10:28:34 | INFO     | Starting processing: 7501870443775692078.txt
2025-09-19 10:28:34 | INFO     | Successfully processed: 7501863546456771870.txt
2025-09-19 10:28:34 | INFO     | Starting processing: 7501869899782901022.txt
2025-09-19 10:28:34 | INFO     | Successfully processed: 7501870775461317906.txt
...
2025-09-19 10:28:36 | INFO     | Successfully processed: 7501870542656474398.txt
2025-09-19 10:28:36 | INFO     | Successfully processed: 7501870861306318126.txt
2025-09-19 10:28:36 | INFO     | Successfully processed: 7501870700089707807.txt
^C2025-09-19 10:28:40 | WARNING  | Received signal 2, initiating shutdown
2025-09-19 10:28:40 | INFO     | Shutting down task
Session closed successfully!
2025-09-19 10:28:40 | INFO     | Task shutdown complete

Ingesting Text Embeddings (LocalTask)

We implement the ingestion step as a local synchronous task because our target database (DuckDB) only supports writes from a single process.

ingest.py
import json
from pathlib import Path
from typing import Annotated

import typer

from tigerflow.tasks import LocalTask
from tigerflow.utils import SetupContext


class Ingest(LocalTask):
    class Params:
        db_path: Annotated[
            Path,
            typer.Option(help="Path to the DuckDB database file"),
        ]

    @staticmethod
    def setup(context: SetupContext):
        import duckdb

        conn = duckdb.connect(str(context.db_path))  # Creates the file if it does not exist
        print(f"Successfully connected to {context.db_path}")

        conn.execute("""
            CREATE TABLE IF NOT EXISTS embeddings (
                id UBIGINT,
                embedding FLOAT[1024],
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        """)

        context.conn = conn

    @staticmethod
    def run(context: SetupContext, input_file: Path, output_file: Path):
        with open(input_file, "r") as f:
            content = json.load(f)

        embedding = content["data"][0]["embedding"]

        context.conn.execute(
            "INSERT INTO embeddings (id, embedding) VALUES (?, ?)",
            (input_file.stem, embedding),
        )

    @staticmethod
    def teardown(context: SetupContext):
        context.conn.close()
        print("DB connection closed")


Ingest.cli()

As shown, the task is defined such that:

  • A database connection is created once during setup and stored in context
  • This database connection is then accessed from context to ingest each file
  • The database connection is closed at the end to ensure a graceful shutdown

Calling Ingest.cli() turns this module into a runnable CLI application:

python ingest.py --help
Usage: ingest.py [OPTIONS]

Run the task as a CLI application

╭─ Options ────────────────────────────────────────────────────────────────────╮
│ *  --input-dir         PATH  Input directory to read data [required]         │
│ *  --input-ext         TEXT  Input file extension [required]                 │
│ *  --output-dir        PATH  Output directory to store results [required]    │
│ *  --output-ext        TEXT  Output file extension [required]                │
│    --db-path           PATH  Path to the DuckDB database file                │
│    --task-name         TEXT  Task name [default: Ingest]                     │
│    --help                    Show this message and exit.                     │
╰──────────────────────────────────────────────────────────────────────────────╯

We can then run the task as follows:

python ingest.py \
--input-dir path/to/data/ \
--input-ext .json \
--output-dir path/to/results/ \
--output-ext .out \
--db-path "/home/sp8538/tiktok/pipeline/tigerflow/demo/results/test.db"
2025-09-19 13:02:16 | INFO     | Setting up task
Successfully connected to /home/sp8538/tiktok/pipeline/tigerflow/demo/results/test.db
2025-09-19 13:02:16 | INFO     | Task setup complete
2025-09-19 13:02:16 | INFO     | Starting processing: 7501869531975929119.json
2025-09-19 13:02:17 | INFO     | Successfully processed: 7501869531975929119.json
2025-09-19 13:02:17 | INFO     | Starting processing: 7501869470705782062.json
2025-09-19 13:02:17 | INFO     | Successfully processed: 7501869470705782062.json
2025-09-19 13:02:17 | INFO     | Starting processing: 7501871439457373470.json
...
2025-09-19 13:02:18 | INFO     | Successfully processed: 7501870861306318126.json
2025-09-19 13:02:18 | INFO     | Starting processing: 7501870700089707807.json
2025-09-19 13:02:18 | INFO     | Successfully processed: 7501870700089707807.json
^C2025-09-19 13:02:22 | WARNING  | Received signal 2, initiating shutdown
2025-09-19 13:02:22 | INFO     | Shutting down task
DB connection closed
2025-09-19 13:02:22 | INFO     | Task shutdown complete

Output Files

Note that we specify --output-dir and --output-ext even though the task’s run logic never writes to output_file. These options are still required because TigerFlow creates an empty "placeholder" output file whenever no content is written. The placeholder signals that the file was processed successfully by the user-provided run logic, even though no concrete output was produced.
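Conceptually, the placeholder amounts to an empty marker file, as in this illustrative snippet (not TigerFlow internals):

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as d:
    output_file = Path(d) / "result.out"
    # run() wrote no content; an empty marker still records success
    output_file.touch()
    exists, size = output_file.exists(), output_file.stat().st_size

print(exists, size)  # True 0
```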