diff --git a/tests/rag_evaluation/README.md b/tests/rag_evaluation/README.md index 1409a5e..3f571b2 100644 --- a/tests/rag_evaluation/README.md +++ b/tests/rag_evaluation/README.md @@ -116,6 +116,7 @@ uv run python tools/rag_eval_cli.py upload \ **Optional flags:** - `--category CATEGORY` - Custom category for notes (default: `nfcorpus_rag_eval`) - `--force-download` - Re-download nfcorpus dataset +- `--force` - Delete all existing notes in the target category before uploading (efficient corpus refresh) **Important:** This step requires: - A running Nextcloud instance with vector sync enabled diff --git a/tools/rag_eval_cli.py b/tools/rag_eval_cli.py index d3d8d74..30f14c9 100644 --- a/tools/rag_eval_cli.py +++ b/tools/rag_eval_cli.py @@ -411,19 +411,26 @@ def generate(provider: str, model: str | None, force_download: bool): is_flag=True, help="Force re-download of nfcorpus dataset", ) +@click.option( + "--force", + is_flag=True, + help="Delete all existing notes in the target category before uploading", +) def upload( nextcloud_url: str, username: str, password: str, category: str, force_download: bool, + force: bool, ): """Upload nfcorpus corpus documents as Nextcloud notes. This command: 1. Downloads nfcorpus dataset (if not already cached) - 2. Uploads all corpus documents as Nextcloud notes - 3. Saves document ID → note ID mapping to fixtures/note_mapping.json + 2. Optionally deletes existing notes in target category (--force) + 3. Uploads all corpus documents as Nextcloud notes + 4. Saves document ID → note ID mapping to fixtures/note_mapping.json The note mapping file is used by pytest tests to map expected document IDs to actual note IDs in Nextcloud. @@ -451,48 +458,89 @@ def upload( ) try: - # Upload documents - click.echo(f"\nUploading {len(corpus)} documents as notes...") + # Delete existing notes in category if force is specified + if force: + click.echo( + f"\n--force specified: Deleting existing notes in category '{category}'..." + ) + deleted_count = 0 + async for note in nc_client.notes.get_all_notes(): + if note.get("category") == category: + try: + await nc_client.notes.delete_note(note["id"]) + deleted_count += 1 + if deleted_count % 100 == 0: + click.echo(f" Deleted {deleted_count} notes...") + except Exception as e: + click.echo( + f" Error deleting note {note['id']}: {e}", err=True + ) + + click.echo( + f"Deleted {deleted_count} existing notes in category '{category}'" + ) + + # Upload documents concurrently + click.echo(f"\nUploading {len(corpus)} documents as notes (concurrent)...") click.echo(f"Category: {category}") note_mapping = {} uploaded_count = 0 + upload_errors = [] - for doc_id, doc in corpus.items(): - # Create note via Notes API - # Title includes doc ID for easy mapping - title = f"[{doc_id}] {doc['title'][:100]}" # Truncate long titles - content = doc["text"] + # Semaphore to limit concurrent uploads (avoid overwhelming server) + max_concurrent = 20 + semaphore = anyio.Semaphore(max_concurrent) - try: - note_data = await nc_client.notes.create_note( - title=title, - content=content, - category=category, - ) + async def upload_document(doc_id: str, doc: dict[str, Any]): + """Upload a single document as a note.""" + nonlocal uploaded_count - # Store mapping - note_id = note_data["id"] - note_mapping[doc_id] = note_id + async with semaphore: + title = f"[{doc_id}] {doc['title'][:100]}" # Truncate long titles + content = doc["text"] - uploaded_count += 1 - - # Progress indicator every 100 docs - if uploaded_count % 100 == 0: - click.echo( - f" Uploaded {uploaded_count}/{len(corpus)} documents..." + try: + note_data = await nc_client.notes.create_note( + title=title, + content=content, + category=category, ) - except Exception as e: - click.echo(f" Error uploading {doc_id}: {e}", err=True) + # Store mapping + note_id = note_data["id"] + note_mapping[doc_id] = note_id + + uploaded_count += 1 + + # Progress indicator every 100 docs + if uploaded_count % 100 == 0: + click.echo( + f" Uploaded {uploaded_count}/{len(corpus)} documents..." + ) + + except Exception as e: + error_msg = f"Error uploading {doc_id}: {e}" + upload_errors.append(error_msg) + click.echo(f" {error_msg}", err=True) + + # Upload all documents concurrently using task group + async with anyio.create_task_group() as tg: + for doc_id, doc in corpus.items(): + tg.start_soon(upload_document, doc_id, doc) click.echo(f"\nUploaded {uploaded_count} documents successfully") + if upload_errors: + click.echo( + f"Encountered {len(upload_errors)} errors during upload", err=True + ) # Save note mapping with open(NOTE_MAPPING_FILE, "w") as f: json.dump(note_mapping, f, indent=2) click.echo(f"Saved note mapping to: {NOTE_MAPPING_FILE}") + click.echo(f" Mapped {len(note_mapping)} document IDs to note IDs") finally: # Close the Nextcloud client