Commit d26c53e1 by ccran

feat: add readme.md

parent 5f18aa67
...@@ -7,5 +7,7 @@ ...@@ -7,5 +7,7 @@
# Keep Python source files # Keep Python source files
!**/*.py !**/*.py
!README.md
# Keep this file tracked # Keep this file tracked
!.gitignore !.gitignore
\ No newline at end of file
# 合同审查智能体 (Contract Review Agent)
一个基于 FastAPI 和大型语言模型 (LLM) 的智能合同审查系统,能够自动分析合同条款、识别风险并提供审查建议。
## 📋 项目概述
本项目是一个合同审查智能体,通过以下流程实现合同的自动化审查:
1. **文档解析** - 支持多种格式的合同文档解析
2. **分段处理** - 将合同按规则智能分段
3. **事实提取** - 从每个分段中提取与审查规则相关的客观事实
4. **规则审查** - 基于预设规则对提取的事实进行审查
5. **风险复核** - 对审查结果进行反思和复核
6. **结果合并** - 合并所有分段审查结果生成最终报告
## 🏗️ 项目结构
```
lufa-contract/
├── main.py                        # FastAPI 主应用入口
├── test.py                        # 测试脚本
├── core/                          # 核心业务逻辑
│   ├── cache.py                   # 缓存管理
│   ├── config.py                  # 配置管理
│   ├── memory.py                  # 记忆/状态管理
│   ├── tool.py                    # 工具基类
│   └── tools/                     # 具体工具实现
│       ├── segment_summary.py     # 分段事实提取
│       ├── segment_review.py      # 分段规则审查
│       ├── segment_rule_router.py # 规则路由
│       ├── retrieve_reference.py  # 参考检索
│       ├── reflect_retry.py       # 反思重试
│       └── segment_merger.py      # 结果合并
├── data/                          # 数据文件
│   ├── rules.xlsx                 # 审查规则表
│   ├── batch/                     # 批量处理数据
│   └── benchmark/                 # 基准测试数据
├── utils/                         # 工具函数
│   ├── common_util.py             # 通用工具
│   ├── http_util.py               # HTTP 工具
│   └── doc_util.py                # 文档工具
├── demo/                          # 演示文件
├── tmp/                           # 临时文件
└── .vscode/                       # VSCode 配置
```
## 🔧 技术栈
- **后端框架**: FastAPI
- **LLM 服务**: Qwen2-72B-Instruct (可配置)
- **文档处理**: 支持 PDF、Word 等多种格式
- **日志**: Loguru
- **数据验证**: Pydantic
## 📦 核心功能
### 1. 分段事实提取 (SegmentSummary)
基于审查规则从合同分段中提取客观事实,确保:
- 事实可在原文中直接找到
- 不做抽象、概括或推断
- 不补充未出现的主体、条件或数值
### 2. 分段规则审查 (SegmentReview)
对提取的事实进行规则匹配和风险分析,输出:
- 风险等级 (H/M/L)
- 审查结论
- 修改建议
### 3. 反思重试 (ReflectRetry)
对审查结果进行自我反思,识别潜在问题并重试
### 4. 结果合并 (SegmentMerger)
合并所有分段的审查结果,生成完整的审查报告
## ⚙️ 配置说明
在 `core/config.py` 中可配置:
```python
# LLM 配置
LLMConfig:
base_url: "http://192.168.252.71:9002/v1"
model: "Qwen2-72B-Instruct"
# 审查规则集
ALL_RULESET_IDS = ["通用", "借款", "担保", "财务口", "金盘", "金盘简化"]
# 分段大小控制
MAX_SINGLE_CHUNK_SIZE = 5000
```
## 🚀 快速开始
### 1. 安装依赖
```bash
pip install fastapi uvicorn pydantic loguru openpyxl requests
```
### 2. 启动服务
```bash
python main.py
```
服务将在 `http://localhost:8000` 启动
### 3. API 端点
- `POST /sleep` - 测试端点
- `POST /document/parse` - 解析合同文档
- `POST /contract/review` - 执行合同审查
- `GET /contract/{conversation_id}/result` - 获取审查结果
## 📝 使用示例
### 提交合同审查请求
```python
import requests
# 上传合同文档
response = requests.post(
"http://localhost:8000/document/parse",
json={
"conversation_id": "unique-conversation-id",
"file_url": "http://example.com/contract.pdf",
"ruleset_id": "通用"
}
)
# 获取审查结果
result = requests.get(
f"http://localhost:8000/contract/{response.json()['conversation_id']}/result"
)
```
## 🔐 安全说明
- API Key 配置在 `core/config.py`
- 支持内外网环境切换 (`use_lufa` 参数)
- 临时文件自动清理
## 📊 数据格式
### 审查结果结构
```json
{
"conversation_id": "xxx",
"findings": [
{
"segment_id": "seg_001",
"rule_id": "rule_001",
"risk_level": "H",
"fact": "提取的事实",
"conclusion": "审查结论",
"suggestion": "修改建议"
}
]
}
```
## 🛠️ 开发指南
### 添加新的审查规则
1. 在 `data/rules.xlsx` 中添加新规则
2. 更新 `core/config.py` 中的规则集配置
3. 重启服务
### 自定义 LLM 模型
修改 `core/config.py` 中的 `LLMConfig`:
```python
LLMConfig:
base_url: "你的 LLM 服务地址"
model: "你的模型名称"
```
## 📄 许可证
内部使用,保留所有权利。
## 👥 维护者
- 开发团队
## 📞 联系方式
如有问题,请联系项目维护团队。
...@@ -17,464 +17,509 @@ from core.config import META_KEY ...@@ -17,464 +17,509 @@ from core.config import META_KEY
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_ALLOWED_RISK_LEVELS = {"H", "M", "L",""} _ALLOWED_RISK_LEVELS = {"H", "M", "L", ""}
FINDING_KEY_REVIEW = "review" FINDING_KEY_REVIEW = "review"
FINDING_KEY_REFLECT = "reflect" FINDING_KEY_REFLECT = "reflect"
FINDING_KEY_MERGE = "merge" FINDING_KEY_MERGE = "merge"
_DEFAULT_REVIEW_KEY = FINDING_KEY_REVIEW _DEFAULT_REVIEW_KEY = FINDING_KEY_REVIEW
_FINDING_KEY_SHEET_NAMES = { _FINDING_KEY_SHEET_NAMES = {
FINDING_KEY_REVIEW: "审查结果", FINDING_KEY_REVIEW: "审查结果",
FINDING_KEY_REFLECT: "复核结果", FINDING_KEY_REFLECT: "复核结果",
FINDING_KEY_MERGE: "合并结果", FINDING_KEY_MERGE: "合并结果",
} }
@dataclass @dataclass
class Finding: class Finding:
rule_title: str rule_title: str
segment_id: int segment_id: int
original_text: str original_text: str
issue: str issue: str
risk_level: str risk_level: str
suggestion: str suggestion: str
id: str = "" id: str = ""
result: str = "" result: str = ""
def __post_init__(self) -> None: def __post_init__(self) -> None:
level = (self.risk_level or "").upper() level = (self.risk_level or "").upper()
if level not in _ALLOWED_RISK_LEVELS: if level not in _ALLOWED_RISK_LEVELS:
raise ValueError(f"risk_level must be one of {_ALLOWED_RISK_LEVELS}, got {self.risk_level}") raise ValueError(
self.risk_level = level f"risk_level must be one of {_ALLOWED_RISK_LEVELS}, got {self.risk_level}"
)
@classmethod self.risk_level = level
def from_dict(cls, data: Dict) -> "Finding":
data = data or {} @classmethod
return cls( def from_dict(cls, data: Dict) -> "Finding":
id=str(data.get("id", "")), data = data or {}
rule_title=str(data.get("rule_title", "")), return cls(
segment_id=int(data.get("segment_id", 0) or 0), id=str(data.get("id", "")),
original_text=str(data.get("original_text", "")), rule_title=str(data.get("rule_title", "")),
issue=str(data.get("issue", "")), segment_id=int(data.get("segment_id", 0) or 0),
risk_level=str(data.get("risk_level", "")), original_text=str(data.get("original_text", "")),
suggestion=str(data.get("suggestion", "")), issue=str(data.get("issue", "")),
result=str(data.get("result", "")), risk_level=str(data.get("risk_level", "")),
) suggestion=str(data.get("suggestion", "")),
result=str(data.get("result", "")),
def __repr__(self): )
return (
f"Finding(id={self.id!r}, rule_title={self.rule_title!r}, segment_id={self.segment_id}, " def __repr__(self):
f"issue={self.issue!r}, risk_level={self.risk_level!r}, result={self.result!r})" return (
) f"Finding(id={self.id!r}, rule_title={self.rule_title!r}, segment_id={self.segment_id}, "
f"issue={self.issue!r}, risk_level={self.risk_level!r}, result={self.result!r})"
)
@dataclass @dataclass
class MemoryStore: class MemoryStore:
"""简化的记忆存储:合同事实 facts 与问题 findings。线程安全并支持 JSON 持久化。""" """简化的记忆存储:合同事实 facts 与问题 findings。线程安全并支持 JSON 持久化。"""
storage_name: Optional[Path] = 'default.json' storage_name: Optional[Path] = "default.json"
def __init__(self, storage_name: str = "default.json") -> None:
def __init__(self,storage_name:str = 'default.json') -> None: self._storage_path = Path(__file__).resolve().parent.parent / "tmp" / storage_name # type: ignore[arg-type]
self._storage_path = Path(__file__).resolve().parent.parent / "tmp" / storage_name # type: ignore[arg-type] self._storage_path.parent.mkdir(parents=True, exist_ok=True)
self._storage_path.parent.mkdir(parents=True, exist_ok=True) self._lock = RLock()
self._lock = RLock() self.facts: List[Dict[str, Any]] = []
self.facts: List[Dict[str, Any]] = [] self.findings: Dict[str, List[Finding]] = {}
self.findings: Dict[str, List[Finding]] = {} self._load()
self._load()
# ---------------------- facts ----------------------
# ---------------------- facts ---------------------- def set_facts(self, facts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
def set_facts(self, facts: List[Dict[str, Any]]) -> List[Dict[str, Any]]: with self._lock:
with self._lock: self.facts = facts or []
self.facts = facts or [] self._persist()
self._persist() return self.facts
return self.facts
def add_facts(self, partial: Dict[str, Any]) -> List[Dict[str, Any]]:
def add_facts(self, partial: Dict[str, Any]) -> List[Dict[str, Any]]: with self._lock:
with self._lock: self.facts.append(partial)
self.facts.append(partial) self._persist()
self._persist() return self.facts
return self.facts
def get_facts(self) -> List[Dict[str, Any]]:
def get_facts(self) -> List[Dict[str, Any]]: with self._lock:
with self._lock: return self.facts # deep copy
return self.facts # deep copy
def search_facts(self, keywords: List[str]) -> List[Any]:
def search_facts(self, keywords: List[str]) -> List[Any]: keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()]
keys = [str(k).strip().lower() for k in (keywords or []) if str(k).strip()] if not keys:
if not keys: return []
return []
def _key_match(name: Any) -> bool:
def _key_match(name: Any) -> bool: key_name = str(name).strip().lower()
key_name = str(name).strip().lower() return bool(key_name) and any(k in key_name or key_name in k for k in keys)
return bool(key_name) and any(k in key_name or key_name in k for k in keys)
matched_values: List[Any] = []
matched_values: List[Any] = [] with self._lock:
with self._lock: all_facts = list(self.facts)
all_facts = list(self.facts)
for item in all_facts:
for item in all_facts: if not isinstance(item, dict):
if not isinstance(item, dict): continue
continue
for top_key, top_value in item.items():
for top_key, top_value in item.items(): if _key_match(top_key):
if _key_match(top_key): matched_values.append(
matched_values.append({ {
top_key: top_value, top_key: top_value,
META_KEY: item.get(META_KEY, {}) # include metadata if exists META_KEY: item.get(
}) META_KEY, {}
), # include metadata if exists
}
)
return matched_values
return matched_values
# -------------------- findings ---------------------
def add_finding(self, key: str, finding: Finding) -> Finding: # -------------------- findings ---------------------
return self._add_finding(key, finding) def add_finding(self, key: str, finding: Finding) -> Finding:
return self._add_finding(key, finding)
def list_findings(self, key: str) -> List[Finding]:
return self._list_findings(self._get_findings_bucket(key)) def list_findings(self, key: str) -> List[Finding]:
return self._list_findings(self._get_findings_bucket(key))
def get_findings_by_segment(self, key: str, segment_id: int) -> List[Finding]:
return self._get_findings_by_segment(self._get_findings_bucket(key), segment_id) def get_findings_by_segment(self, key: str, segment_id: int) -> List[Finding]:
return self._get_findings_by_segment(self._get_findings_bucket(key), segment_id)
def delete_findings_by_segment(self, key: str, segment_id: int) -> int:
return self._delete_findings_by_segment(key, segment_id) def delete_findings_by_segment(self, key: str, segment_id: int) -> int:
return self._delete_findings_by_segment(key, segment_id)
def search_findings(self, key: str, keyword: str, rule_title: Optional[str] = None, risk_level: Optional[str] = None) -> List[Finding]:
return self._search_findings(self._get_findings_bucket(key), keyword, rule_title, risk_level) def search_findings(
self,
def list_findings_grouped(self) -> Dict[str, List[Finding]]: key: str,
with self._lock: keyword: str,
return {k: list(v) for k, v in self.findings.items()} rule_title: Optional[str] = None,
risk_level: Optional[str] = None,
def _add_finding(self, key: str, finding: Finding) -> Finding: ) -> List[Finding]:
with self._lock: return self._search_findings(
finding_key = self._normalize_finding_key(key) self._get_findings_bucket(key), keyword, rule_title, risk_level
if not finding.id:
finding.id = uuid4().hex
bucket = self.findings.setdefault(finding_key, [])
bucket.append(finding)
self._persist()
return finding
def _get_findings_bucket(self, key: str) -> List[Finding]:
    """Return the live findings list stored under the normalized form of ``key``.

    When no list is registered under that name yet, an empty one is created
    and stored first, so the returned object is always the one held in
    ``self.findings``.
    """
    bucket_name = self._normalize_finding_key(key)
    try:
        return self.findings[bucket_name]
    except KeyError:
        fresh: List[Finding] = []
        self.findings[bucket_name] = fresh
        return fresh
def _list_findings(self, target: List[Finding]) -> List[Finding]:
    """Return a shallow copy of ``target`` taken while holding the store lock."""
    self._lock.acquire()
    try:
        snapshot = [*target]
    finally:
        self._lock.release()
    return snapshot
def _get_findings_by_segment(self, target: List[Finding], segment_id: int) -> List[Finding]:
    """Return the findings in ``target`` whose ``segment_id`` equals ``segment_id``.

    The selection is made while holding the store lock; order is preserved.
    """
    with self._lock:
        return list(filter(lambda item: item.segment_id == segment_id, target))
def _delete_findings_by_segment(self, key: str, segment_id: int) -> int:
    """Remove every finding of ``segment_id`` from the bucket named by ``key``.

    The bucket is both read and written back under the *normalized* key.
    Writing under the raw ``key`` would leave the real bucket untouched and
    create a stray entry whenever the caller passes a key with different
    case or surrounding whitespace.

    Args:
        key: Findings bucket name; normalized before use.
        segment_id: Segment whose findings are dropped.

    Returns:
        The number of findings removed. The store is persisted only when
        that number is non-zero.
    """
    with self._lock:
        finding_key = self._normalize_finding_key(key)
        current = self._get_findings_bucket(finding_key)
        kept = [f for f in current if f.segment_id != segment_id]
        # Rebind under the same key the bucket was read from.
        self.findings[finding_key] = kept
        removed = len(current) - len(kept)
        if removed:
            self._persist()
        return removed
def _search_findings(
    self,
    target: List[Finding],
    keyword: str,
    rule_title: Optional[str] = None,
    risk_level: Optional[str] = None,
) -> List[Finding]:
    """Filter ``target`` by rule title, risk level and a free-text keyword.

    Args:
        target: Findings to search; a snapshot is taken under the store lock.
        keyword: Case-insensitive substring looked up in the rule title,
            original text, issue, suggestion and result. Blank means no
            keyword filtering.
        rule_title: When given, only findings whose title equals it
            (case-insensitive, surrounding whitespace ignored) are kept.
        risk_level: When given, only findings whose ``risk_level`` equals
            its stripped, upper-cased form are kept.

    Returns:
        The matching findings in their original order.
    """
    needle = (keyword or "").strip().lower()
    with self._lock:
        candidates = list(target)

    if rule_title:
        # Normalize once instead of once per candidate.
        wanted_title = rule_title.strip().lower()
        candidates = [f for f in candidates if (f.rule_title or "").lower() == wanted_title]

    if risk_level:
        wanted_level = risk_level.strip().upper()
        candidates = [f for f in candidates if f.risk_level == wanted_level]

    if not needle:
        return candidates

    def _matches(f: Finding) -> bool:
        fields = (f.rule_title, f.original_text, f.issue, f.suggestion, f.result)
        # A None field counts as empty text instead of breaking str.join.
        haystack = " ".join("" if part is None else str(part) for part in fields).lower()
        return needle in haystack

    return [f for f in candidates if _matches(f)]
# ------------------- housekeeping ------------------
def clear(self) -> None:
    """Empty the facts list and findings map in place, then persist the empty state."""
    with self._lock:
        del self.facts[:]
        for bucket_name in list(self.findings):
            del self.findings[bucket_name]
        self._persist()
def _persist(self) -> None:
    """Write facts and findings to ``self._storage_path`` as pretty-printed UTF-8 JSON.

    Each finding is converted with ``asdict``. A failure while encoding or
    writing is logged and not raised.
    """
    serialized_findings = {}
    for bucket_name, bucket in self.findings.items():
        serialized_findings[bucket_name] = list(map(asdict, bucket))
    document = {"facts": self.facts, "findings": serialized_findings}
    try:
        text = json.dumps(document, ensure_ascii=False, indent=2)
        self._storage_path.write_text(text, encoding="utf-8")
    except Exception as exc:
        logger.error("Failed to persist memory store: %s", exc)
def _load(self) -> None:
# Restore facts and findings from the JSON file at self._storage_path.
# Any exception raised while reading, parsing or rebuilding findings is logged and swallowed.
# NOTE(review): indentation is not visible in this view, so the exact nesting of the
# statements below is assumed from their order, not verified.
try:
# Nothing to restore when the storage file has not been written yet.
if not self._storage_path.exists():
return
raw = self._storage_path.read_text(encoding="utf-8")
# An empty file is parsed as an empty JSON object.
data = json.loads(raw or "{}")
if isinstance(data, dict):
self.facts = data.get("facts") or []
loaded_findings = data.get("findings", {})
findings_map: Dict[str, List[Finding]] = {}
if isinstance(loaded_findings, dict):
for key, items in loaded_findings.items():
# Bucket names are normalized on load; each stored item is rebuilt via Finding.from_dict.
normalized_key = self._normalize_finding_key(str(key))
findings_map[normalized_key] = [Finding.from_dict(item) for item in (items or [])]
self.findings = findings_map
# Findings stored without an id get a fresh uuid4 hex id; the store is re-persisted if any id was added.
needs_persist = False
for bucket in self.findings.values():
for finding in bucket:
if not finding.id:
finding.id = uuid4().hex
needs_persist = True
if needs_persist:
self._persist()
except Exception as exc:
logger.error("Failed to load memory store: %s", exc)
def export_to_excel(self, file_name: Optional[str] = None) -> Dict[str, Any]:
"""Export findings and facts to Excel, upload, then delete the local file."""
# Returns whatever upload_file returns for the saved workbook.
# NOTE(review): indentation is not visible in this view, so how much of the body
# runs while holding self._lock cannot be confirmed here.
# openpyxl is imported inside the method; a failed import is re-raised with an install hint.
try:
from openpyxl import Workbook # type: ignore
except ImportError as exc:
raise ImportError("openpyxl is required for export_to_excel; install via 'pip install openpyxl'") from exc
# The default file name carries a timestamp; the file is written to the tmp directory two levels above this file.
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
name = file_name or f"memory_export_{ts}.xlsx"
output_path = Path(__file__).resolve().parent.parent / "tmp" / name
with self._lock:
wb = Workbook()
# (attribute name, column header) pairs; each finding row is built with getattr in this order.
finding_headers = [
("id", "ID"),
("rule_title", "规则标题"),
("segment_id", "分段ID"),
("original_text", "原文"),
("issue", "问题描述"),
("risk_level", "风险等级"),
("result", "合格性"),
("suggestion", "建议"),
]
grouped_items = list(self.findings.items())
if grouped_items:
# The workbook's initial active sheet is reused for the first bucket.
first_key, first_values = grouped_items[0]
ws_first = wb.active
# Known bucket keys map to a display name; unknown keys are used as the sheet name themselves.
first_sheet_name = _FINDING_KEY_SHEET_NAMES.get(self._normalize_finding_key(first_key), first_key)
ws_first.title = self._safe_sheet_name(first_sheet_name)
ws_first.append([label for _, label in finding_headers])
for f in first_values:
ws_first.append([getattr(f, key, "") for key, _ in finding_headers])
# Every further bucket gets its own sheet with the same header row.
for key, values in grouped_items[1:]:
sheet_name = _FINDING_KEY_SHEET_NAMES.get(self._normalize_finding_key(key), key)
ws = wb.create_sheet(self._safe_sheet_name(sheet_name))
ws.append([label for _, label in finding_headers])
for f in values:
ws.append([getattr(f, item_key, "") for item_key, _ in finding_headers])
else:
# No findings at all: still emit a header-only sheet named after the default review key.
ws_empty = wb.active
ws_empty.title = self._safe_sheet_name(_FINDING_KEY_SHEET_NAMES.get(_DEFAULT_REVIEW_KEY, _DEFAULT_REVIEW_KEY))
ws_empty.append([label for _, label in finding_headers])
# Facts sheet: one row per fact holding its META_KEY metadata and the full item, both JSON-encoded.
ws_facts = wb.create_sheet("合同事实")
if self.facts:
ws_facts.append(["元信息", "事实内容"])
for item in self.facts:
# Non-dict facts have no metadata to read, so a fixed label is written in the first column.
if not isinstance(item, dict):
ws_facts.append(["事实", json.dumps(item, ensure_ascii=False)])
continue
meta_info = item.get(META_KEY, None)
ws_facts.append([json.dumps(meta_info, ensure_ascii=False), json.dumps(item, ensure_ascii=False)])
else:
ws_facts.append(["元信息", "事实内容"])
wb.save(output_path)
# Upload, then always try to remove the local file; a failed delete is only logged.
try:
res = upload_file(str(output_path))
finally:
try:
output_path.unlink()
except Exception:
logger.warning("Failed to delete temp excel: %s", output_path)
return res
def export_findings_to_doc_comments(
self,
doc_obj: DocBase,
file_name: Optional[str] = None,
remove_prefix: bool = False,
finding_key: str = _DEFAULT_REVIEW_KEY,
) -> Dict[str, Any]:
"""Add all findings as comments to a document, upload, then delete the local file."""
if doc_obj is None:
raise ValueError("doc_obj is required")
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
doc_name = getattr(doc_obj, "_doc_name", "") or ""
suffix = Path(doc_name).suffix or ".docx"
name = file_name or f"findings_{ts}{suffix}"
if not Path(name).suffix:
name = f"{name}{suffix}"
output_path = Path(__file__).resolve().parent.parent / "tmp" / name
target_key = self._normalize_finding_key(finding_key)
with self._lock:
target_findings = list(self._get_findings_bucket(target_key))
comments: List[Dict[str, Any]] = []
for idx, f in enumerate(target_findings, start=1):
segment_id = int(f.segment_id or 0)
chunk_id = max(segment_id, 0)
suggest_parts = []
if f.risk_level:
suggest_parts.append(f"风险等级:{f.risk_level}")
if f.issue:
suggest_parts.append(f"问题:{f.issue}")
if f.suggestion:
suggest_parts.append(f"建议:{f.suggestion}")
suggest_text = "\n".join(suggest_parts).strip()
comments.append(
{
"id": str(idx),
"key_points": f.rule_title or "风险提示",
"original_text": f.original_text or "",
"details": f.issue or "",
"chunk_id": chunk_id,
"result": f.result or "不合格",
"suggest": suggest_text,
}
) )
if comments: def list_findings_grouped(self) -> Dict[str, List[Finding]]:
doc_obj.add_chunk_comment(0, comments) with self._lock:
return {k: list(v) for k, v in self.findings.items()}
doc_obj.to_file(str(output_path), remove_prefix=remove_prefix)
try: def _add_finding(self, key: str, finding: Finding) -> Finding:
res = upload_file(str(output_path)) with self._lock:
finally: finding_key = self._normalize_finding_key(key)
try: if not finding.id:
output_path.unlink() finding.id = uuid4().hex
except Exception: bucket = self.findings.setdefault(finding_key, [])
logger.warning("Failed to delete temp doc: %s", output_path) bucket.append(finding)
self._persist()
return res return finding
@staticmethod def _get_findings_bucket(self, key: str) -> List[Finding]:
def _safe_sheet_name(name: str) -> str: finding_key = self._normalize_finding_key(key)
# Excel sheet names cannot exceed 31 chars or include certain symbols. return self.findings.setdefault(finding_key, [])
safe = (name or _DEFAULT_REVIEW_KEY).strip() or _DEFAULT_REVIEW_KEY
for ch in [":", "\\", "/", "?", "*", "[", "]"]: def _list_findings(self, target: List[Finding]) -> List[Finding]:
safe = safe.replace(ch, "_") with self._lock:
return safe[:31] return list(target)
@staticmethod def _get_findings_by_segment(
def _normalize_finding_key(key: str) -> str: self, target: List[Finding], segment_id: int
normalized = (key or "").strip().lower() ) -> List[Finding]:
if not normalized: with self._lock:
return _DEFAULT_REVIEW_KEY return [f for f in target if f.segment_id == segment_id]
return normalized
def _delete_findings_by_segment(self, key: str, segment_id: int) -> int:
    """Remove every finding of ``segment_id`` from the bucket named by ``key``.

    The bucket is both read and written back under the *normalized* key.
    Writing under the raw ``key`` would leave the real bucket untouched and
    create a stray entry whenever the caller passes a key with different
    case or surrounding whitespace.

    Args:
        key: Findings bucket name; normalized before use.
        segment_id: Segment whose findings are dropped.

    Returns:
        The number of findings removed. The store is persisted only when
        that number is non-zero.
    """
    with self._lock:
        finding_key = self._normalize_finding_key(key)
        current = self._get_findings_bucket(finding_key)
        kept = [f for f in current if f.segment_id != segment_id]
        # Rebind under the same key the bucket was read from.
        self.findings[finding_key] = kept
        removed = len(current) - len(kept)
        if removed:
            self._persist()
        return removed
