refactor notability ingest stack

This commit is contained in:
2026-03-25 17:53:32 +00:00
parent 4eefa6b337
commit 49fa4d623e
7 changed files with 285 additions and 477 deletions

View File

@@ -1,29 +1,33 @@
{ {
config, config,
inputs', inputs',
lib,
pkgs, pkgs,
... ...
}: let }: let
homeDir = "/home/cschmatzler";
notabilityScripts = ./notability; notabilityScripts = ./notability;
webdavRoot = "/home/cschmatzler/.local/share/notability-ingest/webdav-root"; dataRoot = "${homeDir}/.local/share/notability-ingest";
dataRoot = "/home/cschmatzler/.local/share/notability-ingest"; stateRoot = "${homeDir}/.local/state/notability-ingest";
stateRoot = "/home/cschmatzler/.local/state/notability-ingest"; notesRoot = "${homeDir}/Notes";
notesRoot = "/home/cschmatzler/Notes"; webdavRoot = "${dataRoot}/webdav-root";
commonPath = [ userPackages = with pkgs; [
inputs'.llm-agents.packages.pi qmd
pkgs.qmd poppler-utils
pkgs.coreutils rclone
pkgs.inotify-tools sqlite
pkgs.nushell zk
pkgs.poppler-utils
pkgs.rclone
pkgs.sqlite
pkgs.util-linux
pkgs.zk
]; ];
commonPath = with pkgs;
[
inputs'.llm-agents.packages.pi
coreutils
inotify-tools
nushell
util-linux
]
++ userPackages;
commonEnvironment = { commonEnvironment = {
HOME = "/home/cschmatzler"; HOME = homeDir;
NOTABILITY_ARCHIVE_ROOT = "${dataRoot}/archive"; NOTABILITY_ARCHIVE_ROOT = "${dataRoot}/archive";
NOTABILITY_DATA_ROOT = dataRoot; NOTABILITY_DATA_ROOT = dataRoot;
NOTABILITY_DB_PATH = "${stateRoot}/db.sqlite"; NOTABILITY_DB_PATH = "${stateRoot}/db.sqlite";
@@ -33,7 +37,28 @@
NOTABILITY_STATE_ROOT = stateRoot; NOTABILITY_STATE_ROOT = stateRoot;
NOTABILITY_TRANSCRIPT_ROOT = "${stateRoot}/transcripts"; NOTABILITY_TRANSCRIPT_ROOT = "${stateRoot}/transcripts";
NOTABILITY_WEBDAV_ROOT = webdavRoot; NOTABILITY_WEBDAV_ROOT = webdavRoot;
XDG_CONFIG_HOME = "/home/cschmatzler/.config"; XDG_CONFIG_HOME = "${homeDir}/.config";
};
mkTmpDirRule = path: "d ${path} 0755 cschmatzler users -";
mkNotabilityService = {
description,
script,
after ? [],
requires ? [],
environment ? {},
}: {
inherit after description requires;
wantedBy = ["multi-user.target"];
path = commonPath;
environment = commonEnvironment // environment;
serviceConfig = {
ExecStart = "${pkgs.nushell}/bin/nu ${notabilityScripts}/${script}";
Group = "users";
Restart = "always";
RestartSec = 5;
User = "cschmatzler";
WorkingDirectory = homeDir;
};
}; };
in { in {
sops.secrets.tahani-notability-webdav-password = { sops.secrets.tahani-notability-webdav-password = {
@@ -44,13 +69,7 @@ in {
}; };
home-manager.users.cschmatzler = { home-manager.users.cschmatzler = {
home.packages = [ home.packages = userPackages;
pkgs.qmd
pkgs.poppler-utils
pkgs.rclone
pkgs.sqlite
pkgs.zk
];
home.file.".config/qmd/index.yml".text = '' home.file.".config/qmd/index.yml".text = ''
collections: collections:
notes: notes:
@@ -59,21 +78,22 @@ in {
''; '';
}; };
systemd.tmpfiles.rules = [ systemd.tmpfiles.rules =
"d ${notesRoot} 0755 cschmatzler users -" builtins.map mkTmpDirRule [
"d ${dataRoot} 0755 cschmatzler users -" notesRoot
"d ${webdavRoot} 0755 cschmatzler users -" dataRoot
"d ${dataRoot}/archive 0755 cschmatzler users -" webdavRoot
"d ${dataRoot}/rendered-pages 0755 cschmatzler users -" "${dataRoot}/archive"
"d ${stateRoot} 0755 cschmatzler users -" "${dataRoot}/rendered-pages"
"d ${stateRoot}/jobs 0755 cschmatzler users -" stateRoot
"d ${stateRoot}/jobs/queued 0755 cschmatzler users -" "${stateRoot}/jobs"
"d ${stateRoot}/jobs/running 0755 cschmatzler users -" "${stateRoot}/jobs/queued"
"d ${stateRoot}/jobs/failed 0755 cschmatzler users -" "${stateRoot}/jobs/running"
"d ${stateRoot}/jobs/done 0755 cschmatzler users -" "${stateRoot}/jobs/failed"
"d ${stateRoot}/jobs/results 0755 cschmatzler users -" "${stateRoot}/jobs/done"
"d ${stateRoot}/sessions 0755 cschmatzler users -" "${stateRoot}/jobs/results"
"d ${stateRoot}/transcripts 0755 cschmatzler users -" "${stateRoot}/sessions"
"${stateRoot}/transcripts"
]; ];
services.caddy.virtualHosts."tahani.manticore-hippocampus.ts.net".extraConfig = '' services.caddy.virtualHosts."tahani.manticore-hippocampus.ts.net".extraConfig = ''
@@ -85,43 +105,24 @@ in {
} }
''; '';
systemd.services.notability-webdav = { systemd.services.notability-webdav =
mkNotabilityService {
description = "Notability WebDAV landing zone"; description = "Notability WebDAV landing zone";
wantedBy = ["multi-user.target"]; script = "webdav.nu";
after = ["network.target"]; after = ["network.target"];
path = commonPath; environment = {
environment =
commonEnvironment
// {
NOTABILITY_WEBDAV_ADDR = "127.0.0.1:9980"; NOTABILITY_WEBDAV_ADDR = "127.0.0.1:9980";
NOTABILITY_WEBDAV_BASEURL = "/notability"; NOTABILITY_WEBDAV_BASEURL = "/notability";
NOTABILITY_WEBDAV_PASSWORD_FILE = config.sops.secrets.tahani-notability-webdav-password.path; NOTABILITY_WEBDAV_PASSWORD_FILE = config.sops.secrets.tahani-notability-webdav-password.path;
NOTABILITY_WEBDAV_USER = "notability"; NOTABILITY_WEBDAV_USER = "notability";
}; };
serviceConfig = {
ExecStart = "${pkgs.nushell}/bin/nu ${notabilityScripts}/webdav.nu";
Group = "users";
Restart = "always";
RestartSec = 5;
User = "cschmatzler";
WorkingDirectory = "/home/cschmatzler";
};
}; };
systemd.services.notability-watch = { systemd.services.notability-watch =
mkNotabilityService {
description = "Watch and ingest Notability WebDAV uploads"; description = "Watch and ingest Notability WebDAV uploads";
wantedBy = ["multi-user.target"]; script = "watch.nu";
after = ["notability-webdav.service"]; after = ["notability-webdav.service"];
requires = ["notability-webdav.service"]; requires = ["notability-webdav.service"];
path = commonPath;
environment = commonEnvironment;
serviceConfig = {
ExecStart = "${pkgs.nushell}/bin/nu ${notabilityScripts}/watch.nu";
Group = "users";
Restart = "always";
RestartSec = 5;
User = "cschmatzler";
WorkingDirectory = "/home/cschmatzler";
};
}; };
} }

