diff --git a/modules/hosts/_parts/tahani/notability.nix b/modules/hosts/_parts/tahani/notability.nix index f177c15..eba77b8 100644 --- a/modules/hosts/_parts/tahani/notability.nix +++ b/modules/hosts/_parts/tahani/notability.nix @@ -1,29 +1,33 @@ { config, inputs', - lib, pkgs, ... }: let + homeDir = "/home/cschmatzler"; notabilityScripts = ./notability; - webdavRoot = "/home/cschmatzler/.local/share/notability-ingest/webdav-root"; - dataRoot = "/home/cschmatzler/.local/share/notability-ingest"; - stateRoot = "/home/cschmatzler/.local/state/notability-ingest"; - notesRoot = "/home/cschmatzler/Notes"; - commonPath = [ - inputs'.llm-agents.packages.pi - pkgs.qmd - pkgs.coreutils - pkgs.inotify-tools - pkgs.nushell - pkgs.poppler-utils - pkgs.rclone - pkgs.sqlite - pkgs.util-linux - pkgs.zk + dataRoot = "${homeDir}/.local/share/notability-ingest"; + stateRoot = "${homeDir}/.local/state/notability-ingest"; + notesRoot = "${homeDir}/Notes"; + webdavRoot = "${dataRoot}/webdav-root"; + userPackages = with pkgs; [ + qmd + poppler-utils + rclone + sqlite + zk ]; + commonPath = with pkgs; + [ + inputs'.llm-agents.packages.pi + coreutils + inotify-tools + nushell + util-linux + ] + ++ userPackages; commonEnvironment = { - HOME = "/home/cschmatzler"; + HOME = homeDir; NOTABILITY_ARCHIVE_ROOT = "${dataRoot}/archive"; NOTABILITY_DATA_ROOT = dataRoot; NOTABILITY_DB_PATH = "${stateRoot}/db.sqlite"; @@ -33,7 +37,28 @@ NOTABILITY_STATE_ROOT = stateRoot; NOTABILITY_TRANSCRIPT_ROOT = "${stateRoot}/transcripts"; NOTABILITY_WEBDAV_ROOT = webdavRoot; - XDG_CONFIG_HOME = "/home/cschmatzler/.config"; + XDG_CONFIG_HOME = "${homeDir}/.config"; + }; + mkTmpDirRule = path: "d ${path} 0755 cschmatzler users -"; + mkNotabilityService = { + description, + script, + after ? [], + requires ? [], + environment ? {}, + }: { + inherit after description requires; + wantedBy = ["multi-user.target"]; + path = commonPath; + environment = commonEnvironment // environment; + serviceConfig = { + ExecStart = "${pkgs.nushell}/bin/nu ${notabilityScripts}/${script}"; + Group = "users"; + Restart = "always"; + RestartSec = 5; + User = "cschmatzler"; + WorkingDirectory = homeDir; + }; }; in { sops.secrets.tahani-notability-webdav-password = { @@ -44,13 +69,7 @@ in { }; home-manager.users.cschmatzler = { - home.packages = [ - pkgs.qmd - pkgs.poppler-utils - pkgs.rclone - pkgs.sqlite - pkgs.zk - ]; + home.packages = userPackages; home.file.".config/qmd/index.yml".text = '' collections: notes: @@ -59,22 +78,23 @@ in { ''; }; - systemd.tmpfiles.rules = [ - "d ${notesRoot} 0755 cschmatzler users -" - "d ${dataRoot} 0755 cschmatzler users -" - "d ${webdavRoot} 0755 cschmatzler users -" - "d ${dataRoot}/archive 0755 cschmatzler users -" - "d ${dataRoot}/rendered-pages 0755 cschmatzler users -" - "d ${stateRoot} 0755 cschmatzler users -" - "d ${stateRoot}/jobs 0755 cschmatzler users -" - "d ${stateRoot}/jobs/queued 0755 cschmatzler users -" - "d ${stateRoot}/jobs/running 0755 cschmatzler users -" - "d ${stateRoot}/jobs/failed 0755 cschmatzler users -" - "d ${stateRoot}/jobs/done 0755 cschmatzler users -" - "d ${stateRoot}/jobs/results 0755 cschmatzler users -" - "d ${stateRoot}/sessions 0755 cschmatzler users -" - "d ${stateRoot}/transcripts 0755 cschmatzler users -" - ]; + systemd.tmpfiles.rules = + builtins.map mkTmpDirRule [ + notesRoot + dataRoot + webdavRoot + "${dataRoot}/archive" + "${dataRoot}/rendered-pages" + stateRoot + "${stateRoot}/jobs" + "${stateRoot}/jobs/queued" + "${stateRoot}/jobs/running" + "${stateRoot}/jobs/failed" + "${stateRoot}/jobs/done" + "${stateRoot}/jobs/results" + "${stateRoot}/sessions" + "${stateRoot}/transcripts" + ]; services.caddy.virtualHosts."tahani.manticore-hippocampus.ts.net".extraConfig = '' tls { @@ -85,43 +105,24 @@ in { } ''; - systemd.services.notability-webdav = { - description = "Notability WebDAV landing zone"; - wantedBy = ["multi-user.target"]; - after = ["network.target"]; - path = commonPath; - environment = - commonEnvironment - // { + systemd.services.notability-webdav = + mkNotabilityService { + description = "Notability WebDAV landing zone"; + script = "webdav.nu"; + after = ["network.target"]; + environment = { NOTABILITY_WEBDAV_ADDR = "127.0.0.1:9980"; NOTABILITY_WEBDAV_BASEURL = "/notability"; NOTABILITY_WEBDAV_PASSWORD_FILE = config.sops.secrets.tahani-notability-webdav-password.path; NOTABILITY_WEBDAV_USER = "notability"; }; - serviceConfig = { - ExecStart = "${pkgs.nushell}/bin/nu ${notabilityScripts}/webdav.nu"; - Group = "users"; - Restart = "always"; - RestartSec = 5; - User = "cschmatzler"; - WorkingDirectory = "/home/cschmatzler"; }; - }; - systemd.services.notability-watch = { - description = "Watch and ingest Notability WebDAV uploads"; - wantedBy = ["multi-user.target"]; - after = ["notability-webdav.service"]; - requires = ["notability-webdav.service"]; - path = commonPath; - environment = commonEnvironment; - serviceConfig = { - ExecStart = "${pkgs.nushell}/bin/nu ${notabilityScripts}/watch.nu"; - Group = "users"; - Restart = "always"; - RestartSec = 5; - User = "cschmatzler"; - WorkingDirectory = "/home/cschmatzler"; + systemd.services.notability-watch = + mkNotabilityService { + description = "Watch and ingest Notability WebDAV uploads"; + script = "watch.nu"; + after = ["notability-webdav.service"]; + requires = ["notability-webdav.service"]; }; - }; } diff --git a/modules/hosts/_parts/tahani/notability/ingest.nu b/modules/hosts/_parts/tahani/notability/ingest.nu deleted file mode 100644 index 44debc6..0000000 --- a/modules/hosts/_parts/tahani/notability/ingest.nu +++ /dev/null @@ -1,141 +0,0 @@ -#!/usr/bin/env nu - -use ./lib.nu * - -const vision_model = 'openai-codex/gpt-5.4' - - -def call-pi [prompt: string, inputs: list, thinking: string] { - let prompt_file = (^mktemp --suffix '.md' | str trim) - $prompt | save -f $prompt_file - let input_refs = ($inputs | each {|f| $"'@($f)'"} | str join ' ') - let cmd = $"timeout 45s pi --model '($vision_model)' --thinking ($thinking) --no-tools --no-session -p ($input_refs) '@($prompt_file)'" - let result = (bash -c $cmd | complete) - rm -f $prompt_file - let output = ($result.stdout | str trim) - if $output != '' { - $output - } else { - error make { msg: $"pi returned no output \(exit ($result.exit_code)): ($result.stderr | str trim)" } - } -} - - -def render-pages [input_path: path, job_id: string] { - let ext = (([$input_path] | path parse | first).extension? | default '' | str downcase) - if $ext == 'png' { - [$input_path] - } else if $ext == 'pdf' { - let dir = [(render-root) $job_id] | path join - mkdir $dir - ^pdftoppm -png -r 200 $input_path ([$dir 'page'] | path join) - (glob $"($dir)/*.png") | sort - } else { - error make { msg: $"Unsupported format: ($ext)" } - } -} - - -def unquote [v?: any] { - if $v == null { '' } else { $v | into string | str replace -r '^["''](.*)["'']$' '$1' } -} - - -def find-output [note_id: string, configured: path] { - if ($configured | path exists) { - let fm = (parse-output-frontmatter $configured) - if (unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $note_id { - return $configured - } - } - let found = (glob $"((notes-root))/**/*.md") | where not ($it | str contains '/.') | where {|f| - let fm = (parse-output-frontmatter $f) - (unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $note_id - } - if ($found | is-empty) { $configured } else { $found | first } -} - - -def source-format [p: path] { - ([$p] | path parse | first).extension? | default 'bin' | str downcase -} - - -def main [manifest_path: path] { - ensure-layout - let m = (open $manifest_path) - - # transcribe - let pages = (render-pages $m.input_path $m.job_id) - let transcript = (call-pi "Transcribe this note into clean Markdown. Read it like a human and preserve the intended reading order and visible structure. Keep headings, lists, and paragraphs when they are visible. Do not summarize. Do not add commentary. Return Markdown only." $pages 'low') - - mkdir ([$m.transcript_path] | path dirname) - $"($transcript)\n" | save -f $m.transcript_path - - # normalize - let normalized = (call-pi "Rewrite the attached transcription into clean Markdown. Preserve the same content and intended structure. Do not summarize. Return Markdown only." [$m.transcript_path] 'off') - - # build output - let body = ($normalized | str trim) - let body_out = if $body == '' { $"# ($m.title)" } else { $body } - let created = ($m.requested_at | str substring 0..9) - let updated = ((date now) | format date '%Y-%m-%d') - let markdown = ([ - '---' - $'title: ($m.title | to json)' - $'created: ($created | to json)' - $'updated: ($updated | to json)' - 'source: "notability"' - $'source_transport: (($m.source_transport? | default "webdav") | to json)' - $'source_relpath: ($m.source_relpath | to json)' - $'note_id: ($m.note_id | to json)' - 'managed_by: "notability-ingest"' - $'source_file: ($m.archive_path | to json)' - $'source_file_hash: ($"sha256:($m.source_hash)" | to json)' - $'source_format: ((source-format $m.archive_path) | to json)' - 'status: "active"' - 'tags:' - ' - handwritten' - ' - notability' - '---' - '' - $body_out - '' - ] | str join "\n") - - # write - let output_path = (find-output $m.note_id $m.output_path) - let write_path = if ($m.force_overwrite_generated? | default false) or not ($output_path | path exists) { - $output_path - } else { - let fm = (parse-output-frontmatter $output_path) - if (unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $m.note_id { - $output_path - } else { - let stamp = ((date now) | format date '%Y-%m-%dT%H-%M-%SZ') - let parsed = ([$output_path] | path parse | first) - [$parsed.parent $"($parsed.stem).conflict-($stamp).($parsed.extension)"] | path join - } - } - let write_mode = if not ($output_path | path exists) { 'create' } else if $write_path == $output_path { 'overwrite' } else { 'conflict' } - - mkdir ([$write_path] | path dirname) - $markdown | save -f $write_path - - let output_hash = (sha256 $write_path) - - # result - { - success: true - job_id: $m.job_id - note_id: $m.note_id - archive_path: $m.archive_path - source_hash: $m.source_hash - session_dir: $m.session_dir - output_path: $output_path - output_hash: $output_hash - write_mode: $write_mode - updated_main_output: ($write_path == $output_path) - transcript_path: $m.transcript_path - } | to json --indent 2 | save -f $m.result_path -} diff --git a/modules/hosts/_parts/tahani/notability/jobs.nu b/modules/hosts/_parts/tahani/notability/jobs.nu new file mode 100644 index 0000000..d4da830 --- /dev/null +++ b/modules/hosts/_parts/tahani/notability/jobs.nu @@ -0,0 +1,141 @@ +#!/usr/bin/env nu + +use ./lib.nu * + + +def active-job-exists [note_id: string, source_hash: string] { + let rows = (sql-json $" + select job_id + from jobs + where note_id = (sql-quote $note_id) + and source_hash = (sql-quote $source_hash) + and status != 'done' + and status != 'failed' + limit 1; + ") + not ($rows | is-empty) +} + + +export def archive-and-version [note_id: string, source_path: path, source_relpath: string, source_size: any, source_mtime: string, source_hash: string] { + let source_size_int = ($source_size | into int) + let archive_path = (archive-path-for $note_id $source_hash $source_relpath) + cp $source_path $archive_path + + let version_id = (new-version-id) + let seen_at = (now-iso) + let version_id_q = (sql-quote $version_id) + let note_id_q = (sql-quote $note_id) + let seen_at_q = (sql-quote $seen_at) + let archive_path_q = (sql-quote $archive_path) + let source_hash_q = (sql-quote $source_hash) + let source_mtime_q = (sql-quote $source_mtime) + let source_relpath_q = (sql-quote $source_relpath) + let sql = ([ + "insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values (" + $version_id_q + ", " + $note_id_q + ", " + $seen_at_q + ", " + $archive_path_q + ", " + $source_hash_q + ", " + ($source_size_int | into string) + ", " + $source_mtime_q + ", " + $source_relpath_q + ", 'pending', null);" + ] | str join '') + sql-run $sql | ignore + + { + version_id: $version_id + seen_at: $seen_at + archive_path: $archive_path + } +} + + +export def enqueue-job [ + note: record, + operation: string, + input_path: string, + archive_path: string, + source_hash: string, + title: string, + force_overwrite_generated: bool = false, + source_transport: string = 'webdav', +] { + if (active-job-exists $note.note_id $source_hash) { + return null + } + + let job_id = (new-job-id) + let requested_at = (now-iso) + let manifest_path = (manifest-path-for $job_id 'queued') + let result_path = (result-path-for $job_id) + let transcript_path = (transcript-path-for $note.note_id $job_id) + let session_dir = ([(sessions-root) $note.note_id $job_id] | path join) + mkdir $session_dir + + let manifest = { + version: 1 + job_id: $job_id + note_id: $note.note_id + operation: $operation + requested_at: $requested_at + title: $title + source_relpath: $note.source_relpath + source_path: $note.source_path + input_path: $input_path + archive_path: $archive_path + output_path: $note.output_path + transcript_path: $transcript_path + result_path: $result_path + session_dir: $session_dir + source_hash: $source_hash + last_generated_output_hash: ($note.last_generated_output_hash? | default null) + force_overwrite_generated: $force_overwrite_generated + source_transport: $source_transport + } + + ($manifest | to json --indent 2) | save -f $manifest_path + let job_id_q = (sql-quote $job_id) + let note_id_q = (sql-quote $note.note_id) + let operation_q = (sql-quote $operation) + let requested_at_q = (sql-quote $requested_at) + let source_hash_q = (sql-quote $source_hash) + let manifest_path_q = (sql-quote $manifest_path) + let result_path_q = (sql-quote $result_path) + let sql = ([ + "insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values (" + $job_id_q + ", " + $note_id_q + ", " + $operation_q + ", 'queued', " + $requested_at_q + ", " + $source_hash_q + ", " + $manifest_path_q + ", " + $result_path_q + ");" + ] | str join '') + sql-run $sql | ignore + + { + job_id: $job_id + requested_at: $requested_at + manifest_path: $manifest_path + result_path: $result_path + transcript_path: $transcript_path + session_dir: $session_dir + } +} diff --git a/modules/hosts/_parts/tahani/notability/reconcile.nu b/modules/hosts/_parts/tahani/notability/reconcile.nu index cb1229c..e4042db 100644 --- a/modules/hosts/_parts/tahani/notability/reconcile.nu +++ b/modules/hosts/_parts/tahani/notability/reconcile.nu @@ -1,6 +1,7 @@ #!/usr/bin/env nu use ./lib.nu * +use ./jobs.nu [archive-and-version, enqueue-job] const settle_window = 45sec const delete_grace = 15min @@ -23,142 +24,13 @@ def is-settled [source_mtime: string] { } -def active-job-exists [note_id: string, source_hash: string] { - let rows = (sql-json $" - select job_id - from jobs - where note_id = (sql-quote $note_id) - and source_hash = (sql-quote $source_hash) - and status != 'done' - and status != 'failed' - limit 1; - ") - not ($rows | is-empty) -} - - -def enqueue-job [note: record, operation: string, archive_path: string, source_hash: string, title: string, force_overwrite_generated: bool = false] { - if (active-job-exists $note.note_id $source_hash) { - return null - } - - let job_id = (new-job-id) - let requested_at = (now-iso) - let manifest_path = (manifest-path-for $job_id 'queued') - let result_path = (result-path-for $job_id) - let transcript_path = (transcript-path-for $note.note_id $job_id) - let session_dir = ([(sessions-root) $note.note_id $job_id] | path join) - mkdir $session_dir - - let manifest = { - version: 1 - job_id: $job_id - note_id: $note.note_id - operation: $operation - requested_at: $requested_at - title: $title - source_relpath: $note.source_relpath - source_path: $note.source_path - input_path: $archive_path - archive_path: $archive_path - output_path: $note.output_path - transcript_path: $transcript_path - result_path: $result_path - session_dir: $session_dir - source_hash: $source_hash - last_generated_output_hash: ($note.last_generated_output_hash? | default null) - force_overwrite_generated: $force_overwrite_generated - source_transport: 'webdav' - } - - ($manifest | to json --indent 2) | save -f $manifest_path - let job_id_q = (sql-quote $job_id) - let note_id_q = (sql-quote $note.note_id) - let operation_q = (sql-quote $operation) - let requested_at_q = (sql-quote $requested_at) - let source_hash_q = (sql-quote $source_hash) - let manifest_path_q = (sql-quote $manifest_path) - let result_path_q = (sql-quote $result_path) - let sql = ([ - "insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values (" - $job_id_q - ", " - $note_id_q - ", " - $operation_q - ", 'queued', " - $requested_at_q - ", " - $source_hash_q - ", " - $manifest_path_q - ", " - $result_path_q - ");" - ] | str join '') - sql-run $sql | ignore - - log-event $note.note_id 'job-enqueued' { +def log-job-enqueued [note_id: string, job_id: string, operation: string, source_hash: string, archive_path: string] { + log-event $note_id 'job-enqueued' { job_id: $job_id operation: $operation source_hash: $source_hash archive_path: $archive_path } - - $job_id -} - - -def archive-and-version [note_id: string, source_path: path, source_relpath: string, source_size: any, source_mtime: string, source_hash: string] { - let source_size_int = ($source_size | into int) - let archive_path = (archive-path-for $note_id $source_hash $source_relpath) - cp $source_path $archive_path - - let version_id = (new-version-id) - let seen_at = (now-iso) - let version_id_q = (sql-quote $version_id) - let note_id_q = (sql-quote $note_id) - let seen_at_q = (sql-quote $seen_at) - let archive_path_q = (sql-quote $archive_path) - let source_hash_q = (sql-quote $source_hash) - let source_mtime_q = (sql-quote $source_mtime) - let source_relpath_q = (sql-quote $source_relpath) - let sql = ([ - "insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values (" - $version_id_q - ", " - $note_id_q - ", " - $seen_at_q - ", " - $archive_path_q - ", " - $source_hash_q - ", " - ($source_size_int | into string) - ", " - $source_mtime_q - ", " - $source_relpath_q - ", 'pending', null);" - ] | str join '') - sql-run $sql | ignore - - { - version_id: $version_id - seen_at: $seen_at - archive_path: $archive_path - } -} - - -def find-note-by-source [source_relpath: string] { - sql-json $" - select * - from notes - where source_relpath = (sql-quote $source_relpath) - limit 1; - " } @@ -270,15 +142,16 @@ def process-existing [note: record, source: record] { } let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null)) - let retry_job_id = (enqueue-job $runtime_note 'upsert' $archive_path $source_hash $title) - if $retry_job_id != null { + let retry_job = (enqueue-job $runtime_note 'upsert' $archive_path $archive_path $source_hash $title) + if $retry_job != null { + log-job-enqueued $note_id $retry_job.job_id 'upsert' $source_hash $archive_path let reason = if $note_status == 'failed' { 'retry-failed-note' } else { 'missing-generated-output' } log-event $note_id 'job-reenqueued' { - job_id: $retry_job_id + job_id: $retry_job.job_id reason: $reason source_hash: $source_hash archive_path: $archive_path @@ -313,7 +186,10 @@ def process-existing [note: record, source: record] { | ignore let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null)) - let _ = (enqueue-job $runtime_note 'upsert' $version.archive_path $source_hash $title) + let job = (enqueue-job $runtime_note 'upsert' $version.archive_path $version.archive_path $source_hash $title) + if $job != null { + log-job-enqueued $note_id $job.job_id 'upsert' $source_hash $version.archive_path + } log-event $note_id 'source-updated' { source_relpath: $source.source_relpath @@ -405,7 +281,10 @@ def process-new [source: record] { output_path: $output_path last_generated_output_hash: null } - let _ = (enqueue-job $note 'upsert' $version.archive_path $source_hash $source.title) + let job = (enqueue-job $note 'upsert' $version.archive_path $version.archive_path $source_hash $source.title) + if $job != null { + log-job-enqueued $note_id $job.job_id 'upsert' $source_hash $version.archive_path + } log-event $note_id 'source-discovered' { source_relpath: $source.source_relpath @@ -462,7 +341,7 @@ def mark-missing [seen_relpaths: list] { } -def main [] { +export def reconcile-run [] { ensure-layout mut sources = (scan-source-files) @@ -501,3 +380,8 @@ def main [] { mark-missing ($sources | get source_relpath) } + + +def main [] { + reconcile-run +} diff --git a/modules/hosts/_parts/tahani/notability/reingest.nu b/modules/hosts/_parts/tahani/notability/reingest.nu index df7b902..8e709bc 100644 --- a/modules/hosts/_parts/tahani/notability/reingest.nu +++ b/modules/hosts/_parts/tahani/notability/reingest.nu @@ -1,8 +1,8 @@ #!/usr/bin/env nu use ./lib.nu * - -const script_dir = (path self | path dirname) +use ./jobs.nu [archive-and-version, enqueue-job] +use ./worker.nu [worker-run] def latest-version [note_id: string] { @@ -17,17 +17,18 @@ def latest-version [note_id: string] { } -def active-job-exists [note_id: string, source_hash: string] { - let rows = (sql-json $" +def existing-active-job [note_id: string, source_hash: string] { + sql-json $" select job_id from jobs where note_id = (sql-quote $note_id) and source_hash = (sql-quote $source_hash) and status != 'done' and status != 'failed' + order by requested_at desc limit 1; - ") - not ($rows | is-empty) + " + | first } @@ -41,46 +42,15 @@ def archive-current-source [note: record] { let source_hash = (sha256 $note.source_path) let source_size = (((ls -l $note.source_path | first).size) | into int) let source_mtime = (((ls -l $note.source_path | first).modified) | format date "%Y-%m-%dT%H:%M:%SZ") - let archive_path = (archive-path-for $note.note_id $source_hash $note.source_relpath) - cp $note.source_path $archive_path - - let version_id = (new-version-id) - let seen_at = (now-iso) - let version_id_q = (sql-quote $version_id) - let note_id_q = (sql-quote $note.note_id) - let seen_at_q = (sql-quote $seen_at) - let archive_path_q = (sql-quote $archive_path) - let source_hash_q = (sql-quote $source_hash) - let source_mtime_q = (sql-quote $source_mtime) - let source_relpath_q = (sql-quote $note.source_relpath) - let insert_sql = ([ - "insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values (" - $version_id_q - ", " - $note_id_q - ", " - $seen_at_q - ", " - $archive_path_q - ", " - $source_hash_q - ", " - ($source_size | into string) - ", " - $source_mtime_q - ", " - $source_relpath_q - ", 'pending', null);" - ] | str join '') - sql-run $insert_sql | ignore + let version = (archive-and-version $note.note_id $note.source_path $note.source_relpath $source_size $source_mtime $source_hash) sql-run $" update notes set current_source_hash = (sql-quote $source_hash), current_source_size = ($source_size), current_source_mtime = (sql-quote $source_mtime), - current_archive_path = (sql-quote $archive_path), - latest_version_id = (sql-quote $version_id), + current_archive_path = (sql-quote $version.archive_path), + latest_version_id = (sql-quote $version.version_id), last_seen_at = (sql-quote (now-iso)), status = 'active', missing_since = null, @@ -90,96 +60,35 @@ def archive-current-source [note: record] { | ignore { - input_path: $archive_path - archive_path: $archive_path + input_path: $version.archive_path + archive_path: $version.archive_path source_hash: $source_hash } } -def enqueue-job [note: record, source_hash: string, input_path: string, archive_path: string, force_overwrite_generated: bool] { - if (active-job-exists $note.note_id $source_hash) { - let existing = (sql-json $" - select job_id - from jobs - where note_id = (sql-quote $note.note_id) - and source_hash = (sql-quote $source_hash) - and status != 'done' - and status != 'failed' - order by requested_at desc - limit 1; - " | first) - print $"Already queued: ($existing.job_id)" +def enqueue-reingest-job [note: record, source_hash: string, input_path: string, archive_path: string, force_overwrite_generated: bool] { + let job = (enqueue-job $note 'reingest' $input_path $archive_path $source_hash $note.title $force_overwrite_generated) + if $job == null { + let existing = (existing-active-job $note.note_id $source_hash) + print $"Already queued: ($existing.job_id? | default 'unknown')" return } - let job_id = (new-job-id) - let requested_at = (now-iso) - let manifest_path = (manifest-path-for $job_id 'queued') - let result_path = (result-path-for $job_id) - let transcript_path = (transcript-path-for $note.note_id $job_id) - let session_dir = ([(sessions-root) $note.note_id $job_id] | path join) - mkdir $session_dir - - let manifest = { - version: 1 - job_id: $job_id - note_id: $note.note_id - operation: 'reingest' - requested_at: $requested_at - title: $note.title - source_relpath: $note.source_relpath - source_path: $note.source_path - input_path: $input_path - archive_path: $archive_path - output_path: $note.output_path - transcript_path: $transcript_path - result_path: $result_path - session_dir: $session_dir - source_hash: $source_hash - last_generated_output_hash: ($note.last_generated_output_hash? | default null) - force_overwrite_generated: $force_overwrite_generated - source_transport: 'webdav' - } - - ($manifest | to json --indent 2) | save -f $manifest_path - let job_id_q = (sql-quote $job_id) - let note_id_q = (sql-quote $note.note_id) - let requested_at_q = (sql-quote $requested_at) - let source_hash_q = (sql-quote $source_hash) - let manifest_path_q = (sql-quote $manifest_path) - let result_path_q = (sql-quote $result_path) - let sql = ([ - "insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values (" - $job_id_q - ", " - $note_id_q - ", 'reingest', 'queued', " - $requested_at_q - ", " - $source_hash_q - ", " - $manifest_path_q - ", " - $result_path_q - ");" - ] | str join '') - sql-run $sql | ignore - log-event $note.note_id 'reingest-enqueued' { - job_id: $job_id + job_id: $job.job_id source_hash: $source_hash archive_path: $archive_path force_overwrite_generated: $force_overwrite_generated } - print $"Enqueued ($job_id) for ($note.note_id)" + print $"Enqueued ($job.job_id) for ($note.note_id)" - let worker_script = ([ $script_dir 'worker.nu' ] | path join) - let worker_result = (^nu $worker_script --drain | complete) - if $worker_result.exit_code != 0 { + try { + worker-run --drain + } catch {|error| error make { - msg: $"worker drain failed: ($worker_result.stderr | str trim)" + msg: (($error.msg? | default ($error | to nuon)) | into string) } } } @@ -224,7 +133,7 @@ def main [note_id: string, --latest-source, --latest-archive, --force-overwrite- if $source_mode == 'source' { let archived = (archive-current-source $note) - enqueue-job $note $archived.source_hash $archived.input_path $archived.archive_path $force_overwrite_generated + enqueue-reingest-job $note $archived.source_hash $archived.input_path $archived.archive_path $force_overwrite_generated return } @@ -235,5 +144,5 @@ def main [note_id: string, --latest-source, --latest-archive, --force-overwrite- } } - enqueue-job $note $version.source_hash $version.archive_path $version.archive_path $force_overwrite_generated + enqueue-reingest-job $note $version.source_hash $version.archive_path $version.archive_path $force_overwrite_generated } diff --git a/modules/hosts/_parts/tahani/notability/watch.nu b/modules/hosts/_parts/tahani/notability/watch.nu index ed5f2d0..03afce5 100644 --- a/modules/hosts/_parts/tahani/notability/watch.nu +++ b/modules/hosts/_parts/tahani/notability/watch.nu @@ -1,27 +1,36 @@ #!/usr/bin/env nu use ./lib.nu * +use ./reconcile.nu [reconcile-run] +use ./worker.nu [worker-run] -const script_dir = (path self | path dirname) + +def error-message [error: any] { + let msg = (($error.msg? | default '') | into string) + if $msg == '' { + $error | to nuon + } else { + $msg + } +} def run-worker [] { - let worker_script = ([ $script_dir 'worker.nu' ] | path join) - let worker_result = (^nu $worker_script --drain | complete) - if $worker_result.exit_code != 0 { - print $"worker failed: ($worker_result.stderr | str trim)" + try { + worker-run --drain + } catch {|error| + print $"worker failed: (error-message $error)" } } def run-sync [] { - let reconcile_script = ([ $script_dir 'reconcile.nu' ] | path join) - run-worker - let reconcile_result = (^nu $reconcile_script | complete) - if $reconcile_result.exit_code != 0 { - print $"reconcile failed: ($reconcile_result.stderr | str trim)" + try { + reconcile-run + } catch {|error| + print $"reconcile failed: (error-message $error)" return } diff --git a/modules/hosts/_parts/tahani/notability/worker.nu b/modules/hosts/_parts/tahani/notability/worker.nu index 7c96bd7..4eeee10 100644 --- a/modules/hosts/_parts/tahani/notability/worker.nu +++ b/modules/hosts/_parts/tahani/notability/worker.nu @@ -479,7 +479,7 @@ def drain-queued-jobs [] { } -def main [--drain] { +export def worker-run [--drain] { ensure-layout recover-running-jobs if $drain { @@ -499,3 +499,8 @@ def main [--drain] { maybe-update-qmd } } + + +def main [--drain] { + worker-run --drain=$drain +}