This tutorial demonstrates how to use Feast with Docling and Milvus to build a Retrieval Augmented Generation (RAG) application. You'll learn how to store document embeddings in Feast and retrieve the most relevant documents for a given query.
RAG is a technique that combines generative models (e.g., LLMs) with retrieval systems to generate contextually relevant output for a particular goal (e.g., question answering). Feast makes it easy to store and retrieve document embeddings for RAG applications by providing integrations with vector databases like Milvus.
The typical RAG process involves:
1. Sourcing text data relevant for your application
2. Transforming each text document into smaller chunks of text
3. Transforming those chunks of text into embeddings
4. Inserting those chunks of text along with some identifier for the chunk and document in a database
5. Retrieving those chunks of text along with the identifiers at run-time to inject that text into the LLM's context
6. Calling some API to run inference with your LLM to generate contextually relevant output
7. Returning the output to some end user
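The process above can be sketched end to end in plain Python. This is a toy illustration, not the Feast pipeline used in the rest of the tutorial: the word-count `toy_embed` function stands in for a real embedding model, the in-memory `index` list stands in for a vector database, and every function name here is an assumption made for the example. The final prompt is only assembled, not sent to an LLM.

```python
import math
import re
from collections import Counter


def chunk_text(text: str, max_words: int = 8) -> list[str]:
    """Split a document into consecutive windows of at most max_words words."""
    words = text.split()
    return [" ".join(words[i:i + max_words]) for i in range(0, len(words), max_words)]


def toy_embed(text: str) -> dict[str, float]:
    """Sparse, unit-length word-count vector (stand-in for a real embedding model)."""
    counts = Counter(re.findall(r"\w+", text.lower()))
    norm = math.sqrt(sum(c * c for c in counts.values())) or 1.0
    return {word: c / norm for word, c in counts.items()}


def cosine(a: dict[str, float], b: dict[str, float]) -> float:
    """Cosine similarity of two unit-length sparse vectors (their dot product)."""
    return sum(weight * b.get(word, 0.0) for word, weight in a.items())


def build_index(docs: dict[str, str]) -> list[dict]:
    """Store each chunk with its document ID, chunk ID, text, and embedding."""
    index = []
    for doc_id, text in docs.items():
        for i, chunk in enumerate(chunk_text(text)):
            index.append({
                "document_id": doc_id,
                "chunk_id": f"{doc_id}-{i}",
                "text": chunk,
                "vector": toy_embed(chunk),
            })
    return index


def retrieve(index: list[dict], query: str, top_k: int = 2) -> list[dict]:
    """Return the top_k chunks most similar to the query."""
    q = toy_embed(query)
    return sorted(index, key=lambda row: -cosine(q, row["vector"]))[:top_k]


docs = {
    "doc-1": "Feast stores embeddings in an online store such as Milvus.",
    "doc-2": "Docling converts PDF files into structured documents for chunking.",
}
question = "Which online store holds the embeddings?"
index = build_index(docs)
hits = retrieve(index, question, top_k=1)

# The remaining steps would send this prompt to an LLM and return its answer.
prompt = "Context:\n" + "\n".join(h["text"] for h in hits) + f"\n\nQuestion: {question}"
print(hits[0]["chunk_id"])
```

In the tutorial proper, Docling and a sentence-transformers model do the chunking and embedding, and Feast with Milvus does the storage and retrieval.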
Prerequisites
Python 3.10 or later
Feast installed with Milvus support: pip install 'feast[milvus,nlp]'
A basic understanding of feature stores and vector embeddings
Step 0: Download, Compute, and Export the Docling Sample Dataset
Step 1: Configure Milvus in Feast
Create a feature_store.yaml file with the following configuration:
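A minimal sketch of such a configuration, assuming a local, file-backed Milvus online store. The project name, file paths, and index type are assumptions for illustration. `embedding_dim: 384` matches the output size of the `all-MiniLM-L6-v2` model used later in this tutorial, and `metric_type` matches the `COSINE` search metric declared on the feature view. The helper `write_feature_store_yaml` is hypothetical and only writes the text to disk; verify the keys against the Milvus online store reference for your Feast version.

```python
from pathlib import Path

# Assumed values throughout: adjust project name, paths, and index settings.
FEATURE_STORE_YAML = """\
project: rag
provider: local
registry: data/registry.db
online_store:
  type: milvus
  path: data/online_store.db
  vector_enabled: true
  embedding_dim: 384      # output size of all-MiniLM-L6-v2
  index_type: "FLAT"
  metric_type: "COSINE"
offline_store:
  type: file
"""


def write_feature_store_yaml(repo_dir: str = ".") -> Path:
    """Write the sketch configuration into repo_dir and return the file path."""
    target = Path(repo_dir) / "feature_store.yaml"
    target.write_text(FEATURE_STORE_YAML)
    return target
```

Calling `write_feature_store_yaml()` from the repository root creates the file next to `feature_repo.py`.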
Step 2: Define your Data Sources and Views
Create a feature_repo.py file to define your entities, data sources, and feature views:
Step 3: Update your Registry
Apply the feature view definitions to the registry:
Step 4: Ingest your Data
Process your documents, generate embeddings, and ingest them into the Feast online store:
Step 5: Retrieve Relevant Documents
Now you can retrieve the most relevant documents for a given query:
Step 6: Use Retrieved Documents for Generation
Finally, you can use the retrieved documents as context for an LLM:
Alternative: Using DocEmbedder for Simplified Ingestion
Instead of manually chunking, embedding, and writing documents as shown above, you can use Feast's DocEmbedder class to handle the entire pipeline in a single step. DocEmbedder automates chunking, embedding generation, FeatureView creation, and writing to the online store.
Install Dependencies
Set Up and Ingest with DocEmbedder
Retrieve and Query
Once documents are ingested, you can retrieve them the same way as shown in Step 5 above:
Customizing the Pipeline
DocEmbedder is extensible at every stage. Below are examples of how to create custom components and wire them together.
Custom Chunker
Subclass BaseChunker to implement your own chunking strategy. The load_parse_and_chunk method receives each document and must return a list of chunk dictionaries.
Or simply configure the built-in TextChunker:
Custom Embedder
Subclass BaseEmbedder to use a different embedding model. Register modality handlers in _register_default_modalities and implement the embed method.
Custom Logical Layer Function
The schema transform function transforms the chunked + embedded DataFrame into the exact schema your FeatureView expects. It must accept a pd.DataFrame and return a pd.DataFrame.
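A minimal sketch of such a function, using only pandas. The input column names (`chunk_id`, `original_id`, `chunk_index`, `text`, `text_embedding`) and the output names (`source_id`, `embedding`, `event_timestamp`) are assumptions chosen to resemble the other examples in this tutorial; replace them with whatever your chunker, embedder, and FeatureView actually use.

```python
from datetime import datetime, timezone

import pandas as pd


def to_feature_view_schema(df: pd.DataFrame) -> pd.DataFrame:
    """Rename and select columns so the frame matches the target schema."""
    # rename() returns a new frame, so the caller's DataFrame is not modified.
    out = df.rename(columns={"original_id": "source_id", "text_embedding": "embedding"})
    out["event_timestamp"] = datetime.now(timezone.utc)
    # Keep only the columns the (assumed) FeatureView schema declares.
    return out[["chunk_id", "source_id", "text", "embedding", "event_timestamp"]]


# Hypothetical chunked + embedded input, as a pipeline stage might produce it.
chunked = pd.DataFrame({
    "chunk_id": ["doc1_0", "doc1_1"],
    "original_id": ["doc1", "doc1"],
    "chunk_index": [0, 1],
    "text": ["first chunk", "second chunk"],
    "text_embedding": [[0.1, 0.2], [0.3, 0.4]],
})
features = to_feature_view_schema(chunked)
print(list(features.columns))
```

Selecting columns explicitly in the last line makes a schema mismatch fail early with a `KeyError` instead of surfacing later during the write to the online store.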
Putting It All Together
Pass your custom components to DocEmbedder:
Note: When using a custom schema_transform_fn, ensure the returned DataFrame columns match your FeatureView schema. When using a custom embedder with a different output dimension, set vector_length accordingly (or let it auto-detect via get_embedding_dim).
# feature_repo.py
import hashlib
import io
from datetime import timedelta
from typing import Any

from docling.chunking import HybridChunker
from docling.datamodel.base_models import DocumentStream
from docling.document_converter import DocumentConverter
from feast import Entity, FeatureView, Field, FileSource, RequestSource
from feast.data_format import ParquetFormat
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Array, Float64, PdfBytes, String, ValueType
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer

# Load tokenizer and embedding model
EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"
MAX_TOKENS = 64 # Small token limit for demonstration
tokenizer = AutoTokenizer.from_pretrained(EMBED_MODEL_ID)
embedding_model = SentenceTransformer(EMBED_MODEL_ID)
chunker = HybridChunker(tokenizer=tokenizer, max_tokens=MAX_TOKENS, merge_peers=True)


def embed_text(text: str) -> list[float]:
    """Generate an embedding for a given text."""
    return embedding_model.encode([text], normalize_embeddings=True).tolist()[0]


def generate_chunk_id(file_name: str, raw_chunk_markdown: str = "") -> str:
    """Generate a unique chunk ID based on file_name and raw_chunk_markdown."""
    unique_string = f"{file_name}-{raw_chunk_markdown}" if raw_chunk_markdown != "" else f"{file_name}"
    return hashlib.sha256(unique_string.encode()).hexdigest()


# Define entities
chunk = Entity(
    name="chunk_id",
    description="Chunk ID",
    value_type=ValueType.STRING,
    join_keys=["chunk_id"],
)

document = Entity(
    name="document_id",
    description="Document ID",
    value_type=ValueType.STRING,
    join_keys=["document_id"],
)

source = FileSource(
    file_format=ParquetFormat(),
    path="./data/docling_samples.parquet",
    timestamp_field="created",
)

input_request_pdf = RequestSource(
    name="pdf_request_source",
    schema=[
        Field(name="document_id", dtype=String),
        Field(name="pdf_bytes", dtype=PdfBytes),
        Field(name="file_name", dtype=String),
    ],
)

# Define the view for retrieval
docling_example_feature_view = FeatureView(
    name="docling_feature_view",
    entities=[chunk],
    schema=[
        Field(name="file_name", dtype=String),
        Field(name="raw_chunk_markdown", dtype=String),
        Field(
            name="vector",
            dtype=Array(Float64),
            vector_index=True,
            vector_search_metric="COSINE",
        ),
        Field(name="chunk_id", dtype=String),
    ],
    source=source,
    ttl=timedelta(hours=2),
)


@on_demand_feature_view(
    entities=[chunk, document],
    sources=[input_request_pdf],
    schema=[
        Field(name="document_id", dtype=String),
        Field(name="chunk_id", dtype=String),
        Field(name="chunk_text", dtype=String),
        Field(
            name="vector",
            dtype=Array(Float64),
            vector_index=True,
            vector_search_metric="L2",
        ),
    ],
    mode="python",
    write_to_online_store=True,
    singleton=True,
)
def docling_transform_docs(inputs: dict[str, Any]):
    document_ids, chunks, embeddings, chunk_ids = [], [], [], []
    # Wrap the raw PDF bytes in a stream that Docling can convert
    buf = io.BytesIO(
        inputs["pdf_bytes"],
    )
    doc_source = DocumentStream(name=inputs["file_name"], stream=buf)
    converter = DocumentConverter()
    result = converter.convert(doc_source)
    # doc_chunk avoids shadowing the module-level `chunk` entity
    for i, doc_chunk in enumerate(chunker.chunk(dl_doc=result.document)):
        raw_chunk = chunker.serialize(chunk=doc_chunk)
        embedding = embed_text(raw_chunk)
        chunk_id = f"chunk-{i}"
        document_ids.append(inputs["document_id"])
        chunks.append(raw_chunk)
        chunk_ids.append(chunk_id)
        embeddings.append(embedding)
    # One output row per chunk of the input document
    return {
        "document_id": document_ids,
        "chunk_id": chunk_ids,
        "vector": embeddings,
        "chunk_text": chunks,
    }

feast apply
import pandas as pd
from feast import FeatureStore
store = FeatureStore(repo_path=".")
df = pd.read_parquet("./data/docling_samples.parquet")
mdf = pd.read_parquet("./data/metadata_samples.parquet")
df['chunk_embedding'] = df['vector'].apply(lambda x: x.tolist())
embedding_length = len(df['vector'][0])
print(f'embedding length = {embedding_length}')
df['created'] = pd.Timestamp.now()
mdf['created'] = pd.Timestamp.now()

# Ingesting transformed data to the feature view that has no associated transformation
store.write_to_online_store(feature_view_name='docling_feature_view', df=df)

# Turning off transformation on writes is as simple as changing the default behavior
store.write_to_online_store(
    feature_view_name='docling_transform_docs',
    df=df[df['document_id']!='doc-1'],
    transform_on_write=False,
)

# Now we can transform a raw PDF on the fly
store.write_to_online_store(
    feature_view_name='docling_transform_docs',
    df=mdf[mdf['document_id']=='doc-1'],
    transform_on_write=True,  # this is the default
)

from feast import FeatureStore

# Initialize FeatureStore
store = FeatureStore(".")

# Generate query embedding
# (embed_text is the helper defined in feature_repo.py in Step 2)
question = 'Who are the authors of the paper?'
query_embedding = embed_text(question)

# Retrieve similar documents
context_data = store.retrieve_online_documents_v2(
    features=[
        "docling_feature_view:vector",
        "docling_feature_view:file_name",
        "docling_feature_view:raw_chunk_markdown",
        "docling_feature_view:chunk_id",
    ],
    query=query_embedding,
    top_k=3,
    distance_metric='COSINE',
).to_df()
print(context_data)

import os

from openai import OpenAI

client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)


# Format documents for context
def format_documents(context_data, base_prompt):
    # The chunk text retrieved in Step 5 is in the raw_chunk_markdown column
    documents = "\n".join(
        f"Document {i + 1}: {row['raw_chunk_markdown']}"
        for i, (_, row) in enumerate(context_data.iterrows())
    )
    return f"{base_prompt}\n\nContext documents:\n{documents}"


BASE_PROMPT = """You are a helpful assistant that answers questions based on the provided context."""
FULL_PROMPT = format_documents(context_data, BASE_PROMPT)

# Generate response
response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[
        {"role": "system", "content": FULL_PROMPT},
        # Send the question text, not its embedding
        {"role": "user", "content": question},
    ],
)
print('\n'.join([c.message.content for c in response.choices]))

pip install 'feast[milvus,rag]'
from feast import DocEmbedder
import pandas as pd

# Prepare your documents as a DataFrame
df = pd.DataFrame({
    "id": ["doc1", "doc2", "doc3"],
    "text": [
        "Aaron is a prophet, high priest, and the brother of Moses...",
        "God at Sinai granted Aaron the priesthood for himself...",
        "His rod turned into a snake. Then he stretched out...",
    ],
})

# DocEmbedder handles everything: generates FeatureView, applies repo,
# chunks text, generates embeddings, and writes to the online store
embedder = DocEmbedder(
    repo_path="feature_repo/",
    feature_view_name="text_feature_view",
)

result = embedder.embed_documents(
    documents=df,
    id_column="id",
    source_column="text",
    column_mapping=("text", "text_embedding"),
)

from feast import FeatureStore

store = FeatureStore("feature_repo/")

query_embedding = embed_text("Who are the authors of the paper?")
context_data = store.retrieve_online_documents_v2(
    features=[
        "text_feature_view:embedding",
        "text_feature_view:text",
        "text_feature_view:source_id",
    ],
    query=query_embedding,
    top_k=3,
    distance_metric="COSINE",
).to_df()

import re
from typing import Any, Optional

from feast.chunker import BaseChunker, ChunkingConfig


class SentenceChunker(BaseChunker):
    """Chunks text by sentences instead of word count."""

    def load_parse_and_chunk(
        self,
        source: Any,
        source_id: str,
        source_column: str,
        source_type: Optional[str] = None,
    ) -> list[dict]:
        text = str(source)
        # Split on sentence boundaries
        sentences = re.split(r'(?<=[.!?])\s+', text)

        chunks = []
        current_chunk = []
        chunk_index = 0

        for sentence in sentences:
            current_chunk.append(sentence)
            combined = " ".join(current_chunk)
            if len(combined.split()) >= self.config.chunk_size:
                chunks.append({
                    "chunk_id": f"{source_id}_{chunk_index}",
                    "original_id": source_id,
                    source_column: combined,
                    "chunk_index": chunk_index,
                })
                # Keep overlap by retaining the last sentence
                current_chunk = [sentence]
                chunk_index += 1

        # Don't forget the last chunk
        if current_chunk and len(" ".join(current_chunk).split()) >= self.config.min_chunk_size:
            chunks.append({
                "chunk_id": f"{source_id}_{chunk_index}",
                "original_id": source_id,
                source_column: " ".join(current_chunk),
                "chunk_index": chunk_index,
            })

        return chunks
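
To try the sentence-packing logic without a Feast installation, the same loop can be written as a standalone function. This is a sketch under assumptions: `chunk_size` and `min_chunk_size` are passed as plain arguments (in the class they come from `self.config`), the output dictionaries are trimmed to two keys, and the sample text is made up.

```python
import re


def sentence_chunks(text: str, source_id: str, chunk_size: int, min_chunk_size: int) -> list[dict]:
    """Pack sentences into chunks of at least chunk_size words, with one-sentence overlap."""
    sentences = re.split(r"(?<=[.!?])\s+", text.strip())
    chunks, current, index = [], [], 0
    for sentence in sentences:
        current.append(sentence)
        combined = " ".join(current)
        if len(combined.split()) >= chunk_size:
            chunks.append({"chunk_id": f"{source_id}_{index}", "text": combined})
            current = [sentence]  # retain the last sentence as overlap
            index += 1
    # Emit whatever is left if it is long enough
    tail = " ".join(current)
    if current and len(tail.split()) >= min_chunk_size:
        chunks.append({"chunk_id": f"{source_id}_{index}", "text": tail})
    return chunks


sample = "Feast stores features. Milvus indexes vectors. Docling parses PDFs. RAG combines them."
result = sentence_chunks(sample, "doc1", chunk_size=6, min_chunk_size=3)
for c in result:
    print(c["chunk_id"], "->", c["text"])
```

With these settings the function emits four chunks, and the last one contains only "RAG combines them.", which already appears at the end of the previous chunk because that sentence was retained as overlap. Raising `min_chunk_size` to 4 drops that short tail and leaves three chunks.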