Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Datastore | Cloud Import | Pub Sub

 

 

from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

# TODO(developer): Replace these placeholder values before running the sample.
project_id = "YOUR_PROJECT_ID"
location = "YOUR_LOCATION"  # Values: "global"
data_store_id = "YOUR_DATA_STORE_ID"

# Cloud Storage URI of the file to import.
gcs_uri = "gs://bucket/directory/file.jsonl"

# Only a non-"global" location gets a location-prefixed API endpoint;
# for "global" no client options are passed.
# For more information, refer to:
# https://cloud.google.com/generative-ai-app-builder/docs/locations#specify_a_multi-region_for_your_data_store
if location == "global":
    client_options = None
else:
    client_options = ClientOptions(
        api_endpoint=f"{location}-discoveryengine.googleapis.com"
    )

# Create a client
client = discoveryengine.DocumentServiceClient(client_options=client_options)

# The full resource name of the search engine branch.
# e.g. projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
parent = client.branch_path(
    project=project_id,
    location=location,
    data_store=data_store_id,
    branch="default_branch",
)

# Where the documents are read from and how they are formatted.
gcs_source = discoveryengine.GcsSource(
    # Multiple URIs are supported
    input_uris=[gcs_uri],
    # Options:
    # - `content` - Unstructured documents (PDF, HTML, DOC, TXT, PPTX)
    # - `custom` - Unstructured documents with custom JSONL metadata
    # - `document` - Structured documents in the discoveryengine.Document format.
    # - `csv` - Unstructured documents with CSV metadata
    data_schema="document",
)

# Options: `FULL`, `INCREMENTAL`
reconciliation_mode = (
    discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL
)

request = discoveryengine.ImportDocumentsRequest(
    parent=parent,
    gcs_source=gcs_source,
    reconciliation_mode=reconciliation_mode,
)

# Make the request; the returned object tracks the import operation.
operation = client.import_documents(request=request)

print(f"Waiting for operation to complete: {operation.operation.name}")
# Wait for the operation's result.
response = operation.result()

# After the operation is complete,
# get information from operation metadata
metadata = discoveryengine.ImportDocumentsMetadata(operation.metadata)

# Handle the response
print(response)
print(metadata)

 

 

I am using this code to import documents from a JSONL file, and it gets stuck at:

 

 

Waiting for operation to complete: projects/805/locations/global/collections/default_collection/dataStores/mydatastore/branches/0/operations/import-documents-12061499744554098

 

 

After 900 seconds, it throws an error:

 

 

Traceback (most recent call last):
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/future/polling.py", line 137, in _blocking_poll
    polling(self._done_or_raise)(retry=retry)
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
             ^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/future/polling.py", line 119, in _done_or_raise
    if not self.done(retry=retry):
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/operation.py", line 174, in done
    self._refresh_and_update(retry)
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/operation.py", line 162, in _refresh_and_update
    self._operation = self._refresh(retry=retry) if retry else self._refresh()
                                                               ^^^^^^^^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/operations_v1/operations_client.py", line 159, in get_operation
    return self._get_operation(
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/gapic_v1/method.py", line 131, in __call__
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
           ^^^^^^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/retry/retry_base.py", line 221, in _retry_error_helper
    raise final_exc from source_exc
google.api_core.exceptions.RetryError: Timeout of 600.0s exceeded, last exception: 504 Deadline Exceeded

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/apple/Documents/foyr/vertex/today/ingest.py", line 105, in <module>
    result = upload_metadata_to_gcs(
             ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/apple/Documents/foyr/vertex/today/ingest.py", line 90, in upload_metadata_to_gcs
    response = operation.result()
               ^^^^^^^^^^^^^^^^^^
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/future/polling.py", line 256, in result
    self._blocking_poll(timeout=timeout, retry=retry, polling=polling)
  File "/Users/apple/.local/lib/python3.12/site-packages/google/api_core/future/polling.py", line 139, in _blocking_poll
    raise concurrent.futures.TimeoutError(
TimeoutError: Operation did not complete within the designated timeout of 900 seconds.

 

 

I also tried polling the long-running operation (LRO), but the polling loop ends before the operation reaches the done condition.

 

 

from time import sleep

from google.cloud import discoveryengine
from google.longrunning import operations_pb2

# Full resource name of the import operation to poll.
operation_name = "projects/805/locations/global/collections/default_collection/dataStores/mydatastore/branches/0/operations/import-documents-12061499744554098"

def poll_operation_sample(
    operation_name: str, limit: int = 10, *, poll_interval: float = 10
) -> operations_pb2.Operation:
    """Poll a long-running operation until it is done or `limit` polls are used.

    Args:
        operation_name: Full resource name of the operation to poll.
        limit: Maximum number of GetOperation requests to make; must be >= 1.
        poll_interval: Seconds to wait between consecutive polls.

    Returns:
        The most recently fetched operation. Callers must check its `done`
        field: it is still false when the operation had not finished after
        `limit` polls.

    Raises:
        ValueError: If `limit` is less than 1.
    """
    # Without at least one poll there would be no operation to return.
    if limit < 1:
        raise ValueError(f"limit must be at least 1, got {limit}")

    # Create a client
    client = discoveryengine.DocumentServiceClient()

    # Make GetOperation request
    request = operations_pb2.GetOperationRequest(name=operation_name)

    for attempt in range(1, limit + 1):
        operation = client.get_operation(request=request)
        # Print the Operation Information
        print(operation)

        # Stop polling when Operation is no longer running
        if operation.done:
            break

        # Wait before polling again, but not after the final attempt:
        # nothing is polled afterwards, so sleeping would only delay the return.
        if attempt < limit:
            sleep(poll_interval)

    return operation

# Poll the operation with the default limit; each fetched state is printed by the function.
poll_operation_sample(operation_name)

 

 

In the console, within a few minutes I can see the documents, and they are even queryable. The Activity tab still shows the import as in progress, but in the Documents tab I can see my imported document. How do I know exactly when the import operation for my file.jsonl has finished?

hardikitis_0-1738860202051.png

 

0 1 172
1 REPLY 1

Hi @hardikitis,

Welcome to Google Cloud Community!

The error you have encountered may be due to a large JSONL file or a temporary network issue. To confirm that your file has been imported, wait a few minutes for the process to complete. Below is a sample test:

testCass1.png

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.