# pdf_code/zzb_data_word/parse_word.py
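"""Parse a .docx file into JSON.

Walks the document body element by element (paragraphs, tables and
content controls / SDTs), emitting one record per element into a
content list and, for heading paragraphs, a separate catalog list
with their outline depth.
"""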

import json
import os
import zipfile

from docx import Document
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from lxml import etree

RESULT_TYPE_TEXT = 'text'
RESULT_TYPE_TABLE = 'table'

def build_result(result_type, index, data):
    return {
        'type': result_type,
        'index': index,
        'data': data
    }

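# Every content record has the shape {'type': 'text' | 'table', 'index': n, 'data': ...};
# catalog records carry an index, a heading depth and the heading text.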
def build_catalog_result(index, depth, data):
    return {
        'index': index,
        'depth': depth,
        'data': data
    }

# Read one XML part (e.g. word/styles.xml) out of the docx archive.
def get_xml_content(docx_filename, xml_filename):
    with zipfile.ZipFile(docx_filename) as z:
        return z.read(xml_filename)

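# parse_paragraph / parse_table take python-docx objects; the *_element variants
# below work on the raw lxml elements so that body children and content controls
# can be handled uniformly.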
def parse_paragraph(paragraph, index, namespaces):
    paragraph_text = paragraph.text.strip() if paragraph else ''
    if paragraph_text:
        return build_result(RESULT_TYPE_TEXT, index, paragraph_text)
    return None

def parse_table(table, index):
    table_data = []
    for row in table.rows:
        row_data = [cell.text for cell in row.cells]
        table_data.append(row_data)
    return build_result(RESULT_TYPE_TABLE, index, table_data)

def parse_paragraph_element(paragraph_element, index, namespaces):
    paragraph_xml = etree.fromstring(paragraph_element.xml)
    paragraph_text = ''.join(paragraph_xml.xpath('//w:t/text()', namespaces=namespaces)).strip()
    if paragraph_text:
        return build_result(RESULT_TYPE_TEXT, index, paragraph_text)
    return None

def parse_table_element(table_element, index, namespaces):
    table_xml = etree.fromstring(table_element.xml)
    table_data = []
    for row in table_xml.xpath('//w:tr', namespaces=namespaces):
        row_data = []
        for cell in row.xpath('./w:tc | ./w:sdt', namespaces=namespaces):
            cell_text = ''.join(cell.xpath('.//w:t/text()', namespaces=namespaces)).strip()
            grid_span_xpath = etree.XPath('.//w:tcPr/w:gridSpan/@w:val', namespaces=namespaces)
            grid_span = int(grid_span_xpath(cell)[0]) if grid_span_xpath(cell) else 1
            if grid_span > 1:
                row_data.extend([cell_text] * grid_span)
            else:
                row_data.append(cell_text)
        table_data.append(row_data)
    return build_result(RESULT_TYPE_TABLE, index, table_data)

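# (A cell with w:gridSpan > 1 is repeated so that every row keeps one entry per grid column.)

# Append a paragraph to the catalog when it resolves to a heading level.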
def add_to_catalog(element_xml, index, catalog_content, namespaces, paragraph_text, heading_styles):
    p_element = etree.fromstring(element_xml)
    # outlineLvl = p_element.xpath('.//w:outlineLvl', namespaces=namespaces)
    # if outlineLvl:
    #     level = int(outlineLvl[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val'))
    #     catalog_content.append(build_catalog_result(index, level, paragraph_text))
    level = is_heading_paragraph(p_element, heading_styles, namespaces)
    if level != -1:
        catalog_content.append(build_catalog_result(index, level, paragraph_text))

# Check whether a paragraph element uses a heading style.
# Returns the heading level (numeric pStyle val, or outlineLvl + 1), or -1 if it is not a heading.
def is_heading_paragraph(paragraph, heading_styles, namespaces):
    pPr = paragraph.find('.//w:pPr', namespaces=namespaces)
    if pPr is not None:
        pStyle = pPr.find('.//w:pStyle', namespaces=namespaces)
        pOutLineLvl = pPr.find('.//w:outlineLvl', namespaces=namespaces)
        if pStyle is not None:
            style_val = pStyle.get(f"{{{namespaces['w']}}}val")
            if style_val is not None and style_val.isdigit():
                return int(style_val)
        if pOutLineLvl is not None:
            outLineLvl_val = pOutLineLvl.get(f"{{{namespaces['w']}}}val")
            if outLineLvl_val is not None and outLineLvl_val.isdigit():
                return int(outLineLvl_val) + 1
        # if pStyle is not None and pStyle.get(ns['w'] + 'val') in heading_styles:
        #     if style_val > 0:
        #         return True
    return -1

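# Concatenate the text of every w:t run inside a paragraph element.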
def get_paragraph_text(paragraph_element, namespaces):
    paragraph_text = ''
    for run in paragraph_element.findall('.//w:r', namespaces=namespaces):
        for text in run.findall('.//w:t', namespaces=namespaces):
            paragraph_text += text.text if text.text is not None else ''
    return paragraph_text

def add_to_catalog_paragraph(text, index, catalog_content, namespaces):
    # Append the paragraph to the catalog, assuming a default depth of 1.
    catalog_content.append(build_catalog_result(index, 1, text))

def parse_sdt_catalog(sdt_element, catalog_content, index, namespaces):
    sdt_content = sdt_element.find('.//w:sdtContent', namespaces=namespaces)
    if sdt_content is not None:
        for child in sdt_content:
            if child.tag.endswith('p'):  # paragraph inside the content control
                paragraph_text = get_paragraph_text(child, namespaces)
                if paragraph_text.strip():  # skip empty text
                    add_to_catalog_paragraph(paragraph_text, index, catalog_content, namespaces)
                    index += 1  # advance the index
            elif child.tag.endswith('tbl'):  # table inside the content control
                # handle table content here if needed
                pass
            elif child.tag.endswith('sdt'):  # nested content control
                index = parse_sdt_catalog(child, catalog_content, index, namespaces)  # recurse into the nested control
    return index

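# Main entry point: parse the document body into a content list (text + tables)
# and a heading catalog, both returned as JSON strings.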
def parse_docx(docx_path):
    try:
        document = Document(docx_path)
        styles_xml = get_xml_content(docx_path, 'word/styles.xml')
    except Exception as e:
        print(f"Error loading document: {e}")
        return None, None
    doc_content = []  # content (text + tables)
    catalog_content = []  # catalog (headings)
    current_index = 1  # running index shared by all elements
    paragraph_index = 0
    table_index = 0
    # XML root and namespaces of the whole document
    xml_root = document.part.element
    namespaces = xml_root.nsmap
    # collect all heading style ids
    styles_root = etree.fromstring(styles_xml)
    heading_styles = set()
    for style in styles_root.xpath('//w:style', namespaces=namespaces):
        style_type = style.get(f"{{{namespaces['w']}}}type")
        style_id = style.get(f"{{{namespaces['w']}}}styleId")
        if style_type == 'paragraph' and style_id is not None and style_id.startswith('Heading'):
            heading_styles.add(style_id)
    # walk every element in the document body
    for i, element in enumerate(document.element.body):
        if isinstance(element, CT_P):  # paragraph
            paragraph_result = parse_paragraph_element(element, current_index, namespaces)
            if paragraph_result:
                doc_content.append(paragraph_result)
                # if the paragraph is a heading, record it in the catalog
                paragraph = document.paragraphs[paragraph_index]
                add_to_catalog(paragraph._element.xml, current_index, catalog_content, namespaces, paragraph.text, heading_styles)
                current_index += 1  # advance the index
            paragraph_index += 1
        elif isinstance(element, CT_Tbl):  # table
            table_result = parse_table_element(element, current_index, namespaces)
            if table_result:
                doc_content.append(table_result)
                current_index += 1  # advance the index
            table_index += 1
        elif element.tag.endswith('sdt'):  # content control (SDT)
            current_index = parse_sdt(element, doc_content, current_index, namespaces, catalog_content, heading_styles)
    return json.dumps(doc_content, indent=4, ensure_ascii=False), json.dumps(catalog_content, indent=4, ensure_ascii=False)

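# Recursively parse a content control (w:sdt): paragraphs are appended to
# doc_content (and to the catalog when they are headings); tables are rebuilt
# row by row, expanding w:gridSpan column merges and tracking w:vMerge row merges.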
def parse_sdt(sdt_element, doc_content, current_index, namespaces, catalog_content, heading_styles):
    sdtContent = sdt_element.find('.//w:sdtContent', namespaces=namespaces)
    if sdtContent is not None:
        for child in sdtContent:
            if child.tag.endswith('p'):  # paragraph inside the content control
                paragraph_text = ''
                for run in child.findall('.//w:r', namespaces=namespaces):
                    for text in run.findall('.//w:t', namespaces=namespaces):
                        paragraph_text += text.text if text.text is not None else ''
                if paragraph_text.strip():  # skip empty text
                    doc_content.append(build_result(RESULT_TYPE_TEXT, current_index, paragraph_text.strip()))
                    # if the paragraph is a heading, record it in the catalog
                    add_to_catalog(child.xml, current_index, catalog_content, namespaces, paragraph_text, heading_styles)
                    current_index += 1  # advance the index
            elif child.tag.endswith('tbl'):  # table inside the content control
                table_data = []
                merged_cells = {}  # bookkeeping for vertically merged (row-spanning) cells
                for row_idx, row in enumerate(child.findall('.//w:tr', namespaces=namespaces)):
                    row_data = []
                    for col_idx, cell in enumerate(row.findall('.//w:tc', namespaces=namespaces)):
                        cell_text = ''
                        for run in cell.findall('.//w:r', namespaces=namespaces):
                            for text in run.findall('.//w:t', namespaces=namespaces):
                                cell_text += text.text if text.text is not None else ''
                        # column merge: repeat the cell once per spanned grid column
                        grid_span_xpath = etree.XPath('.//w:tcPr/w:gridSpan/@w:val', namespaces=namespaces)
                        grid_span = int(grid_span_xpath(cell)[0]) if grid_span_xpath(cell) else 1
                        if grid_span > 1:
                            row_data.extend([cell_text.strip()] * grid_span)
                        else:
                            row_data.append(cell_text.strip())
                        # row merge: track cells that start or continue a vertical merge
                        v_merge_xpath = etree.XPath('.//w:tcPr/w:vMerge/@w:val', namespaces=namespaces)
                        v_merge = v_merge_xpath(cell)
                        if v_merge and v_merge[0] == 'restart':
                            merged_cells[(row_idx, col_idx)] = (int(grid_span), 1)
                        elif v_merge and v_merge[0] == 'continue':
                            if (row_idx - 1, col_idx) in merged_cells:
                                merged_cells[(row_idx - 1, col_idx)] = (merged_cells[(row_idx - 1, col_idx)][0], merged_cells[(row_idx - 1, col_idx)][1] + 1)
                                # the continued cell is already covered; nothing more to add to row_data
                        else:
                            # a cell that is not vertically merged needs no extra handling
                            pass
                    # fill in the columns covered by vertically merged cells
                    for (r, c), (col_span, row_span) in list(merged_cells.items()):
                        if r < row_idx:
                            for i in range(row_span):
                                if r + i == row_idx:
                                    row_data[c:c] = [row_data[c]] * (col_span - 1)
                                    break
                            if r + row_span - 1 == row_idx:
                                del merged_cells[(r, c)]
                    table_data.append(row_data)
                if table_data:  # skip empty tables
                    doc_content.append(build_result(RESULT_TYPE_TABLE, current_index, table_data))
                    current_index += 1  # advance the index
            elif child.tag.endswith('sdt'):  # nested content control
                current_index = parse_sdt(child, doc_content, current_index, namespaces, catalog_content, heading_styles)  # recurse into the nested control
    return current_index  # return the updated index

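# Split a parsed content list into separate JSON strings for text records and table records.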
def split_text_table(json_data):
    # group by record type
    text_elements = [element for element in json_data if element['type'] == 'text']
    table_elements = [element for element in json_data if element['type'] == 'table']
    # serialize each group back to a JSON string
    text_elements_json = json.dumps(text_elements, ensure_ascii=False, indent=4)
    table_elements_json = json.dumps(table_elements, ensure_ascii=False, indent=4)
    return text_elements_json, table_elements_json

def append_to_file(file_path, text):
    try:
        with open(file_path, 'a', encoding='utf-8') as file:
            file.write(text + '\n')
    except Exception as e:
        print(f"Error writing to file: {e}")

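# Example run: parse 101.docx from the working directory and append the text
# records, table records and catalog to file\docx\test1.txt (the output
# directory is expected to exist already).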
if __name__ == "__main__":
    current_directory = os.getcwd()
    docx_relative_path = '101.docx'
    file_relative_path = 'file\\docx\\test1.txt'
    docx_path = os.path.join(current_directory, docx_relative_path)
    file_path = os.path.join(current_directory, file_relative_path)
    try:
        parsed_content, catalog_content = parse_docx(docx_path)
        if parsed_content and catalog_content:
            json_parsed_content = json.loads(parsed_content)
            text_elements_json, table_elements_json = split_text_table(json_parsed_content)
            append_to_file(file_path, text_elements_json)
            append_to_file(file_path, table_elements_json)
            append_to_file(file_path, catalog_content)
    except Exception as e:
        print(f"Error parse_docx: {e}")