I have created this schema in Google Cloud Pub/Sub
{
  "type": "record",
  "name": "TestDanbotMMSTaskHist",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "statusId",
      "type": "int",
      "doc": "list of statuses"
    },
    {
      "name": "statusCode",
      "type": "string",
      "doc": "PENDING, RESOLVED"
    },
    {
      "name": "startTime",
      "type": "string",
      "doc": "JavaScript built-in JSON object like 2012-04-23T18:25:43.511Z"
    },
    {
      "name": "endTime",
      "type": "string",
      "doc": "JavaScript built-in JSON object like 2012-04-23T18:25:43.511Z"
    },
    {
      "name": "activeInd",
      "type": "boolean",
      "doc": "false / true"
    },
    {
      "name": "createUserId",
      "type": "string"
    }
  ]
}
I have created this example message, which validates against this schema:
{
  "id": 1,
  "statusId": 3,
  "statusCode": "RESOLVED",
  "startTime": "2012-04-23T18:25:43.511Z",
  "endTime": "2012-04-24T17:25:43.511Z",
  "activeInd": true,
  "createUserId": "bpmsupport@gfs.com"
}
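As a side note, the same check can be reproduced locally with the avro Python library. This is only a sketch; it assumes the schema above has been saved as ./schema.json and reuses the example record:
import io
import avro.schema
from avro.io import BinaryEncoder, DatumWriter
# Parse the schema saved from the console (assumed filename).
avro_schema = avro.schema.parse(open("./schema.json", "rb").read())
record = {
    "id": 1,
    "statusId": 3,
    "statusCode": "RESOLVED",
    "startTime": "2012-04-23T18:25:43.511Z",
    "endTime": "2012-04-24T17:25:43.511Z",
    "activeInd": True,
    "createUserId": "bpmsupport@gfs.com"
}
# DatumWriter validates the record against the schema while serializing it
# and raises an AvroTypeException if any field has the wrong type.
writer = DatumWriter(avro_schema)
writer.write(record, BinaryEncoder(io.BytesIO()))
print("Record matches the schema.")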
I have created a topic that uses this schema. When I send this example message to the topic using the publish method of the https://pubsub.googleapis.com/v1/ REST API:
{
  "messages": [
    {
      "attributes": {
        "id": 1,
        "statusId": 3,
        "statusCode": "RESOLVED",
        "startTime": "2012-04-23T18:25:43.511Z",
        "endTime": "2012-04-24T17:25:43.511Z",
        "activeInd": true,
        "createUserId": "bpmsupport@gfs.com"
      },
      "orderingKey": "1"
    }
  ]
}
I get the following error:
{
  "code": 400,
  "message": "Invalid value at 'messages[0].attributes[0].value' (TYPE_STRING), 1\nInvalid value at 'messages[0].attributes[1].value' (TYPE_STRING), 3\nInvalid value at 'messages[0].attributes[5].value' (TYPE_STRING), true",
  "status": "INVALID_ARGUMENT",
  "details": [
    {
      "@type": "type.googleapis.com/google.rpc.BadRequest",
      "fieldViolations": [
        {
          "field": "messages[0].attributes[0].value",
          "description": "Invalid value at 'messages[0].attributes[0].value' (TYPE_STRING), 1"
        },
        {
          "field": "messages[0].attributes[1].value",
          "description": "Invalid value at 'messages[0].attributes[1].value' (TYPE_STRING), 3"
        },
        {
          "field": "messages[0].attributes[5].value",
          "description": "Invalid value at 'messages[0].attributes[5].value' (TYPE_STRING), true"
        }
      ]
    }
  ]
}
So the two fields declared as int and the one declared as boolean are rejected with complaints that their values should be strings. If I quote the int and boolean values and test that message in the schema editor, it does not validate.
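For context on these errors: in the Pub/Sub v1 REST API, messages[].attributes is a map of string keys to string values, so any non-string JSON value placed there is rejected with TYPE_STRING, while the payload that is checked against a topic schema is the base64-encoded messages[].data field. A rough sketch, in Python, of what a request body carrying the record in data could look like (built from the example above; nothing is authenticated or sent here):
import base64
import json
record = {
    "id": 1,
    "statusId": 3,
    "statusCode": "RESOLVED",
    "startTime": "2012-04-23T18:25:43.511Z",
    "endTime": "2012-04-24T17:25:43.511Z",
    "activeInd": True,
    "createUserId": "bpmsupport@gfs.com"
}
# The JSON-encoded record goes into `data` as base64; `attributes` (omitted
# here) only carries optional string-valued metadata.
publish_body = {
    "messages": [
        {
            "data": base64.b64encode(json.dumps(record).encode("utf-8")).decode("ascii"),
            "orderingKey": "1"
        }
    ]
}
print(json.dumps(publish_body, indent=2))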
I also tried sending the message with those values converted to quoted strings. The invalid-value errors go away, but then I get an Avro error:
{
  "code": 400,
  "message": "Invalid data in message: Message failed schema validation.",
  "status": "INVALID_ARGUMENT",
  "details": [
    {
      "@type": "type.googleapis.com/google.rpc.ErrorInfo",
      "reason": "INVALID_JSON_AVRO_MESSAGE",
      "domain": "pubsub.googleapis.com",
      "metadata": {
        "message": "Message failed schema validation"
      }
    }
  ]
}
I have a feeling that I am missing something simple.
Hi,
I was able to make it work by publishing with the client library; the publish-with-schema samples in the documentation use the client library.
from avro.io import BinaryEncoder, DatumWriter
import avro
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding
# TODO(developer): Replace these variables before running the sample.
project_id = "your-project-id"
topic_id = "your-topic-id-here"
avsc_file = "./schema.json" # your avro schema
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
# Prepare to write Avro records to the binary output stream.
avro_schema = avro.schema.parse(open(avsc_file, "rb").read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()
# Prepare some data using a Python dictionary that matches the Avro schema
record = {
    "id": 1,
    "statusId": 3,
    "statusCode": "RESOLVED",
    "startTime": "2012-04-23T18:25:43.511Z",
    "endTime": "2012-04-24T17:25:43.511Z",
    "activeInd": True,
    "createUserId": "bpmsupport@gfs.com"
}
try:
    # Get the topic encoding type.
    topic = publisher_client.get_topic(request={"topic": topic_path})
    encoding = topic.schema_settings.encoding
    # Encode the data according to the message serialization type.
    if encoding == Encoding.BINARY:
        encoder = BinaryEncoder(bout)
        writer.write(record, encoder)
        data = bout.getvalue()
        print(f"Preparing a binary-encoded message:\n{data.decode()}")
    elif encoding == Encoding.JSON:
        data_str = json.dumps(record)
        print(f"Preparing a JSON-encoded message:\n{data_str}")
        data = data_str.encode("utf-8")
    else:
        print(f"No encoding specified in {topic_path}. Abort.")
        exit(0)
    future = publisher_client.publish(topic_path, data)
    print(f"Published message ID: {future.result()}")
except NotFound:
    print(f"{topic_id} not found.")
Then I pulled the message in the subscription to confirm it came through.
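A rough sketch of such a pull with the subscriber client, assuming a JSON-encoded topic and a hypothetical subscription ID:
import json
from google.cloud.pubsub import SubscriberClient
# TODO(developer): hypothetical IDs, replace before running.
project_id = "your-project-id"
subscription_id = "your-subscription-id"
subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
# Pull a single message synchronously.
response = subscriber.pull(request={"subscription": subscription_path, "max_messages": 1})
for received in response.received_messages:
    # For a JSON-encoded topic the payload is the JSON record itself.
    record = json.loads(received.message.data.decode("utf-8"))
    print(record)
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": [received.ack_id]}
    )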
Thanks. I am integrating from a proprietary platform, so adding a client library is not a convenient option. I have discussed this with the consumer, and they are reluctantly willing to accept messages without schema validation.