From 1076bf31ed983b13982a0ef80b1a6f88743d2d4b Mon Sep 17 00:00:00 2001 From: Ignacio Ballesteros Date: Fri, 20 Feb 2026 23:31:06 +0100 Subject: [PATCH] feat: pipeline - org2md file watch --- .gitignore | 3 + pipeline/flake.nix | 5 +- pipeline/lib/pipeline.ex | 161 +++++++++++++++------ pipeline/lib/pipeline/cli.ex | 199 ++++++++++---------------- pipeline/lib/pipeline/export.ex | 135 ++++++++++++++++++ pipeline/lib/pipeline/index.ex | 83 +++++++++++ pipeline/lib/pipeline/watcher.ex | 236 +++++++++++++++++++++++++++++++ pipeline/mix.exs | 3 +- pipeline/mix.lock | 1 + 9 files changed, 655 insertions(+), 171 deletions(-) create mode 100644 pipeline/lib/pipeline/export.ex create mode 100644 pipeline/lib/pipeline/index.ex create mode 100644 pipeline/lib/pipeline/watcher.ex diff --git a/.gitignore b/.gitignore index 557bdac18..0f8c7b173 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,6 @@ scripts/pipeline/erl_crash.dump # Test helpers (not needed in production) scripts/test.bib scripts/test_pipeline.exs +/pipeline/deps/ +/pipeline/_build/ +/pipeline/result diff --git a/pipeline/flake.nix b/pipeline/flake.nix index a851a7ada..df0c9fca6 100644 --- a/pipeline/flake.nix +++ b/pipeline/flake.nix @@ -30,7 +30,7 @@ ./mix.lock ]; }; - sha256 = "sha256-E79X+nUy86G1Jrwv3T7dXekoGv8Hd14ZgJSKWjvlmAw="; + sha256 = "sha256-si7JAomY1HZ33m6ihUJP5i6PO39CE1clYvuMtn0CbPU="; }; # Compiled pipeline escript (without runtime wrappers). @@ -49,7 +49,7 @@ # the escript's System.cmd("emacs", ...) calls succeed. pipelineApp = pkgs.writeShellApplication { name = "pipeline"; - runtimeInputs = [ emacsWithOxHugo ]; + runtimeInputs = [ emacsWithOxHugo pkgs.inotify-tools ]; text = '' exec ${pipelineEscript}/bin/pipeline "$@" ''; @@ -62,6 +62,7 @@ devShells.default = pkgs.mkShell { buildInputs = [ pkgs.elixir + pkgs.inotify-tools emacsWithOxHugo ]; }; diff --git a/pipeline/lib/pipeline.ex b/pipeline/lib/pipeline.ex index 089540a6f..93ba696f8 100644 --- a/pipeline/lib/pipeline.ex +++ b/pipeline/lib/pipeline.ex @@ -2,10 +2,8 @@ defmodule Pipeline do @moduledoc """ Post-export markdown transformation pipeline. - Applies a list of transform modules sequentially over every .md file - in a content directory. Each transform module must implement: - - apply(content :: String.t(), opts :: map()) :: String.t() + Applies a list of transform modules sequentially over markdown files. + Each transform module must implement the `Pipeline.Transform` behaviour. Transforms are applied in the order given. A file is rewritten only when at least one transform mutates its content (checked via equality). @@ -18,16 +16,55 @@ defmodule Pipeline do citation_mode: :warn # :silent | :warn | :strict } + # Batch: all .md files in a directory Pipeline.run(content_dir, [Pipeline.Transforms.Citations], opts) + + # Targeted: specific files only + Pipeline.run_on_files(["content/foo.md"], [Pipeline.Transforms.Citations], opts) + + # With pre-initialized transforms (for watch mode, avoids re-init) + initialized = Pipeline.init_transforms([Pipeline.Transforms.Citations], opts) + Pipeline.run_on_files_with(["content/foo.md"], initialized, opts) """ require Logger @type transform :: module() + @type initialized_transform :: {module(), term()} @type opts :: map() @doc """ - Run all transforms over every .md file under `content_dir`. + Initialize transform modules. Returns a list of `{module, state}` tuples. + + Call this once and reuse the result with `run_on_files_with/3` to avoid + re-initializing transforms on every file change (e.g., in watch mode). + """ + @spec init_transforms([transform()], opts()) :: [initialized_transform()] + def init_transforms(transforms, opts) do + Enum.map(transforms, fn mod -> + state = mod.init(opts) + {mod, state} + end) + end + + @doc """ + Tear down previously initialized transforms, releasing any resources. + """ + @spec teardown_transforms([initialized_transform()]) :: :ok + def teardown_transforms(initialized) do + Enum.each(initialized, fn {mod, state} -> + if function_exported?(mod, :teardown, 1) do + mod.teardown(state) + end + end) + + :ok + end + + @doc """ + Run all transforms over every `.md` file under `content_dir`. + + Initializes and tears down transforms automatically. Returns `{:ok, stats}` where stats maps each transform to a count of files it changed. """ @spec run(String.t(), [transform()], opts()) :: {:ok, map()} @@ -41,43 +78,87 @@ defmodule Pipeline do Logger.warning("Pipeline: no .md files found in #{content_dir}") {:ok, %{}} else - Logger.info("Pipeline: processing #{length(md_files)} markdown files with #{length(transforms)} transform(s)") - - # Initialise transforms (allows them to perform setup such as loading a .bib file). - # Each transform module must implement the Pipeline.Transform behaviour. - initialized = - Enum.map(transforms, fn mod -> - state = mod.init(opts) - {mod, state} - end) - - stats = - Enum.reduce(md_files, %{}, fn path, acc -> - original = File.read!(path) - - {transformed, file_stats} = - Enum.reduce(initialized, {original, %{}}, fn {mod, state}, {content, fstats} -> - result = mod.apply(content, state, opts) - changed = result != content - {result, Map.update(fstats, mod, (if changed, do: 1, else: 0), &(&1 + (if changed, do: 1, else: 0)))} - end) - - if transformed != original do - File.write!(path, transformed) - Logger.debug("Pipeline: updated #{Path.relative_to_cwd(path)}") - end - - Map.merge(acc, file_stats, fn _k, a, b -> a + b end) - end) - - Enum.each(initialized, fn {mod, state} -> - # teardown/1 is optional in the behaviour - if function_exported?(mod, :teardown, 1) do - mod.teardown(state) - end - end) + Logger.info( + "Pipeline: processing #{length(md_files)} markdown files " <> + "with #{length(transforms)} transform(s)" + ) + initialized = init_transforms(transforms, opts) + stats = apply_transforms(md_files, initialized, opts) + teardown_transforms(initialized) {:ok, stats} end end + + @doc """ + Run all transforms over specific `.md` files only. + + Initializes and tears down transforms automatically. + Files that don't exist are silently skipped. + """ + @spec run_on_files([String.t()], [transform()], opts()) :: {:ok, map()} + def run_on_files(file_paths, transforms, opts \\ %{}) do + existing = Enum.filter(file_paths, &File.exists?/1) + + if existing == [] do + Logger.debug("Pipeline: no files to process") + {:ok, %{}} + else + Logger.info("Pipeline: processing #{length(existing)} file(s)") + initialized = init_transforms(transforms, opts) + stats = apply_transforms(existing, initialized, opts) + teardown_transforms(initialized) + {:ok, stats} + end + end + + @doc """ + Run pre-initialized transforms over specific `.md` files. + + Does NOT call `init` or `teardown` — the caller manages the transform + lifecycle. Use this in watch mode to avoid re-initializing on every change. + """ + @spec run_on_files_with([String.t()], [initialized_transform()], opts()) :: {:ok, map()} + def run_on_files_with(file_paths, initialized, opts) do + existing = Enum.filter(file_paths, &File.exists?/1) + + if existing == [] do + Logger.debug("Pipeline: no files to process") + {:ok, %{}} + else + stats = apply_transforms(existing, initialized, opts) + {:ok, stats} + end + end + + # ------------------------------------------------------------------- + # Private + # ------------------------------------------------------------------- + + defp apply_transforms(md_files, initialized, opts) do + Enum.reduce(md_files, %{}, fn path, acc -> + original = File.read!(path) + + {transformed, file_stats} = + Enum.reduce(initialized, {original, %{}}, fn {mod, state}, {content, fstats} -> + result = mod.apply(content, state, opts) + changed = result != content + + {result, + Map.update( + fstats, + mod, + if(changed, do: 1, else: 0), + &(&1 + if(changed, do: 1, else: 0)) + )} + end) + + if transformed != original do + File.write!(path, transformed) + Logger.debug("Pipeline: updated #{Path.relative_to_cwd(path)}") + end + + Map.merge(acc, file_stats, fn _k, a, b -> a + b end) + end) + end end diff --git a/pipeline/lib/pipeline/cli.ex b/pipeline/lib/pipeline/cli.ex index 4978c0ec0..181c4142f 100644 --- a/pipeline/lib/pipeline/cli.ex +++ b/pipeline/lib/pipeline/cli.ex @@ -5,13 +5,16 @@ defmodule Pipeline.CLI do Runs four phases in sequence: 1. Wipe `content/` (preserving `.gitkeep`) - 2. Export each `.org` file via `emacs --batch` + ox-hugo → `content/**/*.md` + 2. Export each `.org` file via `emacs --batch` + ox-hugo -> `content/**/*.md` 3. Run Elixir transform modules over every `.md` file 4. Generate a fallback `content/index.md` if none was exported + With `--watch`, after the initial batch the process stays alive and + incrementally re-exports only changed `.org` files. + ## Usage - pipeline [--output ] + pipeline [--output ] [--watch] Arguments: notes-dir Path to the directory containing `.org` notes (required). @@ -23,6 +26,8 @@ defmodule Pipeline.CLI do working directory. --content-dir

