
Structured Ingestion with DataModels

The ApertureDB Python SDK includes a Pydantic-based DataModels API that generates AQL queries from typed schemas. Use it when you want schema validation, readable ingestion code, or reusable models across multiple pipelines.

For simple one-off scripts, calling client.query() directly is fine. For production pipelines that ingest thousands of objects with consistent structure, DataModels reduces boilerplate and catches schema errors at definition time rather than at query time.
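For comparison, the raw approach hand-builds the AQL command list that client.query() sends to the server. A minimal sketch of that shape (the property names are illustrative, and the options shown are not exhaustive):

```python
# Hand-built AQL: a list of JSON commands, one dict per command.
# This mirrors what the DataModels API generates automatically.
add_image_query = [
    {
        "AddImage": {
            "properties": {
                "dish_name": "Butter Chicken",
                "cuisine": "Indian",
            },
        }
    }
]

# With a live connection you would run it as roughly:
#   client = create_connector()
#   response, _ = client.query(add_image_query, [image_bytes])
print(add_image_query[0]["AddImage"]["properties"]["cuisine"])
```

Every property key is a string you typed by hand; a misspelled field only surfaces when you inspect the stored data, which is the error class DataModels moves to definition time.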


Defining a DataModel

Define your record as a Pydantic class. The SDK generates the correct AddImage + AddDescriptor + AddConnection commands automatically:

```shell
pip install -U aperturedb torch
pip install git+https://github.com/openai/CLIP.git
```

```python
import torch
import clip
from PIL import Image
from aperturedb.DataModels import DescriptorDataModel, DescriptorSetDataModel, ImageDataModel
from aperturedb.Query import generate_add_query
from aperturedb.CommonLibrary import create_connector, execute_query

class FoodImageRecord(ImageDataModel):
    embedding: DescriptorDataModel
    dish_name: str
    cuisine: str

device = "cuda" if torch.cuda.is_available() else "cpu"
clip_model, preprocess = clip.load("ViT-B/16", device=device)
client = create_connector()

# Create the DescriptorSet that will hold the embeddings
descriptor_set = DescriptorSetDataModel(name="food_image_search", dimensions=512)
query, blobs, _ = generate_add_query(descriptor_set)
execute_query(client, query, blobs)

# Add records — generate_add_query produces AddImage + AddDescriptor + AddConnection
dish_images = [
    {"path": "butter_chicken.jpg", "name": "Butter Chicken", "cuisine": "Indian"},
    {"path": "focaccia.jpg", "name": "Focaccia", "cuisine": "Italian"},
]

for dish in dish_images:
    image = preprocess(Image.open(dish["path"])).unsqueeze(0).to(device)
    with torch.no_grad():
        embedding = clip_model.encode_image(image).squeeze()

    record = FoodImageRecord(
        url=dish["path"],
        embedding=DescriptorDataModel(vector=embedding, set=descriptor_set),
        dish_name=dish["name"],
        cuisine=dish["cuisine"],
    )
    query, blobs, _ = generate_add_query(record)
    execute_query(client, query, blobs)
```

generate_add_query inspects the model's type annotations to determine which AQL commands to generate. A field typed as DescriptorDataModel becomes an AddDescriptor command linked to the parent object with an AddConnection.
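The introspection step itself is ordinary Python: the class annotations are inspected, and each field type maps to a command. A stdlib-only sketch of the idea, using stub classes in place of the real SDK types (the SDK's actual mapping logic is more involved):

```python
from typing import get_type_hints

class DescriptorDataModelStub:
    """Stand-in for aperturedb's DescriptorDataModel."""

class FoodImageRecordStub:
    """Stand-in for the ImageDataModel subclass above."""
    embedding: DescriptorDataModelStub
    dish_name: str
    cuisine: str

def commands_for(model_cls):
    """Map annotated fields to the AQL commands they would generate."""
    commands = ["AddImage"]  # the parent image object itself
    for name, hint in get_type_hints(model_cls).items():
        if isinstance(hint, type) and issubclass(hint, DescriptorDataModelStub):
            # descriptor fields become AddDescriptor + AddConnection
            commands += ["AddDescriptor", "AddConnection"]
    return commands

print(commands_for(FoodImageRecordStub))
# → ['AddImage', 'AddDescriptor', 'AddConnection']
```

Scalar fields like dish_name and cuisine produce no extra commands; they travel as properties on the parent AddImage.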


Parallel Ingestion with DataModels

DataModels compose naturally with ParallelLoader for high-throughput ingestion. Define a generator that yields (query, blobs) tuples using generate_add_query, then pass it to the loader:

```python
from aperturedb.ParallelLoader import ParallelLoader
from aperturedb.Query import generate_add_query

class ImageEmbeddingGenerator:
    def __init__(self, records):
        self.records = records

    def __len__(self):
        return len(self.records)

    def __getitem__(self, idx):
        if isinstance(idx, slice):
            return [self[i] for i in range(*idx.indices(len(self)))]
        record = self.records[idx]
        # ... build FoodImageRecord, compute embedding ...
        query, blobs, _ = generate_add_query(record)
        return query, blobs

loader = ParallelLoader(client)
loader.ingest(ImageEmbeddingGenerator(all_records), batch_size=32, numthreads=4)
loader.print_stats()
```

See Bulk Embedding Ingestion for the full ParallelLoader pattern without DataModels.
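The loader reads the generator in slices and fans batches out to worker threads, which is why __getitem__ above must handle slice indices. A rough stdlib-only simulation of that consumption pattern (an illustration of the batching idea, not the loader's actual code):

```python
from concurrent.futures import ThreadPoolExecutor

def simulate_parallel_ingest(generator, batch_size, numthreads):
    """Slice the generator into batches and process them on a thread pool."""
    batches = [generator[i:i + batch_size]
               for i in range(0, len(generator), batch_size)]
    with ThreadPoolExecutor(max_workers=numthreads) as pool:
        # a real worker would execute each batch of (query, blobs) tuples;
        # here we just count the records per batch
        results = list(pool.map(len, batches))
    return results

class FakeGenerator:
    """Minimal generator with the same __len__/__getitem__ contract."""
    def __init__(self, n):
        self.n = n
    def __len__(self):
        return self.n
    def __getitem__(self, idx):
        if isinstance(idx, slice):
            return [self[i] for i in range(*idx.indices(len(self)))]
        return ([], [])  # (query, blobs) placeholder

print(simulate_parallel_ingest(FakeGenerator(10), batch_size=4, numthreads=2))
# → [4, 4, 2]
```

Because each batch is independent, throughput scales with thread count until the server becomes the bottleneck.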

