ParallelQuery
execute_batch
def execute_batch(
q: Commands,
blobs: Blobs,
db: Connector,
success_statuses: list[int] = [0],
response_handler: Optional[Callable] = None,
commands_per_query: int = 1,
blobs_per_query: int = 0,
strict_response_validation: bool = False
) -> Tuple[int, CommandResponses, Blobs]
Execute a batch of queries, with useful logging around it. Calls the response handler if provided. This should be used (without the parallel machinery) instead of db.query to keep response handling and logging consistent.
Arguments:
q
Commands - List of commands to execute.
blobs
Blobs - List of blobs to send.
db
Connector - The database connector.
success_statuses
list[int], optional - The list of success statuses. Defaults to [0].
response_handler
Callable, optional - The response handler. Defaults to None.
commands_per_query
int, optional - The number of commands per query. Defaults to 1.
blobs_per_query
int, optional - The number of blobs per query. Defaults to 0.
strict_response_validation
bool, optional - Whether to strictly validate the response. Defaults to False.
Returns:
int
- The result code:
- 0 : if all commands succeeded
- 1 : if there was a -1 in the response
- 2 : for any other code
CommandResponses
- The response.
Blobs
- The blobs.
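The result codes above can be sketched in plain Python. This is an illustrative reading of the documented behavior, not the library's source; the response shape (a list of {"CommandName": {"status": int}} entries) follows the ApertureDB JSON convention.

```python
def result_code(responses, success_statuses=(0,)):
    """Map per-command statuses to the documented result code:
    0 if every command succeeded, 1 if a -1 status appears,
    2 for any other failure status."""
    code = 0
    for entry in responses:
        for _cmd, body in entry.items():
            status = body.get("status")
            if status in success_statuses:
                continue
            if status == -1:
                code = max(code, 1)
            else:
                return 2
    return code
```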
ParallelQuery Objects
class ParallelQuery(Parallelizer.Parallelizer)
Parallel and Batch Querier for ApertureDB
This class provides the abstraction for partitioning data into batches, so that they may be processed using different threads.
Arguments:
db
Connector - The database connector.
dry_run
bool, optional - Whether to run in dry run mode. Defaults to False.
generate_batch
def generate_batch(
data: List[Tuple[Commands, Blobs]]) -> Tuple[Commands, Blobs]
Here we flatten the individual queries to run them as a single batched query. We also update the _ref values and connection refs.
Arguments:
data
list[tuple[Commands, Blobs]] - The data to be batched. Each tuple contains a list of commands and a list of blobs.
Returns:
commands
Commands - The batched commands.
blobs
Blobs - The batched blobs.
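The flattening and _ref renumbering described above can be sketched as follows. This is a simplified illustration under assumed conventions (each command is a single-key dict; "_ref", "src", and "dst" are integer reference fields), not the library's implementation.

```python
import copy

def flatten_batch(data):
    """Merge per-query (commands, blobs) tuples into one batch,
    offsetting reference fields so refs stay unique across queries."""
    commands, blobs = [], []
    offset = 0
    for cmds, blbs in data:
        cmds = copy.deepcopy(cmds)  # avoid mutating the caller's data
        refs_used = 0
        for cmd in cmds:
            body = next(iter(cmd.values()))
            for key in ("_ref", "src", "dst"):
                if isinstance(body.get(key), int):
                    body[key] += offset
                    refs_used = max(refs_used, body[key] - offset)
        offset += refs_used
        commands.extend(cmds)
        blobs.extend(blbs)
    return commands, blobs
```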
do_batch
def do_batch(db: Connector, data: List[Tuple[Commands, Blobs]]) -> None
Executes a batch of queries and blobs against the database.
Arguments:
db
Connector - The database connector.
data
list[tuple[Commands, Blobs]] - The data to be batched. Each tuple contains a list of commands and a list of blobs.
ParallelQuery also provides a way to invoke a user-defined function to handle the responses of each of the queries executed. This function can be used to process the responses from each of the corresponding queries in the Parallelizer. It will be called once per query, and it needs to have four parameters:
- requests
- input_blobs
- responses
- output_blobs
Example usage:
class MyQueries(QueryGenerator):
    def __init__(self):
        super().__init__()
        self.requests = []
        self.responses = []

    def process_responses(self, requests, input_blobs, responses, output_blobs):
        self.requests.extend(requests)
        self.responses.extend(responses)

loader = ParallelLoader(db)
generator = MyQueries()
loader.ingest(generator)
query
def query(generator,
batchsize: int = 1,
numthreads: int = 4,
stats: bool = False) -> None
This function takes the data to be executed and runs it across the specified number of threads. The generator yields tuples of the form (array of commands, array of blobs).
Arguments:
generator
type - The class that generates the queries to be executed.
batchsize
int, optional - Number of queries per transaction. Defaults to 1.
numthreads
int, optional - Number of parallel workers. Defaults to 4.
stats
bool, optional - Show statistics at end of ingestion. Defaults to False.
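A generator that yields tuples in the shape `query` expects can be as simple as the sketch below. The entity class and property names are hypothetical, for illustration only.

```python
def make_add_entity_queries(names):
    """Yield one (commands, blobs) tuple per name: a single AddEntity
    command and an empty blob list. Shape matches what query() consumes."""
    for name in names:
        commands = [{"AddEntity": {"class": "Person",
                                   "properties": {"name": name}}}]
        blobs = []
        yield commands, blobs
```

Something like `loader.query(make_add_entity_queries(names), batchsize=10, numthreads=4)` would then batch and dispatch these across worker threads.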
debug_sample
def debug_sample(**kwargs) -> None
Sample the data to be ingested for debugging purposes.