Output directory for exported Markdown. Defaults to `/content`. + --watch After initial batch, watch notes-dir for changes and + incrementally re-export affected files. Optional env vars: BIBTEX_FILE Path to a `.bib` file used as citation fallback. @@ -32,14 +37,18 @@ defmodule Pipeline.CLI do require Logger + @transforms [Pipeline.Transforms.Citations] + def main(argv) do Application.ensure_all_started(:pipeline) - {notes_dir, output_dir, content_dir} = parse_args(argv) + {notes_dir, output_dir, content_dir, watch?} = parse_args(argv) + pipeline_opts = build_pipeline_opts() + # Phase 1-4: full batch export wipe(content_dir) - export_org_files(notes_dir, output_dir) - run_pipeline(content_dir) + export_all(notes_dir, output_dir) + run_pipeline(content_dir, pipeline_opts) generate_index(content_dir) md_count = @@ -49,6 +58,22 @@ defmodule Pipeline.CLI do |> length() IO.puts("==> Done. #{md_count} markdown files in #{content_dir}") + + # Phase 5: optional watch mode + if watch? do + IO.puts("==> Watching #{notes_dir} for .org changes... (Ctrl+C to stop)") + + {:ok, _pid} = + Pipeline.Watcher.start_link( + notes_dir: notes_dir, + output_dir: output_dir, + content_dir: content_dir, + pipeline_opts: pipeline_opts, + transforms: @transforms + ) + + Process.sleep(:infinity) + end end # --------------------------------------------------------------------------- @@ -58,7 +83,7 @@ defmodule Pipeline.CLI do defp parse_args(argv) do {opts, positional, _invalid} = OptionParser.parse(argv, - strict: [output: :string, content_dir: :string] + strict: [output: :string, content_dir: :string, watch: :boolean] ) notes_dir = @@ -68,7 +93,7 @@ defmodule Pipeline.CLI do [] -> System.get_env("NOTES_DIR") || - abort("Usage: pipeline [--output ]") + abort("Usage: pipeline [--output ] [--watch]") end notes_dir = Path.expand(notes_dir) @@ -85,7 +110,9 @@ defmodule Pipeline.CLI do (opts[:content_dir] || Path.join(output_dir, "content")) |> Path.expand() - {notes_dir, output_dir, content_dir} + watch? = Keyword.get(opts, :watch, false) + + {notes_dir, output_dir, content_dir, watch?} end # --------------------------------------------------------------------------- @@ -108,71 +135,25 @@ defmodule Pipeline.CLI do # Phase 2: Export org files via Emacs + ox-hugo # --------------------------------------------------------------------------- - defp export_org_files(notes_dir, output_dir) do + defp export_all(notes_dir, output_dir) do IO.puts("==> Exporting org files from #{notes_dir}") - # ox-hugo requires static/ to exist for image asset copying - File.mkdir_p!(Path.join(output_dir, "static")) + case Pipeline.Export.export_all(notes_dir, output_dir) do + {:ok, 0} -> + IO.puts("No .org files found in #{notes_dir}") + System.halt(0) - org_files = - Path.join(notes_dir, "**/*.org") - |> Path.wildcard() + {:ok, count} -> + IO.puts(" exported #{count} file(s)") - if org_files == [] do - IO.puts("No .org files found in #{notes_dir}") - System.halt(0) - end + {:error, failures} -> + IO.puts(:stderr, "\nFailed to export #{length(failures)} file(s):") - results = - Enum.map(org_files, fn orgfile -> - IO.puts(" exporting: #{orgfile}") + Enum.each(failures, fn {f, {:error, reason}} -> + IO.puts(:stderr, " #{f}: #{inspect(reason)}") + end) - section = - orgfile - |> Path.dirname() - |> Path.relative_to(notes_dir) - - {output, exit_code} = - System.cmd( - "emacs", - [ - "--batch", - "--eval", "(require 'ox-hugo)", - "--eval", """ - (org-cite-register-processor 'passthrough - :export-citation - (lambda (citation _style _backend _info) - (let ((keys (mapcar (lambda (ref) - (concat "@" (org-element-property :key ref))) - (org-cite-get-references citation)))) - (format "[cite:%s]" (string-join keys ";"))))) - """, - "--eval", "(setq org-cite-export-processors '((t passthrough)))", - "--eval", ~s[(setq org-hugo-base-dir "#{output_dir}")], - "--eval", ~s[(setq org-hugo-default-section-directory "#{section}")], - "--visit", orgfile, - "--funcall", "org-hugo-export-to-md" - ], - stderr_to_stdout: true - ) - - filtered = - output - |> String.split("\n") - |> Enum.reject(&String.match?(&1, ~r/^Loading|^ad-handle|^For information/)) - |> Enum.join("\n") - - if filtered != "", do: IO.puts(filtered) - - {orgfile, exit_code} - end) - - failures = Enum.filter(results, fn {_, code} -> code != 0 end) - - if failures != [] do - IO.puts(:stderr, "\nFailed to export #{length(failures)} file(s):") - Enum.each(failures, fn {f, code} -> IO.puts(:stderr, " [exit #{code}] #{f}") end) - System.halt(1) + System.halt(1) end end @@ -180,10 +161,31 @@ defmodule Pipeline.CLI do # Phase 3: Markdown transformation pipeline # --------------------------------------------------------------------------- - defp run_pipeline(content_dir) do + defp run_pipeline(content_dir, pipeline_opts) do IO.puts("==> Running markdown pipeline") - pipeline_opts = %{ + {:ok, stats} = Pipeline.run(content_dir, @transforms, pipeline_opts) + + Enum.each(stats, fn {mod, count} -> + IO.puts(" #{inspect(mod)}: #{count} file(s) modified") + end) + end + + # --------------------------------------------------------------------------- + # Phase 4: Generate default index.md if none was exported + # --------------------------------------------------------------------------- + + defp generate_index(content_dir) do + IO.puts("==> Generating index") + Pipeline.Index.generate(content_dir) + end + + # --------------------------------------------------------------------------- + # Helpers + # --------------------------------------------------------------------------- + + defp build_pipeline_opts do + %{ zotero_url: System.get_env("ZOTERO_URL", "http://localhost:23119"), bibtex_file: System.get_env("BIBTEX_FILE"), citation_mode: @@ -193,67 +195,8 @@ defmodule Pipeline.CLI do _ -> :warn end } - - transforms = [Pipeline.Transforms.Citations] - - case Pipeline.run(content_dir, transforms, pipeline_opts) do - {:ok, stats} -> - Enum.each(stats, fn {mod, count} -> - IO.puts(" #{inspect(mod)}: #{count} file(s) modified") - end) - - {:error, reason} -> - IO.puts(:stderr, "Pipeline error: #{inspect(reason)}") - System.halt(1) - end end - # --------------------------------------------------------------------------- - # Phase 4: Generate default index.md if none was exported - # --------------------------------------------------------------------------- - - defp generate_index(content_dir) do - index_path = Path.join(content_dir, "index.md") - - unless File.exists?(index_path) do - IO.puts("==> Generating default index.md") - - pages = - Path.join(content_dir, "**/*.md") - |> Path.wildcard() - |> Enum.map(fn path -> - slug = Path.relative_to(path, content_dir) |> Path.rootname() - - title = - path - |> File.read!() - |> then(fn content -> - case Regex.run(~r/^title\s*=\s*"(.+)"/m, content) do - [_, t] -> t - _ -> slug - end - end) - - {slug, title} - end) - |> Enum.sort_by(fn {_, title} -> title end) - |> Enum.map(fn {slug, title} -> "- [#{title}](#{slug})" end) - |> Enum.join("\n") - - File.write!(index_path, """ - --- - title: Index - --- - - #{pages} - """) - end - end - - # --------------------------------------------------------------------------- - # Helpers - # --------------------------------------------------------------------------- - defp abort(message) do IO.puts(:stderr, message) System.halt(1) diff --git a/pipeline/lib/pipeline/export.ex b/pipeline/lib/pipeline/export.ex new file mode 100644 index 000000000..e0cb0bebd --- /dev/null +++ b/pipeline/lib/pipeline/export.ex @@ -0,0 +1,135 @@ +defmodule Pipeline.Export do + @moduledoc """ + Org-to-Markdown export via Emacs batch + ox-hugo. + + Provides both single-file and batch export, plus a helper to compute + the expected `.md` output path for a given `.org` source file. + """ + + require Logger + + @doc """ + Export a single `.org` file to Markdown via `emacs --batch` + ox-hugo. + + Returns `{:ok, exit_code}` with the emacs exit code (0 = success), + or `{:error, reason}` if the command could not be executed. + """ + @spec export_file(String.t(), String.t(), String.t()) :: {:ok, non_neg_integer()} | {:error, term()} + def export_file(orgfile, notes_dir, output_dir) do + section = + orgfile + |> Path.dirname() + |> Path.relative_to(notes_dir) + + # ox-hugo requires static/ to exist for image asset copying + File.mkdir_p!(Path.join(output_dir, "static")) + + {output, exit_code} = + System.cmd( + "emacs", + [ + "--batch", + "--eval", "(require 'ox-hugo)", + "--eval", """ + (org-cite-register-processor 'passthrough + :export-citation + (lambda (citation _style _backend _info) + (let ((keys (mapcar (lambda (ref) + (concat "@" (org-element-property :key ref))) + (org-cite-get-references citation)))) + (format "[cite:%s]" (string-join keys ";"))))) + """, + "--eval", "(setq org-cite-export-processors '((t passthrough)))", + "--eval", ~s[(setq org-hugo-base-dir "#{output_dir}")], + "--eval", ~s[(setq org-hugo-default-section-directory "#{section}")], + "--visit", orgfile, + "--funcall", "org-hugo-export-to-md" + ], + stderr_to_stdout: true + ) + + filtered = + output + |> String.split("\n") + |> Enum.reject(&String.match?(&1, ~r/^Loading|^ad-handle|^For information/)) + |> Enum.join("\n") + + if filtered != "", do: Logger.info("emacs: #{filtered}") + + if exit_code == 0 do + {:ok, exit_code} + else + {:error, {:emacs_exit, exit_code, filtered}} + end + rescue + e -> {:error, e} + end + + @doc """ + Export all `.org` files found under `notes_dir`. + + Returns `{:ok, count}` where `count` is the number of successfully + exported files, or `{:error, failures}` if any files failed. + """ + @spec export_all(String.t(), String.t()) :: {:ok, non_neg_integer()} | {:error, list()} + def export_all(notes_dir, output_dir) do + org_files = + Path.join(notes_dir, "**/*.org") + |> Path.wildcard() + + if org_files == [] do + Logger.warning("No .org files found in #{notes_dir}") + {:ok, 0} + else + Logger.info("Exporting #{length(org_files)} org file(s) from #{notes_dir}") + + results = + Enum.map(org_files, fn orgfile -> + IO.puts(" exporting: #{orgfile}") + {orgfile, export_file(orgfile, notes_dir, output_dir)} + end) + + failures = + Enum.filter(results, fn + {_, {:ok, _}} -> false + {_, {:error, _}} -> true + end) + + if failures == [] do + {:ok, length(results)} + else + {:error, failures} + end + end + end + + @doc """ + Compute the expected `.md` path for a given `.org` file. + + Uses the same section-mapping logic as ox-hugo: the relative directory + of the `.org` file within `notes_dir` becomes the section directory + under `content_dir`. + + ## Examples + + iex> Pipeline.Export.expected_md_path("/notes/bus/emt.org", "/notes", "/out/content") + "/out/content/bus/emt.md" + + iex> Pipeline.Export.expected_md_path("/notes/top-level.org", "/notes", "/out/content") + "/out/content/top-level.md" + """ + @spec expected_md_path(String.t(), String.t(), String.t()) :: String.t() + def expected_md_path(orgfile, notes_dir, content_dir) do + section = + orgfile + |> Path.dirname() + |> Path.relative_to(notes_dir) + + basename = Path.basename(orgfile, ".org") <> ".md" + + case section do + "." -> Path.join(content_dir, basename) + _ -> Path.join([content_dir, section, basename]) + end + end +end diff --git a/pipeline/lib/pipeline/index.ex b/pipeline/lib/pipeline/index.ex new file mode 100644 index 000000000..a0bd5d061 --- /dev/null +++ b/pipeline/lib/pipeline/index.ex @@ -0,0 +1,83 @@ +defmodule Pipeline.Index do + @moduledoc """ + Generates a fallback `index.md` in the content directory if none was + exported from an `.org` file. + + The generated index lists all markdown pages alphabetically with links. + """ + + @doc """ + Generate `content_dir/index.md` if it does not already exist. + + If an `index.md` was already created by ox-hugo (from an `index.org`), + it is left untouched. + """ + @spec generate(String.t()) :: :ok + def generate(content_dir) do + index_path = Path.join(content_dir, "index.md") + + unless File.exists?(index_path) do + IO.puts(" generating default index.md") + + pages = + Path.join(content_dir, "**/*.md") + |> Path.wildcard() + |> Enum.map(fn path -> + slug = Path.relative_to(path, content_dir) |> Path.rootname() + + title = + path + |> File.read!() + |> then(fn content -> + case Regex.run(~r/^title\s*=\s*"(.+)"/m, content) do + [_, t] -> t + _ -> slug + end + end) + + {slug, title} + end) + |> Enum.sort_by(fn {_, title} -> title end) + |> Enum.map(fn {slug, title} -> "- [#{title}](#{slug})" end) + |> Enum.join("\n") + + File.write!(index_path, """ + --- + title: Index + --- + + #{pages} + """) + end + + :ok + end + + @doc """ + Regenerate the index by removing any previously generated one first. + + Only removes the index if it was generated by us (contains `title: Index`). + User-exported index files (from `index.org`) are left untouched. + """ + @spec regenerate(String.t()) :: :ok + def regenerate(content_dir) do + index_path = Path.join(content_dir, "index.md") + + if File.exists?(index_path) do + content = File.read!(index_path) + + if generated_index?(content) do + File.rm!(index_path) + end + end + + generate(content_dir) + end + + defp generated_index?(content) do + # Our generated index uses "title: Index" in YAML frontmatter. + # ox-hugo uses TOML frontmatter (title = "..."), so this won't + # match user-exported files. + String.contains?(content, "title: Index") + end +end diff --git a/pipeline/lib/pipeline/watcher.ex b/pipeline/lib/pipeline/watcher.ex new file mode 100644 index 000000000..cc626a3c5 --- /dev/null +++ b/pipeline/lib/pipeline/watcher.ex @@ -0,0 +1,236 @@ +defmodule Pipeline.Watcher do + @moduledoc """ + File-watching GenServer that detects `.org` file changes and triggers + incremental export + transform for only the affected files. + + Uses the `file_system` package (inotify on Linux, fsevents on macOS) + to watch the notes directory. Events are debounced per-file (500ms) + to coalesce rapid writes (e.g., Emacs auto-save). + + ## Lifecycle + + Started dynamically by `Pipeline.CLI` after the initial batch export. + Transforms are initialized once at startup and reused across all + incremental rebuilds to avoid repeated Zotero probes and BibTeX loads. + + ## Usage + + Pipeline.Watcher.start_link( + notes_dir: "/path/to/notes", + output_dir: "/path/to/output", + content_dir: "/path/to/output/content", + pipeline_opts: %{zotero_url: "...", ...}, + transforms: [Pipeline.Transforms.Citations] + ) + """ + + use GenServer + + require Logger + + @debounce_ms 500 + + # ------------------------------------------------------------------- + # Client API + # ------------------------------------------------------------------- + + @doc """ + Start the watcher as a linked process. + + ## Options + + * `:notes_dir` — directory to watch for `.org` changes (required) + * `:output_dir` — ox-hugo base dir (required) + * `:content_dir` — directory where `.md` files are written (required) + * `:pipeline_opts` — opts map passed to transforms (required) + * `:transforms` — list of transform modules (default: `[Pipeline.Transforms.Citations]`) + """ + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + # ------------------------------------------------------------------- + # GenServer callbacks + # ------------------------------------------------------------------- + + @impl true + def init(opts) do + notes_dir = Keyword.fetch!(opts, :notes_dir) + output_dir = Keyword.fetch!(opts, :output_dir) + content_dir = Keyword.fetch!(opts, :content_dir) + pipeline_opts = Keyword.fetch!(opts, :pipeline_opts) + transforms = Keyword.get(opts, :transforms, [Pipeline.Transforms.Citations]) + + # Initialize transforms once — reused for all incremental rebuilds + initialized_transforms = Pipeline.init_transforms(transforms, pipeline_opts) + + # Start the file system watcher + {:ok, watcher_pid} = FileSystem.start_link(dirs: [notes_dir], recursive: true) + FileSystem.subscribe(watcher_pid) + + Logger.info("Watcher: monitoring #{notes_dir} for .org changes") + + {:ok, + %{ + notes_dir: notes_dir, + output_dir: output_dir, + content_dir: content_dir, + pipeline_opts: pipeline_opts, + watcher_pid: watcher_pid, + initialized_transforms: initialized_transforms, + pending: %{} + }} + end + + @impl true + def handle_info({:file_event, _pid, {path, events}}, state) do + path = to_string(path) + + if org_file?(path) and not temporary_file?(path) do + event_type = classify_events(events) + Logger.debug("Watcher: #{event_type} event for #{path}") + {:noreply, schedule_debounce(path, event_type, state)} + else + {:noreply, state} + end + end + + @impl true + def handle_info({:file_event, _pid, :stop}, state) do + Logger.warning("Watcher: file system monitor stopped unexpectedly") + {:stop, :watcher_stopped, state} + end + + @impl true + def handle_info({:debounced, path, event_type}, state) do + state = %{state | pending: Map.delete(state.pending, path)} + + case event_type do + :deleted -> + handle_delete(path, state) + + _created_or_modified -> + handle_change(path, state) + end + + {:noreply, state} + end + + @impl true + def terminate(_reason, state) do + Pipeline.teardown_transforms(state.initialized_transforms) + :ok + end + + # ------------------------------------------------------------------- + # Event handling + # ------------------------------------------------------------------- + + defp handle_change(orgfile, state) do + %{ + notes_dir: notes_dir, + output_dir: output_dir, + content_dir: content_dir, + pipeline_opts: pipeline_opts, + initialized_transforms: initialized_transforms + } = state + + md_path = Pipeline.Export.expected_md_path(orgfile, notes_dir, content_dir) + IO.puts("==> Changed: #{Path.relative_to(orgfile, notes_dir)}") + + case Pipeline.Export.export_file(orgfile, notes_dir, output_dir) do + {:ok, _} -> + IO.puts(" exported: #{Path.relative_to(md_path, content_dir)}") + + {:ok, stats} = Pipeline.run_on_files_with([md_path], initialized_transforms, pipeline_opts) + + Enum.each(stats, fn {mod, count} -> + if count > 0, do: IO.puts(" #{inspect(mod)}: #{count} file(s) modified") + end) + + regenerate_index(content_dir) + IO.puts("==> Done") + + {:error, reason} -> + Logger.error("Watcher: export failed for #{orgfile}: #{inspect(reason)}") + end + end + + defp handle_delete(orgfile, state) do + %{notes_dir: notes_dir, content_dir: content_dir} = state + + md_path = Pipeline.Export.expected_md_path(orgfile, notes_dir, content_dir) + IO.puts("==> Deleted: #{Path.relative_to(orgfile, notes_dir)}") + + if File.exists?(md_path) do + File.rm!(md_path) + IO.puts(" removed: #{Path.relative_to(md_path, content_dir)}") + + # Clean up empty parent directories left behind + cleanup_empty_dirs(Path.dirname(md_path), content_dir) + end + + regenerate_index(content_dir) + IO.puts("==> Done") + end + + # ------------------------------------------------------------------- + # Index generation + # ------------------------------------------------------------------- + + defp regenerate_index(content_dir) do + Pipeline.Index.regenerate(content_dir) + end + + # ------------------------------------------------------------------- + # Helpers + # ------------------------------------------------------------------- + + defp schedule_debounce(path, event_type, state) do + # Cancel any existing timer for this path + case Map.get(state.pending, path) do + nil -> :ok + old_ref -> Process.cancel_timer(old_ref) + end + + ref = Process.send_after(self(), {:debounced, path, event_type}, @debounce_ms) + %{state | pending: Map.put(state.pending, path, ref)} + end + + defp org_file?(path), do: String.ends_with?(path, ".org") + + defp temporary_file?(path) do + basename = Path.basename(path) + # Emacs creates temp files like .#file.org and #file.org# + String.starts_with?(basename, ".#") or + (String.starts_with?(basename, "#") and String.ends_with?(basename, "#")) + end + + defp classify_events(events) do + cond do + :removed in events or :deleted in events -> :deleted + :created in events -> :created + :modified in events or :changed in events -> :modified + # renamed can mean created or deleted depending on context; + # if the file exists it was renamed into the watched dir + :renamed in events -> :modified + true -> :modified + end + end + + defp cleanup_empty_dirs(dir, stop_at) do + dir = Path.expand(dir) + stop_at = Path.expand(stop_at) + + if dir != stop_at and File.dir?(dir) do + case File.ls!(dir) do + [] -> + File.rmdir!(dir) + cleanup_empty_dirs(Path.dirname(dir), stop_at) + + _ -> + :ok + end + end + end +end diff --git a/pipeline/mix.exs b/pipeline/mix.exs index 8a22f7026..f499e7349 100644 --- a/pipeline/mix.exs +++ b/pipeline/mix.exs @@ -27,7 +27,8 @@ defmodule Pipeline.MixProject do [ {:finch, "~> 0.19"}, {:req, "~> 0.5"}, - {:jason, "~> 1.4"} + {:jason, "~> 1.4"}, + {:file_system, "~> 1.0"} ] end end diff --git a/pipeline/mix.lock b/pipeline/mix.lock index 862aa1b7b..c5e4b01eb 100644 --- a/pipeline/mix.lock +++ b/pipeline/mix.lock @@ -1,4 +1,5 @@ %{ + "file_system": {:hex, :file_system, "1.1.1", "31864f4685b0148f25bd3fbef2b1228457c0c89024ad67f7a81a3ffbc0bbad3a", [:mix], [], "hexpm", "7a15ff97dfe526aeefb090a7a9d3d03aa907e100e262a0f8f7746b78f8f87a5d"}, "finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"}, "hpax": {:hex, :hpax, "1.0.3", "ed67ef51ad4df91e75cc6a1494f851850c0bd98ebc0be6e81b026e765ee535aa", [:mix], [], "hexpm", "8eab6e1cfa8d5918c2ce4ba43588e894af35dbd8e91e6e55c817bca5847df34a"}, "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},