目录  1. pom依赖 2. websocket接口 3. 自定义输入流 4. 自定义控制 5. 自定义语音流 6. 说明   
 
<!-- Microsoft Cognitive Services Speech client SDK: provides the com.microsoft.cognitiveservices.speech.* classes imported by the Java code below. -->
<dependency><groupId>com.microsoft.cognitiveservices.speech</groupId><artifactId>client-sdk</artifactId><version>1.37.0</version>
</dependency>
package com.learning;import com.microsoft.cognitiveservices.speech.CancellationReason;
import com.microsoft.cognitiveservices.speech.ResultReason;
import com.microsoft.cognitiveservices.speech.SpeechConfig;
import com.microsoft.cognitiveservices.speech.SpeechRecognizer;
import com.microsoft.cognitiveservices.speech.audio.AudioConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;@Slf4j
@Controller
@ServerEndpoint("/microsoft")
public class MicrosoftAsrSocketServer {private VoiceAudioStream voiceAudioStream;Semaphore stopTranslationWithFileSemaphore = new Semaphore(0);@OnOpenpublic void onOpen(Session session) {voiceAudioStream = new VoiceAudioStream();String queryString = session.getQueryString();SpeechConfig speechConfig = SpeechConfig.fromSubscription("你的token", "eastus");speechConfig.setSpeechRecognitionLanguage("zh-CN");AudioConfig audioConfig = AudioConfig.fromStreamInput(this.voiceAudioStream);SpeechRecognizer speechRecognizer = new SpeechRecognizer(speechConfig, audioConfig);speechRecognizer.recognizing.addEventListener((s, e) -> {System.out.println("RECOGNIZING: Text=" + e.getResult().getText());this.returnMessage(session, e.getResult().getText());});speechRecognizer.recognized.addEventListener((s, e) -> {if (e.getResult().getReason() == ResultReason.RecognizedSpeech) {System.out.println("RECOGNIZED: Text=" + e.getResult().getText());}else if (e.getResult().getReason() == ResultReason.NoMatch) {System.out.println("NOMATCH: Speech could not be recognized.");}});speechRecognizer.canceled.addEventListener((s, e) -> {System.out.println("CANCELED: Reason=" + e.getReason());if (e.getReason() == CancellationReason.Error) {System.out.println("CANCELED: ErrorCode=" + e.getErrorCode());System.out.println("CANCELED: ErrorDetails=" + e.getErrorDetails());System.out.println("CANCELED: Did you set the speech resource key and region values?");}stopTranslationWithFileSemaphore.release();});speechRecognizer.sessionStopped.addEventListener((s, e) -> {System.out.println("Session stopped event.");stopTranslationWithFileSemaphore.release();});try {speechRecognizer.startContinuousRecognitionAsync().get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}@OnMessagepublic void onMessage(byte[] bytes) {voiceAudioStream.write(bytes, 0, bytes.length);}@OnClosepublic void onClose(Session session) throws IOException 
{session.close();voiceAudioStream.close();stopTranslationWithFileSemaphore.release();}@OnErrorpublic void onError(Session session, Throwable error) throws IOException {error.printStackTrace();voiceAudioStream.close();stopTranslationWithFileSemaphore.release();}/*** 回写消息* @param session* @param message*/private void returnMessage(Session session, String message){if (session.isOpen()) {log.info("<=======写出数据:{}",message);try {if(!"".equals(message) && message != null){session.getBasicRemote().sendText(message);}} catch (IOException e) {e.printStackTrace();}}}
}
package com.learning;import lombok.extern.slf4j.Slf4j;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ConcurrentLinkedDeque;@Slf4j
public class EchoStream extends InputStream {private ManualResetEvent dataReady = new ManualResetEvent();private ConcurrentLinkedDeque<byte[]> buffers = new ConcurrentLinkedDeque<>();public Boolean dataAvailable(){return !buffers.isEmpty();}public  void write(byte[] buffer, int offset, int count){// log.info("开始write,EchoStream");buffers.addLast(buffer);if(buffers.size()>1){dataReady.set();}}@Overridepublic int read() throws IOException {return 0;}public byte[] getLBuffer(){if(buffers.size() != 0){return buffers.pollFirst();}return new byte[0];}public  InputStream Read(byte[] buffer, int offset, int count){//log.info("开始read,EchoStream");try {if(buffers.size() == 0){dataReady.waitForever();}} catch (InterruptedException e) {e.printStackTrace();}byte[] lBuffer = buffers.pollFirst();if (lBuffer == null || lBuffer.length == 0){dataReady.reset();}if (!dataAvailable()) {dataReady.reset();}buffer = lBuffer.clone();return new ByteArrayInputStream(buffer);}@Overridepublic void close() throws IOException {super.close();}
}
package com.learning;import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;

/**
 * A manually reset signal: once {@link #set()} is called every waiter is
 * released and later waits return immediately, until {@link #reset()} clears
 * the signal again.
 *
 * <p>After {@link #close()} no wait blocks any more.
 */
public class ManualResetEvent {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final AtomicBoolean isSet = new AtomicBoolean(false);

