Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Vertex AI Vector Search - Unable to query from index endpoint using Go client

I want to query a deployed Matching Engine (Vector Search) index from Go. I am running Go 1.22.5 and using cloud.google.com/go/aiplatform version 1.68.0.

Here is my full implementation:

package main

import (
    "fmt"

    "context"

    "regexp"

    aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

    aiplatform "cloud.google.com/go/aiplatform/apiv1"

    "google.golang.org/api/option"
    "google.golang.org/protobuf/types/known/structpb"
)

// embedTexts takes a project, a location and a list of texts and returns a
// slice of float32 vectors plus an error. The body is elided ("...") in this
// post; presumably it returns one embedding per entry of texts — main only
// ever reads element 0. Confirm against the real implementation.
func embedTexts(project, location string, texts []string)([][]float32, error) {
    ...
}

// main embeds a single query string, sends a FindNeighbors request to the
// deployed Vector Search index, and prints the Id of every entry in the
// response's nearest-neighbors list.
//
// Each step returns on failure. Later steps dereference the results of the
// earlier ones (embeddings[0], the match client), so continuing after an
// error would panic instead of surfacing the real problem.
func main() {
    ctx := context.Background()

    const (
        projectID = "<project id>"
        location  = "us-central1"
    )
    queryTexts := []string{"Some text to be used as a query for nearest neighbors"}

    // Get text embeddings for query
    embeddings, err := embedTexts(projectID, location, queryTexts)
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    // Guard the embeddings[0] accesses below.
    if len(embeddings) == 0 {
        fmt.Println("embedTexts returned no embeddings")
        return
    }
    fmt.Println("The length of the embedding vector is:")
    fmt.Println(len(embeddings[0]))

    // ensure that we are hitting the correct API endpoint for vector search client
    apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)

    // initialize matching engine client
    c, err := aiplatform.NewMatchClient(ctx, option.WithEndpoint(apiEndpoint))
    if err != nil {
        fmt.Println(err.Error())
        return
    }
    // Register cleanup as soon as the client exists so every later return
    // path releases it.
    defer c.Close()
    fmt.Println("Vector Search Client successfully initialized!")

    // initialize structs to create request body

    // 1. initialize the index datapoint restriction struct
    restriction := &aiplatformpb.IndexDatapoint_Restriction{
        Namespace: "<some namespace>",
        AllowList: []string{"<some value to allow for namespace>"},
    }

    // 2. initialize the index datapoint struct
    queryDatapoint := &aiplatformpb.IndexDatapoint{
        DatapointId:   "<Placeholder>",
        FeatureVector: embeddings[0],
        Restricts:     []*aiplatformpb.IndexDatapoint_Restriction{restriction},
    }

    // 3. initialize the query struct
    query := &aiplatformpb.FindNeighborsRequest_Query{
        Datapoint:     queryDatapoint,
        NeighborCount: 50,
    }

    // 4. initialize the nearest neighbors request struct
    req := &aiplatformpb.FindNeighborsRequest{
        IndexEndpoint:       "projects/<project number>/locations/us-central1/indexEndpoints/<vector search index endpoint id>",
        DeployedIndexId:     "<vector search index id>",
        Queries:             []*aiplatformpb.FindNeighborsRequest_Query{query},
        ReturnFullDatapoint: true,
    }

    // send request to vector search
    resp, err := c.FindNeighbors(ctx, req)
    if err != nil {
        fmt.Println("Failing at vector search request time.")
        fmt.Println(err.Error())
        return
    }
    fmt.Println("Vector Search Request successfully sent!")

    for _, neighbor := range resp.GetNearestNeighbors() {
        fmt.Printf("Neighbor ID:%s\n", neighbor.Id)
    }
}

Everything here compiles properly. The text embedding prediction client runs properly, and the matching engine client initializes successfully. The issue arises when I submit the API request to query from the vector search index. The above code gives me the following error:

rpc error: code = Unimplemented desc = Operation is not implemented, or supported, or enabled.

This seems to indicate that the FindNeighbors method is not actually implemented in the Go Vertex AI SDK, because I am able to query from my vector search indexes properly via Python, so I know that it's not an endpoint issue.

My question here is: Am I doing something incorrectly? Or is Go's SDK not up to speed on querying from a Vector Search index endpoint?

Update:

I have figured out that us-central1-aiplatform.googleapis.com:443 is not the correct endpoint. My vector search index is deployed in a VPC network, so it is reached through private service access, which requires a different endpoint.

