Data Infrastructure · Deterministic Pipelines

Media Ingestion, Normalization, and Exploration Platform

Deterministic pipelines for acquiring and normalizing messy real-world media into consistent, searchable artifacts suitable for exploration and dataset preparation.

The project treats messy real-world media as a systems engineering problem: inputs are inconsistent, failures are inevitable, and correctness emerges from deterministic stages, structured metadata, and resumable execution. These same principles underpin modern AI dataset pipelines.
Node.js · TypeScript · Puppeteer · FFmpeg · ImageMagick

TL;DR

This system converts inconsistent real-world media into deterministic, normalized artifacts suitable for indexing, search, and dataset preparation. The pipeline emphasizes resumability, metadata-driven decision logic, and bounded execution so large batch runs can recover safely from interruptions or partial failures.

  • Automated ingestion from publicly accessible sources using browser-driven extraction primitives.
  • Idempotent processing stages allow batch runs to resume safely after interruption.
  • Bounded concurrency protects compute resources during heavy transformations.
  • Emits normalized artifacts plus structured metadata suitable for indexing and dataset generation.

Polyglot note: the system combines TypeScript orchestration with a small C# component that parses FFprobe metadata in a strongly typed way.

Execution Overview

The core challenge was not downloading files — it was ingesting chaotic inputs and producing repeatable outputs under partial failure, restarts, and compute-heavy processing stages.

Public sources (rate-limited) + inconsistent formats
        │
        ▼
Acquisition (browser-driven primitives, workflows)
        │
        ▼
Checkpointed batch execution (resume-safe)
        │
        ▼
Metadata extraction (FFprobe) + decision logic
        │
        ▼
Normalization stages
  - re-encode
  - resize
  - audio validation
  - optional AI upscaling
        │
        ▼
Structured catalog + normalized artifacts
        │
        ▼
Search / filtering / dataset export

Responsible acquisition: crawl pipelines include configurable per-request delays and respect source rate limits.

Origin

As video resolution increased from 720p → 1080p → 4K, storage pressure escalated quickly. Re-encoding often produced large reductions in file size, but organization remained difficult: duplicate encodes, mismatched resolutions, and silent failures were common.

The failure that triggered a redesign occurred during a long flight: several downloaded episodes contained no audio track. The root cause was not one bad file; it was a missing validation stage in the pipeline.

The system was redesigned around deterministic pipeline stages that verify inputs, normalize outputs, and record structured metadata for downstream processing.

Acquisition Engine

Acquisition is modeled as reusable browser primitives composed into resilient crawl workflows.

Browser automation interface

export interface Page {
  navigate(url: string): Promise<void>;
  exists(selector: string): Promise<boolean>;
  select(selector: string): Promise<string[]>;
  selectAttribute(selector: string, attribute: string): Promise<string[]>;
  scrollToBottom(): Promise<void>;
  download(url: string, destination: string): Promise<void>;
}
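These primitives are backend-agnostic. As a rough sketch of how they might be bound to Puppeteer (the adapter below, including the networkidle2 wait and the buffer-based download, is illustrative rather than the project's actual implementation):

Puppeteer-backed adapter (sketch)

import { writeFile } from "node:fs/promises";
import type { Page as PuppeteerPage } from "puppeteer";

export class PuppeteerAdapter implements Page {
  constructor(private readonly page: PuppeteerPage) {}

  async navigate(url: string): Promise<void> {
    await this.page.goto(url, { waitUntil: "networkidle2" });
  }

  async exists(selector: string): Promise<boolean> {
    return (await this.page.$(selector)) !== null;
  }

  async select(selector: string): Promise<string[]> {
    return this.page.$$eval(selector, els =>
      els.map(el => el.textContent ?? ""));
  }

  async selectAttribute(selector: string, attribute: string): Promise<string[]> {
    return this.page.$$eval(
      selector,
      (els, attr) => els
        .map(el => el.getAttribute(attr))
        .filter((v): v is string => v !== null),
      attribute
    );
  }

  async scrollToBottom(): Promise<void> {
    await this.page.evaluate(() =>
      window.scrollTo(0, document.body.scrollHeight));
  }

  async download(url: string, destination: string): Promise<void> {
    // Simplest possible download; a real pipeline would stream
    // large files instead of buffering them in memory.
    const res = await fetch(url);
    await writeFile(destination, Buffer.from(await res.arrayBuffer()));
  }
}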
Example crawl workflow

export async function crawl(page: Page, startUrl: string): Promise<string[]> {

  await page.navigate(startUrl);

  const results: string[] = [];

  while (true) {

    const links = await page.selectAttribute("a.media-link", "href");
    results.push(...links);

    const hasNext = await page.exists("a.next-page");
    if (!hasNext) break;

    // Guard against a pagination link that exists but carries no href.
    const next = (await page.selectAttribute("a.next-page", "href"))[0];
    if (!next) break;

    await page.navigate(next);
  }

  return results;
}
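
The per-request delays mentioned under responsible acquisition slot naturally between navigations in this loop; a minimal sketch (politeDelay and the config field name are illustrative):

Per-request delay (sketch)

const politeDelay = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Inside the crawl loop, before following the next page:
//   await politeDelay(config.perRequestDelayMs);
//   await page.navigate(next);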

Declarative Workflow Execution

Workflows are defined declaratively and executed by a runtime step engine so pipelines can be composed, debugged, and resumed without entangling control flow with browser automation logic.

Workflow definition

{
  "name": "acquire-gallery",
  "steps": [
    { "type": "navigate", "input": "https://example.com/gallery" },
    { "type": "scroll" },
    { "type": "select-attribute", "input": { "selector": "img", "attribute": "src" }},
    { "type": "download-many" }
  ]
}
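
A runtime for this format can be a small interpreter over a discriminated union of step types. The sketch below is a simplified illustration rather than the actual engine; in particular, urlToDestination is a hypothetical helper that maps a URL to a local file path:

Step engine (sketch)

export type Step =
  | { type: "navigate"; input: string }
  | { type: "scroll" }
  | { type: "select-attribute"; input: { selector: string; attribute: string } }
  | { type: "download-many" };

export async function runWorkflow(page: Page, steps: Step[]): Promise<void> {
  // Selection results flow forward into steps that consume them.
  let selected: string[] = [];

  for (const step of steps) {
    switch (step.type) {
      case "navigate":
        await page.navigate(step.input);
        break;
      case "scroll":
        await page.scrollToBottom();
        break;
      case "select-attribute":
        selected = await page.selectAttribute(
          step.input.selector, step.input.attribute);
        break;
      case "download-many":
        for (const url of selected) {
          await page.download(url, urlToDestination(url));
        }
        break;
    }
  }
}

Because each step is data, a failed run can be replayed from its definition plus persisted state, which is what the next section covers.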

Resumable Crawl State

Long-running ingestion pipelines persist crawl state so runs can resume safely.

Crawl state checkpoint

export interface CrawlState {
  artists: string[];
  isFinished: boolean;
  pageIndex: number;
  rootDirectory: string;
  tags: string[];
  uploaders: string[];
}

const state = io.readJson<CrawlState>(stateJsonFilePath);

io.writeJson(stateJsonFilePath, {
  artists,
  isFinished,
  pageIndex,
  rootDirectory,
  tags,
  uploaders
});
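
One detail worth getting right: a crash during the write must not corrupt the checkpoint itself. A common pattern (assumed here, not shown in the original helpers) is write-then-rename, since a rename within a filesystem is effectively atomic on most platforms:

Atomic checkpoint write (sketch)

import { readFile, rename, writeFile } from "node:fs/promises";

export async function writeCheckpoint(path: string, state: CrawlState): Promise<void> {
  // Write the full document to a temp file, then atomically move it
  // into place so readers never observe a truncated checkpoint.
  const tmp = path + ".tmp";
  await writeFile(tmp, JSON.stringify(state, null, 2), "utf8");
  await rename(tmp, path);
}

export async function readCheckpoint(path: string): Promise<CrawlState | undefined> {
  try {
    return JSON.parse(await readFile(path, "utf8")) as CrawlState;
  } catch {
    return undefined; // First run or missing checkpoint: start fresh.
  }
}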

Metadata Extraction and Decision Logic

FFprobe metadata drives validation and normalization decisions.
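
The parsers below consume FFprobe's XML output. A minimal sketch of producing that XML from Node.js, assuming ffprobe is on the PATH:

Invoking FFprobe (sketch)

import { execFile } from "node:child_process";
import { promisify } from "node:util";

const execFileAsync = promisify(execFile);

export async function probeXml(filePath: string): Promise<string> {
  // -show_format and -show_streams emit the format/stream attributes
  // that the MediaInfo / StreamInfo parsers below navigate via XPath.
  const { stdout } = await execFileAsync("ffprobe", [
    "-v", "quiet",
    "-print_format", "xml",
    "-show_format",
    "-show_streams",
    filePath,
  ]);
  return stdout;
}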

C# FFprobe parser

internal class MediaInfo
{
    private StreamInfo[] _streams;

    public MediaInfo(XPathDocument ffProbeResult)
    {
        Result = ffProbeResult;
    }

    public string FormatName =>
        GetAttrValue("/ffprobe/format/@format_name");

    public TimeSpan Duration
    {
        get
        {
            // FFprobe reports duration as seconds with a fractional part
            // (e.g. "142.606000"), so parse it as a double rather than
            // as a TimeSpan string.
            string attrValue = GetAttrValue("/ffprobe/format/@duration");

            return !string.IsNullOrEmpty(attrValue) &&
                   double.TryParse(attrValue, NumberStyles.Float,
                       CultureInfo.InvariantCulture, out double seconds)
                ? TimeSpan.FromSeconds(seconds)
                : TimeSpan.Zero;
        }
    }

    public StreamInfo[] Streams => _streams ??= GetStreams();

    public XPathDocument Result { get; }

    public string GetAttrValue(string xpath)
    {
        return Result.CreateNavigator().SelectSingleNode(xpath)?.Value;
    }

    private StreamInfo[] GetStreams()
    {
        // One StreamInfo per <stream index="..."> element in the FFprobe XML.
        XPathNodeIterator nodes =
            Result.CreateNavigator().Select("/ffprobe/streams/stream/@index");

        var streams = new List<StreamInfo>();

        while (nodes.MoveNext())
            streams.Add(new StreamInfo(this, nodes.Current.Value));

        return streams.ToArray();
    }
}
Stream metadata extraction

internal class StreamInfo
{
    private readonly MediaInfo _info;

    internal StreamInfo(MediaInfo info, string index)
    {
        _info = info;
        Index = index;
    }

    public string Index { get; }

    public string CodecType =>
        _info.GetAttrValue(XPathPrefix + "/@codec_type");

    public int Width =>
        ParseInt(_info.GetAttrValue(XPathPrefix + "/@width"));

    public int Height =>
        ParseInt(_info.GetAttrValue(XPathPrefix + "/@height"));

    public float FrameRate
    {
        get
        {
            string attrValue =
                _info.GetAttrValue(XPathPrefix + "/@r_frame_rate");

            if (string.IsNullOrEmpty(attrValue))
                return -1f;

            // FFprobe reports frame rate as a rational, e.g. "30000/1001".
            string[] parts = attrValue.Split('/');

            if (parts.Length != 2)
                return -1f;

            int num1 = ParseInt(parts[0]);
            int num2 = ParseInt(parts[1]);

            return (num1 > 0 && num2 > 0)
                ? (float)num1 / num2
                : -1f;
        }
    }

    private string XPathPrefix =>
        "/ffprobe/streams/stream[@index=\"" + Index + "\"]";

    private int ParseInt(string s)
    {
        int result;

        return !string.IsNullOrEmpty(s) &&
               int.TryParse(s, out result)
            ? result
            : -1;
    }
}
Metadata-driven normalization

const { height, width } = imageSize(filePath);

if (width <= maxWidth && height <= maxHeight) return;

if (height > maxHeight && width <= maxWidth) {
  scaleByHeight();
} else if (width > maxWidth && height <= maxHeight) {
  scaleByWidth();
} else {
  // Both dimensions exceed the bounds: scale by whichever side
  // overflows more, so the result fits within both limits.
  if (width / maxWidth >= height / maxHeight) {
    scaleByWidth();
  } else {
    scaleByHeight();
  }
}
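
The same metadata gates validation. The no-audio failure from the origin story becomes a first-class check; a hedged sketch, assuming stream records shaped like the StreamInfo data above:

Audio validation (sketch)

interface ProbedStream {
  codecType: string; // "video" | "audio" | "subtitle" | ...
}

export function assertHasAudio(file: string, streams: ProbedStream[]): void {
  // Fail loudly at ingest time rather than silently shipping
  // an artifact with no audio track.
  if (!streams.some(s => s.codecType === "audio")) {
    throw new Error(`Validation failed: ${file} has no audio stream`);
  }
}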

Idempotent Normalization Pipeline

All transformation stages are designed to be safely re-run. Outputs act as deterministic checkpoints: if the expected artifact already exists, the stage can be skipped without recomputing work. This makes large batch runs resumable even after partial failure.

Idempotent processing stage

async Task ProcessVideoAsync(string input, string output)
{
    // If the normalized artifact already exists,
    // the stage can be skipped safely.
    if (File.Exists(output))
        return;

    var settings = new OutputSettings
    {
        VideoCodec = "libx264",
        VideoFrameRate = 30
    };

    settings.SetVideoFrameSize(1920, 1080);

    await RunFfmpegAsync(input, output, settings);
}
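
One subtlety: a crash mid-encode could leave a truncated file that the existence check would then treat as complete. A write-to-temp-then-publish wrapper (an assumed pattern, not shown in the original stage) keeps the checkpoint semantics honest:

Publishing complete artifacts only (sketch)

import { rename } from "node:fs/promises";

export async function runStage(
  output: string,
  produce: (tmpPath: string) => Promise<void>
): Promise<void> {
  if (await fileExists(output)) return; // artifact exists: stage already done

  // Produce into a temp path, then rename: the final path only ever
  // holds complete artifacts, so the skip check above stays sound.
  const tmp = output + ".partial";
  await produce(tmp);
  await rename(tmp, output);
}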

AI Upscaling and GPU-Bound Orchestration

GPU-bound stages are executed with bounded concurrency and idempotent outputs.

Concurrency control

export function createSemaphore(maxConcurrent: number) {

  let inFlight = 0;
  const queue: Array<() => void> = [];

  const acquire = () => new Promise<void>(resolve => {
    const tryEnter = () => {
      if (inFlight < maxConcurrent) {
        inFlight++;
        resolve();
        return;
      }
      queue.push(tryEnter);
    };

    tryEnter();
  });

  const release = () => {
    inFlight--;
    const next = queue.shift();
    if (next) next();
  };

  return { acquire, release };
}
Bounded stage execution

const gpu = createSemaphore(1);

async function runUpscaleStage(input: string, output: string) {

  if (await fileExists(output)) return;

  await gpu.acquire();

  try {
    await execTool(`realesrgan -i "${input}" -o "${output}"`);
  }
  finally {
    gpu.release();
  }
}
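
Fanning a whole batch out is then safe: Promise.all can launch every stage eagerly while the semaphore serializes the GPU-bound section (listPendingInputs and outputPathFor are hypothetical helpers):

Batch execution (sketch)

const inputs = await listPendingInputs();

// All stages start immediately; createSemaphore(1) ensures at most
// one upscaler process touches the GPU at a time, and already-completed
// outputs are skipped by the idempotence check.
await Promise.all(
  inputs.map(input => runUpscaleStage(input, outputPathFor(input)))
);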

Pipeline Output Example

Normalized media record

{
  "id": "vid_48321",
  "format": "mp4",
  "codec": "h264",
  "width": 1920,
  "height": 1080,
  "fps": 30,
  "duration": 142.6,
  "hasAudio": true,
  "normalized": true,
  "source": "public-media"
}
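
Records in this shape make downstream filtering and export straightforward. A hedged sketch of a dataset-selection predicate, with MediaRecord mirroring the JSON above:

Dataset filtering (sketch)

interface MediaRecord {
  id: string;
  format: string;
  codec: string;
  width: number;
  height: number;
  fps: number;
  duration: number;
  hasAudio: boolean;
  normalized: boolean;
  source: string;
}

export function selectForDataset(records: MediaRecord[]): MediaRecord[] {
  // Example criteria: fully normalized 1080p clips with audio
  // and at least five seconds of content.
  return records.filter(r =>
    r.normalized &&
    r.hasAudio &&
    r.height === 1080 &&
    r.duration >= 5
  );
}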

Why This Matters for AI Infrastructure

Many machine learning systems fail not because of models, but because of brittle data infrastructure. The same principles used to stabilize this media pipeline map directly onto training data pipelines and large-scale model evaluation systems.

  • Idempotent stages allow interrupted dataset builds to resume safely.
  • Checkpointed pipeline state enables long-running data preparation workflows.
  • Metadata-driven transformations ensure deterministic preprocessing.
  • Bounded concurrency protects GPU resources during heavy transformations.
  • Deterministic outputs ensure downstream systems can trust dataset artifacts.

Reliable AI systems depend on reliable data pipelines. The engineering discipline required to normalize chaotic media inputs is the same discipline required to produce stable training datasets.