Problem integrating Python asyncio, sounddevice, and Dialogflow streaming_detect_intent

Hi Folks:

I am trying to write a custom Dialogflow integration. I am writing a small program that streams audio to Dialogflow's streaming_detect_intent(). I have modified existing examples. For the audio, I am using sounddevice. 

I have two Python tasks. One runs the audio, the other Dialogflow. The tasks communicate through a shared queue. I can successfully stream audio into a file. I can successfully stream a file into Dialogflow. My code fails when I stream audio into Dialogflow. The immediate culprit is an asyncio.CancelledError(). The trace is

 

 

File "/home/andrew/experiments/messaging/a_recording.py", line 95, in sample_streaming_detect_intent
async for response in stream:
File "/home/andrew/venv/lib/python3.11/site-packages/google/api_core/grpc_helpers_async.py", line 102, in _wrapped_aiter
async for response in self._call: # pragma: no branch
File "/home/andrew/venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 327, in _fetch_stream_responses
await self._raise_for_status()
File "/home/andrew/venv/lib/python3.11/site-packages/grpc/aio/_call.py", line 233, in _raise_for_status
raise asyncio.CancelledError()
asyncio.exceptions.CancelledError

 

 

The code fragment is

 

 

async def sample_streaming_detect_intent(
    loop, audio_queue, project_id, session_id, sample_rate
):
    # Create a client

    client = dialogflow.SessionsAsyncClient()

    audio_config = dialogflow.InputAudioConfig(
        audio_encoding=dialogflow.AudioEncoding.AUDIO_ENCODING_LINEAR_16,
        language_code="en",
        sample_rate_hertz=sample_rate,
    )

    async def request_generator(loop, project_id, session_id, audio_config, audio_queue):

        query_input = dialogflow.QueryInput(audio_config=audio_config)

        # Initialize request argument(s)
        yield dialogflow.StreamingDetectIntentRequest(
            session=client.session_path(project_id, session_id), query_input=query_input
        )

        while True:
            chunk = await audio_queue.get()
            if not chunk:
                break
            # Later requests contain audio data.
            yield dialogflow.StreamingDetectIntentRequest(input_audio=chunk)

    # Make the request
    client_task = asyncio.create_task(
        client.streaming_detect_intent(
            requests=request_generator(
                loop, project_id, session_id, audio_config, audio_queue
            )
        )
    )

    try:
        stream = await client_task
    except Exception as e:
        print(f"failed with {e.__cause__}")

    try:
        async for response in stream:
            print(response)
    except Exception as e:
        print(f"failed with {e.__cause__}")

    query_result = response.query_result

    print("=" * 20)
    print("Query text: {}".format(query_result.query_text))
    print(
        "Detected intent: {} (confidence: {})\n".format(
            query_result.intent.display_name, query_result.intent_detection_confidence
        )
    )
    print("Fulfillment text: {}\n".format(query_result.fulfillment_text))

 

 

 

 

 

    audio_queue = asyncio.Queue()

    # to assert that we are using the same event loop
    loop = asyncio.get_event_loop()

    await asyncio.gather(
        record_audio(fp, loop, audio_queue, sample_rate, device, channels),
        sample_streaming_detect_intent(
            loop, audio_queue, project_id, session_id, sample_rate
        ),
    )

 

 

 Any insights would be appreciated!


Per the official docs, asyncio.CancelledError is raised when the operation has been cancelled. You may want to check the audio stream and make sure it is not closing prematurely, before the Dialogflow request can complete. Can you share the original sample code that you modified, and/or any guide or documentation you followed?
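
A related detail that may be worth checking in the posted code: since Python 3.8, `asyncio.CancelledError` derives from `BaseException` rather than `Exception`, so an `except Exception` handler does not intercept it. The following is a minimal standalone sketch of that behaviour; the names `worker` and `main` are hypothetical and are not taken from the posted program.

```python
import asyncio


async def worker():
    """Wait for a long time; catch only Exception, as the posted handlers do."""
    try:
        await asyncio.sleep(3600)
    except Exception:
        # Not reached on cancellation in Python 3.8+, because
        # CancelledError is a BaseException, not an Exception.
        return "caught by except Exception"
    return "finished"


async def main():
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let the worker start and reach its await
    task.cancel()
    try:
        return await task
    except asyncio.CancelledError:
        return "CancelledError escaped the worker"


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If the cancellation needs to be observed, it has to be caught explicitly as `asyncio.CancelledError` (and usually re-raised after any cleanup).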

Hi @Joevanie 

Thanks for the suggestions! The audio stream ought to close only on Ctrl+C. However, I get the CancelledError almost immediately. I guess I could check this. I'll post the original code. If you need the project_id, I could send that to you separately. You ought to be able to play with the different examples. Again, thanks for the help!

P.S. My original code did not have single_utterance=True.

Andrew

 

 

 

#!/usr/bin/env python3
"""Create a recording with arbitrary duration.
The soundfile module (https://python-soundfile.readthedocs.io/)
has to be installed!
"""

import pdb
import uuid
import asyncio
import argparse
import tempfile
import queue
import sys
import contextlib
import traceback
import aiofiles

import sounddevice as sd
import soundfile as sf
import numpy  # Make sure NumPy is loaded before it is used in the callback

assert numpy  # avoid "imported but unused" message (W0611)

from google.cloud import dialogflow_v2beta1 as dialogflow


async def write_file(fp, audio_queue):
    while True:
        chunk = await audio_queue.get()
        fp.write(chunk)


async def read_file(file_name, input_queue):
    with open(file_name, "rb") as fp:
        while True:
            chunk = fp.read(4096)
            print(f"writing {len(chunk)}")
            input_queue.put_nowait(chunk)
            if not chunk:
                break

async def aio_read_file(file_name, input_queue):
    async with aiofiles.open(file_name, "rb") as fp:
        while True:
            chunk = await fp.read(4096)
            print(f"writing {len(chunk)}")
            input_queue.put_nowait(chunk)
            if not chunk:
                break

async def sample_streaming_detect_intent(
    loop, audio_queue, project_id, session_id, sample_rate
):
    # Create a client

    client = dialogflow.SessionsAsyncClient()

    audio_config = dialogflow.InputAudioConfig(
        audio_encoding=dialogflow.AudioEncoding.AUDIO_ENCODING_LINEAR_16,
        language_code="en",
        sample_rate_hertz=sample_rate,
        single_utterance=True
    )

    async def request_generator(loop, project_id, session_id, audio_config, audio_queue):

        query_input = dialogflow.QueryInput(audio_config=audio_config)

        # Initialize request argument(s)
        yield dialogflow.StreamingDetectIntentRequest(
            session=client.session_path(project_id, session_id), query_input=query_input
        )

        while True:
            chunk = await audio_queue.get()
            if not chunk:
                break
            # Later requests contain audio data.
            yield dialogflow.StreamingDetectIntentRequest(input_audio=chunk)

    # Make the request
    client_task = asyncio.create_task(
        client.streaming_detect_intent(
            requests=request_generator(
                loop, project_id, session_id, audio_config, audio_queue
            )
        )
    )

    try:
        stream = await client_task
    except Exception as e:
        print(f"failed with {e.__cause__}")

    try:
        async for response in stream:
            print(response)
    except Exception as e:
        print(f"failed with {e.__cause__}")

    query_result = response.query_result

    print("=" * 20)
    print("Query text: {}".format(query_result.query_text))
    print(
        "Detected intent: {} (confidence: {})\n".format(
            query_result.intent.display_name, query_result.intent_detection_confidence
        )
    )
    print("Fulfillment text: {}\n".format(query_result.fulfillment_text))


async def record_audio(fp, loop, audio_queue, sample_rate, device, channels):

    q = asyncio.Queue()

    def callback(indata, frames, time, status):
        loop.call_soon_threadsafe(q.put_nowait, indata.copy())
        
    try:
        with sd.InputStream(
            samplerate=sample_rate, device=device, channels=channels, callback=callback
        ):
            print("#" * 80)
            print("press Ctrl+C to stop the recording")
            print("#" * 80)
            while True:
                chunk = await q.get()
                audio_queue.put_nowait(chunk)

    except KeyboardInterrupt:
        print("\nRecording finished")
        fp.close()
    except asyncio.CancelledError as e:
        print("caught a cancelled Error ****")
        traceback.print_exc(limit=None, file=None, chain=True)
    except Exception as e:
        print(e.__cause__)
        print(type(e).__name__ + ": " + str(e))


async def test_one():

    device = None
    channels = 1
    project_id = ""
    session_id = str(uuid.uuid4())
    mode = "x"
    subtype = None

    audio_queue = asyncio.Queue()

    device_info = sd.query_devices(device, "input")
    sample_rate = int(device_info["default_samplerate"])

    filename = tempfile.mktemp(prefix="delme_rec_unlimited_", suffix=".wav", dir="")

    fp = sf.SoundFile(filename, mode, sample_rate, channels, subtype)

    loop = asyncio.get_event_loop()
    await asyncio.gather(
        record_audio(fp, loop, audio_queue, sample_rate, device, channels),
        write_file(fp, audio_queue),
    )


async def test_two():

    sample_rate = 24000
    device = None
    channels = 1
    project_id = ""
    session_id = str(uuid.uuid4())
    mode = "x"
    subtype = None

    audio_queue = asyncio.Queue()

    device_info = sd.query_devices(device, "input")
    samplerate = int(device_info["default_samplerate"])

    loop = asyncio.get_event_loop()
    await asyncio.gather(
        read_file("request.wav", audio_queue),
        sample_streaming_detect_intent(
            loop, audio_queue, project_id, session_id, sample_rate
        ),
    )


async def test_three():
    """
    stream from mic into Dialogflow
    """

    device = None
    channels = 1
    project_id = ""
    session_id = str(uuid.uuid4())
    mode = "x"
    subtype = None

    audio_queue = asyncio.Queue()

    device_info = sd.query_devices(device, "input")
    sample_rate = int(device_info["default_samplerate"])

    # let's do this just for consistency
    filename = tempfile.mktemp(prefix="delme_rec_unlimited_", suffix=".wav", dir="")

    fp = sf.SoundFile(filename, mode, sample_rate, channels, subtype)

    loop = asyncio.get_event_loop()
    await asyncio.gather(
        record_audio(fp, loop, audio_queue, sample_rate, device, channels),
        sample_streaming_detect_intent(
            loop, audio_queue, project_id, session_id, sample_rate
        ),
    )



async def test_five():

    sample_rate = 24000
    device = None
    channels = 1
    project_id = ""
    session_id = str(uuid.uuid4())
    mode = "x"
    subtype = None

    audio_queue = asyncio.Queue()

    device_info = sd.query_devices(device, "input")
    samplerate = int(device_info["default_samplerate"])

    loop = asyncio.get_event_loop()

    await asyncio.gather(
        aio_read_file("request.wav", audio_queue),
        sample_streaming_detect_intent(
            loop, audio_queue, project_id, session_id, sample_rate
        ),
    )

if __name__ == "__main__":
    asyncio.run(test_three(), debug=True)

 

 

 

@Joevanie I have made more headway. After replacing the tasks with threads, I can see the problem: InputStream works with ndarrays, while Dialogflow works with byte streams. The CancelledError raised from the generator (stream) does not seem to include this important piece of information. I may be mistaken, but this suggests that the exception handling in the asyncio version could be improved.
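
The ndarray-versus-bytes mismatch described above can be illustrated with a small conversion helper. This is only a sketch under stated assumptions: it assumes the callback delivers float samples in the range [-1.0, 1.0] and that the target is little-endian 16-bit PCM, matching the AUDIO_ENCODING_LINEAR_16 setting in the posted config. The helper name `to_linear16_bytes` is hypothetical.

```python
import numpy as np


def to_linear16_bytes(indata: np.ndarray) -> bytes:
    """Convert float samples in [-1.0, 1.0] to little-endian 16-bit PCM bytes."""
    # Clip first so that out-of-range samples cannot overflow int16.
    clipped = np.clip(indata, -1.0, 1.0)
    pcm = (clipped * 32767.0).astype("<i2")
    return pcm.tobytes()


if __name__ == "__main__":
    # A fake block shaped like a callback buffer: (frames, channels).
    block = np.array([[0.0], [0.5], [-1.0], [2.0]], dtype=np.float32)
    data = to_linear16_bytes(block)
    print(type(data).__name__, len(data))
```

In the posted program, a conversion like this could be applied to `indata` inside the callback before the chunk is queued. Alternatives that avoid the manual conversion may be to pass `dtype="int16"` to `sd.InputStream` and call `tobytes()` on the block, or to use `sd.RawInputStream`, which delivers plain buffers. Note also that `if not chunk:` raises a `ValueError` when `chunk` is an ndarray with more than one element, which may be part of why the generator failed.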

That said, I am now writing data to Dialogflow (usually in small amounts), but I am getting nothing back. What should I be looking for?
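
One thing that may be worth checking is whether the request stream ever ends. In the posted `record_audio`, nothing enqueues an empty chunk, so `request_generator` keeps waiting on the queue after the last audio block. The sketch below is not a confirmed fix; it only shows one way to signal end-of-audio through the shared queue. It uses the standard library only, with a thread standing in for the sounddevice callback. The names `END_OF_AUDIO`, `chunk_stream`, and `fake_audio_callback` are hypothetical.

```python
import asyncio
import threading

END_OF_AUDIO = None  # hypothetical sentinel marking the end of the recording


async def chunk_stream(audio_queue):
    """Yield byte chunks from the queue until the sentinel arrives."""
    while True:
        chunk = await audio_queue.get()
        if chunk is END_OF_AUDIO:
            break
        yield chunk


def fake_audio_callback(loop, audio_queue, chunks):
    """Stand-in for the audio callback: runs in a separate thread."""
    for chunk in chunks:
        loop.call_soon_threadsafe(audio_queue.put_nowait, chunk)
    # Signal that no more audio will arrive so the consumer can finish.
    loop.call_soon_threadsafe(audio_queue.put_nowait, END_OF_AUDIO)


async def main():
    loop = asyncio.get_running_loop()
    audio_queue = asyncio.Queue()
    producer = threading.Thread(
        target=fake_audio_callback,
        args=(loop, audio_queue, [b"\x00\x01", b"\x02\x03"]),
    )
    producer.start()
    received = [chunk async for chunk in chunk_stream(audio_queue)]
    producer.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Comparing against an explicit sentinel with `is` also avoids evaluating the truth value of the chunk itself.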