def _search_findings(
    self,
    target: List[Finding],
    keyword: str,
    rule_title: Optional[str] = None,
    risk_level: Optional[str] = None,
) -> List[Finding]:
    """Filter ``target`` by rule title, risk level and a free-text keyword.

    Args:
        target: Findings to search; a snapshot is taken under the store lock.
        keyword: Case-insensitive substring looked up in the rule title,
            original text, issue, suggestion and result. Blank means no
            keyword filtering.
        rule_title: When given, only findings whose title equals it
            (case-insensitive, surrounding whitespace ignored) are kept.
        risk_level: When given, only findings whose ``risk_level`` equals
            its stripped, upper-cased form are kept.

    Returns:
        The matching findings in their original order.
    """
    needle = (keyword or "").strip().lower()
    with self._lock:
        candidates = list(target)

    if rule_title:
        # Normalize once instead of once per candidate.
        wanted_title = rule_title.strip().lower()
        candidates = [f for f in candidates if (f.rule_title or "").lower() == wanted_title]

    if risk_level:
        wanted_level = risk_level.strip().upper()
        candidates = [f for f in candidates if f.risk_level == wanted_level]

    if not needle:
        return candidates

    def _matches(f: Finding) -> bool:
        fields = (f.rule_title, f.original_text, f.issue, f.suggestion, f.result)
        # A None field counts as empty text instead of breaking str.join.
        haystack = " ".join("" if part is None else str(part) for part in fields).lower()
        return needle in haystack

    return [f for f in candidates if _matches(f)]
