I want to query a deployed Matching Engine index from Go. I am running Go 1.22.5 and using cloud.google.com/go/aiplatform version 1.68.0.
Here is my full implementation:
package main

import (
	"fmt"
	"context"
	"regexp"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"
	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/structpb"
)

func embedTexts(project, location string, texts []string) ([][]float32, error) {
	...
}

func main() {
	ctx := context.Background()
	query_string_arr := []string{"Some text to be used as a query for nearest neighbors"}
	var project_id = "<project id>"
	var location = "us-central1"

	// Get text embeddings for query
	embeddings, err := embedTexts(project_id, location, query_string_arr)
	if err != nil {
		fmt.Println(err.Error())
	} else {
		fmt.Println("The length of the embedding vector is:")
		fmt.Println(len(embeddings[0]))
	}

	// ensure that we are hitting the correct API endpoint for vector search client
	apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location)

	// initialize matching engine client
	c, err := aiplatform.NewMatchClient(ctx, option.WithEndpoint(apiEndpoint))
	if err != nil {
		fmt.Println(err.Error())
	} else {
		fmt.Println("Vector Search Client successfully initialized!")
	}

	// initialize structs to create request body
	//// 1. initialize the index datapoint restriction struct
	restricts_params := &aiplatformpb.IndexDatapoint_Restriction{
		Namespace: "<some namespace>",
		AllowList: []string{"<some value to allow for namespace>"},
	}

	//// 2. initialize the index datapoint struct
	query_datapoint := &aiplatformpb.IndexDatapoint{
		DatapointId:   "<Placeholder>",
		FeatureVector: embeddings[0],
		Restricts:     []*aiplatformpb.IndexDatapoint_Restriction{restricts_params},
	}

	//// 3. initialize the query struct
	find_nbrs_query := &aiplatformpb.FindNeighborsRequest_Query{
		Datapoint:     query_datapoint,
		NeighborCount: 50,
	}

	//// 4. initialize the nearest neighbors request struct
	req := &aiplatformpb.FindNeighborsRequest{
		IndexEndpoint:       "projects/<project number>/locations/us-central1/indexEndpoints/<vector search index endpoint id>",
		DeployedIndexId:     "<vector search index id>",
		Queries:             []*aiplatformpb.FindNeighborsRequest_Query{find_nbrs_query},
		ReturnFullDatapoint: true,
	}

	// send request to vector search
	resp, err := c.FindNeighbors(ctx, req)
	if err != nil {
		fmt.Println("Failing at vector search request time.")
		fmt.Println(err.Error())
	} else {
		fmt.Println("Vector Search Request successfully sent!")
	}
	defer c.Close()

	for _, neighbor := range resp.GetNearestNeighbors() {
		fmt.Printf("Neighbor ID:%s\n", neighbor.Id)
	}
}
Everything here compiles properly. The text embedding prediction client runs properly, and the matching engine client initializes successfully. The issue arises when I submit the API request to query from the vector search index. The above code gives me the following error:
rpc error: code = Unimplemented desc = Operation is not implemented, or supported, or enabled.
This seems to indicate that the FindNeighbors method is not actually implemented in the Go Vertex AI SDK. I can query my vector search indexes properly via Python, so I know it's not an endpoint issue.
My question here is: Am I doing something incorrectly? Or is Go's SDK not up to speed on querying from a Vector Search index endpoint?
Update:
I have figured out that us-central1-aiplatform.googleapis.com:443 is not the correct endpoint. My vector search index is deployed within a VPC network using Private Service Access (PSA), so requests have to go to a different endpoint.
Here is the new implementation.
package main

import (
	"fmt"
	"context"
	"regexp"
	aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb"
	aiplatform "cloud.google.com/go/aiplatform/apiv1"
	"google.golang.org/api/option"
	grpc "google.golang.org/grpc"
	insecure "google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/structpb"
)

// method to invoke text embeddings
func embedTexts(project, location string, texts []string) ([][]float32, error) {
	...
}

// NOTE:
// In order to perform candidate generation, we must do the following:
// 0. Build struct to form request body of text embedding model
// 1. Call text embedding model to convert a string to a dense feature vector
// 2. Build the structs to form the request for the nearest neighbors search
// 3. Call vector search index

//////// 1. Make API request to text embedding model
// in main method
func main() {
	ctx := context.Background()
	query_string_arr := []string{"<query contents>"}
	var project_id = "<project id>"
	var location = "us-central1"

	//////// 2. Call Text Embeddings
	// Get text embeddings for query
	embeddings, err := embedTexts(project_id, location, query_string_arr)
	if err != nil {
		fmt.Println(err.Error())
	} else {
		fmt.Println("The length of the embedding vector is:")
		fmt.Println(len(embeddings[0]))
	}

	// ensure that we are hitting the correct API endpoint for vector search index
	// NOTE: Since we are hitting a PSA index (Private Service Access), we must route our request to the internal
	// gRPC endpoint
	grpc_conn, err := grpc.NewClient("10.25.0.5:10000", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		fmt.Println(err.Error())
	} else {
		fmt.Println("Successfully connected to gRPC endpoint.")
	}

	// initialize matching engine client
	c, err := aiplatform.NewMatchClient(ctx, option.WithGRPCConn(grpc_conn))
	if err != nil {
		fmt.Println(err.Error())
	} else {
		fmt.Println("Vector Search Client successfully initialized!")
	}
	defer c.Close()

	//////// 3. Create Request Body for Vector Search
	// initialize structs to create request body
	//// 1. initialize the index datapoint restriction struct
	restricts_params := &aiplatformpb.IndexDatapoint_Restriction{
		Namespace: "<namespace>",
		AllowList: []string{<allowed namespace values>},
		DenyList:  []string{<denied namespace values>},
	}

	//// 2. initialize the index datapoint struct
	query_datapoint := &aiplatformpb.IndexDatapoint{
		DatapointId:   "Placeholder",
		FeatureVector: embeddings[0],
		Restricts:     []*aiplatformpb.IndexDatapoint_Restriction{restricts_params},
	}

	//// 3. initialize the query struct
	find_nbrs_query := &aiplatformpb.FindNeighborsRequest_Query{
		Datapoint:     query_datapoint,
		NeighborCount: 10,
	}

	//// 4. initialize the nearest neighbors request struct
	req := &aiplatformpb.FindNeighborsRequest{
		IndexEndpoint:       "projects/<project number>/locations/us-central1/indexEndpoints/<index endpoint id>",
		DeployedIndexId:     "<deployed_index_endpoint_name>_<index id>",
		Queries:             []*aiplatformpb.FindNeighborsRequest_Query{find_nbrs_query},
		ReturnFullDatapoint: false,
	}

	// send request to vector search
	resp, err := c.FindNeighbors(ctx, req)
	if err != nil {
		fmt.Println("Failing at vector search request time.")
		fmt.Println(err.Error())
		fmt.Printf("Response object: %s", resp)
	} else {
		fmt.Println("Vector Search Request successfully sent!")
	}

	for _, neighbor := range resp.GetNearestNeighbors() {
		fmt.Printf("Neighbor ID:%s\n", neighbor.Id)
	}
}
I realize that I have to somehow route the request to the gRPC host when initializing the matching engine client.
That said, when I route to the matching engine's gRPC IP, I now get a different error:
rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.25.0.5:10000: connect: connection refused"
I am not sure why the connection is being refused.
Hello andrew_chang_cl,
Welcome to Google Cloud Community!
Using insecure.NewCredentials() is strongly discouraged for production environments: this code establishes an unencrypted connection to the gRPC server at 10.25.0.5:10000.
If security is a concern, I highly recommend replacing insecure.NewCredentials() with a secure credential option like TLS/SSL certificates or GCP-specific methods.
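As noted in the Cons column for gRPC, you must implement authentication yourself. The following authentication mechanisms are built into gRPC:
SSL/TLS: gRPC has SSL/TLS integration and promotes its use to authenticate the server and encrypt all data exchanged between client and server.
ALTS: gRPC supports ALTS as a transport security mechanism if the application runs on Compute Engine or Google Kubernetes Engine (GKE).
Token-based authentication with Google: gRPC provides a generic mechanism to attach metadata-based credentials to requests and responses.
For additional reference, see the example code for Go.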
I hope the above information is helpful.
Hi!
Thank you for responding. This gives me a lot of ideas to try out. I am wondering whether these authentication methods are compatible with serverless platforms (e.g., Cloud Run services or Cloud Functions) that are meant to run in a stateless manner.
I would like to run the above code snippet in a RESTful API hosted on a Cloud Run service or Cloud Function. From experience, running Cloud Run/Cloud Functions in a stateless manner is a concern with inherently stateful authentication like TLS, which requires some notion of a session to persist the agreement between client and server. I am assuming that if we used a Cloud Load Balancer in front of the Cloud Run service or function, this would not be an issue, since we would be persisting an IP for a server (the LB) that can always handle ingress/egress for the API. However, for non-technical reasons, a Cloud Load Balancer is not an option right now. Is there a way to use stateless authentication, like JWTs, to encrypt the connection to the gRPC server? Please feel free to correct me if any of my understanding of TLS and the serverless platforms is incorrect.
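Yes, these authentication methods are compatible with serverless platforms. You may refer to the documentation on opening a gRPC connection to a service such as Cloud Run. You're right that TLS establishes a session between client and server, but that session only lives as long as the connection: each new connection negotiates its own session during the TLS handshake, so nothing has to persist across stateless invocations.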
JWTs are not used to encrypt the connection itself. Instead, they are used for authentication and authorization. The encryption of the connection is handled by TLS.
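How JWTs and TLS Work Together
TLS encrypts the connection and lets the client verify the server's identity.
The JWT travels inside that encrypted connection, in the request metadata, and lets the server verify the caller's identity and permissions.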
Implementing JWT Authentication with gRPC
Code Example (Go)
package main

import (
	"context"
	"crypto/tls"
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"

	pb "your/module/yourservicepb" // placeholder: your generated gRPC stubs
)

func main() {
	// Client-side TLS: the server's certificate is verified against the system
	// root CAs (RootCAs is nil) and must be valid for ServerName.
	// The client never needs the server's private key. For mutual TLS, load a
	// *client* certificate with tls.LoadX509KeyPair and set Certificates.
	creds := credentials.NewTLS(&tls.Config{
		ServerName: "your_server_name",
	})

	// Create gRPC connection (grpc.NewClient supersedes the deprecated grpc.Dial)
	conn, err := grpc.NewClient("your_server_address:port", grpc.WithTransportCredentials(creds))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	// Generate JWT (generateJWT is a placeholder for your JWT generation logic)
	jwtToken := generateJWT()

	// Attach the JWT to the outgoing call as "authorization" metadata
	ctx := metadata.AppendToOutgoingContext(context.Background(), "authorization", "Bearer "+jwtToken)

	// Make gRPC call (YourGrpcService, YourMethod and YourRequest are placeholders)
	client := pb.NewYourGrpcServiceClient(conn)
	response, err := client.YourMethod(ctx, &pb.YourRequest{})
	if err != nil {
		log.Fatalf("YourMethod failed: %v", err)
	}
	// ... handle response
	_ = response
}
Key points to remember:
Hi, are you using the Vertex AI text embedding model? I'm trying to call it from Go as well, but I get an Invalid Argument error, so I was wondering whether it's supported through the aiplatform package.