JsonStreamWriter and ExecutorProvider

I couldn't find good explanation how JsonStreamWriter is using the provided ExecutionService.
In Java.

What is the best practice?

What is the pool size I need to set?
If I have multiple active JsonStreamWriters, do I need multiples ExecutionServices?

Solved Solved
0 5 67
1 ACCEPTED SOLUTION

The ExecutorProvider allows for more granular control over the threads used by JsonStreamWriter. This feature is particularly useful for managing asynchronous operations in a more controlled and efficient way.

By using an ExecutorProvider, you can customize the properties of the thread pool, such as:

  • Number of threads
  • Thread naming
  • Thread priorities
  • Thread factory (how threads are created)

This level of customization allows for better resource management tailored to your application's needs.

JsonStreamWriter primarily uses threads from the ExecutorProvider's pool for:

  • Network calls: Sending data to BigQuery
  • Retry logic: Handling failures and retrying operations

These are typically I/O-bound tasks, where the thread spends most of its time waiting for network responses.

Example

 
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(4)       // Number of threads
    .setThreadFactory(r -> { 
        Thread thread = new Executors.defaultThreadFactory().newThread(r);
        thread.setDaemon(true);  
        return thread;
    })
    .build();

JsonStreamWriter writer = JsonStreamWriter.newBuilder(tableId)
    .setExecutorProvider(executorProvider)
    .build(); 

Your question about the addCallback() method potentially being evaluated after the append() method completes is a valid concern in asynchronous programming.

In Java, ApiFutures.addCallback attaches a callback to an ApiFuture. If the future is already completed by the time addCallback is called, the callback will be triggered immediately.

In your case, using MoreExecutors.directExecutor() means the callback runs in the same thread that completed the future, potentially the network I/O thread.

Best Practice: Separate Executor for Callbacks

To avoid potential issues with the main thread, especially in GUI or service-heavy applications, it's safer to use a separate executor for the callbacks:

 
ExecutorService callbackExecutor = Executors.newCachedThreadPool();
ApiFutures.addCallback(future, new AppendCompleteCallback(this, appendContext), callbackExecutor); 

This ensures callbacks are handled by a dedicated thread pool, preventing the main thread from being blocked and keeping your application responsive.

View solution in original post

5 REPLIES 5

The integration of JsonStreamWriter with ExecutorService allows for asynchronous data write operations, enhancing application responsiveness and throughput. By submitting write tasks asynchronously, your main thread remains free from blocking on each write completion. This is particularly useful when handling large datasets or high write volumes.

It is crucial to implement robust error handling within the tasks submitted to ExecutorService. Using try-catch blocks within your Runnable or Callable tasks will help manage exceptions during the append operations, such as network issues or rate limit errors. This setup not only allows for retries but also aids in implementing appropriate logging and error handling strategies.

The size of the thread pool should be slightly larger than the number of concurrent writers you expect to have. However, the optimal pool size really depends on your specific workload, data volume, and available system resources. Regular monitoring of write latency, CPU utilization, and queue length will guide you to adjust the pool size for the best performance.

If you have multiple active JsonStreamWriters, whether or not you need multiple ExecutorServices depends on your application’s architecture and requirements. Sharing a single ExecutorService among multiple writers can be efficient, but if isolated resource allocation or specific performance metrics per writer are needed, using separate ExecutorServices may be beneficial.

For situations where data generation outpaces BigQuery's ingestion capability, implementing backpressure mechanisms such as dynamic thread pool adjustments or bounded queues might be necessary. Monitoring and observability are also key—ensure you are tracking metrics like write latency, throughput, error rates, and resource utilization to continually optimize performance.

Here is an example snippet for handling asynchronous operations, including error management and retries:

Here's the formatted code snippet with explanations to make it easier to understand:

 
ExecutorService executor = Executors.newFixedThreadPool(poolSize); // Customize the pool size

executor.submit(() -> {
    try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(tableName, tableSchema).build()) {
        JSONArray rows = new JSONArray();
        boolean successful = false;
        
        while (!successful) { 
            try {
                ApiFuture<AppendRowsResponse> future = writer.append(rows);
                AppendRowsResponse response = future.get(); 
                // Handle response (e.g., print success message)
                successful = true; // Exit loop 
            } catch (Exception e) {
                // Log the error (e.g., using a logging framework)
                // Implement your retry logic (e.g., exponential backoff) 
                // Ensure you have a mechanism to avoid infinite retries 
            }
        }
    } catch (Exception e) {
        // Handle errors during writer initialization 
    }
});

Key Points and Explanations

  1. ExecutorService: This is used for managing a thread pool, allowing concurrent execution of tasks (in this case, the asynchronous operation).

  2. JsonStreamWriter (try-with-resources): The try (JsonStreamWriter writer = ...) construct ensures the writer is automatically closed when the block completes, even if exceptions occur.

  3. Error Handling:

    • The try-catch blocks within the loop handle errors during the asynchronous operation (e.g., network issues, timeouts).
    • The outer try-catch block handles errors during the setup or initialization of the writer.
    • You'll need to implement your own logging and retry logic within these blocks.
  4. Retry Logic:

    • The while (!successful) loop keeps retrying the operation until it succeeds.
    • Important: Make sure to include retry limits or backoff strategies to prevent infinite retries in the face of persistent failures.

Customization and Flexibility

  • poolSize: Tailor the size of your thread pool according to your application's expected load.
  • tableName and tableSchema: Replace these with your actual table identifiers.
  • Error Handling: Implement robust logging and custom retry mechanisms.
  • Result Handling: You might want to process the AppendRowsResponse directly within the try block or use .addListener for asynchronous handling.

Example of Error Logging and Retry

 
} catch (Exception e) {
    logger.error("Error appending rows: {}", e.getMessage());
    // Exponential backoff (increasing delay between retries)
    Thread.sleep(2 ^ retryCount * 1000);  // 2, 4, 8, 16 seconds, etc.
    retryCount++;
}

thank you for such detailed response. 

Your approach , in the snippet, is a bit different then I intendent,
JsonSteamWriter has a built-in support for ExecutorProvider.

    /**
     * Setter for the underlying StreamWriter's ExecutorProvider.
     *
     * @PARAM executorProvider
     * @return
     */
    public Builder setExecutorProvider(ExecutorProvider executorProvider) {
      this.schemaAwareStreamWriterBuilder.setExecutorProvider(executorProvider);
      return this;
    }

SO my question is fouces on that one.

What is the best practice for this one?
how does it utilize the threads from that pool? Can i tweak it?

also, from the example in your git:
https://github.com/googleapis/java-bigquerystorage/blob/d143c65b6c92b98d321408458a6b2bc18e42593b/sam...

 // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this, appendContext), MoreExecutors.directExecutor());

Call back is added after task is submitted, but what will happen if addCallback() line is evaluated after append() method already completes! (theoretically it can happen)

The ExecutorProvider allows for more granular control over the threads used by JsonStreamWriter. This feature is particularly useful for managing asynchronous operations in a more controlled and efficient way.

By using an ExecutorProvider, you can customize the properties of the thread pool, such as:

  • Number of threads
  • Thread naming
  • Thread priorities
  • Thread factory (how threads are created)

This level of customization allows for better resource management tailored to your application's needs.

JsonStreamWriter primarily uses threads from the ExecutorProvider's pool for:

  • Network calls: Sending data to BigQuery
  • Retry logic: Handling failures and retrying operations

These are typically I/O-bound tasks, where the thread spends most of its time waiting for network responses.

Example

 
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(4)       // Number of threads
    .setThreadFactory(r -> { 
        Thread thread = new Executors.defaultThreadFactory().newThread(r);
        thread.setDaemon(true);  
        return thread;
    })
    .build();

JsonStreamWriter writer = JsonStreamWriter.newBuilder(tableId)
    .setExecutorProvider(executorProvider)
    .build(); 

Your question about the addCallback() method potentially being evaluated after the append() method completes is a valid concern in asynchronous programming.

In Java, ApiFutures.addCallback attaches a callback to an ApiFuture. If the future is already completed by the time addCallback is called, the callback will be triggered immediately.

In your case, using MoreExecutors.directExecutor() means the callback runs in the same thread that completed the future, potentially the network I/O thread.

Best Practice: Separate Executor for Callbacks

To avoid potential issues with the main thread, especially in GUI or service-heavy applications, it's safer to use a separate executor for the callbacks:

 
ExecutorService callbackExecutor = Executors.newCachedThreadPool();
ApiFutures.addCallback(future, new AppendCompleteCallback(this, appendContext), callbackExecutor); 

This ensures callbacks are handled by a dedicated thread pool, preventing the main thread from being blocked and keeping your application responsive.

if possible, i need another clarification: 

lets say I call:

future1 = stream.append(batch1);
future2 = stream.append(batch2);
future3 = stream.append(batch3);

 execution will be async?
are those 3 lines will be executed in parallel?  meaning, batch2 can be written to database before batch1?

if not, why append returns Future? just for callbacks?



When using JsonStreamWriter in Google Cloud BigQuery to append data batches asynchronously, it's essential to understand the concurrency and ordering behavior associated with these operations. Here’s an explanation of how appending works and the role of futures in this context.

Asynchronous Execution and Ordering

When you call the append method of JsonStreamWriter, each call is indeed asynchronous; that is, it returns an ApiFuture which will complete once the batch has been processed (i.e., sent to BigQuery and acknowledged). However, the asynchronous nature of these calls doesn't necessarily mean that the operations (batch1, batch2, batch3) execute in parallel or out of order. Here’s why:

  1. Serial Execution of Batches: JsonStreamWriter maintains the order of data batches as they are appended. Even though the operations are asynchronous, JsonStreamWriter ensures that batches are written to the database in the order they were submitted. This means batch1 will complete before batch2 starts processing, and so on. This ordered processing is crucial for maintaining data consistency, especially in applications where the order of records matters.

  2. Why Use Futures?: The use of ApiFuture is primarily for non-blocking operations and efficient resource utilization. Futures allow the calling thread to continue with other work rather than waiting for the network operation to complete. This is particularly beneficial in high-throughput environments where blocking on I/O operations can severely degrade performance.

    • Non-blocking: By using futures, your application can initiate a data append and then immediately proceed to prepare the next batch of data or perform other computations. The future lets your application check the completion status of the append operation at a later time or add callbacks to react upon completion.

    • Callbacks: Futures provide a way to cleanly handle post-operation tasks such as logging, error handling, or triggering subsequent actions. Callbacks can be attached to futures to execute code once the append operation completes, whether successfully or with an error.

Consider a scenario where you need to append multiple data batches asynchronously while ensuring that processing does not block your main application logic:

/**
 * Understanding Asynchronous Appends with JsonStreamWriter in Google Cloud BigQuery
 */

// ... (Your import statements)

public class BigQueryAppendExample {

    public void appendDataBatches(JsonStreamWriter stream, List<TableDataInsertAllRequest> batches) {

        ExecutorService executor = Executors.newCachedThreadPool();

        List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();

        for (TableDataInsertAllRequest batch : batches) {
            ApiFuture<AppendRowsResponse> future = stream.append(batch);
            futures.add(future);
        }

        for (int i = 0; i < futures.size(); i++) {
            final int batchNumber = i + 1;  // For logging
            ApiFuture<AppendRowsResponse> future = futures.get(i);

            ApiFutures.addCallback(future, new ApiFutureCallback<AppendRowsResponse>() {
                @Override
                public void onSuccess(AppendRowsResponse result) {
                    System.out.println("Batch " + batchNumber + " appended successfully");
                }

                @Override
                public void onFailure(Throwable t) {
                    System.err.println("Failed to append Batch " + batchNumber + ": " + t.getMessage());
                }
            }, executor);
        }

        // ... Additional error handling or future chaining if needed
    }
}

The asynchronous nature of the append method in JsonStreamWriter does not imply parallel execution of batches in terms of writing to the database. The batches are processed in the order they are submitted to ensure data integrity. Futures are used to enable non-blocking operations and to efficiently handle completion events through callbacks, thereby enhancing the responsiveness and throughput of your application.