
Vertex AI Datastore - Linked unstructured documents (JSONL with metadata) from GCS

I want to create an automation for updating my Vertex AI Search datastore whenever a GCS source file is updated, added, or deleted. I did something similar with the discoveryengine API when I had a datastore for just unstructured documents.

Since my data is now a JSON Lines file with metadata and a link to the unstructured .txt file for each document, I don't know how to add/update/delete a single document without doing a full reindex every time (which I can currently do manually).

 

Here are my update/delete functions from my last project:

from google.cloud import discoveryengine_v1


async def datastore_import_documents(parent_datastore, gcs_input_uris):
    """Incrementally import documents from GCS into the datastore branch."""
    # Create a client
    client = discoveryengine_v1.DocumentServiceAsyncClient()

    gcs_source_documents = discoveryengine_v1.GcsSource(
        input_uris=gcs_input_uris, data_schema="content"
    )

    # Initialize request argument(s)
    request = discoveryengine_v1.ImportDocumentsRequest(
        parent=parent_datastore,
        gcs_source=gcs_source_documents,
        reconciliation_mode=discoveryengine_v1.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
    )

    print("Starting import request")
    # Make the request (returns a long-running operation)
    operation = await client.import_documents(request=request)

    print("Finished import request")

    response = await operation.result()

    # Handle the response
    print(f"Finished response: {response}")


async def datastore_delete_document(datastore_document_name):
    """Delete a single document by its full resource name."""
    # Create a client
    client = discoveryengine_v1.DocumentServiceAsyncClient()

    # Initialize request argument(s)
    request = discoveryengine_v1.DeleteDocumentRequest(
        name=datastore_document_name,
    )

    # Make the request
    await client.delete_document(request=request)
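
For illustration, here is a rough sketch of how helpers like these could be driven by GCS object events (e.g. a 2nd-gen Cloud Function with an Eventarc storage trigger). This is an assumption-laden sketch, not the actual setup: the datastore path and the `lookup_document_name` helper are placeholders.

```python
# Sketch only: assumes the two helpers defined above are in scope in this module.
import asyncio

import functions_framework

# Placeholder: full branch resource name of the target datastore
# (same shape as the `parent` built in the documentation sample below).
PARENT_DATASTORE = (
    "projects/PROJECT_ID/locations/global/dataStores/DATA_STORE_ID/branches/default_branch"
)


@functions_framework.cloud_event
def on_object_finalized(cloud_event):
    """Handle google.cloud.storage.object.v1.finalized (object created/overwritten)."""
    data = cloud_event.data
    gcs_uri = f"gs://{data['bucket']}/{data['name']}"
    # Re-import only the changed object; INCREMENTAL mode leaves other documents alone.
    asyncio.run(datastore_import_documents(PARENT_DATASTORE, [gcs_uri]))


@functions_framework.cloud_event
def on_object_deleted(cloud_event):
    """Handle google.cloud.storage.object.v1.deleted."""
    data = cloud_event.data
    # Hypothetical helper: map the deleted object back to its document resource
    # name, e.g. from a mapping you maintain yourself.
    document_name = lookup_document_name(data["bucket"], data["name"])
    asyncio.run(datastore_delete_document(document_name))
```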
 
And here is the only place in the documentation where I see a reference to my type of use case, but I don't understand how I can use it, or what to do in the case of a deletion:
 
from google.api_core.client_options import ClientOptions
from google.cloud import discoveryengine

# TODO(developer): Uncomment these variables before running the sample.
# project_id = "YOUR_PROJECT_ID"
# location = "YOUR_LOCATION" # Values: "global"
# data_store_id = "YOUR_DATA_STORE_ID"

# Examples:
# - Unstructured documents
#   - `gs://bucket/directory/file.pdf`
#   - `gs://bucket/directory/*.pdf`
# - Unstructured documents with JSONL Metadata
#   - `gs://bucket/directory/file.json`
# - Unstructured documents with CSV Metadata
#   - `gs://bucket/directory/file.csv`
# gcs_uri = "YOUR_GCS_PATH"

#  For more information, refer to:
# https://cloud.google.com/generative-ai-app-builder/docs/locations#specify_a_multi-region_for_your_data_store
client_options = (
    ClientOptions(api_endpoint=f"{location}-discoveryengine.googleapis.com")
    if location != "global"
    else None
)

# Create a client
client = discoveryengine.DocumentServiceClient(client_options=client_options)

# The full resource name of the search engine branch.
# e.g. projects/{project}/locations/{location}/dataStores/{data_store_id}/branches/{branch}
parent = client.branch_path(
    project=project_id,
    location=location,
    data_store=data_store_id,
    branch="default_branch",
)

request = discoveryengine.ImportDocumentsRequest(
    parent=parent,
    gcs_source=discoveryengine.GcsSource(
        # Multiple URIs are supported
        input_uris=[gcs_uri],
        # Options:
        # - `content` - Unstructured documents (PDF, HTML, DOC, TXT, PPTX)
        # - `custom` - Unstructured documents with custom JSONL metadata
        # - `document` - Structured documents in the discoveryengine.Document format.
        # - `csv` - Unstructured documents with CSV metadata
        data_schema="content",
    ),
    # Options: `FULL`, `INCREMENTAL`
    reconciliation_mode=discoveryengine.ImportDocumentsRequest.ReconciliationMode.INCREMENTAL,
)

# Make the request
operation = client.import_documents(request=request)

print(f"Waiting for operation to complete: {operation.operation.name}")
response = operation.result()

# After the operation is complete,
# get information from operation metadata
metadata = discoveryengine.ImportDocumentsMetadata(operation.metadata)

# Handle the response
print(response)
print(metadata)

Hi @malam,

Welcome to the Google Cloud Community!

It looks like the issue is performing granular CRUD operations on individual documents in your Vertex AI Search datastore, now that your data source is a composite JSONL file, while avoiding a full reindex for every addition, update, and deletion.

Here are the potential ways that might help with your use case:

  • JSONL datasets: Make sure your JSONL files are properly structured, with each line representing a serialized `discoveryengine.Document` object. Each entry should include an `id`, a `uri` linking to the unstructured `.txt` content, and `struct_data` containing relevant metadata (see the example line after this list).
  • Automation: Set up source-system driven triggers or use GCS object event-triggered Cloud Functions with change detection logic to automatically execute granular API calls whenever your JSONL source updates.
  • Permissions: Make sure your Cloud Function or Cloud Run service account has the required permissions, such as Discovery Engine Editor or more specific roles like discoveryengine.documents.import, discoveryengine.documents.delete, discoveryengine.documents.list, and Storage Object Viewer for accessing GCS.
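
For example, one line of such a JSONL file could look like the sketch below (each line is a serialized `discoveryengine.Document`, as noted above; the ID, metadata fields, and GCS path are placeholders, not values from your setup):

```python
# Illustrative only: building and appending one Document-format line to an
# import JSONL file. Such files are typically imported with
# data_schema="document" (one Document per line).
import json

doc_line = {
    "id": "doc-0001",                        # stable ID you control (e.g. your GUID)
    "structData": {                          # arbitrary metadata for filtering/boosting
        "title": "Example document",
        "category": "reports",
    },
    "content": {
        "mimeType": "text/plain",
        "uri": "gs://YOUR_BUCKET/docs/doc-0001.txt",  # link to the unstructured .txt
    },
}

with open("import.jsonl", "a", encoding="utf-8") as f:
    f.write(json.dumps(doc_line) + "\n")
```

With stable IDs like this, re-importing a JSONL file that contains only the changed lines in INCREMENTAL mode should touch just those documents, and a single deletion can go through `DeleteDocumentRequest` using the document's full resource name (`.../branches/default_branch/documents/<id>`).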

Was this helpful? If so, please accept this answer as “Solution”. If you need additional assistance, reply here within 2 business days and I’ll be happy to help.

Hi, thanks for responding. This doesn't really solve my issue. I'm looking for the actual documentation on how to make those granular API requests using the discoveryengine library, or any other possible way. I need to be able to monitor, update, and delete at the document (GUID) level, not do a complete reindex on every update.