Files
org-garden/lib/org_garden/watcher.ex
2026-02-23 16:28:18 +01:00

305 lines
9.1 KiB
Elixir

defmodule OrgGarden.Watcher do
@moduledoc """
File-watching GenServer that detects `.org` file changes and triggers
incremental export + transform for only the affected files.

Uses the `file_system` package (inotify on Linux, fsevents on macOS)
to watch the notes directory. Events are debounced per file (500ms)
to coalesce rapid writes (e.g., Emacs auto-save).

## Lifecycle

Started dynamically by `OrgGarden.CLI` after the initial batch export.
Transforms are initialized once at startup and reused across all
incremental rebuilds to avoid repeated Zotero probes and BibTeX loads.

## Usage

    OrgGarden.Watcher.start_link(
      notes_dir: "/path/to/notes",
      output_dir: "/path/to/output",
      content_dir: "/path/to/output/content",
      pipeline_opts: %{zotero_url: "...", ...},
      transforms: [OrgGarden.Transforms.Citations]
    )
"""
use GenServer
require Logger
@debounce_ms 500
@drain_timeout 5_000
# -------------------------------------------------------------------
# Client API
# -------------------------------------------------------------------
@doc """
Starts the watcher linked to the calling process.

The process is registered locally under the module name, so only one
watcher can be running at a time.

## Options

  * `:notes_dir` — directory to watch for `.org` changes (required)
  * `:output_dir` — ox-hugo base dir (required)
  * `:content_dir` — directory where `.md` files are written (required)
  * `:pipeline_opts` — opts map passed to transforms (required)
  * `:transforms` — list of transform modules (default: `[OrgGarden.Transforms.Citations]`)

Returns the result of `GenServer.start_link/3`.
"""
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
@doc """
Returns `true` when something is registered under this module's name,
i.e. the watcher has been started and is still alive.
"""
def running? do
  case Process.whereis(__MODULE__) do
    nil -> false
    _registered -> true
  end
end
# -------------------------------------------------------------------
# GenServer callbacks
# -------------------------------------------------------------------
@impl true
def init(opts) do
  # Trap exits so exits of linked processes/ports arrive as {:EXIT, _, _}
  # messages and terminate/2 gets a chance to run on shutdown.
  Process.flag(:trap_exit, true)

  notes_dir = Keyword.fetch!(opts, :notes_dir)
  output_dir = Keyword.fetch!(opts, :output_dir)
  content_dir = Keyword.fetch!(opts, :content_dir)
  pipeline_opts = Keyword.fetch!(opts, :pipeline_opts)
  transforms = Keyword.get(opts, :transforms, [OrgGarden.Transforms.Citations])

  # Start the file system watcher *before* initializing transforms:
  # terminate/2 is not invoked when init/1 fails, so transforms that were
  # already initialized would never be torn down if the monitor could not
  # be started afterwards.
  case FileSystem.start_link(dirs: [notes_dir], recursive: true) do
    {:ok, watcher_pid} ->
      FileSystem.subscribe(watcher_pid)

      # Initialize transforms once — reused for all incremental rebuilds
      initialized_transforms = OrgGarden.init_transforms(transforms, pipeline_opts)

      Logger.info("Watcher: monitoring #{notes_dir} for .org changes")

      {:ok,
       %{
         notes_dir: notes_dir,
         output_dir: output_dir,
         content_dir: content_dir,
         pipeline_opts: pipeline_opts,
         watcher_pid: watcher_pid,
         initialized_transforms: initialized_transforms,
         pending: %{},
         processing: false
       }}

    other ->
      # Anything but {:ok, pid} (presumably :ignore when no backend is
      # available, or {:error, reason}) means there is nothing to watch
      # with; stop with a descriptive reason instead of a MatchError.
      Logger.error(
        "Watcher: could not start file system monitor for #{notes_dir}: #{inspect(other)}"
      )

      {:stop, {:file_system_unavailable, other}}
  end
end
@impl true
def handle_info({:file_event, _pid, {path, events}}, state) do
  # Normalize the reported path to a binary before inspecting it
  file = to_string(path)

  # Only real .org files are of interest; editor temp/lock files are skipped
  watched? = org_file?(file) and not temporary_file?(file)

  next_state =
    if watched? do
      kind = classify_events(events)
      Logger.debug("Watcher: #{kind} event for #{file}")
      # (Re)arm the per-file debounce timer; processing happens when it fires
      schedule_debounce(file, kind, state)
    else
      state
    end

  {:noreply, next_state}
end
@impl true
def handle_info({:file_event, _monitor, :stop}, state) do
  # The monitor announced that it stopped; without it no further events
  # will arrive, so this process stops as well.
  stop_reason = :watcher_stopped
  Logger.warning("Watcher: file system monitor stopped unexpectedly")
  {:stop, stop_reason, state}
end
@impl true
def handle_info({:debounced, path, event_type}, state) do
  # The debounce timer for this path has fired: forget it and mark the
  # process as busy for the duration of the rebuild.
  state = %{state | pending: Map.delete(state.pending, path), processing: true}

  # The event flags only describe the last notification seen before the
  # timer was armed. A rename away from the path is classified as
  # :modified, and a notification carrying both removal and creation flags
  # is classified as :deleted even though the file is back. The file
  # system at processing time is the source of truth, so reconcile the
  # classification with whether the file actually exists.
  effective_event =
    cond do
      not File.exists?(path) -> :deleted
      event_type == :deleted -> :modified
      true -> event_type
    end

  case effective_event do
    :deleted ->
      handle_delete(path, state)

    _created_or_modified ->
      handle_change(path, state)
  end

  # Telemetry reports the event that was actually acted upon
  emit_telemetry(:file_processed, %{path: path, event: effective_event})
  {:noreply, %{state | processing: false}}
end
@impl true
def handle_info({:EXIT, pid, reason}, %{watcher_pid: pid} = state) do
  # The linked file system monitor itself went away (for any reason,
  # including :normal). No further file events can arrive, so stop with
  # the same reason used when the monitor announces :stop, instead of
  # lingering as a watcher that never sees changes.
  Logger.warning("Watcher: file system monitor exited: #{inspect(reason)}")
  {:stop, :watcher_stopped, state}
end

@impl true
def handle_info({:EXIT, _pid_or_port, :normal}, state) do
  # Normal exits of other linked processes/ports are expected because
  # exits are trapped; nothing to do.
  {:noreply, state}
end

@impl true
def handle_info({:EXIT, pid_or_port, reason}, state) do
  # Abnormal exit of some other linked process/port: log it and carry on
  Logger.warning("Watcher: unexpected exit from #{inspect(pid_or_port)}: #{inspect(reason)}")
  {:noreply, state}
end
@impl true
def terminate(reason, state) do
  Logger.info("Watcher: shutting down (#{inspect(reason)})")

  %{
    pending: pending,
    processing: in_flight?,
    initialized_transforms: transforms
  } = state

  # Cancel debounce timers that have not fired yet
  drain_pending(pending)

  # NOTE(review): `processing` is only set to true inside the :debounced
  # handler, which resets it before returning, so this branch looks
  # unreachable from terminate/2 — confirm before relying on it.
  if in_flight? do
    Logger.info("Watcher: waiting for in-flight processing...")
    Process.sleep(100)
  end

  # Release whatever the transforms acquired at startup
  OrgGarden.teardown_transforms(transforms)
  :ok
end
# -------------------------------------------------------------------
# Event handling
# -------------------------------------------------------------------
# Re-exports a single created/modified `.org` file, runs the already
# initialized transforms on the resulting markdown file and regenerates
# the index. Export failures are logged; they never crash the watcher.
defp handle_change(orgfile, state) do
  %{
    notes_dir: notes_dir,
    output_dir: output_dir,
    content_dir: content_dir,
    pipeline_opts: pipeline_opts,
    initialized_transforms: initialized_transforms
  } = state

  md_path = OrgGarden.Export.expected_md_path(orgfile, notes_dir, content_dir)
  Logger.info("Changed: #{Path.relative_to(orgfile, notes_dir)}")

  # Monotonic clock: only used to report the elapsed time below
  start_time = System.monotonic_time(:millisecond)

  case OrgGarden.Export.export_file(orgfile, notes_dir, output_dir) do
    {:ok, _} ->
      Logger.info(" exported: #{Path.relative_to(md_path, content_dir)}")

      {:ok, stats} =
        OrgGarden.run_on_files_with([md_path], initialized_transforms, pipeline_opts)

      Enum.each(stats, fn {mod, count} ->
        if count > 0, do: Logger.info(" #{inspect(mod)}: #{count} file(s) modified")
      end)

      regenerate_index(content_dir)
      duration = System.monotonic_time(:millisecond) - start_time
      Logger.info(" done in #{duration}ms")

    {:error, {:emacs_exit, code}} ->
      Logger.error("Watcher: export failed for #{Path.basename(orgfile)} (exit code #{code})")

    {:error, reason} ->
      # Any other error shape export_file/3 might return: without this
      # clause a CaseClauseError would take the whole watcher down because
      # of a single file.
      Logger.error("Watcher: export failed for #{Path.basename(orgfile)}: #{inspect(reason)}")
  end
end
# Removes the markdown file that corresponds to a deleted `.org` file,
# prunes directories left empty by the removal and regenerates the index.
defp handle_delete(orgfile, state) do
  %{notes_dir: notes_dir, content_dir: content_dir} = state
  md_path = OrgGarden.Export.expected_md_path(orgfile, notes_dir, content_dir)
  Logger.info("Deleted: #{Path.relative_to(orgfile, notes_dir)}")

  # Attempt the removal directly instead of checking File.exists?/1 first:
  # the file can disappear between the check and the removal, and a raise
  # from File.rm!/1 would crash the watcher.
  case File.rm(md_path) do
    :ok ->
      Logger.info(" removed: #{Path.relative_to(md_path, content_dir)}")
      # Clean up empty parent directories left behind
      cleanup_empty_dirs(Path.dirname(md_path), content_dir)

    {:error, :enoent} ->
      # Nothing was exported for this file (or it is already gone)
      :ok

    {:error, reason} ->
      Logger.error("Watcher: could not remove #{md_path}: #{inspect(reason)}")
  end

  regenerate_index(content_dir)
end
# -------------------------------------------------------------------
# Index generation
# -------------------------------------------------------------------
# Rebuilds the index for `content_dir` by delegating to OrgGarden.Index.
defp regenerate_index(content_dir), do: OrgGarden.Index.regenerate(content_dir)
# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------
# (Re)arms the debounce timer for `path` and returns the updated state.
# Any timer already pending for the same path is replaced, so only the
# most recent event within the debounce window gets processed.
defp schedule_debounce(path, event_type, state) do
  case Map.get(state.pending, path) do
    nil ->
      :ok

    old_ref ->
      # cancel_timer/1 returns false when the timer has already fired. In
      # that case its {:debounced, path, _} message may already be in the
      # mailbox; left there it would trigger a rebuild right away
      # (defeating the debounce) and remove the new timer's entry from
      # `pending`. Flush it without blocking.
      if Process.cancel_timer(old_ref) == false do
        receive do
          {:debounced, ^path, _stale_event} -> :ok
        after
          0 -> :ok
        end
      end
  end

  ref = Process.send_after(self(), {:debounced, path, event_type}, @debounce_ms)
  %{state | pending: Map.put(state.pending, path, ref)}
end
# Cancels all pending debounce timers on shutdown and drops their events.
# Always returns :ok.
defp drain_pending(pending) when map_size(pending) == 0, do: :ok

defp drain_pending(pending) do
  Logger.info("Watcher: draining #{map_size(pending)} pending event(s)")

  # @drain_timeout bounds the *total* time spent here. Waiting
  # unconditionally after cancelling every timer would block for the full
  # timeout on each shutdown, since a cancelled timer never delivers its
  # message; if the process is stopped under a shutdown window of similar
  # length, the work that follows draining could be cut off.
  deadline = System.monotonic_time(:millisecond) + @drain_timeout

  Enum.each(pending, fn {path, ref} ->
    # false means the timer already fired: its message is in the mailbox
    # or about to arrive. Only then is there anything to wait for.
    if Process.cancel_timer(ref) == false do
      remaining = max(deadline - System.monotonic_time(:millisecond), 0)

      receive do
        {:debounced, ^path, _event_type} -> :ok
      after
        remaining -> :ok
      end
    end

    Logger.debug("Watcher: dropped pending event for #{path}")
  end)
end
# File name suffix that marks a path as an Org document
@org_extension ".org"

# True when `path` ends with the Org file extension.
defp org_file?(path) do
  String.ends_with?(path, @org_extension)
end
# True when the file name looks like an Emacs temp/lock file:
# `.#file.org` (leading ".#") or `#file.org#` (wrapped in "#").
defp temporary_file?(path) do
  case Path.basename(path) do
    ".#" <> _rest -> true
    "#" <> _rest = name -> String.ends_with?(name, "#")
    _other -> false
  end
end
# Collapses the list of backend event flags into one of :deleted,
# :created or :modified. Removal flags win over :created; every other
# combination (including :modified, :changed, :renamed or unknown flags)
# is reported as :modified. No file system check happens here, so a
# :renamed event is reported as :modified whether or not the file exists.
defp classify_events(events) do
  removed? = Enum.member?(events, :removed) or Enum.member?(events, :deleted)

  cond do
    removed? -> :deleted
    Enum.member?(events, :created) -> :created
    true -> :modified
  end
end
# Removes `dir` if it is empty and then walks upwards, removing each empty
# parent in turn. Stops at the first non-empty directory or on reaching
# `stop_at`, which is never removed. Both paths are expanded before they
# are compared.
defp cleanup_empty_dirs(dir, stop_at) do
  current = Path.expand(dir)
  boundary = Path.expand(stop_at)

  cond do
    current == boundary ->
      nil

    not File.dir?(current) ->
      nil

    File.ls!(current) == [] ->
      File.rmdir!(current)
      cleanup_empty_dirs(Path.dirname(current), boundary)

    true ->
      :ok
  end
end
# -------------------------------------------------------------------
# Telemetry
# -------------------------------------------------------------------
# Emits a telemetry event named [:org_garden, :watcher, event] with empty
# measurements and the given metadata.
defp emit_telemetry(event, metadata) do
  event_name = [:org_garden, :watcher, event]
  measurements = %{}
  :telemetry.execute(event_name, measurements, metadata)
end
end