pdf-picker/index.py
2026-05-26 21:36:18 +08:00

245 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import argparse
import os
import re
import sys
from datetime import datetime
from pathlib import Path
import fitz # PyMuPDF
# Directory that contains this script; the default folders below live beside it.
BASE_DIR = Path(__file__).resolve().parent
# Used as the input directory when none is given on the command line.
DEFAULT_SOURCE_DIR = BASE_DIR.joinpath("source")
# Used as the output directory when none is given on the command line.
DEFAULT_OUTPUT_DIR = BASE_DIR.joinpath("output")
def safe_dir_name(name: str) -> str:
    """Turn an arbitrary payee name into a directory name that is safe on Windows.

    Steps, in order:
      1. Collapse every run of whitespace to one space and trim both ends.
      2. Replace characters Windows forbids in names (``\\ / : * ? " < > |``)
         and remaining control characters with ``_``.
      3. Truncate to 120 characters.
      4. Drop trailing spaces and dots. Windows does not allow a name to end
         with either, and this also prevents ``"."`` / ``".."`` from being
         used as a path component that escapes the output directory.

    Args:
        name: Raw payee name as extracted from a receipt.

    Returns:
        The sanitized name, or ``"未识别收款人"`` when nothing usable remains.
    """
    # Whitespace is collapsed first so that tabs/newlines become a space
    # rather than being caught by the control-character class below.
    cleaned = re.sub(r"\s+", " ", name).strip()
    cleaned = re.sub(r"[\\/:*?\"<>|\x00-\x1f]", "_", cleaned)
    # Trim after truncating: the cut itself can expose a trailing space or dot.
    cleaned = cleaned[:120].rstrip(" .")
    return cleaned or "未识别收款人"
def extract_payee_name(text: str) -> str | None:
    """Extract the value of the 收款人全称 (payee full name) field from receipt text.

    Three strategies are tried in order; the first non-empty result wins:

      1. Whitespace-free match: all whitespace is removed and the value is
         taken as the shortest text between the label and the next known
         field label (or the end of the text).
      2. Line-oriented regexes on the original layout, with the captured
         value cut at the first run of two or more whitespace characters or
         at a following field keyword.
      3. Plain scan: the remainder of the line containing the label, or the
         next non-empty line when the label stands alone.

    Both the ASCII colon and the full-width colon (U+FF1A) are accepted after
    the label and stripped from the result, so a full-width separator never
    ends up as the first character of the returned name.

    Args:
        text: Text of one receipt (embedded PDF text or OCR output).

    Returns:
        The payee name, or ``None`` when the text is empty or no value is found.
    """
    if not text:
        return None

    # Characters trimmed from both ends of a candidate: space plus both colons.
    trim_chars = " :\uff1a"

    # Ideographic spaces are treated like ordinary spaces.
    normalized = text.replace("\u3000", " ")

    # Strategy 1: ignore layout entirely.
    compact = re.sub(r"\s+", "", normalized)
    compact_match = re.search(
        r"收款人全称[:\uff1a]?(.+?)(别名|账号|开户行|大写金额|小写金额|用途|钞汇标志|摘要|重要提示|付款人全称|$)",
        compact,
        flags=re.IGNORECASE,
    )
    if compact_match:
        value = compact_match.group(1).strip(trim_chars)
        if value:
            return value

    # Strategy 2: label and value on the same line.
    patterns = [
        r"收款人全称\s*[:\uff1a]\s*([^\n\r]+)",
        r"收款人\s*全称\s*[:\uff1a]\s*([^\n\r]+)",
        # NOTE(review): this pattern does not require the leading 收 —
        # presumably to tolerate OCR misreads of the first character; confirm.
        r"\s*款\s*人\s*全\s*称\s*[:\uff1a]?\s*([^\n\r]+)",
        r"收款人全称\s+([^\n\r]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, normalized, flags=re.IGNORECASE)
        if match:
            value = match.group(1).strip()
            # Cut at a wide gap or at the next field keyword on the same line.
            value = re.split(r"\s{2,}|金额|开户行|账号|日期", value)[0].strip(trim_chars)
            if value:
                return value

    # Strategy 3: value after the label, or on the following non-empty line.
    lines = [line.strip() for line in normalized.splitlines() if line.strip()]
    for idx, line in enumerate(lines):
        if "收款人全称" in line:
            after = line.split("收款人全称", 1)[1].strip(trim_chars)
            if after:
                return after
            if idx + 1 < len(lines):
                candidate = lines[idx + 1].strip(trim_chars)
                if candidate:
                    return candidate
    return None
def extract_text_via_ocr(image_path: Path) -> str:
    """Recognize text in an image with Tesseract; used when a receipt has no text layer.

    Returns an empty string when pytesseract or Pillow cannot be imported, or
    when opening the image or running recognition raises.
    """
    # Imported lazily so that the OCR packages stay optional.
    try:
        import pytesseract
        from PIL import Image
    except ImportError:
        return ""

    # TESSERACT_CMD, when set and non-empty, overrides the Tesseract binary path.
    binary_override = os.getenv("TESSERACT_CMD", "")
    if binary_override:
        pytesseract.pytesseract.tesseract_cmd = binary_override

    try:
        with Image.open(image_path) as picture:
            recognized = pytesseract.image_to_string(picture, lang="chi_sim+eng")
    except Exception:
        # Best effort: any failure is reported to the caller as "no text".
        return ""
    return recognized
def resolve_io_dirs(argv: list[str] | None = None) -> tuple[Path, Path]:
    """Resolve the source and output directories from command-line arguments.

    Each directory can be given positionally or through ``-i/--input`` and
    ``-o/--output``; an option takes precedence over the matching positional.
    When ``-i`` is combined with exactly one positional (``-i SRC DEST``),
    that positional is used as the output directory instead of being
    silently discarded.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        ``(source_dir, output_dir)``. Directories taken from the command line
        are user-expanded and resolved to absolute paths; missing ones fall
        back to ``DEFAULT_SOURCE_DIR`` / ``DEFAULT_OUTPUT_DIR``.
    """
    parser = argparse.ArgumentParser(description="PDF 回执拆分与按收款人分类")
    parser.add_argument("input_dir", nargs="?", help="输入目录(可选,默认 ./source)")
    parser.add_argument("output_dir", nargs="?", help="输出目录(可选,默认 ./output)")
    parser.add_argument("-i", "--input", dest="input_opt", help="输入目录")
    parser.add_argument("-o", "--output", dest="output_opt", help="输出目录")
    args = parser.parse_args(argv)

    positional_in = args.input_dir
    positional_out = args.output_dir
    if args.input_opt and positional_in and positional_out is None:
        # argparse binds a lone positional to input_dir, but -i already names
        # the input, so the positional can only have been meant as the output.
        positional_in, positional_out = None, positional_in

    input_raw = args.input_opt or positional_in
    output_raw = args.output_opt or positional_out
    source_dir = Path(input_raw).expanduser().resolve() if input_raw else DEFAULT_SOURCE_DIR
    output_dir = Path(output_raw).expanduser().resolve() if output_raw else DEFAULT_OUTPUT_DIR
    return source_dir, output_dir
def ensure_dirs(source_dir: Path, output_dir: Path) -> None:
    """Create the source and output directories, with parents, if they do not exist."""
    for directory in (source_dir, output_dir):
        directory.mkdir(parents=True, exist_ok=True)
def get_receipt_clips(
    page: fitz.Page,
    header_text: str = "中国建设银行网上银行电子回执",
) -> list[fitz.Rect]:
    """Split one page into full-width receipt regions, one per header occurrence.

    The page is searched for ``header_text``. Each distinct vertical position
    of a hit starts a region that extends down to just above the next header,
    or to the bottom of the page for the last one.

    Args:
        page: Page to analyse.
        header_text: Title line that marks the top of every receipt. Defaults
            to the title this script was written for; pass another string to
            handle receipts with a different title.

    Returns:
        Regions in top-to-bottom order. The whole page rectangle is returned
        as the single region when the header is not found or when no
        candidate region is tall enough.
    """
    # All values below are in page coordinate units.
    row_tolerance = 8  # hits whose y0 differ by no more than this are one header row
    header_margin = 6  # a region starts/ends this far above the header's y0
    min_height = 40  # candidate regions not taller than this are dropped

    header_rects = page.search_for(header_text)
    if not header_rects:
        return [page.rect]

    # Reduce the hits to one y-coordinate per header row, top to bottom.
    sorted_rects = sorted(header_rects, key=lambda r: (round(r.y0, 1), r.x0))
    unique_y: list[float] = []
    for rect in sorted_rects:
        if not unique_y or abs(rect.y0 - unique_y[-1]) > row_tolerance:
            unique_y.append(rect.y0)

    bounds = page.rect
    clips: list[fitz.Rect] = []
    for idx, y in enumerate(unique_y):
        y0 = max(bounds.y0, y - header_margin)
        is_last = idx + 1 >= len(unique_y)
        y1 = bounds.y1 if is_last else unique_y[idx + 1] - header_margin
        if y1 - y0 > min_height:
            clips.append(fitz.Rect(bounds.x0, y0, bounds.x1, y1))
    return clips or [page.rect]
def process_pdf(pdf_path: Path, output_dir: Path) -> tuple[int, int, dict[str, int]]:
    """Render every receipt of a PDF to a PNG and file it under its payee's folder.

    For each region reported by ``get_receipt_clips`` the image is first
    written directly under ``output_dir``, the payee is looked up in the
    region's embedded text (falling back to OCR of the rendered image), and
    the image is then moved into ``output_dir/<sanitized payee name>``.

    Returns:
        ``(recognized, unrecognized, per_dir)`` where ``per_dir`` maps each
        target folder name to the number of images placed there by this call.
    """
    recognized = 0
    unrecognized = 0
    dir_tally: dict[str, int] = {}

    with fitz.open(pdf_path) as document:
        for page_no, page in enumerate(document, start=1):
            for receipt_no, region in enumerate(get_receipt_clips(page), start=1):
                image_name = f"{pdf_path.stem}_p{page_no:03d}_r{receipt_no:03d}.png"
                staged_image = output_dir / image_name

                # Render the region at 2x scale; the file is also the OCR input.
                rendered = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0), alpha=False, clip=region)
                rendered.save(staged_image)

                # Prefer the embedded text layer; OCR only when it yields no payee.
                payee = extract_payee_name(page.get_text("text", clip=region) or "")
                if not payee:
                    payee = extract_payee_name(extract_text_via_ocr(staged_image))

                if payee:
                    recognized += 1
                else:
                    payee = "未识别收款人"
                    unrecognized += 1

                bucket = output_dir / safe_dir_name(payee)
                bucket.mkdir(parents=True, exist_ok=True)
                dir_tally[bucket.name] = dir_tally.get(bucket.name, 0) + 1

                destination = bucket / image_name
                if destination.exists():
                    # Keep the existing image: add this process id to the new name.
                    destination = bucket / f"{pdf_path.stem}_p{page_no:03d}_r{receipt_no:03d}_{os.getpid()}.png"
                staged_image.replace(destination)

    return recognized, unrecognized, dir_tally
def write_execution_report(report_lines: list[str]) -> Path:
    """Save the report lines to a new timestamped UTF-8 text file in BASE_DIR.

    The lines are joined with newlines and a final newline is appended.

    Returns:
        Path of the file that was written.
    """
    stamp = f"{datetime.now():%Y%m%d_%H%M%S}"
    destination = BASE_DIR / f"执行结果记录_{stamp}.txt"
    content = "\n".join(report_lines) + "\n"
    destination.write_text(content, encoding="utf-8")
    return destination
def main() -> int:
    """Process every PDF in the source directory and write an execution report.

    Each PDF is handled independently: if processing one file raises, the
    error is printed to stderr and recorded in the report, and the remaining
    files are still processed.

    Returns:
        Process exit code: 0 when no PDF was found or every PDF was
        processed, 1 when at least one PDF failed.
    """
    source_dir, output_dir = resolve_io_dirs()
    ensure_dirs(source_dir, output_dir)

    # Compare the extension case-insensitively (glob("*.pdf") would skip
    # "X.PDF" on case-sensitive file systems) and ignore non-files.
    pdf_files = sorted(
        entry
        for entry in source_dir.iterdir()
        if entry.is_file() and entry.suffix.lower() == ".pdf"
    )
    if not pdf_files:
        print(f"未在目录中发现 PDF: {source_dir}")
        print("请将 PDF 文件放到 source 目录后重试。")
        return 0

    total_ok = 0
    total_unknown = 0
    failed_files: list[str] = []
    report_lines = [
        f"执行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
        f"输入目录: {source_dir}",
        f"输出目录: {output_dir}",
        "",
    ]

    for pdf_file in pdf_files:
        print(f"处理文件: {pdf_file.name}")
        try:
            ok, unknown, dir_counts = process_pdf(pdf_file, output_dir)
        except Exception as exc:
            # Top-level boundary: report the failure and keep going so one
            # unreadable PDF does not abort the batch or lose the report.
            detail = f"{type(exc).__name__}: {exc}"
            print(f" 处理失败: {detail}", file=sys.stderr)
            failed_files.append(pdf_file.name)
            report_lines.append(f"PDF: {pdf_file.name}")
            report_lines.append(f" 处理失败: {detail}")
            report_lines.append("")
            continue

        print(f" 识别到收款人全称的图片数: {ok}")
        print(f" 未识别收款人全称的图片数: {unknown}")
        total_ok += ok
        total_unknown += unknown
        report_lines.append(f"PDF: {pdf_file.name}")
        report_lines.append(f" 拆分目录数: {len(dir_counts)}")
        report_lines.append(f" 识别到收款人全称的图片数: {ok}")
        report_lines.append(f" 未识别收款人全称的图片数: {unknown}")
        report_lines.append(" 目录明细:")
        for dir_name in sorted(dir_counts):
            report_lines.append(f" - {dir_name}: {dir_counts[dir_name]}")
        report_lines.append("")

    print("\n处理完成")
    print(f"识别到收款人全称的图片数: {total_ok}")
    print(f"未识别收款人全称的图片数: {total_unknown}")
    if failed_files:
        print(f"处理失败的 PDF 数: {len(failed_files)}")
    print(f"输出目录: {output_dir}")

    report_lines.append("汇总:")
    report_lines.append(f" 识别到收款人全称的图片总数: {total_ok}")
    report_lines.append(f" 未识别收款人全称的图片总数: {total_unknown}")
    if failed_files:
        report_lines.append(f" 处理失败的 PDF 数: {len(failed_files)}")
    report_path = write_execution_report(report_lines)
    print(f"执行结果记录文件: {report_path}")
    return 1 if failed_files else 0
# Run the CLI only when executed as a script; main()'s return value is the exit status.
if __name__ == "__main__":
    raise SystemExit(main())