"""Split bank e-receipt PDFs into per-receipt PNG images grouped by payee.

Every PDF in the input directory is rendered receipt by receipt. Each image
is filed under a sub-directory of the output directory named after the payee
(the value of the "收款人全称" field) found in the receipt text, and a
timestamped text report summarising the run is written next to this script.
"""

from __future__ import annotations

import argparse
import os
import re
import sys
from datetime import datetime
from pathlib import Path
from types import ModuleType
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import fitz  # PyMuPDF


BASE_DIR = Path(__file__).resolve().parent
DEFAULT_SOURCE_DIR = BASE_DIR / "source"
DEFAULT_OUTPUT_DIR = BASE_DIR / "output"

# Directory used for receipts whose payee could not be determined.
UNKNOWN_PAYEE = "未识别收款人"
# Title printed at the top of every receipt; used to find receipt boundaries.
RECEIPT_HEADER = "中国建设银行网上银行电子回执"
MAX_DIR_NAME_LENGTH = 120

# Geometry (in PDF points) used when slicing a page into receipts.
_SAME_ROW_TOLERANCE = 8
_HEADER_MARGIN = 6
_MIN_RECEIPT_HEIGHT = 40

_WHITESPACE_RUN = re.compile(r"\s+")
_INVALID_NAME_CHARS = re.compile(r'[\\/:*?"<>|\x00-\x1f]')
_WINDOWS_RESERVED_NAMES = frozenset(
    ["CON", "PRN", "AUX", "NUL"]
    + [f"COM{n}" for n in range(1, 10)]
    + [f"LPT{n}" for n in range(1, 10)]
)

# Characters trimmed from both ends of an extracted payee value:
# space, ASCII colon and full-width colon.
_VALUE_TRIM = " :："
# Applied to text with ALL whitespace removed, so the value may originally
# have been on a different line than its label. Group 2 is the label that
# terminated the value, or "" when the match ran to the end of the text.
_PAYEE_COMPACT = re.compile(
    r"收款人全称[:：]?(.+?)"
    r"(别名|账号|开户行|大写金额|小写金额|用途|钞汇标志|摘要|重要提示|付款人全称|$)"
)
_PAYEE_LINE_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r"收款人全称\s*[:：]\s*([^\n\r]+)",
        r"收款人\s*全称\s*[:：]\s*([^\n\r]+)",
        r"收\s*款\s*人\s*全\s*称\s*[:：]?\s*([^\n\r]+)",
        r"收款人全称\s+([^\n\r]+)",
    )
)
# Cuts a same-line value at a column gap or at the next field label.
_PAYEE_VALUE_END = re.compile(r"\s{2,}|金额|开户行|账号|日期")


def _load_fitz() -> ModuleType:
    """Import PyMuPDF on demand and return the module.

    Importing lazily keeps the pure-text helpers usable without the rendering
    dependency and lets ``main`` report a missing install with a clear message.

    Raises:
        ImportError: if PyMuPDF cannot be imported (original error chained).
    """
    try:
        import fitz  # PyMuPDF
    except ImportError as exc:
        raise ImportError(
            "缺少依赖 PyMuPDF (fitz), 请先执行: pip install PyMuPDF"
        ) from exc
    return fitz


def safe_dir_name(name: str) -> str:
    """Return *name* turned into a single, Windows-safe directory name.

    Whitespace runs collapse to one space; characters Windows forbids in file
    names (and control characters) become ``_``; reserved device names get a
    leading ``_``; the result is capped at ``MAX_DIR_NAME_LENGTH`` characters
    and never ends in a space or dot. Names that end up empty (including
    ``"."`` and ``".."``) map to ``UNKNOWN_PAYEE`` so the result can never
    point outside the parent directory.
    """
    cleaned = _WHITESPACE_RUN.sub(" ", name).strip()
    cleaned = _INVALID_NAME_CHARS.sub("_", cleaned)

    # "CON", "con.txt", ... are device names on Windows regardless of case.
    base = cleaned.split(".", 1)[0].strip().upper()
    if base in _WINDOWS_RESERVED_NAMES:
        cleaned = f"_{cleaned}"

    # Trim AFTER truncating: the cut may expose a trailing space or dot.
    cleaned = cleaned[:MAX_DIR_NAME_LENGTH].rstrip(" .")
    return cleaned or UNKNOWN_PAYEE


def extract_payee_name(text: str) -> str | None:
    """Extract the value of the "收款人全称" (payee full name) field.

    Strategies, in order:

    1. Whitespace-stripped match ending at a known following label.
    2. Same-line patterns on the original text, tolerating spaces inside the
       label and both ASCII and full-width colons.
    3. Line scan: remainder of the label's line, else the next non-empty line.
    4. Whitespace-stripped match that ran to the end of the text. This is a
       last resort because it swallows everything after the label.

    Returns:
        The payee name, or ``None`` if *text* is empty or has no usable value.
    """
    if not text:
        return None

    normalized = text.replace("\u3000", " ")
    compact = _WHITESPACE_RUN.sub("", normalized)

    unterminated: str | None = None
    compact_match = _PAYEE_COMPACT.search(compact)
    if compact_match:
        value = compact_match.group(1).strip(_VALUE_TRIM)
        if value:
            if compact_match.group(2):
                return value
            unterminated = value

    for pattern in _PAYEE_LINE_PATTERNS:
        match = pattern.search(normalized)
        if match:
            head = _PAYEE_VALUE_END.split(match.group(1).strip())[0]
            value = head.strip(_VALUE_TRIM)
            if value:
                return value

    lines = [line.strip() for line in normalized.splitlines()]
    lines = [line for line in lines if line]
    for idx, line in enumerate(lines):
        if "收款人全称" not in line:
            continue
        after = line.split("收款人全称", 1)[1].strip(_VALUE_TRIM)
        if after:
            return after
        if idx + 1 < len(lines):
            candidate = lines[idx + 1].strip(_VALUE_TRIM)
            if candidate:
                return candidate

    return unterminated


def extract_text_via_ocr(image_path: Path) -> str:
    """Best-effort OCR of *image_path* for receipts without embedded text.

    OCR is optional: if ``pytesseract`` or ``PIL`` is not installed the
    function silently returns ``""``. The ``TESSERACT_CMD`` environment
    variable, when set, overrides the tesseract executable path. Any failure
    while reading or recognising the image is reported on stderr and also
    yields ``""`` so one bad image never aborts the batch.
    """
    try:
        import pytesseract
        from PIL import Image
    except ImportError:
        return ""

    tesseract_cmd = os.environ.get("TESSERACT_CMD", "")
    if tesseract_cmd:
        pytesseract.pytesseract.tesseract_cmd = tesseract_cmd

    try:
        with Image.open(image_path) as img:
            return pytesseract.image_to_string(img, lang="chi_sim+eng")
    except Exception as exc:  # deliberate best-effort boundary
        print(f"OCR 失败 ({image_path.name}): {exc}", file=sys.stderr)
        return ""


def resolve_io_dirs(argv: list[str] | None = None) -> tuple[Path, Path]:
    """Resolve the input and output directories from command-line arguments.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        ``(source_dir, output_dir)``. ``-i``/``-o`` take precedence over the
        positional arguments; anything not given falls back to
        ``DEFAULT_SOURCE_DIR`` / ``DEFAULT_OUTPUT_DIR``.
    """
    parser = argparse.ArgumentParser(description="PDF 回执拆分与按收款人分类")
    parser.add_argument("input_dir", nargs="?", help="输入目录(可选,默认 ./source)")
    parser.add_argument("output_dir", nargs="?", help="输出目录(可选,默认 ./output)")
    parser.add_argument("-i", "--input", dest="input_opt", help="输入目录")
    parser.add_argument("-o", "--output", dest="output_opt", help="输出目录")
    args = parser.parse_args(argv)

    input_raw = args.input_opt or args.input_dir
    output_raw = args.output_opt or args.output_dir

    source_dir = Path(input_raw).expanduser().resolve() if input_raw else DEFAULT_SOURCE_DIR
    output_dir = Path(output_raw).expanduser().resolve() if output_raw else DEFAULT_OUTPUT_DIR
    return source_dir, output_dir


def ensure_dirs(source_dir: Path, output_dir: Path) -> None:
    """Create *source_dir* and *output_dir* (with parents) if they are missing."""
    source_dir.mkdir(parents=True, exist_ok=True)
    output_dir.mkdir(parents=True, exist_ok=True)


def _receipt_bands(
    header_tops: list[float], top: float, bottom: float
) -> list[tuple[float, float]]:
    """Turn header y-positions into ``(y0, y1)`` bands, one per receipt."""
    distinct: list[float] = []
    for y in sorted(header_tops):
        # Several hits on the same visual row count as one header.
        if not distinct or y - distinct[-1] > _SAME_ROW_TOLERANCE:
            distinct.append(y)

    bands: list[tuple[float, float]] = []
    for idx, y in enumerate(distinct):
        y0 = max(top, y - _HEADER_MARGIN)
        # A receipt ends just above the next header, or at the page bottom.
        y1 = distinct[idx + 1] - _HEADER_MARGIN if idx + 1 < len(distinct) else bottom
        if y1 - y0 > _MIN_RECEIPT_HEIGHT:
            bands.append((y0, y1))
    return bands


def get_receipt_clips(page: fitz.Page) -> list[fitz.Rect]:
    """Return one full-width clip rectangle per receipt found on *page*.

    Receipts are located by searching for ``RECEIPT_HEADER``; each clip runs
    from slightly above one header to slightly above the next (or to the page
    bottom). If no header is found, or no band is tall enough, the whole page
    is returned as a single clip.
    """
    header_rects = page.search_for(RECEIPT_HEADER)
    if not header_rects:
        return [page.rect]

    fitz_mod = _load_fitz()
    bounds = page.rect
    bands = _receipt_bands([rect.y0 for rect in header_rects], bounds.y0, bounds.y1)
    clips = [fitz_mod.Rect(bounds.x0, y0, bounds.x1, y1) for y0, y1 in bands]
    return clips or [page.rect]


def _unique_path(directory: Path, stem: str, suffix: str) -> Path:
    """Return a path in *directory* that does not exist yet (``stem_N`` on clash)."""
    candidate = directory / f"{stem}{suffix}"
    counter = 1
    while candidate.exists():
        candidate = directory / f"{stem}_{counter}{suffix}"
        counter += 1
    return candidate


def process_pdf(
    pdf_path: Path, output_dir: Path, *, zoom: float = 2.0
) -> tuple[int, int, dict[str, int]]:
    """Render every receipt of *pdf_path* to PNG and file it by payee.

    Each receipt is rendered into *output_dir*, its payee is read from the
    embedded text (falling back to OCR of the rendered image), and the image
    is then moved into ``output_dir / <sanitized payee>``. Existing images
    are never overwritten; a numeric suffix is added instead.

    Args:
        pdf_path: PDF file to split.
        output_dir: Existing directory that receives the payee sub-directories.
        zoom: Render scale factor applied to both axes.

    Returns:
        ``(success_count, failed_count, per_dir_counts)`` where
        *failed_count* is the number of receipts filed under
        ``UNKNOWN_PAYEE`` and *per_dir_counts* maps directory name to the
        number of images placed there.
    """
    fitz_mod = _load_fitz()
    success_count = 0
    failed_count = 0
    per_dir_counts: dict[str, int] = {}
    matrix = fitz_mod.Matrix(zoom, zoom)

    with fitz_mod.open(pdf_path) as doc:
        for page_index, page in enumerate(doc, start=1):
            for receipt_index, clip in enumerate(get_receipt_clips(page), start=1):
                stem = f"{pdf_path.stem}_p{page_index:03d}_r{receipt_index:03d}"
                temp_image = output_dir / f"{stem}.png"

                # Render first: the OCR fallback needs the image on disk.
                pix = page.get_pixmap(matrix=matrix, alpha=False, clip=clip)
                pix.save(temp_image)

                payee_name = extract_payee_name(page.get_text("text", clip=clip) or "")
                if not payee_name:
                    payee_name = extract_payee_name(extract_text_via_ocr(temp_image))

                if payee_name:
                    success_count += 1
                else:
                    payee_name = UNKNOWN_PAYEE
                    failed_count += 1

                target_dir = output_dir / safe_dir_name(payee_name)
                target_dir.mkdir(parents=True, exist_ok=True)
                per_dir_counts[target_dir.name] = per_dir_counts.get(target_dir.name, 0) + 1

                temp_image.replace(_unique_path(target_dir, stem, ".png"))

    return success_count, failed_count, per_dir_counts


def write_execution_report(report_lines: list[str]) -> Path:
    """Write *report_lines* to a timestamped UTF-8 file in ``BASE_DIR``.

    Returns:
        The path of the report file that was written.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = BASE_DIR / f"执行结果记录_{timestamp}.txt"
    report_path.write_text("\n".join(report_lines) + "\n", encoding="utf-8")
    return report_path


def main(argv: list[str] | None = None) -> int:
    """Run the batch: split every PDF in the input directory and write a report.

    Args:
        argv: Command-line arguments; ``None`` means ``sys.argv[1:]``.

    Returns:
        Process exit code: ``0`` on success (including "no PDFs found"),
        ``1`` if PyMuPDF is missing or at least one PDF failed to process.
    """
    source_dir, output_dir = resolve_io_dirs(argv)

    try:
        _load_fitz()
    except ImportError as exc:
        print(exc, file=sys.stderr)
        return 1

    ensure_dirs(source_dir, output_dir)

    # Match the extension case-insensitively so "X.PDF" is not skipped on
    # case-sensitive filesystems.
    pdf_files = sorted(
        path
        for path in source_dir.iterdir()
        if path.is_file() and path.suffix.lower() == ".pdf"
    )
    if not pdf_files:
        print(f"未在目录中发现 PDF: {source_dir}")
        print("请将 PDF 文件放到 source 目录后重试。")
        return 0

    total_ok = 0
    total_unknown = 0
    failed_files: list[str] = []
    report_lines = [
        f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"输入目录: {source_dir}",
        f"输出目录: {output_dir}",
        "",
    ]

    for pdf_file in pdf_files:
        print(f"处理文件: {pdf_file.name}")
        try:
            ok, unknown, dir_counts = process_pdf(pdf_file, output_dir)
        except Exception as exc:  # top-level boundary: one bad PDF must not stop the batch
            print(f" 处理失败: {exc}", file=sys.stderr)
            failed_files.append(pdf_file.name)
            report_lines.append(f"PDF: {pdf_file.name}")
            report_lines.append(f" 处理失败: {exc}")
            report_lines.append("")
            continue

        print(f" 识别到收款人全称的图片数: {ok}")
        print(f" 未识别收款人全称的图片数: {unknown}")
        total_ok += ok
        total_unknown += unknown

        report_lines.append(f"PDF: {pdf_file.name}")
        report_lines.append(f" 拆分目录数: {len(dir_counts)}")
        report_lines.append(f" 识别到收款人全称的图片数: {ok}")
        report_lines.append(f" 未识别收款人全称的图片数: {unknown}")
        report_lines.append(" 目录明细:")
        for dir_name in sorted(dir_counts):
            report_lines.append(f" - {dir_name}: {dir_counts[dir_name]} 张")
        report_lines.append("")

    print("\n处理完成")
    print(f"识别到收款人全称的图片数: {total_ok}")
    print(f"未识别收款人全称的图片数: {total_unknown}")
    print(f"输出目录: {output_dir}")

    report_lines.append("汇总:")
    report_lines.append(f" 识别到收款人全称的图片总数: {total_ok}")
    report_lines.append(f" 未识别收款人全称的图片总数: {total_unknown}")
    if failed_files:
        report_lines.append(f" 处理失败的 PDF 数: {len(failed_files)}")

    report_path = write_execution_report(report_lines)
    print(f"执行结果记录文件: {report_path}")
    return 1 if failed_files else 0


if __name__ == "__main__":
    sys.exit(main())