init: 添加初始python版本
This commit is contained in:
commit
106ebc4c4a
244
index.py
Normal file
244
index.py
Normal file
@ -0,0 +1,244 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
import fitz # PyMuPDF
|
||||||
|
|
||||||
|
|
||||||
|
# Folder that contains this script; the default input and output folders
# are resolved relative to it rather than to the current working directory.
BASE_DIR = Path(__file__).resolve().parent
DEFAULT_SOURCE_DIR = BASE_DIR.joinpath("source")
DEFAULT_OUTPUT_DIR = BASE_DIR.joinpath("output")
|
||||||
|
|
||||||
|
|
||||||
|
def safe_dir_name(name: str) -> str:
    """Sanitize a directory name for the Windows filesystem.

    The result is safe to use as a single path component:

    * runs of whitespace are collapsed to one space;
    * characters Windows forbids in names (``\\ / : * ? " < > |``) and
      remaining ASCII control characters are replaced with ``_``;
    * trailing dots and spaces are removed (Windows rejects or silently
      drops them), which also neutralises ``.`` and ``..`` so the name can
      never point outside its parent directory;
    * reserved device names such as ``CON`` or ``NUL`` get a ``_`` prefix;
    * the name is limited to 120 characters.

    Args:
        name: Raw name, typically a payee name extracted from a receipt.

    Returns:
        The sanitized name, or ``"未识别收款人"`` when nothing usable remains.
    """
    fallback = "未识别收款人"
    reserved_devices = {"CON", "PRN", "AUX", "NUL"}
    reserved_devices.update(f"COM{n}" for n in range(1, 10))
    reserved_devices.update(f"LPT{n}" for n in range(1, 10))

    # Collapse whitespace first so tabs/newlines become a space (as before);
    # only non-whitespace control characters are turned into underscores.
    cleaned = re.sub(r"\s+", " ", name.strip())
    cleaned = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", cleaned)
    cleaned = cleaned.rstrip(" .")

    # Windows treats "CON", "con.txt", "NUL " etc. as devices, not files.
    device_stem = cleaned.split(".", 1)[0].rstrip(" ").upper()
    if device_stem in reserved_devices:
        cleaned = "_" + cleaned

    # Truncation may expose a new trailing space or dot, so strip again.
    cleaned = cleaned[:120].rstrip(" .")
    return cleaned or fallback
|
||||||
|
|
||||||
|
|
||||||
|
def extract_payee_name(text: str) -> str | None:
    """Extract the value of the 收款人全称 (payee full name) field from text.

    Three strategies are tried in order; the first non-empty value wins:

    1. Remove all whitespace and capture everything between the label and
       the next known field label (or the end of the text).
    2. Apply several line-oriented label patterns to the text with its
       whitespace intact, cutting the value at wide gaps or at a following
       field label.
    3. Scan line by line and take the remainder of the label line or, if
       that is empty, the following line.

    Both the ASCII colon and the full-width colon (U+FF1A) are accepted as
    the label separator and are stripped from the extracted value.

    Args:
        text: Text extracted from a receipt (embedded PDF text or OCR output).

    Returns:
        The payee name, or ``None`` if the text is empty or no value is found.
    """
    if not text:
        return None

    # The full-width colon is written as an escape so it cannot be confused
    # with (or normalised into) the ASCII colon.
    separators = " :\uff1a"

    # Ideographic spaces are common in Chinese PDF text; treat them as spaces.
    normalized = text.replace("\u3000", " ")
    compact = re.sub(r"\s+", "", normalized)

    compact_match = re.search(
        r"收款人全称[:\uff1a]?(.+?)(别名|账号|开户行|大写金额|小写金额|用途|钞汇标志|摘要|重要提示|付款人全称|$)",
        compact,
        flags=re.IGNORECASE,
    )
    if compact_match:
        value = compact_match.group(1).strip(separators)
        if value:
            return value

    patterns = [
        r"收款人全称\s*[:\uff1a]\s*([^\n\r]+)",
        r"收款人\s*全称\s*[:\uff1a]\s*([^\n\r]+)",
        r"收\s*款\s*人\s*全\s*称\s*[:\uff1a]?\s*([^\n\r]+)",
        r"收款人全称\s+([^\n\r]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, normalized, flags=re.IGNORECASE)
        if match:
            value = match.group(1).strip()
            # A wide gap or another field label marks the end of the value.
            value = re.split(r"\s{2,}|金额|开户行|账号|日期", value)[0].strip(separators)
            if value:
                return value

    lines = [line.strip() for line in normalized.splitlines() if line.strip()]
    for idx, line in enumerate(lines):
        if "收款人全称" in line:
            after = line.split("收款人全称", 1)[1].strip(separators)
            if after:
                return after
            # Label alone on its line: the value is expected on the next one.
            if idx + 1 < len(lines):
                candidate = lines[idx + 1].strip(separators)
                if candidate:
                    return candidate

    return None
|
||||||
|
|
||||||
|
|
||||||
|
def extract_text_via_ocr(image_path: Path) -> str:
    """Run OCR on an image as a fallback for receipts without embedded text.

    The OCR dependencies (``pytesseract`` and Pillow) are optional. When they
    are not importable, or when opening the image or running OCR fails for
    any reason, an empty string is returned instead of raising.

    If the ``TESSERACT_CMD`` environment variable is set to a non-empty
    value, it is used as the tesseract command for pytesseract.

    Args:
        image_path: Path of the rendered receipt image.

    Returns:
        The recognized text (Simplified Chinese + English), or ``""``.
    """
    try:
        import pytesseract
        from PIL import Image
    except ImportError:
        return ""

    if custom_binary := os.environ.get("TESSERACT_CMD", ""):
        pytesseract.pytesseract.tesseract_cmd = custom_binary

    recognized = ""
    try:
        with Image.open(image_path) as picture:
            recognized = pytesseract.image_to_string(picture, lang="chi_sim+eng")
    except Exception:
        # Best-effort fallback: any OCR failure means "no text available".
        recognized = ""
    return recognized
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_io_dirs() -> tuple[Path, Path]:
    """Determine the input and output directories from the command line.

    Each directory can be given positionally or through ``-i/--input`` and
    ``-o/--output``; the option form takes precedence when both are present.
    Directories supplied on the command line are user-expanded and resolved
    to absolute paths. A directory that is not supplied falls back to the
    module default (``DEFAULT_SOURCE_DIR`` / ``DEFAULT_OUTPUT_DIR``).

    Returns:
        A ``(source_dir, output_dir)`` tuple.
    """
    cli = argparse.ArgumentParser(description="PDF 回执拆分与按收款人分类")
    cli.add_argument("input_dir", nargs="?", help="输入目录(可选,默认 ./source)")
    cli.add_argument("output_dir", nargs="?", help="输出目录(可选,默认 ./output)")
    cli.add_argument("-i", "--input", dest="input_opt", help="输入目录")
    cli.add_argument("-o", "--output", dest="output_opt", help="输出目录")
    options = cli.parse_args()

    chosen_input = options.input_opt or options.input_dir
    if chosen_input:
        source_dir = Path(chosen_input).expanduser().resolve()
    else:
        source_dir = DEFAULT_SOURCE_DIR

    chosen_output = options.output_opt or options.output_dir
    if chosen_output:
        output_dir = Path(chosen_output).expanduser().resolve()
    else:
        output_dir = DEFAULT_OUTPUT_DIR

    return source_dir, output_dir
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_dirs(source_dir: Path, output_dir: Path) -> None:
    """Create the source and output directories if they do not exist yet.

    Missing parent directories are created as well; directories that already
    exist are left untouched.
    """
    for directory in (source_dir, output_dir):
        directory.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
|
||||||
|
def get_receipt_clips(page: fitz.Page) -> list[fitz.Rect]:
    """Split one page into receipt regions based on repeated receipt headers.

    The page is searched for the header text
    ``中国建设银行网上银行电子回执``. Each distinct header position (hits whose
    top edge lies within 8 units of the previously kept one are merged)
    starts a full-width region that begins 6 units above the header and ends
    6 units above the next header, or at the bottom of the page for the last
    one. Regions that are 40 units tall or less are discarded.

    Args:
        page: The PDF page to analyse.

    Returns:
        The receipt regions from top to bottom, or a single-element list with
        the whole page rectangle when no header is found or no region
        survives the height filter.
    """
    hits = page.search_for("中国建设银行网上银行电子回执")
    if not hits:
        return [page.rect]

    # Keep one top coordinate per header row, in top-to-bottom order.
    tops: list[float] = []
    for hit in sorted(hits, key=lambda r: (round(r.y0, 1), r.x0)):
        if not tops or abs(hit.y0 - tops[-1]) > 8:
            tops.append(hit.y0)

    page_box = page.rect
    # Each region ends just above the next header; the last one at page end.
    bottoms = [following - 6 for following in tops[1:]]
    bottoms.append(page_box.y1)

    regions: list[fitz.Rect] = []
    for top, bottom in zip(tops, bottoms):
        upper = max(page_box.y0, top - 6)
        if bottom - upper > 40:
            regions.append(fitz.Rect(page_box.x0, upper, page_box.x1, bottom))

    return regions or [page.rect]
|
||||||
|
|
||||||
|
|
||||||
|
def process_pdf(pdf_path: Path, output_dir: Path) -> tuple[int, int, dict[str, int]]:
    """Render every receipt of a PDF as a PNG and file it by payee name.

    For each receipt region on each page the region is rendered at 2x zoom
    into ``output_dir``, the payee name is extracted from the embedded text
    (falling back to OCR on the rendered image), and the image is then moved
    into a sub-directory of ``output_dir`` named after the sanitized payee.
    Receipts without a recognizable payee go to ``未识别收款人``.

    Args:
        pdf_path: PDF file to process.
        output_dir: Root directory that receives the per-payee folders.

    Returns:
        ``(success_count, failed_count, per_dir_counts)`` where the counts
        are numbers of receipt images with / without a recognized payee and
        ``per_dir_counts`` maps each target directory name to the number of
        images placed in it.
    """
    recognized = 0
    unrecognized = 0
    images_per_dir: dict[str, int] = {}

    with fitz.open(pdf_path) as document:
        for page_no, page in enumerate(document, start=1):
            for receipt_no, region in enumerate(get_receipt_clips(page), start=1):
                base_name = f"{pdf_path.stem}_p{page_no:03d}_r{receipt_no:03d}"
                image_name = f"{base_name}.png"
                staged_image = output_dir / image_name

                # The image is rendered first because the OCR fallback
                # below reads it from disk.
                rendered = page.get_pixmap(
                    matrix=fitz.Matrix(2.0, 2.0), alpha=False, clip=region
                )
                rendered.save(staged_image)

                embedded_text = page.get_text("text", clip=region) or ""
                payee = extract_payee_name(embedded_text) or extract_payee_name(
                    extract_text_via_ocr(staged_image)
                )

                if payee:
                    recognized += 1
                else:
                    payee = "未识别收款人"
                    unrecognized += 1

                payee_dir = output_dir / safe_dir_name(payee)
                payee_dir.mkdir(parents=True, exist_ok=True)
                images_per_dir.setdefault(payee_dir.name, 0)
                images_per_dir[payee_dir.name] += 1

                destination = payee_dir / image_name
                if destination.exists():
                    # Avoid overwriting an image left by an earlier run.
                    destination = payee_dir / f"{base_name}_{os.getpid()}.png"

                staged_image.replace(destination)

    return recognized, unrecognized, images_per_dir
|
||||||
|
|
||||||
|
|
||||||
|
def write_execution_report(report_lines: list[str]) -> Path:
    """Save the run report as a timestamped UTF-8 text file in ``BASE_DIR``.

    Args:
        report_lines: Report content, one entry per line.

    Returns:
        Path of the written file, named ``执行结果记录_<YYYYmmdd_HHMMSS>.txt``.
    """
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    destination = BASE_DIR / f"执行结果记录_{stamp}.txt"
    joined = "\n".join(report_lines)
    with destination.open("w", encoding="utf-8") as handle:
        handle.write(f"{joined}\n")
    return destination
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
    """Run the receipt splitter over every ``*.pdf`` in the input directory.

    Resolves and creates the input/output directories, processes each PDF in
    sorted order, prints per-file and overall statistics, and writes a
    timestamped execution report.

    Returns:
        Process exit code; ``0`` both on success and when no PDF is found.
    """
    source_dir, output_dir = resolve_io_dirs()
    ensure_dirs(source_dir, output_dir)

    pdf_files = sorted(source_dir.glob("*.pdf"))
    if not pdf_files:
        print(f"未在目录中发现 PDF: {source_dir}")
        print("请将 PDF 文件放到 source 目录后重试。")
        return 0

    grand_ok = 0
    grand_unknown = 0
    report: list[str] = [
        f"执行时间: {datetime.now():%Y-%m-%d %H:%M:%S}",
        f"输入目录: {source_dir}",
        f"输出目录: {output_dir}",
        "",
    ]

    for pdf_file in pdf_files:
        print(f"处理文件: {pdf_file.name}")
        ok, unknown, dir_counts = process_pdf(pdf_file, output_dir)
        print(f" 识别到收款人全称的图片数: {ok}")
        print(f" 未识别收款人全称的图片数: {unknown}")
        grand_ok += ok
        grand_unknown += unknown

        # Per-PDF section of the report, followed by a blank separator line.
        report.extend(
            [
                f"PDF: {pdf_file.name}",
                f" 拆分目录数: {len(dir_counts)}",
                f" 识别到收款人全称的图片数: {ok}",
                f" 未识别收款人全称的图片数: {unknown}",
                " 目录明细:",
            ]
        )
        report.extend(
            f" - {dir_name}: {count} 张" for dir_name, count in sorted(dir_counts.items())
        )
        report.append("")

    print("\n处理完成")
    print(f"识别到收款人全称的图片数: {grand_ok}")
    print(f"未识别收款人全称的图片数: {grand_unknown}")
    print(f"输出目录: {output_dir}")

    report.extend(
        [
            "汇总:",
            f" 识别到收款人全称的图片总数: {grand_ok}",
            f" 未识别收款人全称的图片总数: {grand_unknown}",
        ]
    )

    report_path = write_execution_report(report)
    print(f"执行结果记录文件: {report_path}")
    return 0
|
||||||
|
|
||||||
|
|
||||||
|
# Script entry point: run main() and use its return value as the exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||||
Loading…
x
Reference in New Issue
Block a user