Commit 2347d107 by ccran

feat: 技术性能77%

parent 1f595950
......@@ -16,8 +16,9 @@ class LLMConfig:
MERGE_RULE_PROMPT = False
MAX_SINGLE_CHUNK_SIZE = 5000
META_KEY = "META"
DEFAULT_RULESET_ID = "金盘简化"
ALL_RULESET_IDS = ["金盘简化"]
DEFAULT_RULESET_ID = "通用"
ALL_RULESET_IDS = ["通用", "借款", "担保", "财务口", "金盘", "金盘简化", "麓发测试"]
use_lufa = False
if use_lufa:
outer_backend_url = "http://znkf.lgfzgroup.com:48081"
......
from __future__ import annotations
import difflib
import json
import re
import unicodedata
from typing import Any, Dict, List
from core.tool import tool, tool_func
......@@ -8,7 +11,7 @@ from core.tools.segment_llm import LLMTool
from loguru import logger
MERGER_SYSTEM_PROMPT = '''
MERGER_SYSTEM_PROMPT = """
你是合同审查结果合并智能体(SegmentMerger)。
你的任务是:接收一组 findings,按 original_text 分组后合并。
......@@ -46,9 +49,9 @@ MERGER_SYSTEM_PROMPT = '''
【输出要求】
- 严格输出 JSON
- 顶层结构必须是:{"findings": [...]}
'''
"""
MERGER_USER_PROMPT = '''
MERGER_USER_PROMPT = """
【输入 findings】
{findings_json}
......@@ -57,9 +60,9 @@ MERGER_USER_PROMPT = '''
若同组文本可互补,请将 original_text 扩展为覆盖组内信息的并集文本;无关文本必须分到不同组。
输出 JSON。
'''
"""
OUTPUT_EXAMPLE = '''
OUTPUT_EXAMPLE = """
```json
{
"findings": [
......@@ -75,101 +78,260 @@ OUTPUT_EXAMPLE = '''
]
}
```
'''
"""
def _as_dict(item: Any) -> Dict[str, Any]:
if isinstance(item, dict):
return dict(item)
if hasattr(item, "__dict__"):
return dict(getattr(item, "__dict__"))
return {}
if isinstance(item, dict):
return dict(item)
if hasattr(item, "__dict__"):
return dict(getattr(item, "__dict__"))
return {}
def _normalize_finding(raw: Dict[str, Any]) -> Dict[str, Any]:
risk_level = str(raw.get("risk_level", "") or "").upper()
if risk_level not in {"H", "M", "L", ""}:
risk_level = ""
result = str(raw.get("result", "") or "")
if result not in {"合格", "不合格", ""}:
result = ""
return {
"rule_title": str(raw.get("rule_title", "") or ""),
"segment_id": int(raw.get("segment_id", 0) or 0),
"original_text": str(raw.get("original_text", "") or ""),
"issue": str(raw.get("issue", "") or ""),
"risk_level": risk_level,
"suggestion": str(raw.get("suggestion", "") or ""),
"result": result,
}
risk_level = str(raw.get("risk_level", "") or "").upper()
if risk_level not in {"H", "M", "L", ""}:
risk_level = ""
result = str(raw.get("result", "") or "")
if result not in {"合格", "不合格", ""}:
result = ""
return {
"rule_title": str(raw.get("rule_title", "") or ""),
"segment_id": int(raw.get("segment_id", 0) or 0),
"original_text": str(raw.get("original_text", "") or ""),
"issue": str(raw.get("issue", "") or ""),
"risk_level": risk_level,
"suggestion": str(raw.get("suggestion", "") or ""),
"result": result,
}
def _merge_unique_text(parts: List[str]) -> str:
merged: List[str] = []
seen = set()
for part in parts:
text = str(part or "").strip()
if not text or text in seen:
continue
seen.add(text)
merged.append(text)
return "\n".join(merged)
def _max_suffix_prefix_overlap(left: str, right: str) -> int:
max_k = min(len(left), len(right))
for k in range(max_k, 0, -1):
if left[-k:] == right[:k]:
return k
return 0
def _merge_text_union(base: str, other: str) -> str:
left = str(base or "")
right = str(other or "")
if not left:
return right
if not right:
return left
if left in right:
return right
if right in left:
return left
ov1 = _max_suffix_prefix_overlap(left, right)
ov2 = _max_suffix_prefix_overlap(right, left)
if ov1 >= ov2 and ov1 >= 3:
return left + right[ov1:]
if ov2 > ov1 and ov2 >= 3:
return right + left[ov2:]
return f"{left}\n{right}"
def _has_substring_overlap(a: str, b: str, min_common_len: int = 8) -> bool:
left = _normalize_text_for_match(str(a or ""))
right = _normalize_text_for_match(str(b or ""))
if not left or not right:
return False
if left in right or right in left:
return True
match = difflib.SequenceMatcher(None, left, right).find_longest_match(
0, len(left), 0, len(right)
)
return match.size >= min_common_len
def _normalize_text_for_match(text: str) -> str:
# Normalize full-width/half-width and case, then remove whitespace and punctuation.
normalized = unicodedata.normalize("NFKC", str(text or "")).lower()
normalized = re.sub(r"[\s\W_]+", "", normalized, flags=re.UNICODE)
return normalized
def _risk_rank(level: str) -> int:
return {"": 0, "L": 1, "M": 2, "H": 3}.get(level, 0)
def _merge_group(group: List[Dict[str, Any]]) -> Dict[str, Any]:
if not group:
return _normalize_finding({})
merged_text = ""
for item in group:
merged_text = _merge_text_union(merged_text, item.get("original_text", ""))
rule_titles = [item.get("rule_title", "") for item in group]
issues = [item.get("issue", "") for item in group]
suggestions = [item.get("suggestion", "") for item in group]
segment_ids = [int(item.get("segment_id", 0) or 0) for item in group]
risk_levels = [str(item.get("risk_level", "") or "") for item in group]
results = [str(item.get("result", "") or "") for item in group]
best_risk = ""
for level in risk_levels:
if _risk_rank(level) > _risk_rank(best_risk):
best_risk = level
if "不合格" in results:
merged_result = "不合格"
elif "合格" in results:
merged_result = "合格"
else:
merged_result = ""
return _normalize_finding(
{
"rule_title": " / ".join([t for t in dict.fromkeys(rule_titles) if t]),
"segment_id": min(segment_ids) if segment_ids else 0,
"original_text": merged_text,
"issue": _merge_unique_text(issues),
"risk_level": best_risk,
"suggestion": _merge_unique_text(suggestions),
"result": merged_result,
}
)
def _rule_based_merge(findings: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
n = len(findings)
if n <= 1:
return findings
visited = [False] * n
groups: List[List[Dict[str, Any]]] = []
for i in range(n):
if visited[i]:
continue
stack = [i]
visited[i] = True
group_idx: List[int] = []
while stack:
cur = stack.pop()
group_idx.append(cur)
cur_text = findings[cur].get("original_text", "")
for j in range(n):
if visited[j]:
continue
if _has_substring_overlap(
cur_text, findings[j].get("original_text", "")
):
visited[j] = True
stack.append(j)
groups.append([findings[idx] for idx in group_idx])
return [_merge_group(group) for group in groups]
@tool("segment_merger", "同证据 findings 合并")
class SegmentMergerTool(LLMTool):
def __init__(self) -> None:
super().__init__(MERGER_SYSTEM_PROMPT)
@tool_func(
{
"type": "object",
"properties": {
"findings": {"type": "array", "items": {"type": "object"}},
},
"required": ["findings"],
}
)
def run(self, findings: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]:
normalized_input = [_normalize_finding(_as_dict(item)) for item in (findings or [])]
if not normalized_input:
return {"findings": []}
msgs = self._build_prompt(normalized_input)
try:
resp = self.run_with_loop(self.chat_async(msgs))
data = self.parse_first_json(resp)
raw_findings = data.get("findings") or data.get("merged_findings") or []
if not isinstance(raw_findings, list):
raw_findings = []
normalized_output = [_normalize_finding(_as_dict(item)) for item in raw_findings]
return {"findings": normalized_output}
except Exception as e:
logger.error(f"SegmentMergerTool run error: {e}")
return {"findings": normalized_input}
def _build_prompt(self, findings: List[Dict[str, Any]]) -> List[Dict[str, str]]:
user_content = MERGER_USER_PROMPT.format(
findings_json=json.dumps(findings, ensure_ascii=False, indent=2)
) + OUTPUT_EXAMPLE
return self.build_messages(user_content)
def __init__(self) -> None:
super().__init__(MERGER_SYSTEM_PROMPT)
@tool_func(
{
"type": "object",
"properties": {
"findings": {"type": "array", "items": {"type": "object"}},
"merge_mode": {
"type": "string",
"enum": ["llm", "rule"],
"default": "llm",
},
},
"required": ["findings"],
}
)
def run(
self, findings: List[Dict[str, Any]], merge_mode: str = "llm"
) -> Dict[str, List[Dict[str, Any]]]:
normalized_input = [
_normalize_finding(_as_dict(item)) for item in (findings or [])
]
if not normalized_input:
return {"findings": []}
mode = str(merge_mode or "llm").lower()
if mode == "rule":
return {"findings": _rule_based_merge(normalized_input)}
msgs = self._build_prompt(normalized_input)
try:
resp = self.run_with_loop(self.chat_async(msgs))
data = self.parse_first_json(resp)
raw_findings = data.get("findings") or data.get("merged_findings") or []
if not isinstance(raw_findings, list):
raw_findings = []
normalized_output = [
_normalize_finding(_as_dict(item)) for item in raw_findings
]
return {"findings": normalized_output}
except Exception as e:
logger.error(f"SegmentMergerTool run error: {e}")
return {"findings": _rule_based_merge(normalized_input)}
def _build_prompt(self, findings: List[Dict[str, Any]]) -> List[Dict[str, str]]:
user_content = (
MERGER_USER_PROMPT.format(
findings_json=json.dumps(findings, ensure_ascii=False, indent=2)
)
+ OUTPUT_EXAMPLE
)
return self.build_messages(user_content)
if __name__ == "__main__":
tool = SegmentMergerTool()
sample = [
{
"rule_title": "支付时间审查",
"segment_id": 0,
"original_text": "本协议约定的服务内容全部履行完毕经甲方认可,在乙方提交检测数据后15个工作日内,乙方须向甲方提供相应数额的正规发票,甲方一次性支付合同总金额的100%给乙方。",
"issue": "付款条件缺乏实质把控。条款将付款绑定于提交数据,未明确'经甲方认可'的验收标准、期限及异议机制,且未设置质保金,存在验收流于形式即需全额付款的风险。",
"risk_level": "H",
"suggestion": "修改为:'乙方提交报告后,甲方在X个工作日内验收。验收合格且收到发票后15个工作日内支付95%;剩余5%作为质保金,满X个月无异议后无息支付。若验收不合格,甲方有权拒付并要求整改。'",
"result": "不合格"
},
{
"rule_title": "发票审查",
"segment_id": 0,
"original_text": "在乙方提交检测数据后15个工作日内,乙方须向甲方提供相应数额的正规发票,甲方一次性支付合同总金额的100%给乙方。甲方指定由其全资子公司长沙高新控股集团有限公司(简称高新控股)承担并支付本合同约定的检查服务费,乙方向高新控股开具相应金额的增值税专用发票。",
"issue": "缺失发票税率约定。条款明确了发票类型和开具时间,但未约定适用税率,违反审查规则,可能导致后续开票金额争议或税务合规风险。",
"risk_level": "H",
"suggestion": "补充税率约定。建议在'乙方向高新控股开具相应金额的增值税专用发票'后补充:'(税率:6%)'或根据实际业务类型补充具体税率数值。",
"result": "不合格"
},{
"rule_title": "主体审查",
"segment_id": 0,
"original_text": "委托方(甲方): 湖南麓谷发展集团有限公司... 甲方指定由其全资子公司长沙高新控股集团有限公司(简称高新控股)承担并支付本合同约定的检查服务费... 签章处:甲方:长沙高新控股集团有限公司",
"issue": "签约主体不一致。首部甲方为'湖南麓谷发展集团有限公司',但签章处及付款义务主体变更为'长沙高新控股集团有限公司',且未明确授权委托或变更确认条款,存在主体混同及履约风险。",
"risk_level": "H",
"suggestion": "统一合同主体名称。若确由子公司履约,应将首部及正文甲方统一修改为'长沙高新控股集团有限公司';若由母公司签约,应在签章处由母公司盖章,并补充'指定子公司代为履行付款义务'条款。",
"result": "不合格"
}]
print(json.dumps(tool.run(sample), ensure_ascii=False, indent=2))
tool = SegmentMergerTool()
sample = [
{
"rule_title": "支付时间审查",
"segment_id": 0,
"original_text": "本协议约定的服务内容全部履行完毕经甲方认可,在乙方提交检测数据后15个工作日内,乙方须向甲方提供相应数额的正规发票,甲方一次性支付合同总金额的100%给乙方。",
"issue": "付款条件缺乏实质把控。条款将付款绑定于提交数据,未明确'经甲方认可'的验收标准、期限及异议机制,且未设置质保金,存在验收流于形式即需全额付款的风险。",
"risk_level": "H",
"suggestion": "修改为:'乙方提交报告后,甲方在X个工作日内验收。验收合格且收到发票后15个工作日内支付95%;剩余5%作为质保金,满X个月无异议后无息支付。若验收不合格,甲方有权拒付并要求整改。'",
"result": "不合格",
},
{
"rule_title": "发票审查",
"segment_id": 0,
"original_text": "在乙方提交检测数据后15个工作日内,乙方须向甲方提供相应数额的正规发票,甲方一次性支付合同总金额的100%给乙方。甲方指定由其全资子公司长沙高新控股集团有限公司(简称高新控股)承担并支付本合同约定的检查服务费,乙方向高新控股开具相应金额的增值税专用发票。",
"issue": "缺失发票税率约定。条款明确了发票类型和开具时间,但未约定适用税率,违反审查规则,可能导致后续开票金额争议或税务合规风险。",
"risk_level": "H",
"suggestion": "补充税率约定。建议在'乙方向高新控股开具相应金额的增值税专用发票'后补充:'(税率:6%)'或根据实际业务类型补充具体税率数值。",
"result": "不合格",
},
{
"rule_title": "主体审查",
"segment_id": 0,
"original_text": "委托方(甲方): 湖南麓谷发展集团有限公司... 甲方指定由其全资子公司长沙高新控股集团有限公司(简称高新控股)承担并支付本合同约定的检查服务费... 签章处:甲方:长沙高新控股集团有限公司",
"issue": "签约主体不一致。首部甲方为'湖南麓谷发展集团有限公司',但签章处及付款义务主体变更为'长沙高新控股集团有限公司',且未明确授权委托或变更确认条款,存在主体混同及履约风险。",
"risk_level": "H",
"suggestion": "统一合同主体名称。若确由子公司履约,应将首部及正文甲方统一修改为'长沙高新控股集团有限公司';若由母公司签约,应在签章处由母公司盖章,并补充'指定子公司代为履行付款义务'条款。",
"result": "不合格",
},
]
print(json.dumps(tool.run(sample, merge_mode="rule"), ensure_ascii=False, indent=2))
......@@ -9,7 +9,7 @@ from core.tools.segment_llm import LLMTool
import re
from loguru import logger
REVIEW_SYSTEM_PROMPT = '''
REVIEW_SYSTEM_PROMPT = """
你是一个专业的合同分段审查智能体(SegmentReview)。
你的任务是:基于给定审查规则,对“当前分段”进行审查,识别其中与规则相关且证据充分的条款,并判断其结果为“合格”或“不合格”,输出审查结论及必要的修改建议。
......@@ -82,15 +82,6 @@ REVIEW_SYSTEM_PROMPT = '''
- suggestion 应简洁填写,可写“无需修改”;
- 不得为了凑内容而提出与审查结论无关的修改建议。
【规则适用性判断】
在执行任何审查规则之前,你必须先判断:
当前分段是否包含与该审查规则相关的信息维度。
如果当前分段与该审查规则无关,则:
- 不得生成 findings
- 不得引用原文
- 继续检查下一条规则
如果所有规则均无关或均无证据充分的结论,则返回 {"findings": []}。
【输出约束】
- 严格按照指定 JSON Schema 输出。
- 不得输出任何 JSON 之外的解释性文字。
......@@ -105,9 +96,9 @@ Step D:为每一个成立的判断生成一个 finding
只有完成上述穷举后,才允许输出最终结果
提示:在合同审查中,一个分段通常可能包含多个独立风险点或合规点,findings 数量通常大于1(如2–5条或更多),除非该段确实只涉及单一事项。
'''
REVIEW_USER_PROMPT = '''
提示:在合同审查中,一个分段通常可能包含多个独立风险点或合规点,findings 数量通常大于1,除非该段确实只涉及单一事项。
"""
REVIEW_USER_PROMPT = """
【当前分段文本】
{segment_text}
......@@ -131,8 +122,8 @@ REVIEW_USER_PROMPT = '''
【输出要求】
- 仅输出 JSON。
'''
REVIEW_OUTPUT_SCHEMA = '''
"""
REVIEW_OUTPUT_SCHEMA = """
```json
{
"findings": [
......@@ -145,7 +136,7 @@ REVIEW_OUTPUT_SCHEMA = '''
}
]
}
'''
"""
LEVEL_WEIGHT = {"H": 3, "M": 2, "L": 1}
......@@ -162,6 +153,7 @@ def _norm(text: str) -> str:
def _has_evidence(f: Dict) -> bool:
return bool(f.get("original_text"))
@tool("segment_review", "合同分段审查")
class SegmentReviewTool(LLMTool):
def __init__(self):
......@@ -182,10 +174,16 @@ class SegmentReviewTool(LLMTool):
"required": ["segment_id", "segment_text", "rules", "party_role"],
}
)
def run(self, segment_id: str, segment_text: str, rules: List[Dict], party_role: str,
context_summaries: Optional[List[Dict]] = None,
context_memories: Optional[List[Dict]] = None,
merge_rules_prompt: bool = True) -> Dict:
def run(
self,
segment_id: str,
segment_text: str,
rules: List[Dict],
party_role: str,
context_summaries: Optional[List[Dict]] = None,
context_memories: Optional[List[Dict]] = None,
merge_rules_prompt: bool = True,
) -> Dict:
rules = rules or []
result = self._evaluate_rules(
party_role,
......@@ -197,16 +195,16 @@ class SegmentReviewTool(LLMTool):
merge_rules_prompt=merge_rules_prompt,
)
overall = "revise" if (result["findings"] ) else "pass"
overall = "revise" if (result["findings"]) else "pass"
return {
"segment_id": segment_id,
"overall_conclusion": overall,
"findings": result["findings"],
}
def _stringify_rule(self, rule:Dict) -> str:
res = ''
def _stringify_rule(self, rule: Dict) -> str:
res = ""
res += f"## 审查项标题\n{rule.get('title','')}\n"
res += f"## 审查规则\n{rule.get('rule','')}\n"
res += f"## 风险等级\n{rule.get('level','')}\n"
......@@ -214,45 +212,95 @@ class SegmentReviewTool(LLMTool):
res += f"## 参考案例\n{rule.get('case','')}\n"
return res
def _build_prompt(self, party_role: str, rule: Dict, segment_id: int, segment_text: str,
context_summaries: Optional[List[Dict]], context_memories: Optional[List[Dict]]) -> List[Dict[str, str]]:
user_content = REVIEW_USER_PROMPT.format(
segment_id=segment_id,
segment_text=segment_text,
party_role=party_role,
# context_facts_json=json.dumps(context_summaries or [], ensure_ascii=False),
# context_memories_json=json.dumps(context_memories or [], ensure_ascii=False),
ruleset_text=self._stringify_rule(rule)
) + REVIEW_OUTPUT_SCHEMA
def _build_prompt(
self,
party_role: str,
rule: Dict,
segment_id: int,
segment_text: str,
context_summaries: Optional[List[Dict]],
context_memories: Optional[List[Dict]],
) -> List[Dict[str, str]]:
user_content = (
REVIEW_USER_PROMPT.format(
segment_id=segment_id,
segment_text=segment_text,
party_role=party_role,
# context_facts_json=json.dumps(context_summaries or [], ensure_ascii=False),
# context_memories_json=json.dumps(context_memories or [], ensure_ascii=False),
ruleset_text=self._stringify_rule(rule),
)
+ REVIEW_OUTPUT_SCHEMA
)
return self.build_messages(user_content)
def _build_prompt_with_rules(self, party_role: str, rules: List[Dict], segment_id: int, segment_text: str,
context_summaries: Optional[List[Dict]], context_memories: Optional[List[Dict]]) -> List[Dict[str, str]]:
def _build_prompt_with_rules(
self,
party_role: str,
rules: List[Dict],
segment_id: int,
segment_text: str,
context_summaries: Optional[List[Dict]],
context_memories: Optional[List[Dict]],
) -> List[Dict[str, str]]:
ruleset_text = "\n\n".join([self._stringify_rule(rule) for rule in rules])
user_content = REVIEW_USER_PROMPT.format(
segment_id=segment_id,
segment_text=segment_text,
party_role=party_role,
context_facts_json=json.dumps(context_summaries or [], ensure_ascii=False),
context_memories_json=json.dumps(context_memories or [], ensure_ascii=False),
ruleset_text=ruleset_text,
) + REVIEW_OUTPUT_SCHEMA
user_content = (
REVIEW_USER_PROMPT.format(
segment_id=segment_id,
segment_text=segment_text,
party_role=party_role,
context_facts_json=json.dumps(
context_summaries or [], ensure_ascii=False
),
context_memories_json=json.dumps(
context_memories or [], ensure_ascii=False
),
ruleset_text=ruleset_text,
)
+ REVIEW_OUTPUT_SCHEMA
)
return self.build_messages(user_content)
async def _evaluate_rules_async(self, party_role: str, segment_id: int, segment_text: str, rules: List[Dict],
context_summaries: Optional[List[Dict]], context_memories: Optional[List[Dict]],
merge_rules_prompt: bool = False) -> Dict[str, List[Dict]]:
async def _evaluate_rules_async(
self,
party_role: str,
segment_id: int,
segment_text: str,
rules: List[Dict],
context_summaries: Optional[List[Dict]],
context_memories: Optional[List[Dict]],
merge_rules_prompt: bool = False,
) -> Dict[str, List[Dict]]:
if merge_rules_prompt:
msgs = [self._build_prompt_with_rules(party_role, rules, segment_id, segment_text, context_summaries, context_memories)]
msgs = [
self._build_prompt_with_rules(
party_role,
rules,
segment_id,
segment_text,
context_summaries,
context_memories,
)
]
else:
msgs = [self._build_prompt(party_role, rule, segment_id, segment_text, context_summaries, context_memories) for rule in rules]
msgs = [
self._build_prompt(
party_role,
rule,
segment_id,
segment_text,
context_summaries,
context_memories,
)
for rule in rules
]
if not msgs:
return {"findings": []}
try:
responses = await self.chat_batch_async(msgs)
except Exception as e:
logger.error(f'Error in segment review for segment {segment_id}: {e}')
logger.error(f"Error in segment review for segment {segment_id}: {e}")
return {"findings": []}
all_findings: List[Dict] = []
......@@ -266,10 +314,10 @@ class SegmentReviewTool(LLMTool):
f["risk_level"] = ""
all_findings.extend(findings)
else:
for idx,resp in enumerate(responses):
for idx, resp in enumerate(responses):
data = self.parse_first_json(resp)
rule_title = rules[idx].get("title","")
rule_level = rules[idx].get("level","")
rule_title = rules[idx].get("title", "")
rule_level = rules[idx].get("level", "")
findings = data.get("findings", []) or []
# 为每条 finding 添加对应的 rule_title 和 risk_level
for f in findings:
......@@ -281,73 +329,85 @@ class SegmentReviewTool(LLMTool):
"findings": all_findings,
}
def _evaluate_rules(self, party_role: str, segment_id: int, segment_text: str, rules: List[Dict],
context_summaries: Optional[List[Dict]], context_memories: Optional[List[Dict]],
merge_rules_prompt: bool = False) -> Dict[str, List[Dict]]:
def _evaluate_rules(
self,
party_role: str,
segment_id: int,
segment_text: str,
rules: List[Dict],
context_summaries: Optional[List[Dict]],
context_memories: Optional[List[Dict]],
merge_rules_prompt: bool = False,
) -> Dict[str, List[Dict]]:
try:
return asyncio.run(self._evaluate_rules_async(
party_role,
segment_id,
segment_text,
rules,
context_summaries,
context_memories,
merge_rules_prompt=merge_rules_prompt,
))
return asyncio.run(
self._evaluate_rules_async(
party_role,
segment_id,
segment_text,
rules,
context_summaries,
context_memories,
merge_rules_prompt=merge_rules_prompt,
)
)
except RuntimeError:
loop = asyncio.get_event_loop()
return loop.run_until_complete(self._evaluate_rules_async(
party_role,
segment_id,
segment_text,
rules,
context_summaries,
context_memories,
merge_rules_prompt=merge_rules_prompt,
))
if __name__=="__main__":
return loop.run_until_complete(
self._evaluate_rules_async(
party_role,
segment_id,
segment_text,
rules,
context_summaries,
context_memories,
merge_rules_prompt=merge_rules_prompt,
)
)
if __name__ == "__main__":
tool = SegmentReviewTool()
segment_text = '''
3.1 合同价格
3.1.1 合同协议书中载明的签约合同价包括卖方为完成合同全部义务应承担的一切成本、费用和支出以及卖方的合理利润。
3.1.2 除专用合同条款另有约定外,签约合同价为固定价格。
3.2 合同价款的支付
除专用合同条款另有约定外,买方应通过以下方式和比例向卖方支付合同价款:
3.2.1 预付款
合同生效后,买方在收到卖方开具的注明应付预付款金额的财务收据正本一份并经审核无误后 30 日内,向卖方支付签约合同价的10%作为预付款。买方支付预付款后,如卖方未履行合同义务,则买方有权收回预付款;如卖方依约履行了 合同义务,则预付款抵作合同价款。
3.2.2 交货款
卖方按合同约定交付全部合同设备后,买方在收到卖方提交的下列全部单据并经审核无误后30日内,向卖方支付合同价格的 60%
(1) 卖方出具的交货清单正本一份;
(2) 买方签署的收货清单正本一份;
(3) 制造商出具的出厂质量合格证正本一份;
(4) 合同价格100%金额的增值税发票正本一份。
3.2.3 验收款
买方在收到卖方提交的买卖双方签署的合同设备验收证书或已生效的验收款支付函正本一份并经审核无误后30 日内,向卖方支付合同价格的 25%
3.2.4 结清款
买方在收到卖方提交的买方签署的质量保证期届满证书或已生效的结清款支付函正本一份并经审核无误后 30 日内,向卖方支付合同价格的 5%。如果依照合同第 9.1 项,卖方应向买方支付费用的,买方有权从结清款中直接扣除该笔费用。
除专用合同条款另有约定外,在买方向卖方支付验收款的同时或其后的任何时间内,卖方可在向买方提交买方可接受的金额为合同价格 5%的合同结清款保函的前提下,要求买方支付合同结清款,买方不得拒绝。'''
segment_text = """
变压器本体漏(渗)油,每起扣除质保金2000元,发现5台及以上变压器本体出现漏油情况,除应赔偿的质保金外,需延长该批次变压器的保质期5年。
箱变出现漏水或渗水现象每起扣4000元。
箱变散热片出现生锈现象每起扣3000元。发现5台及以上变压器散热片出现生锈情况,除应赔偿的质保金外,招标方可拒收该批次变压器,由招标方重新生产该批次变压器散热片。
交接试验期间,出现设备不能满足或因设备自身问题没有通过交接试验项目复测仍无法通过的情况,招标方可无条件拒收该设备,投标人应赔偿由设备损坏或性能不达标对招标方造成的工期延误、劳务费用、发电量和信誉等所有损失并重新生产该设备。
因设备自身故障停电超过7天的,每起扣除质保金5000元。
高低压室、变压器等温升超出技术协议要求值5k以上并持续时间超过3小时的,每起扣除质保金10000元。
箱变满载运行时,高低压室、变压器等温升超出厂家承诺值3k以上并持续时间超过1小时的,每起扣除质保金10000元,单批次出现两台以上的,业主单位可拒绝签收该批次设备。
UPS在散热设计上着重考虑,出现UPS因过热死机现象,每起扣除质保金2000元。
按照项目所用箱式变压器年度统计可用率达到99%为基准,每降低5‰扣除质保金10000元。
"""
result = tool.run(
segment_id=1,
segment_text=segment_text,
rules=[
{
"title": "付款时间审查",
"rule": "1)我司规定如果发货前全额付款,审查合格2)如果发货前没有全额付款,发货后每个付款项的支付条件(除了预付款,发货款以外,例如到货款,交货款,验收款,结清款等),要以“到货XX天/月”作为充分条件之一,如果有多个充分条件,需要提及“先到为准”。(要有到货XX天/月作为闭口时间)",
"title": "技术性能审查",
"rule": """
## 背景知识
技术性能(性能指标)
包括:一些泛指:"技术性能","性能指标","参数标准",或者具体的技术性能参数比如散热、工艺、负载、用料、漏水、渗水等
不包括:一些泛指,比如设备质量问题,设备验收问题,设备故障,设备考核等
## 审查规则
1)前提条件:明确说明“设备(货物)的技术性能(性能指标)不能满足/达到保证值(合同要求)”
2)后果条件:明确链接到“惩罚性经济责任”(违约,赔偿等责任)或“合同解约权”(合同解约)。
同时满足前提与后果条件,则审查不合格,否则,审查合格。
""",
"level": "M",
"suggestion_template": "1、修改付款条款,使得“到货XX天/月”作为付款的充分条件之一",
"suggestion_template": "1、提醒不合规的技术性能要求",
"case": """
## 案例1:
原文:收到乙方正式发货通知及进度款发票后15个工作日内支付 30% 发货款
结论:非全额付款,但是发货款无需“到货XX天/月”的条件作为闭口条件,因此审查合格
## 案例2
原文:交付合同设备后,现场经清点无误并验收合格,到货款付款20%
结论:非全额付款,非发货款,并且没有“到货XX天/月”的条件作为闭口时间,因此审查不合格
## 案例3
原文:交货款付款20%;验收款付款20%;结清款付款10%,质保款在质量保证期满后一次性支付
结论:所有非发货款(包括交货款,验收款,结清款,质保款),没有“到货XX天/月”的条件作为闭口时间,因此审查不合格
"""
## 案例1:
原文:承揽人根据附件2《技术协议》进行性能考核
结论:前提条件不完整(只提及了性能考核)并且没有后果条件,因此审查合格。
## 案例2
原文:产品散热设计故障,每起扣款2000元。
结论:同时满足前提与后果条件,散热设计故障属于技术性能不能满足保证值,扣款表明需要损失赔偿,因此审查不合格。
## 案例3
原文:出现设备故障或者设备质量问题导致验收不合格,需要扣除违约金。
结论:前提条件不完整,没有明确提及设备故障、质量问题、验收不合格为技术性能(性能指标)不满足保证值(合同要求),因此审查合格。
""",
}
],
party_role="金盘",
......@@ -355,4 +415,4 @@ if __name__=="__main__":
print(json.dumps(result, ensure_ascii=False, indent=2))
# filter_result = tool.filter(1,result)
# print(json.dumps(filter_result, ensure_ascii=False, indent=2))
\ No newline at end of file
# print(json.dumps(filter_result, ensure_ascii=False, indent=2))
......@@ -16,9 +16,10 @@ from utils.http_util import upload_file, fastgpt_openai_chat, download_file
SUFFIX = "_麓发迁移"
batch_input_dir_path = "jp-input"
batch_output_dir_path = f"/home/ccran/lufa-contract/data/benchmark/results/jp-output-lufa-{time.strftime('%Y%m%d-%H%M%S', time.localtime())}"
# SUFFIX = "_麓发"
# batch_input_dir_path = "lufa-input"
# batch_output_dir_path = "lufa-output"
# batch_output_dir_path = "lufa-output-standard"
batch_size = 5
# 麓发fastgpt接口
# url = "http://192.168.252.71:18089/api/v1/chat/completions"
......@@ -26,6 +27,8 @@ batch_size = 5
url = "http://192.168.252.71:18088/api/v1/chat/completions"
# 麓发合同审查生产token
# token = "fastgpt-ek3Z6PxI6sXgYc0jxzZ5bVGqrxwM6aVyfSmA6JVErJYBMr2KmYxrHwEUOIMSYz"
# 麓发合同审查生产token-标准化
# token = "fastgpt-mg5tQUgreJeF7peoOr5zqP0NR4EIrfS2bEVXge6FUL94Suu1TvEMR1sGNRSiV"
# 金盘迁移麓发合同审查测试token
token = "fastgpt-vykT6qs07g7hR4tL2MNJE6DdNCIxaQjEu3Cxw9nuTBFg8MAG3CkByvnXKxSNEyMK7"
# 人机交互测试(测试环境)
......
......@@ -121,7 +121,7 @@ def _parse_args() -> argparse.Namespace:
parser.add_argument(
"--datasets-dir",
type=Path,
default=base / "results" / "jp-output-lufa",
default=base / "results" / "jp-output-lufa-20260408-182708",
help="Directory containing Word files with annotations.",
)
parser.add_argument(
......
No preview for this file type
from spire.doc import *
import difflib
# 将文本写入文件
def WriteAllText(fname: str, text: str):
with open(fname, "w", encoding="utf-8") as fp:
fp.write(text)
inputFile = "/home/ccran/lufa-contract/demo/修订测试.docx"
# 创建Document类的对象
doc = Document()
# 加载Word文档
doc.LoadFromFile(inputFile)
before_text = doc.GetText()
print("===== 修订前文本 =====")
print(before_text)
# 检查文档是否有未接受的修订
if doc.HasChanges:
# 接受所有修订
doc.AcceptChanges()
after_text = doc.GetText()
print("===== 修订后文本 =====")
print(after_text)
before_lines = before_text.splitlines()
after_lines = after_text.splitlines()
diff_lines = list(
difflib.unified_diff(
before_lines,
after_lines,
fromfile="before_revision",
tofile="after_revision",
lineterm="",
)
)
print("===== Diff 对比结果 =====")
if diff_lines:
print("\n".join(diff_lines))
else:
print("修订前后文本一致,无差异。")
doc.Close()
from spire.doc import *
# 将文本写入文件
def WriteAllText(fname: str, text: str):
with open(fname, "w", encoding="utf-8") as fp:
fp.write(text)
inputFile = "/home/ccran/lufa-contract/demo/修订测试.docx"
outputFile1 = "/home/ccran/lufa-contract/demo/新增修订.txt"
outputFile2 = "/home/ccran/lufa-contract/demo/删除修订.txt"
# 创建Document类的对象
document = Document()
# 加载Word文档
document.LoadFromFile(inputFile)
print(document.GetText())
# 初始化列表来收集文本片段
insert_revisions = []
delete_revisions = []
# 遍历文档所有章节
for k in range(document.Sections.Count):
sec = document.Sections.get_Item(k)
# 遍历章节中的正文元素
for m in range(sec.Body.ChildObjects.Count):
# Check if the item is a Paragraph
docItem = sec.Body.ChildObjects.get_Item(m)
if isinstance(docItem, Paragraph):
para = docItem
# 判断段落是否为插入修订
if para.IsInsertRevision:
# 获取修订的类型,作者及其关联的内容
insRevison = para.InsertRevision
insType = insRevison.Type
insAuthor = insRevison.Author
insert_revisions.append(f"Revision Type: {insType.name}\n")
insert_revisions.append(f"Revision Author: {insAuthor}\n")
insert_revisions.append(f"Insertion Text: {para.Text}\n\n")
# 判断段落是否为删除修订
elif para.IsDeleteRevision:
# 获取修订的类型,作者及其关联的内容
delRevison = para.DeleteRevision
delType = delRevison.Type
delAuthor = delRevison.Author
delete_revisions.append(f"Revision Type:: {delType.name}\n")
delete_revisions.append(f"Revision Author: {delAuthor}\n")
delete_revisions.append(f"Deletion Text: {para.Text}\n\n")
# 如果段落没有修订,则遍历段落中的元素
else:
for j in range(para.ChildObjects.Count):
obj = para.ChildObjects.get_Item(j)
if isinstance(obj, TextRange):
textRange = obj
# 判断textrange是否为插入修订
if textRange.IsInsertRevision:
# 获取修订的类型,作者及其关联的内容
insRevison = textRange.InsertRevision
insType = insRevison.Type
insAuthor = insRevison.Author
insert_revisions.append(f"Revision Type: {insType.name}\n")
insert_revisions.append(f"Revision Author: {insAuthor}\n")
insert_revisions.append(
f"Insertion Text: {textRange.Text}\n"
)
# 判断textrange是否为删除修订
elif textRange.IsDeleteRevision:
# 获取修订的类型,作者及其关联的内容
delRevison = textRange.DeleteRevision
delType = delRevison.Type
delAuthor = delRevison.Author
delete_revisions.append(f"Revision Type: {delType.name}\n")
delete_revisions.append(f"Revision Author: {delAuthor}\n")
delete_revisions.append(
f"Deletion Text: {textRange.Text}\n"
)
# 写出修订信息到文件
WriteAllText(outputFile1, "".join(insert_revisions))
WriteAllText(outputFile2, "".join(delete_revisions))
# 释放资源
document.Dispose()
......@@ -394,7 +394,9 @@ def merge_segment_findings(payload: MergerRequest) -> MergerResponse:
unqualified_findings = [
f for f in segment_findings if (f.result or "").strip() == "不合格"
]
merged_result = merger_tool.run([f.__dict__ for f in unqualified_findings])
merged_result = merger_tool.run(
[f.__dict__ for f in unqualified_findings], merge_mode="rule"
)
merged_findings = merged_result.get("findings", []) or []
for f in merged_findings:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment