Hi Folks:
I am trying to write a custom Dialogflow integration. I am writing a small program that streams audio to Dialogflow's streaming_detect_intent(). I have modified existing examples. For the audio, I am using sounddevice.
I have two Python tasks. One runs the audio, the other Dialogflow. The tasks communicate through a shared queue. I can successfully stream audio into a file. I can successfully stream a file into Dialogflow. My code fails when I stream audio into Dialogflow. The immediate culprit is an asyncio.CancelledError(). The trace is
File "/home/andrew/experiments/messaging/a_recording.py", line 95, in sample_streaming_detect_intent
async for response in stream:
File "/home/andrew/venv/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
async for response in self._call: # pragma: no branch
File "/home/andrew/venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 327, in _fetch_stream_responses
await self._raise_for_status()
File "/home/andrew/venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 233, in _raise_for_status
raise asyncio.CancelledError()
asyncio.exceptions.CancelledError
The code fragment is
async def sample_streaming_detect_intent(
loop, audio_queue, project_id, session_id, sample_rate
):
# Create a client
client = dialogflow.SessionsAsyncClient()
audio_config = dialogflow.InputAudioConfig(
audio_encoding=dialogflow.AudioEncoding.AUDIO_ENCODING_LINEAR_16,
language_code="en",
sample_rate_hertz=sample_rate,
)
async def request_generator(loop, project_id, session_id, audio_config, audio_queue):
query_input = dialogflow.QueryInput(audio_config=audio_config)
# Initialize request argument(s)
yield dialogflow.StreamingDetectIntentRequest(
session=client.session_path(project_id, session_id), query_input=query_input
)
while True:
chunk = await audio_queue.get()
if not chunk:
break
# The later requests contains audio data.
yield dialogflow.StreamingDetectIntentRequest(input_audio=chunk)
# Make the request
client_task = asyncio.create_task(
client.streaming_detect_intent(
requests=request_generator(
loop, project_id, session_id, audio_config, audio_queue
)
)
)
try:
stream = await client_task
except Exception as e:
print(f"failed with {e.__cause__}")
try:
async for response in stream:
print(response)
except Exception as e:
print(f"failed with {e.__cause__}")
query_result = response.query_result
print("=" * 20)
print("Query text: {}".format(query_result.query_text))
print(
"Detected intent: {} (confidence: {})\n".format(
query_result.intent.display_name, query_result.intent_detection_confidence
)
)
print("Fulfillment text: {}\n".format(query_result.fulfillment_text))
audio_queue = asyncio.Queue()
# to assert that we are using the same event loop
loop = asyncio.get_event_loop()
await asyncio.gather(
record_audio(fp, loop, audio_queue, sample_rate, device, channels),
sample_streaming_detect_intent(
loop, audio_queue, project_id, session_id, sample_rate
),
)
Any insights would be appreciated!