Files
quartz-org-roam/pipeline/lib/pipeline.ex
2026-02-20 23:31:06 +01:00

165 lines
5.1 KiB
Elixir

defmodule Pipeline do
@moduledoc """
Post-export markdown transformation pipeline.
Applies a list of transform modules sequentially over markdown files.
Each transform module must implement the `Pipeline.Transform` behaviour.
Transforms are applied in the order given. A file is rewritten only
when at least one transform mutates its content (checked via equality).
## Usage
opts = %{
zotero_url: "http://localhost:23119",
bibtex_file: System.get_env("BIBTEX_FILE"),
citation_mode: :warn # :silent | :warn | :strict
}
# Batch: all .md files in a directory
Pipeline.run(content_dir, [Pipeline.Transforms.Citations], opts)
# Targeted: specific files only
Pipeline.run_on_files(["content/foo.md"], [Pipeline.Transforms.Citations], opts)
# With pre-initialized transforms (for watch mode, avoids re-init)
initialized = Pipeline.init_transforms([Pipeline.Transforms.Citations], opts)
Pipeline.run_on_files_with(["content/foo.md"], initialized, opts)
"""
require Logger
@type transform :: module()
@type initialized_transform :: {module(), term()}
@type opts :: map()
@doc """
Initialize transform modules. Returns a list of `{module, state}` tuples.
Call this once and reuse the result with `run_on_files_with/3` to avoid
re-initializing transforms on every file change (e.g., in watch mode).
"""
@spec init_transforms([transform()], opts()) :: [initialized_transform()]
def init_transforms(transforms, opts) do
Enum.map(transforms, fn mod ->
state = mod.init(opts)
{mod, state}
end)
end
@doc """
Tear down previously initialized transforms, releasing any resources.
"""
@spec teardown_transforms([initialized_transform()]) :: :ok
def teardown_transforms(initialized) do
Enum.each(initialized, fn {mod, state} ->
if function_exported?(mod, :teardown, 1) do
mod.teardown(state)
end
end)
:ok
end
@doc """
Run all transforms over every `.md` file under `content_dir`.
Initializes and tears down transforms automatically.
Returns `{:ok, stats}` where stats maps each transform to a count of files it changed.
"""
@spec run(String.t(), [transform()], opts()) :: {:ok, map()}
def run(content_dir, transforms, opts \\ %{}) do
md_files =
content_dir
|> Path.join("**/*.md")
|> Path.wildcard()
if md_files == [] do
Logger.warning("Pipeline: no .md files found in #{content_dir}")
{:ok, %{}}
else
Logger.info(
"Pipeline: processing #{length(md_files)} markdown files " <>
"with #{length(transforms)} transform(s)"
)
initialized = init_transforms(transforms, opts)
stats = apply_transforms(md_files, initialized, opts)
teardown_transforms(initialized)
{:ok, stats}
end
end
@doc """
Run all transforms over specific `.md` files only.
Initializes and tears down transforms automatically.
Files that don't exist are silently skipped.
"""
@spec run_on_files([String.t()], [transform()], opts()) :: {:ok, map()}
def run_on_files(file_paths, transforms, opts \\ %{}) do
existing = Enum.filter(file_paths, &File.exists?/1)
if existing == [] do
Logger.debug("Pipeline: no files to process")
{:ok, %{}}
else
Logger.info("Pipeline: processing #{length(existing)} file(s)")
initialized = init_transforms(transforms, opts)
stats = apply_transforms(existing, initialized, opts)
teardown_transforms(initialized)
{:ok, stats}
end
end
@doc """
Run pre-initialized transforms over specific `.md` files.
Does NOT call `init` or `teardown` — the caller manages the transform
lifecycle. Use this in watch mode to avoid re-initializing on every change.
"""
@spec run_on_files_with([String.t()], [initialized_transform()], opts()) :: {:ok, map()}
def run_on_files_with(file_paths, initialized, opts) do
existing = Enum.filter(file_paths, &File.exists?/1)
if existing == [] do
Logger.debug("Pipeline: no files to process")
{:ok, %{}}
else
stats = apply_transforms(existing, initialized, opts)
{:ok, stats}
end
end
# -------------------------------------------------------------------
# Private
# -------------------------------------------------------------------
defp apply_transforms(md_files, initialized, opts) do
Enum.reduce(md_files, %{}, fn path, acc ->
original = File.read!(path)
{transformed, file_stats} =
Enum.reduce(initialized, {original, %{}}, fn {mod, state}, {content, fstats} ->
result = mod.apply(content, state, opts)
changed = result != content
{result,
Map.update(
fstats,
mod,
if(changed, do: 1, else: 0),
&(&1 + if(changed, do: 1, else: 0))
)}
end)
if transformed != original do
File.write!(path, transformed)
Logger.debug("Pipeline: updated #{Path.relative_to_cwd(path)}")
end
Map.merge(acc, file_stats, fn _k, a, b -> a + b end)
end)
end
end