# ------------------- housekeeping ------------------
def clear(self) -> None:
    """Empty the facts list and findings map in place, then persist the empty state."""
    with self._lock:
        del self.facts[:]
        for bucket_name in list(self.findings):
            del self.findings[bucket_name]
        self._persist()
def _persist(self) -> None:
    """Write facts and findings to ``self._storage_path`` as pretty-printed UTF-8 JSON.

    Each finding is converted with ``asdict``. A failure while encoding or
    writing is logged and not raised.
    """
    serialized_findings = {}
    for bucket_name, bucket in self.findings.items():
        serialized_findings[bucket_name] = list(map(asdict, bucket))
    document = {"facts": self.facts, "findings": serialized_findings}
    try:
        text = json.dumps(document, ensure_ascii=False, indent=2)
        self._storage_path.write_text(text, encoding="utf-8")
    except Exception as exc:
        logger.error("Failed to persist memory store: %s", exc)
def _load(self) -> None:
# Restore facts and findings from the JSON file at self._storage_path.
# Any exception raised while reading, parsing or rebuilding findings is logged and swallowed.
# NOTE(review): indentation is not visible in this view, so the exact nesting of the
# statements below is assumed from their order, not verified.
try:
# Nothing to restore when the storage file has not been written yet.
if not self._storage_path.exists():
return
raw = self._storage_path.read_text(encoding="utf-8")
# An empty file is parsed as an empty JSON object.
data = json.loads(raw or "{}")
if isinstance(data, dict):
self.facts = data.get("facts") or []
loaded_findings = data.get("findings", {})
findings_map: Dict[str, List[Finding]] = {}
if isinstance(loaded_findings, dict):
for key, items in loaded_findings.items():
# Bucket names are normalized on load; each stored item is rebuilt via Finding.from_dict.
normalized_key = self._normalize_finding_key(str(key))
findings_map[normalized_key] = [
Finding.from_dict(item) for item in (items or [])
]
self.findings = findings_map
# Findings stored without an id get a fresh uuid4 hex id; the store is re-persisted if any id was added.
needs_persist = False
for bucket in self.findings.values():
for finding in bucket:
if not finding.id:
finding.id = uuid4().hex
needs_persist = True
if needs_persist:
self._persist()
except Exception as exc:
logger.error("Failed to load memory store: %s", exc)
def export_to_excel(self, file_name: Optional[str] = None) -> Dict[str, Any]:
"""Export findings and facts to Excel, upload, then delete the local file."""
# Returns whatever upload_file returns for the saved workbook.
# NOTE(review): indentation is not visible in this view, so how much of the body
# runs while holding self._lock cannot be confirmed here.
# openpyxl is imported inside the method; a failed import is re-raised with an install hint.
try:
from openpyxl import Workbook # type: ignore
except ImportError as exc:
raise ImportError(
"openpyxl is required for export_to_excel; install via 'pip install openpyxl'"
) from exc
# The default file name carries a timestamp; the file is written to the tmp directory two levels above this file.
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
name = file_name or f"memory_export_{ts}.xlsx"
output_path = Path(__file__).resolve().parent.parent / "tmp" / name
with self._lock:
wb = Workbook()
# (attribute name, column header) pairs; each finding row is built with getattr in this order.
finding_headers = [
("id", "ID"),
("rule_title", "规则标题"),
("segment_id", "分段ID"),
("original_text", "原文"),
("issue", "问题描述"),
("risk_level", "风险等级"),
("result", "合格性"),
("suggestion", "建议"),
]
grouped_items = list(self.findings.items())
if grouped_items:
# The workbook's initial active sheet is reused for the first bucket.
first_key, first_values = grouped_items[0]
ws_first = wb.active
# Known bucket keys map to a display name; unknown keys are used as the sheet name themselves.
first_sheet_name = _FINDING_KEY_SHEET_NAMES.get(
self._normalize_finding_key(first_key), first_key
)
ws_first.title = self._safe_sheet_name(first_sheet_name)
ws_first.append([label for _, label in finding_headers])
for f in first_values:
ws_first.append([getattr(f, key, "") for key, _ in finding_headers])
# Every further bucket gets its own sheet with the same header row.
for key, values in grouped_items[1:]:
sheet_name = _FINDING_KEY_SHEET_NAMES.get(
self._normalize_finding_key(key), key
)
ws = wb.create_sheet(self._safe_sheet_name(sheet_name))
ws.append([label for _, label in finding_headers])
for f in values:
ws.append(
[
getattr(f, item_key, "")
for item_key, _ in finding_headers
]
)
else:
# No findings at all: still emit a header-only sheet named after the default review key.
ws_empty = wb.active
ws_empty.title = self._safe_sheet_name(
_FINDING_KEY_SHEET_NAMES.get(
_DEFAULT_REVIEW_KEY, _DEFAULT_REVIEW_KEY
)
)
ws_empty.append([label for _, label in finding_headers])
# Facts sheet: one row per fact holding its META_KEY metadata and the full item, both JSON-encoded.
ws_facts = wb.create_sheet("合同事实")
if self.facts:
ws_facts.append(["元信息", "事实内容"])
for item in self.facts:
# Non-dict facts have no metadata to read, so a fixed label is written in the first column.
if not isinstance(item, dict):
ws_facts.append(["事实", json.dumps(item, ensure_ascii=False)])
continue
meta_info = item.get(META_KEY, None)
ws_facts.append(
[
json.dumps(meta_info, ensure_ascii=False),
json.dumps(item, ensure_ascii=False),
]
)
else:
ws_facts.append(["元信息", "事实内容"])
wb.save(output_path)
# Upload, then always try to remove the local file; a failed delete is only logged.
try:
res = upload_file(str(output_path))
finally:
try:
output_path.unlink()
except Exception:
logger.warning("Failed to delete temp excel: %s", output_path)
return res
def export_findings_to_doc_comments(
self,
doc_obj: DocBase,
file_name: Optional[str] = None,
remove_prefix: bool = False,
finding_key: str = _DEFAULT_REVIEW_KEY,
) -> Dict[str, Any]:
"""Add all findings as comments to a document, upload, then delete the local file."""
# Only the bucket selected by finding_key (normalized) is exported.
# Raises ValueError when doc_obj is None; returns whatever upload_file returns.
# NOTE(review): indentation is not visible in this view, so how much of the body
# runs while holding self._lock cannot be confirmed here.
if doc_obj is None:
raise ValueError("doc_obj is required")
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
# The output extension follows doc_obj._doc_name when it has one, otherwise ".docx";
# it is appended to the chosen name when that name has no suffix of its own.
doc_name = getattr(doc_obj, "_doc_name", "") or ""
suffix = Path(doc_name).suffix or ".docx"
name = file_name or f"findings_{ts}{suffix}"
if not Path(name).suffix:
name = f"{name}{suffix}"
# The file is written to the tmp directory two levels above this file.
output_path = Path(__file__).resolve().parent.parent / "tmp" / name
target_key = self._normalize_finding_key(finding_key)
with self._lock:
# Work on a copy of the bucket's current contents.
target_findings = list(self._get_findings_bucket(target_key))
comments: List[Dict[str, Any]] = []
# Comment ids are the 1-based position of the finding in the bucket.
for idx, f in enumerate(target_findings, start=1):
# chunk_id is the finding's segment id, clamped so it is never negative.
segment_id = int(f.segment_id or 0)
chunk_id = max(segment_id, 0)
# The suggestion text lists risk level, issue and suggestion, one per line, skipping empty fields.
suggest_parts = []
if f.risk_level:
suggest_parts.append(f"风险等级:{f.risk_level}")
if f.issue:
suggest_parts.append(f"问题:{f.issue}")
if f.suggestion:
suggest_parts.append(f"建议:{f.suggestion}")
suggest_text = "\n".join(suggest_parts).strip()
comments.append(
{
"id": str(idx),
"key_points": f.rule_title or "风险提示",
"original_text": f.original_text or "",
"details": f.issue or "",
"chunk_id": chunk_id,
"result": f.result or "不合格",
"suggest": suggest_text,
}
)
# NOTE(review): every comment is passed with chunk index 0 although each carries its own
# chunk_id — presumably add_chunk_comment places comments by that field; confirm against DocBase.
if comments:
doc_obj.add_chunk_comment(0, comments)
doc_obj.to_file(str(output_path), remove_prefix=remove_prefix)
# Upload, then always try to remove the local file; a failed delete is only logged.
try:
res = upload_file(str(output_path))
finally:
try:
output_path.unlink()
except Exception:
logger.warning("Failed to delete temp doc: %s", output_path)
return res
@staticmethod
def _safe_sheet_name(name: str) -> str:
    """Turn ``name`` into a string usable as an Excel sheet title.

    A missing or blank name falls back to the default review key. Each of
    the characters ``: \\ / ? * [ ]`` is replaced by ``_`` and the result is
    cut to 31 characters.
    """
    # Excel sheet names cannot exceed 31 chars or include certain symbols.
    candidate = (name or _DEFAULT_REVIEW_KEY).strip()
    if not candidate:
        candidate = _DEFAULT_REVIEW_KEY
    forbidden = ":\\/?*[]"
    cleaned = "".join("_" if ch in forbidden else ch for ch in candidate)
    return cleaned[:31]
