"""Extract paragraphs, tables and a heading catalog from a .docx file.

The document body is walked in order; every non-empty paragraph and every
table becomes one entry with a running index, and paragraphs recognised as
headings are additionally recorded in a catalog.
"""

import json
import os
import zipfile

from docx import Document
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from lxml import etree

RESULT_TYPE_TEXT = 'text'
RESULT_TYPE_TABLE = 'table'

# Fallback URI for the ``w`` prefix when the document root does not declare it.
WORD_NAMESPACE = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'


def _w_attr(namespaces, name):
    """Return the Clark-notation (``{uri}name``) form of a ``w:`` attribute."""
    return f"{{{namespaces['w']}}}{name}"


def _local_name(element):
    """Return the tag of *element* without its namespace.

    Comments and processing instructions have a non-string ``tag``; an empty
    string is returned for them so callers can simply skip such nodes.
    """
    tag = element.tag
    if not isinstance(tag, str):
        return ''
    return tag.rsplit('}', 1)[-1]


def _grid_span(cell, grid_span_xpath):
    """Return the ``w:gridSpan`` value found for *cell*, or 1 if there is none."""
    values = grid_span_xpath(cell)
    return int(values[0]) if values else 1


def build_result(result_type, index, data):
    """Build one content entry: its type, its running index and its payload."""
    return {
        'type': result_type,
        'index': index,
        'data': data,
    }


def build_catalog_result(index, depth, data):
    """Build one catalog entry: content index, heading depth and heading text."""
    return {
        'index': index,
        'depth': depth,
        'data': data,
    }


def get_xml_content(docx_filename, xml_filename):
    """Read one member (e.g. ``word/styles.xml``) of the docx zip archive.

    Returns the raw bytes of the member. ``zipfile`` raises ``KeyError`` when
    the member does not exist.
    """
    with zipfile.ZipFile(docx_filename) as z:
        return z.read(xml_filename)


def parse_paragraph(paragraph, index, namespaces):
    """Return a text entry for *paragraph*, or None when it has no text.

    ``paragraph`` is an object exposing ``.text``; ``namespaces`` is accepted
    for signature symmetry with the element-based parsers and is not used.
    """
    paragraph_text = paragraph.text.strip() if paragraph else ''
    if paragraph_text:
        return build_result(RESULT_TYPE_TEXT, index, paragraph_text)
    return None


def parse_table(table, index):
    """Return a table entry holding the text of every cell of every row."""
    table_data = [[cell.text for cell in row.cells] for row in table.rows]
    return build_result(RESULT_TYPE_TABLE, index, table_data)


def parse_paragraph_element(paragraph_element, index, namespaces):
    """Return a text entry for a paragraph element, or None when it is empty.

    The element is serialised through its ``.xml`` attribute and re-parsed so
    that the ``//w:t`` query is scoped to this paragraph only.
    """
    paragraph_xml = etree.fromstring(paragraph_element.xml)
    paragraph_text = ''.join(
        paragraph_xml.xpath('//w:t/text()', namespaces=namespaces)
    ).strip()
    if paragraph_text:
        return build_result(RESULT_TYPE_TEXT, index, paragraph_text)
    return None


def parse_table_element(table_element, index, namespaces):
    """Return a table entry for a table element.

    A cell whose ``w:gridSpan`` is greater than 1 has its text repeated once
    per spanned column.

    NOTE(review): ``//w:tr`` also matches rows of tables nested inside cells,
    so nested rows are appended as additional rows here - confirm whether
    that is intended.
    """
    table_xml = etree.fromstring(table_element.xml)
    # Compile once instead of once per cell.
    grid_span_xpath = etree.XPath('.//w:tcPr/w:gridSpan/@w:val', namespaces=namespaces)

    table_data = []
    for row in table_xml.xpath('//w:tr', namespaces=namespaces):
        row_data = []
        for cell in row.xpath('./w:tc | ./w:sdt', namespaces=namespaces):
            cell_text = ''.join(cell.xpath('.//w:t/text()', namespaces=namespaces)).strip()
            grid_span = _grid_span(cell, grid_span_xpath)
            if grid_span > 1:
                row_data.extend([cell_text] * grid_span)
            else:
                row_data.append(cell_text)
        table_data.append(row_data)
    return build_result(RESULT_TYPE_TABLE, index, table_data)


def add_to_catalog(element_xml, index, catalog_content, namespaces, paragraph_text, heading_styles):
    """Append a catalog entry when the paragraph XML describes a heading.

    ``element_xml`` is the serialised ``w:p`` element; ``catalog_content`` is
    mutated in place. Nothing is appended for non-heading paragraphs.
    """
    p_element = etree.fromstring(element_xml)
    level = is_heading_paragraph(p_element, heading_styles, namespaces)
    if level != -1:
        catalog_content.append(build_catalog_result(index, level, paragraph_text))


def is_heading_paragraph(paragraph, heading_styles, namespaces):
    """Return the heading level of a ``w:p`` element, or -1 if it is no heading.

    Checks, in order:
    1. a purely numeric paragraph style id (``w:pStyle w:val="2"`` -> 2);
    2. an explicit ``w:outlineLvl`` (zero-based, so 1 is added);
    3. a style id listed in *heading_styles* that ends in digits
       (``Heading1`` -> 1).

    Only the paragraph's own ``w:pPr`` is inspected, so properties of nested
    content or of tracked-change history (``w:pPrChange``) are not mistaken
    for the paragraph's current formatting.
    """
    pPr = paragraph.find('w:pPr', namespaces=namespaces)
    if pPr is None:
        return -1

    val_attr = _w_attr(namespaces, 'val')

    style_val = None
    pStyle = pPr.find('w:pStyle', namespaces=namespaces)
    if pStyle is not None:
        style_val = pStyle.get(val_attr)
        # The attribute may be absent; guard before calling str methods.
        if style_val and style_val.isdigit():
            return int(style_val)

    pOutLineLvl = pPr.find('w:outlineLvl', namespaces=namespaces)
    if pOutLineLvl is not None:
        outline_val = pOutLineLvl.get(val_attr)
        if outline_val and outline_val.isdigit():
            # NOTE(review): OOXML appears to use 9 for "no outline level";
            # confirm whether the resulting depth 10 should be dropped.
            return int(outline_val) + 1

    if style_val and heading_styles and style_val in heading_styles:
        trailing_digits = style_val[len(style_val.rstrip('0123456789')):]
        if trailing_digits:
            return int(trailing_digits)

    return -1


def get_paragraph_text(paragraph_element, namespaces):
    """Concatenate the text of every ``w:t`` inside every ``w:r`` of the element.

    The text is returned unstripped; ``w:t`` nodes without text contribute
    nothing.
    """
    return ''.join(
        text.text
        for run in paragraph_element.findall('.//w:r', namespaces=namespaces)
        for text in run.findall('.//w:t', namespaces=namespaces)
        if text.text is not None
    )


def add_to_catalog_paragraph(text, index, catalog_content, namespaces):
    """Append *text* to the catalog with an assumed default depth of 1."""
    catalog_content.append(build_catalog_result(index, 1, text))


def parse_sdt_catalog(sdt_element, catalog_content, index, namespaces):
    """Add every non-empty paragraph of a content control to the catalog.

    Nested content controls are processed recursively; tables are ignored.
    Returns the index following the last entry that was added.
    """
    sdt_content = sdt_element.find('.//w:sdtContent', namespaces=namespaces)
    if sdt_content is None:
        return index

    for child in sdt_content:
        kind = _local_name(child)
        if kind == 'p':  # paragraph inside the content control
            paragraph_text = get_paragraph_text(child, namespaces)
            if paragraph_text.strip():
                add_to_catalog_paragraph(paragraph_text, index, catalog_content, namespaces)
                index += 1
        elif kind == 'tbl':  # tables are not part of the catalog
            pass
        elif kind == 'sdt':  # nested content control
            index = parse_sdt_catalog(child, catalog_content, index, namespaces)
    return index


def parse_docx(docx_path):
    """Parse a .docx file into content entries and a heading catalog.

    Returns a ``(content_json, catalog_json)`` pair of JSON strings, or
    ``(None, None)`` when the document or its ``word/styles.xml`` cannot be
    loaded (the error is printed).
    """
    try:
        document = Document(docx_path)
        styles_xml = get_xml_content(docx_path, 'word/styles.xml')
    except Exception as e:
        print(f"Error loading document: {e}")
        return None, None

    doc_content = []      # text and table entries, in document order
    catalog_content = []  # heading entries
    current_index = 1     # running index shared by all emitted entries
    paragraph_index = 0   # position within the document's paragraph list

    xml_root = document.part.element
    # XPath rejects a None (default-namespace) prefix, so drop it, and make
    # sure the 'w' prefix used by every query below is always defined.
    namespaces = {prefix: uri for prefix, uri in xml_root.nsmap.items() if prefix}
    namespaces.setdefault('w', WORD_NAMESPACE)

    # Collect the ids of all paragraph styles whose id starts with "Heading".
    styles_root = etree.fromstring(styles_xml)
    type_attr = _w_attr(namespaces, 'type')
    style_id_attr = _w_attr(namespaces, 'styleId')
    heading_styles = set()
    for style in styles_root.xpath('//w:style', namespaces=namespaces):
        style_id = style.get(style_id_attr)
        if style.get(type_attr) == 'paragraph' and style_id and style_id.startswith('Heading'):
            heading_styles.add(style_id)

    # Fetch the paragraph list once instead of on every loop iteration.
    paragraphs = document.paragraphs

    for element in document.element.body:
        if isinstance(element, CT_P):  # paragraph
            paragraph_result = parse_paragraph_element(element, current_index, namespaces)
            if paragraph_result:
                doc_content.append(paragraph_result)
                # Record the paragraph in the catalog if it is a heading.
                paragraph = paragraphs[paragraph_index]
                add_to_catalog(paragraph._element.xml, current_index, catalog_content,
                               namespaces, paragraph.text, heading_styles)
                current_index += 1
            # Advance for every body paragraph, empty or not, so the position
            # stays aligned with the paragraph list.
            paragraph_index += 1
        elif isinstance(element, CT_Tbl):  # table
            table_result = parse_table_element(element, current_index, namespaces)
            if table_result:
                doc_content.append(table_result)
                current_index += 1
        elif _local_name(element) == 'sdt':  # content control
            current_index = parse_sdt(element, doc_content, current_index, namespaces,
                                      catalog_content, heading_styles)

    return (
        json.dumps(doc_content, indent=4, ensure_ascii=False),
        json.dumps(catalog_content, indent=4, ensure_ascii=False),
    )


def parse_sdt(sdt_element, doc_content, current_index, namespaces, catalog_content, heading_styles):
    """Append the paragraphs and tables of a content control to *doc_content*.

    Heading paragraphs are also added to *catalog_content*. Nested content
    controls are processed recursively. Returns the next free index.
    """
    sdtContent = sdt_element.find('.//w:sdtContent', namespaces=namespaces)
    if sdtContent is None:
        return current_index

    # Compile the cell queries once instead of once per cell.
    grid_span_xpath = etree.XPath('.//w:tcPr/w:gridSpan/@w:val', namespaces=namespaces)
    v_merge_xpath = etree.XPath('.//w:tcPr/w:vMerge/@w:val', namespaces=namespaces)

    for child in sdtContent:
        kind = _local_name(child)
        if kind == 'p':  # paragraph inside the content control
            paragraph_text = get_paragraph_text(child, namespaces)
            if paragraph_text.strip():
                doc_content.append(build_result(RESULT_TYPE_TEXT, current_index, paragraph_text.strip()))
                # Record the paragraph in the catalog if it is a heading.
                add_to_catalog(child.xml, current_index, catalog_content, namespaces,
                               paragraph_text, heading_styles)
                current_index += 1
        elif kind == 'tbl':  # table inside the content control
            table_data = []
            # (row, col) of a vertical-merge start -> (column span, row span)
            merged_cells = {}
            for row_idx, row in enumerate(child.findall('.//w:tr', namespaces=namespaces)):
                row_data = []
                for col_idx, cell in enumerate(row.findall('.//w:tc', namespaces=namespaces)):
                    cell_text = get_paragraph_text(cell, namespaces)

                    # Horizontal span: repeat the text once per spanned column.
                    grid_span = _grid_span(cell, grid_span_xpath)
                    if grid_span > 1:
                        row_data.extend([cell_text.strip()] * grid_span)
                    else:
                        row_data.append(cell_text.strip())

                    # Vertical span bookkeeping; only explicit w:val values
                    # ('restart' / 'continue') are recognised here.
                    v_merge = v_merge_xpath(cell)
                    if v_merge and v_merge[0] == 'restart':
                        merged_cells[(row_idx, col_idx)] = (int(grid_span), 1)
                    elif v_merge and v_merge[0] == 'continue':
                        if (row_idx - 1, col_idx) in merged_cells:
                            merged_cells[(row_idx - 1, col_idx)] = (
                                merged_cells[(row_idx - 1, col_idx)][0],
                                merged_cells[(row_idx - 1, col_idx)][1] + 1,
                            )

                # Apply the recorded vertical merges to the current row.
                for (r, c), (col_span, row_span) in list(merged_cells.items()):
                    if r < row_idx:
                        for i in range(row_span):
                            if r + i == row_idx:
                                row_data[c:c] = [row_data[c]] * (col_span - 1)
                                break
                        if r + row_span - 1 == row_idx:
                            del merged_cells[(r, c)]
                table_data.append(row_data)

            if table_data:  # skip tables without rows
                doc_content.append(build_result(RESULT_TYPE_TABLE, current_index, table_data))
                current_index += 1
        elif kind == 'sdt':  # nested content control
            current_index = parse_sdt(child, doc_content, current_index, namespaces,
                                      catalog_content, heading_styles)
    return current_index


def split_text_table(json_data):
    """Split content entries by type.

    Returns ``(text_json, table_json)``: two JSON strings holding the entries
    whose ``type`` is ``'text'`` and ``'table'`` respectively.
    """
    text_elements = [element for element in json_data if element['type'] == 'text']
    table_elements = [element for element in json_data if element['type'] == 'table']

    text_elements_json = json.dumps(text_elements, ensure_ascii=False, indent=4)
    table_elements_json = json.dumps(table_elements, ensure_ascii=False, indent=4)
    return text_elements_json, table_elements_json


def append_to_file(file_path, text):
    """Append *text* plus a newline to *file_path* (UTF-8).

    Best effort: any failure is printed rather than raised.
    """
    try:
        with open(file_path, 'a', encoding='utf-8') as file:
            file.write(text + '\n')
    except Exception as e:
        print(f"Error writing to file: {e}")


def main():
    """Parse ``101.docx`` in the working directory and append the results to a text file."""
    current_directory = os.getcwd()
    docx_path = os.path.join(current_directory, '101.docx')
    # Join the components instead of hard-coding backslashes so the path is
    # also valid on non-Windows systems.
    file_path = os.path.join(current_directory, 'file', 'docx', 'test1.txt')

    try:
        parsed_content, catalog_content = parse_docx(docx_path)
        if parsed_content and catalog_content:
            json_parsed_content = json.loads(parsed_content)
            text_elements_json, table_elements_json = split_text_table(json_parsed_content)
            append_to_file(file_path, text_elements_json)
            append_to_file(file_path, table_elements_json)
            append_to_file(file_path, catalog_content)
    except Exception as e:
        print(f"Error parse_docx: {e}")


if __name__ == "__main__":
    main()