实战whisper第二天:直播语音转字幕(全部代码和详细部署步骤)

实战whisper第二天:直播语音转字幕(全部代码和详细部署步骤)

    正在检查是否收录...

直播语音实时转字幕:

基于Whisper的实时直播语音转录或翻译是一项使用OpenAI的Whisper模型实现的技术,它能够实时将直播中的语音内容转录成文本,甚至翻译成另一种语言。这一过程大致分为三个步骤:捕获直播音频流、语音识别(转录)以及翻译(如果需要)。下面详细解释其原理和意义。

原理

捕获直播音频流: 首先,需要从直播源捕获音频流。这通常通过软件工具实现,如ffmpegstreamlink,它们可以接入直播平台(如Twitch、YouTube等)的直播流,并提取音频数据。

语音识别(转录): 捕获到的音频流被送入Whisper模型进行语音识别。Whisper是OpenAI开发的一款强大的语音识别模型,它能够准确地将语音转换成文本。该模型训练于多种语言的大量数据集上,因此具有高度的准确性和多语言识别能力。

翻译(可选): 如果需要将转录的文本翻译成另一种语言,可以进一步使用机器翻译模型(如OpenAI的GPT、Google Translate等)对转录文本进行翻译。

意义

提高可及性: 通过实时转录直播语音,听障人士和不懂直播原语言的观众也能够理解内容,大大提高了直播内容的可及性。

内容归档与搜索: 转录生成的文本可以作为直播内容的归档,便于未来搜索和回顾。相比视频数据,文本更容易被搜索引擎索引,从而提高内容的发现性。

多语言翻译: 实时翻译可以让不同语言的观众理解和享受直播内容,促进跨语言、跨文化的交流。

学习和教育: 对于教育直播,实时转录和翻译能够帮助学生更好地理解教学内容,尤其是对于非母语学习者。

内容审核: 转录文本还可以用于自动内容审核,帮助直播平台监控和管理不适宜的内容。 

一、部署 

下载stream-translator

GitHub - fortypercnt/stream-translator

实战whisper语音识别第一天,部署服务器,可远程访问,实时语音转文字(全部代码和详细部署步骤)-CSDN博客

如果在之前的文章,实战whisper语音识别第一天,部署服务器,配置过环境,可跳过下面安装。

git clone https://github.com/fortypercnt/stream-translator.git pip install -r requirements.txt 

模型下载: 

large-v3模型:https://huggingface.co/Systran/faster-whisper-large-v3/tree/main large-v2模型:https://huggingface.co/guillaumekln/faster-whisper-large-v2/tree/main large-v2模型:https://huggingface.co/guillaumekln/faster-whisper-large-v1/tree/main medium模型:https://huggingface.co/guillaumekln/faster-whisper-medium/tree/main small模型:https://huggingface.co/guillaumekln/faster-whisper-small/tree/main base模型:https://huggingface.co/guillaumekln/faster-whisper-base/tree/main tiny模型:https://huggingface.co/guillaumekln/faster-whisper-tiny/tree/main

经测试large-v3模型需要10G显存以上。显存不够的可以用小模型。

使用方法:

python translator.py 直播链接

这个translator.py是进行实时翻译,不想翻译可运行下面代码

二、代码

translator1.py:

import argparse import sys import signal from datetime import datetime import ffmpeg import numpy as np import whisper from whisper.audio import SAMPLE_RATE class RingBuffer: def __init__(self, size): self.size = size self.data = [] self.full = False self.cur = 0 def append(self, x): if self.size <= 0: return if self.full: self.data[self.cur] = x self.cur = (self.cur + 1) % self.size else: self.data.append(x) if len(self.data) == self.size: self.full = True def get_all(self): all_data = [] for i in range(len(self.data)): idx = (i + self.cur) % self.size all_data.append(self.data[idx]) return all_data def clear(self): self.data = [] self.full = False self.cur = 0 def open_stream(stream, direct_url, preferred_quality): if direct_url: try: process = ( ffmpeg.input(stream, loglevel="panic") .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE) .run_async(pipe_stdout=True) ) except ffmpeg.Error as e: raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e return process, None import streamlink import subprocess import threading stream_options = streamlink.streams(stream) if not stream_options: print("No playable streams found on this URL:", stream) sys.exit(0) option = None for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']: if quality in stream_options: option = quality break if option is None: # Fallback option = next(iter(stream_options.values())) def writer(streamlink_proc, ffmpeg_proc): while (not streamlink_proc.poll()) and (not ffmpeg_proc.poll()): try: chunk = streamlink_proc.stdout.read(1024) ffmpeg_proc.stdin.write(chunk) except (BrokenPipeError, OSError): pass cmd = ['streamlink', stream, option, "-O"] streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE) try: ffmpeg_process = ( ffmpeg.input("pipe:", loglevel="panic") .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE) .run_async(pipe_stdin=True, pipe_stdout=True) ) except ffmpeg.Error as e: raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process)) thread.start() return ffmpeg_process, streamlink_process def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options): print("Loading model...") model = whisper.load_model(model) print("Opening stream...") ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality) def handler(signum, frame): ffmpeg_process.kill() sys.exit(0) signal.signal(signal.SIGINT, handler) n_bytes = interval * SAMPLE_RATE * 2 # Factor 2 comes from reading the int16 stream as bytes audio_buffer = RingBuffer(1) # No need for a history buffer since we're just doing real-time transcription try: while True: in_bytes = ffmpeg_process.stdout.read(n_bytes) if not in_bytes: break audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0 audio_buffer.append(audio) result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options) print(f'{datetime.now().strftime("%H:%M:%S")} {result["text"]}') audio_buffer.clear() # Clear the buffer after each transcription finally: ffmpeg_process.kill() def cli(): parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.") parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen') parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.') parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.') parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.') parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.') args = parser.parse_args().__dict__ url = args.pop("URL") main(url, **args) if __name__ == '__main__': cli() 
python translator1.py https://www.huya.com/kpl

虎牙kpl的直播,文字转录:

还有繁体字,修改代码,繁体转简体:

pip install opencc-python-reimplemented 

 translator2.py:

import argparse import sys import signal from datetime import datetime import ffmpeg import numpy as np import whisper from whisper.audio import SAMPLE_RATE import opencc class RingBuffer: def __init__(self, size): self.size = size self.data = [] self.full = False self.cur = 0 def append(self, x): if self.size <= 0: return if self.full: self.data[self.cur] = x self.cur = (self.cur + 1) % self.size else: self.data.append(x) if len(self.data) == self.size: self.full = True def get_all(self): all_data = [] for i in range(len(self.data)): idx = (i + self.cur) % self.size all_data.append(self.data[idx]) return all_data def clear(self): self.data = [] self.full = False self.cur = 0 def open_stream(stream, direct_url, preferred_quality): if direct_url: try: process = ( ffmpeg.input(stream, loglevel="panic") .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE) .run_async(pipe_stdout=True) ) except ffmpeg.Error as e: raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e return process, None import streamlink import subprocess import threading stream_options = streamlink.streams(stream) if not stream_options: print("No playable streams found on this URL:", stream) sys.exit(0) option = None for quality in [preferred_quality, 'audio_only', 'audio_mp4a', 'audio_opus', 'best']: if quality in stream_options: option = quality break if option is None: # Fallback option = next(iter(stream_options.values())) def writer(streamlink_proc, ffmpeg_proc): while (not streamlink_proc.poll()) and (not ffmpeg_proc.poll()): try: chunk = streamlink_proc.stdout.read(1024) ffmpeg_proc.stdin.write(chunk) except (BrokenPipeError, OSError): pass cmd = ['streamlink', stream, option, "-O"] streamlink_process = subprocess.Popen(cmd, stdout=subprocess.PIPE) try: ffmpeg_process = ( ffmpeg.input("pipe:", loglevel="panic") .output("pipe:", format="s16le", acodec="pcm_s16le", ac=1, ar=SAMPLE_RATE) .run_async(pipe_stdin=True, pipe_stdout=True) ) except ffmpeg.Error as e: raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e thread = threading.Thread(target=writer, args=(streamlink_process, ffmpeg_process)) thread.start() return ffmpeg_process, streamlink_process def main(url, model="large-v3", interval=5, preferred_quality="audio_only", direct_url=False, **decode_options): print("Loading model...") model = whisper.load_model(model) print("Opening stream...") ffmpeg_process, _ = open_stream(url, direct_url, preferred_quality) converter = opencc.OpenCC('t2s') # 创建繁体转简体的转换器 def handler(signum, frame): ffmpeg_process.kill() sys.exit(0) signal.signal(signal.SIGINT, handler) n_bytes = interval * SAMPLE_RATE * 2 # Factor 2 comes from reading the int16 stream as bytes audio_buffer = RingBuffer(1) try: while True: in_bytes = ffmpeg_process.stdout.read(n_bytes) if not in_bytes: break audio = np.frombuffer(in_bytes, np.int16).flatten().astype(np.float32) / 32768.0 audio_buffer.append(audio) result = model.transcribe(np.concatenate(audio_buffer.get_all()), **decode_options) result_text = converter.convert(result["text"]) # 将繁体转换为简体 print(f'{datetime.now().strftime("%H:%M:%S")} {result_text}') audio_buffer.clear() finally: ffmpeg_process.kill() def cli(): parser = argparse.ArgumentParser(description="Real-time audio transcription from streams.") parser.add_argument('URL', type=str, help='Stream website and channel name, e.g. twitch.tv/forsen') parser.add_argument('--model', type=str, default='large-v3', help='Whisper model for transcription.') parser.add_argument('--interval', type=int, default=5, help='Interval between transcription in seconds.') parser.add_argument('--preferred_quality', type=str, default='audio_only', help='Preferred stream quality.') parser.add_argument('--direct_url', action='store_true', help='Pass the URL directly to ffmpeg.') args = parser.parse_args().__dict__ url = args.pop("URL") main(url, **args) if __name__ == '__main__': cli() 
python translator2.py https://www.huya.com/kpl

urlparsewhisperamlcodescriptappsignalhuggingfacegui语音识别gitpythontpucliwritertwitchopenainumpyaction
  • 本文作者:李琛
  • 本文链接: https://wapzz.net/post-14788.html
  • 版权声明:本博客所有文章除特别声明外,均默认采用 CC BY-NC-SA 4.0 许可协议。
本站部分内容来源于网络转载,仅供学习交流使用。如涉及版权问题,请及时联系我们,我们将第一时间处理。
文章很赞!支持一下吧 还没有人为TA充电
为TA充电
还没有人为TA充电
0
  • 支付宝打赏
    支付宝扫一扫
  • 微信打赏
    微信扫一扫
感谢支持
文章很赞!支持一下吧
关于作者
2.3W+
5
0
1
WAP站长官方

生成式 AI 在电商领域究竟有多牛,这款产品给出了回答

上一篇

深入解析“mogublog爬虫”:原理、应用与未来趋势

下一篇
  • 复制图片
按住ctrl可打开默认菜单