Pipelines¶
Note
This guide assumes you are familiar with TigerFlow tasks. Please review how to create and use tasks in TigerFlow before proceeding.
In TigerFlow, tasks are organized into a pipeline by creating a configuration file that describes the tasks to be run, the resources required by each task, and the dependencies between tasks. Tasks communicate through the file system: a parent task writes its outputs to a designated directory, which downstream tasks monitor for new inputs.¹ Since each task performs embarrassingly parallel, one-to-one file processing (i.e., each input is transformed into a single output independently of all other inputs), multiple tasks may share the same parent, but each task can have at most one parent.
Dependency Graph
TigerFlow supports pipelines where the graph of task input/output files forms an arborescence. That is, there is a single root file, and every other file depends on exactly one parent. This means that the pipeline can contain multiple root tasks (i.e., tasks that depend on no other tasks), but they should share the same input file.
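To make the arborescence constraint concrete, here is a small, hypothetical helper (not part of TigerFlow) that checks a list of task definitions before a run. Because each task has a single depends_on field, "at most one parent" holds by construction; the check only needs to verify that at least one root exists, that every parent is a known task, and that no dependency chain loops back on itself:

```python
# Hypothetical validation sketch (not part of TigerFlow): verify that task
# dependencies form an arborescence, i.e., a tree rooted at task(s) with no
# depends_on, with no cycles and no references to unknown tasks.

def validate_pipeline(tasks):
    by_name = {t["name"]: t for t in tasks}
    roots = [t["name"] for t in tasks if t.get("depends_on") is None]
    if not roots:
        raise ValueError("pipeline has no root task")
    for t in tasks:
        seen, cur = set(), t["name"]
        # walk up the dependency chain until we reach a root
        while cur is not None:
            if cur in seen:
                raise ValueError(f"cycle detected at {cur!r}")
            if cur not in by_name:
                raise ValueError(f"unknown task {cur!r} in depends_on chain")
            seen.add(cur)
            cur = by_name[cur].get("depends_on")
    return roots
```

Running this against the three-task example below would return a single root, transcribe, confirming the configuration forms a valid tree.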
Let's build on the example from the previous section, where we created a sequence of tasks to:
- Transcribe videos using an open-source model (Whisper)
- Embed the transcriptions using an external API service (Voyage AI)
- Ingest the embeddings into a single-writer database (DuckDB)
Follow Along
You can follow along with the example using the code and data provided here. Videos have been substituted with audio files due to intellectual property constraints and storage limitations, but the pipeline remains otherwise identical.
Defining a Pipeline¶
A pipeline is configured using a YAML file. For example, the tasks above can be structured into a pipeline as follows:
tasks:
  - name: transcribe
    kind: slurm
    module: ./transcribe.py
    input_ext: .mp4
    output_ext: .txt
    max_workers: 3
    worker_resources:
      cpus: 1
      gpus: 1
      memory: 8G
      time: 02:00:00
    sbatch_options:
      - "--mail-user=sp8538@princeton.edu"
    setup_commands:
      - module purge
      - module load anaconda3/2024.6
      - conda activate tiktok
  - name: embed
    depends_on: transcribe
    kind: local_async
    module: ./embed.py
    input_ext: .txt
    output_ext: .json
    keep_output: false
    concurrency_limit: 10
    setup_commands:
      - module purge
      - module load anaconda3/2024.6
      - conda activate tiktok
  - name: ingest
    depends_on: embed
    kind: local
    module: ./ingest.py
    input_ext: .json
    keep_output: false
    setup_commands:
      - module purge
      - module load anaconda3/2024.6
      - conda activate tiktok
where:
- kind specifies the task type (one of: local, local_async, or slurm).
- module specifies the Python script defining the task logic. Care should be taken when using a relative file path, as it may resolve incorrectly when running the pipeline.
- depends_on specifies the name of the parent task whose output is used as input for the current task.
- keep_output specifies whether to retain output files from the current task. If unspecified, it defaults to true.
- setup_commands specifies a list of Bash commands to run before starting the task. This can be used to activate a virtual environment required for the task logic.
- max_workers is a field applicable only to Slurm tasks. It specifies the maximum number of parallel workers used for auto-scaling.
- worker_resources is a section applicable only to Slurm tasks. It specifies the compute, memory, and other resources to allocate for each worker.
- concurrency_limit is a field applicable only to local asynchronous tasks. It specifies the maximum number of coroutines (e.g., API requests) that may run concurrently at any given time; excess coroutines are queued until capacity becomes available.
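How a module such as embed.py is written is covered in the tasks guide. Purely as a hypothetical sketch of the shape involved (a run coroutine receiving an input path and an output path, consistent with the traceback shown later in this guide), a local_async task module might look like:

```python
# embed.py -- hypothetical sketch of a local_async task module.
# The real interface is defined in the TigerFlow tasks guide; this only
# illustrates the one-to-one contract: read one .txt in, write one .json out.
import json

async def run(context, input_file, output_file):
    text = input_file.read_text()
    # Placeholder for a real embedding API call (e.g., to Voyage AI):
    # here we just emit a one-element vector derived from the text length.
    vector = [float(len(text))]
    output_file.write_text(json.dumps({"embedding": vector}))
```

The key point is the one-to-one contract: TigerFlow hands the task a single input file and expects a single output file in return, which is what lets it parallelize and track progress per file.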
Running a Pipeline¶
Assuming the configuration file and task scripts are in the current directory, we can run the pipeline as follows:
tigerflow run config.yaml path/to/data/ path/to/results/
2025-09-22 09:20:10 | INFO | Starting pipeline execution
2025-09-22 09:20:10 | INFO | [transcribe] Starting as a SLURM task
2025-09-22 09:20:10 | INFO | [transcribe] Submitted with Slurm job ID 847632
2025-09-22 09:20:10 | INFO | [embed] Starting as a LOCAL_ASYNC task
2025-09-22 09:20:10 | INFO | [embed] Started with PID 3007442
2025-09-22 09:20:10 | INFO | [ingest] Starting as a LOCAL task
2025-09-22 09:20:10 | INFO | [ingest] Started with PID 3007443
2025-09-22 09:20:10 | INFO | All tasks started, beginning pipeline tracking loop
2025-09-22 09:20:10 | INFO | [transcribe] Status changed: INACTIVE -> PENDING (Reason: (None))
2025-09-22 09:20:10 | INFO | [embed] Status changed: INACTIVE -> ACTIVE
2025-09-22 09:20:10 | INFO | [ingest] Status changed: INACTIVE -> ACTIVE
2025-09-22 09:20:11 | INFO | Staged 91 new file(s) for processing
2025-09-22 09:20:31 | INFO | [transcribe] Status changed: PENDING (Reason: (None)) -> ACTIVE (0 workers)
2025-09-22 09:21:01 | INFO | [transcribe] Status changed: ACTIVE (0 workers) -> ACTIVE (3 workers)
2025-09-22 09:21:54 | ERROR | [embed] 4 failed file(s)
2025-09-22 09:21:55 | INFO | Completed processing 25 file(s)
2025-09-22 09:22:05 | ERROR | [embed] 1 failed file(s)
2025-09-22 09:22:05 | INFO | Completed processing 7 file(s)
2025-09-22 09:22:15 | ERROR | [embed] 1 failed file(s)
2025-09-22 09:22:15 | INFO | Completed processing 13 file(s)
2025-09-22 09:22:25 | INFO | Completed processing 11 file(s)
2025-09-22 09:22:35 | INFO | Completed processing 3 file(s)
2025-09-22 09:22:45 | INFO | Completed processing 5 file(s)
2025-09-22 09:22:55 | ERROR | [embed] 1 failed file(s)
2025-09-22 09:22:55 | INFO | Completed processing 8 file(s)
2025-09-22 09:23:05 | ERROR | [embed] 1 failed file(s)
2025-09-22 09:23:05 | INFO | Completed processing 4 file(s)
2025-09-22 09:23:15 | INFO | Completed processing 6 file(s)
2025-09-22 09:23:55 | INFO | Completed processing 1 file(s)
2025-09-22 09:23:55 | INFO | No more files to process, starting idle time count
2025-09-22 09:25:06 | INFO | [transcribe] Status changed: ACTIVE (3 workers) -> ACTIVE (1 workers)
2025-09-22 09:25:46 | INFO | [transcribe] Status changed: ACTIVE (1 workers) -> ACTIVE (0 workers)
2025-09-22 09:33:48 | WARNING | Idle timeout reached, initiating shutdown
2025-09-22 09:33:48 | INFO | Shutting down pipeline
2025-09-22 09:33:48 | INFO | [embed] Terminating...
2025-09-22 09:33:48 | INFO | [ingest] Terminating...
2025-09-22 09:33:48 | INFO | [transcribe] Terminating...
2025-09-22 09:33:49 | ERROR | [transcribe] Status changed: ACTIVE (0 workers) -> INACTIVE (Reason: CANCELLED+)
2025-09-22 09:33:50 | ERROR | [embed] Status changed: ACTIVE -> INACTIVE (Exit Code: 143)
2025-09-22 09:33:50 | ERROR | [ingest] Status changed: ACTIVE -> INACTIVE (Exit Code: 143)
2025-09-22 09:33:51 | INFO | Pipeline shutdown complete
Tip
Run each task individually (see examples) to ensure they work correctly before executing the entire pipeline.
The console output shows that the pipeline:
- Runs like a server, "listening" for and staging new files for processing
- Acts as a central orchestrator that launches, monitors, and manages the lifecycle of tasks
- Optimizes resource usage through autoscaling and idle timeout
By default, pipelines time out after 10 minutes of inactivity (i.e., when there are no files left to process). We can override this behavior using the --idle-timeout option, which takes a value in minutes, like so:
# Time out after 30 days of inactivity
tigerflow run config.yaml path/to/data/ path/to/results/ --idle-timeout 43200
Before the timeout threshold is reached, the pipeline will remain active with a minimal resource footprint, ready to stage and process any new files placed in the input directory. This behavior is useful for streaming-like workflows where data may arrive sporadically.
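For example, a separate process can keep delivering files into the input directory while the pipeline idles below its timeout. A minimal, hypothetical feeder sketch (the directory names and the feed helper are assumptions, not part of TigerFlow):

```python
# Hypothetical feeder: moves newly arrived videos into the pipeline's input
# directory, where the running pipeline will stage them on its next scan.
import shutil
from pathlib import Path

def feed(incoming_dir, data_dir):
    moved = []
    for video in sorted(Path(incoming_dir).glob("*.mp4")):
        shutil.move(str(video), str(Path(data_dir) / video.name))
        moved.append(video.name)
    return moved
```

Because staging is driven by the file system, any mechanism that places files in the input directory (a cron job, an rsync transfer, another program) works the same way.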
Info
To see all available options for the run subcommand, run tigerflow run --help.
Since the pipeline has been configured to retain output files only for the transcription task,
the output directory (i.e., path/to/results/) will look as follows:
path/to/results/
├── .tigerflow/
└── transcribe/
├── 1.txt
├── 2.txt
└── ...
where .tigerflow/ is an internal directory storing the pipeline's operational state and related metadata.
Warning
.tigerflow/ is what enables resuming a previous pipeline run, so it should not be deleted or modified.
Checking Progress¶
We can check the pipeline's progress at any point by running:
tigerflow report progress path/to/results/
┏━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━┳━━━━━━━━┓
┃ Task ┃ Processed ┃ Ongoing ┃ Failed ┃
┡━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━╇━━━━━━━━┩
│ transcribe │ 91 │ 0 │ 0 │
│ embed │ 83 │ 0 │ 8 │
│ ingest │ 83 │ 0 │ 0 │
└────────────┴───────────┴─────────┴────────┘
██████████████████████████████ 91/91 (100.0%)
where path/to/results/ must be a valid output directory containing .tigerflow/.
Checking Errors¶
If the progress reports any failed files, we can identify them by running:
tigerflow report errors path/to/results/
[embed] 8 failed files (open to view errors):
results/.tigerflow/embed/7501863358941940997.err
results/.tigerflow/embed/7501867598829702430.err
results/.tigerflow/embed/7501869468910423326.err
results/.tigerflow/embed/7501869707121757470.err
results/.tigerflow/embed/7501870655906860306.err
results/.tigerflow/embed/7501870694288985390.err
results/.tigerflow/embed/7501870878855154987.err
results/.tigerflow/embed/7501870943883545899.err
Each error file contains specific error messages that help identify and resolve issues in the code or data.
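When there are many failures, reading each .err file by hand gets tedious. A small, hypothetical helper (not part of the TigerFlow CLI) can group the error files by the final line of each traceback to surface recurring failure modes:

```python
# Hypothetical helper: groups a task's .err files by the last line of each
# traceback, so recurring failure modes stand out at a glance.
from collections import Counter
from pathlib import Path

def summarize_errors(results_dir, task_name):
    err_dir = Path(results_dir) / ".tigerflow" / task_name
    counts = Counter()
    for err_file in sorted(err_dir.glob("*.err")):
        # the last line of a Python traceback names the exception and message
        last_line = err_file.read_text().strip().splitlines()[-1]
        counts[last_line] += 1
    return counts
```

In a case like the one below, this would report a single error message occurring eight times, immediately suggesting a systematic cause rather than eight unrelated failures.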
Example
In this case, all error files contain the same message:
Traceback (most recent call last):
File "/home/sp8538/.conda/envs/tiktok/lib/python3.12/site-packages/tigerflow/tasks/local_async.py", line 47, in task
await self.run(self._context, input_file, temp_file)
File "/home/sp8538/tiktok/pipeline/tigerflow/demo/code/embed.py", line 35, in run
resp.raise_for_status() # Raise error if unsuccessful
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/sp8538/.conda/envs/tiktok/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 629, in raise_for_status
raise ClientResponseError(
aiohttp.client_exceptions.ClientResponseError: 400, message='Bad Request', url='https://api.voyageai.com/v1/embeddings'
which suggests an issue with the embedding API request. However, since the same request was successful for other files, the issue likely lies in the input data (i.e., transcription).
Upon inspection, we find the failed files have empty transcriptions, which explains the API request failure. Furthermore, we can confirm that the corresponding videos contain no audio, which led to the empty transcriptions in the first place.
We may then exclude such videos from the pipeline to prevent future errors.
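One way to do the exclusion, sketched here with hypothetical directory names and a hypothetical helper (the .mp4 extension and transcribe output layout follow the example above), is to quarantine any input whose transcription came back empty so it is not re-staged on the next run:

```python
# Hypothetical cleanup step: move videos whose transcriptions are empty out
# of the input directory into a quarantine directory.
from pathlib import Path

def exclude_silent_videos(results_dir, data_dir, quarantine_dir):
    quarantine = Path(quarantine_dir)
    quarantine.mkdir(parents=True, exist_ok=True)
    excluded = []
    for txt in sorted((Path(results_dir) / "transcribe").glob("*.txt")):
        if txt.read_text().strip():
            continue  # non-empty transcription: keep the corresponding video
        video = Path(data_dir) / (txt.stem + ".mp4")
        if video.exists():
            video.rename(quarantine / video.name)
            excluded.append(video.name)
    return excluded
```

Quarantining rather than deleting keeps the problematic inputs available for later inspection while preventing them from failing the embed task again.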
1. TigerFlow uses the dependency information specified in the pipeline configuration to automatically organize input and output directories between tasks, so users do not need to handle this file organization manually. ↩