View File

@@ -1,141 +0,0 @@
#!/usr/bin/env nu
use ./lib.nu *
const vision_model = 'openai-codex/gpt-5.4'
def call-pi [prompt: string, inputs: list<path>, thinking: string] {
let prompt_file = (^mktemp --suffix '.md' | str trim)
$prompt | save -f $prompt_file
let input_refs = ($inputs | each {|f| $"'@($f)'"} | str join ' ')
let cmd = $"timeout 45s pi --model '($vision_model)' --thinking ($thinking) --no-tools --no-session -p ($input_refs) '@($prompt_file)'"
let result = (bash -c $cmd | complete)
rm -f $prompt_file
let output = ($result.stdout | str trim)
if $output != '' {
$output
} else {
error make { msg: $"pi returned no output \(exit ($result.exit_code)): ($result.stderr | str trim)" }
}
}
def render-pages [input_path: path, job_id: string] {
let ext = (([$input_path] | path parse | first).extension? | default '' | str downcase)
if $ext == 'png' {
[$input_path]
} else if $ext == 'pdf' {
let dir = [(render-root) $job_id] | path join
mkdir $dir
^pdftoppm -png -r 200 $input_path ([$dir 'page'] | path join)
(glob $"($dir)/*.png") | sort
} else {
error make { msg: $"Unsupported format: ($ext)" }
}
}
def unquote [v?: any] {
if $v == null { '' } else { $v | into string | str replace -r '^["''](.*)["'']$' '$1' }
}
def find-output [note_id: string, configured: path] {
if ($configured | path exists) {
let fm = (parse-output-frontmatter $configured)
if (unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $note_id {
return $configured
}
}
let found = (glob $"((notes-root))/**/*.md") | where not ($it | str contains '/.') | where {|f|
let fm = (parse-output-frontmatter $f)
(unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $note_id
}
if ($found | is-empty) { $configured } else { $found | first }
}
def source-format [p: path] {
([$p] | path parse | first).extension? | default 'bin' | str downcase
}
def main [manifest_path: path] {
ensure-layout
let m = (open $manifest_path)
# transcribe
let pages = (render-pages $m.input_path $m.job_id)
let transcript = (call-pi "Transcribe this note into clean Markdown. Read it like a human and preserve the intended reading order and visible structure. Keep headings, lists, and paragraphs when they are visible. Do not summarize. Do not add commentary. Return Markdown only." $pages 'low')
mkdir ([$m.transcript_path] | path dirname)
$"($transcript)\n" | save -f $m.transcript_path
# normalize
let normalized = (call-pi "Rewrite the attached transcription into clean Markdown. Preserve the same content and intended structure. Do not summarize. Return Markdown only." [$m.transcript_path] 'off')
# build output
let body = ($normalized | str trim)
let body_out = if $body == '' { $"# ($m.title)" } else { $body }
let created = ($m.requested_at | str substring 0..9)
let updated = ((date now) | format date '%Y-%m-%d')
let markdown = ([
'---'
$'title: ($m.title | to json)'
$'created: ($created | to json)'
$'updated: ($updated | to json)'
'source: "notability"'
$'source_transport: (($m.source_transport? | default "webdav") | to json)'
$'source_relpath: ($m.source_relpath | to json)'
$'note_id: ($m.note_id | to json)'
'managed_by: "notability-ingest"'
$'source_file: ($m.archive_path | to json)'
$'source_file_hash: ($"sha256:($m.source_hash)" | to json)'
$'source_format: ((source-format $m.archive_path) | to json)'
'status: "active"'
'tags:'
' - handwritten'
' - notability'
'---'
''
$body_out
''
] | str join "\n")
# write
let output_path = (find-output $m.note_id $m.output_path)
let write_path = if ($m.force_overwrite_generated? | default false) or not ($output_path | path exists) {
$output_path
} else {
let fm = (parse-output-frontmatter $output_path)
if (unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $m.note_id {
$output_path
} else {
let stamp = ((date now) | format date '%Y-%m-%dT%H-%M-%SZ')
let parsed = ([$output_path] | path parse | first)
[$parsed.parent $"($parsed.stem).conflict-($stamp).($parsed.extension)"] | path join
}
}
let write_mode = if not ($output_path | path exists) { 'create' } else if $write_path == $output_path { 'overwrite' } else { 'conflict' }
mkdir ([$write_path] | path dirname)
$markdown | save -f $write_path
let output_hash = (sha256 $write_path)
# result
{
success: true
job_id: $m.job_id
note_id: $m.note_id
archive_path: $m.archive_path
source_hash: $m.source_hash
session_dir: $m.session_dir
output_path: $output_path
output_hash: $output_hash
write_mode: $write_mode
updated_main_output: ($write_path == $output_path)
transcript_path: $m.transcript_path
} | to json --indent 2 | save -f $m.result_path
}

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env nu
use ./lib.nu *
def active-job-exists [note_id: string, source_hash: string] {
let rows = (sql-json $"
select job_id
from jobs
where note_id = (sql-quote $note_id)
and source_hash = (sql-quote $source_hash)
and status != 'done'
and status != 'failed'
limit 1;
")
not ($rows | is-empty)
}
export def archive-and-version [note_id: string, source_path: path, source_relpath: string, source_size: any, source_mtime: string, source_hash: string] {
let source_size_int = ($source_size | into int)
let archive_path = (archive-path-for $note_id $source_hash $source_relpath)
cp $source_path $archive_path
let version_id = (new-version-id)
let seen_at = (now-iso)
let version_id_q = (sql-quote $version_id)
let note_id_q = (sql-quote $note_id)
let seen_at_q = (sql-quote $seen_at)
let archive_path_q = (sql-quote $archive_path)
let source_hash_q = (sql-quote $source_hash)
let source_mtime_q = (sql-quote $source_mtime)
let source_relpath_q = (sql-quote $source_relpath)
let sql = ([
"insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values ("
$version_id_q
", "
$note_id_q
", "
$seen_at_q
", "
$archive_path_q
", "
$source_hash_q
", "
($source_size_int | into string)
", "
$source_mtime_q
", "
$source_relpath_q
", 'pending', null);"
] | str join '')
sql-run $sql | ignore
{
version_id: $version_id
seen_at: $seen_at
archive_path: $archive_path
}
}
export def enqueue-job [
note: record,
operation: string,
input_path: string,
archive_path: string,
source_hash: string,
title: string,
force_overwrite_generated: bool = false,
source_transport: string = 'webdav',
] {
if (active-job-exists $note.note_id $source_hash) {
return null
}
let job_id = (new-job-id)
let requested_at = (now-iso)
let manifest_path = (manifest-path-for $job_id 'queued')
let result_path = (result-path-for $job_id)
let transcript_path = (transcript-path-for $note.note_id $job_id)
let session_dir = ([(sessions-root) $note.note_id $job_id] | path join)
mkdir $session_dir
let manifest = {
version: 1
job_id: $job_id
note_id: $note.note_id
operation: $operation
requested_at: $requested_at
title: $title
source_relpath: $note.source_relpath
source_path: $note.source_path
input_path: $input_path
archive_path: $archive_path
output_path: $note.output_path
transcript_path: $transcript_path
result_path: $result_path
session_dir: $session_dir
source_hash: $source_hash
last_generated_output_hash: ($note.last_generated_output_hash? | default null)
force_overwrite_generated: $force_overwrite_generated
source_transport: $source_transport
}
($manifest | to json --indent 2) | save -f $manifest_path
let job_id_q = (sql-quote $job_id)
let note_id_q = (sql-quote $note.note_id)
let operation_q = (sql-quote $operation)
let requested_at_q = (sql-quote $requested_at)
let source_hash_q = (sql-quote $source_hash)
let manifest_path_q = (sql-quote $manifest_path)
let result_path_q = (sql-quote $result_path)
let sql = ([
"insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values ("
$job_id_q
", "
$note_id_q
", "
$operation_q
", 'queued', "
$requested_at_q
", "
$source_hash_q
", "
$manifest_path_q
", "
$result_path_q
");"
] | str join '')
sql-run $sql | ignore
{
job_id: $job_id
requested_at: $requested_at
manifest_path: $manifest_path
result_path: $result_path
transcript_path: $transcript_path
session_dir: $session_dir
}
}

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env nu #!/usr/bin/env nu
use ./lib.nu * use ./lib.nu *
use ./jobs.nu [archive-and-version, enqueue-job]
const settle_window = 45sec const settle_window = 45sec
const delete_grace = 15min const delete_grace = 15min
@@ -23,142 +24,13 @@ def is-settled [source_mtime: string] {
} }
def active-job-exists [note_id: string, source_hash: string] { def log-job-enqueued [note_id: string, job_id: string, operation: string, source_hash: string, archive_path: string] {
let rows = (sql-json $" log-event $note_id 'job-enqueued' {
select job_id
from jobs
where note_id = (sql-quote $note_id)
and source_hash = (sql-quote $source_hash)
and status != 'done'
and status != 'failed'
limit 1;
")
not ($rows | is-empty)
}
def enqueue-job [note: record, operation: string, archive_path: string, source_hash: string, title: string, force_overwrite_generated: bool = false] {
if (active-job-exists $note.note_id $source_hash) {
return null
}
let job_id = (new-job-id)
let requested_at = (now-iso)
let manifest_path = (manifest-path-for $job_id 'queued')
let result_path = (result-path-for $job_id)
let transcript_path = (transcript-path-for $note.note_id $job_id)
let session_dir = ([(sessions-root) $note.note_id $job_id] | path join)
mkdir $session_dir
let manifest = {
version: 1
job_id: $job_id
note_id: $note.note_id
operation: $operation
requested_at: $requested_at
title: $title
source_relpath: $note.source_relpath
source_path: $note.source_path
input_path: $archive_path
archive_path: $archive_path
output_path: $note.output_path
transcript_path: $transcript_path
result_path: $result_path
session_dir: $session_dir
source_hash: $source_hash
last_generated_output_hash: ($note.last_generated_output_hash? | default null)
force_overwrite_generated: $force_overwrite_generated
source_transport: 'webdav'
}
($manifest | to json --indent 2) | save -f $manifest_path
let job_id_q = (sql-quote $job_id)
let note_id_q = (sql-quote $note.note_id)
let operation_q = (sql-quote $operation)
let requested_at_q = (sql-quote $requested_at)
let source_hash_q = (sql-quote $source_hash)
let manifest_path_q = (sql-quote $manifest_path)
let result_path_q = (sql-quote $result_path)
let sql = ([
"insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values ("
$job_id_q
", "
$note_id_q
", "
$operation_q
", 'queued', "
$requested_at_q
", "
$source_hash_q
", "
$manifest_path_q
", "
$result_path_q
");"
] | str join '')
sql-run $sql | ignore
log-event $note.note_id 'job-enqueued' {
job_id: $job_id job_id: $job_id
operation: $operation operation: $operation
source_hash: $source_hash source_hash: $source_hash
archive_path: $archive_path archive_path: $archive_path
} }
$job_id
}
def archive-and-version [note_id: string, source_path: path, source_relpath: string, source_size: any, source_mtime: string, source_hash: string] {
let source_size_int = ($source_size | into int)
let archive_path = (archive-path-for $note_id $source_hash $source_relpath)
cp $source_path $archive_path
let version_id = (new-version-id)
let seen_at = (now-iso)
let version_id_q = (sql-quote $version_id)
let note_id_q = (sql-quote $note_id)
let seen_at_q = (sql-quote $seen_at)
let archive_path_q = (sql-quote $archive_path)
let source_hash_q = (sql-quote $source_hash)
let source_mtime_q = (sql-quote $source_mtime)
let source_relpath_q = (sql-quote $source_relpath)
let sql = ([
"insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values ("
$version_id_q
", "
$note_id_q
", "
$seen_at_q
", "
$archive_path_q
", "
$source_hash_q
", "
($source_size_int | into string)
", "
$source_mtime_q
", "
$source_relpath_q
", 'pending', null);"
] | str join '')
sql-run $sql | ignore
{
version_id: $version_id
seen_at: $seen_at
archive_path: $archive_path
}
}
def find-note-by-source [source_relpath: string] {
sql-json $"
select *
from notes
where source_relpath = (sql-quote $source_relpath)
limit 1;
"
} }
@@ -270,15 +142,16 @@ def process-existing [note: record, source: record] {
} }
let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null)) let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null))
let retry_job_id = (enqueue-job $runtime_note 'upsert' $archive_path $source_hash $title) let retry_job = (enqueue-job $runtime_note 'upsert' $archive_path $archive_path $source_hash $title)
if $retry_job_id != null { if $retry_job != null {
log-job-enqueued $note_id $retry_job.job_id 'upsert' $source_hash $archive_path
let reason = if $note_status == 'failed' { let reason = if $note_status == 'failed' {
'retry-failed-note' 'retry-failed-note'
} else { } else {
'missing-generated-output' 'missing-generated-output'
} }
log-event $note_id 'job-reenqueued' { log-event $note_id 'job-reenqueued' {
job_id: $retry_job_id job_id: $retry_job.job_id
reason: $reason reason: $reason
source_hash: $source_hash source_hash: $source_hash
archive_path: $archive_path archive_path: $archive_path
@@ -313,7 +186,10 @@ def process-existing [note: record, source: record] {
| ignore | ignore
let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null)) let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null))
let _ = (enqueue-job $runtime_note 'upsert' $version.archive_path $source_hash $title) let job = (enqueue-job $runtime_note 'upsert' $version.archive_path $version.archive_path $source_hash $title)
if $job != null {
log-job-enqueued $note_id $job.job_id 'upsert' $source_hash $version.archive_path
}
log-event $note_id 'source-updated' { log-event $note_id 'source-updated' {
source_relpath: $source.source_relpath source_relpath: $source.source_relpath
@@ -405,7 +281,10 @@ def process-new [source: record] {
output_path: $output_path output_path: $output_path
last_generated_output_hash: null last_generated_output_hash: null
} }
let _ = (enqueue-job $note 'upsert' $version.archive_path $source_hash $source.title) let job = (enqueue-job $note 'upsert' $version.archive_path $version.archive_path $source_hash $source.title)
if $job != null {
log-job-enqueued $note_id $job.job_id 'upsert' $source_hash $version.archive_path
}
log-event $note_id 'source-discovered' { log-event $note_id 'source-discovered' {
source_relpath: $source.source_relpath source_relpath: $source.source_relpath
@@ -462,7 +341,7 @@ def mark-missing [seen_relpaths: list<string>] {
} }
def main [] { export def reconcile-run [] {
ensure-layout ensure-layout
mut sources = (scan-source-files) mut sources = (scan-source-files)
@@ -501,3 +380,8 @@ def main [] {
mark-missing ($sources | get source_relpath) mark-missing ($sources | get source_relpath)
} }
def main [] {
reconcile-run
}

View File

@@ -1,8 +1,8 @@
#!/usr/bin/env nu #!/usr/bin/env nu
use ./lib.nu * use ./lib.nu *
use ./jobs.nu [archive-and-version, enqueue-job]
const script_dir = (path self | path dirname) use ./worker.nu [worker-run]
def latest-version [note_id: string] { def latest-version [note_id: string] {
@@ -17,17 +17,18 @@ def latest-version [note_id: string] {
} }
def active-job-exists [note_id: string, source_hash: string] { def existing-active-job [note_id: string, source_hash: string] {
let rows = (sql-json $" sql-json $"
select job_id select job_id
from jobs from jobs
where note_id = (sql-quote $note_id) where note_id = (sql-quote $note_id)
and source_hash = (sql-quote $source_hash) and source_hash = (sql-quote $source_hash)
and status != 'done' and status != 'done'
and status != 'failed' and status != 'failed'
order by requested_at desc
limit 1; limit 1;
") "
not ($rows | is-empty) | first
} }
@@ -41,46 +42,15 @@ def archive-current-source [note: record] {
let source_hash = (sha256 $note.source_path) let source_hash = (sha256 $note.source_path)
let source_size = (((ls -l $note.source_path | first).size) | into int) let source_size = (((ls -l $note.source_path | first).size) | into int)
let source_mtime = (((ls -l $note.source_path | first).modified) | format date "%Y-%m-%dT%H:%M:%SZ") let source_mtime = (((ls -l $note.source_path | first).modified) | format date "%Y-%m-%dT%H:%M:%SZ")
let archive_path = (archive-path-for $note.note_id $source_hash $note.source_relpath) let version = (archive-and-version $note.note_id $note.source_path $note.source_relpath $source_size $source_mtime $source_hash)
cp $note.source_path $archive_path
let version_id = (new-version-id)
let seen_at = (now-iso)
let version_id_q = (sql-quote $version_id)
let note_id_q = (sql-quote $note.note_id)
let seen_at_q = (sql-quote $seen_at)
let archive_path_q = (sql-quote $archive_path)
let source_hash_q = (sql-quote $source_hash)
let source_mtime_q = (sql-quote $source_mtime)
let source_relpath_q = (sql-quote $note.source_relpath)
let insert_sql = ([
"insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values ("
$version_id_q
", "
$note_id_q
", "
$seen_at_q
", "
$archive_path_q
", "
$source_hash_q
", "
($source_size | into string)
", "
$source_mtime_q
", "
$source_relpath_q
", 'pending', null);"
] | str join '')
sql-run $insert_sql | ignore
sql-run $" sql-run $"
update notes update notes
set current_source_hash = (sql-quote $source_hash), set current_source_hash = (sql-quote $source_hash),
current_source_size = ($source_size), current_source_size = ($source_size),
current_source_mtime = (sql-quote $source_mtime), current_source_mtime = (sql-quote $source_mtime),
current_archive_path = (sql-quote $archive_path), current_archive_path = (sql-quote $version.archive_path),
latest_version_id = (sql-quote $version_id), latest_version_id = (sql-quote $version.version_id),
last_seen_at = (sql-quote (now-iso)), last_seen_at = (sql-quote (now-iso)),
status = 'active', status = 'active',
missing_since = null, missing_since = null,
@@ -90,96 +60,35 @@ def archive-current-source [note: record] {
| ignore | ignore
{ {
input_path: $archive_path input_path: $version.archive_path
archive_path: $archive_path archive_path: $version.archive_path
source_hash: $source_hash source_hash: $source_hash
} }
} }
def enqueue-job [note: record, source_hash: string, input_path: string, archive_path: string, force_overwrite_generated: bool] { def enqueue-reingest-job [note: record, source_hash: string, input_path: string, archive_path: string, force_overwrite_generated: bool] {
if (active-job-exists $note.note_id $source_hash) { let job = (enqueue-job $note 'reingest' $input_path $archive_path $source_hash $note.title $force_overwrite_generated)
let existing = (sql-json $" if $job == null {
select job_id let existing = (existing-active-job $note.note_id $source_hash)
from jobs print $"Already queued: ($existing.job_id? | default 'unknown')"
where note_id = (sql-quote $note.note_id)
and source_hash = (sql-quote $source_hash)
and status != 'done'
and status != 'failed'
order by requested_at desc
limit 1;
" | first)
print $"Already queued: ($existing.job_id)"
return return
} }
let job_id = (new-job-id)
let requested_at = (now-iso)
let manifest_path = (manifest-path-for $job_id 'queued')
let result_path = (result-path-for $job_id)
let transcript_path = (transcript-path-for $note.note_id $job_id)
let session_dir = ([(sessions-root) $note.note_id $job_id] | path join)
mkdir $session_dir
let manifest = {
version: 1
job_id: $job_id
note_id: $note.note_id
operation: 'reingest'
requested_at: $requested_at
title: $note.title
source_relpath: $note.source_relpath
source_path: $note.source_path
input_path: $input_path
archive_path: $archive_path
output_path: $note.output_path
transcript_path: $transcript_path
result_path: $result_path
session_dir: $session_dir
source_hash: $source_hash
last_generated_output_hash: ($note.last_generated_output_hash? | default null)
force_overwrite_generated: $force_overwrite_generated
source_transport: 'webdav'
}
($manifest | to json --indent 2) | save -f $manifest_path
let job_id_q = (sql-quote $job_id)
let note_id_q = (sql-quote $note.note_id)
let requested_at_q = (sql-quote $requested_at)
let source_hash_q = (sql-quote $source_hash)
let manifest_path_q = (sql-quote $manifest_path)
let result_path_q = (sql-quote $result_path)
let sql = ([
"insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values ("
$job_id_q
", "
$note_id_q
", 'reingest', 'queued', "
$requested_at_q
", "
$source_hash_q
", "
$manifest_path_q
", "
$result_path_q
");"
] | str join '')
sql-run $sql | ignore
log-event $note.note_id 'reingest-enqueued' { log-event $note.note_id 'reingest-enqueued' {
job_id: $job_id job_id: $job.job_id
source_hash: $source_hash source_hash: $source_hash
archive_path: $archive_path archive_path: $archive_path
force_overwrite_generated: $force_overwrite_generated force_overwrite_generated: $force_overwrite_generated
} }
print $"Enqueued ($job_id) for ($note.note_id)" print $"Enqueued ($job.job_id) for ($note.note_id)"
let worker_script = ([ $script_dir 'worker.nu' ] | path join) try {
let worker_result = (^nu $worker_script --drain | complete) worker-run --drain
if $worker_result.exit_code != 0 { } catch {|error|
error make { error make {
msg: $"worker drain failed: ($worker_result.stderr | str trim)" msg: (($error.msg? | default ($error | to nuon)) | into string)
} }
} }
} }
@@ -224,7 +133,7 @@ def main [note_id: string, --latest-source, --latest-archive, --force-overwrite-
if $source_mode == 'source' { if $source_mode == 'source' {
let archived = (archive-current-source $note) let archived = (archive-current-source $note)
enqueue-job $note $archived.source_hash $archived.input_path $archived.archive_path $force_overwrite_generated enqueue-reingest-job $note $archived.source_hash $archived.input_path $archived.archive_path $force_overwrite_generated
return return
} }
@@ -235,5 +144,5 @@ def main [note_id: string, --latest-source, --latest-archive, --force-overwrite-
} }
} }
enqueue-job $note $version.source_hash $version.archive_path $version.archive_path $force_overwrite_generated enqueue-reingest-job $note $version.source_hash $version.archive_path $version.archive_path $force_overwrite_generated
} }

View File

@@ -1,27 +1,36 @@
#!/usr/bin/env nu #!/usr/bin/env nu
use ./lib.nu * use ./lib.nu *
use ./reconcile.nu [reconcile-run]
use ./worker.nu [worker-run]
const script_dir = (path self | path dirname)
def error-message [error: any] {
let msg = (($error.msg? | default '') | into string)
if $msg == '' {
$error | to nuon
} else {
$msg
}
}
def run-worker [] { def run-worker [] {
let worker_script = ([ $script_dir 'worker.nu' ] | path join) try {
let worker_result = (^nu $worker_script --drain | complete) worker-run --drain
if $worker_result.exit_code != 0 { } catch {|error|
print $"worker failed: ($worker_result.stderr | str trim)" print $"worker failed: (error-message $error)"
} }
} }
def run-sync [] { def run-sync [] {
let reconcile_script = ([ $script_dir 'reconcile.nu' ] | path join)
run-worker run-worker
let reconcile_result = (^nu $reconcile_script | complete) try {
if $reconcile_result.exit_code != 0 { reconcile-run
print $"reconcile failed: ($reconcile_result.stderr | str trim)" } catch {|error|
print $"reconcile failed: (error-message $error)"
return return
} }

View File

@@ -479,7 +479,7 @@ def drain-queued-jobs [] {
} }
def main [--drain] { export def worker-run [--drain] {
ensure-layout ensure-layout
recover-running-jobs recover-running-jobs
if $drain { if $drain {
@@ -499,3 +499,8 @@ def main [--drain] {
maybe-update-qmd maybe-update-qmd
} }
} }
def main [--drain] {
worker-run --drain=$drain
}