MajorRAG 文件内容提取实现分析
1. 整体架构设计
1.1 分层结构
项目采用标准的分层处理架构,每种文件类型遵循相同的5步骤流程:
API层 (FileRoutes.py)
↓
FileService.py (文件管理与索引协调)
↓
[PDF|Word|Excel|PPT|Text]Service.py (协调器)
↓
5个处理层并联执行:
├── ExtractService (内容提取)
├── FormatJsonService (格式化)
├── VectorService (向量化)
├── OpenSearchService (倒排索引)
└── MysqlService (数据库保存)
1.2 核心文件位置
app/service/
├── FileService.py # 主入口,文件上传/管理
├── pdf/ # PDF处理
│ ├── PdfService.py # 协调器
│ ├── PdfExtractService.py # 提取器
│ ├── PdfFormatJsonService.py # 格式化
│ ├── PdfVectorService.py # 向量化
│ ├── PdfOpenSearchService.py # 倒排索引
│ └── PdfMysqlService.py # 数据保存
├── word/ # Word处理(同样结构)
├── excel/ # Excel处理(同样结构)
├── ppt/ # PPT处理(同样结构)
└── text/ # 文本/Markdown处理(同样结构)
2. 整体流程和入口
2.1 API入口
文件路径: app/routes/FileRoutes.py
路由: /api/files/upload (POST)
支持的文件类型:
ALLOWED_EXTENSIONS = {
'pdf', 'doc', 'docx', 'xls', 'xlsx',
'ppt', 'pptx', 'txt', 'md', 'jpg', 'jpeg', 'png'
}
文件大小限制: 100MB
流程:
- 接收文件和文件路径
- 验证文件类型和大小
- 调用
FileService.upload_files()
2.2 文件上传与索引流程
文件路径: app/service/FileService.py
第1步:文件上传与存储
def upload_files(file_pairs, user_id):
"""
上传文件并存储
参数:
- file_pairs: [(file, path), ...] 文件和路径的配对
- user_id: 用户ID
流程:
1. 创建文件夹结构(支持嵌套)
2. 生成唯一文件名和MD5哈希
3. 保存文件到磁盘
4. 检查重复(同路径+同MD5)
5. 保存文件元数据到MySQL
返回:
[
{
'filename': str,
'file_id': int,
'status': 'uploaded'|'duplicate'|'error',
'message': str
},
...
]
"""
第2步:异步索引启动
def start_async_index(file_id, user_id):
"""
启动异步索引任务
流程:
1. 验证文件存在性
2. 生成任务ID
3. 启动后台线程执行 _async_index_worker()
返回立即响应:
{
'success': True,
'task_id': str,
'message': '索引任务已启动'
}
"""
第3步:异步索引工作线程
def _async_index_worker(file_id, task_id):
"""
后台索引处理线程
根据文件类型路由到不同的Service:
- PDF文件 → PdfService().index_file()
- Excel文件 (.xlsx, .xls, .xlsm) → ExcelService().index_file()
- Word文件 (.docx, .doc) → WordService().index_file()
- PPT文件 (.pptx, .ppt) → PptService().index_file()
- 文本文件 (.txt, .md) → TextService().index_file()
异常处理:
- 捕获所有异常
- 更新索引状态为失败
- 通过WebSocket推送错误信息
"""
3. 各文件类型的提取实现
3.1 PDF提取
文件路径: app/service/pdf/PdfExtractService.py
使用库: Unstructured库
核心方法
def extract_content(file_path: str) -> Dict:
"""
提取PDF的所有内容(文本、图片、表格、图表)
参数:
- file_path: PDF文件路径
返回:
{
'extraction_status': 'success'|'failed',
'file_path': str,
'file_name': str,
'elements': [Element, ...], # 包含所有提取的元素对象
'error_message': str (仅失败时)
}
"""
配置参数
配置文件: config/Unstructured.yaml
extract_images_in_pdf: true # 提取图片
extract_image_block_types: ["Table"] # 提取表格
strategy: "hi_res" # 高分辨率布局分析
languages: ['zh'] # 中文语言支持
提取特点
- 高精度布局分析:使用
hi_res策略,能够准确识别PDF布局 - 图片提取:自动提取图片到临时目录
- 多元素识别:识别多种元素类型
- NarrativeText(叙述文本)
- Title(标题)
- Table(表格)
- Image(图片)
- ListItem(列表项)
- Header(页眉)
- Footer(页脚)
实现流程
from unstructured.partition.pdf import partition_pdf
def extract_content(file_path):
# 1. 调用 Unstructured 进行分区提取
elements = partition_pdf(
filename=file_path,
extract_images_in_pdf=True,
infer_table_structure=True,
strategy="hi_res",
languages=['zh']
)
# 2. 返回提取结果
return {
'extraction_status': 'success',
'file_path': file_path,
'file_name': os.path.basename(file_path),
'elements': elements # Element对象列表
}
3.2 Word提取
文件路径: app/service/word/WordExtractService.py
使用库: python-docx
核心方法
def extract_content(file_path: str) -> Dict:
"""
提取Word文件的所有内容(段落、标题、表格)
返回:
{
'extraction_status': 'success'|'failed',
'file_path': str,
'file_name': str,
'elements_count': int,
'elements': [
{
'type': 'heading'|'paragraph',
'text': str,
'style': str # 样式名称(如"Heading 1", "Normal")
},
{
'type': 'table',
'rows': [[cell1, cell2, ...], [...], ...]
},
...
]
}
"""
提取逻辑
from docx import Document
from docx.oxml.text.paragraph import CT_P
from docx.oxml.table import CT_Tbl
def extract_content(file_path):
doc = Document(file_path)
elements = []
# 遍历文档所有元素,保持原始顺序
for element in doc.element.body:
if isinstance(element, CT_P): # 段落
para_element = _extract_paragraph(element, doc)
if para_element:
elements.append(para_element)
elif isinstance(element, CT_Tbl): # 表格
table_element = _extract_table(element, doc)
if table_element:
elements.append(table_element)
return {
'extraction_status': 'success',
'file_path': file_path,
'file_name': os.path.basename(file_path),
'elements_count': len(elements),
'elements': elements
}
段落提取
def _extract_paragraph(paragraph_element, doc):
paragraph = None
for p in doc.paragraphs:
if p._element == paragraph_element:
paragraph = p
break
if not paragraph or not paragraph.text.strip():
return None
# 获取段落样式
style = paragraph.style.name if paragraph.style else 'Normal'
# 判断是否为标题:style以"Heading"或"标题"开头
is_heading = style.startswith('Heading') or style.startswith('标题')
return {
'type': 'heading' if is_heading else 'paragraph',
'text': paragraph.text.strip(),
'style': style
}
表格提取
def _extract_table(table_element, doc):
table = None
for t in doc.tables:
if t._element == table_element:
table = t
break
if not table:
return None
# 逐行逐列提取单元格文本
rows_data = []
for row in table.rows:
row_data = [cell.text.strip() for cell in row.cells]
rows_data.append(row_data)
return {
'type': 'table',
'rows': rows_data
}
支持格式
- .doc
- .docx
提取特点
- 保持元素的原始顺序(段落和表格混合)
- 自动识别标题级别(基于样式)
- 提取完整的表格结构
3.3 Excel提取
文件路径: app/service/excel/ExcelExtractService.py
使用库: pandas, openpyxl
核心方法
def extract_content(file_path: str) -> Dict:
"""
提取Excel文件的所有工作表内容
返回:
{
'extraction_status': 'success'|'failed',
'file_path': str,
'file_name': str,
'sheet_count': int,
'elements': [
{
'sheet_name': str,
'content': {
'headers': [col1, col2, ...],
'rows': [
{
'row_index': int, # Excel行号(从2开始)
'data': [cell_value, ...],
'text': str # 行文本连接(空格分隔)
},
...
],
'row_count': int,
'column_count': int,
'full_text': str # 整表文本(用于全文搜索)
}
},
...
]
}
"""
提取流程
import pandas as pd
from openpyxl import load_workbook
def extract_content(file_path):
# 1. 获取所有工作表名称
workbook = load_workbook(file_path, read_only=True)
sheet_names = workbook.sheetnames
workbook.close()
# 2. 逐个工作表提取
elements = []
for sheet_name in sheet_names:
sheet_content = _extract_sheet_content(file_path, sheet_name)
elements.append({
'sheet_name': sheet_name,
'content': sheet_content
})
return {
'extraction_status': 'success',
'file_path': file_path,
'file_name': os.path.basename(file_path),
'sheet_count': len(elements),
'elements': elements
}
工作表提取
def _extract_sheet_content(file_path, sheet_name):
# 使用pandas读取sheet
df = pd.read_excel(file_path, sheet_name=sheet_name)
df = df.fillna('') # 处理空值
# 提取表头
headers = df.columns.tolist()
headers_text = ' '.join([str(h) for h in headers])
# 逐行提取数据
rows = []
for idx, row in df.iterrows():
row_data = row.tolist()
row_text = ' '.join([str(cell) for cell in row_data if cell])
rows.append({
'row_index': int(idx) + 2, # Excel行号(1是表头,2开始是数据)
'data': row_data,
'text': row_text
})
# 生成整表文本(用于全文搜索)
all_row_texts = [row['text'] for row in rows]
full_text = headers_text + '\n' + '\n'.join(all_row_texts)
return {
'headers': headers,
'rows': rows,
'row_count': len(rows),
'column_count': len(headers),
'full_text': full_text
}
支持格式
- .xlsx
- .xls
- .xlsm
提取特点
- 支持多个工作表(Sheet)
- 保留行号和列结构
- 生成整表文本用于全文搜索
- 自动处理空值
- 数据类型自动转换
3.4 PPT提取
文件路径: app/service/ppt/PptExtractService.py
使用库: python-pptx
核心方法
def extract_content(file_path: str) -> Dict:
"""
提取PPT文件内容
返回:
{
'extraction_status': 'success'|'failed',
'file_path': str,
'file_name': str,
'total_slides': int,
'slides': [
{
'slide_number': 1,
'content': [
{'type': 'text', 'text': '...'},
{'type': 'table', 'rows': [[...], [...]]},
...
]
},
{
'slide_number': 2,
'content': [...]
},
...
]
}
"""
提取流程
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
def extract_content(file_path):
presentation = Presentation(file_path)
slides = []
for slide_number, slide in enumerate(presentation.slides, start=1):
slide_content = _extract_slide_content(slide, slide_number)
slides.append(slide_content)
return {
'extraction_status': 'success',
'file_path': file_path,
'file_name': os.path.basename(file_path),
'total_slides': len(slides),
'slides': slides
}
幻灯片提取
def _extract_slide_content(slide, slide_number):
content = []
# 遍历幻灯片中的所有形状
for shape in slide.shapes:
# 提取文本框内容
if hasattr(shape, "text") and shape.text.strip():
content.append({
'type': 'text',
'text': shape.text.strip()
})
# 提取表格内容
if shape.shape_type == MSO_SHAPE_TYPE.TABLE:
table_data = _extract_table(shape.table)
content.append({
'type': 'table',
'rows': table_data
})
return {
'slide_number': slide_number,
'content': content
}
表格提取
def _extract_table(table):
rows_data = []
for row in table.rows:
row_data = [cell.text.strip() for cell in row.cells]
rows_data.append(row_data)
return rows_data
支持格式
- .pptx
- .ppt
提取特点
- 按幻灯片顺序提取
- 提取文本框内容
- 提取表格结构
- 保持幻灯片编号
3.5 文本/Markdown提取
文件路径: app/service/text/TextExtractService.py
使用库: 原生文件读取
核心方法
def extract_content(file_path: str, file_type: str = 'txt') -> Dict:
"""
提取文本或Markdown文件内容
参数:
- file_path: 文件路径
- file_type: 'txt' 或 'md'
返回:
{
'extraction_status': 'success'|'failed',
'file_path': str,
'file_name': str,
'file_type': 'txt'|'md',
'raw_content': str, # 原始内容
'elements': [...], # 解析后的元素
'elements_count': int
}
"""
编码识别与读取
def _read_file_with_encoding(file_path):
"""
智能编码识别
尝试顺序:utf-8 → gbk → gb2312 → utf-16 → latin-1
如果所有编码都失败,使用二进制读取并忽略错误
"""
encodings = ['utf-8', 'gbk', 'gb2312', 'utf-16', 'latin-1']
for encoding in encodings:
try:
with open(file_path, 'r', encoding=encoding) as f:
return f.read()
except UnicodeDecodeError:
continue
except Exception as e:
raise Exception(f"读取文件失败: {str(e)}")
# 所有编码失败,用二进制读取并忽略错误
try:
with open(file_path, 'rb') as f:
content = f.read()
return content.decode('utf-8', errors='ignore')
except Exception as e:
raise Exception(f"读取文件失败: {str(e)}")
TXT提取逻辑
def _parse_text(content: str):
"""
解析TXT文件
按空行分割段落
"""
# 按空行分割段落
paragraphs = content.split('\n\n')
elements = []
for i, para in enumerate(paragraphs):
para = para.strip()
if para: # 跳过空段落
elements.append({
'type': 'paragraph',
'text': para,
'order': i + 1
})
return elements
Markdown提取逻辑
def _parse_markdown(content: str):
"""
解析Markdown文件
识别:
- 标题(# 开头)
- 代码块(``` 包裹)
- 内容段落
"""
lines = content.split('\n')
elements = []
current_title = None
content_buffer = []
in_code_block = False
code_buffer = []
for line in lines:
# 代码块处理
if line.strip().startswith('```'):
if in_code_block:
# 代码块结束
elements.append({
'type': 'code',
'text': '\n'.join(code_buffer),
'belongs_to_title': current_title
})
code_buffer = []
in_code_block = False
else:
# 代码块开始
in_code_block = True
continue
if in_code_block:
code_buffer.append(line)
continue
# 标题处理
if line.strip().startswith('#'):
# 先保存之前的内容
if content_buffer:
elements.append({
'type': 'content',
'text': '\n'.join(content_buffer).strip(),
'belongs_to_title': current_title
})
content_buffer = []
# 提取标题级别和文本
title_level = len(line) - len(line.lstrip('#'))
title_text = line.strip().lstrip('#').strip()
elements.append({
'type': 'title',
'text': title_text,
'level': title_level
})
current_title = title_text
else:
# 累积内容
content_buffer.append(line)
# 保存最后的内容
if content_buffer:
elements.append({
'type': 'content',
'text': '\n'.join(content_buffer).strip(),
'belongs_to_title': current_title
})
return elements
支持格式
- .txt
- .md
提取特点
- 智能编码识别:支持UTF-8、GBK、GB2312等多种编码
- TXT处理:按空行自动分段
- Markdown处理:
- 识别标题级别(#, ##, ###...)
- 识别代码块(```)
- 内容与标题关联
- 保持文档结构
4. 索引处理的5步骤流程
每种文件类型的 *Service.py(如 PdfService.py、WordService.py等)都遵循相同的处理流程:
4.1 步骤1:内容提取 (0-20%)
extract_result = self.extract_service.extract_content(file_path)
输出: 原始提取的元素和内容
各文件类型的提取服务:
- PDF:
PdfExtractService - Word:
WordExtractService - Excel:
ExcelExtractService - PPT:
PptExtractService - 文本:
TextExtractService
4.2 步骤2:内容格式化 (20-40%)
formatted_result = self.format_service.format_content(file_id, extract_result)
输出: 统一的JSON结构化格式
作用:
- 将各种不同格式的提取结果转换为统一的JSON结构
- 添加元数据(文件ID、页码、位置等)
- 分块处理长文本
4.3 步骤3:向量化 (40-60%)
vector_result = self.vector_service.vectorize_content(file_info, formatted_result)
输出: 内容向量和embedding
作用:
- 使用BGE模型生成文本向量
- 支持语义搜索
- 存储向量到向量数据库
4.4 步骤4:倒排索引 (60-80%)
index_result = self.opensearch_service.create_inverted_index(file_info, formatted_result)
输出: 创建OpenSearch索引
作用:
- 创建全文搜索索引
- 支持关键词搜索
- 支持中文分词
4.5 步骤5:MySQL保存 (80-100%)
save_result = self.mysql_service.save_to_mysql(file_info, formatted_result)
输出: 保存到MySQL数据库
作用:
- 保存格式化后的内容
- 保存元数据
- 支持结构化查询
4.6 进度回调机制
所有步骤都支持实时进度回调:
def progress_callback(progress: int):
"""
进度回调函数
参数:
- progress: 0-100的进度值
通过WebSocket实时推送给前端
"""
send_index_progress(file_id, progress, f"索引进度: {progress}%")
5. 关键代码文件路径总览
5.1 入口与管理
| 功能 | 文件路径 |
|---|---|
| API入口 | app/routes/FileRoutes.py |
| 文件管理 | app/service/FileService.py |
5.2 PDF处理
| 功能 | 文件路径 |
|---|---|
| PDF提取 | app/service/pdf/PdfExtractService.py |
| PDF服务协调 | app/service/pdf/PdfService.py |
| PDF格式化 | app/service/pdf/PdfFormatJsonService.py |
| PDF向量化 | app/service/pdf/PdfVectorService.py |
| PDF倒排索引 | app/service/pdf/PdfOpenSearchService.py |
| PDF数据保存 | app/service/pdf/PdfMysqlService.py |
5.3 Word处理
| 功能 | 文件路径 |
|---|---|
| Word提取 | app/service/word/WordExtractService.py |
| Word服务协调 | app/service/word/WordService.py |
| Word格式化 | app/service/word/WordFormatJsonService.py |
| Word向量化 | app/service/word/WordVectorService.py |
| Word倒排索引 | app/service/word/WordOpenSearchService.py |
| Word数据保存 | app/service/word/WordMysqlService.py |
5.4 Excel处理
| 功能 | 文件路径 |
|---|---|
| Excel提取 | app/service/excel/ExcelExtractService.py |
| Excel服务协调 | app/service/excel/ExcelService.py |
| Excel格式化 | app/service/excel/ExcelFormatJsonService.py |
| Excel向量化 | app/service/excel/ExcelVectorService.py |
| Excel倒排索引 | app/service/excel/ExcelOpenSearchService.py |
| Excel数据保存 | app/service/excel/ExcelMysqlService.py |
5.5 PPT处理
| 功能 | 文件路径 |
|---|---|
| PPT提取 | app/service/ppt/PptExtractService.py |
| PPT服务协调 | app/service/ppt/PptService.py |
| PPT格式化 | app/service/ppt/PptFormatJsonService.py |
| PPT向量化 | app/service/ppt/PptVectorService.py |
| PPT倒排索引 | app/service/ppt/PptOpenSearchService.py |
| PPT数据保存 | app/service/ppt/PptMysqlService.py |
5.6 文本/Markdown处理
| 功能 | 文件路径 |
|---|---|
| 文本提取 | app/service/text/TextExtractService.py |
| 文本服务协调 | app/service/text/TextService.py |
| 文本格式化 | app/service/text/TextFormatJsonService.py |
| 文本向量化 | app/service/text/TextVectorService.py |
| 文本倒排索引 | app/service/text/TextOpenSearchService.py |
| 文本数据保存 | app/service/text/TextMysqlService.py |
6. 核心特性总结
| 特性 | 实现方式 |
|---|---|
| 多文件类型支持 | PDF、Word、Excel、PPT、TXT、MD |
| 统一处理流程 | 提取 → 格式化 → 向量化 → 倒排索引 → 数据保存 |
| 异步处理 | 后台线程处理,避免阻塞 |
| 进度追踪 | WebSocket实时推送进度(0-100%) |
| 文件夹管理 | 支持嵌套文件夹,完整树形结构 |
| 重复检测 | 基于MD5哈希和路径检查 |
| 编码识别 | 自动识别文本文件编码(UTF-8、GBK等) |
| 全文搜索 | OpenSearch倒排索引 + 中文分词 |
| 语义搜索 | BGE模型向量化 + 向量数据库 |
| 数据持久化 | MySQL存储元数据和内容 |
| 错误处理 | 完整的异常捕获和状态更新 |
7. 依赖的核心库
| 文件类型 | 核心库 | 用途 |
|---|---|---|
| Unstructured | 高精度PDF解析,支持文本、图片、表格提取 | |
| Word | python-docx | Word文档解析,支持段落、标题、表格 |
| Excel | pandas, openpyxl | Excel工作表读取和数据处理 |
| PPT | python-pptx | PowerPoint解析,支持幻灯片和形状 |
| 文本 | 原生Python | 智能编码识别和文本解析 |
| 向量化 | BGE模型 | 文本向量化和语义搜索 |
| 搜索 | OpenSearch | 全文搜索和倒排索引 |
| 数据库 | MySQL | 数据持久化和结构化查询 |
8. 设计优势
- 统一架构:所有文件类型遵循相同的处理流程,易于维护和扩展
- 模块化设计:每个功能独立成服务,职责清晰
- 异步处理:大文件处理不阻塞用户操作
- 实时反馈:WebSocket推送处理进度
- 多索引支持:同时支持全文搜索和语义搜索
- 错误处理完善:每个步骤都有异常捕获和状态更新
- 可扩展性强:新增文件类型只需实现相同的5个服务接口
分析完成日期: 2025-11-12