Announcements
The Google Cloud Community will be in read-only from July 16 - July 22 as we migrate to a new platform; refer to this community post for more details.
Get hands-on experience with 20+ free Google Cloud products and $300 in free credit for new customers.

Problem integrating Python asyncio, sound device, Dialogflow streaming_detect_intent

Hi Folks:

I am trying to write a custom Dialogflow integration. I am writing a small program that streams audio to Dialogflow's streaming_detect_intent(). I have modified existing examples. For the audio, I am using sounddevice. 

I have two Python tasks. One runs the audio, the other Dialogflow. The tasks communicate through a shared queue. I can successfully stream audio into a file. I can successfully stream a file into Dialogflow. My code fails when I stream audio into Dialogflow. The immediate culprit is an asyncio.CancelledError(). The trace is

 

 

File "/home/andrew/experiments/messaging/a_recording.py", line 95, in sample_streaming_detect_intent
async for response in stream:
File "/home/andrew/venv/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
async for response in self._call: # pragma: no branch
File "/home/andrew/venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 327, in _fetch_stream_responses
await self._raise_for_status()
File "/home/andrew/venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 233, in _raise_for_status
raise asyncio.CancelledError()
asyncio.exceptions.CancelledError

 

 

The code fragment is

 

 

async def sample_streaming_detect_intent(
    loop, audio_queue, project_id, session_id, sample_rate
):
    # Create a client

    client = dialogflow.SessionsAsyncClient()

    audio_config = dialogflow.InputAudioConfig(
        audio_encoding=dialogflow.AudioEncoding.AUDIO_ENCODING_LINEAR_16,
        language_code="en",
        sample_rate_hertz=sample_rate,
    )

    async def request_generator(loop, project_id, session_id, audio_config, audio_queue):

        query_input = dialogflow.QueryInput(audio_config=audio_config)

        # Initialize request argument(s)
        yield dialogflow.StreamingDetectIntentRequest(
            session=client.session_path(project_id, session_id), query_input=query_input
        )

        while True:
            chunk = await audio_queue.get()
            if not chunk:
                break
            # The later requests contains audio data.
            yield dialogflow.StreamingDetectIntentRequest(input_audio=chunk)

    # Make the request
    client_task = asyncio.create_task(
        client.streaming_detect_intent(
            requests=request_generator(
                loop, project_id, session_id, audio_config, audio_queue
            )
        )
    )

    try:
        stream = await client_task
    except Exception as e:
        print(f"failed with {e.__cause__}")

    try:
        async for response in stream:
            print(response)
    except Exception as e:
        print(f"failed with {e.__cause__}")

    query_result = response.query_result

    print("=" * 20)
    print("Query text: {}".format(query_result.query_text))
    print(
        "Detected intent: {} (confidence: {})\n".format(
            query_result.intent.display_name, query_result.intent_detection_confidence
        )
    )
    print("Fulfillment text: {}\n".format(query_result.fulfillment_text))

 

 

 

 

 

    audio_queue = asyncio.Queue()

    # to assert that we are using the same event loop
    loop = asyncio.get_event_loop()

    await asyncio.gather(
        record_audio(fp, loop, audio_queue, sample_rate, device, channels),
        sample_streaming_detect_intent(
            loop, audio_queue, project_id, session_id, sample_rate
        ),
    )

 

 

 Any insights would be appreciated!

0 3 1,963
3 REPLIES 3