@staticmethod
def _normalize_finding_key(key: str) -> str:
    """Return ``key`` stripped and lower-cased, or the default review key when that is empty."""
    cleaned = (key or "").strip().lower()
    return cleaned if cleaned else _DEFAULT_REVIEW_KEY
def test_export_findings_to_doc_comments(doc_path: str) -> None: def test_export_findings_to_doc_comments(doc_path: str) -> None:
store = MemoryStore() store = MemoryStore()
finding = Finding( finding = Finding(
rule_title="违约责任", rule_title="违约责任",
segment_id=1, segment_id=1,
original_text="湖南麓谷发展集团有限公司", original_text="湖南麓谷发展集团有限公司",
issue="未约定违约金上限,可能导致赔偿范围过大", issue="未约定违约金上限,可能导致赔偿范围过大",
risk_level="H", risk_level="H",
suggestion="建议增加‘赔偿金额不超过合同总额的30%’", suggestion="建议增加‘赔偿金额不超过合同总额的30%’",
result="不合格", result="不合格",
) )
store.add_finding(FINDING_KEY_REFLECT, finding) store.add_finding(FINDING_KEY_REFLECT, finding)
"""测试:将 findings 作为批注写入文档并上传。""" """测试:将 findings 作为批注写入文档并上传。"""
if not doc_path: if not doc_path:
print("doc_path 为空,跳过批注导出测试") print("doc_path 为空,跳过批注导出测试")
return return
if not Path(doc_path).exists(): if not Path(doc_path).exists():
print(f"文件不存在,跳过批注导出测试: {doc_path}") print(f"文件不存在,跳过批注导出测试: {doc_path}")
return return
try: try:
from utils.spire_word_util import SpireWordDoc from utils.spire_word_util import SpireWordDoc
except Exception as exc: except Exception as exc:
print(f"加载 SpireWordDoc 失败,跳过批注导出测试: {exc}") print(f"加载 SpireWordDoc 失败,跳过批注导出测试: {exc}")
return return
doc = SpireWordDoc() doc = SpireWordDoc()
doc.load(doc_path) doc.load(doc_path)
res = store.export_findings_to_doc_comments(doc) res = store.export_findings_to_doc_comments(doc)
print("Export doc comments:") print("Export doc comments:")
print(json.dumps(res, ensure_ascii=False, indent=2)) print(json.dumps(res, ensure_ascii=False, indent=2))
def test_memory_and_export_excel(): def test_memory_and_export_excel():
# 简单示例:设置事实 -> 写入问题 -> 读取/搜索 # 简单示例:设置事实 -> 写入问题 -> 读取/搜索
store = MemoryStore() store = MemoryStore()
store.add_facts({ store.add_facts(
"公司": {"甲方": "A 公司", "乙方": "B 公司"}, {
"支付": {"方式": "银行转账", "期限": "验收后30日内"}, "公司": {"甲方": "A 公司", "乙方": "B 公司"},
META_KEY:{ "支付": {"方式": "银行转账", "期限": "验收后30日内"},
"segment_id":1 META_KEY: {"segment_id": 1},
} }
}) )
# print( store.search_facts(['支付'])) # print( store.search_facts(['支付']))
finding1 = Finding( finding1 = Finding(
rule_title="违约责任", rule_title="违约责任",
segment_id=1, segment_id=1,
original_text="违约方应赔偿全部损失", original_text="违约方应赔偿全部损失",
issue="未约定违约金上限,可能导致赔偿范围过大", issue="未约定违约金上限,可能导致赔偿范围过大",
risk_level="H", risk_level="H",
suggestion="建议增加‘赔偿金额不超过合同总额的30%’", suggestion="建议增加‘赔偿金额不超过合同总额的30%’",
) )
finding2 = Finding( finding2 = Finding(
rule_title="违约责任", rule_title="违约责任",
segment_id=2, segment_id=2,
original_text="违约方应赔偿全部损失", original_text="违约方应赔偿全部损失",
issue="未约定违约金上限,可能导致赔偿范围过大", issue="未约定违约金上限,可能导致赔偿范围过大",
risk_level="H", risk_level="H",
suggestion="建议增加‘赔偿金额不超过合同总额的30%’", suggestion="建议增加‘赔偿金额不超过合同总额的30%’",
) )
store.add_finding(FINDING_KEY_REVIEW, finding1) store.add_finding(FINDING_KEY_REVIEW, finding1)
store.add_finding(FINDING_KEY_REFLECT, finding2) store.add_finding(FINDING_KEY_REFLECT, finding2)
print(store.get_findings_by_segment(FINDING_KEY_REVIEW, 1)) print(store.get_findings_by_segment(FINDING_KEY_REVIEW, 1))
# print("Facts:\n" + json.dumps(store.get_facts(), ensure_ascii=False, indent=2))
# hits = store.search_findings("赔偿", rule_title="违约责任")
# print("Findings search:")
# for f in hits:
# print(json.dumps(asdict(f), ensure_ascii=False, indent=2))
print(store.export_to_excel())
# print("Facts:\n" + json.dumps(store.get_facts(), ensure_ascii=False, indent=2))
# hits = store.search_findings("赔偿", rule_title="违约责任")
# print("Findings search:")
# for f in hits:
# print(json.dumps(asdict(f), ensure_ascii=False, indent=2))
print(store.export_to_excel())
if __name__ == "__main__":
# test_export_findings_to_doc_comments("/home/ccran/lufa-contract/tmp/股份转让协议.docx")
test_memory_and_export_excel()
if __name__ == "__main__":
# test_export_findings_to_doc_comments("/home/ccran/lufa-contract/tmp/股份转让协议.docx")
test_memory_and_export_excel()
...@@ -9,7 +9,6 @@ from core.tool import ToolBase, tool, tool_func ...@@ -9,7 +9,6 @@ from core.tool import ToolBase, tool, tool_func
from utils.excel_util import ExcelUtil from utils.excel_util import ExcelUtil
@tool("retrieve_reference", "审查参考检索") @tool("retrieve_reference", "审查参考检索")
class RetrieveReferenceTool(ToolBase): class RetrieveReferenceTool(ToolBase):
def __init__(self) -> None: def __init__(self) -> None:
...@@ -22,12 +21,16 @@ class RetrieveReferenceTool(ToolBase): ...@@ -22,12 +21,16 @@ class RetrieveReferenceTool(ToolBase):
"triggers": "触发词", "triggers": "触发词",
"suggestion_template": "建议模板", "suggestion_template": "建议模板",
"case": "案例", "case": "案例",
"summary":"摘要项" "summary": "摘要项",
} }
rules_path = Path(__file__).resolve().parent.parent.parent / "data" / "rules.xlsx" rules_path = (
Path(__file__).resolve().parent.parent.parent / "data" / "rules.xlsx"
)
self.rulesets: Dict[str, List[Dict[str, Any]]] = {} self.rulesets: Dict[str, List[Dict[str, Any]]] = {}
for rs_id in ALL_RULESET_IDS: for rs_id in ALL_RULESET_IDS:
rules = ExcelUtil.load_mapped_excel(rules_path, sheet_name=rs_id, column_map=self.column_map) rules = ExcelUtil.load_mapped_excel(
rules_path, sheet_name=rs_id, column_map=self.column_map
)
self.rulesets[rs_id] = rules self.rulesets[rs_id] = rules
@tool_func( @tool_func(
...@@ -40,13 +43,21 @@ class RetrieveReferenceTool(ToolBase): ...@@ -40,13 +43,21 @@ class RetrieveReferenceTool(ToolBase):
"required": [], "required": [],
} }
) )
def run(self, ruleset_id: str = "", routed_rule_titles: List[str] | None = None) -> Dict[str, Any]: def run(
self, ruleset_id: str = "", routed_rule_titles: List[str] | None = None
) -> Dict[str, Any]:
target_ruleset_id = ruleset_id or self.default_ruleset_id target_ruleset_id = ruleset_id or self.default_ruleset_id
full_rules = self.rulesets.get(target_ruleset_id) or self.rulesets.get(self.default_ruleset_id, []) or [] full_rules = (
self.rulesets.get(target_ruleset_id)
or self.rulesets.get(self.default_ruleset_id, [])
or []
)
if routed_rule_titles is None: if routed_rule_titles is None:
rules = full_rules rules = full_rules
else: else:
title_set = {title for title in routed_rule_titles if isinstance(title, str)} title_set = {
title for title in routed_rule_titles if isinstance(title, str)
}
rules = [r for r in full_rules if r.get("title") in title_set] rules = [r for r in full_rules if r.get("title") in title_set]
return { return {
...@@ -59,6 +70,7 @@ class RetrieveReferenceTool(ToolBase): ...@@ -59,6 +70,7 @@ class RetrieveReferenceTool(ToolBase):
def summary_keywords(self, rules: List[Dict[str, Any]]) -> List[str]:
    """Collect the non-empty ``"summary"`` values of *rules*, in order.

    Args:
        rules: Rule dictionaries; a rule whose ``"summary"`` key is missing
            or falsy (``None``, ``""``) is skipped.

    Returns:
        The ``"summary"`` value of every remaining rule, preserving the
        input order.
    """
    return [summary for rule in rules if (summary := rule.get("summary"))]
if __name__ == "__main__": if __name__ == "__main__":
tool = RetrieveReferenceTool() tool = RetrieveReferenceTool()
result = tool.run(ruleset_id="金盘", routed_rule_titles=None) result = tool.run(ruleset_id="金盘", routed_rule_titles=None)
...@@ -66,4 +78,4 @@ if __name__ == "__main__": ...@@ -66,4 +78,4 @@ if __name__ == "__main__":
print(f"Rule Title: {rule.get('title')}") print(f"Rule Title: {rule.get('title')}")
print(f"Case: {rule.get('case')}") print(f"Case: {rule.get('case')}")
print("-" * 20) print("-" * 20)
# print(result.get("total", 0)) # print(result.get("total", 0))
\ No newline at end of file
...@@ -3,7 +3,7 @@ import os ...@@ -3,7 +3,7 @@ import os
import re import re
import sys import sys
sys.path.append('../..') sys.path.append("../..")
import traceback import traceback
import concurrent.futures import concurrent.futures
...@@ -12,21 +12,21 @@ from loguru import logger ...@@ -12,21 +12,21 @@ from loguru import logger
from utils.common_util import random_str from utils.common_util import random_str
from utils.http_util import upload_file, fastgpt_openai_chat, download_file from utils.http_util import upload_file, fastgpt_openai_chat, download_file
# SUFFIX='_麓发迁移' SUFFIX = "_麓发迁移"
# batch_input_dir_path = 'jp-input' batch_input_dir_path = "jp-input"
# batch_output_dir_path = 'jp-output-lufa-new' batch_output_dir_path = "jp-output-lufa-new"
SUFFIX='_麓发' # SUFFIX = "_麓发"
batch_input_dir_path = 'lufa-input' # batch_input_dir_path = "lufa-input"
batch_output_dir_path = 'lufa-output' # batch_output_dir_path = "lufa-output"
batch_size = 5 batch_size = 5
# 麓发fastgpt接口 # 麓发fastgpt接口
url = 'http://192.168.252.71:18089/api/v1/chat/completions' # url = "http://192.168.252.71:18089/api/v1/chat/completions"
# 金盘fastgpt接口 # 金盘fastgpt接口
# url = 'http://192.168.252.71:18088/api/v1/chat/completions' url = "http://192.168.252.71:18088/api/v1/chat/completions"
# 麓发合同审查生产token # 麓发合同审查生产token
token = 'fastgpt-ek3Z6PxI6sXgYc0jxzZ5bVGqrxwM6aVyfSmA6JVErJYBMr2KmYxrHwEUOIMSYz' # token = "fastgpt-ek3Z6PxI6sXgYc0jxzZ5bVGqrxwM6aVyfSmA6JVErJYBMr2KmYxrHwEUOIMSYz"
# 金盘迁移麓发合同审查测试token # 金盘迁移麓发合同审查测试token
# token = 'fastgpt-vykT6qs07g7hR4tL2MNJE6DdNCIxaQjEu3Cxw9nuTBFg8MAG3CkByvnXKxSNEyMK7' token = "fastgpt-vykT6qs07g7hR4tL2MNJE6DdNCIxaQjEu3Cxw9nuTBFg8MAG3CkByvnXKxSNEyMK7"
# 人机交互测试(测试环境) # 人机交互测试(测试环境)
# token = 'fastgpt-p189K5zoTX5wjp0dBybFCwsbWm3juIwlJxt2wTGyiaOWOANI5Y10pKEZzyt' # token = 'fastgpt-p189K5zoTX5wjp0dBybFCwsbWm3juIwlJxt2wTGyiaOWOANI5Y10pKEZzyt'
# 人机交互测试(生产环境) # 人机交互测试(生产环境)
...@@ -34,9 +34,13 @@ token = 'fastgpt-ek3Z6PxI6sXgYc0jxzZ5bVGqrxwM6aVyfSmA6JVErJYBMr2KmYxrHwEUOIMSYz' ...@@ -34,9 +34,13 @@ token = 'fastgpt-ek3Z6PxI6sXgYc0jxzZ5bVGqrxwM6aVyfSmA6JVErJYBMr2KmYxrHwEUOIMSYz'
# 提取后审查测试 # 提取后审查测试
# token = 'fastgpt-n74gGX5ZqLT6o1ysMBSGUTjIciswYOWDRfQ75krMkE5gDVDkpzsbz8u' # token = 'fastgpt-n74gGX5ZqLT6o1ysMBSGUTjIciswYOWDRfQ75krMkE5gDVDkpzsbz8u'
def extract_url(text): def extract_url(text):
# \s * ([ ^ "\s]+?\.(?:docx?|pdf|xlsx)) # \s * ([ ^ "\s]+?\.(?:docx?|pdf|xlsx))
excel_p, doc_p = r'最终审查Excel\s*([^"]*xlsx)', r'最终审查批注\s*([^\" ]+?\.(?:docx?|pdf|wps))' excel_p, doc_p = (
r'最终审查Excel\s*([^"]*xlsx)',
r"最终审查批注\s*([^\" ]+?\.(?:docx?|pdf|wps))",
)
# 使用 re.search() 查找第一个匹配项 # 使用 re.search() 查找第一个匹配项
excel_m, doc_m = re.search(excel_p, text), re.search(doc_p, text) excel_m, doc_m = re.search(excel_p, text), re.search(doc_p, text)
if excel_m and doc_m: if excel_m and doc_m:
...@@ -46,7 +50,9 @@ def extract_url(text): ...@@ -46,7 +50,9 @@ def extract_url(text):
return None, None return None, None
def process_single_file(file, batch_input_dir_path, batch_output_dir_path, counter, start_file): def process_single_file(
file, batch_input_dir_path, batch_output_dir_path, counter, start_file
):
""" """
单文件处理逻辑,可被线程池并发调用 单文件处理逻辑,可被线程池并发调用
""" """
...@@ -55,29 +61,45 @@ def process_single_file(file, batch_input_dir_path, batch_output_dir_path, count ...@@ -55,29 +61,45 @@ def process_single_file(file, batch_input_dir_path, batch_output_dir_path, count
return return
# 提取文件前缀 # 提取文件前缀
file_name = file[:file.rfind('.')] file_name = file[: file.rfind(".")]
ext_name = file[file.rfind('.'):] ext_name = file[file.rfind(".") :]
# 源目标处理 # 源目标处理
original_file = f'{batch_input_dir_path}/{file}' original_file = f"{batch_input_dir_path}/{file}"
des_check_file = f'{batch_output_dir_path}/{file_name}.md' des_check_file = f"{batch_output_dir_path}/{file_name}.md"
des_excel_file = f'{batch_output_dir_path}/{file_name}{SUFFIX}.xlsx' des_excel_file = f"{batch_output_dir_path}/{file_name}{SUFFIX}.xlsx"
des_doc_file = f'{batch_output_dir_path}/{file_name}{SUFFIX}{ext_name}' des_doc_file = f"{batch_output_dir_path}/{file_name}{SUFFIX}{ext_name}"
try: try:
# 处理原文件 # 处理原文件
file_url = upload_file(original_file, input_url_to_inner=True).replace('218.77.58.8', '192.168.252.71') file_url = upload_file(original_file, input_url_to_inner=True).replace(
model = 'Qwen2-72B-Instruct' "218.77.58.8", "192.168.252.71"
)
model = "Qwen2-72B-Instruct"
# 合同审核Excel工作流处理 # 合同审核Excel工作流处理
logger.info(' 第{}个文件,处理文件: {}'.format(counter, original_file)) logger.info(" 第{}个文件,处理文件: {}".format(counter, original_file))
result = fastgpt_openai_chat(url, token, model, random_str(), file_url, f'测试批处理任务-{file_name}', False) result = fastgpt_openai_chat(
url,
token,
model,
random_str(),
file_url,
f"测试批处理任务-{file_name}",
False,
)
excel_url, doc_url = extract_url(result) excel_url, doc_url = extract_url(result)
if excel_url and doc_url: if excel_url and doc_url:
download_file(excel_url.replace('218.77.58.8', '192.168.252.71'), des_excel_file) download_file(
download_file(doc_url.replace('218.77.58.8', '192.168.252.71'), des_doc_file) excel_url.replace("218.77.58.8", "192.168.252.71"), des_excel_file
logger.info(f'第{counter}个文件下载:{excel_url}到{des_excel_file} {des_doc_file}') )
download_file(
doc_url.replace("218.77.58.8", "192.168.252.71"), des_doc_file
)
logger.info(
f"第{counter}个文件下载:{excel_url}到{des_excel_file} {des_doc_file}"
)
except Exception as e: except Exception as e:
logger.error(f'{original_file} 处理异常 第{counter}个文件: {e}') logger.error(f"{original_file} 处理异常 第{counter}个文件: {e}")
logger.error(traceback.print_exc()) logger.error(traceback.print_exc())
...@@ -103,5 +125,5 @@ def execute_batch(max_workers: int = 4): ...@@ -103,5 +125,5 @@ def execute_batch(max_workers: int = 4):
f.result() f.result()
# Script entry point: run the batch with `batch_size` as the worker count.
if __name__ == "__main__":
    execute_batch(batch_size)
\ No newline at end of file
...@@ -6,10 +6,11 @@ from contextlib import redirect_stdout, redirect_stderr ...@@ -6,10 +6,11 @@ from contextlib import redirect_stdout, redirect_stderr
fuzz_score_threshold = 80 fuzz_score_threshold = 80
def _normalize_cell(value: object) -> str: def _normalize_cell(value: object) -> str:
if pd.isna(value): if pd.isna(value):
return "" return ""
return str(value).strip() return str(value).strip()
def _load_rows(path: Path) -> list[tuple[str, str]]: def _load_rows(path: Path) -> list[tuple[str, str]]:
...@@ -27,236 +28,315 @@ def _load_rows(path: Path) -> list[tuple[str, str]]: ...@@ -27,236 +28,315 @@ def _load_rows(path: Path) -> list[tuple[str, str]]:
# Column order shared by the per-item rows and the overall summary row.
_RESULT_COLUMNS = [
    "审查项",
    "大模型匹配上的不合格项",
    "合同所有不合格项",
    "大模型其他不合格项",
    "大模型未匹配上的不合格项(C-B)",
    "查准率(B/B+D)",
    "查全率(B/C)",
    "F1",
    "误报率(D/B+D)",
]


def _safe_ratio(numerator, denominator):
    """Return ``numerator / denominator``, or 0 when the denominator is zero."""
    return numerator / denominator if denominator else 0


def _f1_score(precision, recall):
    """Return the harmonic mean of precision and recall, or 0 when both are zero."""
    total = precision + recall
    return 2 * precision * recall / total if total else 0


def _compare_impl(val_dir: Path, answer_dir: Path) -> None:
    """Compare extracted "val" workbooks against answer workbooks and report metrics.

    Every ``*.xlsx`` file in *val_dir* is paired with the file of the same
    name in *answer_dir*; files without a counterpart are skipped with a
    message. Rows are ``(item, text)`` tuples as returned by ``_load_rows``.
    Each answer row is matched against the not-yet-consumed val rows of the
    same item using the better of ``fuzz.partial_ratio`` and
    ``fuzz.token_set_ratio``; a match requires a score of at least
    ``fuzz_score_threshold`` and consumes the val row so matching stays
    one-to-one.

    Per-file and overall statistics are printed. Precision is
    matched / val rows and recall is matched / answer rows. When at least one
    answer row was seen, a per-item table plus an overall row is written to
    ``results/合同审查结果_<val_dir name>.xlsx`` next to this module.

    Args:
        val_dir: Directory containing the extracted val xlsx files.
        answer_dir: Directory containing the answer xlsx files.
    """
    import json

    val_dir = val_dir.resolve()
    answer_dir = answer_dir.resolve()

    overall_val = overall_answer = overall_matched = 0
    # Running per-item totals across all compared files.
    overall_item_answer: dict[str, int] = {}
    overall_item_matched: dict[str, int] = {}
    overall_item_unmatched_answer: dict[str, int] = {}
    overall_item_unmatched_val: dict[str, int] = {}

    for val_file in sorted(val_dir.glob("*.xlsx")):
        answer_file = answer_dir / val_file.name
        if not answer_file.exists():
            print(f"Skip {val_file.name}: missing in answer")
            continue

        val_rows = _load_rows(val_file)
        answer_rows = _load_rows(answer_file)

        # Number of answer rows per item.
        answer_counts: dict[str, int] = {}
        for item, _ in answer_rows:
            answer_counts[item] = answer_counts.get(item, 0) + 1

        # Val texts grouped by item; entries are popped as they get matched.
        val_buckets: dict[str, list[str]] = {}
        for item, text in val_rows:
            val_buckets.setdefault(item, []).append(text)

        matched_total = 0
        matched_by_item: dict[str, list[tuple[str, str, int]]] = {}
        unmatched_answer_by_item: dict[str, list[str]] = {}

        for item, ans_text in answer_rows:
            candidates = val_buckets.get(item, [])
            if not candidates:
                unmatched_answer_by_item.setdefault(item, []).append(ans_text)
                continue

            ans_text = ans_text.strip()
            best_idx = -1
            best_score = -1
            for idx, cand in enumerate(candidates):
                if not isinstance(cand, str):
                    continue
                cand = cand.strip()
                score = max(
                    fuzz.partial_ratio(ans_text, cand),
                    fuzz.token_set_ratio(ans_text, cand),
                )
                if score > best_score:
                    best_score = score
                    best_idx = idx

            if best_score >= fuzz_score_threshold:
                matched_total += 1
                matched_val = candidates.pop(best_idx)
                matched_by_item.setdefault(item, []).append(
                    (ans_text, matched_val, best_score)
                )
            else:
                unmatched_answer_by_item.setdefault(item, []).append(ans_text)

        # Whatever is left in the buckets was never matched by an answer row.
        unmatched_val_by_item: dict[str, list[str]] = {
            item: texts for item, texts in val_buckets.items() if texts
        }

        val_total = len(val_rows)
        answer_total = len(answer_rows)

        overall_val += val_total
        overall_answer += answer_total
        overall_matched += matched_total

        unmatched_val_count = sum(len(v) for v in unmatched_val_by_item.values())
        unmatched_answer_count = sum(
            len(v) for v in unmatched_answer_by_item.values()
        )
        file_precision = _safe_ratio(matched_total, val_total)
        file_recall = _safe_ratio(matched_total, answer_total)
        file_f1 = _f1_score(file_precision, file_recall)
        file_false_positive_rate = _safe_ratio(unmatched_val_count, val_total)

        # Fold this file's per-item counts into the overall per-item totals.
        for it, cnt in answer_counts.items():
            overall_item_answer[it] = overall_item_answer.get(it, 0) + cnt
        for it, lst in matched_by_item.items():
            overall_item_matched[it] = overall_item_matched.get(it, 0) + len(lst)
        for it, lst in unmatched_answer_by_item.items():
            overall_item_unmatched_answer[it] = overall_item_unmatched_answer.get(
                it, 0
            ) + len(lst)
        for it, lst in unmatched_val_by_item.items():
            overall_item_unmatched_val[it] = overall_item_unmatched_val.get(
                it, 0
            ) + len(lst)

        print("#" * 40)
        print(
            f"{val_file.name}: matched {matched_total} | val {val_total} | answer {answer_total} "
            f"| unmatched val {unmatched_val_count} | unmatched answer {unmatched_answer_count} | precision {file_precision:.2%} | recall {file_recall:.2%} | f1 {file_f1:.2%} | false_positive_rate {file_false_positive_rate:.2%}"
        )
        print(
            f"unmatched_val_by_item: {json.dumps(unmatched_val_by_item, ensure_ascii=False, indent=2)}"
        )
        for item in sorted(answer_counts):
            item_matches = matched_by_item.get(item, [])
            print(
                f" 审查项 {item}: matched {len(item_matches)} / {answer_counts[item]}"
            )

            ua = unmatched_answer_by_item.get(item, [])
            if ua:
                print(f" 未匹配(answer 未被匹配){len(ua)} 条:")
                for t in ua:
                    print(f" answer: {t}")

            uv = unmatched_val_by_item.get(item, [])
            if uv:
                print(f" 未匹配(val 残留){len(uv)} 条:")
                for t in uv:
                    print(f" val: {t}")

    precision = _safe_ratio(overall_matched, overall_val)
    recall = _safe_ratio(overall_matched, overall_answer)
    f1 = _f1_score(precision, recall)
    print(
        f"Overall: matched {overall_matched} | val {overall_val} | answer {overall_answer} | precision {precision:.2%} | recall {recall:.2%} | f1 {f1:.2%}"
    )

    # Per-item overall results; nothing is exported when no answer rows were seen.
    if overall_item_answer:
        print("#" * 40)
        print("Overall by item:")
        all_items = sorted(
            set(overall_item_answer)
            | set(overall_item_matched)
            | set(overall_item_unmatched_answer)
            | set(overall_item_unmatched_val)
        )
        rows_by_item = []
        for it in all_items:
            ans = overall_item_answer.get(it, 0)
            mat = overall_item_matched.get(it, 0)
            u_ans = overall_item_unmatched_answer.get(it, 0)
            u_val = overall_item_unmatched_val.get(it, 0)
            item_precision = _safe_ratio(mat, mat + u_val)
            acc = _safe_ratio(mat, ans)
            item_f1 = _f1_score(item_precision, acc)
            item_false_positive_rate = _safe_ratio(u_val, mat + u_val)
            rows_by_item.append(
                {
                    "审查项": it,
                    "大模型匹配上的不合格项": mat,
                    "合同所有不合格项": ans,
                    "大模型其他不合格项": u_val,
                    "大模型未匹配上的不合格项(C-B)": u_ans,
                    "查准率(B/B+D)": item_precision,
                    "查全率(B/C)": acc,
                    "F1": item_f1,
                    "误报率(D/B+D)": item_false_positive_rate,
                }
            )
            print(
                f" 审查项 {it}: matched {mat} / answer {ans} | unmatched val {u_val} | unmatched answer {u_ans} | precision {item_precision:.2%} | recall {acc:.2%} | f1 {item_f1:.2%}"
            )

        overall_by_item_df = pd.DataFrame(rows_by_item, columns=_RESULT_COLUMNS)

        unmatched_val_total = sum(overall_item_unmatched_val.values())
        unmatched_answer_total = sum(overall_item_unmatched_answer.values())
        reported_total = overall_matched + unmatched_val_total
        overall_precision = _safe_ratio(overall_matched, reported_total)
        overall_f1 = _f1_score(overall_precision, recall)
        overall_invalid_rate = _safe_ratio(unmatched_val_total, reported_total)
        overall_total_df = pd.DataFrame(
            [
                {
                    "审查项": "总体",
                    "大模型匹配上的不合格项": overall_matched,
                    "合同所有不合格项": overall_answer,
                    "大模型其他不合格项": unmatched_val_total,
                    "大模型未匹配上的不合格项(C-B)": unmatched_answer_total,
                    "查准率(B/B+D)": overall_precision,
                    "查全率(B/C)": recall,
                    "F1": overall_f1,
                    "误报率(D/B+D)": overall_invalid_rate,
                }
            ],
            columns=_RESULT_COLUMNS,
        )
        combined_df = pd.concat(
            [overall_by_item_df, overall_total_df], ignore_index=True
        )

        compare_dir_name = val_dir.name
        results_dir = Path(__file__).parent / "results"
        results_dir.mkdir(parents=True, exist_ok=True)
        output_excel = results_dir / f"合同审查结果_{compare_dir_name}.xlsx"
        with pd.ExcelWriter(output_excel, engine="openpyxl") as writer:
            combined_df.to_excel(writer, sheet_name="对比结果", index=False)
        print(f"Excel written to {output_excel}")
def compare(val_dir: Path, answer_dir: Path) -> None:
    """Run the comparison, leaving its report on the current stdout.

    Args:
        val_dir: Directory containing the extracted val xlsx files.
        answer_dir: Directory containing the answer xlsx files.
    """
    _compare_impl(val_dir=val_dir, answer_dir=answer_dir)
def compare_with_log(
    val_dir: Path, answer_dir: Path, log_path: Path | None = None
) -> Path:
    """Run the comparison with stdout and stderr redirected into a log file.

    Args:
        val_dir: Directory containing the extracted val xlsx files.
        answer_dir: Directory containing the answer xlsx files.
        log_path: Where to write the log. When ``None``, the log goes to
            ``results/合同审查结果_<val_dir name>.log`` next to this module.

    Returns:
        The path of the log file that was written.
    """
    val_dir = val_dir.resolve()
    if log_path is None:
        results_dir = Path(__file__).parent / "results"
        results_dir.mkdir(parents=True, exist_ok=True)
        log_path = results_dir / f"合同审查结果_{val_dir.name}.log"
    else:
        log_path = log_path.resolve()
        log_path.parent.mkdir(parents=True, exist_ok=True)

    # Everything the comparison prints, including stderr, lands in the log.
    with open(log_path, "w", encoding="utf-8") as f, redirect_stdout(
        f
    ), redirect_stderr(f):
        _compare_impl(val_dir=val_dir, answer_dir=answer_dir)

    return log_path
def _parse_args() -> argparse.Namespace: def _parse_args() -> argparse.Namespace:
base = Path(__file__).parent base = Path(__file__).parent
parser = argparse.ArgumentParser(description="Compare extracted annotations with answers.") parser = argparse.ArgumentParser(
parser.add_argument( description="Compare extracted annotations with answers."
"--val-dir", )
type=Path, parser.add_argument(
default=base / "batch_output_0121_val", "--val-dir",
help="Directory containing extracted val xlsx files.", type=Path,
) default=base / "batch_output_0121_val",
parser.add_argument( help="Directory containing extracted val xlsx files.",
"--answer-dir", )
type=Path, parser.add_argument(
default=base / "审查答案", "--answer-dir",
help="Directory containing answer xlsx files.", type=Path,
) default=base / "审查答案",
parser.add_argument( help="Directory containing answer xlsx files.",
"--log-path", )
type=Path, parser.add_argument(
default=None, "--log-path",
help="Optional explicit log path. Defaults to results/合同审查结果_<val_dir_name>.log", type=Path,
) default=None,
return parser.parse_args() help="Optional explicit log path. Defaults to results/合同审查结果_<val_dir_name>.log",
)
return parser.parse_args()
# Script entry point: run the comparison and report where the log was written.
if __name__ == "__main__":
    args = _parse_args()
    final_log_path = compare_with_log(
        val_dir=args.val_dir,
        answer_dir=args.answer_dir,
        log_path=args.log_path,
    )
    print(f"Log written to {final_log_path}")
\ No newline at end of file
...@@ -121,7 +121,7 @@ def _parse_args() -> argparse.Namespace: ...@@ -121,7 +121,7 @@ def _parse_args() -> argparse.Namespace:
parser.add_argument( parser.add_argument(
"--datasets-dir", "--datasets-dir",
type=Path, type=Path,
default=base / "results" / "jp-output-renji", default=base / "results" / "jp-output-lufa",
help="Directory containing Word files with annotations.", help="Directory containing Word files with annotations.",
) )
parser.add_argument( parser.add_argument(
...@@ -133,7 +133,7 @@ def _parse_args() -> argparse.Namespace: ...@@ -133,7 +133,7 @@ def _parse_args() -> argparse.Namespace:
parser.add_argument( parser.add_argument(
"--val-dir", "--val-dir",
type=Path, type=Path,
default=base / "results" / "jp-output-renji-extracted", default=base / "results" / "jp-output-lufa-extracted",
help="Directory to store extracted xlsx files for comparison.", help="Directory to store extracted xlsx files for comparison.",
) )
parser.add_argument( parser.add_argument(
......
No preview for this file type
...@@ -9,7 +9,9 @@ class DocBase(ABC): ...@@ -9,7 +9,9 @@ class DocBase(ABC):
self._doc_path = None self._doc_path = None
self._doc_name = None self._doc_name = None
self._kwargs = kwargs self._kwargs = kwargs
self._max_single_chunk_size = kwargs.get('max_single_chunk_size', MAX_SINGLE_CHUNK_SIZE) self._max_single_chunk_size = kwargs.get(
"max_single_chunk_size", MAX_SINGLE_CHUNK_SIZE
)
@abstractmethod @abstractmethod
def load(self, doc_path): def load(self, doc_path):
......
...@@ -509,10 +509,10 @@ class SpireWordDoc(DocBase): ...@@ -509,10 +509,10 @@ class SpireWordDoc(DocBase):
cell_list.append(cell_content) cell_list.append(cell_content)
# table_data += "|" + "|".join(cell_list) + "|" # table_data += "|" + "|".join(cell_list) + "|"
# table_data += "\n" # table_data += "\n"
table_data += ' '.join(cell_list) + '\n' table_data += " ".join(cell_list) + "\n"
if i == 0: if i == 0:
# table_data += "|" + "|".join(["--- " for _ in cell_list]) + "|\n" # table_data += "|" + "|".join(["--- " for _ in cell_list]) + "|\n"
table_data= ' '.join(cell_list) + '\n' table_data = " ".join(cell_list) + "\n"
return table_data return table_data
def get_chunk_info(self, chunk_id): def get_chunk_info(self, chunk_id):
...@@ -608,14 +608,18 @@ class SpireWordDoc(DocBase): ...@@ -608,14 +608,18 @@ class SpireWordDoc(DocBase):
return True return True
def _update_comment_content(self, comment_idx, suggest): def _update_comment_content(self, comment_idx, suggest):
self._doc.Comments.get_Item(comment_idx).Body.Paragraphs.get_Item(0).Text = suggest self._doc.Comments.get_Item(comment_idx).Body.Paragraphs.get_Item(
0
).Text = suggest
def _try_add_comment_in_paragraphs(self, paragraphs, target_text, author, suggest): def _try_add_comment_in_paragraphs(self, paragraphs, target_text, author, suggest):
if not target_text: if not target_text:
return False return False
for paragraph in paragraphs: for paragraph in paragraphs:
text_sel = paragraph.Find(target_text, False, True) text_sel = paragraph.Find(target_text, False, True)
if text_sel and self.set_comment_by_text_selection(text_sel, author, suggest): if text_sel and self.set_comment_by_text_selection(
text_sel, author, suggest
):
return True return True
return False return False
...@@ -767,8 +771,11 @@ class SpireWordDoc(DocBase): ...@@ -767,8 +771,11 @@ class SpireWordDoc(DocBase):
# update chunk_id # update chunk_id
comment_chunk_id = comment.get("chunk_id", -1) comment_chunk_id = comment.get("chunk_id", -1)
# 优先使用comments里提供的chunk_id,如果没有或无效则使用外部传入的chunk_id,如果都没有则异常处理 # 优先使用comments里提供的chunk_id,如果没有或无效则使用外部传入的chunk_id,如果都没有则异常处理
sub_chunks = self.get_sub_chunks(comment_chunk_id) if comment_chunk_id != -1 \ sub_chunks = (
and comment_chunk_id < self.get_chunk_num() else self.get_sub_chunks(chunk_id) self.get_sub_chunks(comment_chunk_id)
if comment_chunk_id != -1 and comment_chunk_id < self.get_chunk_num()
else self.get_sub_chunks(chunk_id)
)
author = self.format_comment_author(comment) author = self.format_comment_author(comment)
suggest = comment.get("suggest", "") suggest = comment.get("suggest", "")
find_key = comment["original_text"].strip() or comment["key_points"] find_key = comment["original_text"].strip() or comment["key_points"]
...@@ -808,7 +815,9 @@ class SpireWordDoc(DocBase): ...@@ -808,7 +815,9 @@ class SpireWordDoc(DocBase):
normalized_author = self._normalize_author_prefix(author) normalized_author = self._normalize_author_prefix(author)
for i in range(self._doc.Comments.Count): for i in range(self._doc.Comments.Count):
current_comment = self._doc.Comments.get_Item(i) current_comment = self._doc.Comments.get_Item(i)
comment_author = self._normalize_author_prefix(current_comment.Format.Author) comment_author = self._normalize_author_prefix(
current_comment.Format.Author
)
if comment_author == normalized_author: if comment_author == normalized_author:
return i return i
return None return None
...@@ -876,9 +885,7 @@ class SpireWordDoc(DocBase): ...@@ -876,9 +885,7 @@ class SpireWordDoc(DocBase):
if __name__ == "__main__": if __name__ == "__main__":
doc = SpireWordDoc() doc = SpireWordDoc()
doc.load( doc.load(r"/home/ccran/lufa-contract/demo/今麦郎合同审核.docx")
r"/home/ccran/lufa-contract/demo/今麦郎合同审核.docx"
)
print(doc._doc_name) print(doc._doc_name)
print("附件2《技术协议》" in doc.get_all_text()) print("附件2《技术协议》" in doc.get_all_text())
# doc.add_chunk_comment( # doc.add_chunk_comment(
...@@ -895,4 +902,4 @@ if __name__ == "__main__": ...@@ -895,4 +902,4 @@ if __name__ == "__main__":
# } # }
# ], # ],
# ) # )
# doc.to_file("/home/ccran/lufa-contract/demo/今麦郎合同审核_test.docx", True) # doc.to_file("/home/ccran/lufa-contract/demo/今麦郎合同审核_test.docx", True)
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment