使用AssemblyAI、Qdrant和DeepSeek-R1构建音频RAG

厌倦了手动筛选数小时的音频来寻找关键见解?本指南教你创建一个 AI 驱动的聊天机器人,将会议、播客、访谈等录音转化为互动对话。使用 AssemblyAI 进行带有发言人标签的精确转录,使用 Qdrant 进行快速数据存储,通过 SambaNova Cloud 使用 DeepSeek-R1 进行智能回复,您将创建一个 RAG 工具,回答“[发言人]说了什么?”或 “总结这段话”等问题。使用 AssemblyAI、Qdrant 和 DeepSeek-R1 构建 RAG 系统,将音频转化为可搜索的人工智能驱动对话。

学习目标

  • 利用 AssemblyAI API 转录带有说话人日记的音频文件,将对话转换为结构化文本数据以供分析。
  • 部署 Qdrant 向量数据库,使用 HuggingFace 模型存储和高效检索转录音频内容的嵌入。
  • 通过 SambaNova Cloud 使用 DeepSeek R1 模型实施 RAG,生成上下文感知聊天机器人回复。
  • 为用户上传音频文件、可视化转录内容并与聊天机器人实时互动建立一个 Streamlit Web 界面。
  • 整合端到端工作流程,将音频处理、向量存储和人工智能驱动的回复生成结合起来,创建一个可扩展的基于音频的聊天应用程序。

什么是AssemblyAI?

AssemblyAI 是您将音频转化为可操作见解的首选工具。无论您是在转录播客、分析客户来电还是为视频添加字幕,其人工智能驱动的语音到文本引擎都能提供精确的准确性,即使在有口音或背景噪音的情况下也是如此。

什么是SambaNova云?

试想一下,运行 DeepSeek-R1 (671B) 等大型开源模型的速度可提高 10 倍,而且无需通常的基础设施。

SambaNova 不依赖 GPU,而是使用可重构数据流单元(RDUs),通过以下方式实现更快的性能:

  • 海量内存存储–无需不断重新加载模型
  • 高效的数据流设计–针对高吞吐量任务进行了优化
  • 即时模型切换–在微秒级时间内完成模型切换
  • 立即运行 DeepSeek-R1–无需复杂设置
  • 在同一平台上进行训练和微调–一切尽在其中

什么是Qdrant?

Qdrant 是一个快如闪电的矢量数据库,旨在为人工智能应用增添动力。无论您是在构建推荐系统、图像搜索工具还是聊天机器人,Qdrant 都能进行相似性搜索,快速为文本嵌入或视觉特征等复杂数据找出最接近的匹配项。

什么是DeepSeek-R1?

Deepseek-R1是一种改变游戏规则的语言模型,它将人类的适应性与尖端的人工智能相结合,使其成为自然语言处理领域的佼佼者。无论您是制作内容、翻译语言、调试代码,还是总结复杂的报告,R1 都能出色地理解上下文、语气和意图,提供直观而非机械的响应。Deepseek-R1 将同理心和精确性放在首位,它不仅仅是一款工具,更是人工智能与我们一样自然交流的未来。

使用AssemblyAI和DeepSeek-R1构建RAG模型

现在您已经了解了所有组件,让我们开始构建 RAG。但在此之前,让我们先快速了解一下入门所需的内容。

1. 必要的先决条件

以下是所需的先决条件:

克隆版本库:

git clone https://github.com/karthikponna/chat_with_audios.git 
cd chat_with_audios

创建并激活虚拟环境:

# For macOS and Linux:
python3 -m venv venv
source venv/bin/activate
# For Windows:
python -m venv venv
.\venv\Scripts\activate

安装所需依赖项:

pip install -r requirements.txt

设置环境变量:

创建一个 `.env` 文件,并添加 AssemblyAI 和 SambaNova API 密钥。

ASSEMBLYAI_API_KEY="your_assemblyai_api_key_string"
SAMBANOVA_API_KEY="your_sambanova_api_key_string"

现在让我们开始编码部分。

2. 检索增强生成

RAG 将大型语言模型与外部数据合并,以生成更准确、语境更丰富的答案。它能在查询时获取相关信息,确保回答依赖于真实数据,而不仅仅是模型训练。

2.1 导入必要的库

让我们创建一个名为 rag_code.py 的文件。我们将从导入必要的模块和使用 Llama Index 协调代码架构开始,逐步完成代码。

from qdrant_client import models
from qdrant_client import QdrantClient
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.sambanovasystems import SambaNovaCloud
from llama_index.llms.ollama import Ollama
import assemblyai as aai
from typing import List, Dict
from llama_index.core.base.llms.types import (
ChatMessage,
MessageRole,
)

2.2 使用Hugging Face进行批量处理和文本嵌入

在这里,batch_iterate 函数将文本列表分割成小块,从而更容易处理大型数据集。然后,EmbedData 类加载 Hugging Face 嵌入模型,为每批文本生成嵌入,并收集这些嵌入供以后使用。

def batch_iterate(lst, batch_size):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), batch_size):
yield lst[i : i + batch_size]
class EmbedData:
def __init__(self, embed_model_name="BAAI/bge-large-en-v1.5", batch_size = 32):
self.embed_model_name = embed_model_name
self.embed_model = self._load_embed_model()
self.batch_size = batch_size
self.embeddings = []
def _load_embed_model(self):
embed_model = HuggingFaceEmbedding(model_name=self.embed_model_name, trust_remote_code=True, cache_folder='./hf_cache')
return embed_model
def generate_embedding(self, context):
return self.embed_model.get_text_embedding_batch(context)
def embed(self, contexts):
self.contexts = contexts
for batch_context in batch_iterate(contexts, self.batch_size):
batch_embeddings = self.generate_embedding(batch_context)
self.embeddings.extend(batch_embeddings)

2.3 Qdrant矢量数据库的设置与输入

  • QdrantVDB_QB 类通过设置关键参数(如集合名称、矢量维度和批量大小)来初始化 Qdrant 矢量数据库。
  • 它通过批处理文本上下文及其相应的嵌入,然后相应地更新集合的配置,从而高效地上传数据。
class QdrantVDB_QB:
def __init__(self, collection_name, vector_dim = 768, batch_size=512):
self.collection_name = collection_name
self.batch_size = batch_size
self.vector_dim = vector_dim
def define_client(self):
self.client = QdrantClient(url="http://localhost:6333", prefer_grpc=True)
def create_collection(self):
if not self.client.collection_exists(collection_name=self.collection_name):
self.client.create_collection(collection_name=f"{self.collection_name}",
vectors_config=models.VectorParams(size=self.vector_dim,
distance=models.Distance.DOT,
on_disk=True),
optimizers_config=models.OptimizersConfigDiff(default_segment_number=5,
indexing_threshold=0),
quantization_config=models.BinaryQuantization(
binary=models.BinaryQuantizationConfig(always_ram=True)),
)
def ingest_data(self, embeddata):
for batch_context, batch_embeddings in zip(batch_iterate(embeddata.contexts, self.batch_size), 
batch_iterate(embeddata.embeddings, self.batch_size)):
self.client.upload_collection(collection_name=self.collection_name,
vectors=batch_embeddings,
payload=[{"context": context} for context in batch_context])
self.client.update_collection(collection_name=self.collection_name,
optimizer_config=models.OptimizersConfigDiff(indexing_threshold=20000)
)

2.4 查询嵌入Retriever

  • Retriever 类旨在通过初始化矢量数据库客户端和嵌入模型,在用户查询和矢量数据库之间架起一座桥梁。
  • 它的搜索方法使用模型将查询转换为嵌入,然后使用微调的量化参数在数据库中执行矢量搜索,以快速检索相关结果。
class Retriever:
def __init__(self, vector_db, embeddata):
self.vector_db = vector_db
self.embeddata = embeddata
def search(self, query):
query_embedding = self.embeddata.embed_model.get_query_embedding(query)
result = self.vector_db.client.search(
collection_name=self.vector_db.collection_name,
query_vector=query_embedding,
search_params=models.SearchParams(
quantization=models.QuantizationSearchParams(
ignore=False,
rescore=True,
oversampling=2.0,
)
),
timeout=1000,
)
return result

2.5 RAG智能查询助手

RAG 类集成了一个检索器和一个 LLM,用于生成上下文感知响应。它从矢量数据库中检索相关信息,将其格式化为结构化提示,并将其发送给 LLM 以获得响应。我正在使用 SambaNovaCloud 通过其 API 访问 LLM 模型,以高效生成文本。

class RAG:
def __init__(self,
retriever,
llm_name = "Meta-Llama-3.1-405B-Instruct"
):
system_msg = ChatMessage(
role=MessageRole.SYSTEM,
content="You are a helpful assistant that answers questions about the user's document.",
)
self.messages = [system_msg, ]
self.llm_name = llm_name
self.llm = self._setup_llm()
self.retriever = retriever
self.qa_prompt_tmpl_str = ("Context information is below.\n"
"---------------------\n"
"{context}\n"
"---------------------\n"
"Given the context information above I want you to think step by step to answer the query in a crisp manner, incase case you don't know the answer say 'I don't know!'.\n"
"Query: {query}\n"
"Answer: "
)
def _setup_llm(self):
return SambaNovaCloud(
model=self.llm_name,
temperature=0.7,
context_window=100000,
)
# return Ollama(model=self.llm_name,
#               temperature=0.7,
#               context_window=100000,
#             )
def generate_context(self, query):
result = self.retriever.search(query)
context = [dict(data) for data in result]
combined_prompt = []
for entry in context[:2]:
context = entry["payload"]["context"]
combined_prompt.append(context)
return "\n\n---\n\n".join(combined_prompt)
def query(self, query):
context = self.generate_context(query=query)
prompt = self.qa_prompt_tmpl_str.format(context=context, query=query)
user_msg = ChatMessage(role=MessageRole.USER, content=prompt)
# self.messages.append(ChatMessage(role=MessageRole.USER, content=prompt))
streaming_response = self.llm.stream_complete(user_msg.content)
return streaming_response

2.6 音频转录

Transcribe 类通过设置 AssemblyAI API 密钥和创建转录器进行初始化。然后,它使用启用说话人标签的配置来处理音频文件,最终返回一个字典列表,其中每个条目都将一个说话人映射到其转录文本。

class Transcribe:
def __init__(self, api_key: str):
"""Initialize the Transcribe class with AssemblyAI API key."""
aai.settings.api_key = api_key
self.transcriber = aai.Transcriber()
def transcribe_audio(self, audio_path: str) -> List[Dict[str, str]]:
"""
Transcribe an audio file and return speaker-labeled transcripts.
Args:
audio_path: Path to the audio file
Returns:
List of dictionaries containing speaker and text information
"""
# Configure transcription with speaker labels
config = aai.TranscriptionConfig(
speaker_labels=True,
speakers_expected=2  # Adjust this based on your needs
)
# Transcribe the audio
transcript = self.transcriber.transcribe(audio_path, config=config)
# Extract speaker utterances
speaker_transcripts = []
for utterance in transcript.utterances:
speaker_transcripts.append({
"speaker": f"Speaker {utterance.speaker}",
"text": utterance.text
})
return speaker_transcripts

3. Streamlit应用程序

Streamlit 是一个 Python 库,可将数据脚本转换为交互式网络应用程序,因此非常适合基于 LLM 的解决方案。

  • 下面的代码构建了一个用户友好型应用程序,让用户可以上传音频文件、查看其转录内容并进行相应的聊天。
  • AssemblyAI 会将上传的音频转录为标有说话人的文本。
  • 转录内容被嵌入并存储在 Qdrant 向量数据库中,以便高效检索。
  • 与 RAG 引擎配对的检索器会利用这些嵌入生成上下文感知的聊天回复。
  • 会话状态管理聊天历史和文件缓存,以确保流畅的体验。
import os
import gc
import uuid
import tempfile
import base64
from dotenv import load_dotenv
from rag_code import Transcribe, EmbedData, QdrantVDB_QB, Retriever, RAG
import streamlit as st
if "id" not in st.session_state:
st.session_state.id = uuid.uuid4()
st.session_state.file_cache = {}
session_id = st.session_state.id
collection_name = "chat with audios"
batch_size = 32
load_dotenv()
def reset_chat():
st.session_state.messages = []
st.session_state.context = None
gc.collect()
with st.sidebar:
st.header("Add your audio file!")
uploaded_file = st.file_uploader("Choose your audio file", type=["mp3", "wav", "m4a"])
if uploaded_file:
try:
with tempfile.TemporaryDirectory() as temp_dir:
file_path = os.path.join(temp_dir, uploaded_file.name)
with open(file_path, "wb") as f:
f.write(uploaded_file.getvalue())
file_key = f"{session_id}-{uploaded_file.name}"
st.write("Transcribing with AssemblyAI and storing in vector database...")
if file_key not in st.session_state.get('file_cache', {}):
# Initialize transcriber
transcriber = Transcribe(api_key=os.getenv("ASSEMBLYAI_API_KEY"))
# Get speaker-labeled transcripts
transcripts = transcriber.transcribe_audio(file_path)
st.session_state.transcripts = transcripts
# Each speaker segment becomes a separate document for embedding
documents = [f"Speaker {t['speaker']}: {t['text']}" for t in transcripts]
# embed data    
embeddata = EmbedData(embed_model_name="BAAI/bge-large-en-v1.5", batch_size=batch_size)
embeddata.embed(documents)
# set up vector database
qdrant_vdb = QdrantVDB_QB(collection_name=collection_name,
batch_size=batch_size,
vector_dim=1024)
qdrant_vdb.define_client()
qdrant_vdb.create_collection()
qdrant_vdb.ingest_data(embeddata=embeddata)
# set up retriever
retriever = Retriever(vector_db=qdrant_vdb, embeddata=embeddata)
# set up rag
query_engine = RAG(retriever=retriever, llm_name="DeepSeek-R1-Distill-Llama-70B")
st.session_state.file_cache[file_key] = query_engine
else:
query_engine = st.session_state.file_cache[file_key]
# Inform the user that the file is processed
st.success("Ready to Chat!")
# Display audio player
st.audio(uploaded_file)
# Display speaker-labeled transcript
st.subheader("Transcript")
with st.expander("Show full transcript", expanded=True):
for t in st.session_state.transcripts:
st.text(f"**{t['speaker']}**: {t['text']}")
except Exception as e:
st.error(f"An error occurred: {e}")
st.stop()     
col1, col2 = st.columns([6, 1])
with col1:
st.markdown("""
# RAG over Audio powered by   and 
""".format(base64.b64encode(open("assets/AssemblyAI.png", "rb").read()).decode(),
base64.b64encode(open("assets/deep-seek.png", "rb").read()).decode()), unsafe_allow_html=True)
with col2:
st.button("Clear ↺", on_click=reset_chat)
# Initialize chat history
if "messages" not in st.session_state:
reset_chat()
# Display chat messages from history on app rerun
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Accept user input
if prompt := st.chat_input("Ask about the audio conversation..."):
# Add user message to chat history
st.session_state.messages.append({"role": "user", "content": prompt})
# Display user message in chat message container
with st.chat_message("user"):
st.markdown(prompt)
# Display assistant response in chat message container
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = ""
# Get streaming response
streaming_response = query_engine.query(prompt)
for chunk in streaming_response:
try:
new_text = chunk.raw["choices"][0]["delta"]["content"]
full_response += new_text
message_placeholder.markdown(full_response + "▌")
except:
pass
message_placeholder.markdown(full_response)
# Add assistant response to chat history
st.session_state.messages.append({"role": "assistant", "content": full_response})

在终端运行 app.py 文件,输入以下代码,上传音频文件并与聊天机器人互动。

streamlit run app.py

您可以在这里观看使用该应用程序的演示。您还可以从这里下载音频样本文件。

小结

我们成功地将 AssemblyAI、SambaNova Cloud、Qdrant 和 DeepSeek 结合起来,构建了一个通过音频使用检索增强生成技术的聊天机器人。rag_code.py 文件管理 RAG 工作流程,而 app.py 文件则提供了一个简单的 Streamlit 界面。我希望你能使用不同的音频文件与这个聊天机器人互动,调整代码,添加新功能,探索基于音频的聊天解决方案的无限可能性。

GitHub Repo:https://github.com/karthikponna/chat_with_audios/tree/main

  • 利用 AssemblyAI 进行音频转录可实现准确的说话者标签文本,为高级对话体验奠定坚实基础。
  • 集成 Qdrant 可确保快速进行基于向量的检索,从而快速访问相关上下文,以便做出更明智的回答。
  • 采用 RAG 方法,将检索和生成结合起来,确保答案以实际数据为基础。
  • 在 LLM 中采用 SambaNova Cloud,可提供强大的语言理解能力,从而支持引人入胜的上下文感知交互。
  • 用户界面使用 Streamlit,提供了一个直接的交互环境,简化了基于音频的聊天机器人部署。
© 版权声明
THE END
喜欢就支持一下吧
点赞20 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容