269 lines
13 KiB
Python
269 lines
13 KiB
Python
from docx import Document
|
|
import json
|
|
from docx.oxml.table import CT_Tbl
|
|
from docx.oxml.text.paragraph import CT_P
|
|
from lxml import etree
|
|
import os
|
|
import zipfile
|
|
|
|
# Values for the 'type' discriminator field of records built by build_result.
# These strings end up in the emitted JSON, so they are part of the output format.
RESULT_TYPE_TEXT = 'text'
RESULT_TYPE_TABLE = 'table'
|
|
|
|
def build_result(result_type, index, data):
    """Assemble one parsed-element record: a type tag, running index and payload."""
    return dict(type=result_type, index=index, data=data)
|
|
|
|
def build_catalog_result(index, depth, data):
    """Assemble one catalog (outline) entry: element index, heading depth, text."""
    return dict(index=index, depth=depth, data=data)
|
|
|
|
# Read an XML part out of a docx package (a docx file is a zip archive).
def get_xml_content(docx_filename, xml_filename):
    """Return the raw bytes of *xml_filename* stored inside *docx_filename*."""
    with zipfile.ZipFile(docx_filename) as archive:
        return archive.read(xml_filename)
|
|
|
|
def parse_paragraph(paragraph, index, namespaces):
    """Build a text result from a python-docx paragraph, or None if blank.

    *namespaces* is accepted for interface parity with the other parsers
    but is not used here.
    """
    if not paragraph:
        return None
    stripped = paragraph.text.strip()
    if not stripped:
        return None
    return build_result(RESULT_TYPE_TEXT, index, stripped)
|
|
|
|
def parse_table(table, index):
    """Convert a python-docx table into a result holding a list of row lists."""
    rows = [[cell.text for cell in row.cells] for row in table.rows]
    return build_result(RESULT_TYPE_TABLE, index, rows)
|
|
|
|
def parse_paragraph_element(paragraph_element, index, namespaces):
    """Build a text result from a raw w:p element, or None when it has no text.

    The element's XML is re-parsed with lxml so the w:t text nodes can be
    gathered with a single XPath query.
    """
    node = etree.fromstring(paragraph_element.xml)
    joined = ''.join(node.xpath('//w:t/text()', namespaces=namespaces)).strip()
    if not joined:
        return None
    return build_result(RESULT_TYPE_TEXT, index, joined)
|
|
|
|
def parse_table_element(table_element, index, namespaces):
    """Extract a table from its raw w:tbl element into a list of row lists.

    Horizontally merged cells (w:gridSpan) are expanded by repeating the
    cell text once per spanned column, so every row keeps its full width.
    Cells may appear directly as w:tc or wrapped in a w:sdt content control.
    Returns a result dict of type 'table'.
    """
    table_xml = etree.fromstring(table_element.xml)
    # Compile the invariant XPath once; the original rebuilt it for every
    # cell and then evaluated it twice per cell.
    grid_span_xpath = etree.XPath('.//w:tcPr/w:gridSpan/@w:val', namespaces=namespaces)
    table_data = []
    for row in table_xml.xpath('//w:tr', namespaces=namespaces):
        row_data = []
        for cell in row.xpath('./w:tc | ./w:sdt', namespaces=namespaces):
            cell_text = ''.join(cell.xpath('.//w:t/text()', namespaces=namespaces)).strip()
            span_values = grid_span_xpath(cell)
            grid_span = int(span_values[0]) if span_values else 1
            # grid_span == 1 degenerates to a single append, so one extend
            # covers both the merged and the plain case.
            row_data.extend([cell_text] * grid_span)
        table_data.append(row_data)
    return build_result(RESULT_TYPE_TABLE, index, table_data)
|
|
|
|
def add_to_catalog(element_xml, index, catalog_content, namespaces, paragraph_text, heading_styles):
    """Append a catalog (outline) entry for the paragraph if it is a heading.

    element_xml: raw XML string of the w:p element.
    heading_styles: set of heading style ids, passed through to the style check.
    Mutates *catalog_content* in place; non-heading paragraphs (level -1)
    are ignored.
    """
    p_element = etree.fromstring(element_xml)
    level = is_heading_paragraph(p_element, heading_styles, namespaces)
    if level != -1:
        catalog_content.append(build_catalog_result(index, level, paragraph_text))
|
|
# Check whether a paragraph element carries a heading style.
def is_heading_paragraph(paragraph, heading_styles, namespaces):
    """Return the heading level of *paragraph*, or -1 if it is not a heading.

    The level is taken from a numeric w:pStyle value first, then from
    w:outlineLvl (0-based in OOXML, hence the +1). *heading_styles* is kept
    for interface compatibility but is not consulted here.
    """
    pPr = paragraph.find('.//w:pPr', namespaces=namespaces)
    if pPr is not None:
        pStyle = pPr.find('.//w:pStyle', namespaces=namespaces)
        pOutLineLvl = pPr.find('.//w:outlineLvl', namespaces=namespaces)
        if pStyle is not None:
            style_val = pStyle.get(f"{{{namespaces['w']}}}val")
            # Guard against a missing w:val attribute: get() returns None and
            # None.isdigit() would raise AttributeError.
            if style_val is not None and style_val.isdigit():
                return int(style_val)
        if pOutLineLvl is not None:
            outLineLvl_val = pOutLineLvl.get(f"{{{namespaces['w']}}}val")
            if outLineLvl_val is not None and outLineLvl_val.isdigit():
                return int(outLineLvl_val) + 1
    return -1
|
|
|
|
def get_paragraph_text(paragraph_element, namespaces):
    """Concatenate the w:t text nodes of every run (w:r) in the paragraph."""
    pieces = []
    for run in paragraph_element.findall('.//w:r', namespaces=namespaces):
        for node in run.findall('.//w:t', namespaces=namespaces):
            if node.text is not None:
                pieces.append(node.text)
    return ''.join(pieces)
|
|
|
|
def add_to_catalog_paragraph(text, index, catalog_content, namespaces):
    """Append *text* to the catalog at the default depth of 1.

    Content-control paragraphs carry no outline level here, so depth 1 is
    assumed. *namespaces* is unused but kept for interface parity.
    """
    entry = build_catalog_result(index, 1, text)
    catalog_content.append(entry)
|
|
|
|
def parse_sdt_catalog(sdt_element, catalog_content, index, namespaces):
    """Walk a w:sdt content control and append its paragraphs to the catalog.

    Nested content controls are handled recursively; tables are skipped.
    Returns the index value after processing so callers can keep the
    numbering contiguous.
    """
    content = sdt_element.find('.//w:sdtContent', namespaces=namespaces)
    if content is None:
        return index
    for node in content:
        if node.tag.endswith('p'):
            # A paragraph inside the content control.
            text = get_paragraph_text(node, namespaces)
            if text.strip():  # skip paragraphs with no visible text
                add_to_catalog_paragraph(text, index, catalog_content, namespaces)
                index += 1
        elif node.tag.endswith('tbl'):
            # Tables inside content controls are not part of the catalog.
            pass
        elif node.tag.endswith('sdt'):
            # Recurse into the nested content control.
            index = parse_sdt_catalog(node, catalog_content, index, namespaces)
    return index
|
|
|
|
def parse_docx(docx_path):
    """Parse a .docx file into two JSON strings: (content, catalog).

    Content is the ordered list of text/table result dicts; catalog holds
    the heading outline. Returns (None, None) when the document cannot be
    opened or its styles part cannot be read.
    """
    try:
        document = Document(docx_path)
        styles_xml = get_xml_content(docx_path, 'word/styles.xml')
    except Exception as e:
        print(f"Error loading document: {e}")
        return None, None

    doc_content = []       # document content (text + tables)
    catalog_content = []   # heading catalog / outline
    current_index = 1      # global element index shared by text and tables
    paragraph_index = 0
    table_index = 0

    # Namespace map of the whole document. lxml's xpath() rejects a None
    # prefix (the default namespace), so drop it from the mapping.
    xml_root = document.part.element
    namespaces = {prefix: uri for prefix, uri in xml_root.nsmap.items() if prefix}
    w_ns = namespaces['w']

    # Collect all heading paragraph styles. lxml attribute lookups require
    # Clark notation '{uri}name'; the original concatenated the bare URI and
    # attribute name, which never matches, so heading_styles stayed empty.
    styles_root = etree.fromstring(styles_xml)
    heading_styles = set()
    for style in styles_root.xpath('//w:style', namespaces=namespaces):
        style_type = style.get(f"{{{w_ns}}}type")
        style_id = style.get(f"{{{w_ns}}}styleId")
        if style_type == 'paragraph' and style_id and style_id.startswith('Heading'):
            heading_styles.add(style_id)

    # Walk every top-level body element in document order.
    for element in document.element.body:
        if isinstance(element, CT_P):  # paragraph
            paragraph_result = parse_paragraph_element(element, current_index, namespaces)
            if paragraph_result:
                doc_content.append(paragraph_result)
                # If the paragraph is a heading, record it in the catalog.
                paragraph = document.paragraphs[paragraph_index]
                add_to_catalog(paragraph._element.xml, current_index, catalog_content, namespaces, paragraph.text, heading_styles)
                current_index += 1
            # Advance for EVERY w:p, including blank ones, so the position in
            # document.paragraphs stays in sync with the body traversal.
            paragraph_index += 1
        elif isinstance(element, CT_Tbl):  # table
            table_result = parse_table_element(element, current_index, namespaces)
            if table_result:
                doc_content.append(table_result)
                current_index += 1
            table_index += 1
        elif element.tag.endswith('sdt'):  # content control
            current_index = parse_sdt(element, doc_content, current_index, namespaces, catalog_content, heading_styles)

    return json.dumps(doc_content, indent=4, ensure_ascii=False), json.dumps(catalog_content, indent=4, ensure_ascii=False)
|
|
|
|
|
|
|
|
def parse_sdt(sdt_element, doc_content, current_index, namespaces, catalog_content, heading_styles):
    """Recursively parse a w:sdt content control into doc_content.

    Paragraphs become text results (and catalog entries when they are
    headings); tables become table results with horizontal (gridSpan) and
    vertical (vMerge) merges expanded; nested w:sdt elements are handled by
    recursion. Mutates doc_content and catalog_content in place and returns
    the updated running index.
    """
    sdtContent = sdt_element.find('.//w:sdtContent', namespaces=namespaces)
    if sdtContent is not None:
        for child in sdtContent:
            if child.tag.endswith('p'):  # paragraph inside the content control
                paragraph_text = ''
                # Concatenate every w:t text node of every run.
                for run in child.findall('.//w:r', namespaces=namespaces):
                    for text in run.findall('.//w:t', namespaces=namespaces):
                        paragraph_text += text.text if text.text is not None else ''
                if paragraph_text.strip():  # skip paragraphs with no visible text
                    doc_content.append(build_result(RESULT_TYPE_TEXT, current_index, paragraph_text.strip()))
                    # If this paragraph is a heading, also record a catalog entry.
                    add_to_catalog(child.xml, current_index, catalog_content, namespaces, paragraph_text, heading_styles)
                    current_index += 1  # advance the running index
            elif child.tag.endswith('tbl'):  # table inside the content control
                table_data = []
                merged_cells = {}  # (row, col) -> (col_span, row_span) for vMerge tracking
                for row_idx, row in enumerate(child.findall('.//w:tr', namespaces=namespaces)):
                    row_data = []
                    for col_idx, cell in enumerate(row.findall('.//w:tc', namespaces=namespaces)):
                        cell_text = ''
                        for run in cell.findall('.//w:r', namespaces=namespaces):
                            for text in run.findall('.//w:t', namespaces=namespaces):
                                cell_text += text.text if text.text is not None else ''

                        # Horizontal merge: repeat the text once per spanned column.
                        grid_span_xpath = etree.XPath('.//w:tcPr/w:gridSpan/@w:val', namespaces=namespaces)
                        grid_span = int(grid_span_xpath(cell)[0]) if grid_span_xpath(cell) else 1
                        if grid_span > 1:
                            row_data.extend([cell_text.strip()] * grid_span)
                        else:
                            row_data.append(cell_text.strip())

                        # Vertical merge: 'restart' opens a merged region,
                        # 'continue' extends the one opened in the row above.
                        v_merge_xpath = etree.XPath('.//w:tcPr/w:vMerge/@w:val', namespaces=namespaces)
                        v_merge = v_merge_xpath(cell)
                        if v_merge and v_merge[0] == 'restart':
                            merged_cells[(row_idx, col_idx)] = (int(grid_span), 1)
                        elif v_merge and v_merge[0] == 'continue':
                            if (row_idx - 1, col_idx) in merged_cells:
                                merged_cells[(row_idx - 1, col_idx)] = (merged_cells[(row_idx - 1, col_idx)][0], merged_cells[(row_idx - 1, col_idx)][1] + 1)
                            # Continued cells were already emitted via the
                            # expansion below; nothing to add to row_data here.
                        else:
                            # Plain cell: already appended above.
                            pass

                    # Expand tracked vertical merges into the current row and
                    # drop regions that end on this row. NOTE(review): the
                    # slice insert duplicates row_data[c] (col_span - 1) times;
                    # presumably mirroring the gridSpan width — verify against
                    # real merged tables.
                    for (r, c), (col_span, row_span) in list(merged_cells.items()):
                        if r < row_idx:
                            for i in range(row_span):
                                if r + i == row_idx:
                                    row_data[c:c] = [row_data[c]] * (col_span - 1)
                                    break
                        if r + row_span - 1 == row_idx:
                            del merged_cells[(r, c)]

                    table_data.append(row_data)
                if table_data:  # only emit non-empty tables
                    doc_content.append(build_result(RESULT_TYPE_TABLE, current_index, table_data))
                    current_index += 1  # advance the running index
            elif child.tag.endswith('sdt'):  # nested content control
                current_index = parse_sdt(child, doc_content, current_index, namespaces, catalog_content, heading_styles)  # recurse
    return current_index  # return the updated index
|
|
|
|
def split_text_table(json_data):
    """Split parsed elements into text and table groups.

    Returns a pair of JSON strings (text_elements, table_elements); elements
    with any other 'type' value are dropped from both groups.
    """
    texts = []
    tables = []
    for element in json_data:
        if element['type'] == 'text':
            texts.append(element)
        elif element['type'] == 'table':
            tables.append(element)

    return (json.dumps(texts, ensure_ascii=False, indent=4),
            json.dumps(tables, ensure_ascii=False, indent=4))
|
|
|
|
def append_to_file(file_path, text):
    """Append *text* plus a newline to *file_path* (UTF-8).

    Best-effort: any failure is printed rather than raised.
    """
    try:
        with open(file_path, mode='a', encoding='utf-8') as handle:
            handle.write(text + '\n')
    except Exception as e:
        print(f"Error writing to file: {e}")
|
|
|
|
# Script entry point: parse 101.docx from the current working directory and
# append the text elements, table elements and catalog (each as a JSON blob)
# to file\docx\test1.txt.
if __name__ == "__main__":
    current_directory = os.getcwd()
    docx_relative_path = '101.docx'
    # NOTE(review): Windows-style path separators — presumably meant to run on
    # Windows; the target directory must already exist. Confirm.
    file_relative_path = 'file\\docx\\test1.txt'
    docx_path = os.path.join(current_directory, docx_relative_path)
    file_path = os.path.join(current_directory, file_relative_path)
    try:
        parsed_content, catalog_content = parse_docx(docx_path)
        if parsed_content and catalog_content:
            # Round-trip through json so the elements can be regrouped by type.
            json_parsed_content = json.loads(parsed_content)
            text_elements_json, table_elements_json = split_text_table(json_parsed_content)

            append_to_file(file_path, text_elements_json)
            append_to_file(file_path, table_elements_json)
            append_to_file(file_path, catalog_content)
    except Exception as e:
        print(f"Error parse_docx: {e}")