
stream.Recv() gets stuck in Media Translation API even after sending audio buffer

jsaini
New Member

Below is the code snippet I am using to send an audio buffer to the Media Translation API. The audio codec is ogg_opus with a sample rate of 48000 Hz, but the server never responds with anything. The Python example from the official Media Translation API documentation, which uses the linear16 codec, works fine. When I swap the Media Translation API out for the Cloud Speech-to-Text API, that works fine too. I have read through the documentation as well and am not sure what I am doing wrong.

 
import (
	"context"
	"errors"
	"io"
	"strings"
	"sync"

	mediatranslation "cloud.google.com/go/mediatranslation/apiv1beta1"
	"cloud.google.com/go/mediatranslation/apiv1beta1/mediatranslationpb"
	"github.com/livekit/protocol/logger"
	"github.com/pion/rtp"
	"github.com/pion/webrtc/v3"
	"github.com/pion/webrtc/v3/pkg/media/oggwriter"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type MediaTranslator struct {
	ctx    context.Context
	cancel context.CancelFunc

	speechClient *mediatranslation.SpeechTranslationClient
	language     *Language

	rtpCodec webrtc.RTPCodecParameters

	lock          sync.Mutex
	oggWriter     *io.PipeWriter
	oggReader     *io.PipeReader
	oggSerializer *oggwriter.OggWriter

	results chan RecognizeResult
	closeCh chan struct{}
}

type RecognizeResult struct {
	Error   error
	Text    string
	IsFinal bool
}

func NewMediaTranslator(rtpCodec webrtc.RTPCodecParameters, speechClient *mediatranslation.SpeechTranslationClient, language *Language) (*MediaTranslator, error) {
	if !strings.EqualFold(rtpCodec.MimeType, "audio/opus") {
		return nil, errors.New("only opus is supported")
	}

	oggReader, oggWriter := io.Pipe()
	ctx, cancel := context.WithCancel(context.Background())
	t := &MediaTranslator{
		ctx:      ctx,
		cancel:   cancel,
		rtpCodec: rtpCodec,
		//sb: samplebuilder.New(200, &codecs.OpusPacket{}, rtpCodec.ClockRate),
		oggReader:    oggReader,
		oggWriter:    oggWriter,
		language:     language,
		speechClient: speechClient,
		results:      make(chan RecognizeResult),
		closeCh:      make(chan struct{}),
	}
	go t.start()
	return t, nil
}

func (t *MediaTranslator) Language() *Language {
	return t.language
}

func (t *MediaTranslator) WriteRTP(pkt *rtp.Packet) error {
	t.lock.Lock()
	defer t.lock.Unlock()

	if t.oggSerializer == nil {
		oggSerializer, err := oggwriter.NewWith(t.oggWriter, t.rtpCodec.ClockRate, t.rtpCodec.Channels)
		if err != nil {
			logger.Errorw("failed to create ogg serializer", err)
			return err
		}
		t.oggSerializer = oggSerializer
	}

	if err := t.oggSerializer.WriteRTP(pkt); err != nil {
		return err
	}

	return nil
}

func (t *MediaTranslator) start() error {
	defer func() {
		close(t.closeCh)
	}()

	for {
		stream, err := t.newStream()
		if err != nil {
			if status, ok := status.FromError(err); ok && status.Code() == codes.Canceled {
				return nil
			}

			logger.Errorw("failed to create a new speech stream", err)
			t.results <- RecognizeResult{
				Error: err,
			}
			return err
		}

		endStreamCh := make(chan struct{})
		nextCh := make(chan struct{})

		// Forward oggreader to the speech stream
		go func() {
			defer close(nextCh)
			buf := make([]byte, 1024)
			for {
				select {
				case <-endStreamCh:
					return
				default:
					n, err := t.oggReader.Read(buf)
					if err != nil {
						if err != io.EOF {
							logger.Errorw("failed to read from ogg reader", err)
						}
						return
					}

					if n <= 0 {
						continue // No data
					}

					if err := stream.Send(&mediatranslationpb.StreamingTranslateSpeechRequest{
						StreamingRequest: &mediatranslationpb.StreamingTranslateSpeechRequest_AudioContent{
							AudioContent: buf[:n],
						},
					}); err != nil {
						if err != io.EOF {
							logger.Errorw("failed to forward audio data to speech stream", err)
							t.results <- RecognizeResult{
								Error: err,
							}
						}
						logger.Infow("Going to return from stream send..")
						return
					}
				}
			}
		}()

		// Read translation results
		for {
			resp, err := stream.Recv()
			logger.Infow("Stream recv", "resp", resp, "error", err)
			if err != nil {
				if status, ok := status.FromError(err); ok {
					if status.Code() == codes.OutOfRange {
						logger.Infow("Media translation failed due to", "StatusCode", status.Code())
						break // Create a new speech stream (maximum speech length exceeded)
					} else if status.Code() == codes.Canceled {
						logger.Infow("Media translation failed due to", "StatusCode", status.Code())
						return nil // Context canceled (Stop)
					}
				}

				logger.Errorw("failed to receive response from speech stream", err)
				t.results <- RecognizeResult{
					Error: err,
				}

				return err
			}

			if resp.Error != nil {
				break
			}

			result := resp.GetResult().GetTextTranslationResult()
			logger.Infow("Getting result from media translation api", "result", result)
			if result.GetIsFinal() {
				t.results <- RecognizeResult{
					Text:    result.GetTranslation(),
					IsFinal: result.GetIsFinal(),
				}
				logger.Infow("Translated text: ", result.GetTranslation())
			}
		}

		close(endStreamCh)

		<-nextCh

		t.lock.Lock()
		t.oggSerializer = nil
		t.lock.Unlock()
	}
}

func (t *MediaTranslator) Close() {
	t.cancel()
	t.oggReader.Close()
	t.oggWriter.Close()
	<-t.closeCh
	close(t.results)
}

func (t *MediaTranslator) Results() <-chan RecognizeResult {
	return t.results
}

func (t *MediaTranslator) newStream() (mediatranslationpb.SpeechTranslationService_StreamingTranslateSpeechClient, error) {
	stream, err := t.speechClient.StreamingTranslateSpeech(t.ctx)
	if err != nil {
		logger.Infow("Error creating streaming translate speech")
		return nil, err
	}

	audioConfig := &mediatranslationpb.TranslateSpeechConfig{
		SourceLanguageCode: t.language.Code,
		TargetLanguageCode: t.language.TranslateToCode,
		SampleRateHertz:    int32(t.rtpCodec.ClockRate),
		AudioEncoding:      "ogg_opus",
		Model:              "google-provided-model/video",
	}

	if err := stream.Send(&mediatranslationpb.StreamingTranslateSpeechRequest{
		StreamingRequest: &mediatranslationpb.StreamingTranslateSpeechRequest_StreamingConfig{
			StreamingConfig: &mediatranslationpb.StreamingTranslateSpeechConfig{
				SingleUtterance: true,
				AudioConfig:     audioConfig,
			},
		},
	}); err != nil {
		logger.Infow("Error sending mt config request")
		return nil, err
	}
	return stream, nil
}
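
To check whether the audio I am producing is actually valid, I was thinking of teeing everything read from the ogg pipe into a local file, so the Ogg/Opus stream can be played back and inspected independently of the API. This is just a debugging sketch using the standard library only; the helper name teeOggToFile and the dump path are illustrative, not part of the code above:

func teeOggToFile(oggReader *io.PipeReader, dumpPath string) (io.Reader, func() error, error) {
	// Create the dump file (requires "os" in the import list).
	f, err := os.Create(dumpPath)
	if err != nil {
		return nil, nil, err
	}
	// io.TeeReader writes each byte to f as the forwarding goroutine reads it,
	// so the exact bytes sent to the API also end up in dumpPath.
	return io.TeeReader(oggReader, f), f.Close, nil
}

The returned reader would be used in place of t.oggReader in the forwarding goroutine, and the returned close function called when the stream ends.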