Schema errors on message posted to Topic

I have created this schema in Google Cloud Pub/Sub:

{
  "type": "record",
  "name": "TestDanbotMMSTaskHist",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "statusId",
      "type": "int",
      "doc": "list of statuses"
    },
    {
      "name": "statusCode",
      "type": "string",
      "doc": "PENDING, RESOLVED"
    },
    {
      "name": "startTime",
      "type": "string",
      "doc": "JavaScript built-in JSON object like 2012-04-23T18:25:43.511Z"
    },
    {
      "name": "endTime",
      "type": "string",
      "doc": "JavaScript built-in JSON object like 2012-04-23T18:25:43.511Z"
    },
    {
      "name": "activeInd",
      "type": "boolean",
      "doc": "false / true"
    },
    {
      "name": "createUserId",
      "type": "string"
    }
  ]
}

I have created this example message, which validates against the schema in the schema editor:

{
    "id": 1,
    "statusId": 3,
    "statusCode": "RESOLVED",
    "startTime": "2012-04-23T18:25:43.511Z",
    "endTime": "2012-04-24T17:25:43.511Z",
    "activeInd": true,
    "createUserId": "bpmsupport@gfs.com"
}

I have created a topic that uses this schema. When I send this example message to the topic,

{"messages":[{"attributes":{"id":1,"statusId":3,"statusCode":"RESOLVED","startTime":"2012-04-23T18:25:43.511Z","endTime":"2012-04-24T17:25:43.511Z","activeInd":true,"createUserId":"bpmsupport@gfs.com"},"orderingKey":"1"}]}

using the https://pubsub.googleapis.com/v1/ publish API, I get the following error:

{
    "code": 400,
    "message": "Invalid value at 'messages[0].attributes[0].value' (TYPE_STRING), 1\nInvalid value at 'messages[0].attributes[1].value' (TYPE_STRING), 3\nInvalid value at 'messages[0].attributes[5].value' (TYPE_STRING), true",
    "status": "INVALID_ARGUMENT",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.BadRequest",
        "fieldViolations": [
          {
            "field": "messages[0].attributes[0].value",
            "description": "Invalid value at 'messages[0].attributes[0].value' (TYPE_STRING), 1"
          },
          {
            "field": "messages[0].attributes[1].value",
            "description": "Invalid value at 'messages[0].attributes[1].value' (TYPE_STRING), 3"
          },
          {
            "field": "messages[0].attributes[5].value",
            "description": "Invalid value at 'messages[0].attributes[5].value' (TYPE_STRING), true"
          }
        ]
      }
    ]
}

So the two values declared as int and the one declared as boolean are rejected because the API expects them to be strings. If I quote the ints and the boolean as strings in the schema editor, the message no longer validates.

I also tried sending these as quoted string values in the message. The invalid value errors go away, but then I get an Avro error:

{
    "code": 400,
    "message": "Invalid data in message: Message failed schema validation.",
    "status": "INVALID_ARGUMENT",
    "details": [
      {
        "@type": "type.googleapis.com/google.rpc.ErrorInfo",
        "reason": "INVALID_JSON_AVRO_MESSAGE",
        "domain": "pubsub.googleapis.com",
        "metadata": {
          "message": "Message failed schema validation"
        }
      }
    ]
}

I have a feeling that I am missing something simple.

ACCEPTED SOLUTION

Hi,

I was able to make it work by publishing with the client library. As far as I can tell from the documentation, the samples for publishing to a topic with a schema all use the client library.

from avro.io import BinaryEncoder, DatumWriter
import avro.schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding

# TODO(developer): Replace these variables before running the sample.
project_id = "your-project-id"
topic_id = "your-topic-id-here"
avsc_file = "./schema.json" # your avro schema 

publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)

# Prepare to write Avro records to the binary output stream.
with open(avsc_file, "rb") as schema_file:
    avro_schema = avro.schema.parse(schema_file.read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()

# Prepare some data using a Python dictionary that matches the Avro schema
record = {
    "id": 1,
    "statusId": 3,
    "statusCode": "RESOLVED",
    "startTime": "2012-04-23T18:25:43.511Z",
    "endTime": "2012-04-24T17:25:43.511Z",
    "activeInd": True,
    "createUserId": "bpmsupport@gfs.com",
}

try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding

    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        # Binary Avro is not guaranteed to be valid UTF-8, so print its repr.
        print(f"Preparing a binary-encoded message:\n{data!r}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)

    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")

except NotFound:
    print(f"{topic_id} not found.")

I then pulled the message from the subscription:

[Screenshot: Screenshot 2022-12-27 1.24.43 AM.png]

Thanks. I am integrating from a proprietary platform, so adding a client library is not a convenient solution. I have discussed this with the consumer, and they are reluctantly willing to accept messages without schema validation.
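
For anyone who lands here without the option of a client library: in the REST API's PubsubMessage resource, `attributes` is a string-to-string map and the payload travels in a base64-encoded `data` field. That is consistent with the TYPE_STRING errors in the question, and it would explain the later Avro error too, since a message with only attributes has an empty payload to validate. Below is a minimal sketch of building the request body with only the Python standard library. It assumes the topic's schema encoding is JSON; `build_publish_body` is a hypothetical helper name, not part of any Google library, and the sketch stops short of sending the request or handling authentication.

```python
import base64
import json
from typing import Optional


def build_publish_body(record: dict, ordering_key: Optional[str] = None) -> dict:
    """Wrap a record as a Pub/Sub REST publish request body (hypothetical helper).

    The record is serialized to JSON, then base64-encoded into the `data`
    field. Nothing is placed in `attributes`.
    """
    payload = json.dumps(record).encode("utf-8")
    message = {"data": base64.b64encode(payload).decode("ascii")}
    if ordering_key is not None:
        message["orderingKey"] = ordering_key
    return {"messages": [message]}


# The example record from the question, with its original types preserved.
record = {
    "id": 1,
    "statusId": 3,
    "statusCode": "RESOLVED",
    "startTime": "2012-04-23T18:25:43.511Z",
    "endTime": "2012-04-24T17:25:43.511Z",
    "activeInd": True,
    "createUserId": "bpmsupport@gfs.com",
}

body = build_publish_body(record, ordering_key="1")
print(json.dumps(body, indent=2))
```

The resulting JSON would be POSTed to the topic's `:publish` endpoint (`https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish`, with placeholders for your own project and topic). On a proprietary platform the same steps apply in whatever language is available: serialize the record, base64-encode it, and put the result in `data`. A topic configured for binary encoding would need Avro binary serialization in place of `json.dumps`.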