    /** Guarded by {@link #lock}. */
    private boolean closed;

    /** Sets the signal and wakes every waiting thread. */
    public void set() {
        lock.lock();
        try {
            isSet.set(true);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Clears the signal. Waiting threads are not woken; they wait for the next {@link #set()}. */
    public void reset() {
        lock.lock();
        try {
            isSet.set(false);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the signal is set or this event is closed.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitForever() throws InterruptedException {
        lock.lock();
        try {
            // Loop: Condition.await may return without a matching signal.
            while (!isSet.get() && !closed) {
                condition.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * Blocks until the signal is set, this event is closed, or the timeout elapses.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the signal is set when the method returns,
     *         {@code false} on timeout or if the event was closed while unset
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
        long remainingNanos = unit.toNanos(timeout);
        lock.lock();
        try {
            // Re-check the flag after every wakeup and keep track of the time
            // left, so an early wakeup is never reported as success.
            while (!isSet.get() && !closed) {
                if (remainingNanos <= 0L) {
                    return false;
                }
                remainingNanos = condition.awaitNanos(remainingNanos);
            }
            return isSet.get();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Example usage: a producer thread sets the event after one second and
     * resets it one second later, while the main thread waits for the signal.
     */
    public static void main(String[] args) throws InterruptedException {
        ManualResetEvent event = new ManualResetEvent();

        // Producer thread that sets and later resets the event.
        new Thread(() -> {
            try {
                Thread.sleep(1000); // simulate work
                event.set();        // let waiting threads continue
                Thread.sleep(1000); // simulate more work
                event.reset();      // make later waiters block again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        // The main thread waits until the event is set.
        event.waitForever();
    }

    /**
     * Clears the signal, marks the event closed and wakes every waiting thread.
     * The lock is taken first because a condition may only be signalled by the
     * thread holding its lock.
     */
    public void close() {
        lock.lock();
        try {
            isSet.set(false);
            closed = true;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
package com.xiaoi.xics.avatar.api.interfaces;import com.microsoft.cognitiveservices.speech.audio.PullAudioInputStreamCallback;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;/*** 自定义语音流*/
@Slf4j
public class VoiceAudioStream extends PullAudioInputStreamCallback {private EchoStream dataStream = new EchoStream();private ManualResetEvent waitForEmptyDataStream = null;private InputStream stream;/*** 服务从PullAudioInputStream中读取数据, 读到0个字节并不会关闭流*/@Overridepublic int read(byte[] dataBuffer){long  ret = 0;// 用户主动close时可以关闭流if (waitForEmptyDataStream != null && !dataStream.dataAvailable()){waitForEmptyDataStream.set();return 0;}try {if(this.stream != null){ret = this.stream.read(dataBuffer,0, dataBuffer.length);if((int)ret < 1){this.stream = dataStream.Read(dataBuffer, 0, dataBuffer.length);ret = this.stream.read(dataBuffer,0, dataBuffer.length);}}else{this.stream = dataStream.Read(dataBuffer, 0, dataBuffer.length);ret = this.stream.read(dataBuffer,0, dataBuffer.length);}} catch (IOException e) {e.printStackTrace();}return (int)Math.max(0, ret);}/*** Client向PullAudioInputStream写入数据*/public void write(byte[] buffer, int offset, int count){dataStream.write(buffer, offset, count);}@Overridepublic void close(){if (dataStream.dataAvailable()){// 通过ManualResetEvent强制流的使用者必须调用close来手动关闭流waitForEmptyDataStream = new ManualResetEvent();try {waitForEmptyDataStream.waitForever();} catch (InterruptedException e) {e.printStackTrace();}}if(waitForEmptyDataStream != null){this.waitForEmptyDataStream.close();}try {this.dataStream.close();this.stream.close();} catch (IOException e) {e.printStackTrace();}}
}
1. 仅供学习使用,切勿直接用于生产环境,尤其要警惕其中的线程控制类。 2. Spring Boot 集成 WebSocket 的流程省略;前端连上 WebSocket 后打开麦克风,传入语音输入流。 3. 亲测有效。自测参考地址:ws://127.0.0.1:8080/microsoft 4. 本文参考了 C# 的实现过程,点我跳转。