Here is the new implementation.

package main

import (
    "fmt"

    "context"

    "regexp"

    aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"

    aiplatform "cloud.google.com/go/aiplatform/apiv1"

    "google.golang.org/api/option"
    grpc "google.golang.org/grpc"
    insecure "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/protobuf/types/known/structpb"
)

// embedTexts invokes the text embedding model for the given texts and returns
// a slice of float32 vectors plus an error. The body is elided ("...") in this
// post; presumably there is one vector per entry of texts — confirm against
// the real implementation.
func embedTexts(
    project, location string, texts []string) ([][]float32, error) {
...
}

// NOTE:
// In order to perform candidate generation, we must do the following:
// 0. Build struct to form request body of text embedding model
// 1. Call text embedding model to convert a string to a dense feature vector
// 2. Build the structs to form the request for the nearest neighbors search
// 3. Call vector search index

//////// 1. Make API request to text embedding model
// in main method

func main() {
    ctx := context.Background()

    query_string_arr := []string{"<query contents>"}
    var project_id = "<project id>"
    var location = "us-central1"

    //////// 2. Call Text Embeddings

    // Get text embeddings for query
    embeddings, err := embedTexts(project_id, location, query_string_arr)
    if err != nil {
        fmt.Println(err.Error())
    } else {
        fmt.Println("The length of the embedding vector is:")
        fmt.Println(len(embeddings[0]))
    }

    // ensure that we are hitting the correct API endpoint for vector search index
    // NOTE: Since we are hitting a PSA index (Private Service Access), we must route our request to the internal
    // gRPC endpoint

    grpc_conn, err := grpc.NewClient("10.25.0.5:10000", grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        fmt.Println(err.Error())
    } else {
        fmt.Println("Successfully connected to gRPC endpoint.")
    }

    // initialize matching engine client
    c, err := aiplatform.NewMatchClient(ctx, option.WithGRPCConn(grpc_conn))
    if err != nil {
        fmt.Println(err.Error())
    } else {
        fmt.Println("Vector Search Client successfully initialized!")
    }

    defer c.Close()

    //////// 3. Create Request Body for Vector Search

    // // initialize structs to create request body

    //// 1. initialize the index datapoint restriction struct
    restricts_params := &aiplatformpb.IndexDatapoint_Restriction{
        Namespace: "<namespace>",
        AllowList: []string{<allowed namespace values>},
        DenyList:  []string{<denied namespace values>},
    }

    //// 2. initialize the index datapoint struct
    query_datapoint := &aiplatformpb.IndexDatapoint{
        DatapointId:   "Placeholder",
        FeatureVector: embeddings[0],
        Restricts:     []*aiplatformpb.IndexDatapoint_Restriction{restricts_params},
    }

    //// 3. initialize the query struct
    find_nbrs_query := &aiplatformpb.FindNeighborsRequest_Query{
        Datapoint:     query_datapoint,
        NeighborCount: 10,
    }

    //// 4. initialize the nearest neighbors request struct
    req := &aiplatformpb.FindNeighborsRequest{
        IndexEndpoint:       "projects/<project number>/locations/us-central1/indexEndpoints/<index endpoint id>",
        DeployedIndexId:     "<deployed_index_endpoint_name>_<index id>",
        Queries:             []*aiplatformpb.FindNeighborsRequest_Query{find_nbrs_query},
        ReturnFullDatapoint: false,
    }

    // send request to vector search
    resp, err := c.FindNeighbors(ctx, req)
    if err != nil {
        fmt.Println("Failing at vector search request time.")
        fmt.Println(err.Error())
        fmt.Printf("Response object: %s", resp)
    } else {
        fmt.Println("Vector Search Request successfully sent!")
    }

    for _, neighbor := range resp.GetNearestNeighbors() {
        fmt.Printf("Neighbor ID:%s\n", neighbor.Id)
    }

}

I realize that I have to somehow route the request to the gRPC host when initializing the matching engine client.

With that said when I route to the gRPC matching engine IP, I am now getting a (different) error:

rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.25.0.5:10000: connect: connection refused"

Why is the connection being refused here?

1 4 2,148
4 REPLIES 4

Hello andrew_chang_cl,

Welcome to Google Cloud Community!

Using insecure.NewCredentials() is strongly discouraged for production environments. This code will establish an unencrypted connection to the gRPC server at 10.25.0.5:10000.

If security is a concern, I highly recommend replacing insecure.NewCredentials() with a secure credential option like TLS/SSL certificates or GCP-specific methods.

As noted in the "Cons" column of the gRPC documentation, you must implement authentication yourself. The following authentication mechanisms are built into gRPC: SSL/TLS, ALTS, and token-based authentication with Google.

For additional reference, see the gRPC example code for Go.

I hope the above information is helpful.

Hi!

Thank you for responding. This gives me a lot of ideas to try out. I am just wondering if these authentication methods are compatible with serverless platforms (i.e. cloud run service, cloud functions) meant to be executed in a stateless manner?

I would like to run the above code snippet in a RESTful API hosted on a Cloud Run service or Cloud Function, and from experience, utilizing Cloud Run/Functions in a stateless manner is cause for concern with inherently stateful authentication mechanisms like TLS, which require some notion of a session to persist the agreement between client and server. I am assuming that if we can use the cloud load balancer to front the Cloud Run service/Function, this would actually not be an issue as we would be persisting an IP for a server (the LB) that can always handle ingress/egress from the API. But for non-technical reasons, the cloud load balancer is not an option immediately. Is there a way to leverage a stateless authentication mechanism, like JWTs, to encrypt the connection to the gRPC server? Please feel free to correct me if any of my understanding of TLS and the serverless platforms is incorrect as well.

Yes, these authentication methods are compatible with serverless platforms. You may refer to the guide on opening a gRPC connection to a service such as Cloud Run. You're right that TLS requires some notion of a session to persist the agreement between client and server.

JWTs are not used to encrypt the connection itself. Instead, they are used for authentication and authorization. The encryption of the connection is handled by TLS.

How JWTs and TLS Work Together

  • TLS encrypts the communication channel between the client and server, protecting data from eavesdropping and tampering.
  • JWTs verify the identity of the client and provide necessary information for authorization.

Implementing JWT Authentication with gRPC

  1. Generate JWT: Create a JWT with relevant claims (user information, expiration time, etc.) using a secret key.
  2. Secure Key Storage: Store the secret key securely (e.g., environment variables, secret management services).
  3. JWT Verification: Verify the JWT on the server-side before processing the gRPC request.
  4. gRPC Metadata: Include the JWT in the gRPC metadata as a bearer token.

Code Example (Go)

package main

import (
        "context"
        "crypto/tls"
        "log"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials"
        "google.golang.org/grpc/metadata"
)

// main shows how a gRPC client combines TLS (which encrypts the channel) with
// a JWT (which authenticates each call, sent as a bearer token in metadata).
//
// generateJWT, YourGrpcServiceClient, YourMethod and request are placeholders
// to be replaced with your own code.
func main() {
        // Load the client's own certificate and key (replace with your
        // certificates). On the client side, tls.Config.Certificates is what
        // this process presents to the server for mutual TLS; the server's
        // private key must never be shipped to clients.
        cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
        if err != nil {
                log.Fatalf("failed to load keys: %v", err)
        }

        creds := credentials.NewTLS(&tls.Config{
                Certificates: []tls.Certificate{cert},
                ServerName:   "your_server_name",
        })

        // Create gRPC client connection. grpc.NewClient replaces the deprecated
        // grpc.Dial and does not connect until the first RPC.
        conn, err := grpc.NewClient("your_server_address:port", grpc.WithTransportCredentials(creds))
        if err != nil {
                log.Fatalf("creating gRPC client: %v", err)
        }
        defer conn.Close()

        // Generate JWT (replace with your JWT generation logic)
        jwtToken := generateJWT()

        // Create metadata with JWT
        ctx := metadata.AppendToOutgoingContext(context.Background(), "authorization", "Bearer "+jwtToken)

        // Make gRPC call
        client := YourGrpcServiceClient{conn}
        response, err := client.YourMethod(ctx, request)
        if err != nil {
                log.Fatalf("calling YourMethod: %v", err)
        }
        // ... handle response
        _ = response // referenced so the example compiles until real handling is added
}

Key points to remember:

  • Statelessness: Serverless functions are stateless, but the authentication process itself doesn't inherently require state.
  • External management: Credentials, certificates, or token generation logic are typically handled outside the serverless function.
  • Platform support: Leverage the platform's built-in features for authentication and security whenever possible.

Hi, are you using the Vertex AI text embedding model? I'm trying to call it from Go as well, but it gives me an Invalid Argument error, so I was wondering if it's supported through the aiplatform package.