I want to query a deployed Matching Engine (Vector Search) index from Go. I am running Go 1.22.5 and using cloud.google.com/go/aiplatform version 1.68.0.
Here is my full implementation:
package main import ( "fmt" "context" "regexp" aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" aiplatform "cloud.google.com/go/aiplatform/apiv1" "google.golang.org/api/option" "google.golang.org/protobuf/types/known/structpb" ) func embedTexts(project, location string, texts []string)([][]float32, error) { ... } func main() { ctx := context.Background() query_string_arr := []string{"Some text to be used as a query for nearest neighbors"} var project_id = "<project id>" var location = "us-central1" // Get text embeddings for query embeddings, err := embedTexts(project_id, location, query_string_arr) if err != nil { fmt.Println(err.Error()) } else { fmt.Println("The length of the embedding vector is:") fmt.Println(len(embeddings[0])) } // ensure that we are hitting the correct API endpoint for vector search client apiEndpoint := fmt.Sprintf("%s-aiplatform.googleapis.com:443", location) // initialize matching engine client c, err := aiplatform.NewMatchClient(ctx, option.WithEndpoint(apiEndpoint)) if err != nil { fmt.Println(err.Error()) } else { fmt.Println("Vector Search Client successfully initialized!") } // // initialize structs to create request body //// 1. initialize the index datapoint restriction struct restricts_params := &aiplatformpb.IndexDatapoint_Restriction{ Namespace: "<some namespace>", AllowList: []string{"<some value to allow for namespace>"}, } //// 2. initialize the index datapoint struct query_datapoint := &aiplatformpb.IndexDatapoint{ DatapointId: "<Placeholder>", FeatureVector: embeddings[0], Restricts: []*aiplatformpb.IndexDatapoint_Restriction{restricts_params}, } //// 3. initialize the query struct find_nbrs_query := &aiplatformpb.FindNeighborsRequest_Query{ Datapoint: query_datapoint, NeighborCount: 50, } //// 4. 
initialize the nearest neighbors request struct req := &aiplatformpb.FindNeighborsRequest{ IndexEndpoint: "projects/<project number>/locations/us-central1/indexEndpoints/<vector search index endpoint id>", DeployedIndexId: "<vector search index id>", Queries: []*aiplatformpb.FindNeighborsRequest_Query{find_nbrs_query}, ReturnFullDatapoint: true, } // send request to vector search resp, err := c.FindNeighbors(ctx, req) if err != nil { fmt.Println("Failing at vector search request time.") fmt.Println(err.Error()) } else { fmt.Println("Vector Search Request successfully sent!") } defer c.Close() for _, neighbor := range resp.GetNearestNeighbors() { fmt.Printf("Neighbor ID:%s\n", neighbor.Id) } }
Everything here compiles properly. The text embedding prediction client runs properly, and the matching engine client initializes successfully. The issue arises when I submit the API request to query from the vector search index. The above code gives me the following error:
rpc error: code = Unimplemented desc = Operation is not implemented, or supported, or enabled.
This seems to indicate that the FindNeighbors method is not actually implemented in the Go Vertex AI SDK, because I am able to query from my vector search indexes properly via Python, so I know that it's not an endpoint issue.
My question here is: Am I doing something incorrectly? Or is Go's SDK not up to speed on querying from a Vector Search index endpoint?
Update:
I have figured out that us-central1-aiplatform.googleapis.com:443 is not the correct endpoint. My vector search index is within a VPC network, therefore, the private service access requires a different endpoint.
Here is the new implementation.
package main import ( "fmt" "context" "regexp" aiplatformpb "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" aiplatform "cloud.google.com/go/aiplatform/apiv1" "google.golang.org/api/option" grpc "google.golang.org/grpc" insecure "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/structpb" ) // method to invoke text embeddings func embedTexts( project, location string, texts []string) ([][]float32, error) { ... } // NOTE: // In order to perform candidate generation, we must do the following: // 0. Build struct to form request body of text embedding model // 1. Call text embedding model to convert a string to a dense feature vector // 2. Build the structs to form the request for the nearest neighbors search // 3. Call vector search index //////// 1. Make API request to text embedding model // in main method func main() { ctx := context.Background() query_string_arr := []string{"<query contents>"} var project_id = "<project id>" var location = "us-central1" //////// 2. Call Text Embeddings // Get text embeddings for query embeddings, err := embedTexts(project_id, location, query_string_arr) if err != nil { fmt.Println(err.Error()) } else { fmt.Println("The length of the embedding vector is:") fmt.Println(len(embeddings[0])) } // ensure that we are hitting the correct API endpoint for vector search index // NOTE: Since we are hitting a PSA index (Private Service Access), we must route our request to the internal // gRPC endpoint grpc_conn, err := grpc.NewClient("10.25.0.5:10000", grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { fmt.Println(err.Error()) } else { fmt.Println("Successfully connected to gRPC endpoint.") } // initialize matching engine client c, err := aiplatform.NewMatchClient(ctx, option.WithGRPCConn(grpc_conn)) if err != nil { fmt.Println(err.Error()) } else { fmt.Println("Vector Search Client successfully initialized!") } defer c.Close() //////// 3. 
Create Request Body for Vector Search // // initialize structs to create request body //// 1. initialize the index datapoint restriction struct restricts_params := &aiplatformpb.IndexDatapoint_Restriction{ Namespace: "<namespace>", AllowList: []string{<allowed namespace values>}, DenyList: []string{<denied namespace values>}, } //// 2. initialize the index datapoint struct query_datapoint := &aiplatformpb.IndexDatapoint{ DatapointId: "Placeholder", FeatureVector: embeddings[0], Restricts: []*aiplatformpb.IndexDatapoint_Restriction{restricts_params}, } //// 3. initialize the query struct find_nbrs_query := &aiplatformpb.FindNeighborsRequest_Query{ Datapoint: query_datapoint, NeighborCount: 10, } //// 4. initialize the nearest neighbors request struct req := &aiplatformpb.FindNeighborsRequest{ IndexEndpoint: "projects/<project number>/locations/us-central1/indexEndpoints/<index endpoint id>", DeployedIndexId: "<deployed_index_endpoint_name>_<index id>", Queries: []*aiplatformpb.FindNeighborsRequest_Query{find_nbrs_query}, ReturnFullDatapoint: false, } // send request to vector search resp, err := c.FindNeighbors(ctx, req) if err != nil { fmt.Println("Failing at vector search request time.") fmt.Println(err.Error()) fmt.Printf("Response object: %s", resp) } else { fmt.Println("Vector Search Request successfully sent!") } for _, neighbor := range resp.GetNearestNeighbors() { fmt.Printf("Neighbor ID:%s\n", neighbor.Id) } }
I realize that I have to somehow route the request to the gRPC host when initializing the matching engine client.
That said, when I route the request to the matching engine's gRPC IP address, I now get a different error:
rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing: dial tcp 10.25.0.5:10000: connect: connection refused"
Why is the connection being refused here?