feat(notes): add Notability WebDAV ingest pipeline

This commit is contained in:
2026-03-25 11:08:45 +00:00
parent bef2afed66
commit 4eefa6b337
21 changed files with 8305 additions and 2 deletions

View File

@@ -0,0 +1,141 @@
#!/usr/bin/env nu
use ./lib.nu *
const vision_model = 'openai-codex/gpt-5.4'
def call-pi [prompt: string, inputs: list<path>, thinking: string] {
let prompt_file = (^mktemp --suffix '.md' | str trim)
$prompt | save -f $prompt_file
let input_refs = ($inputs | each {|f| $"'@($f)'"} | str join ' ')
let cmd = $"timeout 45s pi --model '($vision_model)' --thinking ($thinking) --no-tools --no-session -p ($input_refs) '@($prompt_file)'"
let result = (bash -c $cmd | complete)
rm -f $prompt_file
let output = ($result.stdout | str trim)
if $output != '' {
$output
} else {
error make { msg: $"pi returned no output \(exit ($result.exit_code)): ($result.stderr | str trim)" }
}
}
def render-pages [input_path: path, job_id: string] {
let ext = (([$input_path] | path parse | first).extension? | default '' | str downcase)
if $ext == 'png' {
[$input_path]
} else if $ext == 'pdf' {
let dir = [(render-root) $job_id] | path join
mkdir $dir
^pdftoppm -png -r 200 $input_path ([$dir 'page'] | path join)
(glob $"($dir)/*.png") | sort
} else {
error make { msg: $"Unsupported format: ($ext)" }
}
}
def unquote [v?: any] {
if $v == null { '' } else { $v | into string | str replace -r '^["''](.*)["'']$' '$1' }
}
def find-output [note_id: string, configured: path] {
if ($configured | path exists) {
let fm = (parse-output-frontmatter $configured)
if (unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $note_id {
return $configured
}
}
let found = (glob $"((notes-root))/**/*.md") | where not ($it | str contains '/.') | where {|f|
let fm = (parse-output-frontmatter $f)
(unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $note_id
}
if ($found | is-empty) { $configured } else { $found | first }
}
def source-format [p: path] {
([$p] | path parse | first).extension? | default 'bin' | str downcase
}
def main [manifest_path: path] {
ensure-layout
let m = (open $manifest_path)
# transcribe
let pages = (render-pages $m.input_path $m.job_id)
let transcript = (call-pi "Transcribe this note into clean Markdown. Read it like a human and preserve the intended reading order and visible structure. Keep headings, lists, and paragraphs when they are visible. Do not summarize. Do not add commentary. Return Markdown only." $pages 'low')
mkdir ([$m.transcript_path] | path dirname)
$"($transcript)\n" | save -f $m.transcript_path
# normalize
let normalized = (call-pi "Rewrite the attached transcription into clean Markdown. Preserve the same content and intended structure. Do not summarize. Return Markdown only." [$m.transcript_path] 'off')
# build output
let body = ($normalized | str trim)
let body_out = if $body == '' { $"# ($m.title)" } else { $body }
let created = ($m.requested_at | str substring 0..9)
let updated = ((date now) | format date '%Y-%m-%d')
let markdown = ([
'---'
$'title: ($m.title | to json)'
$'created: ($created | to json)'
$'updated: ($updated | to json)'
'source: "notability"'
$'source_transport: (($m.source_transport? | default "webdav") | to json)'
$'source_relpath: ($m.source_relpath | to json)'
$'note_id: ($m.note_id | to json)'
'managed_by: "notability-ingest"'
$'source_file: ($m.archive_path | to json)'
$'source_file_hash: ($"sha256:($m.source_hash)" | to json)'
$'source_format: ((source-format $m.archive_path) | to json)'
'status: "active"'
'tags:'
' - handwritten'
' - notability'
'---'
''
$body_out
''
] | str join "\n")
# write
let output_path = (find-output $m.note_id $m.output_path)
let write_path = if ($m.force_overwrite_generated? | default false) or not ($output_path | path exists) {
$output_path
} else {
let fm = (parse-output-frontmatter $output_path)
if (unquote ($fm.managed_by? | default '')) == 'notability-ingest' and (unquote ($fm.note_id? | default '')) == $m.note_id {
$output_path
} else {
let stamp = ((date now) | format date '%Y-%m-%dT%H-%M-%SZ')
let parsed = ([$output_path] | path parse | first)
[$parsed.parent $"($parsed.stem).conflict-($stamp).($parsed.extension)"] | path join
}
}
let write_mode = if not ($output_path | path exists) { 'create' } else if $write_path == $output_path { 'overwrite' } else { 'conflict' }
mkdir ([$write_path] | path dirname)
$markdown | save -f $write_path
let output_hash = (sha256 $write_path)
# result
{
success: true
job_id: $m.job_id
note_id: $m.note_id
archive_path: $m.archive_path
source_hash: $m.source_hash
session_dir: $m.session_dir
output_path: $output_path
output_hash: $output_hash
write_mode: $write_mode
updated_main_output: ($write_path == $output_path)
transcript_path: $m.transcript_path
} | to json --indent 2 | save -f $m.result_path
}

View File

@@ -0,0 +1,433 @@
export def home-dir [] {
$nu.home-dir
}
export def data-root [] {
if ('NOTABILITY_DATA_ROOT' in ($env | columns)) {
$env.NOTABILITY_DATA_ROOT
} else {
[$nu.home-dir ".local" "share" "notability-ingest"] | path join
}
}
export def state-root [] {
if ('NOTABILITY_STATE_ROOT' in ($env | columns)) {
$env.NOTABILITY_STATE_ROOT
} else {
[$nu.home-dir ".local" "state" "notability-ingest"] | path join
}
}
export def notes-root [] {
if ('NOTABILITY_NOTES_DIR' in ($env | columns)) {
$env.NOTABILITY_NOTES_DIR
} else {
[$nu.home-dir "Notes"] | path join
}
}
export def webdav-root [] {
if ('NOTABILITY_WEBDAV_ROOT' in ($env | columns)) {
$env.NOTABILITY_WEBDAV_ROOT
} else {
[(data-root) "webdav-root"] | path join
}
}
export def archive-root [] {
if ('NOTABILITY_ARCHIVE_ROOT' in ($env | columns)) {
$env.NOTABILITY_ARCHIVE_ROOT
} else {
[(data-root) "archive"] | path join
}
}
export def render-root [] {
if ('NOTABILITY_RENDER_ROOT' in ($env | columns)) {
$env.NOTABILITY_RENDER_ROOT
} else {
[(data-root) "rendered-pages"] | path join
}
}
export def transcript-root [] {
if ('NOTABILITY_TRANSCRIPT_ROOT' in ($env | columns)) {
$env.NOTABILITY_TRANSCRIPT_ROOT
} else {
[(state-root) "transcripts"] | path join
}
}
export def jobs-root [] {
if ('NOTABILITY_JOBS_ROOT' in ($env | columns)) {
$env.NOTABILITY_JOBS_ROOT
} else {
[(state-root) "jobs"] | path join
}
}
export def queued-root [] {
[(jobs-root) "queued"] | path join
}
export def running-root [] {
[(jobs-root) "running"] | path join
}
export def failed-root [] {
[(jobs-root) "failed"] | path join
}
export def done-root [] {
[(jobs-root) "done"] | path join
}
export def results-root [] {
[(jobs-root) "results"] | path join
}
export def sessions-root [] {
if ('NOTABILITY_SESSIONS_ROOT' in ($env | columns)) {
$env.NOTABILITY_SESSIONS_ROOT
} else {
[(state-root) "sessions"] | path join
}
}
export def qmd-dirty-file [] {
[(state-root) "qmd-dirty"] | path join
}
export def db-path [] {
if ('NOTABILITY_DB_PATH' in ($env | columns)) {
$env.NOTABILITY_DB_PATH
} else {
[(state-root) "db.sqlite"] | path join
}
}
export def now-iso [] {
date now | format date "%Y-%m-%dT%H:%M:%SZ"
}
export def sql-quote [value?: any] {
if $value == null {
"NULL"
} else {
let text = ($value | into string | str replace -a "'" "''")
["'" $text "'"] | str join ''
}
}
export def sql-run [sql: string] {
let database = (db-path)
let result = (^sqlite3 -cmd '.timeout 5000' $database $sql | complete)
if $result.exit_code != 0 {
error make {
msg: $"sqlite3 failed: ($result.stderr | str trim)"
}
}
$result.stdout
}
export def sql-json [sql: string] {
let database = (db-path)
let result = (^sqlite3 -cmd '.timeout 5000' -json $database $sql | complete)
if $result.exit_code != 0 {
error make {
msg: $"sqlite3 failed: ($result.stderr | str trim)"
}
}
let text = ($result.stdout | str trim)
if $text == "" {
[]
} else {
$text | from json
}
}
export def ensure-layout [] {
mkdir (data-root)
mkdir (state-root)
mkdir (notes-root)
mkdir (webdav-root)
mkdir (archive-root)
mkdir (render-root)
mkdir (transcript-root)
mkdir (jobs-root)
mkdir (queued-root)
mkdir (running-root)
mkdir (failed-root)
mkdir (done-root)
mkdir (results-root)
mkdir (sessions-root)
sql-run '
create table if not exists notes (
note_id text primary key,
source_relpath text not null unique,
title text not null,
output_path text not null,
status text not null,
first_seen_at text not null,
last_seen_at text not null,
last_processed_at text,
missing_since text,
deleted_at text,
current_source_hash text,
current_source_size integer,
current_source_mtime text,
current_archive_path text,
latest_version_id text,
last_generated_source_hash text,
last_generated_output_hash text,
conflict_path text,
last_error text
);
create table if not exists versions (
version_id text primary key,
note_id text not null,
seen_at text not null,
archive_path text not null unique,
source_hash text not null,
source_size integer not null,
source_mtime text not null,
source_relpath text not null,
ingest_result text,
session_path text,
foreign key (note_id) references notes (note_id)
);
create table if not exists jobs (
job_id text primary key,
note_id text not null,
operation text not null,
status text not null,
requested_at text not null,
started_at text,
finished_at text,
source_hash text,
job_manifest_path text not null,
result_path text not null,
error_summary text,
foreign key (note_id) references notes (note_id)
);
create table if not exists events (
id integer primary key autoincrement,
note_id text not null,
ts text not null,
kind text not null,
details text,
foreign key (note_id) references notes (note_id)
);
create index if not exists idx_jobs_status_requested_at on jobs(status, requested_at);
create index if not exists idx_versions_note_id_seen_at on versions(note_id, seen_at);
create index if not exists idx_events_note_id_ts on events(note_id, ts);
'
| ignore
}
export def log-event [note_id: string, kind: string, details?: any] {
let payload = if $details == null { null } else { $details | to json }
let note_id_q = (sql-quote $note_id)
let now_q = (sql-quote (now-iso))
let kind_q = (sql-quote $kind)
let payload_q = (sql-quote $payload)
let sql = ([
"insert into events (note_id, ts, kind, details) values ("
$note_id_q
", "
$now_q
", "
$kind_q
", "
$payload_q
");"
] | str join '')
sql-run $sql | ignore
}
export def slugify [value: string] {
let slug = (
$value
| str downcase
| str replace -r '[^a-z0-9]+' '-'
| str replace -r '^-+' ''
| str replace -r '-+$' ''
)
if $slug == '' {
'note'
} else {
$slug
}
}
export def sha256 [file: path] {
(^sha256sum $file | lines | first | split row ' ' | first)
}
export def parse-output-frontmatter [file: path] {
if not ($file | path exists) {
{}
} else {
let content = (open --raw $file)
if not ($content | str starts-with "---\n") {
{}
} else {
let rest = ($content | str substring 4..)
let end = ($rest | str index-of "\n---\n")
if $end == null {
{}
} else {
let block = ($rest | str substring 0..($end - 1))
$block
| lines
| where ($it | str contains ':')
| reduce --fold {} {|line, acc|
let idx = ($line | str index-of ':')
if $idx == null {
$acc
} else {
let key = ($line | str substring 0..($idx - 1) | str trim)
let value = ($line | str substring ($idx + 1).. | str trim)
$acc | upsert $key $value
}
}
}
}
}
}
export def zk-generated-note-path [title: string] {
let root = (notes-root)
let effective_title = if ($title | str trim) == '' {
'Imported note'
} else {
$title
}
let result = (
^zk --notebook-dir $root --working-dir $root new $root --no-input --title $effective_title --print-path --dry-run
| complete
)
if $result.exit_code != 0 {
error make {
msg: $"zk failed to generate a note path: ($result.stderr | str trim)"
}
}
let path_text = ($result.stderr | str trim)
if $path_text == '' {
error make {
msg: 'zk did not return a generated note path'
}
}
$path_text
| lines
| last
| str trim
}
export def new-note-id [] {
let suffix = (random uuid | str replace -a '-' '')
$"ntl_($suffix)"
}
export def new-job-id [] {
let suffix = (random uuid | str replace -a '-' '')
$"job_($suffix)"
}
export def new-version-id [] {
let suffix = (random uuid | str replace -a '-' '')
$"ver_($suffix)"
}
export def archive-path-for [note_id: string, source_hash: string, source_relpath: string] {
let stamp = (date now | format date "%Y-%m-%dT%H-%M-%SZ")
let short = ($source_hash | str substring 0..11)
let directory = [(archive-root) $note_id] | path join
let parsed = ($source_relpath | path parse)
let extension = if (($parsed.extension? | default '') | str trim) == '' {
'bin'
} else {
($parsed.extension | str downcase)
}
mkdir $directory
[$directory $"($stamp)-($short).($extension)"] | path join
}
export def transcript-path-for [note_id: string, job_id: string] {
let directory = [(transcript-root) $note_id] | path join
mkdir $directory
[$directory $"($job_id).md"] | path join
}
export def result-path-for [job_id: string] {
[(results-root) $"($job_id).json"] | path join
}
export def manifest-path-for [job_id: string, status: string] {
let root = match $status {
'queued' => (queued-root)
'running' => (running-root)
'failed' => (failed-root)
'done' => (done-root)
_ => (queued-root)
}
[$root $"($job_id).json"] | path join
}
export def note-output-path [title: string] {
zk-generated-note-path $title
}
export def is-supported-source-path [path: string] {
let lower = ($path | str downcase)
(($lower | str ends-with '.pdf') or ($lower | str ends-with '.png'))
}
export def is-ignored-path [relpath: string] {
let lower = ($relpath | str downcase)
let hidden = (($lower | str contains '/.') or ($lower | str starts-with '.'))
let temp = (($lower | str contains '/~') or ($lower | str ends-with '.tmp') or ($lower | str ends-with '.part'))
let conflict = ($lower | str contains '.sync-conflict')
($hidden or $temp or $conflict)
}
export def scan-source-files [] {
let root = (webdav-root)
if not ($root | path exists) {
[]
} else {
let files = ([
(glob $"($root)/**/*.pdf")
(glob $"($root)/**/*.PDF")
(glob $"($root)/**/*.png")
(glob $"($root)/**/*.PNG")
] | flatten)
$files
| sort
| uniq
| each {|file|
let relpath = ($file | path relative-to $root)
if ((is-ignored-path $relpath) or not (is-supported-source-path $file)) {
null
} else {
let stat = (ls -l $file | first)
{
source_path: $file
source_relpath: $relpath
source_size: $stat.size
source_mtime: ($stat.modified | format date "%Y-%m-%dT%H:%M:%SZ")
title: (($relpath | path parse).stem)
}
}
}
| where $it != null
}
}

View File

@@ -0,0 +1,503 @@
#!/usr/bin/env nu
use ./lib.nu *
const settle_window = 45sec
const delete_grace = 15min
def settle-remaining [source_mtime: string] {
let modified = ($source_mtime | into datetime)
let age = ((date now) - $modified)
if $age >= $settle_window {
0sec
} else {
$settle_window - $age
}
}
def is-settled [source_mtime: string] {
let modified = ($source_mtime | into datetime)
((date now) - $modified) >= $settle_window
}
def active-job-exists [note_id: string, source_hash: string] {
let rows = (sql-json $"
select job_id
from jobs
where note_id = (sql-quote $note_id)
and source_hash = (sql-quote $source_hash)
and status != 'done'
and status != 'failed'
limit 1;
")
not ($rows | is-empty)
}
def enqueue-job [note: record, operation: string, archive_path: string, source_hash: string, title: string, force_overwrite_generated: bool = false] {
if (active-job-exists $note.note_id $source_hash) {
return null
}
let job_id = (new-job-id)
let requested_at = (now-iso)
let manifest_path = (manifest-path-for $job_id 'queued')
let result_path = (result-path-for $job_id)
let transcript_path = (transcript-path-for $note.note_id $job_id)
let session_dir = ([(sessions-root) $note.note_id $job_id] | path join)
mkdir $session_dir
let manifest = {
version: 1
job_id: $job_id
note_id: $note.note_id
operation: $operation
requested_at: $requested_at
title: $title
source_relpath: $note.source_relpath
source_path: $note.source_path
input_path: $archive_path
archive_path: $archive_path
output_path: $note.output_path
transcript_path: $transcript_path
result_path: $result_path
session_dir: $session_dir
source_hash: $source_hash
last_generated_output_hash: ($note.last_generated_output_hash? | default null)
force_overwrite_generated: $force_overwrite_generated
source_transport: 'webdav'
}
($manifest | to json --indent 2) | save -f $manifest_path
let job_id_q = (sql-quote $job_id)
let note_id_q = (sql-quote $note.note_id)
let operation_q = (sql-quote $operation)
let requested_at_q = (sql-quote $requested_at)
let source_hash_q = (sql-quote $source_hash)
let manifest_path_q = (sql-quote $manifest_path)
let result_path_q = (sql-quote $result_path)
let sql = ([
"insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values ("
$job_id_q
", "
$note_id_q
", "
$operation_q
", 'queued', "
$requested_at_q
", "
$source_hash_q
", "
$manifest_path_q
", "
$result_path_q
");"
] | str join '')
sql-run $sql | ignore
log-event $note.note_id 'job-enqueued' {
job_id: $job_id
operation: $operation
source_hash: $source_hash
archive_path: $archive_path
}
$job_id
}
def archive-and-version [note_id: string, source_path: path, source_relpath: string, source_size: any, source_mtime: string, source_hash: string] {
let source_size_int = ($source_size | into int)
let archive_path = (archive-path-for $note_id $source_hash $source_relpath)
cp $source_path $archive_path
let version_id = (new-version-id)
let seen_at = (now-iso)
let version_id_q = (sql-quote $version_id)
let note_id_q = (sql-quote $note_id)
let seen_at_q = (sql-quote $seen_at)
let archive_path_q = (sql-quote $archive_path)
let source_hash_q = (sql-quote $source_hash)
let source_mtime_q = (sql-quote $source_mtime)
let source_relpath_q = (sql-quote $source_relpath)
let sql = ([
"insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values ("
$version_id_q
", "
$note_id_q
", "
$seen_at_q
", "
$archive_path_q
", "
$source_hash_q
", "
($source_size_int | into string)
", "
$source_mtime_q
", "
$source_relpath_q
", 'pending', null);"
] | str join '')
sql-run $sql | ignore
{
version_id: $version_id
seen_at: $seen_at
archive_path: $archive_path
}
}
def find-note-by-source [source_relpath: string] {
sql-json $"
select *
from notes
where source_relpath = (sql-quote $source_relpath)
limit 1;
"
}
def find-rename-candidate [source_hash: string] {
sql-json $"
select *
from notes
where current_source_hash = (sql-quote $source_hash)
and status != 'active'
and status != 'failed'
and status != 'conflict'
order by last_seen_at desc
limit 1;
"
}
def touch-note [note_id: string, source_size: any, source_mtime: string, status: string = 'active'] {
let source_size_int = ($source_size | into int)
let now_q = (sql-quote (now-iso))
let source_mtime_q = (sql-quote $source_mtime)
let status_q = (sql-quote $status)
let note_id_q = (sql-quote $note_id)
sql-run $"
update notes
set last_seen_at = ($now_q),
current_source_size = ($source_size_int),
current_source_mtime = ($source_mtime_q),
status = ($status_q)
where note_id = ($note_id_q);
"
| ignore
}
def process-existing [note: record, source: record] {
let title = $source.title
let note_id = ($note | get note_id)
let note_status = ($note | get status)
let source_size_int = ($source.source_size | into int)
if not (is-settled $source.source_mtime) {
touch-note $note_id $source_size_int $source.source_mtime $note_status
return
}
let previous_size = ($note.current_source_size? | default (-1))
let previous_mtime = ($note.current_source_mtime? | default '')
let size_changed = ($previous_size != $source_size_int)
let mtime_changed = ($previous_mtime != $source.source_mtime)
let needs_ingest = (($note.last_generated_source_hash? | default '') != ($note.current_source_hash? | default ''))
let hash_needed = ($note.current_source_hash? | default null) == null or $size_changed or $mtime_changed or ($note_status != 'active') or $needs_ingest
if not $hash_needed {
let now_q = (sql-quote (now-iso))
let title_q = (sql-quote $title)
let note_id_q = (sql-quote $note_id)
sql-run $"
update notes
set last_seen_at = ($now_q),
status = 'active',
title = ($title_q),
missing_since = null,
deleted_at = null
where note_id = ($note_id_q);
"
| ignore
return
}
let source_hash = (sha256 $source.source_path)
if ($source_hash == ($note.current_source_hash? | default '')) {
let now_q = (sql-quote (now-iso))
let title_q = (sql-quote $title)
let source_mtime_q = (sql-quote $source.source_mtime)
let note_id_q = (sql-quote $note_id)
let next_status = if $note_status == 'failed' { 'failed' } else { 'active' }
sql-run $"
update notes
set last_seen_at = ($now_q),
title = ($title_q),
status = (sql-quote $next_status),
missing_since = null,
deleted_at = null,
current_source_size = ($source_size_int),
current_source_mtime = ($source_mtime_q)
where note_id = ($note_id_q);
"
| ignore
let should_enqueue = ($note_status == 'failed' or (($note.last_generated_source_hash? | default '') != $source_hash))
if not $should_enqueue {
return
}
let archive_path = if (($note.current_archive_path? | default '') | str trim) == '' {
let version = (archive-and-version $note_id $source.source_path $source.source_relpath $source_size_int $source.source_mtime $source_hash)
let archive_path_q = (sql-quote $version.archive_path)
let version_id_q = (sql-quote $version.version_id)
sql-run $"
update notes
set current_archive_path = ($archive_path_q),
latest_version_id = ($version_id_q)
where note_id = ($note_id_q);
"
| ignore
$version.archive_path
} else {
$note.current_archive_path
}
let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null))
let retry_job_id = (enqueue-job $runtime_note 'upsert' $archive_path $source_hash $title)
if $retry_job_id != null {
let reason = if $note_status == 'failed' {
'retry-failed-note'
} else {
'missing-generated-output'
}
log-event $note_id 'job-reenqueued' {
job_id: $retry_job_id
reason: $reason
source_hash: $source_hash
archive_path: $archive_path
}
}
return
}
let version = (archive-and-version $note_id $source.source_path $source.source_relpath $source_size_int $source.source_mtime $source_hash)
let now_q = (sql-quote (now-iso))
let title_q = (sql-quote $title)
let source_hash_q = (sql-quote $source_hash)
let source_mtime_q = (sql-quote $source.source_mtime)
let archive_path_q = (sql-quote $version.archive_path)
let version_id_q = (sql-quote $version.version_id)
let note_id_q = (sql-quote $note_id)
sql-run $"
update notes
set last_seen_at = ($now_q),
title = ($title_q),
status = 'active',
missing_since = null,
deleted_at = null,
current_source_hash = ($source_hash_q),
current_source_size = ($source_size_int),
current_source_mtime = ($source_mtime_q),
current_archive_path = ($archive_path_q),
latest_version_id = ($version_id_q),
last_error = null
where note_id = ($note_id_q);
"
| ignore
let runtime_note = ($note | upsert source_path $source.source_path | upsert source_relpath $source.source_relpath | upsert output_path $note.output_path | upsert last_generated_output_hash ($note.last_generated_output_hash? | default null))
let _ = (enqueue-job $runtime_note 'upsert' $version.archive_path $source_hash $title)
log-event $note_id 'source-updated' {
source_relpath: $source.source_relpath
source_hash: $source_hash
archive_path: $version.archive_path
}
}
def process-new [source: record] {
if not (is-settled $source.source_mtime) {
return
}
let source_hash = (sha256 $source.source_path)
let source_size_int = ($source.source_size | into int)
let rename_candidates = (find-rename-candidate $source_hash)
if not ($rename_candidates | is-empty) {
let rename_candidate = ($rename_candidates | first)
let source_relpath_q = (sql-quote $source.source_relpath)
let title_q = (sql-quote $source.title)
let now_q = (sql-quote (now-iso))
let source_mtime_q = (sql-quote $source.source_mtime)
let note_id_q = (sql-quote $rename_candidate.note_id)
sql-run $"
update notes
set source_relpath = ($source_relpath_q),
title = ($title_q),
last_seen_at = ($now_q),
status = 'active',
missing_since = null,
deleted_at = null,
current_source_size = ($source_size_int),
current_source_mtime = ($source_mtime_q)
where note_id = ($note_id_q);
"
| ignore
log-event $rename_candidate.note_id 'source-renamed' {
from: $rename_candidate.source_relpath
to: $source.source_relpath
}
return
}
let note_id = (new-note-id)
let first_seen_at = (now-iso)
let output_path = (note-output-path $source.title)
let version = (archive-and-version $note_id $source.source_path $source.source_relpath $source_size_int $source.source_mtime $source_hash)
let note_id_q = (sql-quote $note_id)
let source_relpath_q = (sql-quote $source.source_relpath)
let title_q = (sql-quote $source.title)
let output_path_q = (sql-quote $output_path)
let first_seen_q = (sql-quote $first_seen_at)
let source_hash_q = (sql-quote $source_hash)
let source_mtime_q = (sql-quote $source.source_mtime)
let archive_path_q = (sql-quote $version.archive_path)
let version_id_q = (sql-quote $version.version_id)
let sql = ([
"insert into notes (note_id, source_relpath, title, output_path, status, first_seen_at, last_seen_at, current_source_hash, current_source_size, current_source_mtime, current_archive_path, latest_version_id) values ("
$note_id_q
", "
$source_relpath_q
", "
$title_q
", "
$output_path_q
", 'active', "
$first_seen_q
", "
$first_seen_q
", "
$source_hash_q
", "
($source_size_int | into string)
", "
$source_mtime_q
", "
$archive_path_q
", "
$version_id_q
");"
] | str join '')
sql-run $sql | ignore
let note = {
note_id: $note_id
source_relpath: $source.source_relpath
source_path: $source.source_path
output_path: $output_path
last_generated_output_hash: null
}
let _ = (enqueue-job $note 'upsert' $version.archive_path $source_hash $source.title)
log-event $note_id 'source-discovered' {
source_relpath: $source.source_relpath
source_hash: $source_hash
archive_path: $version.archive_path
output_path: $output_path
}
}
def mark-missing [seen_relpaths: list<string>] {
let notes = (sql-json 'select note_id, source_relpath, status, missing_since from notes;')
for note in $notes {
if ($seen_relpaths | any {|rel| $rel == $note.source_relpath }) {
continue
}
if $note.status == 'active' {
let missing_since = (now-iso)
let missing_since_q = (sql-quote $missing_since)
let note_id_q = (sql-quote $note.note_id)
sql-run $"
update notes
set status = 'source_missing',
missing_since = ($missing_since_q)
where note_id = ($note_id_q);
"
| ignore
log-event $note.note_id 'source-missing' {
source_relpath: $note.source_relpath
}
continue
}
if $note.status == 'source_missing' and ($note.missing_since? | default null) != null {
let missing_since = ($note.missing_since | into datetime)
if ((date now) - $missing_since) >= $delete_grace {
let deleted_at = (now-iso)
let deleted_at_q = (sql-quote $deleted_at)
let note_id_q = (sql-quote $note.note_id)
sql-run $"
update notes
set status = 'source_deleted',
deleted_at = ($deleted_at_q)
where note_id = ($note_id_q);
"
| ignore
log-event $note.note_id 'source-deleted' {
source_relpath: $note.source_relpath
}
}
}
}
}
def main [] {
ensure-layout
mut sources = (scan-source-files)
let unsettled = (
$sources
| each {|source|
{
source_path: $source.source_path
remaining: (settle-remaining $source.source_mtime)
}
}
| where remaining > 0sec
)
if not ($unsettled | is-empty) {
let max_remaining = ($unsettled | get remaining | math max)
print $"Waiting ($max_remaining) for recent Notability uploads to settle"
sleep ($max_remaining + 2sec)
$sources = (scan-source-files)
}
for source in $sources {
let existing_rows = (sql-json $"
select *
from notes
where source_relpath = (sql-quote $source.source_relpath)
limit 1;
")
if (($existing_rows | length) == 0) {
process-new $source
} else {
let existing = ($existing_rows | first)
process-existing ($existing | upsert source_path $source.source_path) $source
}
}
mark-missing ($sources | get source_relpath)
}

View File

@@ -0,0 +1,239 @@
#!/usr/bin/env nu
use ./lib.nu *
const script_dir = (path self | path dirname)
def latest-version [note_id: string] {
sql-json $"
select *
from versions
where note_id = (sql-quote $note_id)
order by seen_at desc
limit 1;
"
| first
}
def active-job-exists [note_id: string, source_hash: string] {
let rows = (sql-json $"
select job_id
from jobs
where note_id = (sql-quote $note_id)
and source_hash = (sql-quote $source_hash)
and status != 'done'
and status != 'failed'
limit 1;
")
not ($rows | is-empty)
}
def archive-current-source [note: record] {
if not ($note.source_path | path exists) {
error make {
msg: $"Current source path is missing: ($note.source_path)"
}
}
let source_hash = (sha256 $note.source_path)
let source_size = (((ls -l $note.source_path | first).size) | into int)
let source_mtime = (((ls -l $note.source_path | first).modified) | format date "%Y-%m-%dT%H:%M:%SZ")
let archive_path = (archive-path-for $note.note_id $source_hash $note.source_relpath)
cp $note.source_path $archive_path
let version_id = (new-version-id)
let seen_at = (now-iso)
let version_id_q = (sql-quote $version_id)
let note_id_q = (sql-quote $note.note_id)
let seen_at_q = (sql-quote $seen_at)
let archive_path_q = (sql-quote $archive_path)
let source_hash_q = (sql-quote $source_hash)
let source_mtime_q = (sql-quote $source_mtime)
let source_relpath_q = (sql-quote $note.source_relpath)
let insert_sql = ([
"insert into versions (version_id, note_id, seen_at, archive_path, source_hash, source_size, source_mtime, source_relpath, ingest_result, session_path) values ("
$version_id_q
", "
$note_id_q
", "
$seen_at_q
", "
$archive_path_q
", "
$source_hash_q
", "
($source_size | into string)
", "
$source_mtime_q
", "
$source_relpath_q
", 'pending', null);"
] | str join '')
sql-run $insert_sql | ignore
sql-run $"
update notes
set current_source_hash = (sql-quote $source_hash),
current_source_size = ($source_size),
current_source_mtime = (sql-quote $source_mtime),
current_archive_path = (sql-quote $archive_path),
latest_version_id = (sql-quote $version_id),
last_seen_at = (sql-quote (now-iso)),
status = 'active',
missing_since = null,
deleted_at = null
where note_id = (sql-quote $note.note_id);
"
| ignore
{
input_path: $archive_path
archive_path: $archive_path
source_hash: $source_hash
}
}
def enqueue-job [note: record, source_hash: string, input_path: string, archive_path: string, force_overwrite_generated: bool] {
if (active-job-exists $note.note_id $source_hash) {
let existing = (sql-json $"
select job_id
from jobs
where note_id = (sql-quote $note.note_id)
and source_hash = (sql-quote $source_hash)
and status != 'done'
and status != 'failed'
order by requested_at desc
limit 1;
" | first)
print $"Already queued: ($existing.job_id)"
return
}
let job_id = (new-job-id)
let requested_at = (now-iso)
let manifest_path = (manifest-path-for $job_id 'queued')
let result_path = (result-path-for $job_id)
let transcript_path = (transcript-path-for $note.note_id $job_id)
let session_dir = ([(sessions-root) $note.note_id $job_id] | path join)
mkdir $session_dir
let manifest = {
version: 1
job_id: $job_id
note_id: $note.note_id
operation: 'reingest'
requested_at: $requested_at
title: $note.title
source_relpath: $note.source_relpath
source_path: $note.source_path
input_path: $input_path
archive_path: $archive_path
output_path: $note.output_path
transcript_path: $transcript_path
result_path: $result_path
session_dir: $session_dir
source_hash: $source_hash
last_generated_output_hash: ($note.last_generated_output_hash? | default null)
force_overwrite_generated: $force_overwrite_generated
source_transport: 'webdav'
}
($manifest | to json --indent 2) | save -f $manifest_path
let job_id_q = (sql-quote $job_id)
let note_id_q = (sql-quote $note.note_id)
let requested_at_q = (sql-quote $requested_at)
let source_hash_q = (sql-quote $source_hash)
let manifest_path_q = (sql-quote $manifest_path)
let result_path_q = (sql-quote $result_path)
let sql = ([
"insert into jobs (job_id, note_id, operation, status, requested_at, source_hash, job_manifest_path, result_path) values ("
$job_id_q
", "
$note_id_q
", 'reingest', 'queued', "
$requested_at_q
", "
$source_hash_q
", "
$manifest_path_q
", "
$result_path_q
");"
] | str join '')
sql-run $sql | ignore
log-event $note.note_id 'reingest-enqueued' {
job_id: $job_id
source_hash: $source_hash
archive_path: $archive_path
force_overwrite_generated: $force_overwrite_generated
}
print $"Enqueued ($job_id) for ($note.note_id)"
let worker_script = ([ $script_dir 'worker.nu' ] | path join)
let worker_result = (^nu $worker_script --drain | complete)
if $worker_result.exit_code != 0 {
error make {
msg: $"worker drain failed: ($worker_result.stderr | str trim)"
}
}
}
def main [note_id: string, --latest-source, --latest-archive, --force-overwrite-generated] {
ensure-layout
let note_row = (sql-json $"
select *
from notes
where note_id = (sql-quote $note_id)
limit 1;
" | first)
let note = if $note_row == null {
null
} else {
$note_row | upsert source_path ([ (webdav-root) $note_row.source_relpath ] | path join)
}
if $note == null {
error make {
msg: $"Unknown note id: ($note_id)"
}
}
if $latest_source and $latest_archive {
error make {
msg: 'Choose only one of --latest-source or --latest-archive'
}
}
let source_mode = if $latest_source {
'source'
} else if $latest_archive {
'archive'
} else if ($note.status == 'active' and ($note.source_path | path exists)) {
'source'
} else {
'archive'
}
if $source_mode == 'source' {
let archived = (archive-current-source $note)
enqueue-job $note $archived.source_hash $archived.input_path $archived.archive_path $force_overwrite_generated
return
}
let version = (latest-version $note.note_id)
if $version == null {
error make {
msg: $"No archived version found for ($note.note_id)"
}
}
enqueue-job $note $version.source_hash $version.archive_path $version.archive_path $force_overwrite_generated
}

View File

@@ -0,0 +1,202 @@
#!/usr/bin/env nu
use ./lib.nu *
def format-summary [] {
let counts = (sql-json '
select status, count(*) as count
from notes
group by status
order by status;
')
let queue = (sql-json "
select status, count(*) as count
from jobs
where status in ('queued', 'running', 'failed')
group by status
order by status;
")
let lines = [
$"notes db: (db-path)"
$"webdav root: (webdav-root)"
$"notes root: (notes-root)"
''
'notes:'
]
let note_statuses = ('active,source_missing,source_deleted,conflict,failed' | split row ',')
let note_lines = (
$note_statuses
| each {|status|
let row = ($counts | where {|row| ($row | get 'status') == $status } | first)
let count = ($row.count? | default 0)
$" ($status): ($count)"
}
)
let job_statuses = ('queued,running,failed' | split row ',')
let job_lines = (
$job_statuses
| each {|status|
let row = ($queue | where {|row| ($row | get 'status') == $status } | first)
let count = ($row.count? | default 0)
$" ($status): ($count)"
}
)
($lines ++ $note_lines ++ ['' 'jobs:'] ++ $job_lines ++ ['']) | str join "\n"
}
def format-note [note_id: string] {
let note = (sql-json $"
select *
from notes
where note_id = (sql-quote $note_id)
limit 1;
" | first)
if $note == null {
error make {
msg: $"Unknown note id: ($note_id)"
}
}
let jobs = (sql-json $"
select job_id, operation, status, requested_at, started_at, finished_at, source_hash, error_summary
from jobs
where note_id = (sql-quote $note_id)
order by requested_at desc
limit 5;
")
let events = (sql-json $"
select ts, kind, details
from events
where note_id = (sql-quote $note_id)
order by ts desc
limit 10;
")
let output_exists = ($note.output_path | path exists)
let frontmatter = (parse-output-frontmatter $note.output_path)
let lines = [
$"note_id: ($note.note_id)"
$"title: ($note.title)"
$"status: ($note.status)"
$"source_relpath: ($note.source_relpath)"
$"output_path: ($note.output_path)"
$"output_exists: ($output_exists)"
$"managed_by: ($frontmatter.managed_by? | default '')"
$"frontmatter_note_id: ($frontmatter.note_id? | default '')"
$"current_source_hash: ($note.current_source_hash? | default '')"
$"last_generated_output_hash: ($note.last_generated_output_hash? | default '')"
$"current_archive_path: ($note.current_archive_path? | default '')"
$"last_processed_at: ($note.last_processed_at? | default '')"
$"missing_since: ($note.missing_since? | default '')"
$"deleted_at: ($note.deleted_at? | default '')"
$"conflict_path: ($note.conflict_path? | default '')"
$"last_error: ($note.last_error? | default '')"
''
'recent jobs:'
]
let job_lines = if ($jobs | is-empty) {
[' (none)']
} else {
$jobs | each {|job|
$" ($job.job_id) [($job.status)] ($job.operation) requested=($job.requested_at) error=($job.error_summary? | default '')"
}
}
let event_lines = if ($events | is-empty) {
[' (none)']
} else {
$events | each {|event|
$" ($event.ts) ($event.kind) ($event.details? | default '')"
}
}
($lines ++ $job_lines ++ ['' 'recent events:'] ++ $event_lines ++ ['']) | str join "\n"
}
def format-filtered [status: string, label: string] {
let notes = (sql-json $"
select note_id, title, source_relpath, output_path, status, last_error, conflict_path
from notes
where status = (sql-quote $status)
order by last_seen_at desc;
")
let header = [$label]
let body = if ($notes | is-empty) {
[' (none)']
} else {
$notes | each {|note|
let extra = if $status == 'conflict' {
$" conflict_path=($note.conflict_path? | default '')"
} else if $status == 'failed' {
$" last_error=($note.last_error? | default '')"
} else {
''
}
$" ($note.note_id) ($note.title) [($note.status)] source=($note.source_relpath) output=($note.output_path)($extra)"
}
}
($header ++ $body ++ ['']) | str join "\n"
}
def format-queue [] {
let jobs = (sql-json "
select job_id, note_id, operation, status, requested_at, started_at, error_summary
from jobs
where status in ('queued', 'running', 'failed')
order by requested_at asc;
")
let lines = if ($jobs | is-empty) {
['queue' ' (empty)' '']
} else {
['queue'] ++ ($jobs | each {|job|
$" ($job.job_id) note=($job.note_id) [($job.status)] ($job.operation) requested=($job.requested_at) error=($job.error_summary? | default '')"
}) ++ ['']
}
$lines | str join "\n"
}
def main [note_id?: string, --failed, --queue, --deleted, --conflicts] {
ensure-layout
if $queue {
print (format-queue)
return
}
if $failed {
print (format-filtered 'failed' 'failed notes')
return
}
if $deleted {
print (format-filtered 'source_deleted' 'deleted notes')
return
}
if $conflicts {
print (format-filtered 'conflict' 'conflict notes')
return
}
if $note_id != null {
print (format-note $note_id)
return
}
print (format-summary)
}

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env nu
use ./lib.nu *
const script_dir = (path self | path dirname)
def run-worker [] {
let worker_script = ([ $script_dir 'worker.nu' ] | path join)
let worker_result = (^nu $worker_script --drain | complete)
if $worker_result.exit_code != 0 {
print $"worker failed: ($worker_result.stderr | str trim)"
}
}
def run-sync [] {
let reconcile_script = ([ $script_dir 'reconcile.nu' ] | path join)
run-worker
let reconcile_result = (^nu $reconcile_script | complete)
if $reconcile_result.exit_code != 0 {
print $"reconcile failed: ($reconcile_result.stderr | str trim)"
return
}
run-worker
}
def main [] {
ensure-layout
let root = (webdav-root)
print $"Watching ($root) for Notability WebDAV updates"
run-sync
^inotifywait -m -r --format '%w%f' -e create -e close_write -e moved_to -e moved_from -e delete -e attrib $root
| lines
| each {|changed_path|
if not (is-supported-source-path $changed_path) {
return
}
print $"Filesystem event for ($changed_path)"
run-sync
}
}

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env nu
use ./lib.nu *
def main [] {
ensure-layout
let root = (webdav-root)
let addr = if ('NOTABILITY_WEBDAV_ADDR' in ($env | columns)) {
$env.NOTABILITY_WEBDAV_ADDR
} else {
'127.0.0.1:9980'
}
let user = if ('NOTABILITY_WEBDAV_USER' in ($env | columns)) {
$env.NOTABILITY_WEBDAV_USER
} else {
'notability'
}
let baseurl = if ('NOTABILITY_WEBDAV_BASEURL' in ($env | columns)) {
$env.NOTABILITY_WEBDAV_BASEURL
} else {
'/'
}
let password_file = if ('NOTABILITY_WEBDAV_PASSWORD_FILE' in ($env | columns)) {
$env.NOTABILITY_WEBDAV_PASSWORD_FILE
} else {
error make {
msg: 'NOTABILITY_WEBDAV_PASSWORD_FILE is required'
}
}
let password = (open --raw $password_file | str trim)
print $"Starting WebDAV on ($addr), serving ($root), base URL ($baseurl)"
run-external rclone 'serve' 'webdav' $root '--addr' $addr '--baseurl' $baseurl '--user' $user '--pass' $password
}

View File

@@ -0,0 +1,501 @@
#!/usr/bin/env nu
use ./lib.nu *
const qmd_debounce = 1min
const idle_sleep = 10sec
const vision_model = 'openai-codex/gpt-5.4'
const transcribe_timeout = '90s'
const normalize_timeout = '60s'
def next-queued-job [] {
sql-json "
select job_id, note_id, operation, job_manifest_path, result_path, source_hash
from jobs
where status = 'queued'
order by requested_at asc
limit 1;
"
| first
}
def maybe-update-qmd [] {
let dirty = (qmd-dirty-file)
if not ($dirty | path exists) {
return
}
let modified = ((ls -l $dirty | first).modified)
if ((date now) - $modified) < $qmd_debounce {
return
}
print 'Running qmd update'
let result = (do {
cd (notes-root)
run-external qmd 'update' | complete
})
if $result.exit_code != 0 {
print $"qmd update failed: ($result.stderr | str trim)"
return
}
rm -f $dirty
}
def write-result [result_path: path, payload: record] {
mkdir ($result_path | path dirname)
($payload | to json --indent 2) | save -f $result_path
}
def error-message [error: any] {
let msg = (($error.msg? | default '') | into string)
if ($msg == '' or $msg == 'External command failed') {
$error | to nuon
} else {
$msg
}
}
def unquote [value?: any] {
if $value == null {
''
} else {
($value | into string | str replace -r '^"(.*)"$' '$1' | str replace -r "^'(.*)'$" '$1')
}
}
def source-format [file: path] {
(([$file] | path parse | first).extension? | default 'bin' | str downcase)
}
def conflict-path-for [output_path: path] {
let parsed = ([$output_path] | path parse | first)
let stamp = ((date now) | format date '%Y-%m-%dT%H-%M-%SZ')
[$parsed.parent $"($parsed.stem).conflict-($stamp).($parsed.extension)"] | path join
}
def find-managed-outputs [note_id: string] {
let root = (notes-root)
if not ($root | path exists) {
[]
} else {
(glob $"($root)/**/*.md")
| where not ($it | str contains '/.')
| where {|file|
let parsed = (parse-output-frontmatter $file)
(unquote ($parsed.managed_by? | default '')) == 'notability-ingest' and (unquote ($parsed.note_id? | default '')) == $note_id
}
| sort
}
}
def resolve-managed-output-path [note_id: string, configured_output_path: path] {
if ($configured_output_path | path exists) {
let parsed = (parse-output-frontmatter $configured_output_path)
let managed_by = (unquote ($parsed.managed_by? | default ''))
let frontmatter_note_id = (unquote ($parsed.note_id? | default ''))
if ($managed_by == 'notability-ingest' and $frontmatter_note_id == $note_id) {
return $configured_output_path
}
}
let discovered = (find-managed-outputs $note_id)
if ($discovered | is-empty) {
$configured_output_path
} else if (($discovered | length) == 1) {
$discovered | first
} else {
error make {
msg: $"Multiple managed note files found for ($note_id): (($discovered | str join ', '))"
}
}
}
def determine-write-target [manifest: record] {
let output_path = (resolve-managed-output-path $manifest.note_id $manifest.output_path)
if not ($output_path | path exists) {
return {
output_path: $output_path
write_path: $output_path
write_mode: 'create'
updated_main_output: true
}
}
let parsed = (parse-output-frontmatter $output_path)
let managed_by = (unquote ($parsed.managed_by? | default ''))
let frontmatter_note_id = (unquote ($parsed.note_id? | default ''))
if ($managed_by == 'notability-ingest' and $frontmatter_note_id == $manifest.note_id) {
return {
output_path: $output_path
write_path: $output_path
write_mode: 'overwrite'
updated_main_output: true
}
}
{
output_path: $output_path
write_path: (conflict-path-for $output_path)
write_mode: 'conflict'
updated_main_output: false
}
}
def build-markdown [manifest: record, normalized: string] {
let body = ($normalized | str trim)
let output_body = if $body == '' {
$"# ($manifest.title)"
} else {
$body
}
let created = ($manifest.requested_at | str substring 0..9)
let updated = ((date now) | format date '%Y-%m-%d')
[
'---'
$"title: ($manifest.title | to json)"
$"created: ($created | to json)"
$"updated: ($updated | to json)"
'source: "notability"'
$"source_transport: (($manifest.source_transport? | default 'webdav') | to json)"
$"source_relpath: ($manifest.source_relpath | to json)"
$"note_id: ($manifest.note_id | to json)"
'managed_by: "notability-ingest"'
$"source_file: ($manifest.archive_path | to json)"
$"source_file_hash: ($'sha256:($manifest.source_hash)' | to json)"
$"source_format: ((source-format $manifest.archive_path) | to json)"
'status: "active"'
'tags:'
' - handwritten'
' - notability'
'---'
''
$output_body
''
] | str join "\n"
}
def render-pages [input_path: path, job_id: string] {
let extension = (([$input_path] | path parse | first).extension? | default '' | str downcase)
if $extension == 'png' {
[ $input_path ]
} else if $extension == 'pdf' {
let render_dir = [(render-root) $job_id] | path join
mkdir $render_dir
let prefix = [$render_dir 'page'] | path join
^pdftoppm -png -r 200 $input_path $prefix
let pages = ((glob $"($render_dir)/*.png") | sort)
if ($pages | is-empty) {
error make {
msg: $"No PNG pages rendered from ($input_path)"
}
}
$pages
} else {
error make {
msg: $"Unsupported Notability input format: ($input_path)"
}
}
}
def call-pi [timeout_window: string, prompt: string, inputs: list<path>, thinking: string] {
let prompt_file = (^mktemp --suffix '.md' | str trim)
$prompt | save -f $prompt_file
let input_refs = ($inputs | each {|input| $'@($input)' })
let prompt_ref = $'@($prompt_file)'
let result = (try {
^timeout $timeout_window pi --model $vision_model --thinking $thinking --no-tools --no-session -p ...$input_refs $prompt_ref | complete
} catch {|error|
rm -f $prompt_file
error make {
msg: (error-message $error)
}
})
rm -f $prompt_file
let output = ($result.stdout | str trim)
if $output != '' {
$output
} else {
let stderr = ($result.stderr | str trim)
if $stderr == '' {
error make {
msg: $"pi returned no output (exit ($result.exit_code))"
}
} else {
error make {
msg: $"pi returned no output (exit ($result.exit_code)): ($stderr)"
}
}
}
}
def ingest-job [manifest: record] {
mkdir $manifest.session_dir
let page_paths = (render-pages $manifest.input_path $manifest.job_id)
let transcribe_prompt = ([
'Transcribe this note into clean Markdown.'
''
'Read it like a human and reconstruct the intended reading order and structure.'
''
'Do not preserve handwritten layout literally.'
''
'Handwritten line breaks, word stacking, font size changes, and spacing are not semantic structure by default.'
''
'If adjacent handwritten lines clearly belong to one sentence or short phrase, merge them into normal prose with spaces instead of separate Markdown lines.'
''
'Only keep separate lines or blank lines when there is clear evidence of separate paragraphs, headings, list items, checkboxes, or other distinct blocks.'
''
'Keep headings, lists, and paragraphs when they are genuinely present.'
''
'Do not summarize. Do not add commentary. Return Markdown only.'
] | str join "\n")
print $"Transcribing ($manifest.job_id) with page count ($page_paths | length)"
let transcript = (call-pi $transcribe_timeout $transcribe_prompt $page_paths 'low')
mkdir ($manifest.transcript_path | path dirname)
$"($transcript)\n" | save -f $manifest.transcript_path
let normalize_prompt = ([
'Rewrite the attached transcription into clean Markdown.'
''
'Preserve the same content and intended structure.'
''
'Collapse layout-only line breaks from handwriting.'
''
'If short adjacent lines are really one sentence or phrase, join them with spaces instead of keeping one line per handwritten row.'
''
'Use separate lines only for real headings, list items, checkboxes, or distinct paragraphs.'
''
'Do not summarize. Return Markdown only.'
] | str join "\n")
print $"Normalizing ($manifest.job_id)"
let normalized = (call-pi $normalize_timeout $normalize_prompt [ $manifest.transcript_path ] 'off')
let markdown = (build-markdown $manifest $normalized)
let target = (determine-write-target $manifest)
mkdir ($target.write_path | path dirname)
$markdown | save -f $target.write_path
{
success: true
job_id: $manifest.job_id
note_id: $manifest.note_id
archive_path: $manifest.archive_path
source_hash: $manifest.source_hash
session_dir: $manifest.session_dir
output_path: $target.output_path
output_hash: (if $target.updated_main_output { sha256 $target.write_path } else { null })
conflict_path: (if $target.write_mode == 'conflict' { $target.write_path } else { null })
write_mode: $target.write_mode
updated_main_output: $target.updated_main_output
transcript_path: $manifest.transcript_path
}
}
def mark-failure [job: record, running_path: string, error_summary: string, result?: any] {
let finished_at = (now-iso)
sql-run $"
update jobs
set status = 'failed',
finished_at = (sql-quote $finished_at),
error_summary = (sql-quote $error_summary),
job_manifest_path = (sql-quote (manifest-path-for $job.job_id 'failed'))
where job_id = (sql-quote $job.job_id);
update notes
set status = 'failed',
last_error = (sql-quote $error_summary)
where note_id = (sql-quote $job.note_id);
"
| ignore
if $result != null and ($result.archive_path? | default null) != null {
sql-run $"
update versions
set ingest_result = 'failed',
session_path = (sql-quote ($result.session_dir? | default ''))
where archive_path = (sql-quote $result.archive_path);
"
| ignore
}
let failed_path = (manifest-path-for $job.job_id 'failed')
if ($running_path | path exists) {
mv -f $running_path $failed_path
}
log-event $job.note_id 'job-failed' {
job_id: $job.job_id
error: $error_summary
}
}
def mark-success [job: record, running_path: string, result: record] {
let finished_at = (now-iso)
let note_status = if ($result.write_mode? | default 'write') == 'conflict' {
'conflict'
} else {
'active'
}
let output_path_q = (sql-quote ($result.output_path? | default null))
let output_hash_update = if ($result.updated_main_output? | default false) {
sql-quote ($result.output_hash? | default null)
} else {
'last_generated_output_hash'
}
let source_hash_update = if ($result.updated_main_output? | default false) {
sql-quote ($result.source_hash? | default null)
} else {
'last_generated_source_hash'
}
sql-run $"
update jobs
set status = 'done',
finished_at = (sql-quote $finished_at),
error_summary = null,
job_manifest_path = (sql-quote (manifest-path-for $job.job_id 'done'))
where job_id = (sql-quote $job.job_id);
update notes
set status = (sql-quote $note_status),
output_path = ($output_path_q),
last_processed_at = (sql-quote $finished_at),
last_generated_output_hash = ($output_hash_update),
last_generated_source_hash = ($source_hash_update),
conflict_path = (sql-quote ($result.conflict_path? | default null)),
last_error = null
where note_id = (sql-quote $job.note_id);
update versions
set ingest_result = 'success',
session_path = (sql-quote ($result.session_dir? | default ''))
where archive_path = (sql-quote $result.archive_path);
"
| ignore
let done_path = (manifest-path-for $job.job_id 'done')
if ($running_path | path exists) {
mv -f $running_path $done_path
}
^touch (qmd-dirty-file)
log-event $job.note_id 'job-finished' {
job_id: $job.job_id
write_mode: ($result.write_mode? | default 'write')
output_path: ($result.output_path? | default '')
conflict_path: ($result.conflict_path? | default '')
}
}
def recover-running-jobs [] {
let jobs = (sql-json "
select job_id, note_id, job_manifest_path, result_path
from jobs
where status = 'running'
order by started_at asc;
")
for job in $jobs {
let running_path = (manifest-path-for $job.job_id 'running')
let result = if ($job.result_path | path exists) {
open $job.result_path
} else {
null
}
mark-failure $job $running_path 'worker interrupted before completion' $result
}
}
def process-job [job: record] {
let running_path = (manifest-path-for $job.job_id 'running')
mv -f $job.job_manifest_path $running_path
sql-run $"
update jobs
set status = 'running',
started_at = (sql-quote (now-iso)),
job_manifest_path = (sql-quote $running_path)
where job_id = (sql-quote $job.job_id);
"
| ignore
print $"Processing ($job.job_id) for ($job.note_id)"
let manifest = (open $running_path)
try {
let result = (ingest-job $manifest)
write-result $job.result_path $result
mark-success $job $running_path $result
} catch {|error|
let message = (error-message $error)
let result = {
success: false
job_id: $manifest.job_id
note_id: $manifest.note_id
archive_path: $manifest.archive_path
source_hash: $manifest.source_hash
session_dir: $manifest.session_dir
error: $message
}
write-result $job.result_path $result
mark-failure $job $running_path $message $result
}
}
def drain-queued-jobs [] {
loop {
let job = (next-queued-job)
if $job == null {
maybe-update-qmd
break
}
process-job $job
maybe-update-qmd
}
}
def main [--drain] {
ensure-layout
recover-running-jobs
if $drain {
drain-queued-jobs
return
}
while true {
let job = (next-queued-job)
if $job == null {
maybe-update-qmd
sleep $idle_sleep
continue
}
process-job $job
maybe-update-qmd
}
}