Commit 3ac7da45 by ccran

feat: add skills;

parent 3f7f704b
......@@ -18,10 +18,15 @@ class LLMConfig:
api_key: str = "none"
model: str = "Qwen2-72B-Instruct"
# 最大分片数量
min_single_chunk_size = 2000
max_single_chunk_size = 100000
max_chunk_page = 10
MAX_SINGLE_CHUNK_SIZE=100000
MERGE_RULE_PROMPT = False
MAX_SINGLE_CHUNK_SIZE = 100000
# MAX_SINGLE_CHUNK_SIZE = 5000
# MAX_SINGLE_CHUNK_SIZE = 2000
MERGE_RULE_PROMPT = False
META_KEY = "META"
DEFAULT_RULESET_ID = "通用"
## 规则集ID列表,需与rules.xlsx中的sheet名称保持一致!!!
......@@ -110,7 +115,3 @@ LLM = {
}
doc_support_formats = [".docx", ".doc", ".wps"]
pdf_support_formats = [".txt", ".md", ".pdf"]
# 最大分片数量
min_single_chunk_size = 2000
max_single_chunk_size = 20000
max_chunk_page = 10
......@@ -19,7 +19,7 @@ batch_size = 5
if not use_lufa:
SUFFIX = "_麓发迁移"
batch_input_dir_path = "jp-input"
batch_output_dir_path = f"/data/home/htsc/jp-contract/data/benchmark/results/jp-output-lufa-{time.strftime('%Y%m%d-%H%M%S', time.localtime())}"
batch_output_dir_path = f"/data/home/htsc/jp-contract/data/benchmark/results/jp-output-lufa-chunk100000"
# 金盘fastgpt接口
url = "http://172.21.107.45:3002/api/v1/chat/completions"
# 金盘迁移麓发合同审查测试token
......
#!/usr/bin/env python3
"""Single-file CLI version of utils/common_util.py and utils/http_util.py.
This script mirrors the project utility functions while staying standalone:
it does not import local project modules such as utils.* or core.*. Runtime
defaults that originally came from core.config are CLI arguments here.
"""
from __future__ import annotations
import argparse
import json
import mimetypes
import random
import re
import string
import sys
import time
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any
# Prefix added to URLs that do not start with an http(s) scheme.
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
# Backend base URL: used for login/upload and substituted for the outer URL on download.
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
# Outer backend URL that download_file rewrites to DEFAULT_BASE_BACKEND_URL.
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
# Lower and upper clamp applied by adjust_single_chunk_size.
DEFAULT_MIN_SINGLE_CHUNK_SIZE = 2000
DEFAULT_MAX_SINGLE_CHUNK_SIZE = 100000
# Divisor applied to the total text length by adjust_single_chunk_size.
DEFAULT_MAX_CHUNK_PAGE = 10
def random_str(l: int = 5) -> str:
    """Return ``l`` random lowercase ASCII letters.

    While ``l`` fits within the alphabet the letters are sampled without
    replacement (all distinct); longer requests draw every letter
    independently, so repeats are possible.
    """
    alphabet = string.ascii_lowercase
    if l <= len(alphabet):
        picked = random.sample(alphabet, l)
    else:
        picked = [random.choice(alphabet) for _ in range(l)]
    return "".join(picked)
def format_now() -> str:
    """Return the current local time rendered as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def extract_url_file(url: str, support_formats: list[str]) -> str:
    """Extract the first ``<name><extension>`` file name found in ``url``.

    Args:
        url: Text to search (typically a download URL).
        support_formats: Accepted extensions including the dot, e.g. ``[".docx", ".pdf"]``.

    Returns:
        The matched file name (name characters followed by one of the extensions).

    Raises:
        Exception: If no supported file name occurs in ``url`` or no usable
            extension was supplied.
    """
    # Try longer extensions first so ".doc" cannot shadow ".docx" at the same
    # position; blank entries are dropped because an empty alternative would
    # match any run of word characters (or the empty string).
    ordered_formats = sorted((fmt for fmt in support_formats if fmt), key=len, reverse=True)
    if ordered_formats:
        pattern = "|".join(
            r"[\u4e00-\u9fa5()()0-9\w-]+" + re.escape(fmt)
            for fmt in ordered_formats
        )
        search_result = re.search(pattern, url)
        if search_result:
            return search_result.group()
    raise Exception(f"{support_formats} not found in url:{url}")
def adjust_single_chunk_size(
    all_text_len: int,
    max_chunk_page: int = DEFAULT_MAX_CHUNK_PAGE,
    min_single_chunk_size: int = DEFAULT_MIN_SINGLE_CHUNK_SIZE,
    max_single_chunk_size: int = DEFAULT_MAX_SINGLE_CHUNK_SIZE,
) -> int:
    """Pick a chunk size of ``all_text_len // max_chunk_page`` clamped to the limits.

    The upper limit is applied first and the lower limit last, so the lower
    limit wins if the two limits contradict each other.
    """
    target = all_text_len // max_chunk_page
    if target > max_single_chunk_size:
        target = max_single_chunk_size
    if target < min_single_chunk_size:
        target = min_single_chunk_size
    return target
def _try_json_loads(text: str) -> Any:
    """Parse ``text`` with ``json_repair`` when importable, else with strict ``json``."""
    try:
        import json_repair  # type: ignore
    except ImportError:
        json_repair = None
    if json_repair is None:
        return json.loads(text)
    return json_repair.loads(text, strict=False)
def extract_json(json_str: str) -> list[Any]:
    """Collect JSON values embedded in ``json_str`` into a flat list.

    Strategies are tried in order and the first one that yields anything wins:
    every ```json fenced block, the whole text, the first parseable generic
    fenced block, and finally every non-greedy ``{...}`` / ``[...]`` span.
    Parsed lists are flattened into the result; other values are appended.
    """
    source = json_str or ""
    parsed: list[Any] = []

    def _collect(candidate: str) -> bool:
        """Parse one candidate into ``parsed``; report whether parsing succeeded."""
        cleaned = (candidate or "").strip()
        if not cleaned:
            return False
        # Drop control characters that commonly break JSON parsers.
        cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", cleaned)
        try:
            loaded = _try_json_loads(cleaned)
        except Exception:
            return False
        if isinstance(loaded, list):
            parsed.extend(loaded)
        else:
            parsed.append(loaded)
        return True

    # 1) ```json fences: gather every block that parses.
    for fenced in re.findall(r"```json([\s\S]*?)```", source, re.DOTALL):
        _collect(fenced)
    if parsed:
        return parsed

    # 2) The text as a whole.
    if _collect(source):
        return parsed

    # 3) Any fenced block: stop at the first that parses.
    for fenced in re.findall(r"```([\s\S]*?)```", source, re.DOTALL):
        if _collect(fenced):
            return parsed

    # 4) Loose object/array spans anywhere in the text.
    for span in re.findall(r"(\{[\s\S]*?\}|\[[\s\S]*?\])", source, re.DOTALL):
        _collect(span)
    return parsed
def remove_duplicates_by_key(data_list: list[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Drop items whose ``key`` text is contained in an already kept item's text.

    Items are visited from the longest ``str(item[key])`` to the shortest
    (stable for ties), so the returned list is in that order as well.
    """

    def _text(item: dict[str, Any]) -> str:
        return str(item.get(key, ""))

    kept: list[dict[str, Any]] = []
    kept_texts: list[str] = []
    for item in sorted(data_list, key=lambda entry: len(_text(entry)), reverse=True):
        text = _text(item)
        if any(text in seen for seen in kept_texts):
            continue
        kept_texts.append(text)
        kept.append(item)
    return kept
def extract_drop_json_part(json_str: str) -> str:
    """Return ``json_str`` with every ```json fenced block removed, then stripped."""
    fenced_json = re.compile(r"```json([\s\S]*?)```", re.DOTALL)
    remainder = fenced_json.sub("", json_str)
    return remainder.strip()
def group_chunk_by_len(chunk_list: list[dict[str, Any]], key: str, chunk_len: int) -> list[list[dict[str, Any]]]:
    """Pack consecutive chunks into groups whose combined ``key`` text fits ``chunk_len``.

    A new group is started when adding the next chunk would exceed
    ``chunk_len``; a single chunk longer than the limit still gets a group of
    its own.
    """
    groups: list[list[dict[str, Any]]] = []
    current: list[dict[str, Any]] = []
    used = 0
    for chunk in chunk_list:
        size = len(str(chunk.get(key, "")))
        overflow = used + size > chunk_len
        if current and overflow:
            groups.append(current)
            current, used = [], 0
        current.append(chunk)
        used += size
    if current:
        groups.append(current)
    return groups
def _download_basename(filename: str) -> str:
    """Reduce a raw (possibly quoted, percent-encoded) name to its final path component.

    Falls back to ``"downloaded_file"`` when nothing remains.
    """
    unquoted = filename.strip().strip('"')
    decoded = urllib.parse.unquote(unquoted)
    # Treat Windows separators like POSIX ones before taking the last component.
    normalized = decoded.replace("\\", "/")
    base = Path(normalized).name
    return base if base else "downloaded_file"
def _resolve_download_filename(url: str, headers: dict[str, str]) -> str:
    """Choose a file name for a download.

    Preference order: the ``filename*=`` parameter of Content-Disposition, its
    plain ``filename=`` parameter, then the last component of the URL path.

    Args:
        url: The URL that was fetched.
        headers: Response headers as a plain mapping.

    Returns:
        A bare file name (never a path).
    """
    # HTTP field names are case-insensitive, and a plain dict keeps whatever
    # casing the server sent, so compare names case-insensitively.
    content_disposition = ""
    for header_name, header_value in headers.items():
        if header_name.lower() == "content-disposition" and header_value:
            content_disposition = header_value
            break
    if content_disposition:
        match = re.search(r"filename\*=(?:UTF-8''|utf-8'')?([^;]+)", content_disposition)
        if match:
            return _download_basename(match.group(1))
        match = re.search(r'filename="?([^";]+)"?', content_disposition)
        if match:
            return _download_basename(match.group(1))
    return _download_basename(urllib.parse.urlparse(url).path)
def url_replace_fastgpt(origin: str, base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL) -> str:
    """Return ``origin`` unchanged if it has an http(s) scheme, else prefixed with the base URL."""
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def download_file(
    url: str,
    path: str,
    input_url_to_inner: bool = True,
    base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
    base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
    outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
) -> str | None:
    """Download ``url`` to ``path`` and return the path written.

    When ``input_url_to_inner`` is true, scheme-less URLs are prefixed with
    ``base_fastgpt_url`` and any occurrence of ``outer_backend_url`` is swapped
    for ``base_backend_url``. If ``path`` is an existing directory the file
    name is derived from the response headers / URL.

    Returns:
        The written path, or ``None`` after an HTTP error status (which is
        reported on stderr). Other errors propagate.
    """
    if input_url_to_inner:
        if not url.startswith(("http:", "https:")):
            url = base_fastgpt_url + url
        url = url.replace(outer_backend_url, base_backend_url)

    request = urllib.request.Request(url, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=120) as response:
            destination = Path(path)
            if destination.exists() and destination.is_dir():
                file_name = _resolve_download_filename(url, dict(response.headers))
                destination = destination / file_name
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_bytes(response.read())
            return str(destination)
    except urllib.error.HTTPError as exc:
        print(f"{url}文件下载失败. HTTP Status Code: {exc.code}", file=sys.stderr)
        return None
def _multipart_body(path: str, field_name: str = "file") -> tuple[bytes, str]:
    """Encode one file as a ``multipart/form-data`` body.

    Returns:
        ``(body, boundary)`` where ``boundary`` must be echoed in the request's
        Content-Type header.
    """
    source = Path(path)
    # Timestamp plus random suffix keeps the boundary unlikely to occur in the payload.
    boundary = f"----common-tool-{int(time.time() * 1000)}-{random_str(8)}"
    content_type = mimetypes.guess_type(source.name)[0] or "application/octet-stream"
    part_header = (
        f'Content-Disposition: form-data; name="{field_name}"; filename="{source.name}"\r\n'
        f"Content-Type: {content_type}\r\n\r\n"
    )
    payload = b"".join(
        [
            f"--{boundary}\r\n".encode(),
            part_header.encode(),
            source.read_bytes(),
            f"\r\n--{boundary}--\r\n".encode(),
        ]
    )
    return payload, boundary
def _post_json(url: str, data: dict[str, Any], headers: dict[str, str] | None = None, timeout: int = 120) -> str:
    """POST ``data`` as UTF-8 JSON and return the response body decoded as text.

    Caller-supplied ``headers`` are merged over the default JSON Content-Type.
    """
    merged_headers = {"Content-Type": "application/json"}
    merged_headers.update(headers or {})
    payload = json.dumps(data, ensure_ascii=False).encode("utf-8")
    request = urllib.request.Request(url, data=payload, headers=merged_headers, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace")
def upload_file(
    path: str,
    base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
    username: str = "admin",
    password: str = "admin@jpai.com",
) -> str:
    """Log in to the backend, upload ``path`` and return the ``data`` field of the reply.

    Raises:
        RuntimeError: If the login reply carries no access token.
        Exception: If the upload reply has an empty ``data`` field.
    """
    # NOTE(review): default credentials are embedded in the signature — confirm this is intended.
    login_text = _post_json(
        f"{base_backend_url}/admin-api/system/auth/login",
        {"username": username, "password": password},
    )
    login_data = json.loads(login_text).get("data") or {}
    token = login_data.get("accessToken")
    if not token:
        raise RuntimeError(f"后端登录异常: {login_text}")

    body, boundary = _multipart_body(path)
    request = urllib.request.Request(
        f"{base_backend_url}/admin-api/infra/file/upload",
        data=body,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            # The token is sent as-is, without a scheme prefix.
            "Authorization": token,
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=120) as response:
        response_text = response.read().decode("utf-8", errors="replace")

    uploaded = json.loads(response_text).get("data")
    if not uploaded:
        raise Exception(f"上传{path}失败 Response text: {response_text}")
    return uploaded
def fastgpt_openai_chat(url: str, token: str, model: str, chat_id: str, file_url: str, text: str, stream: bool = True) -> str:
    """Send one user message (a file URL plus text) to a chat-completions endpoint.

    Args:
        url: Endpoint URL.
        token: Bearer token for the Authorization header.
        model: Model name placed in the request.
        chat_id: Value sent as ``chatId``.
        file_url: URL of the file attached to the message.
        text: User text of the message.
        stream: Request a streamed reply and concatenate its deltas when true.

    Returns:
        The reply text; ``""`` when the reply carries no content.
    """

    def _choice_content(payload: Any, part: str) -> Any:
        """Return ``payload["choices"][0][part]["content"]`` or ``None`` for any other shape."""
        if not isinstance(payload, dict):
            return None
        choices = payload.get("choices")
        first = choices[0] if isinstance(choices, list) and choices else {}
        section = first.get(part) if isinstance(first, dict) else None
        return section.get("content") if isinstance(section, dict) else None

    data = {
        "chatId": chat_id,
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "file_url", "name": "文件", "url": file_url},
                    {"type": "text", "text": text},
                ],
            }
        ],
        "model": model,
        "stream": stream,
    }
    body = json.dumps(data, ensure_ascii=False).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
        method="POST",
    )
    # NOTE(review): urlopen timeouts are in seconds, so 60000 is roughly 16.7 hours — confirm intended.
    with urllib.request.urlopen(request, timeout=60000) as response:
        if not stream:
            rsp = json.loads(response.read().decode("utf-8", errors="replace"))
            content = _choice_content(rsp, "message")
            # Empty ``choices`` or null content yields "" instead of IndexError / None.
            return "" if content is None else content

        pieces: list[str] = []
        for raw_line in response:
            line = raw_line.decode("utf-8", errors="replace").strip()
            # Accept the event-stream field with or without a space after the colon.
            if line.startswith("data:"):
                line = line[5:].strip()
            if not line or line == "[DONE]":
                continue
            try:
                stream_rsp = json.loads(line)
            except ValueError:
                # Non-JSON lines (comments, other event fields) are skipped on purpose.
                continue
            content = _choice_content(stream_rsp, "delta")
            if isinstance(content, str):
                pieces.append(content)
        return "".join(pieces)
def _read_json_arg(value: str) -> Any:
    """Interpret a CLI argument as JSON, reading it from a file if it names one.

    Args:
        value: Either a path to a UTF-8 JSON file or inline JSON text.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the file content or inline text is not valid JSON.
    """
    try:
        names_existing_path = Path(value).exists()
    except (OSError, ValueError):
        # Inline JSON is often longer than a legal file name; on some Python
        # versions the existence probe raises instead of returning False.
        names_existing_path = False
    if names_existing_path:
        return json.loads(Path(value).read_text(encoding="utf-8"))
    return json.loads(value)
def main() -> int:
    """Parse the command line, run the selected utility and print its result.

    Returns:
        Always ``0``; failures surface as uncaught exceptions.
    """
    parser = argparse.ArgumentParser(description="Single-file common/http utilities based on utils/")
    commands = parser.add_subparsers(dest="cmd", required=True)

    random_cmd = commands.add_parser("random-str")
    random_cmd.add_argument("-l", "--length", type=int, default=5)

    commands.add_parser("format-now")

    url_file_cmd = commands.add_parser("extract-url-file")
    url_file_cmd.add_argument("url")
    url_file_cmd.add_argument("formats", nargs="+")

    chunk_size_cmd = commands.add_parser("adjust-single-chunk-size")
    chunk_size_cmd.add_argument("all_text_len", type=int)
    chunk_size_cmd.add_argument("--max-chunk-page", type=int, default=DEFAULT_MAX_CHUNK_PAGE)
    chunk_size_cmd.add_argument("--min-single-chunk-size", type=int, default=DEFAULT_MIN_SINGLE_CHUNK_SIZE)
    chunk_size_cmd.add_argument("--max-single-chunk-size", type=int, default=DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    extract_cmd = commands.add_parser("extract-json")
    extract_cmd.add_argument("text", nargs="?")

    dedupe_cmd = commands.add_parser("remove-duplicates-by-key")
    dedupe_cmd.add_argument("json_list")
    dedupe_cmd.add_argument("key")

    drop_cmd = commands.add_parser("extract-drop-json-part")
    drop_cmd.add_argument("text", nargs="?")

    group_cmd = commands.add_parser("group-chunk-by-len")
    group_cmd.add_argument("json_list")
    group_cmd.add_argument("key")
    group_cmd.add_argument("chunk_len", type=int)

    replace_cmd = commands.add_parser("url-replace-fastgpt")
    replace_cmd.add_argument("origin")
    replace_cmd.add_argument("--base-fastgpt-url", default=DEFAULT_BASE_FASTGPT_URL)

    download_cmd = commands.add_parser("download")
    download_cmd.add_argument("url")
    download_cmd.add_argument("path")
    download_cmd.add_argument("--base-fastgpt-url", default=DEFAULT_BASE_FASTGPT_URL)
    download_cmd.add_argument("--base-backend-url", default=DEFAULT_BASE_BACKEND_URL)
    download_cmd.add_argument("--outer-backend-url", default=DEFAULT_OUTER_BACKEND_URL)
    download_cmd.add_argument("--no-input-url-to-inner", action="store_true")

    upload_cmd = commands.add_parser("upload")
    upload_cmd.add_argument("path")
    upload_cmd.add_argument("--base-backend-url", default=DEFAULT_BASE_BACKEND_URL)
    upload_cmd.add_argument("--username", default="admin")
    upload_cmd.add_argument("--password", default="admin@jpai.com")

    chat_cmd = commands.add_parser("fastgpt-chat")
    chat_cmd.add_argument("--url", required=True)
    chat_cmd.add_argument("--token", required=True)
    chat_cmd.add_argument("--model", required=True)
    chat_cmd.add_argument("--chat-id", required=True)
    chat_cmd.add_argument("--file-url", required=True)
    chat_cmd.add_argument("--text", required=True)
    chat_cmd.add_argument("--no-stream", action="store_true")

    args = parser.parse_args()

    def _text_input() -> str:
        """Use the positional text if given, otherwise read standard input."""
        return args.text if args.text is not None else sys.stdin.read()

    def _dump(value: Any) -> str:
        return json.dumps(value, ensure_ascii=False, indent=2)

    # Each handler is lazy so only the selected command touches its arguments.
    handlers = {
        "random-str": lambda: random_str(args.length),
        "format-now": lambda: format_now(),
        "extract-url-file": lambda: extract_url_file(args.url, args.formats),
        "adjust-single-chunk-size": lambda: adjust_single_chunk_size(
            args.all_text_len, args.max_chunk_page, args.min_single_chunk_size, args.max_single_chunk_size
        ),
        "extract-json": lambda: _dump(extract_json(_text_input())),
        "remove-duplicates-by-key": lambda: _dump(
            remove_duplicates_by_key(_read_json_arg(args.json_list), args.key)
        ),
        "extract-drop-json-part": lambda: extract_drop_json_part(_text_input()),
        "group-chunk-by-len": lambda: _dump(
            group_chunk_by_len(_read_json_arg(args.json_list), args.key, args.chunk_len)
        ),
        "url-replace-fastgpt": lambda: url_replace_fastgpt(args.origin, args.base_fastgpt_url),
        "download": lambda: download_file(
            args.url,
            args.path,
            not args.no_input_url_to_inner,
            args.base_fastgpt_url,
            args.base_backend_url,
            args.outer_backend_url,
        ),
        "upload": lambda: upload_file(args.path, args.base_backend_url, args.username, args.password),
        "fastgpt-chat": lambda: fastgpt_openai_chat(
            args.url, args.token, args.model, args.chat_id, args.file_url, args.text, not args.no_stream
        ),
    }
    handler = handlers.get(args.cmd)
    if handler is not None:
        print(handler())
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Single-file CLI version inspired by utils/doc_util.py and utils/excel_util.py.
This script does not import local project files. It keeps the public shape of
ExcelUtil and a lightweight document chunk reader so the skill can be used from
CLI while staying portable.
"""
from __future__ import annotations
import argparse
import csv
import html
import json
import re
import string
import zipfile
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from xml.etree import ElementTree as ET
# Default chunk length used by DocBase and as the upper clamp in adjust_single_chunk_size.
DEFAULT_MAX_SINGLE_CHUNK_SIZE = 100000
# Lower clamp used by adjust_single_chunk_size.
DEFAULT_MIN_SINGLE_CHUNK_SIZE = 2000
# Divisor applied to the total text length by adjust_single_chunk_size.
DEFAULT_MAX_CHUNK_PAGE = 10
# Namespace prefix map for querying word/document.xml.
WORD_NS = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
# Namespace prefix map for the xlsx parts: "a" for workbook/sheet/shared-string
# elements, "r" for the sheet's relationship-id attribute, "rel" for the
# workbook relationships file.
XLSX_NS = {
"a": "http://schemas.openxmlformats.org/spreadsheetml/2006/main",
"r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
"rel": "http://schemas.openxmlformats.org/package/2006/relationships",
}
class ExcelLoadError(Exception):
    """Signal that a spreadsheet could not be located, opened or read.

    Counterpart of utils.excel_util.ExcelLoadError.
    """
def _column_index(cell_ref: str) -> int:
    """Convert the letter part of a cell reference (``"AB12"``) to a 0-based column index.

    References without letters map to column 0.
    """
    ordinal = 0
    for ch in cell_ref:
        if ch in string.ascii_letters:
            # Bijective base-26: A=1 ... Z=26.
            ordinal = ordinal * 26 + (ord(ch.upper()) - ord("A") + 1)
    return max(ordinal - 1, 0)
class ExcelUtil:
    """Standalone Excel helper aligned with utils.excel_util.ExcelUtil.

    Reads ``.csv``/``.tsv`` with the csv module and workbooks with openpyxl
    when it is importable; ``.xlsx`` files fall back to a stdlib parser when
    the openpyxl path fails with :class:`ExcelLoadError`.
    """

    def __init__(self, file_path: Union[str, Path]):
        self.file_path = Path(file_path)

    @staticmethod
    def _import_openpyxl():
        """Import openpyxl lazily, raising ExcelLoadError when it is missing."""
        try:
            import openpyxl  # type: ignore
        except ImportError as exc:
            raise ExcelLoadError("openpyxl is unavailable") from exc
        return openpyxl

    def _ensure_exists(self) -> None:
        """Raise ExcelLoadError if the configured file does not exist."""
        if not self.file_path.exists():
            raise ExcelLoadError(f"File not found: {self.file_path}")

    def _load_with_openpyxl(self, sheet_name: Optional[str], has_header: bool) -> List[Union[Dict[str, object], List[object]]]:
        """Load one sheet through openpyxl (cached values, read-only mode)."""
        openpyxl = self._import_openpyxl()
        try:
            wb = openpyxl.load_workbook(self.file_path, data_only=True, read_only=True)
        except Exception as exc:
            raise ExcelLoadError(f"Failed to open Excel file: {exc}") from exc
        try:
            ws = wb[sheet_name] if sheet_name else wb.active
            # Materialise the rows before closing: read-only sheets stream from the file.
            rows = list(ws.iter_rows(values_only=True))
        finally:
            # Read-only workbooks keep the file open until closed explicitly.
            wb.close()
        return self._rows_to_result(rows, has_header)

    @staticmethod
    def _rows_to_result(rows: list[tuple[Any, ...] | list[Any]], has_header: bool) -> List[Union[Dict[str, object], List[object]]]:
        """Shape raw rows as lists, or as dicts keyed by the first row when ``has_header``.

        Missing header cells become ``""``; cells beyond the header width are
        keyed ``col<index>``.
        """
        if not rows:
            return []
        if not has_header:
            return [list(row) for row in rows]
        headers = [str(h).strip() if h is not None else "" for h in rows[0]]
        result: List[Dict[str, object]] = []
        for row in rows[1:]:
            result.append(
                {
                    (headers[i] if i < len(headers) else f"col{i}"): cell
                    for i, cell in enumerate(row)
                }
            )
        return result

    def load(self, sheet_name: Optional[str] = None, has_header: bool = True) -> List[Union[Dict[str, object], List[object]]]:
        """Mirror ExcelUtil.load. Uses openpyxl if available, otherwise stdlib xlsx parsing.

        Raises:
            ExcelLoadError: If the file is missing, or cannot be loaded and no
                fallback applies (only ``.xlsx`` has a stdlib fallback).
        """
        self._ensure_exists()
        suffix = self.file_path.suffix.lower()
        if suffix in {".csv", ".tsv"}:
            delimiter = "\t" if suffix == ".tsv" else ","
            return self._rows_to_result(_read_csv_rows(self.file_path, delimiter), has_header)
        try:
            return self._load_with_openpyxl(sheet_name, has_header)
        except ExcelLoadError:
            if suffix != ".xlsx":
                raise
            return _load_xlsx_stdlib(self.file_path, sheet_name, has_header)

    def list_sheets(self) -> List[str]:
        """Mirror ExcelUtil.list_sheets.

        Raises:
            ExcelLoadError: If the file is missing or its sheet names cannot be read.
        """
        self._ensure_exists()
        try:
            openpyxl = self._import_openpyxl()
            wb = openpyxl.load_workbook(self.file_path, read_only=True)
            try:
                return wb.sheetnames
            finally:
                # Release the file handle held by the read-only workbook.
                wb.close()
        except ExcelLoadError:
            if self.file_path.suffix.lower() != ".xlsx":
                raise
            return _list_xlsx_sheets_stdlib(self.file_path)
        except Exception as exc:
            raise ExcelLoadError(f"Failed to read sheet names: {exc}") from exc

    def find_value_by_column(
        self,
        key_column: str,
        key_value: object,
        value_column: str,
        sheet_name: Optional[str] = None,
    ) -> Optional[object]:
        """Mirror ExcelUtil.find_value_by_column.

        Returns ``value_column`` of the first row whose ``key_column`` equals
        ``key_value``, or ``None`` when no row matches.
        """
        rows = self.load(sheet_name=sheet_name, has_header=True)
        for row in rows:
            if isinstance(row, dict) and row.get(key_column) == key_value:
                return row.get(value_column)
        return None

    def map_rows(self, sheet_name: Optional[str], column_map: Dict[str, str]) -> List[Dict[str, object]]:
        """Mirror ExcelUtil.map_rows.

        ``column_map`` maps output keys to header names; missing headers yield ``None``.
        """
        rows = self.load(sheet_name=sheet_name, has_header=True)
        return [
            {new_key: row.get(header) for new_key, header in column_map.items()}
            for row in rows
            if isinstance(row, dict)
        ]

    @classmethod
    def load_excel(cls, file_path: Union[str, Path], sheet_name: Optional[str] = None, has_header: bool = True):
        """One-shot form of :meth:`load`."""
        return cls(file_path).load(sheet_name=sheet_name, has_header=has_header)

    @classmethod
    def list_excel_sheets(cls, file_path: Union[str, Path]) -> List[str]:
        """One-shot form of :meth:`list_sheets`."""
        return cls(file_path).list_sheets()

    @classmethod
    def find_value_by_column_excel(
        cls,
        file_path: Union[str, Path],
        key_column: str,
        key_value: object,
        value_column: str,
        sheet_name: Optional[str] = None,
    ) -> Optional[object]:
        """One-shot form of :meth:`find_value_by_column`."""
        return cls(file_path).find_value_by_column(key_column, key_value, value_column, sheet_name)

    @classmethod
    def load_mapped_excel(cls, file_path: Union[str, Path], sheet_name: Optional[str], column_map: Dict[str, str]):
        """One-shot form of :meth:`map_rows`."""
        return cls(file_path).map_rows(sheet_name=sheet_name, column_map=column_map)
def _read_csv_rows(path: Path, delimiter: str) -> list[list[str]]:
    """Read a delimited text file (UTF-8, BOM tolerated) into a list of rows."""
    rows: list[list[str]] = []
    with path.open(newline="", encoding="utf-8-sig", errors="replace") as handle:
        rows.extend(csv.reader(handle, delimiter=delimiter))
    return rows
def _xlsx_shared_strings(zf: zipfile.ZipFile) -> list[str]:
    """Return the workbook's shared strings in table order (empty if the part is absent)."""
    try:
        root = ET.fromstring(zf.read("xl/sharedStrings.xml"))
    except KeyError:
        return []
    strings: list[str] = []
    for item in root.findall(".//a:si", XLSX_NS):
        # An entry may be split over several text nodes; join them all.
        fragments = [node.text or "" for node in item.findall(".//a:t", XLSX_NS)]
        strings.append("".join(fragments))
    return strings
def _workbook_sheet_map(zf: zipfile.ZipFile) -> list[tuple[str, str]]:
    """List ``(sheet name, archive member path)`` pairs in workbook order.

    Sheet entries are resolved through the workbook relationships part.
    Relationship targets may be relative to ``xl/`` (``worksheets/sheet1.xml``)
    or absolute (``/xl/worksheets/sheet1.xml``); both resolve to the same member.
    """
    workbook = ET.fromstring(zf.read("xl/workbook.xml"))
    rels = ET.fromstring(zf.read("xl/_rels/workbook.xml.rels"))
    rel_map = {rel.attrib["Id"]: rel.attrib["Target"] for rel in rels.findall("rel:Relationship", XLSX_NS)}
    rel_id_attr = f"{{{XLSX_NS['r']}}}id"
    sheets = []
    for sheet in workbook.findall(".//a:sheets/a:sheet", XLSX_NS):
        name = sheet.attrib.get("name", "")
        target = rel_map.get(sheet.attrib.get(rel_id_attr, ""), "")
        # Strip the leading slash *before* testing for the "xl/" prefix so an
        # absolute "/xl/..." target is not prefixed a second time.
        member = target.lstrip("/")
        if not member.startswith("xl/"):
            member = "xl/" + member
        sheets.append((name, member))
    return sheets
def _list_xlsx_sheets_stdlib(path: Path) -> list[str]:
    """Return the sheet names of an ``.xlsx`` file using only the standard library."""
    with zipfile.ZipFile(path) as archive:
        sheet_entries = _workbook_sheet_map(archive)
    return [sheet_name for sheet_name, _target in sheet_entries]
def _load_xlsx_stdlib(path: Path, sheet_name: Optional[str], has_header: bool):
    """Load one sheet of an ``.xlsx`` file without openpyxl.

    The first sheet is used when ``sheet_name`` is empty. Cell values are the
    raw XML text, except shared-string cells which are resolved through the
    shared-string table; gaps between cells are filled with ``None``.

    Raises:
        ExcelLoadError: If ``sheet_name`` is given but not present.
    """
    with zipfile.ZipFile(path) as archive:
        shared = _xlsx_shared_strings(archive)
        sheets = _workbook_sheet_map(archive)
        if not sheets:
            return []
        if sheet_name:
            sheet_path = next((target for name, target in sheets if name == sheet_name), None)
            if sheet_path is None:
                raise ExcelLoadError(f"Sheet not found: {sheet_name}")
        else:
            sheet_path = sheets[0][1]
        root = ET.fromstring(archive.read(sheet_path))

    rows: list[list[Any]] = []
    for row in root.findall(".//a:sheetData/a:row", XLSX_NS):
        values: list[Any] = []
        for cell in row.findall("a:c", XLSX_NS):
            column = _column_index(cell.attrib.get("r", ""))
            # Pad skipped columns so values stay aligned with their column index.
            if len(values) < column:
                values.extend([None] * (column - len(values)))

            value_node = cell.find("a:v", XLSX_NS)
            inline_node = cell.find("a:is/a:t", XLSX_NS)
            if value_node is not None:
                raw = value_node.text
            elif inline_node is not None:
                raw = inline_node.text
            else:
                raw = None

            value = raw
            if cell.attrib.get("t") == "s" and raw is not None:
                # Shared-string cells store an index into the shared table.
                position = int(raw)
                if position < len(shared):
                    value = shared[position]
            values.append(value)
        rows.append(values)
    return ExcelUtil._rows_to_result(rows, has_header)
class DocBase(ABC):
    """Abstract chunked-document interface shaped like utils.doc_util.DocBase.

    Subclasses provide loading, chunk access and comment operations; the base
    stores construction keyword arguments and the maximum chunk size.
    """

    def __init__(self, **kwargs):
        self._doc_path = None
        self._doc_name = None
        self._kwargs = kwargs
        self._max_single_chunk_size = kwargs.get("max_single_chunk_size", DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    @abstractmethod
    def load(self, doc_path):
        """Load the document at ``doc_path``."""

    @abstractmethod
    def adjust_chunk_size(self):
        """Recompute the chunk size for the loaded document."""

    @abstractmethod
    async def get_from_ocr(self):
        """Coroutine hook for obtaining content via OCR."""

    @abstractmethod
    def get_chunk_item(self, chunk_id):
        """Return the content of chunk ``chunk_id``."""

    @abstractmethod
    def get_chunk_info(self, chunk_id):
        """Return descriptive information about chunk ``chunk_id``."""

    @abstractmethod
    def get_chunk_location(self, chunk_id):
        """Return the location of chunk ``chunk_id`` within the document."""

    @abstractmethod
    def add_chunk_comment(self, chunk_id, comments):
        """Attach ``comments`` to chunk ``chunk_id``."""

    @abstractmethod
    def edit_chunk_comment(self, comments):
        """Edit the given ``comments``."""

    @abstractmethod
    def delete_chunk_comment(self, comments):
        """Delete the given ``comments``."""

    @abstractmethod
    def get_chunk_id_list(self, step=1):
        """Return chunk ids, taking every ``step``-th one."""

    @abstractmethod
    def get_chunk_num(self):
        """Return the number of chunks."""

    @abstractmethod
    def get_all_text(self):
        """Return the full document text."""

    def to_file(self, path, **kwargs):
        """Write the full document text to ``path`` as UTF-8 (extra kwargs are ignored)."""
        Path(path).write_text(self.get_all_text(), encoding="utf-8")

    def release(self):
        """Release resources held by the document; the base holds none."""
def adjust_single_chunk_size(
    all_text_len: int,
    max_chunk_page: int = DEFAULT_MAX_CHUNK_PAGE,
    min_single_chunk_size: int = DEFAULT_MIN_SINGLE_CHUNK_SIZE,
    max_single_chunk_size: int = DEFAULT_MAX_SINGLE_CHUNK_SIZE,
) -> int:
    """Return ``all_text_len // max_chunk_page`` clamped to the min/max chunk sizes.

    The lower bound is applied last, so it wins if the bounds contradict.
    """
    wanted = all_text_len // max_chunk_page
    capped = max_single_chunk_size if wanted > max_single_chunk_size else wanted
    return min_single_chunk_size if capped < min_single_chunk_size else capped
class StandaloneDoc(DocBase):
    """Plain-text document reader exposing DocBase-style chunk methods.

    The document text is cut into fixed-size character windows; comment
    operations are not supported.
    """

    def load(self, doc_path):
        """Read ``doc_path`` and split its text into chunks."""
        self._doc_path = str(doc_path)
        self._doc_name = Path(doc_path).name
        self._all_text = read_document_text(doc_path)
        self._chunk_list = self._resolve_doc_chunk()

    def adjust_chunk_size(self):
        """Derive the chunk size from the text length, re-split, and return the new size."""
        self._max_single_chunk_size = adjust_single_chunk_size(len(self.get_all_text()))
        self._chunk_list = self._resolve_doc_chunk()
        return self._max_single_chunk_size

    async def get_from_ocr(self):
        """OCR is not performed here; always yields an empty string."""
        return ""

    def _resolve_doc_chunk(self):
        """Split the text into windows of the current chunk size (one empty chunk if no text)."""
        text = self._all_text
        size = self._max_single_chunk_size
        total = len(text)
        chunks = [
            {"text": text[start : start + size], "start": start, "end": min(start + size, total)}
            for start in range(0, total, size)
        ]
        if not chunks:
            return [{"text": "", "start": 0, "end": 0}]
        return chunks

    def get_chunk_item(self, chunk_id):
        """Return the text of chunk ``chunk_id`` (0-based)."""
        return self._chunk_list[chunk_id]["text"]

    def get_chunk_info(self, chunk_id):
        """Return a summary of the chunk: 1-based id, character range, head/tail preview."""
        chunk = self._chunk_list[chunk_id]
        text = chunk["text"]
        if text:
            tips = f"[{text[:20]}]...到... [{text[-20:]}]"
        else:
            tips = "[]"
        return f"文件块id: {chunk_id + 1}\n文件块位置: 字符{chunk['start']}到{chunk['end']}\n文件块简述: {tips}\n"

    def get_chunk_location(self, chunk_id):
        """Return the character range of chunk ``chunk_id`` as text."""
        chunk = self._chunk_list[chunk_id]
        return f"字符{chunk['start']}到{chunk['end']}"

    def add_chunk_comment(self, chunk_id, comments):
        """Unsupported: always raises NotImplementedError."""
        raise NotImplementedError("StandaloneDoc does not modify Word/PDF comments.")

    def edit_chunk_comment(self, comments):
        """Unsupported: always raises NotImplementedError."""
        raise NotImplementedError("StandaloneDoc does not modify Word/PDF comments.")

    def delete_chunk_comment(self, comments):
        """Unsupported: always raises NotImplementedError."""
        raise NotImplementedError("StandaloneDoc does not modify Word/PDF comments.")

    def get_chunk_id_list(self, step=1):
        """Return 0-based chunk ids, taking every ``step``-th one."""
        return list(range(0, self.get_chunk_num(), step))

    def get_chunk_num(self):
        """Return how many chunks the document currently has."""
        return len(self._chunk_list)

    def get_all_text(self):
        """Return the complete text read by :meth:`load`."""
        return self._all_text
def parse_docx(path: Union[str, Path]) -> str:
    """Extract the text of a ``.docx`` file, one line per non-empty paragraph.

    Text runs are concatenated and tab elements become ``"\\t"``.
    """
    namespace = WORD_NS["w"]
    text_tag = f"{{{namespace}}}t"
    tab_tag = f"{{{namespace}}}tab"

    with zipfile.ZipFile(path) as archive:
        document = ET.fromstring(archive.read("word/document.xml"))

    lines = []
    for paragraph in document.findall(".//w:p", WORD_NS):
        pieces = []
        for node in paragraph.iter():
            if node.tag == text_tag:
                pieces.append(node.text or "")
            elif node.tag == tab_tag:
                pieces.append("\t")
        line = "".join(pieces)
        if line:
            lines.append(line)
    return "\n".join(lines)
def read_document_text(path: Union[str, Path]) -> str:
    """Return the text of ``path``, choosing a reader from its file extension.

    ``.docx`` and ``.pdf`` use dedicated parsers, spreadsheet/CSV files are
    rendered as a JSON array of rows, and anything else is read as UTF-8 text
    with undecodable bytes replaced.
    """
    suffix = Path(path).suffix.lower()
    if suffix == ".docx":
        return parse_docx(path)
    if suffix == ".pdf":
        return read_pdf_text_optional(path)
    if suffix in (".xlsx", ".xlsm", ".csv", ".tsv"):
        rows = ExcelUtil(path).load(has_header=False)
        return json.dumps(rows, ensure_ascii=False)
    return Path(path).read_text(encoding="utf-8", errors="replace")
def read_pdf_text_optional(path: Union[str, Path]) -> str:
    """Extract the text of every PDF page, joined with newlines, via PyMuPDF.

    Raises:
        RuntimeError: If PyMuPDF (``fitz``) is not installed.
    """
    try:
        import fitz  # type: ignore
    except ImportError as exc:
        raise RuntimeError("PDF text extraction needs PyMuPDF installed; this script does not import project utils.") from exc

    document = fitz.open(path)
    try:
        page_texts = [page.get_text() for page in document]
    finally:
        # Always release the document, even if a page fails to extract.
        document.close()
    return "\n".join(page_texts)
def process_string(s: str) -> str:
    """Mirror utils.spire_word_util.process_string.

    Reduces a multi-line string to a single line:

    * no newline: ``s`` unchanged;
    * one newline: the longer of the two lines (the first on a tie);
    * two or more newlines: the longest interior line, i.e. excluding the
      first and last lines (the earliest on a tie; may be ``""``).
    """
    if "\n" not in s:
        return s
    parts = s.split("\n")
    if len(parts) == 2:
        first, second = parts
        return first if len(first) >= len(second) else second
    # Two or more newlines always leave at least one interior segment, so the
    # former "no interior parts" fallback could never run and has been removed.
    return max(parts[1:-1], key=len)
def is_messy_text(
    text: str,
    min_chars=40,
    chinese_ratio_thresh=0.20,
    printable_ratio_thresh=0.70,
    symbol_ratio_thresh=0.30,
    longest_non_word_run_thresh=10,
    english_word_density_thresh=0.03,
) -> bool:
    """Mirror utils.spire_pdf_util.is_messy_text.

    Heuristically decide whether ``text`` looks garbled. Empty text and text
    shorter than ``min_chars`` count as messy; otherwise the share of Chinese,
    printable and symbol characters, the longest run of non-word characters
    and the density of English words are compared against the thresholds.
    """
    if not text:
        return True
    total = len(text)
    if total < min_chars:
        return True

    chinese = printable = symbols = 0
    for ch in text:
        is_chinese = "\u4e00" <= ch <= "\u9fff"
        if is_chinese:
            chinese += 1
        if ch.isprintable():
            printable += 1
        if not (is_chinese or ch.isalnum() or ch.isspace()):
            symbols += 1

    chinese_ratio = chinese / total
    printable_ratio = printable / total
    symbol_ratio = symbols / total
    longest_run = max(map(len, re.findall(r"[^0-9A-Za-z\u4e00-\u9fff\s]+", text)), default=0)
    word_density = len(re.findall(r"\b[a-zA-Z]{2,}\b", text)) / max(1, total)

    verdicts = (
        # Far too many unprintable characters.
        printable_ratio < printable_ratio_thresh * 0.6,
        # Dominated by symbols.
        symbol_ratio > max(0.5, symbol_ratio_thresh),
        # One very long run of non-word characters.
        longest_run >= longest_non_word_run_thresh * 1.5,
        # Neither Chinese nor English, and symbol-heavy.
        chinese_ratio < chinese_ratio_thresh
        and word_density < english_word_density_thresh
        and symbol_ratio > symbol_ratio_thresh,
        # Hardly any Chinese and too few printable characters.
        chinese_ratio < (chinese_ratio_thresh * 0.5) and printable_ratio < printable_ratio_thresh,
    )
    return any(verdicts)
def _json_print(value: Any) -> None:
    """Print ``value`` as indented JSON, keeping non-ASCII characters readable."""
    rendered = json.dumps(value, ensure_ascii=False, indent=2)
    print(rendered)
def main() -> int:
    """Parse the command line, run the selected doc/excel utility and print its result.

    Returns:
        Always ``0``; failures surface as uncaught exceptions.
    """
    parser = argparse.ArgumentParser(description="Single-file doc/excel utilities based on utils/")
    commands = parser.add_subparsers(dest="cmd", required=True)

    load_excel_cmd = commands.add_parser("load-excel")
    load_excel_cmd.add_argument("file")
    load_excel_cmd.add_argument("--sheet-name")
    load_excel_cmd.add_argument("--no-header", action="store_true")

    list_sheets_cmd = commands.add_parser("list-sheets")
    list_sheets_cmd.add_argument("file")

    find_value_cmd = commands.add_parser("find-value")
    find_value_cmd.add_argument("file")
    find_value_cmd.add_argument("key_column")
    find_value_cmd.add_argument("key_value")
    find_value_cmd.add_argument("value_column")
    find_value_cmd.add_argument("--sheet-name")

    map_rows_cmd = commands.add_parser("map-rows")
    map_rows_cmd.add_argument("file")
    map_rows_cmd.add_argument("column_map", help="JSON object: {new_key: header_name}")
    map_rows_cmd.add_argument("--sheet-name")

    doc_load_cmd = commands.add_parser("doc-load")
    doc_load_cmd.add_argument("file")
    doc_load_cmd.add_argument("--max-single-chunk-size", type=int, default=DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    doc_chunk_cmd = commands.add_parser("doc-chunk")
    doc_chunk_cmd.add_argument("file")
    doc_chunk_cmd.add_argument("chunk_id", type=int)
    doc_chunk_cmd.add_argument("--max-single-chunk-size", type=int, default=DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    doc_info_cmd = commands.add_parser("doc-info")
    doc_info_cmd.add_argument("file")
    doc_info_cmd.add_argument("chunk_id", type=int)
    doc_info_cmd.add_argument("--max-single-chunk-size", type=int, default=DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    doc_adjust_cmd = commands.add_parser("doc-adjust-chunk-size")
    doc_adjust_cmd.add_argument("file")

    process_cmd = commands.add_parser("process-string")
    process_cmd.add_argument("text")

    messy_cmd = commands.add_parser("is-messy-text")
    messy_cmd.add_argument("text")

    args = parser.parse_args()
    command = args.cmd

    if command in ("doc-load", "doc-chunk", "doc-info", "doc-adjust-chunk-size"):
        # doc-adjust-chunk-size defines no size option, hence the getattr default.
        chunk_size = getattr(args, "max_single_chunk_size", DEFAULT_MAX_SINGLE_CHUNK_SIZE)
        doc = StandaloneDoc(max_single_chunk_size=chunk_size)
        doc.load(args.file)
        if command == "doc-load":
            _json_print(
                {
                    "chunk_num": doc.get_chunk_num(),
                    "chunk_ids": doc.get_chunk_id_list(),
                    "text_len": len(doc.get_all_text()),
                }
            )
        elif command == "doc-chunk":
            print(doc.get_chunk_item(args.chunk_id))
        elif command == "doc-info":
            print(doc.get_chunk_info(args.chunk_id))
        else:
            print(doc.adjust_chunk_size())
        return 0

    if command == "load-excel":
        _json_print(ExcelUtil.load_excel(args.file, sheet_name=args.sheet_name, has_header=not args.no_header))
    elif command == "list-sheets":
        _json_print(ExcelUtil.list_excel_sheets(args.file))
    elif command == "find-value":
        # The key is compared as the string received from the command line.
        key_value: object = args.key_value
        found = ExcelUtil.find_value_by_column_excel(
            args.file, args.key_column, key_value, args.value_column, args.sheet_name
        )
        _json_print(found)
    elif command == "map-rows":
        column_map = json.loads(args.column_map)
        _json_print(ExcelUtil.load_mapped_excel(args.file, args.sheet_name, column_map))
    elif command == "process-string":
        print(process_string(args.text))
    elif command == "is-messy-text":
        print(json.dumps(is_messy_text(args.text), ensure_ascii=False))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Single-file CLI version of utils/ocr_util.py, paddle_ocr_util.py and tesseract_ocr_util.py.
This script keeps the same class names and method names where practical, but
does not import project-local modules. External runtime tools/libraries such
as PyMuPDF or tesseract are optional and loaded only when their commands need
them.
"""
from __future__ import annotations
import argparse
import asyncio
import codecs
import json
import mimetypes
import os
import random
import re
import string
import subprocess
import time
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# Default FastGPT base URL; url_replace_fastgpt() and download_file() prepend
# it to URLs that are not already absolute.
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
# Default backend base URL that download_file() substitutes for the outer
# backend URL before fetching.
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
# Default outer backend URL; occurrences in a download URL are rewritten to
# DEFAULT_BASE_BACKEND_URL.
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
# Default endpoint used by PaddleOCRUtil and the --ocr-url CLI options.
DEFAULT_PADDLE_OCR_URL = "http://192.168.252.71:56100/ocr/pdf-robust"
def random_str(l: int = 5) -> str:
    """Return a random string of ``l`` lowercase ASCII letters.

    When ``l`` fits within the alphabet the letters are drawn without
    repetition; longer strings are drawn letter by letter with repetition.
    """
    alphabet = string.ascii_lowercase
    if l <= len(alphabet):
        letters = random.sample(alphabet, l)
    else:
        letters = [random.choice(alphabet) for _ in range(l)]
    return "".join(letters)
def url_replace_fastgpt(origin: str, base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL) -> str:
    """Return ``origin`` as an absolute URL.

    URLs that already start with ``http:`` or ``https:`` are returned
    unchanged; anything else is treated as a path relative to
    ``base_fastgpt_url`` and gets that base prepended.

    The original check only recognised ``http:``, so an ``https://`` URL was
    wrongly prefixed with the base URL. ``download_file`` in this file already
    accepts both schemes, and this function now agrees with it.
    """
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def download_file(
    url: str,
    path: str,
    base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
    base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
    outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
) -> str | None:
    """Download ``url`` and store the response body at ``path``.

    A URL without an ``http:``/``https:`` prefix is resolved against
    ``base_fastgpt_url``. Any occurrence of ``outer_backend_url`` is then
    replaced by ``base_backend_url`` before the request is made. Missing
    parent directories of ``path`` are created.

    Returns the target path as a string. Errors raised by ``urlopen`` (which
    is called with a 120 second timeout) propagate to the caller.
    """
    is_absolute = url.startswith("http:") or url.startswith("https:")
    if not is_absolute:
        url = base_fastgpt_url + url
    url = url.replace(outer_backend_url, base_backend_url)

    with urllib.request.urlopen(url, timeout=120) as response:
        target_path = Path(path)
        target_path.parent.mkdir(parents=True, exist_ok=True)
        payload = response.read()
        target_path.write_bytes(payload)
    return str(target_path)
def _multipart_post(url: str, file_path: str, field_name: str = "file", timeout: int = 1200, content_type: str | None = None) -> str:
    """POST one file as ``multipart/form-data`` and return the response text.

    Args:
        url: Endpoint that receives the upload.
        file_path: Local file whose bytes are sent.
        field_name: Form field name used for the file part.
        timeout: Timeout in seconds passed to ``urlopen``.
        content_type: MIME type of the file part. When omitted it is guessed
            from the file name, falling back to ``application/octet-stream``.

    Returns:
        The response body decoded as UTF-8, with undecodable bytes replaced.
    """
    source = Path(file_path)
    boundary = f"----ocr-tool-{int(time.time() * 1000)}-{random_str(8)}"
    file_content_type = content_type or mimetypes.guess_type(source.name)[0] or "application/octet-stream"

    # One file part: boundary line, part headers, raw bytes, closing boundary.
    part_header = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field_name}"; filename="{source.name}"\r\n'
        f"Content-Type: {file_content_type}\r\n\r\n"
    ).encode()
    closing = f"\r\n--{boundary}--\r\n".encode()
    payload = b"".join((part_header, source.read_bytes(), closing))

    request = urllib.request.Request(
        url,
        data=payload,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Content-Length": str(len(payload)),
        },
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace")
class OCRUtil:
    """Standalone variant of utils.ocr_util.OCRUtil.

    Renders PDF pages to PNG images, uploads each image to the OCR endpoint
    given at construction time and returns the recognised text in page order.
    """

    def __init__(self, ocr_url: str):
        self.ocr_url = ocr_url

    async def ocr_requests_async(self, session, file_path):
        """Upload one file from a worker thread.

        ``session`` is accepted for signature compatibility and is discarded.
        Returns a ``(response_text, file_path)`` tuple.
        """
        del session
        response_text = await asyncio.to_thread(
            _multipart_post, self.ocr_url, file_path, "file", 600
        )
        return response_text, file_path

    async def ocr_image_async(self, path_list):
        """OCR all page images concurrently and return their text by page.

        Responses whose JSON has no ``data`` key are skipped. The page number
        is taken from each image path, so paths must follow the naming
        produced by ``set_pdf_2_img_page``.
        """
        responses = await asyncio.gather(
            *(self.ocr_requests_async(None, file_path) for file_path in path_list)
        )
        res_dict = {}
        for response_text, file_path in responses:
            rsp_json = json.loads(response_text)
            if "data" not in rsp_json:
                continue
            page_num = int(self.get_pdf_2_img_page_num(file_path))
            res_dict[page_num] = rsp_json["data"]["strRes"]
        return [res_dict[key] for key in sorted(res_dict)]

    def set_pdf_2_img_page(self, path, page_idx):
        """Return the image path for zero-based ``page_idx`` of ``path``."""
        return f"{path}_{page_idx + 1}.png"

    def get_pdf_2_img_page_num(self, path):
        """Return the page number (as a string) encoded in a page image path.

        Expects a name ending in ``_<digits>.<3-character extension>``. The
        original sliced the last four characters off blindly and returned
        garbage for other names; this raises ``ValueError`` with the
        offending path instead, matching ``TesseractOCRUtil``.
        """
        match = re.search(r"_(\d+)\.[A-Za-z0-9]{3}$", path)
        if not match:
            raise ValueError(f"Invalid pdf page image path: {path}")
        return match.group(1)

    def pdf_2_img(self, path, zoom_x=1, zoom_y=1):
        """Render every page of the PDF to a PNG next to it; return the paths.

        Raises ``RuntimeError`` when PyMuPDF (``fitz``) is not installed.
        """
        try:
            import fitz  # type: ignore
        except ImportError as exc:
            raise RuntimeError("pdf_2_img needs PyMuPDF installed; no project-local imports are used.") from exc
        pdf = fitz.open(path)
        pdf_list = []
        try:
            # The zoom matrix is the same for every page, so build it once.
            trans = fitz.Matrix(zoom_x, zoom_y)
            for pg in range(pdf.page_count):
                pm = pdf[pg].get_pixmap(matrix=trans, alpha=False)
                dest_png = self.set_pdf_2_img_page(path, pg)
                pm.save(dest_png)
                pdf_list.append(dest_png)
        finally:
            pdf.close()
        return pdf_list

    def ocr_download_path(
        self,
        url,
        base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
        base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
        outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
    ):
        """Download ``url`` into the ``ocr/`` directory and return that path.

        The file name comes from the URL's ``filename`` query parameter. Only
        its final path component is used, so a value such as
        ``../../x.pdf`` cannot escape ``ocr/``. A random ``*.pdf`` name is
        used when the parameter is missing or has no usable basename.
        """
        url = url_replace_fastgpt(url, base_fastgpt_url)
        query_dict = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
        requested = query_dict.get("filename", [""])[0]
        filename = Path(requested.replace("\\", "/")).name
        if filename in ("", ".", ".."):
            filename = f"{random_str()}.pdf"
        dest_path = f"ocr/{filename}"
        download_file(url, dest_path, base_fastgpt_url, base_backend_url, outer_backend_url)
        return dest_path

    async def ocr_result_pdf(self, dest_path):
        """OCR a whole PDF; the temporary page images are always removed."""
        pdf_list = self.pdf_2_img(dest_path)
        try:
            return await self.ocr_image_async(pdf_list)
        finally:
            for pdf in pdf_list:
                if os.path.exists(pdf):
                    os.remove(pdf)
class PaddleOCRUtil:
    """Standalone variant of utils.paddle_ocr_util.PaddleOCRUtil.

    Sends a whole PDF to the Paddle OCR endpoint and returns its text.
    """

    def __init__(self, ocr_url: str = DEFAULT_PADDLE_OCR_URL):
        self.ocr_url = ocr_url

    @staticmethod
    def _decode_text(text):
        """Normalise OCR text and expand literal ``\\uXXXX`` escapes.

        ``None`` becomes ``""``, non-strings are converted with ``str`` and
        surrounding whitespace is stripped. If the text contains a literal
        ``\\uXXXX`` sequence, escape sequences are decoded.

        The original passed the ``str`` straight to the ``unicode_escape``
        codec, which reads its input as Latin-1 bytes: any real non-ASCII
        character next to an escape came out as mojibake. Encoding with
        ``backslashreplace`` first turns such characters into escapes
        themselves, so they survive the round trip.
        """
        if text is None:
            return ""
        if not isinstance(text, str):
            text = str(text)
        text = text.strip()
        if not text:
            return ""
        if re.search(r"\\u[0-9a-fA-F]{4}", text):
            try:
                text = text.encode("latin-1", "backslashreplace").decode("unicode_escape")
            except UnicodeDecodeError:
                # Malformed escape: keep the text exactly as received.
                pass
        return text

    def _parse_response_text(self, response_text):
        """Extract the recognised text from a Paddle OCR JSON response.

        Raises ``ValueError`` when the body is not valid JSON, or when the
        response does not carry a truthy ``ok`` and ``code == 0``.
        """
        try:
            rsp_json = json.loads(response_text)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid paddle ocr response json: {response_text[:500]}") from exc
        if not rsp_json.get("ok") or rsp_json.get("code") != 0:
            raise ValueError(f"Paddle ocr failed: {rsp_json}")
        data = rsp_json.get("data") or {}
        return self._decode_text(data.get("text", ""))

    async def ocr_requests_async(self, session, file_path):
        """Upload the PDF from a worker thread and return the response text.

        ``session`` is accepted for signature compatibility and is discarded.
        """
        del session
        return await asyncio.to_thread(_multipart_post, self.ocr_url, file_path, "file", 1200, "application/pdf")

    async def ocr_result_pdf(self, dest_path):
        """OCR the PDF and return a one-element list with its text."""
        response_text = await self.ocr_requests_async(None, dest_path)
        return [self._parse_response_text(response_text)]

    def ocr_download_path(
        self,
        url,
        base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
        base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
        outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
    ):
        """Download ``url`` into the ``ocr/`` directory and return that path.

        The file name comes from the URL's ``filename`` query parameter. Only
        its final path component is used, so a value such as
        ``../../x.pdf`` cannot escape ``ocr/``. A random ``*.pdf`` name is
        used when the parameter is missing or has no usable basename.
        """
        url = url_replace_fastgpt(url, base_fastgpt_url)
        query_dict = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
        requested = query_dict.get("filename", [""])[0]
        filename = Path(requested.replace("\\", "/")).name
        if filename in ("", ".", ".."):
            filename = f"{random_str()}.pdf"
        dest_path = f"ocr/{filename}"
        download_file(url, dest_path, base_fastgpt_url, base_backend_url, outer_backend_url)
        return dest_path
class TesseractOCRUtil:
    """Standalone variant of utils.tesseract_ocr_util.TesseractOCRUtil.

    Runs the ``tesseract`` command-line tool on images, or on PDF pages
    rendered to PNG files.
    """

    def __init__(self, lang: str = "chi_sim+eng", executable: str = "tesseract"):
        self.lang = lang
        self.executable = executable

    def ocr_image(self, file_path):
        """Run tesseract on one image and return its standard output.

        Raises ``subprocess.CalledProcessError`` on a non-zero exit status.
        """
        result = subprocess.run(
            [self.executable, file_path, "stdout", "-l", self.lang],
            check=True,
            capture_output=True,
            text=True,
        )
        return result.stdout

    async def ocr_image_async(self, path_list):
        """OCR the images in worker threads and return the text by page.

        Page numbers are read from the image paths, which must follow the
        naming produced by ``set_pdf_2_img_page``.
        """
        tasks = [asyncio.to_thread(self.ocr_image, file_path) for file_path in path_list]
        responses = await asyncio.gather(*tasks)
        res_dict = {}
        for file_path, content in zip(path_list, responses):
            page_num = int(self.get_pdf_2_img_page_num(file_path))
            res_dict[page_num] = content
        return [res_dict[key] for key in sorted(res_dict)]

    def set_pdf_2_img_page(self, path, page_idx):
        """Return the image path for zero-based ``page_idx`` of ``path``."""
        return f"{path}_{page_idx + 1}.png"

    def get_pdf_2_img_page_num(self, path):
        """Return the page number (as a string) from a ``*_<n>.png`` path.

        Raises ``ValueError`` when the path does not have that form.
        """
        match = re.search(r"_(\d+)\.png$", path)
        if not match:
            raise ValueError(f"Invalid pdf page image path: {path}")
        return match.group(1)

    def pdf_2_img(self, path, zoom_x=2, zoom_y=2):
        """Render every page of the PDF to a PNG next to it; return the paths.

        Raises ``RuntimeError`` when PyMuPDF (``fitz``) is not installed.
        """
        try:
            import fitz  # type: ignore
        except ImportError as exc:
            raise RuntimeError("pdf_2_img needs PyMuPDF installed; no project-local imports are used.") from exc
        pdf = fitz.open(path)
        pdf_list = []
        try:
            # The zoom matrix is the same for every page, so build it once.
            trans = fitz.Matrix(zoom_x, zoom_y)
            for pg in range(pdf.page_count):
                pm = pdf[pg].get_pixmap(matrix=trans, alpha=False)
                dest_png = self.set_pdf_2_img_page(path, pg)
                pm.save(dest_png)
                pdf_list.append(dest_png)
        finally:
            pdf.close()
        return pdf_list

    def ocr_download_path(
        self,
        url,
        base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
        base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
        outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
    ):
        """Download ``url`` into the ``ocr/`` directory and return that path.

        The file name comes from the URL's ``filename`` query parameter. Only
        its final path component is used, so a value such as
        ``../../x.pdf`` cannot escape ``ocr/``. A random ``*.pdf`` name is
        used when the parameter is missing or has no usable basename.
        """
        url = url_replace_fastgpt(url, base_fastgpt_url)
        query_dict = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
        requested = query_dict.get("filename", [""])[0]
        filename = Path(requested.replace("\\", "/")).name
        if filename in ("", ".", ".."):
            filename = f"{random_str()}.pdf"
        dest_path = f"ocr/{filename}"
        download_file(url, dest_path, base_fastgpt_url, base_backend_url, outer_backend_url)
        return dest_path

    async def ocr_result_pdf(self, dest_path):
        """OCR a whole PDF; the temporary page images are always removed."""
        pdf_list = self.pdf_2_img(dest_path)
        try:
            return await self.ocr_image_async(pdf_list)
        finally:
            for pdf in pdf_list:
                if os.path.exists(pdf):
                    os.remove(pdf)
def _json_print(value: Any) -> None:
print(json.dumps(value, ensure_ascii=False, indent=2))
def main() -> int:
    """Parse the command line, run the selected OCR command and return 0.

    Sub-commands: ``remote-image``, ``remote-pdf``, ``paddle-pdf``,
    ``tesseract-image``, ``tesseract-pdf``, ``pdf-to-img`` and
    ``download-path``. Results are printed to standard output.
    """
    parser = argparse.ArgumentParser(description="Single-file OCR utilities based on utils/")
    sub = parser.add_subparsers(dest="cmd", required=True)

    remote_image = sub.add_parser("remote-image")
    remote_image.add_argument("ocr_url")
    remote_image.add_argument("images", nargs="+")

    remote_pdf = sub.add_parser("remote-pdf")
    remote_pdf.add_argument("ocr_url")
    remote_pdf.add_argument("pdf")

    paddle_pdf = sub.add_parser("paddle-pdf")
    paddle_pdf.add_argument("pdf")
    paddle_pdf.add_argument("--ocr-url", default=DEFAULT_PADDLE_OCR_URL)

    # Both tesseract commands take one positional plus the same two options.
    for command, positional in (("tesseract-image", "image"), ("tesseract-pdf", "pdf")):
        tesseract = sub.add_parser(command)
        tesseract.add_argument(positional)
        tesseract.add_argument("--lang", default="chi_sim+eng")
        tesseract.add_argument("--executable", default="tesseract")

    pdf_to_img = sub.add_parser("pdf-to-img")
    pdf_to_img.add_argument("pdf")
    pdf_to_img.add_argument("--zoom-x", type=float, default=2)
    pdf_to_img.add_argument("--zoom-y", type=float, default=2)
    pdf_to_img.add_argument("--mode", choices=["ocr", "tesseract"], default="tesseract")

    download = sub.add_parser("download-path")
    download.add_argument("url")
    download.add_argument("--base-fastgpt-url", default=DEFAULT_BASE_FASTGPT_URL)
    download.add_argument("--base-backend-url", default=DEFAULT_BASE_BACKEND_URL)
    download.add_argument("--outer-backend-url", default=DEFAULT_OUTER_BACKEND_URL)
    download.add_argument("--mode", choices=["ocr", "paddle", "tesseract"], default="ocr")
    download.add_argument("--ocr-url", default=DEFAULT_PADDLE_OCR_URL)

    args = parser.parse_args()
    cmd = args.cmd

    if cmd == "remote-image":
        remote = OCRUtil(args.ocr_url)
        _json_print(asyncio.run(remote.ocr_image_async(args.images)))
    elif cmd == "remote-pdf":
        remote = OCRUtil(args.ocr_url)
        _json_print(asyncio.run(remote.ocr_result_pdf(args.pdf)))
    elif cmd == "paddle-pdf":
        paddle = PaddleOCRUtil(args.ocr_url)
        _json_print(asyncio.run(paddle.ocr_result_pdf(args.pdf)))
    elif cmd == "tesseract-image":
        local = TesseractOCRUtil(args.lang, args.executable)
        print(local.ocr_image(args.image), end="")
    elif cmd == "tesseract-pdf":
        local = TesseractOCRUtil(args.lang, args.executable)
        _json_print(asyncio.run(local.ocr_result_pdf(args.pdf)))
    elif cmd == "pdf-to-img":
        # The OCR URL is irrelevant for rendering, hence the placeholder.
        if args.mode == "ocr":
            renderer = OCRUtil("unused")
        else:
            renderer = TesseractOCRUtil()
        _json_print(renderer.pdf_2_img(args.pdf, args.zoom_x, args.zoom_y))
    elif cmd == "download-path":
        factories = {
            "paddle": lambda: PaddleOCRUtil(args.ocr_url),
            "tesseract": TesseractOCRUtil,
        }
        downloader = factories.get(args.mode, lambda: OCRUtil(args.ocr_url))()
        print(
            downloader.ocr_download_path(
                args.url, args.base_fastgpt_url, args.base_backend_url, args.outer_backend_url
            )
        )
    return 0


# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    raise SystemExit(main())
import argparse
import json
import re
import sys
from pathlib import Path
from urllib.parse import unquote, urlparse
import requests
from loguru import logger
# Default endpoints. download_file() rewrites the outer backend URL to the
# base backend URL and prepends the FastGPT base URL to relative URLs.
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
# Default credentials that upload_file() posts to the backend login endpoint.
# NOTE(review): these are hard-coded in source and echoed in --help output;
# consider supplying them through --username/--password or the environment.
DEFAULT_BACKEND_ADMIN_USERNAME = "admin"
DEFAULT_BACKEND_ADMIN_PASSWORD = "admin@jpai.com"
# Mutable module-level settings read by upload_file(), download_file() and
# url_replace_fastgpt(); overridden via configure_urls() / configure_login().
base_fastgpt_url = DEFAULT_BASE_FASTGPT_URL
base_backend_url = DEFAULT_BASE_BACKEND_URL
outer_backend_url = DEFAULT_OUTER_BACKEND_URL
backend_admin_username = DEFAULT_BACKEND_ADMIN_USERNAME
backend_admin_password = DEFAULT_BACKEND_ADMIN_PASSWORD
def configure_urls(
    fastgpt_url: str | None = None,
    backend_url: str | None = None,
    outer_url: str | None = None,
):
    """Override the module-level base URLs.

    Each argument that is not ``None`` replaces the corresponding global
    (``base_fastgpt_url``, ``base_backend_url``, ``outer_backend_url``);
    ``None`` leaves the current value untouched.
    """
    overrides = {
        "base_fastgpt_url": fastgpt_url,
        "base_backend_url": backend_url,
        "outer_backend_url": outer_url,
    }
    globals().update(
        {name: value for name, value in overrides.items() if value is not None}
    )
def configure_login(username: str | None = None, password: str | None = None):
    """Override the module-level backend login credentials.

    An argument left as ``None`` keeps the current value of
    ``backend_admin_username`` / ``backend_admin_password``.
    """
    overrides = {
        "backend_admin_username": username,
        "backend_admin_password": password,
    }
    globals().update(
        {name: value for name, value in overrides.items() if value is not None}
    )
def _strip_trailing_slash(url: str | None) -> str | None:
if url is None:
return None
return url.rstrip("/")
def upload_file(path, input_url_to_inner=True, output_url_to_inner=False) -> str:
    """Log in to the backend and upload the file at ``path``.

    Logs in with the module-level admin credentials, then posts the file as
    multipart form data using the returned access token as the
    ``Authorization`` header.

    ``input_url_to_inner`` and ``output_url_to_inner`` are accepted but not
    used by this implementation.

    Returns:
        The ``data`` field of the upload response.

    Raises:
        requests.HTTPError: When either request returns an error status.
        Exception: When the upload response has no truthy ``data`` field.
            Failures while reading the login response are logged and
            re-raised unchanged.
    """
    from requests_toolbelt import MultipartEncoder

    credentials = {
        "username": backend_admin_username,
        "password": backend_admin_password,
    }
    response = requests.post(
        url=f"{base_backend_url}/admin-api/system/auth/login",
        headers={"Content-Type": "application/json"},
        data=json.dumps(credentials),
    )
    response.raise_for_status()
    try:
        login_payload = json.loads(response.text)
        token = login_payload.get("data").get("accessToken")
    except Exception as e:
        logger.error(f"后端登录异常:{e}")
        raise

    # The file must remain open while requests streams the encoder.
    with open(path, "rb") as file_obj:
        encoder = MultipartEncoder(fields={"file": (Path(path).name, file_obj)})
        upload_headers = {"Content-Type": encoder.content_type, "Authorization": token}
        response = requests.post(
            url=f"{base_backend_url}/admin-api/infra/file/upload",
            headers=upload_headers,
            data=encoder,
        )
    response.raise_for_status()

    res = json.loads(response.text).get("data")
    if not res:
        raise Exception(f"上传{path}失败 Response text: {response.text}")
    return res
def _download_basename(filename: str) -> str:
filename = unquote(filename.strip().strip('"'))
filename = filename.replace("\\", "/")
return Path(filename).name or "downloaded_file"
def _resolve_download_filename(url: str, response: requests.Response) -> str:
    """Choose a file name for a downloaded response.

    Order of preference: the ``filename*=`` value of the
    ``Content-Disposition`` header, then its plain ``filename=`` value, then
    the last path segment of ``url``. Every candidate is passed through
    ``_download_basename``; ``"downloaded_file"`` is the final fallback.
    """
    content_disposition = response.headers.get("content-disposition", "")
    if content_disposition:
        header_patterns = (
            r"filename\*=(?:UTF-8''|utf-8'')?([^;]+)",
            r'filename="?([^";]+)"?',
        )
        for pattern in header_patterns:
            found = re.search(pattern, content_disposition)
            if found:
                return _download_basename(found.group(1))

    from_url = _download_basename(urlparse(url).path)
    return from_url if from_url else "downloaded_file"
def download_file(url, path, input_url_to_inner=True):
    """Download ``url`` to ``path`` and return the saved path, or ``None``.

    A URL without an ``http:``/``https:`` prefix is resolved against the
    module-level ``base_fastgpt_url``; occurrences of ``outer_backend_url``
    are replaced by ``base_backend_url``. If ``path`` is an existing
    directory the file name is derived from the response or the URL. Missing
    parent directories are created.

    ``input_url_to_inner`` is accepted but not used by this implementation.

    Returns ``None`` (after logging an error) when the HTTP status is not 200.
    """
    has_scheme = url.startswith("http:") or url.startswith("https:")
    if not has_scheme:
        url = base_fastgpt_url + url
    url = url.replace(outer_backend_url, base_backend_url)
    logger.info(f"url准备下载:{url}")

    response = requests.get(url)
    if response.status_code != 200:
        logger.error(f"{url}文件下载失败. HTTP Status Code: {response.status_code}")
        return None

    target_path = Path(path)
    if target_path.exists() and target_path.is_dir():
        target_path = target_path / _resolve_download_filename(url, response)
    target_path.parent.mkdir(parents=True, exist_ok=True)
    with open(target_path, "wb") as f:
        f.write(response.content)
    logger.info(f"{url}文件下载成功,保存到{target_path}")
    return str(target_path)
def url_replace_fastgpt(origin: str):
    """Return ``origin`` as an absolute URL.

    URLs that already start with ``http:`` or ``https:`` are returned
    unchanged; anything else is treated as a path relative to the
    module-level ``base_fastgpt_url`` and gets that base prepended.

    The original check only recognised ``http:``, so an ``https://`` URL was
    wrongly prefixed with the base URL. ``download_file`` in this file already
    accepts both schemes, and this function now agrees with it.
    """
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def _add_common_url_args(parser: argparse.ArgumentParser):
    """Add the three shared URL options to ``parser``.

    The options are ``--base-fastgpt-url``, ``--base-backend-url`` and
    ``--outer-backend-url``; each defaults to the matching ``DEFAULT_*``
    constant and shows that default in its help text.
    """
    url_options = (
        (
            "--base-fastgpt-url",
            DEFAULT_BASE_FASTGPT_URL,
            f"FastGPT 内网基础地址,默认:{DEFAULT_BASE_FASTGPT_URL}",
        ),
        (
            "--base-backend-url",
            DEFAULT_BASE_BACKEND_URL,
            f"后端内网基础地址,默认:{DEFAULT_BASE_BACKEND_URL}",
        ),
        (
            "--outer-backend-url",
            DEFAULT_OUTER_BACKEND_URL,
            f"后端外网地址,下载时会替换为内网地址,默认:{DEFAULT_OUTER_BACKEND_URL}",
        ),
    )
    for flag, default_value, help_text in url_options:
        parser.add_argument(flag, default=default_value, help=help_text)
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with ``upload``, ``download`` and ``normalize-url``.

    Every sub-command accepts the shared URL options; ``upload`` also accepts
    ``--username`` and ``--password``. The chosen sub-command is stored in
    ``args.command`` and is required.
    """
    parser = argparse.ArgumentParser(
        description="单文件上传/下载工具:通过后端接口上传文件,或下载 FastGPT/后端文件 URL。"
    )
    parser.set_defaults(command=None)
    commands = parser.add_subparsers(dest="command", required=True)

    # upload: credentials plus the local file to send.
    upload = commands.add_parser("upload", help="上传本地文件。")
    _add_common_url_args(upload)
    upload.add_argument(
        "--username",
        default=DEFAULT_BACKEND_ADMIN_USERNAME,
        help=f"后端管理员用户名,默认:{DEFAULT_BACKEND_ADMIN_USERNAME}",
    )
    upload.add_argument(
        "--password",
        default=DEFAULT_BACKEND_ADMIN_PASSWORD,
        help=f"后端管理员密码,默认:{DEFAULT_BACKEND_ADMIN_PASSWORD}",
    )
    upload.add_argument("path", help="要上传的本地文件路径。")

    # download: source URL and destination path.
    download = commands.add_parser("download", help="下载 URL 到本地路径。")
    _add_common_url_args(download)
    download.add_argument("url", help="HTTP URL 或 FastGPT/后端相对路径。")
    download.add_argument(
        "path", help="输出文件路径;如果是已存在目录,则自动解析文件名。"
    )

    # normalize-url: print the absolute form of a URL.
    normalize = commands.add_parser(
        "normalize-url", help="把 FastGPT 相对路径补全为绝对 URL。"
    )
    _add_common_url_args(normalize)
    normalize.add_argument("url", help="HTTP URL 或 FastGPT 相对路径。")

    return parser
def main(argv: list[str] | None = None) -> int:
    """Run the upload/download CLI and return its exit status.

    Returns 0 on success and 1 when a download fails. An unsupported command
    is reported through ``parser.error``.
    """
    parser = build_arg_parser()
    args = parser.parse_args(argv)
    configure_urls(
        fastgpt_url=_strip_trailing_slash(args.base_fastgpt_url),
        backend_url=_strip_trailing_slash(args.base_backend_url),
        outer_url=_strip_trailing_slash(args.outer_backend_url),
    )

    command = args.command
    if command == "upload":
        configure_login(username=args.username, password=args.password)
        print(upload_file(args.path))
        return 0

    if command == "download":
        saved_path = download_file(args.url, args.path)
        if saved_path is None:
            return 1
        print(saved_path)
        return 0

    if command == "normalize-url":
        print(url_replace_fastgpt(args.url))
        return 0

    parser.error(f"unsupported command: {args.command}")
    return 2


# Script entry point: exit with the status returned by main().
if __name__ == "__main__":
    sys.exit(main())
from spire.doc import Document, Paragraph, Table, Comment, CommentMark, CommentMarkType
from loguru import logger
import re
from thefuzz import fuzz
from utils.doc_util import DocBase
from utils.common_util import adjust_single_chunk_size
import os
def extract_table_cells_text(table, joiner="\n"):
    """Return the text of every cell of a table as a flat, row-major list.

    ``table`` is expected to expose the Spire.Doc table shape used here:
    ``Rows``/``Cells``/``Paragraphs`` collections with ``Count`` and integer
    indexing. The result looks like ``["r0c0", "r0c1", "r1c0", ...]``.

    ``joiner`` separates the paragraphs of a cell and the rows/cells of a
    nested table. Text is not stripped or cleaned; an empty cell yields
    ``""``. Errors while reading part of a cell are swallowed and whatever
    was collected up to that point is kept.
    """

    def _para_text(para):
        # Prefer para.Text as is; otherwise concatenate the Text of children.
        try:
            if hasattr(para, "Text"):
                text = para.Text
                return "" if text is None else text
        except Exception:
            pass
        pieces = []
        try:
            for idx in range(para.ChildObjects.Count):
                child = para.ChildObjects[idx]
                if hasattr(child, "Text"):
                    pieces.append("" if child.Text is None else child.Text)
        except Exception:
            pass
        return "".join(pieces)

    def _nested_table_text(nested):
        # Join each nested row's cells, then join the rows, using joiner.
        row_texts = []
        for nr in range(nested.Rows.Count):
            cell_texts = []
            for nc in range(nested.Rows[nr].Cells.Count):
                try:
                    nested_cell = nested.Rows[nr].Cells[nc]
                    para_texts = [
                        _para_text(nested_cell.Paragraphs[np_idx])
                        for np_idx in range(nested_cell.Paragraphs.Count)
                    ]
                    cell_texts.append(joiner.join(para_texts))
                except Exception:
                    # An unreadable nested cell contributes an empty string.
                    cell_texts.append("")
            row_texts.append(joiner.join(cell_texts))
        return joiner.join(row_texts)

    def _extract_cell_text(cell):
        parts = []
        # Direct paragraphs of the cell, unmodified.
        try:
            for p_idx in range(cell.Paragraphs.Count):
                parts.append(_para_text(cell.Paragraphs[p_idx]))
        except Exception:
            pass
        # Nested tables: cell.Tables if present, else table-like ChildObjects.
        try:
            if hasattr(cell, "Tables") and cell.Tables.Count > 0:
                for t_idx in range(cell.Tables.Count):
                    parts.append(_nested_table_text(cell.Tables[t_idx]))
            else:
                try:
                    for idx in range(cell.ChildObjects.Count):
                        ch = cell.ChildObjects[idx]
                        if hasattr(ch, "Rows") and getattr(ch, "Rows") is not None:
                            parts.append(_nested_table_text(ch))
                except Exception:
                    pass
        except Exception:
            pass
        return joiner.join(parts)

    flat = []
    for r in range(table.Rows.Count):
        row = table.Rows[r]
        flat.extend(_extract_cell_text(row.Cells[c]) for c in range(row.Cells.Count))
    return flat
def process_string(s):
    """Reduce a multi-line string to a single representative line.

    * No newline: ``s`` is returned unchanged.
    * One newline: the longer of the two lines is returned (the first line
      on a tie).
    * Two or more newlines: the longest line strictly between the first and
      last line is returned (the earliest one on a tie).
    """
    lines = s.split("\n")
    if len(lines) == 1:
        return s
    if len(lines) == 2:
        first, second = lines
        return second if len(second) > len(first) else first
    # Three or more lines: the first and last lines are ignored.
    return max(lines[1:-1], key=len, default="")
def _score_target_against_query(target_text: str, query_text: str):
"""对单个候选文本与查询文本打分,并返回最适合落批注的匹配片段。"""
if not target_text or not query_text:
return None, 0
if query_text in target_text:
return query_text, 100
# partial_ratio 负责召回,ratio 负责精度;组合分用于排序
def _combined_score(text_a: str, text_b: str):
ratio_score = fuzz.ratio(text_a, text_b)
partial_score = fuzz.partial_ratio(text_a, text_b)
combined = int(round(0.4 * ratio_score + 0.6 * partial_score))
return combined
best_text = target_text
best_score = _combined_score(target_text, query_text)
# 对长句按常见中文分隔符做子句拆分,避免整句比较被噪声稀释。
for clause in target_text.replace("。", ";").replace(",", ";").split(";"):
clause = clause.strip()
if not clause:
continue
clause_score = _combined_score(clause, query_text)
if clause_score > best_score:
best_score = clause_score
best_text = clause
return best_text, best_score
def _build_narrowed_queries(text: str, min_len=12):
"""对文本做一步缩窄,生成下一轮候选。"""
if not text:
return []
text = text.strip()
if len(text) <= min_len:
return []
next_queries = []
cut = max(1, len(text) // 8)
left_cut = text[cut:]
right_cut = text[:-cut]
center_cut = text[cut:-cut] if len(text) > 2 * cut else ""
for item in (left_cut, right_cut, center_cut):
item = item.strip()
if len(item) >= min_len:
next_queries.append(item)
simplified = process_string(text)
if simplified and len(simplified) >= min_len:
next_queries.append(simplified.strip())
parts = [p.strip() for p in re.split(r"[。;;,,\n]", text) if p.strip()]
if len(parts) > 1:
longest_part = max(parts, key=len)
if len(longest_part) >= min_len:
next_queries.append(longest_part)
if len(parts) > 2:
mid_join = "".join(parts[1:-1]).strip()
if len(mid_join) >= min_len:
next_queries.append(mid_join)
deduped = []
seen = set()
for item in next_queries:
if item not in seen:
seen.add(item)
deduped.append(item)
return deduped
def _find_best_match_in_texts(target_texts, original_text):
"""在候选文本列表中查找与 original_text 最相近的一条(支持递进缩窄查询)。"""
if not target_texts or not original_text:
return None, -1
best_match = None
best_score = -1
# beam_size: 每轮仅保留得分最高的前 N 个查询继续扩展,控制搜索分支爆炸。
beam_size = 5
# max_rounds: 递进缩窄的最大轮数,避免异常文本导致无限尝试。
max_rounds = 8
min_query_len = 12
active_queries = [original_text.strip()]
seen_queries = set(active_queries)
for _ in range(max_rounds):
if not active_queries:
break
query_best_scores = []
for query in active_queries:
local_best = -1
for target_text in target_texts:
match_text, score = _score_target_against_query(target_text, query)
if score > best_score:
best_match = match_text
best_score = score
if score > local_best:
local_best = score
query_best_scores.append((query, local_best))
if best_score >= 100:
break
# 先保留当前轮最有希望的查询,再基于它们生成下一轮缩窄查询。
query_best_scores.sort(key=lambda x: x[1], reverse=True)
top_queries = [q for q, _ in query_best_scores[:beam_size]]
next_queries = []
for query in top_queries:
for narrowed in _build_narrowed_queries(query, min_len=min_query_len):
if narrowed not in seen_queries:
seen_queries.add(narrowed)
next_queries.append(narrowed)
active_queries = next_queries
return best_match, best_score
# spire doc解析
class SpireWordDoc(DocBase):
def load(self, doc_path, **kwargs):
    """Load the document at ``doc_path`` and split it into chunks.

    Extra keyword arguments are accepted and ignored. Returns ``self`` so
    calls can be chained.
    """
    # License.SetLicenseFileFullPath(f"{root_path}/license.elic.python.xml")
    self._doc_path, self._doc_name = doc_path, os.path.basename(doc_path)
    self._doc = Document()
    self._doc.LoadFromFile(doc_path)
    chunks = self._resolve_doc_chunk()
    self._chunk_list = chunks
    return self
def _ensure_loaded(self):
if not self._doc:
raise RuntimeError("Document not loaded. Call load() first.")
def adjust_chunk_size(self):
    """Recompute the chunk size limit from the document length and re-chunk.

    The new limit comes from ``adjust_single_chunk_size`` applied to the
    length of the full document text. Returns the new limit.
    """
    self._ensure_loaded()
    total_chars = len(self.get_all_text())
    self._max_single_chunk_size = adjust_single_chunk_size(total_chars)
    logger.info(
        f"SpireWordDoc adjust _max_single_chunk_size to {self._max_single_chunk_size}"
    )
    # The chunk list depends on the limit, so rebuild it.
    rebuilt = self._resolve_doc_chunk()
    self._chunk_list = rebuilt
    return self._max_single_chunk_size
async def get_from_ocr(self):
    """Do nothing and return ``None``; this class has no OCR step."""
    return None
# 把文档分割成chunk
def _resolve_doc_chunk(self):
self._ensure_loaded()
chunk_list = []
# 单个chunk
single_chunk = ""
# 单个chunk的位置信息
single_chunk_location = []
# 遍历每个节
for section_idx in range(self._doc.Sections.Count):
current_section = self._doc.Sections.get_Item(section_idx)
# 遍历节里面每个子对象
for section_child_idx in range(current_section.Body.ChildObjects.Count):
# 获取子对象
child_obj = current_section.Body.ChildObjects.get_Item(
section_child_idx
)
# 段落处理
current_child_text = ""
if isinstance(child_obj, Paragraph):
paragraph = child_obj
current_child_text = paragraph.Text
# 表格处理
elif isinstance(child_obj, Table):
table = child_obj
current_child_text = self._resolve_table(table)
# 跳过其他非文本子对象
else:
continue
# 添加新对象
if (
len(single_chunk) + len(current_child_text)
> self._max_single_chunk_size
):
chunk_list.append(
{
"chunk_content": single_chunk,
"chunk_location": single_chunk_location,
}
)
single_chunk = ""
single_chunk_location = []
single_chunk += current_child_text + "\n"
single_chunk_location.append(
{"section_idx": section_idx, "section_child_idx": section_child_idx}
)
if len(single_chunk):
chunk_list.append(
{"chunk_content": single_chunk, "chunk_location": single_chunk_location}
)
return chunk_list
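# Flatten a table to plain text: cells joined by spaces, one line per row
# (the earlier Markdown rendering is commented out in the method body).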
def _resolve_table(self, table):
table_data = ""
for i in range(0, table.Rows.Count):
# 遍历行的单元格(cells)
cell_list = []
for j in range(0, table.Rows.get_Item(i).Cells.Count):
# 获取每一个单元格(cell)
cell = table.Rows.get_Item(i).Cells.get_Item(j)
cell_content = ""
for para_idx in range(cell.Paragraphs.Count):
paragraph_text = cell.Paragraphs.get_Item(para_idx).Text
cell_content += paragraph_text
cell_list.append(cell_content)
# table_data += "|" + "|".join(cell_list) + "|"
# table_data += "\n"
table_data += ' '.join(cell_list) + '\n'
if i == 0:
# table_data += "|" + "|".join(["--- " for _ in cell_list]) + "|\n"
table_data= ' '.join(cell_list) + '\n'
return table_data
def get_chunk_info(self, chunk_id):
    """Return a short description of chunk ``chunk_id``.

    The text gives the one-based chunk id, the one-based section/child
    position of the chunk's first and last element, and the first and last
    20 characters of its content.
    """
    chunk = self._chunk_list[chunk_id]
    content = chunk["chunk_content"]
    first_loc = chunk["chunk_location"][0]
    last_loc = chunk["chunk_location"][-1]

    from_location = f"[第{first_loc['section_idx'] + 1}节的第{first_loc['section_child_idx'] + 1}段落]"
    to_location = f"[第{last_loc['section_idx'] + 1}节的第{last_loc['section_child_idx'] + 1}段落]"
    head, tail = content[:20], content[-20:]
    chunk_content_tips = "[" + head + "]...到...[" + tail + "]"
    return f"文件块id: {chunk_id + 1}\n文件块位置: 从{from_location}到{to_location}\n文件块简述: {chunk_content_tips}\n"
def get_chunk_location(self, chunk_id):
    """Return the same description as ``get_chunk_info`` for ``chunk_id``."""
    description = self.get_chunk_info(chunk_id)
    return description
def get_chunk_num(self):
    """Return the number of chunks; requires a loaded document."""
    self._ensure_loaded()
    chunk_count = len(self._chunk_list)
    return chunk_count
def get_chunk_item(self, chunk_id):
    """Return the text content of chunk ``chunk_id``; requires a loaded document."""
    self._ensure_loaded()
    chunk = self._chunk_list[chunk_id]
    return chunk["chunk_content"]
# 根据locations获取数据
def get_sub_chunks(self, chunk_id):
    """Return the document child objects that make up chunk ``chunk_id``.

    Each recorded location is resolved back to the object at that section
    and child index. When ``chunk_id`` is not below the number of chunks, an
    error is logged and ``[]`` is returned.
    """
    if chunk_id >= len(self._chunk_list):
        logger.error(f"get_sub_chunks_error:{chunk_id}")
        return []

    resolved = []
    for loc in self._chunk_list[chunk_id]["chunk_location"]:
        section = self._doc.Sections.get_Item(loc["section_idx"])
        resolved.append(section.Body.ChildObjects.get_Item(loc["section_child_idx"]))
    return resolved
def format_comment_author(self, comment):
    """Return the author tag ``"<id>|<key_points>"`` for a review comment."""
    rule_id = str(comment["id"])
    return f"{rule_id}|{comment['key_points']}"
def _decorate_author_with_match_type(self, author, match_type):
if match_type == "exact":
return f"(精确){author}"
if match_type == "fuzzy":
return f"(模糊){author}"
return author
def _normalize_author_prefix(self, author):
# 去掉匹配来源前缀后再比对,确保“精确/模糊”两种作者标签都能命中同一条批注。
if not author:
return author
for prefix in ("(精确)", "(模糊)"):
if author.startswith(prefix):
return author[len(prefix) :]
return author
def remove_comment_prefix(
    self,
):
    """Shorten comment authors of the form ``"<a>|<b>"`` to ``"<b>"``.

    Only authors containing exactly one ``|`` are changed; all other
    comments are left untouched.
    """
    for idx in range(self._doc.Comments.Count):
        current_comment = self._doc.Comments.get_Item(idx)
        pieces = current_comment.Format.Author.split("|")
        if len(pieces) != 2:
            continue
        current_comment.Format.Author = pieces[1]
def _insert_comment_by_text_range(self, text_range, author, comment_content):
if text_range is None:
return False
paragraph = text_range.OwnerParagraph
if paragraph is None:
return False
comment = Comment(self._doc)
comment.Body.AddParagraph().Text = comment_content
comment.Format.Author = author
paragraph.ChildObjects.Insert(
paragraph.ChildObjects.IndexOf(text_range) + 1, comment
)
# Word 批注需要成对的起止标记;两者共享同一个 CommentId。
comment_start = CommentMark(self._doc, CommentMarkType.CommentStart)
comment_end = CommentMark(self._doc, CommentMarkType.CommentEnd)
comment_start.CommentId = comment.Format.CommentId
comment_end.CommentId = comment.Format.CommentId
paragraph.ChildObjects.Insert(
paragraph.ChildObjects.IndexOf(text_range), comment_start
)
paragraph.ChildObjects.Insert(
paragraph.ChildObjects.IndexOf(text_range) + 1, comment_end
)
return True
def _update_comment_content(self, comment_idx, suggest):
self._doc.Comments.get_Item(comment_idx).Body.Paragraphs.get_Item(0).Text = suggest
def _try_add_comment_in_paragraphs(self, paragraphs, target_text, author, suggest):
if not target_text:
return False
for paragraph in paragraphs:
text_sel = paragraph.Find(target_text, False, True)
if text_sel and self.set_comment_by_text_selection(text_sel, author, suggest):
return True
return False
def _try_add_comment_by_exact(self, sub_chunks, find_key, author, suggest):
for obj in sub_chunks:
if isinstance(obj, Paragraph):
try:
text_sel = obj.Find(find_key, False, True)
if text_sel and self.set_comment_by_text_selection(
text_sel, author, suggest
):
return True
except Exception as e:
print(f"段落批注添加失败: {str(e)}")
elif isinstance(obj, Table):
try:
if self.add_table_comment(obj, find_key, suggest, author):
return True
except Exception as e:
print(f"表格批注添加失败: {str(e)}")
return False
def _try_add_comment_by_fuzzy(self, sub_chunks, comment, author, suggest):
original_text = comment.get("original_text", "")
candidates = []
# 段落与表格同权:统一加入候选池,按最高分排序后尝试落批注
for order, obj in enumerate(sub_chunks):
if isinstance(obj, Paragraph):
match_text, score = _find_best_match_in_texts([obj.Text], original_text)
candidates.append(
{
"kind": "paragraph",
"obj": obj,
"match_text": match_text,
"score": score,
"order": order,
}
)
elif isinstance(obj, Table):
table_data = extract_table_cells_text(obj)
match_text, score = _find_best_match_in_texts(table_data, original_text)
candidates.append(
{
"kind": "table",
"obj": obj,
"match_text": match_text,
"score": score,
"order": order,
}
)
# 过滤无效候选后按分数降序、原文档顺序升序尝试,优先高分且靠前的位置。
candidates = [
item
for item in candidates
if item.get("match_text") and item.get("score", -1) >= 0
]
candidates.sort(key=lambda x: (-x["score"], x["order"]))
for item in candidates:
match_text = item["match_text"]
processed_text = process_string(match_text) if match_text else ""
if item["kind"] == "paragraph":
paragraph = item["obj"]
# 先尝试原匹配片段,再尝试 process_string 压缩后的片段,提高落点成功率。
if self._try_add_comment_in_paragraphs(
[paragraph], match_text, author, suggest
):
return True
if self._try_add_comment_in_paragraphs(
[paragraph], processed_text, author, suggest
):
return True
else:
table = item["obj"]
# 表格同样使用“原片段 -> 压缩片段”的两阶段策略。
if self.add_table_comment(table, match_text, suggest, author):
return True
if processed_text and self.add_table_comment(
table, processed_text, suggest, author
):
return True
return False
# 根据text_selection批注
def set_comment_by_text_selection(self, text_sel, author, comment_content):
    """Comment the range covered by ``text_sel``.

    Returns ``False`` when ``text_sel`` is ``None``; otherwise the result of
    ``_insert_comment_by_text_range`` for the selection's single range.
    """
    if text_sel is None:
        return False
    return self._insert_comment_by_text_range(
        text_sel.GetAsOneRange(), author, comment_content
    )
# Add a comment inside a table (helper for the chunk-level comment logic)
def add_table_comment(
    self, table, target_text, comment_text, author="审阅助手", initials="AI"
):
    """Comment the first occurrence of ``target_text`` found in ``table``.

    Cells are scanned row by row, and each cell paragraph is searched with
    ``Find(target_text, False, True)``. The scan stops at the first paragraph
    where the comment is inserted successfully.

    Args:
        table: Table object exposing ``Rows`` / ``Cells`` / ``Paragraphs``.
        target_text: Text to look for.
        comment_text: Body of the comment to insert.
        author: Author recorded on the comment.
        initials: Accepted for interface compatibility; not used here.

    Returns:
        True if a comment was inserted, otherwise False.
    """
    rows = table.Rows
    for row_idx in range(rows.Count):
        cells = rows[row_idx].Cells
        for cell_idx in range(cells.Count):
            paragraphs = cells[cell_idx].Paragraphs
            for para_idx in range(paragraphs.Count):
                hit = paragraphs[para_idx].Find(target_text, False, True)
                if not hit:
                    continue
                # A failed insertion is not final: keep scanning later paragraphs.
                if self._insert_comment_by_text_range(
                    hit.GetAsOneRange(), author, comment_text
                ):
                    return True
    return False
def add_chunk_comment(self, chunk_id, comments):
    """Add review comments to a chunk, at most one comment per review item.

    For every item whose ``result`` is "不合格":
      1. If a comment with the same author key already exists, only its
         content is updated.
      2. Otherwise exact matching is tried first, then fuzzy matching.
      3. If both fail, an error is logged.

    Args:
        chunk_id: Fallback chunk index, used when the item carries no usable
            ``chunk_id`` of its own.
        comments: Iterable of review dicts. ``id`` and ``key_points`` are
            needed for the author key; ``original_text``, ``suggest``,
            ``chunk_id`` and ``result`` are read when present.

    NOTE(review): the item's ``chunk_id`` is used here as a 0-based index,
    while ``edit_chunk_comment`` passes ``comment["chunk_id"] - 1`` as the
    fallback — confirm which convention the callers use.
    """
    for comment in comments:
        if comment.get("result") != "不合格":
            continue
        # Prefer the chunk id carried by the item, but only when it is a real
        # in-range index. None, strings, floats and negatives fall back to the
        # caller-supplied chunk_id instead of raising or indexing from the end.
        comment_chunk_id = comment.get("chunk_id", -1)
        has_own_chunk = (
            isinstance(comment_chunk_id, int)
            and not isinstance(comment_chunk_id, bool)
            and 0 <= comment_chunk_id < self.get_chunk_num()
        )
        sub_chunks = self.get_sub_chunks(
            comment_chunk_id if has_own_chunk else chunk_id
        )
        author = self.format_comment_author(comment)
        suggest = comment.get("suggest", "")
        # Tolerate a missing or None original_text, as the fuzzy path does.
        find_key = (comment.get("original_text") or "").strip() or comment[
            "key_points"
        ]
        # Same author key already present: update in place to avoid duplicates.
        existing_comment_idx = self.find_comment(author)
        if existing_comment_idx is not None:
            self._update_comment_content(existing_comment_idx, suggest)
            continue
        exact_author = self._decorate_author_with_match_type(author, "exact")
        fuzzy_author = self._decorate_author_with_match_type(author, "fuzzy")
        # Exact matching wins; fuzzy matching only runs when it fails.
        matched = self._try_add_comment_by_exact(
            sub_chunks, find_key, exact_author, suggest
        )
        if not matched:
            try:
                matched = self._try_add_comment_by_fuzzy(
                    sub_chunks, comment, fuzzy_author, suggest
                )
            except Exception as e:
                # Best effort: a fuzzy failure must not abort the remaining items.
                print(f"模糊匹配失败: {str(e)}")
        if not matched:
            logger.error(f"未找到可批注位置: '{find_key[:20]}...'")
# 根据作者名称查找批注
def find_comment(self, author):
    """Return the index of the first comment whose author key matches ``author``.

    Both sides are passed through ``_normalize_author_prefix`` before being
    compared, so the match-type decoration on an author does not affect the
    lookup. Returns None when no comment matches.
    """
    wanted = self._normalize_author_prefix(author)
    all_comments = self._doc.Comments
    for position in range(all_comments.Count):
        candidate = all_comments.get_Item(position).Format.Author
        if self._normalize_author_prefix(candidate) == wanted:
            return position
    return None
def delete_chunk_comment(self, comments):
    """Remove the document comment that belongs to each given review item.

    The author key is built with ``format_comment_author``. Items without a
    matching comment are skipped.
    """
    for item in comments:
        author = self.format_comment_author(item)
        position = self.find_comment(author)
        if position is None:
            continue
        self._doc.Comments.RemoveAt(position)
        print(f"删除批注: '{author}'")
def edit_chunk_comment(self, comments):
    """Synchronise document comments with updated review results.

    For each item:
      * ``result == "合格"``: delete its comment if one exists.
      * anything else: update the existing comment's content, or create a
        new comment through ``add_chunk_comment`` when none exists.

    NOTE(review): the fallback chunk passed to ``add_chunk_comment`` is
    ``comment["chunk_id"] - 1`` (treating the stored id as 1-based), while
    ``add_chunk_comment`` also reads ``comment["chunk_id"]`` directly —
    confirm the intended convention.
    """
    for item in comments:
        author = self.format_comment_author(item)
        verdict = item["result"]
        found_idx = self.find_comment(author)
        if verdict == "合格":
            # The item now passes: drop its comment, if it has one.
            if found_idx is not None:
                self._doc.Comments.RemoveAt(found_idx)
            continue
        suggestion = item.get("suggest", "")
        if found_idx is None:
            self.add_chunk_comment(item["chunk_id"] - 1, [item])
        else:
            self._update_comment_content(found_idx, suggestion)
def get_chunk_id_list(self, step=1):
    """Return chunk indices ``0, step, 2*step, ...`` below the chunk count.

    Calls ``_ensure_loaded`` first.
    """
    self._ensure_loaded()
    total = self.get_chunk_num()
    return list(range(0, total, step))
def get_all_text(self):
    """Return the document text from ``GetText()``, after ``_ensure_loaded``."""
    self._ensure_loaded()
    document = self._doc
    return document.GetText()
def to_file(self, path, remove_prefix=False):
    """Save the document to ``path``.

    Args:
        path: Destination file path handed to ``SaveToFile``.
        remove_prefix: When truthy, ``remove_comment_prefix`` is called
            before saving.
    """
    self._ensure_loaded()
    if remove_prefix:
        self.remove_comment_prefix()
    document = self._doc
    document.SaveToFile(path)
def release(self):
    """Close the underlying document, if any, then run the base-class release."""
    document = self._doc
    if document:
        document.Close()
    super().release()
def __del__(self):
    """Intentionally a no-op: ``release()`` is not called on garbage collection."""
    # self.release()
    return None
if __name__ == "__main__":
    # Manual smoke test: load a local demo contract, print its file name and
    # whether the full text contains a known attachment title.
    demo_path = r"/home/ccran/lufa-contract/demo/今麦郎合同审核.docx"
    doc = SpireWordDoc()
    doc.load(demo_path)
    print(doc._doc_name)
    full_text = doc.get_all_text()
    print("附件2《技术协议》" in full_text)
    # Disabled example: add one test comment and save a copy.
    # doc.add_chunk_comment(
    #     0,
    #     [
    #         {
    #             "id": "1",
    #             "key_points": "日期审查",
    #             "original_text": "承诺",
    #             "details": "1111",
    #             "chunk_id": 0,
    #             "result": "不合格",
    #             "suggest": "这是测试建议",
    #         }
    #     ],
    # )
    # doc.to_file("/home/ccran/lufa-contract/demo/今麦郎合同审核_test.docx", True)
\ No newline at end of file
from spire.doc import Document, Paragraph, Table, Comment, CommentMark, CommentMarkType
import json
from loguru import logger
import re
from thefuzz import fuzz
from utils.doc_util import DocBase
from utils.common_util import adjust_single_chunk_size
import os
def extract_table_cells_text(table, joiner="\n"):
    """Return the text of every cell of a table as a flat, row-major list.

    The result looks like ``["r0c0_text", "r0c1_text", "r1c0_text", ...]``.
    Text is kept verbatim (no stripping or cleaning); an empty cell yields "".

    Args:
        table: Table-like object exposing ``Rows`` / ``Cells`` / ``Paragraphs``
            collections with ``Count`` and integer indexing.
        joiner: Separator placed between paragraphs of a cell and between the
            rows/cells of nested tables (newline by default).

    Returns:
        list[str]: One string per cell.
    """

    def _para_text(para):
        # Prefer the paragraph's own Text; otherwise concatenate the Text of
        # its child objects. Any access error degrades to "".
        try:
            if hasattr(para, "Text"):
                own_text = para.Text
                return "" if own_text is None else own_text
        except Exception:
            pass
        pieces = []
        try:
            for pos in range(para.ChildObjects.Count):
                child = para.ChildObjects[pos]
                if hasattr(child, "Text"):
                    child_text = child.Text
                    pieces.append("" if child_text is None else child_text)
        except Exception:
            pass
        return "".join(pieces)

    def _nested_table_text(nested):
        # Flatten a nested table: paragraphs -> cell, cells -> row, rows -> table,
        # all glued with ``joiner``. A cell that fails to read contributes "".
        row_texts = []
        for row_idx in range(nested.Rows.Count):
            cell_texts = []
            for cell_idx in range(nested.Rows[row_idx].Cells.Count):
                try:
                    para_texts = []
                    for para_idx in range(
                        nested.Rows[row_idx].Cells[cell_idx].Paragraphs.Count
                    ):
                        para_texts.append(
                            _para_text(
                                nested.Rows[row_idx].Cells[cell_idx].Paragraphs[para_idx]
                            )
                        )
                    cell_texts.append(joiner.join(para_texts))
                except Exception:
                    cell_texts.append("")
            row_texts.append(joiner.join(cell_texts))
        return joiner.join(row_texts)

    def _extract_cell_text(cell):
        fragments = []
        # Paragraph text of the cell itself, verbatim.
        try:
            for para_idx in range(cell.Paragraphs.Count):
                fragments.append(_para_text(cell.Paragraphs[para_idx]))
        except Exception:
            pass
        # Nested tables: read cell.Tables when it is non-empty, otherwise look
        # for table-like children (objects with Rows) in cell.ChildObjects.
        try:
            if hasattr(cell, "Tables") and cell.Tables.Count > 0:
                for table_idx in range(cell.Tables.Count):
                    fragments.append(_nested_table_text(cell.Tables[table_idx]))
            else:
                try:
                    for child_idx in range(cell.ChildObjects.Count):
                        child = cell.ChildObjects[child_idx]
                        if hasattr(child, "Rows") and getattr(child, "Rows") is not None:
                            fragments.append(_nested_table_text(child))
                except Exception:
                    pass
        except Exception:
            pass
        return joiner.join(fragments)

    cell_values = []
    for row_idx in range(table.Rows.Count):
        current_row = table.Rows[row_idx]
        for col_idx in range(current_row.Cells.Count):
            cell_values.append(_extract_cell_text(current_row.Cells[col_idx]))
    return cell_values
def process_string(s):
    """Reduce a multi-line fragment to its single most useful line.

    Rules:
      * no newline: return ``s`` unchanged;
      * exactly one newline: return the longer half (the first half on a tie);
      * several newlines: return the longest non-empty *middle* line
        (first and last lines excluded). If every middle line is empty, fall
        back to the longest non-empty line overall, or "" if there is none.

    The fallback used to be unreachable because empty middle lines counted as
    valid, so inputs such as ``"abc\\n\\ndefg"`` collapsed to "".

    Args:
        s: Text that may contain "\\n" separators.

    Returns:
        str: The selected line.
    """
    if "\n" not in s:
        return s
    parts = s.split("\n")
    if len(parts) == 2:
        head, tail = parts
        return head if len(head) >= len(tail) else tail
    # Only non-empty middle lines are valid candidates.
    middle_parts = [part for part in parts[1:-1] if part]
    if middle_parts:
        return max(middle_parts, key=len)
    non_empty_parts = [part for part in parts if part]
    return max(non_empty_parts, key=len) if non_empty_parts else ""
def build_mapping(original: str):
    """Build a whitespace-normalised copy of ``original`` plus an index map.

    Runs of non-whitespace characters are kept and joined by single spaces.
    ``mapping[i]`` is the index in ``original`` of normalised character ``i``;
    each inserted space maps to the start index of the word that follows it.

    Returns:
        tuple[str, list[int]]: ``(normalized_text, mapping)``.
    """
    chars = []
    positions = []
    for match in re.finditer(r"\S+", original):
        start = match.start()
        token = match.group()
        if chars:
            # Separator between words; points at the next word's first char.
            chars.append(" ")
            positions.append(start)
        chars.extend(token)
        positions.extend(range(start, start + len(token)))
    return "".join(chars), positions
def extract_match(big_text: str, small_text: str, threshold=20):
    """Find the part of ``big_text`` that best matches ``small_text``.

    Strategy, in order:
      1. exact containment: return ``small_text`` with score 100;
      2. fuzzy ratio of the whole ``big_text``: return it if >= ``threshold``;
      3. fuzzy ratio per clause, where clauses are split on "。", ",", ";"
         and the full-width ";": return the best clause if >= ``threshold``;
      4. otherwise ``None`` with the best score seen.

    An empty ``small_text`` is never treated as an exact hit, so an empty
    query cannot produce a perfect score.

    Args:
        big_text: Text to search in.
        small_text: Text to look for.
        threshold: Minimum ``fuzz.ratio`` score (0-100) for a fuzzy hit.

    Returns:
        tuple: ``(matched_text_or_None, score)``.
    """
    # 1. Exact containment (skipped for an empty needle).
    if small_text and small_text in big_text:
        return small_text, 100
    # 2. Whole-text fuzzy match.
    full_score = fuzz.ratio(big_text, small_text)
    if full_score >= threshold:
        return big_text, full_score
    # 3. Clause-level fuzzy match. The full-width semicolon is included
    #    because it is the usual clause separator in Chinese text.
    best_score = 0
    best_clause = None
    for clause in re.split(r"[。,;;]", big_text):
        if not clause.strip():
            continue
        clause_score = fuzz.ratio(clause, small_text)
        if clause_score > best_score:
            best_score = clause_score
            best_clause = clause
    if best_score >= threshold:
        return best_clause, best_score
    # 4. Nothing good enough.
    return None, max(full_score, best_score)
def find_best_match(sub_chunks, comment):
    """Find the paragraph fragment that best matches ``comment["original_text"]``.

    Only ``Paragraph`` instances in ``sub_chunks`` are scored; each one is
    passed to ``extract_match`` and the first highest-scoring result wins.

    Args:
        sub_chunks: Iterable of document objects.
        comment: Dict containing ``"original_text"``.

    Returns:
        tuple: ``(best_match, best_score)``. ``best_match`` is the matched
        text (or None) and ``best_score`` its score; ``(None, -1)`` when no
        paragraph was scored.
    """
    best_match = None
    best_score = -1
    for obj in sub_chunks:
        if not isinstance(obj, Paragraph):
            continue
        match_text, score = extract_match(obj.Text, comment["original_text"])
        # Strictly greater: on equal scores the earliest paragraph is kept.
        if score > best_score:
            best_match = match_text
            best_score = score
    return best_match, best_score
def table_contract(target_texts, comment):
    """Find the text that best matches ``comment["original_text"]``.

    Each string in ``target_texts`` (for example the cell texts of a table)
    is passed to ``extract_match``; the first highest-scoring result wins.

    Args:
        target_texts: Iterable of strings to compare against.
        comment: Dict containing ``"original_text"``.

    Returns:
        tuple: ``(best_match, best_score)``. ``best_match`` is the matched
        text (or None) and ``best_score`` its score; ``(None, -1)`` when
        ``target_texts`` is empty.
    """
    original_text = comment["original_text"]
    best_match = None
    best_score = -1
    for target_text in target_texts:
        match_text, score = extract_match(target_text, original_text)
        # Strictly greater: on equal scores the earliest text is kept.
        if score > best_score:
            best_match = match_text
            best_score = score
    return best_match, best_score
# spire doc解析
class SpireWordDoc(DocBase):
    """Word document wrapper built on Spire.Doc.

    Splits the document body into text chunks and adds, updates or removes
    review comments. Each comment's author is set to ``"<id>|<key_points>"``
    (see ``format_comment_author``) so the comment can be found again later.
    """

    def load(self, doc_path, **kwargs):
        """Load the file at ``doc_path``, build the chunk list and return ``self``."""
        # License.SetLicenseFileFullPath(f"{root_path}/license.elic.python.xml")
        self._doc_path = doc_path
        self._doc_name = os.path.basename(doc_path)
        self._doc = Document()
        self._doc.LoadFromFile(doc_path)
        self._chunk_list = self._resolve_doc_chunk()
        return self

    def _ensure_loaded(self):
        """Raise RuntimeError when no document is loaded."""
        if not self._doc:
            raise RuntimeError("Document not loaded. Call load() first.")

    def adjust_chunk_size(self):
        """Recompute the maximum chunk size from the text length and re-chunk.

        Returns:
            The new ``_max_single_chunk_size``.
        """
        self._ensure_loaded()
        all_text_len = len(self.get_all_text())
        self._max_single_chunk_size = adjust_single_chunk_size(all_text_len)
        logger.info(
            f"SpireWordDoc adjust _max_single_chunk_size to {self._max_single_chunk_size}"
        )
        self._chunk_list = self._resolve_doc_chunk()
        return self._max_single_chunk_size

    async def get_from_ocr(self):
        """Not implemented for Word documents; does nothing."""
        pass

    def _resolve_doc_chunk(self):
        """Split the document body into chunks.

        Paragraphs and tables are appended (each followed by "\\n") to the
        current chunk until adding the next one would exceed
        ``_max_single_chunk_size``; the current chunk is then closed. Other
        body objects are skipped.

        Returns:
            list[dict]: Items with ``chunk_content`` (str) and
            ``chunk_location`` (list of ``section_idx`` / ``section_child_idx``
            dicts, one per contained object).
        """
        self._ensure_loaded()
        chunk_list = []
        pieces = []
        locations = []
        current_len = 0
        for section_idx in range(self._doc.Sections.Count):
            current_section = self._doc.Sections.get_Item(section_idx)
            for section_child_idx in range(current_section.Body.ChildObjects.Count):
                child_obj = current_section.Body.ChildObjects.get_Item(
                    section_child_idx
                )
                if isinstance(child_obj, Paragraph):
                    current_child_text = child_obj.Text
                elif isinstance(child_obj, Table):
                    current_child_text = self._resolve_table(child_obj)
                else:
                    # Neither paragraph nor table: contributes no text.
                    continue
                # Close the current chunk before it would overflow. The
                # ``locations`` check prevents an empty chunk (with no
                # location) when the very first object alone exceeds the limit.
                if (
                    locations
                    and current_len + len(current_child_text)
                    > self._max_single_chunk_size
                ):
                    chunk_list.append(
                        {
                            "chunk_content": "".join(pieces),
                            "chunk_location": locations,
                        }
                    )
                    pieces = []
                    locations = []
                    current_len = 0
                pieces.append(current_child_text + "\n")
                current_len += len(current_child_text) + 1
                locations.append(
                    {"section_idx": section_idx, "section_child_idx": section_child_idx}
                )
        if pieces:
            chunk_list.append(
                {"chunk_content": "".join(pieces), "chunk_location": locations}
            )
        return chunk_list

    def _resolve_table(self, table):
        """Render a table as plain text.

        Each row becomes one line: cell texts (paragraphs of a cell
        concatenated without separator) joined by a single space, followed
        by "\\n".
        """
        lines = []
        for i in range(table.Rows.Count):
            row = table.Rows.get_Item(i)
            cell_list = []
            for j in range(row.Cells.Count):
                cell = row.Cells.get_Item(j)
                cell_list.append(
                    "".join(
                        cell.Paragraphs.get_Item(para_idx).Text
                        for para_idx in range(cell.Paragraphs.Count)
                    )
                )
            lines.append(" ".join(cell_list) + "\n")
        return "".join(lines)

    def get_chunk_info(self, chunk_id):
        """Return a human-readable summary of a chunk: id, location and preview."""
        chunk = self._chunk_list[chunk_id]
        chunk_content = chunk["chunk_content"]
        first_loc = chunk["chunk_location"][0]
        last_loc = chunk["chunk_location"][-1]
        from_location = f"[第{first_loc['section_idx'] + 1}节的第{first_loc['section_child_idx'] + 1}段落]"
        to_location = f"[第{last_loc['section_idx'] + 1}节的第{last_loc['section_child_idx'] + 1}段落]"
        chunk_content_tips = (
            "[" + chunk_content[:20] + "]...到...[" + chunk_content[-20:] + "]"
        )
        # The displayed id is 1-based.
        return f"文件块id: {chunk_id + 1}\n文件块位置: 从{from_location}到{to_location}\n文件块简述: {chunk_content_tips}\n"

    def get_chunk_location(self, chunk_id):
        """Alias of ``get_chunk_info``."""
        return self.get_chunk_info(chunk_id)

    def get_chunk_num(self):
        """Return the number of chunks."""
        self._ensure_loaded()
        return len(self._chunk_list)

    def get_chunk_item(self, chunk_id):
        """Return the text content of chunk ``chunk_id``."""
        self._ensure_loaded()
        return self._chunk_list[chunk_id]["chunk_content"]

    def get_sub_chunks(self, chunk_id):
        """Return the document objects (paragraphs/tables) that make up a chunk.

        An invalid ``chunk_id`` (None, negative or past the end) is logged and
        yields an empty list.
        """
        if chunk_id is None or not 0 <= chunk_id < len(self._chunk_list):
            logger.error(f"get_sub_chunks_error:{chunk_id}")
            return []
        chunk_locations = self._chunk_list[chunk_id]["chunk_location"]
        return [
            self._doc.Sections.get_Item(loc["section_idx"]).Body.ChildObjects.get_Item(
                loc["section_child_idx"]
            )
            for loc in chunk_locations
        ]

    def format_comment_author(self, comment):
        """Build the author key ``"<id>|<key_points>"`` for a review item."""
        return "{}|{}".format(str(comment["id"]), comment["key_points"])

    def remove_comment_prefix(
        self,
    ):
        """Strip the ``"<id>|"`` prefix from comment authors.

        Only authors that split into exactly two parts on "|" are rewritten.
        """
        for i in range(self._doc.Comments.Count):
            current_comment = self._doc.Comments.get_Item(i)
            split_author = current_comment.Format.Author.split("|")
            if len(split_author) == 2:
                current_comment.Format.Author = split_author[1]

    def _insert_comment_by_text_range(self, text_range, author, comment_content):
        """Insert a comment, with start/end marks, around ``text_range``.

        Returns False when the range or its owner paragraph is missing,
        True after the comment has been inserted.
        """
        if text_range is None:
            return False
        paragraph = text_range.OwnerParagraph
        if paragraph is None:
            return False
        comment = Comment(self._doc)
        comment.Body.AddParagraph().Text = comment_content
        comment.Format.Author = author
        # Insert the comment right after the matched range.
        paragraph.ChildObjects.Insert(
            paragraph.ChildObjects.IndexOf(text_range) + 1, comment
        )
        comment_start = CommentMark(self._doc, CommentMarkType.CommentStart)
        comment_end = CommentMark(self._doc, CommentMarkType.CommentEnd)
        comment_start.CommentId = comment.Format.CommentId
        comment_end.CommentId = comment.Format.CommentId
        # Then wrap the range: start mark before it, end mark directly after it.
        paragraph.ChildObjects.Insert(
            paragraph.ChildObjects.IndexOf(text_range), comment_start
        )
        paragraph.ChildObjects.Insert(
            paragraph.ChildObjects.IndexOf(text_range) + 1, comment_end
        )
        return True

    def set_comment_by_text_selection(self, text_sel, author, comment_content):
        """Attach a comment to the text covered by ``text_sel``.

        Returns False when the selection, its range or the owner paragraph is
        missing; True otherwise.
        """
        if text_sel is None:
            return False
        return self._insert_comment_by_text_range(
            text_sel.GetAsOneRange(), author, comment_content
        )

    def set_comment_by_paragraph(self, paragraph, author, comment_content):
        """Attach a comment spanning a whole paragraph.

        The start mark is inserted at index 0; the comment and the end mark
        are appended at the end of the paragraph.
        """
        comment = Comment(self._doc)
        comment.Body.AddParagraph().Text = comment_content
        comment.Format.Author = author
        paragraph.ChildObjects.Add(comment)
        comment_start = CommentMark(self._doc, CommentMarkType.CommentStart)
        comment_end = CommentMark(self._doc, CommentMarkType.CommentEnd)
        comment_start.CommentId = comment.Format.CommentId
        comment_end.CommentId = comment.Format.CommentId
        paragraph.ChildObjects.Add(comment_end)
        paragraph.ChildObjects.Insert(0, comment_start)

    def add_table_comment(
        self, table, target_text, comment_text, author="审阅助手", initials="AI"
    ):
        """Comment the first occurrence of ``target_text`` found in ``table``.

        Cells are scanned row by row; each cell paragraph is searched with
        ``Find(target_text, False, True)``. ``initials`` is accepted for
        interface compatibility and is not used.

        Returns:
            True if a comment was inserted, otherwise False.
        """
        for i in range(table.Rows.Count):
            row = table.Rows[i]
            for j in range(row.Cells.Count):
                cell = row.Cells[j]
                for k in range(cell.Paragraphs.Count):
                    selection = cell.Paragraphs[k].Find(target_text, False, True)
                    if not selection:
                        continue
                    # A failed insertion just moves on to the next paragraph.
                    if self._insert_comment_by_text_range(
                        selection.GetAsOneRange(), author, comment_text
                    ):
                        return True
        return False

    def _update_comment_content(self, comment_idx, suggest):
        """Replace the text of the first body paragraph of an existing comment."""
        self._doc.Comments.get_Item(comment_idx).Body.Paragraphs.get_Item(
            0
        ).Text = suggest

    def _sub_chunks_for_comment(self, chunk_id, comment):
        """Pick the objects to search for one review item.

        The item's own ``chunk_id`` wins when it is a valid in-range index;
        otherwise the caller-supplied ``chunk_id`` is used. With neither, the
        result is an empty list.
        """
        own_id = comment.get("chunk_id", -1)
        if (
            isinstance(own_id, int)
            and not isinstance(own_id, bool)
            and 0 <= own_id < len(self._chunk_list)
        ):
            return self.get_sub_chunks(own_id)
        if chunk_id is None:
            return []
        return self.get_sub_chunks(chunk_id)

    def _try_add_comment_by_exact(self, sub_chunks, find_key, author, suggest):
        """Comment the first paragraph or table that contains ``find_key`` verbatim."""
        for obj in sub_chunks:
            if isinstance(obj, Paragraph):
                try:
                    text_sel = obj.Find(find_key, False, True)
                    if text_sel and self.set_comment_by_text_selection(
                        text_sel, author, suggest
                    ):
                        return True
                except Exception as e:
                    print(f"段落批注添加失败: {str(e)}")
            elif isinstance(obj, Table):
                try:
                    if self.add_table_comment(obj, find_key, suggest, author):
                        return True
                except Exception as e:
                    print(f"表格批注添加失败: {str(e)}")
        return False

    def _comment_first_paragraph_hit(self, paragraphs, text, author, suggest):
        """Comment the first paragraph in which ``text`` is found; return success."""
        if not text:
            return False
        for para in paragraphs:
            text_sel = para.Find(text, False, True)
            if text_sel and self.set_comment_by_text_selection(
                text_sel, author, suggest
            ):
                return True
        return False

    def _try_add_comment_by_fuzzy(self, sub_chunks, comment, author, suggest):
        """Fuzzy placement: paragraphs first, tables only if paragraphs fail."""
        paragraphs_only = [obj for obj in sub_chunks if isinstance(obj, Paragraph)]
        match_text, _ = find_best_match(paragraphs_only, comment)
        if match_text:
            # Try the best fragment as-is, then its compressed single-line form.
            if self._comment_first_paragraph_hit(
                paragraphs_only, match_text, author, suggest
            ):
                return True
            if self._comment_first_paragraph_hit(
                paragraphs_only, process_string(match_text), author, suggest
            ):
                return True
        for obj in sub_chunks:
            if isinstance(obj, Table):
                best_table_match, _ = table_contract(
                    extract_table_cells_text(obj), comment
                )
                if best_table_match and self.add_table_comment(
                    obj, best_table_match, suggest, author
                ):
                    return True
        return False

    def add_chunk_comment(self, chunk_id, comments):
        """Add review comments to a chunk, at most one comment per review item.

        Only items whose ``result`` is "不合格" are processed. An existing
        comment with the same author key is updated in place; otherwise exact
        matching is tried, then fuzzy matching, and a failure is logged.

        Args:
            chunk_id: Fallback chunk index (may be None) for items without a
                usable ``chunk_id`` of their own.
            comments: Iterable of review dicts with ``id``, ``key_points``,
                ``original_text`` and optionally ``chunk_id``, ``result``,
                ``suggest``.
        """
        for comment in comments:
            if comment.get("result") != "不合格":
                continue
            # Resolved per item so one item's chunk never leaks into the next.
            sub_chunks = self._sub_chunks_for_comment(chunk_id, comment)
            author = self.format_comment_author(comment)
            suggest = comment.get("suggest", "")
            find_key = comment["original_text"].strip() or comment["key_points"]
            existing_comment_idx = self.find_comment(author)
            if existing_comment_idx is not None:
                self._update_comment_content(existing_comment_idx, suggest)
                continue
            matched = self._try_add_comment_by_exact(
                sub_chunks, find_key, author, suggest
            )
            if not matched:
                try:
                    matched = self._try_add_comment_by_fuzzy(
                        sub_chunks, comment, author, suggest
                    )
                except Exception as e:
                    # Best effort: keep processing the remaining items.
                    print(f"模糊匹配失败: {str(e)}")
            if not matched:
                logger.error(f"未找到可批注位置: '{find_key[:20]}...'")

    def find_comment(self, author):
        """Return the index of the first comment whose author equals ``author``, else None."""
        for i in range(self._doc.Comments.Count):
            if self._doc.Comments.get_Item(i).Format.Author == author:
                return i
        return None

    def delete_chunk_comment(self, comments):
        """Remove the document comment that belongs to each given review item."""
        for comment in comments:
            author = self.format_comment_author(comment)
            author_comment_idx = self.find_comment(author)
            if author_comment_idx is not None:
                self._doc.Comments.RemoveAt(author_comment_idx)
                print(f"删除批注: '{author}'")

    def edit_chunk_comment(self, comments):
        """Synchronise comments with updated review results.

        Items with ``result == "合格"`` have their comment deleted. Other
        items have their comment updated, or created when none exists.

        NOTE(review): the fallback chunk is ``comment["chunk_id"] - 1``
        (1-based id), while ``add_chunk_comment`` also reads the item's
        ``chunk_id`` directly as an index — confirm the intended convention.
        """
        for comment in comments:
            author = self.format_comment_author(comment)
            review_answer = comment["result"]
            existing_comment_idx = self.find_comment(author)
            if review_answer == "合格":
                if existing_comment_idx is not None:
                    self._doc.Comments.RemoveAt(existing_comment_idx)
                continue
            suggest = comment.get("suggest", "")
            if existing_comment_idx is not None:
                self._update_comment_content(existing_comment_idx, suggest)
            else:
                self.add_chunk_comment(comment["chunk_id"] - 1, [comment])

    def get_chunk_id_list(self, step=1):
        """Return chunk indices ``0, step, 2*step, ...`` below the chunk count."""
        self._ensure_loaded()
        return list(range(0, self.get_chunk_num(), step))

    def get_all_text(self):
        """Return the document text from ``GetText()``."""
        self._ensure_loaded()
        return self._doc.GetText()

    def to_file(self, path, remove_prefix=False):
        """Save the document to ``path``, optionally stripping author prefixes first."""
        self._ensure_loaded()
        if remove_prefix:
            self.remove_comment_prefix()
        self._doc.SaveToFile(path)

    def release(self):
        """Close the underlying document, if any, then run the base-class release."""
        if self._doc:
            self._doc.Close()
        super().release()

    def __del__(self):
        # Intentionally a no-op: release() is not called on garbage collection.
        pass
        # self.release()
if __name__ == "__main__":
    # Manual smoke test: load a local demo contract, add one test comment to
    # chunk 0 and save the result to a sibling file.
    demo_path = r"/home/ccran/lufa-contract/demo/今麦郎合同审核.docx"
    sample_comment = {
        "id": "1",
        "key_points": "日期审查",
        "original_text": "承诺",
        "details": "1111",
        "chunk_id": 0,
        "result": "不合格",
        "suggest": "这是测试建议",
    }
    doc = SpireWordDoc()
    doc.load(demo_path)
    print(doc._doc_name)
    doc.add_chunk_comment(0, [sample_comment])
    doc.to_file("/home/ccran/lufa-contract/demo/今麦郎合同审核_test.docx", True)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment