Data Transformation Workflows
Check out our Quick Start guide to set up your ApertureDB server and client environment if you haven't already.
CRUD Operations on Cookbook Data
Data is the collection of objects your application works with. We will focus on CRUD (create, read, update, and delete) operations on Image objects.
For example, let's assume our objects are a set of JPEG files in a folder. Here's a sample dataset to illustrate the flow. The following commands download and extract the images in the CookBook dataset hosted by ApertureData.
```bash
mkdir --parents cookbook/extracted
curl --output cookbook/archive.zip https://storage.googleapis.com/ad-demos-datasets/cookbook/archive.zip
unzip cookbook/archive.zip -d cookbook/extracted
```
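If you prefer to keep everything in a notebook, the same download-and-extract step can be done from Python. This is a minimal sketch using only the standard library; the URL and paths mirror the shell commands above.

```python
import os
import zipfile
import urllib.request

def download(url: str, dest_path: str) -> None:
    """Download a file to dest_path, creating parent directories as needed."""
    os.makedirs(os.path.dirname(dest_path) or ".", exist_ok=True)
    urllib.request.urlretrieve(url, dest_path)

def extract(zip_path: str, dest_dir: str) -> list:
    """Extract a zip archive into dest_dir and return the archive's file list."""
    os.makedirs(dest_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(dest_dir)
        return zf.namelist()

# Usage (mirrors the shell commands above):
# download("https://storage.googleapis.com/ad-demos-datasets/cookbook/archive.zip",
#          "cookbook/archive.zip")
# extract("cookbook/archive.zip", "cookbook/extracted")
```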
If you've set up your environment using our Docker Compose file, the rest of the code and commands here will work inside your Jupyter notebook.
Create the data in the database
The core of this code is the function gen_query. It returns an AddImage command paired with the corresponding contents of a file. See AddImage for more details.
```python
from typing import Dict, Tuple, List
import os

from aperturedb.Utils import create_connector
from aperturedb.ParallelQuery import execute_batch

def gen_query(filename: str, dataset_name: str) -> Tuple[Dict, List]:
    query = [
        {
            "AddImage": {
                "properties": {
                    "filename": os.path.basename(filename),
                    "dataset": dataset_name
                }
            }
        }
    ]
    with open(filename, "rb") as ins:
        blob = ins.read()
    return query, [blob]

db = create_connector()
for root, subdir, files in os.walk("cookbook/extracted"):
    for file in files:
        if file.endswith(".jpg") or file.endswith(".jpeg"):
            query, blobs = gen_query(os.path.join(root, file), "cookbook")
            result, response, op_blobs = execute_batch(db=db, q=query, blobs=blobs)
            print(f"{file=} : {db.last_response=}")
```
- Each AddImage command adds the properties [filename, dataset] to the Image it creates. These will be used later on.
- The query is an array of commands. Each AddImage has a corresponding blob, which is just the raw image file content. gen_query returns a tuple of the query and its blobs.
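Because blobs are matched to blob-consuming commands by position, a quick sanity check before calling execute_batch can catch mismatches early. This is an illustrative helper, not part of the aperturedb API, and the set of blob-consuming commands below is an assumed subset:

```python
from typing import Dict, List

# Commands that each expect one blob (illustrative, assumed subset).
BLOB_COMMANDS = {"AddImage", "AddVideo", "AddBlob"}

def check_query_blobs(query: List[Dict], blobs: List[bytes]) -> bool:
    """Return True if the number of blobs matches the number of
    blob-consuming commands in the query."""
    expected = sum(1 for cmd in query for name in cmd if name in BLOB_COMMANDS)
    return expected == len(blobs)
```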
Read the data from the database
Once some images have been written into the database, the next thing ApertureDB facilitates is reading them back. We read the images with the FindImage command.
```python
import json

from aperturedb.Utils import create_connector
from aperturedb.ParallelQuery import execute_batch

query = [{
    "FindImage": {
        "blobs": True,
        "constraints": {
            "dataset": ["==", "cookbook"]
        },
        "results": {
            "all_properties": True
        }
    }
}]

db = create_connector()
result, response, op_blobs = execute_batch(db=db, q=query, blobs=[])
print(json.dumps(response, indent=2))
print(f"{len(op_blobs)=}")
```
- constraints is a filtering condition applied on the server side, so that only the necessary information is retrieved. The predicate here means that only Images whose dataset property is set to cookbook should be returned.
- The response is a JSON object, along with blobs. The JSON object contains all the properties associated with each retrieved Image. The results section of the FindImage command can also be used to get counts, group counts, etc.
- Take a look at the return values from the execute_batch call. op_blobs is a list of buffers; they can be written to the file system to recreate the input files.
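Recreating the input files from the returned buffers can be sketched as follows. This assumes op_blobs is a list of bytes and that the response's entities carry the matching filename property, as in the query above:

```python
import os
from typing import Dict, List

def save_blobs(op_blobs: List[bytes], entities: List[Dict], out_dir: str) -> List[str]:
    """Write each returned buffer to out_dir, named after the matching
    entity's filename property, and return the paths written."""
    os.makedirs(out_dir, exist_ok=True)
    paths = []
    for blob, entity in zip(op_blobs, entities):
        path = os.path.join(out_dir, entity["filename"])
        with open(path, "wb") as outs:
            outs.write(blob)
        paths.append(path)
    return paths

# Usage (hypothetical): entities = response[0]["FindImage"]["entities"]
# save_blobs(op_blobs, entities, "cookbook/recreated")
```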
Update the data in the database
It's rare that the data is spot-on in the first pass. Often there's a need to iterate on the schema, values, etc.
Overview of UpdateImage
Let's exercise that feature of ApertureDB.
We will update the data in two ways:
- Update Properties
- Add Connections
Add more properties to Images
In the folder we extracted in the first step, there is a file called Metadata.csv. It has attributes for each image that has been persisted in the database. The CSV file has a column for the image filename, which we saved as a property on the Image object. Also, execute_batch accepts a response_handler that can be used to process the results.
```python
from typing import Dict, List

import pandas as pd

from aperturedb.Utils import create_connector
from aperturedb.ParallelQuery import execute_batch

query = [{
    "FindImage": {
        "constraints": {
            "dataset": ["==", "cookbook"]
        },
        "results": {
            "list": ["filename"]
        }
    }
}]

df = pd.read_csv("cookbook/extracted/Metadata.csv")
db = create_connector()

def response_handler(
        q,
        input_blobs,
        response: Dict,
        blobs: List[bytes],
        idx) -> None:
    all_entities = response[0]["FindImage"]["entities"]

    def gen_command(filename: str):
        more_properties = df[df["filename"] == filename].to_dict('records')
        command = {
            "UpdateImage": {
                "constraints": {
                    "filename": ["==", filename]
                },
                "properties": more_properties[0]
            }
        }
        return command

    query = [gen_command(entity["filename"]) for entity in all_entities]
    result, response, b = execute_batch(db=db, q=query, blobs=[])
    print(f"{result=} {response=}")

result, response, op_blobs = execute_batch(db=db, q=query, blobs=[], response_handler=response_handler)
print(f"{result=} {response=}")
```
- The UpdateImage command can also remove properties using remove_props.
- more_properties looks up the rest of the data available in the CSV, based on the matching filename column.
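The per-row update can be seen in isolation: given the metadata row matching a filename, one UpdateImage command is produced. A minimal sketch, with the lookup done over plain dicts instead of the DataFrame (gen_update is a hypothetical helper name):

```python
from typing import Dict, List

def gen_update(filename: str, rows: List[Dict]) -> Dict:
    """Build an UpdateImage command that copies the CSV row matching
    `filename` onto the image whose filename property matches."""
    more_properties = [row for row in rows if row["filename"] == filename]
    return {
        "UpdateImage": {
            "constraints": {
                "filename": ["==", filename]
            },
            "properties": more_properties[0]
        }
    }
```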
Add some connections to Images
ApertureDB stores data in a graph representation. A graph has nodes and edges; here, the images are nodes. ApertureDB can also store entities of arbitrary classes. An edge, or "connection", indicates a relation between two nodes.
```python
from typing import Dict, List

import pandas as pd

from aperturedb.Utils import create_connector
from aperturedb.ParallelQuery import execute_batch

query = [{
    "FindImage": {
        "constraints": {
            "dataset": ["==", "cookbook"]
        },
        "results": {
            "list": ["filename"]
        }
    }
}]

df = pd.read_csv("cookbook/extracted/Metadata.csv")
db = create_connector()

def response_handler(
        q,
        input_blobs,
        response: Dict,
        blobs: List[bytes],
        idx) -> None:
    all_entities = response[0]["FindImage"]["entities"]

    def gen_command(filename: str):
        more_properties = df[df["filename"] == filename].to_dict('records')[0]
        print(f"{filename=} : {more_properties=}")
        commands = [
            {
                "FindImage": {
                    "constraints": {
                        "filename": ["==", filename]
                    },
                    "_ref": 1
                }
            },
            {
                "AddEntity": {
                    "class": "Cuisine",
                    "if_not_found": {
                        "name": ["==", more_properties["food_tags"]]
                    },
                    "properties": {
                        "name": more_properties["food_tags"],
                        "dataset": "cookbook"
                    },
                    "_ref": 2
                }
            },
            {
                "AddConnection": {
                    "src": 2,
                    "dst": 1,
                    "class": "IsFoodType"
                }
            }
        ]
        return commands

    for entity in all_entities:
        query = gen_command(entity["filename"])
        result, response, b = execute_batch(db=db, q=query, blobs=[])
        print(f"{result=} {response=}")

result, response, op_blobs = execute_batch(db=db, q=query, blobs=[], response_handler=response_handler)
print(f"{result=} {response=}")
```
- _ref declares a reference to a command's result for use later in the same query.
- AddConnection takes two refs and makes a connection between them. A connection can also have associated properties.
- if_not_found makes AddEntity a conditional add (similar to a primary key constraint in a relational database).
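The _ref wiring can be verified in isolation: the src/dst values of an AddConnection must point at _refs declared by earlier commands in the same query. An illustrative check, not part of the aperturedb API:

```python
from typing import Dict, List

def refs_resolve(commands: List[Dict]) -> bool:
    """Return True if every src/dst in an AddConnection refers to a
    _ref declared by an earlier command in the same query."""
    seen = set()
    for cmd in commands:
        (name, body), = cmd.items()  # each command is a single-key dict
        if name == "AddConnection":
            if body["src"] not in seen or body["dst"] not in seen:
                return False
        if "_ref" in body:
            seen.add(body["_ref"])
    return True
```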
Delete the data from the database
Let's clean up after our modifications above. Again, a query accomplishes this.
```python
from aperturedb.Utils import create_connector
from aperturedb.ParallelQuery import execute_batch

query = [{
    "DeleteImage": {
        "constraints": {
            "dataset": ["==", "cookbook"]
        }
    }
}, {
    "DeleteEntity": {
        "constraints": {
            "dataset": ["==", "cookbook"]
        }
    }
}]

db = create_connector()
result, response, output_blobs = execute_batch(db=db, q=query, blobs=[])
print(db.last_response)
```
Transformations While Loading
All of the ingestion methods described earlier can run additional steps to either filter out bad data (the definition depends on your use case) or to introduce additional, derived data, e.g. embeddings, when adding images or text. The easiest way to try this out is with the adb command line, as shown below:
```bash
adb ingest from-csv dishes.adb.csv --ingest-type IMAGE --transformer image_properties --transformer clip_pytorch_embeddings
```
In the example above, the predefined transformer "image_properties" adds new metadata to the images being ingested from the CSV file (e.g. adb_width and adb_height), while "clip_pytorch_embeddings" runs embedding generation on every image and adds the resulting embedding when the image is added (along with a connection between the image and the embedding).
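The filtering half of this idea can be sketched generically: a predicate applied to each (filename, blob) pair before it becomes an AddImage command. This is a hypothetical stand-in for illustration, not the adb transformer API:

```python
from typing import Iterable, List, Tuple

def filter_ingest(items: Iterable[Tuple[str, bytes]],
                  min_bytes: int = 1024) -> List[Tuple[str, bytes]]:
    """Drop items whose blob is suspiciously small (likely truncated
    files); what counts as 'bad data' depends on your use case."""
    return [(name, blob) for name, blob in items if len(blob) >= min_bytes]
```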