25ef33de7f
- Switch from sequential loop to /api/embed batch endpoint - Use 'input' array parameter instead of individual 'prompt' requests - Process in chunks of 32 to avoid quality degradation (issue #6262) - Reduces HTTP overhead: 128 texts = 4 requests instead of 128 - Maintains backward compatibility with embed() for single embeddings Ref: ollama/ollama#6262
235 lines
7.8 KiB
Python
235 lines
7.8 KiB
Python
"""Unified Ollama provider for embeddings and text generation."""
|
|
|
|
import logging
|
|
|
|
import httpx
|
|
|
|
from .base import Provider
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OllamaProvider(Provider):
|
|
"""
|
|
Ollama provider supporting both embeddings and text generation.
|
|
|
|
Supports TLS, SSL verification, and automatic model loading.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
base_url: str,
|
|
embedding_model: str | None = None,
|
|
generation_model: str | None = None,
|
|
verify_ssl: bool = True,
|
|
timeout: httpx.Timeout | None = None,
|
|
):
|
|
"""
|
|
Initialize Ollama provider.
|
|
|
|
Args:
|
|
base_url: Ollama API base URL (e.g., https://ollama.internal.example.com:443)
|
|
embedding_model: Model for embeddings (e.g., "nomic-embed-text"). None disables embeddings.
|
|
generation_model: Model for text generation (e.g., "llama3.2:1b"). None disables generation.
|
|
verify_ssl: Verify SSL certificates (default: True)
|
|
timeout: HTTP timeout configuration
|
|
"""
|
|
self.base_url = base_url.rstrip("/")
|
|
self.embedding_model = embedding_model
|
|
self.generation_model = generation_model
|
|
self.verify_ssl = verify_ssl
|
|
|
|
if timeout is None:
|
|
timeout = httpx.Timeout(timeout=120, connect=5)
|
|
|
|
self.client = httpx.AsyncClient(verify=verify_ssl, timeout=timeout)
|
|
self._dimension: int | None = None # Detected dynamically for embeddings
|
|
|
|
logger.info(
|
|
f"Initialized Ollama provider: {base_url} "
|
|
f"(embedding_model={embedding_model}, generation_model={generation_model}, "
|
|
f"verify_ssl={verify_ssl})"
|
|
)
|
|
|
|
# Pre-check and auto-load models
|
|
if embedding_model:
|
|
self._check_model_is_loaded(embedding_model, autoload=True)
|
|
if generation_model:
|
|
self._check_model_is_loaded(generation_model, autoload=True)
|
|
|
|
@property
|
|
def supports_embeddings(self) -> bool:
|
|
"""Whether this provider supports embedding generation."""
|
|
return self.embedding_model is not None
|
|
|
|
@property
|
|
def supports_generation(self) -> bool:
|
|
"""Whether this provider supports text generation."""
|
|
return self.generation_model is not None
|
|
|
|
async def embed(self, text: str) -> list[float]:
|
|
"""
|
|
Generate embedding vector for text.
|
|
|
|
Args:
|
|
text: Input text to embed
|
|
|
|
Returns:
|
|
Vector embedding as list of floats
|
|
|
|
Raises:
|
|
NotImplementedError: If embeddings not enabled (no embedding_model)
|
|
"""
|
|
if not self.supports_embeddings:
|
|
raise NotImplementedError(
|
|
"Embedding not supported - no embedding_model configured"
|
|
)
|
|
|
|
response = await self.client.post(
|
|
f"{self.base_url}/api/embeddings",
|
|
json={"model": self.embedding_model, "prompt": text},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()["embedding"]
|
|
|
|
async def embed_batch(
|
|
self, texts: list[str], batch_size: int = 32
|
|
) -> list[list[float]]:
|
|
"""
|
|
Generate embeddings for multiple texts using Ollama's batch API.
|
|
|
|
Uses /api/embed endpoint with array input for efficient batch processing.
|
|
Conservative batch size (32) prevents quality degradation observed in
|
|
Ollama issue #6262 with larger batches.
|
|
|
|
Note: Ollama processes batches serially, not in parallel.
|
|
|
|
Args:
|
|
texts: List of texts to embed
|
|
batch_size: Maximum texts per batch (default: 32)
|
|
|
|
Returns:
|
|
List of vector embeddings
|
|
|
|
Raises:
|
|
NotImplementedError: If embeddings not enabled (no embedding_model)
|
|
"""
|
|
if not self.supports_embeddings:
|
|
raise NotImplementedError(
|
|
"Embedding not supported - no embedding_model configured"
|
|
)
|
|
|
|
all_embeddings = []
|
|
for i in range(0, len(texts), batch_size):
|
|
batch = texts[i : i + batch_size]
|
|
response = await self.client.post(
|
|
f"{self.base_url}/api/embed",
|
|
json={"model": self.embedding_model, "input": batch},
|
|
)
|
|
response.raise_for_status()
|
|
all_embeddings.extend(response.json()["embeddings"])
|
|
|
|
return all_embeddings
|
|
|
|
async def _detect_dimension(self):
|
|
"""
|
|
Detect embedding dimension by generating a test embedding.
|
|
|
|
This method queries the model to determine the actual dimension
|
|
instead of relying on hardcoded values.
|
|
"""
|
|
if self._dimension is None and self.supports_embeddings:
|
|
logger.debug(
|
|
f"Detecting embedding dimension for model {self.embedding_model}..."
|
|
)
|
|
test_embedding = await self.embed("test")
|
|
self._dimension = len(test_embedding)
|
|
logger.info(
|
|
f"Detected embedding dimension: {self._dimension} "
|
|
f"for model {self.embedding_model}"
|
|
)
|
|
|
|
def get_dimension(self) -> int:
|
|
"""
|
|
Get embedding dimension.
|
|
|
|
Returns:
|
|
Vector dimension for the configured embedding model
|
|
|
|
Raises:
|
|
NotImplementedError: If embeddings not enabled (no embedding_model)
|
|
RuntimeError: If dimension not detected yet (call _detect_dimension first)
|
|
"""
|
|
if not self.supports_embeddings:
|
|
raise NotImplementedError(
|
|
"Embedding not supported - no embedding_model configured"
|
|
)
|
|
|
|
if self._dimension is None:
|
|
raise RuntimeError(
|
|
f"Embedding dimension not detected yet for model {self.embedding_model}. "
|
|
"Call _detect_dimension() first or generate an embedding."
|
|
)
|
|
return self._dimension
|
|
|
|
async def generate(self, prompt: str, max_tokens: int = 500) -> str:
|
|
"""
|
|
Generate text from a prompt.
|
|
|
|
Args:
|
|
prompt: The prompt to generate from
|
|
max_tokens: Maximum tokens to generate
|
|
|
|
Returns:
|
|
Generated text
|
|
|
|
Raises:
|
|
NotImplementedError: If generation not enabled (no generation_model)
|
|
"""
|
|
if not self.supports_generation:
|
|
raise NotImplementedError(
|
|
"Text generation not supported - no generation_model configured"
|
|
)
|
|
|
|
response = await self.client.post(
|
|
f"{self.base_url}/api/generate",
|
|
json={
|
|
"model": self.generation_model,
|
|
"prompt": prompt,
|
|
"stream": False,
|
|
"options": {
|
|
"num_predict": max_tokens,
|
|
"temperature": 0.7,
|
|
},
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
return data["response"]
|
|
|
|
def _check_model_is_loaded(self, model: str, autoload: bool = True):
|
|
"""
|
|
Check if model is loaded in Ollama, optionally auto-loading it.
|
|
|
|
Args:
|
|
model: Model name to check
|
|
autoload: Whether to automatically pull the model if not loaded
|
|
"""
|
|
response = httpx.get(f"{self.base_url}/api/tags")
|
|
response.raise_for_status()
|
|
|
|
models = [m["name"] for m in response.json().get("models", [])]
|
|
logger.info("Ollama has following models pre-loaded: %s", models)
|
|
|
|
if (model not in models) and autoload:
|
|
logger.warning(
|
|
"Model '%s' not yet available in ollama, attempting to pull now...",
|
|
model,
|
|
)
|
|
response = httpx.post(f"{self.base_url}/api/pull", json={"model": model})
|
|
response.raise_for_status()
|
|
|
|
async def close(self) -> None:
|
|
"""Close HTTP client."""
|
|
await self.client.aclose()
|