ParallelQuery

execute_batch

def execute_batch(
        q: Commands,
        blobs: Blobs,
        db: Connector,
        success_statuses: list[int] = [0],
        response_handler: Optional[Callable] = None,
        commands_per_query: int = 1,
        blobs_per_query: int = 0,
        strict_response_validation: bool = False
) -> Tuple[int, CommandResponses, Blobs]

Execute a batch of queries, with useful logging around the call. Invokes the response handler if one is provided. Prefer this function (without the parallel machinery) over db.query to keep response handling and logging consistent.

Arguments:

  • q Commands - List of commands to execute.
  • blobs Blobs - List of blobs to send.
  • db Connector - The database connector.
  • success_statuses list[int], optional - The list of success statuses. Defaults to [0].
  • response_handler Callable, optional - The response handler. Defaults to None.
  • commands_per_query int, optional - The number of commands per query. Defaults to 1.
  • blobs_per_query int, optional - The number of blobs per query. Defaults to 0.
  • strict_response_validation bool, optional - Whether to strictly validate the response. Defaults to False.

Returns:

  • int - The result code:
    • 0 : all commands succeeded
    • 1 : the response contained a -1 status
    • 2 : any other status code
  • CommandResponses - The responses.
  • Blobs - The returned blobs.
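As a minimal sketch, execute_batch can be called directly for a single query. The connection details and the FindEntity command below are illustrative assumptions, not part of this API:

from aperturedb.Connector import Connector
from aperturedb.ParallelQuery import execute_batch

# Assumed connection details; substitute your deployment's values.
db = Connector(host="localhost", user="admin", password="admin")

# One command per query, no blobs attached.
q = [{"FindEntity": {"with_class": "Person", "results": {"limit": 5}}}]

rc, responses, output_blobs = execute_batch(q, [], db)
if rc == 0:
    print(responses)  # one JSON response per command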

ParallelQuery Objects

class ParallelQuery(Parallelizer.Parallelizer)

Parallel and Batch Querier for ApertureDB

This class provides the abstraction for partitioning data into batches, so that they may be processed using different threads.

Arguments:

  • db Connector - The database connector.
  • dry_run bool, optional - Whether to run in dry run mode. Defaults to False.
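A minimal construction sketch, assuming a reachable server (the credentials below are placeholders):

from aperturedb.Connector import Connector
from aperturedb.ParallelQuery import ParallelQuery

db = Connector(host="localhost", user="admin", password="admin")  # assumed credentials
pq = ParallelQuery(db, dry_run=False)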

generate_batch

def generate_batch(
        data: List[Tuple[Commands, Blobs]]) -> Tuple[Commands, Blobs]

Flattens the individual queries so they run as a single batched query, updating the _ref values and connection refs so they remain valid across the combined batch.

Arguments:

  • data list[tuple[Commands, Blobs]] - The data to be batched. Each tuple contains a list of commands and a list of blobs.

Returns:

  • commands Commands - The batched commands.
  • blobs Blobs - The batched blobs.
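The sketch below illustrates the flattening described above, reusing pq from the construction sketch; the exact values assigned during _ref rewriting are an assumption based on this description:

# Two single-command queries, each using a query-local _ref of 1.
data = [
    ([{"AddEntity": {"class": "Person", "_ref": 1}}], []),
    ([{"AddEntity": {"class": "Person", "_ref": 1}}], []),
]

commands, blobs = pq.generate_batch(data)
# commands is now one flat list holding both AddEntity commands, with
# _ref values rewritten (e.g. 1 and 2) so they stay unique in the batch.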

do_batch

def do_batch(db: Connector, data: List[Tuple[Commands, Blobs]]) -> None

Executes a batch of queries and blobs against the database.

Arguments:

  • db Connector - The database connector.

  • data list[tuple[Commands, Blobs]] - The data to be batched. Each tuple contains a list of commands and a list of blobs.

It also provides a way to invoke a user-defined function to handle the responses of each of the queries executed. This function can be used to process the responses from each of the corresponding queries in Parallelizer. It is called once per query and must accept 4 parameters:

  • requests
  • input_blobs
  • responses
  • output_blobs

Example usage:

class MyQueries(QueryGenerator):
    def process_responses(self, requests, input_blobs, responses, output_blobs):
        self.requests.extend(requests)
        self.responses.extend(responses)

loader = ParallelLoader(db)
generator = MyQueries()
loader.ingest(generator)

query

def query(generator,
          batchsize: int = 1,
          numthreads: int = 4,
          stats: bool = False) -> None

This function takes the data to be executed and runs it across the specified number of threads. The generator yields tuples of the form (array of commands, array of blobs).

Arguments:

  • generator - The object that generates the queries to be executed.
  • batchsize int, optional - Number of queries per transaction. Defaults to 1.
  • numthreads int, optional - Number of parallel workers. Defaults to 4.
  • stats bool, optional - Show statistics at end of ingestion. Defaults to False.
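A hedged sketch of driving query with a generator. The PersonQueries class is hypothetical; it assumes the generator behaves like a sequence of (commands, blobs) tuples, as the description above suggests:

class PersonQueries:
    # Hypothetical sequence of (commands, blobs) tuples.
    def __len__(self):
        return 100

    def __getitem__(self, i):
        commands = [{"AddEntity": {"class": "Person", "properties": {"id": i}}}]
        return commands, []

pq = ParallelQuery(db)
pq.query(PersonQueries(), batchsize=10, numthreads=4, stats=True)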

debug_sample

def debug_sample(**kwargs) -> None

Sample the data to be ingested for debugging purposes.