Soft Handover in Infinite streaming

This feature suggestion is inspired by the "soft handover" used in cellular networks when a phone moves between towers. For details: https://en.wikipedia.org/wiki/Handover#Types

Google Speech-To-Text (STT) limits a single streaming session to 5 minutes. The `java-docs-samples` repo (and its Python and JS counterparts) provides an example that keeps streaming audio indefinitely by opening a new stream and resending the most recent N audio packets. N is calculated from the end time of the last final transcript.

This is what is referred to as the "break-and-make" approach. It has some problems:

  1. When a new stream can't be established (for whatever reason), transcription is unavailable until it can.
  2. It is tightly coupled to word timings, which may not be supported by all ASRs. I understand that's not very relevant to Google, but for an application that uses Google alongside other ASRs (for example, one trained specifically on a target corpus), this becomes a bottleneck for extensibility.
  3. The calculation of which audio packets to resend depends on the encoding and sampling rate. While this is acceptable as a specification, in some cases it might be a severe limitation.
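
To illustrate point 3, here is a minimal sketch (not taken from the sample) of how the amount of audio to resend could be derived for uncompressed PCM. The class `ResendMath` and its method names are hypothetical, and the arithmetic assumes LINEAR16-style audio; for compressed encodings the mapping from time to bytes is not a simple multiplication, which is exactly the coupling described above.

```java
// Hypothetical helper, for illustration only; not part of java-docs-samples.
final class ResendMath {

  /** Bytes of raw PCM audio that cover durationMs milliseconds. */
  static long pcmBytesForDuration(
      long durationMs, int sampleRateHz, int bytesPerSample, int channels) {
    return durationMs * sampleRateHz * bytesPerSample * channels / 1000;
  }

  /** Number of fixed-size chunks needed to cover unfinalizedMs of audio, rounded up. */
  static long chunksToResend(
      long unfinalizedMs, int sampleRateHz, int bytesPerSample, int channels, int chunkBytes) {
    long bytes = pcmBytesForDuration(unfinalizedMs, sampleRateHz, bytesPerSample, channels);
    return (bytes + chunkBytes - 1) / chunkBytes;
  }

  public static void main(String[] args) {
    // 16 kHz, 16-bit, mono: the same format the sample below records.
    System.out.println(pcmBytesForDuration(200, 16000, 2, 1)); // 6400
    System.out.println(chunksToResend(1500, 16000, 2, 1, 6400)); // 8
    // Halving the sample rate halves the answer, so the caller must know the format.
    System.out.println(chunksToResend(1500, 8000, 2, 1, 6400)); // 4
  }
}
```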

Alternative:

I tried a "soft handover" method ("make-and-break"), where we open another stream early (say, 1 min before the limit) and send audio to both streams until the transcripts "align". When they do, we switch to the new stream.

While this has a few unique aspects to take care of, I believe this gives better control to the application and is not tied to word timings.
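
The phases of "make-and-break" can be sketched as a small state machine. This is only an illustration under my own assumptions: `SoftHandoverController`, its fields, and the stream indices 0 and 1 are hypothetical names, and it deliberately leaves out all Speech API calls so that the timing logic can be read (and tested) on its own.

```java
// Hypothetical controller, for illustration only; it models the handover phases
// without touching any Speech API classes.
final class SoftHandoverController {
  enum Phase { SINGLE, OVERLAP }

  private final long limitMs; // maximum lifetime of one stream
  private final long leadMs; // how long before the limit the standby stream is opened
  private Phase phase = Phase.SINGLE;
  private int active = 0; // index (0 or 1) of the stream whose transcripts are shown
  private long activeOpenedAtMs;
  private long standbyOpenedAtMs;

  SoftHandoverController(long limitMs, long leadMs, long nowMs) {
    this.limitMs = limitMs;
    this.leadMs = leadMs;
    this.activeOpenedAtMs = nowMs;
  }

  /** Returns true exactly once per cycle, when the standby stream should be opened ("make"). */
  boolean shouldOpenStandby(long nowMs) {
    if (phase == Phase.SINGLE && nowMs - activeOpenedAtMs >= limitMs - leadMs) {
      phase = Phase.OVERLAP;
      standbyOpenedAtMs = nowMs;
      return true;
    }
    return false;
  }

  /** Indices of the streams that should receive the next audio chunk. */
  int[] targets() {
    return phase == Phase.OVERLAP ? new int[] {active, 1 - active} : new int[] {active};
  }

  /** Call once the transcripts align; returns the index of the stream to close ("break"). */
  int completeHandover() {
    if (phase != Phase.OVERLAP) {
      throw new IllegalStateException("no standby stream is open");
    }
    int old = active;
    active = 1 - active;
    activeOpenedAtMs = standbyOpenedAtMs; // the new stream's clock started at "make"
    phase = Phase.SINGLE;
    return old;
  }

  int active() {
    return active;
  }

  public static void main(String[] args) {
    // 5 minute limit, standby opened 1 minute early, clock starts at 0 ms.
    SoftHandoverController c = new SoftHandoverController(300_000, 60_000, 0);
    System.out.println(c.shouldOpenStandby(240_000)); // true
    System.out.println(c.targets().length); // 2
    System.out.println(c.completeHandover()); // 0
  }
}
```

The caller would feed each audio chunk to every index returned by `targets()` and call `completeHandover()` only when its alignment check succeeds, so a failure to open the standby stream leaves the active stream untouched.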

I would like feedback on this approach. The code (in Java) is included below. For testing purposes, I have set STREAMING_LIMIT to 30 seconds.

I am aware of some areas where I can sharpen this more (e.g. the alignment algorithm), but I am looking for concrete, major concerns from experts or the sample's authors.

High-level overview of the code

  • This class demonstrates how to perform infinite streaming speech recognition using the StreamingRecognize functionality of the Speech API. It is almost identical to the InfiniteStreamRecognize.java class, except that it demonstrates how to perform a "soft handover" between two streams.
  • A "soft handover" makes the new stream before breaking the old one, as opposed to a "hard handover", which breaks the old stream before making the new one. This is useful when you want to perform speech recognition on a continuous audio input but need to periodically restart the stream to avoid exceeding the maximum allowed continuous streaming duration. For demonstration purposes only, this sample uses a reset duration of 30 seconds, whereas the actual allowed duration is 5 minutes per the Google documentation for the Streaming API.
  • This class uses two streams, STREAM1 and STREAM2, and alternates between them. When the reset duration elapses, the other stream is opened and both streams receive the same audio until their transcripts align. The old stream is then closed and the new one becomes active.
  • This class also demonstrates how to align the transcript of the previous stream with the transcript of the current stream using a very simplistic algorithm.
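
As a point of comparison for the alignment step, here is one stricter variant, sketched as an assumption rather than as the method used in the sample: instead of counting matching words anywhere in the two transcripts, it requires that both transcripts end with the same run of words. `TranscriptAlignment` and its methods are hypothetical names.

```java
import java.util.Locale;

// Hypothetical alternative alignment check, for illustration only.
final class TranscriptAlignment {

  /** Lower-cases the transcript, strips punctuation (keeping apostrophes) and splits on spaces. */
  static String[] words(String transcript) {
    String cleaned =
        transcript.toLowerCase(Locale.ROOT).replaceAll("[^\\p{L}\\p{N}'\\s]", " ").trim();
    return cleaned.isEmpty() ? new String[0] : cleaned.split("\\s+");
  }

  /** Number of words the two transcripts share at their ends. */
  static int commonSuffixWords(String previous, String current) {
    String[] p = words(previous);
    String[] c = words(current);
    int n = 0;
    while (n < p.length
        && n < c.length
        && p[p.length - 1 - n].equals(c[c.length - 1 - n])) {
      n++;
    }
    return n;
  }

  /** True when at least minWords trailing words match. */
  static boolean isAligned(String previous, String current, int minWords) {
    return commonSuffixWords(previous, current) >= minWords;
  }

  public static void main(String[] args) {
    String older = " so we open the new stream";
    String newer = "Open the new stream.";
    System.out.println(commonSuffixWords(older, newer)); // 4
    System.out.println(isAligned(older, newer, 3)); // true
  }
}
```

A suffix match is less likely to be fooled by frequent words such as "the", but it assumes both streams finalize at roughly the same point in the audio, which may not hold in practice.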

 

/*
 * Copyright 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.example.speech;

// [START speech_transcribe_infinite_streaming_soft_handover]

import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.speech.v1p1beta1.RecognitionConfig;
import com.google.cloud.speech.v1p1beta1.SpeechClient;
import com.google.cloud.speech.v1p1beta1.SpeechRecognitionAlternative;
import com.google.cloud.speech.v1p1beta1.StreamingRecognitionConfig;
import com.google.cloud.speech.v1p1beta1.StreamingRecognitionResult;
import com.google.cloud.speech.v1p1beta1.StreamingRecognizeRequest;
import com.google.cloud.speech.v1p1beta1.StreamingRecognizeResponse;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioSystem;
import javax.sound.sampled.DataLine;
import javax.sound.sampled.DataLine.Info;
import javax.sound.sampled.TargetDataLine;

/*
 * This class demonstrates how to perform infinite streaming speech recognition using the
 * StreamingRecognize functionality of the Speech API.
 * It is almost identical to the InfiniteStreamRecognize.java class, except that it
 * demonstrates how to perform a "soft handover" between two streams.
 * A "soft handover" makes the new stream before breaking the old one, as opposed to a "hard
 * handover", which breaks the old stream before making the new one.
 * This is useful when you want to perform speech recognition on a continuous audio input but
 * need to periodically restart the stream to avoid exceeding the maximum allowed continuous
 * streaming duration (5 minutes per the Streaming API documentation; this sample resets every
 * 30 seconds for demonstration purposes).
 * This class uses two streams, STREAM1 and STREAM2, and alternates between them.
 * When the reset duration elapses, the other stream is opened and both streams receive the
 * same audio until their transcripts align; the old stream is then closed.
 * This class also demonstrates how to align the transcript of the previous stream with the
 * transcript of the current stream using a very simplistic algorithm.
 */

public class InfiniteStreamRecognizeSoftHandOver {

  // Reset interval in milliseconds. Shortened to 30 seconds for testing; the documented
  // streaming limit is 5 minutes.
  private static final int STREAMING_LIMIT = 30000;

  enum Stream {
    STREAM1,
    STREAM2;
  }

  public static final String RED = "\033[0;31m";
  public static final String GREEN = "\033[0;32m";
  public static final String YELLOW = "\033[0;33m";

  // Creating shared object
  private static volatile BlockingQueue<byte[]> sharedQueue = new LinkedBlockingQueue<byte[]>();
  private static TargetDataLine targetDataLine;
  private static int BYTES_PER_BUFFER = 6400; // buffer size in bytes

  private static int restartCounter = 0;
  private static ArrayList<ByteString> audioInput = new ArrayList<ByteString>();
  private static int resultEndTimeInMS = 0;
  private static boolean newStream = false;
  private static double bridgingOffset = 0;
  private static volatile Stream activeStream = Stream.STREAM1;
  private static ByteString tempByteString;
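  // Written by the response observer threads and read by the main loop, so use a
  // thread-safe map.
  private static Map<String, StreamingRecognitionResult> streamWiseLatestResults =
      new java.util.concurrent.ConcurrentHashMap<String, StreamingRecognitionResult>();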

  public static void main(String... args) {
    InfiniteStreamRecognizeOptions options = InfiniteStreamRecognizeOptions.fromFlags(args);
    if (options == null) {
      // Could not parse.
      System.out.println("Failed to parse options.");
      System.exit(1);
    }

    try {
      infiniteStreamingRecognize(options.langCode);
    } catch (Exception e) {
      System.out.println("Exception caught: " + e);
    }
  }

  public static String convertMillisToDate(double milliSeconds) {
    long millis = (long) milliSeconds;
    DecimalFormat format = new DecimalFormat();
    format.setMinimumIntegerDigits(2);
    return String.format(
        "%s:%s /",
        format.format(TimeUnit.MILLISECONDS.toMinutes(millis)),
        format.format(
            TimeUnit.MILLISECONDS.toSeconds(millis)
                - TimeUnit.MINUTES.toSeconds(TimeUnit.MILLISECONDS.toMinutes(millis))));
  }

  /**
   * Simplistic alignment check: returns true once at least three words of the previous
   * transcript also occur in the current transcript (case-insensitive).
   */
  private static boolean computeIntersectionRatio(
      String previousTranscript, String currentTranscript) {
    int commonWordCount = 0;
    // Trim first so a leading space does not produce an empty "word".
    String[] previousWords = previousTranscript.trim().split("\\s+");
    String[] currentWords = currentTranscript.trim().split("\\s+");

    // Traverse both arrays in reverse so the most recent words are compared first.
    for (int i = previousWords.length - 1; i >= 0; i--) {
      for (int j = currentWords.length - 1; j >= 0; j--) {
        if (previousWords[i].equalsIgnoreCase(currentWords[j])) {
          commonWordCount++;
          // This is a simplistic alignment algorithm which works OK.
          // However, real-world use cases may require more sophisticated algorithms.
          if (commonWordCount >= 3) {
            return true;
          }
          break; // count each previous word at most once
        }
      }
    }

    return false;
  }

  /** Performs infinite streaming speech recognition */
  public static void infiniteStreamingRecognize(String languageCode) throws Exception {

    // Microphone Input buffering
    class MicBuffer implements Runnable {

      @Override
      public void run() {
        System.out.println(YELLOW);
        System.out.println("Start speaking...Press Ctrl-C to stop");
        targetDataLine.start();
        byte[] data = new byte[BYTES_PER_BUFFER];
        while (targetDataLine.isOpen()) {
          try {
            int numBytesRead = targetDataLine.read(data, 0, data.length);
            if ((numBytesRead <= 0) && (targetDataLine.isOpen())) {
              continue;
            }
            sharedQueue.put(data.clone());
          } catch (InterruptedException e) {
            System.out.println("Microphone input buffering interrupted : " + e.getMessage());
          }
        }
      }
    }

    // Creating microphone input buffer thread
    MicBuffer micrunnable = new MicBuffer();
    Thread micThread = new Thread(micrunnable);
    // one for each stream, just for clarity. Can be optimized.
    ResponseObserver<StreamingRecognizeResponse> responseObserver1 = null;
    ResponseObserver<StreamingRecognizeResponse> responseObserver2 = null;
    try (SpeechClient client = SpeechClient.create()) {
      // Active stream is used to send data.
      ClientStream<StreamingRecognizeRequest> stream1 = null;
      ClientStream<StreamingRecognizeRequest> stream2 = null;

      responseObserver1 =
          new ResponseObserver<StreamingRecognizeResponse>() {
            ArrayList<StreamingRecognizeResponse> responses = new ArrayList<>();

            public void onStart(StreamController controller) {}

            public void onResponse(StreamingRecognizeResponse response) {
              responses.add(response);
              if (response.getResultsCount() == 0) {
                return; // nothing to compare or print in an empty response
              }
              StreamingRecognitionResult result = response.getResults(0);
              if (result.getIsFinal()) {
                // Only final results are compared for alignment; a final response carries
                // a single result.
                streamWiseLatestResults.put("stream1", result);
              }
              Duration resultEndTime = result.getResultEndTime();
              resultEndTimeInMS =
                  (int)
                      ((resultEndTime.getSeconds() * 1000) + (resultEndTime.getNanos() / 1000000));
              double correctedTime =
                  resultEndTimeInMS - bridgingOffset + (STREAMING_LIMIT * restartCounter);

              if (activeStream == Stream.STREAM1) {
                SpeechRecognitionAlternative alternative = result.getAlternativesList().get(0);
                if (result.getIsFinal()) {
                  System.out.print(GREEN);
                  System.out.print("\033[2K\r");
                  System.out.printf(
                      "%s: %s [confidence: %.2f] [stream: %s]\n",
                      convertMillisToDate(correctedTime),
                      alternative.getTranscript(),
                      alternative.getConfidence(),
                      "stream1");
                } else {
                  System.out.print(RED);
                  System.out.print("\033[2K\r");
                  System.out.printf(
                      "%s: %s", convertMillisToDate(correctedTime), alternative.getTranscript());
                }
              }
            }

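            public void onComplete() {}

            @Override
            public void onError(Throwable t) {
              // Report stream errors instead of silently dropping them.
              System.out.println("stream1 error: " + t);
            }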
          };
      stream1 = client.streamingRecognizeCallable().splitCall(responseObserver1);

      responseObserver2 =
          new ResponseObserver<StreamingRecognizeResponse>() {
            ArrayList<StreamingRecognizeResponse> responses = new ArrayList<>();

            public void onStart(StreamController controller) {}

            public void onResponse(StreamingRecognizeResponse response) {
              responses.add(response);
              if (response.getResultsCount() == 0) {
                return; // nothing to compare or print in an empty response
              }
              StreamingRecognitionResult result = response.getResults(0);
              if (result.getIsFinal()) {
                // Only final results are compared for alignment; a final response carries
                // a single result.
                streamWiseLatestResults.put("stream2", result);
              }
              Duration resultEndTime = result.getResultEndTime();
              resultEndTimeInMS =
                  (int)
                      ((resultEndTime.getSeconds() * 1000) + (resultEndTime.getNanos() / 1000000));
              double correctedTime =
                  resultEndTimeInMS - bridgingOffset + (STREAMING_LIMIT * restartCounter);

              if (activeStream == Stream.STREAM2) {
                SpeechRecognitionAlternative alternative = result.getAlternativesList().get(0);
                if (result.getIsFinal()) {
                  System.out.print(GREEN);
                  System.out.print("\033[2K\r");
                  System.out.printf(
                      "%s: %s [confidence: %.2f] [stream: %s]\n",
                      convertMillisToDate(correctedTime),
                      alternative.getTranscript(),
                      alternative.getConfidence(),
                      "stream2");

                } else {
                  System.out.print(RED);
                  System.out.print("\033[2K\r");
                  System.out.printf(
                      "%s: %s", convertMillisToDate(correctedTime), alternative.getTranscript());
                }
              }
            }

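            public void onComplete() {}

            public void onError(Throwable t) {
              // Report stream errors instead of silently dropping them.
              System.out.println("stream2 error: " + t);
            }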
          };

      RecognitionConfig recognitionConfig =
          RecognitionConfig.newBuilder()
              .setEncoding(RecognitionConfig.AudioEncoding.LINEAR16)
              .setLanguageCode(languageCode)
              .setSampleRateHertz(16000)
              .build();

      StreamingRecognitionConfig streamingRecognitionConfig =
          StreamingRecognitionConfig.newBuilder()
              .setConfig(recognitionConfig)
              .setInterimResults(true)
              .build();

      StreamingRecognizeRequest request =
          StreamingRecognizeRequest.newBuilder()
              .setStreamingConfig(streamingRecognitionConfig)
              .build(); // The first request in a streaming call has to be a config

      stream1.send(request);

      try {
        // SampleRate: 16000 Hz, SampleSizeInBits: 16, Number of channels: 1,
        // Signed: true, bigEndian: false
        AudioFormat audioFormat = new AudioFormat(16000, 16, 1, true, false);
        // Set the system information to read from the microphone audio stream
        DataLine.Info targetInfo = new Info(TargetDataLine.class, audioFormat);

        if (!AudioSystem.isLineSupported(targetInfo)) {
          System.out.println("Microphone not supported");
          System.exit(0);
        }
        // Target data line captures the audio stream the microphone produces.
        targetDataLine = (TargetDataLine) AudioSystem.getLine(targetInfo);
        targetDataLine.open(audioFormat);
        micThread.start();

        long startTime = System.currentTimeMillis();
        AtomicBoolean isAligned = new AtomicBoolean(false);

        while (true) {

          long estimatedTime = System.currentTimeMillis() - startTime;

          if (estimatedTime >= STREAMING_LIMIT) {
            // reset started, bring passive stream to life!
            restartCounter++;
            newStream = true;

            request =
                StreamingRecognizeRequest.newBuilder()
                    .setStreamingConfig(streamingRecognitionConfig)
                    .build();

            if (activeStream == Stream.STREAM2) {
              // stream2 is active, so open stream1 as the standby stream
              streamWiseLatestResults.remove("stream1"); // drop the stale result of the last cycle
              stream1 = client.streamingRecognizeCallable().splitCall(responseObserver1);
              while (!stream1.isSendReady()) {
                Thread.sleep(5);
              }
              stream1.send(request);
            } else {
              // stream1 is active, so open stream2 as the standby stream
              streamWiseLatestResults.remove("stream2"); // drop the stale result of the last cycle
              stream2 = client.streamingRecognizeCallable().splitCall(responseObserver2);
              while (!stream2.isSendReady()) {
                Thread.sleep(5);
              }
              stream2.send(request);
            }

            System.out.println(YELLOW);
            System.out.printf("%d: SOFT RESTARTING REQUEST\n", restartCounter * STREAMING_LIMIT);

            // reset startTime
            startTime = System.currentTimeMillis();

          } else {
            // Take the next audio chunk from the microphone queue. Chunks are not retained:
            // soft handover does not resend past audio.
            tempByteString = ByteString.copyFrom(sharedQueue.take());
            request =
                StreamingRecognizeRequest.newBuilder().setAudioContent(tempByteString).build();

            if (newStream && !isAligned.get()) {
              // Overlap phase: the second stream is open, so send each chunk to both streams
              // exactly once and compare their latest final transcripts for alignment.
              stream1.send(request);
              stream2.send(request);

              StreamingRecognitionResult result1 = streamWiseLatestResults.get("stream1");
              StreamingRecognitionResult result2 = streamWiseLatestResults.get("stream2");
              if (result1 != null && result2 != null) {
                isAligned.set(
                    computeIntersectionRatio(
                        result1.getAlternatives(0).getTranscript(),
                        result2.getAlternatives(0).getTranscript()));
              }
            } else {
              if (newStream) {
                // The transcripts aligned: toggle the streams and close the old one.
                newStream = false;
                isAligned.set(false);
                if (activeStream == Stream.STREAM1) {
                  activeStream = Stream.STREAM2;
                  stream1.closeSend();
                  stream1 = null;
                } else {
                  activeStream = Stream.STREAM1;
                  stream2.closeSend();
                  stream2 = null;
                }
              }
              // Normal phase: only the active stream receives audio.
              if (activeStream == Stream.STREAM1) {
                stream1.send(request);
              } else {
                stream2.send(request);
              }
            }
          }
        }
      } catch (Exception e) {
        System.out.println(e);
      }
    }
  }
}
// [END speech_transcribe_infinite_streaming_soft_handover]

 

Thanks in advance!

3 REPLIES

Thank you for making this post, @bburli.

Hi @bburli, this is exactly what I need for my project, but my project is in Python. Do you have a Python version of this code?

Unfortunately, no. I am not a Python expert either, but I imagine it should not be hard to convert this, especially when you have access to co-pilots.

@saltlas Would you kindly accept the solution if it fits your need? I would also appreciate a like 🙂