Add service infrastructure for long-running deployment

- Add configuration system (config/*.exs, OrgGarden.Config)
- Refactor supervision tree with DynamicSupervisor and Registry
- Add OrgGarden.Server for serve mode lifecycle management
- Add health check HTTP endpoints (Bandit/Plug on :9090)
- Add telemetry events for export and watcher operations
- Implement graceful shutdown with SIGTERM handling
- Add Mix Release support with overlay scripts
- Add NixOS module for systemd service deployment
- Update documentation with service usage
This commit is contained in:
Ignacio Ballesteros
2026-02-21 20:38:47 +01:00
parent 6476b45f04
commit 01805dbf39
23 changed files with 1147 additions and 83 deletions

View File

@@ -1,14 +1,39 @@
defmodule OrgGarden.Application do
@moduledoc false
@moduledoc """
OTP Application for org-garden.
Starts core infrastructure services:
- Finch HTTP client pool
- Process registry for named lookups
- DynamicSupervisor for runtime-started services
- Health check HTTP server
Service-mode components (Watcher, Quartz) are started dynamically
via OrgGarden.Server when running in serve mode.
"""
use Application
@impl true
def start(_type, _args) do
children = [
{Finch, name: OrgGarden.Finch}
# HTTP client pool for Zotero/external requests
{Finch, name: OrgGarden.Finch},
# Process registry for named lookups
{Registry, keys: :unique, name: OrgGarden.Registry},
# Dynamic supervisor for serve mode components
{DynamicSupervisor, name: OrgGarden.DynamicSupervisor, strategy: :one_for_one},
# Health check HTTP server
{Bandit, plug: OrgGarden.Health, port: health_port(), scheme: :http}
]
opts = [strategy: :one_for_one, name: OrgGarden.AppSupervisor]
Supervisor.start_link(children, opts)
end
defp health_port do
Application.get_env(:org_garden, :health_port, 9090)
end
end

View File

@@ -48,6 +48,8 @@ defmodule OrgGarden.CLI do
require Logger
alias OrgGarden.Config
@transforms [OrgGarden.Transforms.Citations]
def main(argv) do
@@ -70,31 +72,36 @@ defmodule OrgGarden.CLI do
def handle_serve(argv) do
require_quartz_env()
{notes_dir, output_dir, content_dir, opts} = parse_serve_args(argv)
pipeline_opts = build_pipeline_opts()
# Initial batch export
wipe(content_dir)
export_all(notes_dir, output_dir)
run_pipeline(content_dir, pipeline_opts)
generate_index(content_dir)
IO.puts("==> Starting development server...")
{:ok, _pid} =
OrgGarden.Supervisor.start_link(
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
pipeline_opts: pipeline_opts,
transforms: @transforms,
port: opts[:port] || 8080,
ws_port: opts[:ws_port] || 3001
)
case OrgGarden.Server.start_link(
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
port: opts[:port],
ws_port: opts[:ws_port]
) do
{:ok, pid} ->
IO.puts("==> Server running at http://localhost:#{opts[:port] || Config.get(:http_port, 8080)}")
IO.puts("==> Watching #{notes_dir} for changes (Ctrl+C to stop)")
IO.puts("==> Server running at http://localhost:#{opts[:port] || 8080}")
IO.puts("==> Watching #{notes_dir} for changes (Ctrl+C to stop)")
# Wait for server to exit
ref = Process.monitor(pid)
Process.sleep(:infinity)
receive do
{:DOWN, ^ref, :process, ^pid, reason} ->
case reason do
:normal -> :ok
:shutdown -> :ok
{:shutdown, _} -> :ok
_ -> abort("Server crashed: #{inspect(reason)}")
end
end
{:error, reason} ->
abort("Failed to start server: #{inspect(reason)}")
end
end
defp parse_serve_args(argv) do
@@ -122,7 +129,7 @@ defmodule OrgGarden.CLI do
def handle_build(argv) do
quartz_path = require_quartz_env()
{notes_dir, output_dir, content_dir, _opts} = parse_build_args(argv)
pipeline_opts = build_pipeline_opts()
pipeline_opts = Config.pipeline_opts()
# Full batch export
wipe(content_dir)
@@ -130,7 +137,7 @@ defmodule OrgGarden.CLI do
run_pipeline(content_dir, pipeline_opts)
generate_index(content_dir)
node_path = System.get_env("NODE_PATH", "node")
node_path = Config.get(:node_path, "node")
IO.puts("==> Building static site with Quartz...")
@@ -177,7 +184,7 @@ defmodule OrgGarden.CLI do
def handle_export(argv) do
{notes_dir, output_dir, content_dir, watch?} = parse_export_args(argv)
pipeline_opts = build_pipeline_opts()
pipeline_opts = Config.pipeline_opts()
# Phase 1-4: full batch export
wipe(content_dir)
@@ -197,16 +204,29 @@ defmodule OrgGarden.CLI do
if watch? do
IO.puts("==> Watching #{notes_dir} for .org changes... (Ctrl+C to stop)")
{:ok, _pid} =
OrgGarden.Watcher.start_link(
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
pipeline_opts: pipeline_opts,
transforms: @transforms
{:ok, pid} =
DynamicSupervisor.start_child(
OrgGarden.DynamicSupervisor,
{OrgGarden.Watcher,
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
pipeline_opts: pipeline_opts,
transforms: @transforms}
)
Process.sleep(:infinity)
# Wait for watcher to exit
ref = Process.monitor(pid)
receive do
{:DOWN, ^ref, :process, ^pid, reason} ->
case reason do
:normal -> :ok
:shutdown -> :ok
{:shutdown, _} -> :ok
_ -> abort("Watcher crashed: #{inspect(reason)}")
end
end
end
end
@@ -235,7 +255,7 @@ defmodule OrgGarden.CLI do
dir
[] ->
System.get_env("NOTES_DIR") ||
Config.get(:notes_dir) ||
abort("Usage: org-garden #{command} <notes-dir> [options]")
end
@@ -249,7 +269,7 @@ defmodule OrgGarden.CLI do
end
defp extract_output_dir(opts) do
(opts[:output] || System.get_env("OUTPUT_DIR") || File.cwd!())
(opts[:output] || Config.get(:output_dir) || File.cwd!())
|> Path.expand()
end
@@ -328,7 +348,7 @@ defmodule OrgGarden.CLI do
# ---------------------------------------------------------------------------
defp require_quartz_env do
case System.get_env("QUARTZ_PATH") do
case Config.get(:quartz_path) do
nil ->
abort("""
Error: QUARTZ_PATH environment variable not set.
@@ -355,19 +375,6 @@ defmodule OrgGarden.CLI do
end
end
defp build_pipeline_opts do
%{
zotero_url: System.get_env("ZOTERO_URL", "http://localhost:23119"),
bibtex_file: System.get_env("BIBTEX_FILE"),
citation_mode:
case System.get_env("CITATION_MODE", "warn") do
"silent" -> :silent
"strict" -> :strict
_ -> :warn
end
}
end
defp abort(message) do
IO.puts(:stderr, message)
System.halt(1)

98
lib/org_garden/config.ex Normal file
View File

@@ -0,0 +1,98 @@
defmodule OrgGarden.Config do
@moduledoc """
Centralized configuration access with validation.
Provides a unified interface for accessing configuration values,
with support for defaults and required value validation.
## Usage
OrgGarden.Config.get(:http_port)
#=> 8080
OrgGarden.Config.get!(:quartz_path)
#=> "/path/to/quartz" or raises if not set
OrgGarden.Config.pipeline_opts()
#=> %{zotero_url: "...", bibtex_file: nil, citation_mode: :warn}
"""
@doc """
Get a configuration value with an optional default.
"""
def get(key, default \\ nil) do
Application.get_env(:org_garden, key, default)
end
@doc """
Get a required configuration value. Raises if not set.
"""
def get!(key) do
case Application.get_env(:org_garden, key) do
nil -> raise ArgumentError, "Missing required configuration: #{key}"
value -> value
end
end
@doc """
Build pipeline options map for transforms.
"""
def pipeline_opts do
%{
zotero_url: get(:zotero_url, "http://localhost:23119"),
bibtex_file: get(:bibtex_file),
citation_mode: get(:citation_mode, :warn)
}
end
@doc """
Validate that all required configuration is present.
Returns :ok or {:error, reasons}.
"""
def validate do
errors =
[]
|> validate_quartz_path()
|> validate_citation_mode()
case errors do
[] -> :ok
errors -> {:error, errors}
end
end
@doc """
Validate configuration and raise on errors.
"""
def validate! do
case validate() do
:ok -> :ok
{:error, errors} -> raise "Configuration errors: #{inspect(errors)}"
end
end
# Private validation helpers
defp validate_quartz_path(errors) do
case get(:quartz_path) do
nil ->
errors
path ->
cli_path = Path.join(path, "quartz/bootstrap-cli.mjs")
if File.exists?(cli_path) do
errors
else
[{:quartz_path, "bootstrap-cli.mjs not found at #{cli_path}"} | errors]
end
end
end
defp validate_citation_mode(errors) do
case get(:citation_mode) do
mode when mode in [:silent, :warn, :strict] -> errors
other -> [{:citation_mode, "Invalid citation mode: #{inspect(other)}"} | errors]
end
end
end

View File

@@ -16,6 +16,12 @@ defmodule OrgGarden.Export do
"""
@spec export_file(String.t(), String.t(), String.t()) :: {:ok, non_neg_integer()} | {:error, term()}
def export_file(orgfile, notes_dir, output_dir) do
OrgGarden.Telemetry.span_export(orgfile, fn ->
do_export_file(orgfile, notes_dir, output_dir)
end)
end
defp do_export_file(orgfile, notes_dir, output_dir) do
section =
orgfile
|> Path.dirname()

89
lib/org_garden/health.ex Normal file
View File

@@ -0,0 +1,89 @@
defmodule OrgGarden.Health do
@moduledoc """
Health check HTTP endpoints.
Provides liveness and readiness probes for systemd/kubernetes health checks.
## Endpoints
* `GET /health/live` — Always returns 200 if the process is alive
* `GET /health/ready` — Returns 200 if all components are ready, 503 otherwise
* `GET /health` — Returns JSON status of all components
"""
use Plug.Router
plug(:match)
plug(:dispatch)
# Liveness probe — is the process alive?
get "/health/live" do
send_resp(conn, 200, "ok")
end
# Readiness probe — are all components ready to serve?
get "/health/ready" do
checks = run_checks()
all_healthy = Enum.all?(checks, fn {_name, status} -> status == :ok end)
if all_healthy do
send_resp(conn, 200, "ready")
else
send_resp(conn, 503, "not ready")
end
end
# Full health status as JSON
get "/health" do
checks = run_checks()
all_healthy = Enum.all?(checks, fn {_name, status} -> status == :ok end)
status =
if all_healthy do
"healthy"
else
"degraded"
end
body =
Jason.encode!(%{
status: status,
checks:
Map.new(checks, fn {name, status} ->
{name, Atom.to_string(status)}
end)
})
conn
|> put_resp_content_type("application/json")
|> send_resp(if(all_healthy, do: 200, else: 503), body)
end
match _ do
send_resp(conn, 404, "not found")
end
# -------------------------------------------------------------------
# Health checks
# -------------------------------------------------------------------
defp run_checks do
[
{:server, check_server()},
{:watcher, check_watcher()},
{:quartz, check_quartz()}
]
end
defp check_server do
if Process.whereis(OrgGarden.Server), do: :ok, else: :not_running
end
defp check_watcher do
if Process.whereis(OrgGarden.Watcher), do: :ok, else: :not_running
end
defp check_quartz do
if Process.whereis(OrgGarden.Quartz), do: :ok, else: :not_running
end
end

View File

@@ -16,7 +16,11 @@ defmodule OrgGarden.Quartz do
require Logger
defstruct [:port, :quartz_path, :content_dir, :http_port, :ws_port]
alias OrgGarden.Config
@shutdown_timeout 5_000
defstruct [:port, :os_pid, :quartz_path, :content_dir, :http_port, :ws_port]
# -------------------------------------------------------------------
# Client API
@@ -35,21 +39,27 @@ defmodule OrgGarden.Quartz do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Check if Quartz is running.
"""
def running? do
Process.whereis(__MODULE__) != nil
end
# -------------------------------------------------------------------
# GenServer callbacks
# -------------------------------------------------------------------
@impl true
def init(opts) do
quartz_path =
System.get_env("QUARTZ_PATH") ||
raise "QUARTZ_PATH environment variable not set"
Process.flag(:trap_exit, true)
node_path = System.get_env("NODE_PATH", "node")
quartz_path = Config.get!(:quartz_path)
node_path = Config.get(:node_path, "node")
content_dir = Keyword.fetch!(opts, :content_dir)
http_port = Keyword.get(opts, :port, 8080)
ws_port = Keyword.get(opts, :ws_port, 3001)
http_port = Keyword.get(opts, :port, Config.get(:http_port, 8080))
ws_port = Keyword.get(opts, :ws_port, Config.get(:ws_port, 3001))
cli_path = Path.join(quartz_path, "quartz/bootstrap-cli.mjs")
@@ -61,9 +71,12 @@ defmodule OrgGarden.Quartz do
cli_path,
"build",
"--serve",
"--directory", content_dir,
"--port", to_string(http_port),
"--wsPort", to_string(ws_port)
"--directory",
content_dir,
"--port",
to_string(http_port),
"--wsPort",
to_string(ws_port)
]
Logger.info("[quartz] Starting: #{node_path} #{Enum.join(args, " ")}")
@@ -79,8 +92,12 @@ defmodule OrgGarden.Quartz do
env: [{~c"NODE_NO_WARNINGS", ~c"1"}]
])
# Get the OS process ID for graceful shutdown
{:os_pid, os_pid} = Port.info(port, :os_pid)
state = %__MODULE__{
port: port,
os_pid: os_pid,
quartz_path: quartz_path,
content_dir: content_dir,
http_port: http_port,
@@ -102,17 +119,65 @@ defmodule OrgGarden.Quartz do
@impl true
def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
Logger.error("[quartz] Process exited with status #{status}")
{:stop, {:quartz_exit, status}, state}
{:stop, {:quartz_exit, status}, %{state | port: nil, os_pid: nil}}
end
@impl true
def terminate(_reason, %{port: port}) when is_port(port) do
# Attempt graceful shutdown
Port.close(port)
:ok
rescue
_ -> :ok
def handle_info({:EXIT, port, reason}, %{port: port} = state) do
Logger.warning("[quartz] Port terminated: #{inspect(reason)}")
{:stop, {:port_exit, reason}, %{state | port: nil, os_pid: nil}}
end
def terminate(_reason, _state), do: :ok
@impl true
def terminate(_reason, %{os_pid: nil}) do
# Process already exited
:ok
end
@impl true
def terminate(_reason, %{port: port, os_pid: os_pid}) do
Logger.info("[quartz] Shutting down gracefully...")
# Send SIGTERM to the Node.js process
case System.cmd("kill", ["-TERM", to_string(os_pid)], stderr_to_stdout: true) do
{_, 0} ->
# Wait for graceful exit
wait_for_exit(port, @shutdown_timeout)
{output, _} ->
Logger.warning("[quartz] Failed to send SIGTERM: #{output}")
force_close(port, os_pid)
end
:ok
end
# -------------------------------------------------------------------
# Private functions
# -------------------------------------------------------------------
defp wait_for_exit(port, timeout) do
receive do
{^port, {:exit_status, status}} ->
Logger.info("[quartz] Exited with status #{status}")
:ok
after
timeout ->
Logger.warning("[quartz] Shutdown timeout, forcing kill")
{:os_pid, os_pid} = Port.info(port, :os_pid)
force_close(port, os_pid)
end
end
defp force_close(port, os_pid) do
# Send SIGKILL
System.cmd("kill", ["-KILL", to_string(os_pid)], stderr_to_stdout: true)
# Close the port
try do
Port.close(port)
rescue
_ -> :ok
end
end
end

212
lib/org_garden/server.ex Normal file
View File

@@ -0,0 +1,212 @@
defmodule OrgGarden.Server do
@moduledoc """
Manages the serve mode lifecycle.
This GenServer encapsulates the full serve mode workflow:
1. Run initial export pipeline
2. Start supervised Watcher + Quartz under DynamicSupervisor
3. Handle graceful shutdown on termination
## Usage
# Start the server (blocks until stopped)
{:ok, pid} = OrgGarden.Server.start_link(
notes_dir: "/path/to/notes",
output_dir: "/path/to/output"
)
# Stop gracefully
OrgGarden.Server.stop()
"""
use GenServer
require Logger
alias OrgGarden.Config
@transforms [OrgGarden.Transforms.Citations]
# -------------------------------------------------------------------
# Client API
# -------------------------------------------------------------------
@doc """
Start the serve mode server.
## Options
* `:notes_dir` — path to org-roam notes (required)
* `:output_dir` — output base directory (required)
* `:content_dir` — markdown output directory (default: output_dir/content)
* `:port` — HTTP server port (default: from config)
* `:ws_port` — WebSocket port (default: from config)
"""
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Stop the server gracefully.
"""
def stop(timeout \\ 10_000) do
GenServer.stop(__MODULE__, :normal, timeout)
end
@doc """
Check if the server is running.
"""
def running? do
Process.whereis(__MODULE__) != nil
end
# -------------------------------------------------------------------
# GenServer callbacks
# -------------------------------------------------------------------
@impl true
def init(opts) do
Process.flag(:trap_exit, true)
notes_dir = Keyword.fetch!(opts, :notes_dir)
output_dir = Keyword.fetch!(opts, :output_dir)
content_dir = Keyword.get(opts, :content_dir, Path.join(output_dir, "content"))
http_port = Keyword.get(opts, :port, Config.get(:http_port, 8080))
ws_port = Keyword.get(opts, :ws_port, Config.get(:ws_port, 3001))
pipeline_opts = Config.pipeline_opts()
state = %{
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
pipeline_opts: pipeline_opts,
http_port: http_port,
ws_port: ws_port,
supervisor_pid: nil
}
# Run initial pipeline synchronously
case run_initial_pipeline(state) do
:ok ->
# Start supervised components
case start_supervisor(state) do
{:ok, sup_pid} ->
Logger.info("Server started on http://localhost:#{http_port}")
Logger.info("Watching #{notes_dir} for changes")
{:ok, %{state | supervisor_pid: sup_pid}}
{:error, reason} ->
{:stop, reason}
end
{:error, reason} ->
{:stop, reason}
end
end
@impl true
def handle_info({:EXIT, pid, reason}, %{supervisor_pid: pid} = state) do
Logger.error("Supervisor exited: #{inspect(reason)}")
{:stop, {:supervisor_exit, reason}, state}
end
@impl true
def handle_info({:EXIT, _pid, reason}, state) do
Logger.warning("Linked process exited: #{inspect(reason)}")
{:noreply, state}
end
@impl true
def terminate(reason, state) do
Logger.info("Server shutting down: #{inspect(reason)}")
if state.supervisor_pid do
# Stop the supervisor gracefully
DynamicSupervisor.terminate_child(OrgGarden.DynamicSupervisor, state.supervisor_pid)
end
:ok
end
# -------------------------------------------------------------------
# Private functions
# -------------------------------------------------------------------
defp run_initial_pipeline(state) do
%{
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
pipeline_opts: pipeline_opts
} = state
Logger.info("Running initial export pipeline...")
# Wipe content directory
wipe(content_dir)
# Export all org files
case OrgGarden.Export.export_all(notes_dir, output_dir) do
{:ok, 0} ->
Logger.warning("No .org files found in #{notes_dir}")
:ok
{:ok, count} ->
Logger.info("Exported #{count} file(s)")
# Run transforms
{:ok, stats} = OrgGarden.run(content_dir, @transforms, pipeline_opts)
Enum.each(stats, fn {mod, c} ->
Logger.info("#{inspect(mod)}: #{c} file(s) modified")
end)
# Generate index
OrgGarden.Index.generate(content_dir)
:ok
{:error, failures} ->
Logger.error("Failed to export #{length(failures)} file(s)")
{:error, {:export_failed, failures}}
end
end
defp start_supervisor(state) do
%{
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
pipeline_opts: pipeline_opts,
http_port: http_port,
ws_port: ws_port
} = state
child_spec = {
OrgGarden.Supervisor,
[
notes_dir: notes_dir,
output_dir: output_dir,
content_dir: content_dir,
pipeline_opts: pipeline_opts,
transforms: @transforms,
port: http_port,
ws_port: ws_port
]
}
DynamicSupervisor.start_child(OrgGarden.DynamicSupervisor, child_spec)
end
defp wipe(content_dir) do
Logger.info("Wiping #{content_dir}")
File.mkdir_p!(content_dir)
content_dir
|> File.ls!()
|> Enum.reject(&(&1 == ".gitkeep"))
|> Enum.each(fn entry ->
Path.join(content_dir, entry) |> File.rm_rf!()
end)
end
end

109
lib/org_garden/telemetry.ex Normal file
View File

@@ -0,0 +1,109 @@
defmodule OrgGarden.Telemetry do
@moduledoc """
Telemetry event definitions and logging handler.
## Events
The following telemetry events are emitted:
* `[:org_garden, :export, :start]` — Export of a single file started
- Metadata: `%{file: path}`
* `[:org_garden, :export, :stop]` — Export of a single file completed
- Measurements: `%{duration: native_time}`
- Metadata: `%{file: path}`
* `[:org_garden, :export, :exception]` — Export failed
- Measurements: `%{duration: native_time}`
- Metadata: `%{file: path, kind: kind, reason: reason}`
* `[:org_garden, :watcher, :file_processed]` — File change processed
- Metadata: `%{path: path, event: :created | :modified | :deleted}`
* `[:org_garden, :server, :start]` — Server started
- Metadata: `%{port: port}`
* `[:org_garden, :server, :stop]` — Server stopped
- Metadata: `%{reason: reason}`
## Usage
Attach a handler to log events:
OrgGarden.Telemetry.attach_logger()
Or use `:telemetry.attach/4` for custom handling.
"""
require Logger
@doc """
Attach a simple logging handler for telemetry events.
"""
def attach_logger do
events = [
[:org_garden, :export, :stop],
[:org_garden, :export, :exception],
[:org_garden, :watcher, :file_processed],
[:org_garden, :server, :start],
[:org_garden, :server, :stop]
]
:telemetry.attach_many(
"org-garden-logger",
events,
&handle_event/4,
nil
)
end
@doc """
Detach the logging handler.
"""
def detach_logger do
:telemetry.detach("org-garden-logger")
end
# -------------------------------------------------------------------
# Event handlers
# -------------------------------------------------------------------
defp handle_event([:org_garden, :export, :stop], measurements, metadata, _config) do
duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)
Logger.debug("Export completed: #{metadata.file} (#{duration_ms}ms)")
end
defp handle_event([:org_garden, :export, :exception], _measurements, metadata, _config) do
Logger.error("Export failed: #{metadata.file} - #{inspect(metadata.reason)}")
end
defp handle_event([:org_garden, :watcher, :file_processed], _measurements, metadata, _config) do
Logger.debug("Watcher processed: #{metadata.event} #{metadata.path}")
end
defp handle_event([:org_garden, :server, :start], _measurements, metadata, _config) do
Logger.info("Server started on port #{metadata.port}")
end
defp handle_event([:org_garden, :server, :stop], _measurements, metadata, _config) do
Logger.info("Server stopped: #{inspect(metadata.reason)}")
end
# -------------------------------------------------------------------
# Convenience functions for emitting events
# -------------------------------------------------------------------
@doc """
Wrap a function with export telemetry events.
"""
def span_export(file, fun) when is_function(fun, 0) do
:telemetry.span(
[:org_garden, :export],
%{file: file},
fn ->
result = fun.()
{result, %{file: file}}
end
)
end
end

View File

@@ -29,6 +29,7 @@ defmodule OrgGarden.Watcher do
require Logger
@debounce_ms 500
@drain_timeout 5_000
# -------------------------------------------------------------------
# Client API
@@ -49,12 +50,21 @@ defmodule OrgGarden.Watcher do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Check if the watcher is running.
"""
def running? do
Process.whereis(__MODULE__) != nil
end
# -------------------------------------------------------------------
# GenServer callbacks
# -------------------------------------------------------------------
@impl true
def init(opts) do
Process.flag(:trap_exit, true)
notes_dir = Keyword.fetch!(opts, :notes_dir)
output_dir = Keyword.fetch!(opts, :output_dir)
content_dir = Keyword.fetch!(opts, :content_dir)
@@ -78,7 +88,8 @@ defmodule OrgGarden.Watcher do
pipeline_opts: pipeline_opts,
watcher_pid: watcher_pid,
initialized_transforms: initialized_transforms,
pending: %{}
pending: %{},
processing: false
}}
end
@@ -103,7 +114,7 @@ defmodule OrgGarden.Watcher do
@impl true
def handle_info({:debounced, path, event_type}, state) do
state = %{state | pending: Map.delete(state.pending, path)}
state = %{state | pending: Map.delete(state.pending, path), processing: true}
case event_type do
:deleted ->
@@ -113,12 +124,27 @@ defmodule OrgGarden.Watcher do
handle_change(path, state)
end
{:noreply, state}
emit_telemetry(:file_processed, %{path: path, event: event_type})
{:noreply, %{state | processing: false}}
end
@impl true
def terminate(_reason, state) do
def terminate(reason, state) do
Logger.info("Watcher: shutting down (#{inspect(reason)})")
# Drain pending timers
drain_pending(state.pending)
# Wait for any in-flight processing
if state.processing do
Logger.info("Watcher: waiting for in-flight processing...")
Process.sleep(100)
end
# Teardown transforms
OrgGarden.teardown_transforms(state.initialized_transforms)
:ok
end
@@ -136,20 +162,25 @@ defmodule OrgGarden.Watcher do
} = state
md_path = OrgGarden.Export.expected_md_path(orgfile, notes_dir, content_dir)
IO.puts("==> Changed: #{Path.relative_to(orgfile, notes_dir)}")
Logger.info("Changed: #{Path.relative_to(orgfile, notes_dir)}")
start_time = System.monotonic_time(:millisecond)
case OrgGarden.Export.export_file(orgfile, notes_dir, output_dir) do
{:ok, _} ->
IO.puts(" exported: #{Path.relative_to(md_path, content_dir)}")
Logger.info(" exported: #{Path.relative_to(md_path, content_dir)}")
{:ok, stats} = OrgGarden.run_on_files_with([md_path], initialized_transforms, pipeline_opts)
{:ok, stats} =
OrgGarden.run_on_files_with([md_path], initialized_transforms, pipeline_opts)
Enum.each(stats, fn {mod, count} ->
if count > 0, do: IO.puts(" #{inspect(mod)}: #{count} file(s) modified")
if count > 0, do: Logger.info(" #{inspect(mod)}: #{count} file(s) modified")
end)
regenerate_index(content_dir)
IO.puts("==> Done")
duration = System.monotonic_time(:millisecond) - start_time
Logger.info(" done in #{duration}ms")
{:error, reason} ->
Logger.error("Watcher: export failed for #{orgfile}: #{inspect(reason)}")
@@ -160,18 +191,17 @@ defmodule OrgGarden.Watcher do
%{notes_dir: notes_dir, content_dir: content_dir} = state
md_path = OrgGarden.Export.expected_md_path(orgfile, notes_dir, content_dir)
IO.puts("==> Deleted: #{Path.relative_to(orgfile, notes_dir)}")
Logger.info("Deleted: #{Path.relative_to(orgfile, notes_dir)}")
if File.exists?(md_path) do
File.rm!(md_path)
IO.puts(" removed: #{Path.relative_to(md_path, content_dir)}")
Logger.info(" removed: #{Path.relative_to(md_path, content_dir)}")
# Clean up empty parent directories left behind
cleanup_empty_dirs(Path.dirname(md_path), content_dir)
end
regenerate_index(content_dir)
IO.puts("==> Done")
end
# -------------------------------------------------------------------
@@ -197,6 +227,25 @@ defmodule OrgGarden.Watcher do
%{state | pending: Map.put(state.pending, path, ref)}
end
defp drain_pending(pending) when map_size(pending) == 0, do: :ok
defp drain_pending(pending) do
Logger.info("Watcher: draining #{map_size(pending)} pending event(s)")
# Cancel all timers and log what we're dropping
Enum.each(pending, fn {path, ref} ->
Process.cancel_timer(ref)
Logger.debug("Watcher: dropped pending event for #{path}")
end)
# Give a moment for any final messages to arrive
receive do
{:debounced, _, _} -> :ok
after
@drain_timeout -> :ok
end
end
defp org_file?(path), do: String.ends_with?(path, ".org")
defp temporary_file?(path) do
@@ -233,4 +282,12 @@ defmodule OrgGarden.Watcher do
end
end
end
# -------------------------------------------------------------------
# Telemetry
# -------------------------------------------------------------------
defp emit_telemetry(event, metadata) do
:telemetry.execute([:org_garden, :watcher, event], %{}, metadata)
end
end