Add checkpoint/interrupt support to BLAKE3 hashing

- Add HashCheckpoint dataclass for saving/resuming interrupted hash computations
- compute_blake3_hash now accepts interrupt_check and checkpoint parameters
- Returns (digest, None) on completion or (None, checkpoint) on interruption
- Update ingest.py caller to handle new tuple return type

Amp-Thread-ID: https://ampcode.com/threads/T-019cbb0b-8563-7199-b628-33e3c4fe9f41
Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
Luke Mino-Altherr 2026-03-04 15:22:42 -08:00
parent 58582f1faf
commit ba30d76d36
2 changed files with 64 additions and 6 deletions

View File

@@ -1,24 +1,80 @@
import io import io
import os import os
from typing import IO from dataclasses import dataclass
from typing import IO, Any, Callable
from blake3 import blake3 from blake3 import blake3
DEFAULT_CHUNK = 8 * 1024 * 1024 DEFAULT_CHUNK = 8 * 1024 * 1024
InterruptCheck = Callable[[], bool]
@dataclass
class HashCheckpoint:
    """Snapshot of an in-progress hash computation so it can be resumed.

    Produced by ``compute_blake3_hash`` when its ``interrupt_check`` fires;
    hand it back via the ``checkpoint`` argument to continue hashing from
    where the previous call stopped.
    """

    # Bytes already fed to the hasher — also the file offset to seek to on resume.
    bytes_processed: int
    # Live blake3 hasher object carrying the digest state accumulated so far.
    hasher: Any
def compute_blake3_hash(
    fp: str | IO[bytes],
    chunk_size: int = DEFAULT_CHUNK,
    interrupt_check: InterruptCheck | None = None,
    checkpoint: HashCheckpoint | None = None,
) -> tuple[str | None, HashCheckpoint | None]:
    """Compute BLAKE3 hash of a file, with optional checkpoint support.

    Args:
        fp: File path or file-like object
        chunk_size: Size of chunks to read at a time
        interrupt_check: Optional callable that may block (e.g. while paused)
            and returns True if the operation should be cancelled. Checked
            between chunk reads.
        checkpoint: Optional checkpoint to resume from (file paths only)

    Returns:
        Tuple of (hex_digest, None) on completion, or
        (None, checkpoint) on interruption (file paths only), or
        (None, None) on interruption of a file object
    """
    # File-like objects cannot be reliably reopened and seeked later, so the
    # checkpoint machinery does not apply to them — delegate and wrap.
    if hasattr(fp, "read"):
        return _hash_file_obj(fp, chunk_size, interrupt_check), None

    # Normalize a nonsensical chunk size up front.
    if chunk_size <= 0:
        chunk_size = DEFAULT_CHUNK

    # Either pick up saved state or start from scratch.
    if checkpoint is None:
        hasher = blake3()
        offset = 0
    else:
        hasher = checkpoint.hasher
        offset = checkpoint.bytes_processed

    with open(os.fspath(fp), "rb") as stream:
        # seek(0) on a fresh handle is a no-op, so seeking unconditionally
        # covers both the fresh and the resumed case.
        stream.seek(offset)
        while True:
            # Check for cancellation before each read; the checkpoint we hand
            # back reflects exactly the bytes the hasher has consumed.
            if interrupt_check is not None and interrupt_check():
                return None, HashCheckpoint(bytes_processed=offset, hasher=hasher)
            block = stream.read(chunk_size)
            if not block:
                return hasher.hexdigest(), None
            hasher.update(block)
            offset += len(block)
def _hash_file_obj(file_obj: IO, chunk_size: int = DEFAULT_CHUNK) -> str: def _hash_file_obj(
file_obj: IO,
chunk_size: int = DEFAULT_CHUNK,
interrupt_check: InterruptCheck | None = None,
) -> str | None:
if chunk_size <= 0: if chunk_size <= 0:
chunk_size = DEFAULT_CHUNK chunk_size = DEFAULT_CHUNK
@@ -37,6 +93,8 @@ def _hash_file_obj(file_obj: IO, chunk_size: int = DEFAULT_CHUNK) -> str:
try: try:
h = blake3() h = blake3()
while True: while True:
if interrupt_check is not None and interrupt_check():
return None
chunk = file_obj.read(chunk_size) chunk = file_obj.read(chunk_size)
if not chunk: if not chunk:
break break

View File

@@ -244,7 +244,7 @@ def upload_from_temp_path(
expected_hash: str | None = None, expected_hash: str | None = None,
) -> UploadResult: ) -> UploadResult:
try: try:
digest = hashing.compute_blake3_hash(temp_path) digest, _ = hashing.compute_blake3_hash(temp_path)
except ImportError as e: except ImportError as e:
raise DependencyMissingError(str(e)) raise DependencyMissingError(str(e))
except Exception as e: except Exception as e: