Commit 1152e468 by ccran

feat: add skills;

parent 37f636e2
......@@ -14,7 +14,7 @@
!**/*.xls
!workflow/**
!demo/**
!skills/**
!README.md
# Keep this file tracked
......
......@@ -50,10 +50,10 @@ MAX_WORKERS = 10
FILE_SUFFIX = "-审核批注"
## 关键参数**
use_non_fastgpt_llm = False
use_non_fastgpt_llm = True
use_lufa = False
use_jp_machine = True
debug_mode = True
debug_mode = False
## 关键参数**
max_model_len = 131072
......
No preview for this file type
#!/usr/bin/env python3
"""Compatibility dispatcher for split common CLIs."""
from __future__ import annotations
import argparse, subprocess, sys
from pathlib import Path
# Sub-commands that main() forwards to json_text_tool.py.
TEXT_COMMANDS = {
    "adjust-single-chunk-size",
    "extract-drop-json-part",
    "extract-json",
    "extract-url-file",
    "format-now",
    "group-chunk-by-len",
    "random-str",
    "remove-duplicates-by-key",
}
# Sub-commands that main() forwards to file_chat_tool.py.
FILE_COMMANDS = {
    "download",
    "fastgpt-chat",
    "upload",
    "url-replace-fastgpt",
}
def script(name: str) -> str:
    """Return the absolute path of the file called ``name`` that sits next to this module."""
    this_file = Path(__file__).resolve()
    sibling = this_file.with_name(name)
    return str(sibling)
def main(argv: list[str] | None = None) -> int:
    """Forward a legacy sub-command to the split CLI script that implements it.

    Args:
        argv: Command-line arguments without the program name; ``sys.argv[1:]``
            is used when omitted.

    Returns:
        0 after printing help, 2 for an unknown command, otherwise the exit
        status of the child script.
    """
    cli_args = list(argv) if argv is not None else list(sys.argv[1:])

    wants_help = not cli_args or cli_args[0] in {"-h", "--help"}
    if wants_help:
        parser = argparse.ArgumentParser(description="Compatibility dispatcher for json_text_tool.py and file_chat_tool.py")
        parser.add_argument("command", nargs="?", choices=sorted(TEXT_COMMANDS | FILE_COMMANDS))
        parser.print_help()
        print("\nText/JSON commands:", ", ".join(sorted(TEXT_COMMANDS)))
        print("File/chat commands:", ", ".join(sorted(FILE_COMMANDS)))
        return 0

    command = cli_args[0]
    if command in TEXT_COMMANDS:
        target = script("json_text_tool.py")
    elif command in FILE_COMMANDS:
        target = script("file_chat_tool.py")
    else:
        print(f"unknown command: {command}", file=sys.stderr)
        return 2

    # The child receives the full argument list, including the command name.
    return subprocess.call([sys.executable, target, *cli_args])


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone file and FastGPT chat utility CLI."""
from __future__ import annotations
import argparse, json, mimetypes, random, re, string, sys, time, urllib.error, urllib.parse, urllib.request
from pathlib import Path
from typing import Any
# Default FastGPT base URL; prepended to inputs that do not start with "http:" / "https:".
FASTGPT = "http://172.21.107.45:3030"
# Default backend base URL: used for the login/upload endpoints and as the rewrite target in download_file.
BACKEND = "http://172.21.107.45:1122"
# Default outer backend URL; download_file replaces it with the backend base URL when input_url_to_inner is set.
OUTER = "https://172.21.107.45:48080"
def rand(n: int = 8) -> str:
    """Return ``n`` random lowercase ASCII letters drawn with ``random.choice``."""
    alphabet = string.ascii_lowercase
    picked = [random.choice(alphabet) for _ in range(n)]
    return "".join(picked)
def url_replace_fastgpt(origin: str, base_fastgpt_url: str = FASTGPT) -> str:
    """Return ``origin`` unchanged if it starts with "http:"/"https:", else prefix it with ``base_fastgpt_url``."""
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def basename(name: str) -> str:
    """Return the final path component of a (possibly quoted, percent-encoded) file name.

    Surrounding whitespace and double quotes are stripped, percent escapes are
    decoded and backslashes are treated as path separators. Falls back to
    "downloaded_file" when no component remains.
    """
    unquoted = name.strip().strip('"')
    decoded = urllib.parse.unquote(unquoted)
    leaf = Path(decoded.replace("\\", "/")).name
    if leaf:
        return leaf
    return "downloaded_file"
def resolve_name(url: str, headers: dict[str, str]) -> str:
    """Derive a local file name for a downloaded resource.

    The ``Content-Disposition`` header is consulted first (``filename*=`` form
    before the plain ``filename=`` form); if it yields nothing, the last
    component of the URL path is used.

    Args:
        url: The URL that was fetched.
        headers: Response headers as a plain mapping.

    Returns:
        The file name as cleaned up by ``basename``.
    """
    # HTTP field names are case-insensitive, and a plain dict keeps whatever
    # casing the server sent, so match the header name without regard to case.
    disposition = next(
        (value for key, value in headers.items() if key.lower() == "content-disposition" and value),
        "",
    )
    patterns = (r"filename\*=(?:UTF-8''|utf-8'')?([^;]+)", r'filename="?([^";]+)"?')
    for pattern in patterns:
        found = re.search(pattern, disposition)
        if found:
            return basename(found.group(1))
    return basename(urllib.parse.urlparse(url).path)
def download_file(url: str, path: str, input_url_to_inner: bool = True, base_fastgpt_url: str = FASTGPT, base_backend_url: str = BACKEND, outer_backend_url: str = OUTER) -> str | None:
    """Download ``url`` with a GET request and store the body at ``path``.

    Args:
        url: Absolute URL, or a relative one when ``input_url_to_inner`` is set.
        path: Destination file; if it is an existing directory the file name is
            resolved from the response headers / URL.
        input_url_to_inner: When true, relative URLs get ``base_fastgpt_url``
            prepended and ``outer_backend_url`` is replaced by ``base_backend_url``.
        base_fastgpt_url: Prefix for relative URLs.
        base_backend_url: Replacement for ``outer_backend_url``.
        outer_backend_url: Substring to replace in the URL.

    Returns:
        The written file path, or ``None`` when the server answered with an
        HTTP error (a message is printed to stderr). Other errors propagate.
    """
    if input_url_to_inner:
        if not url.startswith(("http:", "https:")):
            url = base_fastgpt_url + url
        url = url.replace(outer_backend_url, base_backend_url)

    request = urllib.request.Request(url, method="GET")
    try:
        with urllib.request.urlopen(request, timeout=120) as resp:
            target = Path(path)
            if target.exists() and target.is_dir():
                target = target / resolve_name(url, dict(resp.headers))
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(resp.read())
            return str(target)
    except urllib.error.HTTPError as exc:
        print(f"{url}文件下载失败. HTTP Status Code: {exc.code}", file=sys.stderr)
        return None
def post_json(url: str, data: dict[str, Any], headers: dict[str, str] | None = None, timeout: int = 120) -> str:
    """POST ``data`` as a JSON body and return the response text.

    Args:
        url: Target URL.
        data: Payload, serialised with ``ensure_ascii=False``.
        headers: Extra headers; they override the default ``Content-Type``.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The response body decoded as UTF-8 with undecodable bytes replaced.
    """
    payload = json.dumps(data, ensure_ascii=False).encode()
    merged_headers = {"Content-Type": "application/json"}
    merged_headers.update(headers or {})
    request = urllib.request.Request(url, data=payload, headers=merged_headers, method="POST")
    with urllib.request.urlopen(request, timeout=timeout) as resp:
        raw = resp.read()
    return raw.decode("utf-8", errors="replace")
def multipart(path: str) -> tuple[bytes, str]:
    """Build a single-part ``multipart/form-data`` body for the file at ``path``.

    The part is named "file"; its content type is guessed from the file name
    and defaults to ``application/octet-stream``.

    Returns:
        ``(body, boundary)`` where ``boundary`` is the generated delimiter.
    """
    file_path = Path(path)
    boundary = f"----common-tool-{int(time.time() * 1000)}-{rand()}"
    ctype = mimetypes.guess_type(file_path.name)[0] or "application/octet-stream"
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{file_path.name}"\r\n'
        f"Content-Type: {ctype}\r\n\r\n"
    ).encode()
    content = file_path.read_bytes()
    tail = f"\r\n--{boundary}--\r\n".encode()
    return b"".join((head, content, tail)), boundary
def upload_file(path: str, base_backend_url: str = BACKEND, username: str = "admin", password: str = "admin@jpai.com") -> str:
    """Log in to the backend and upload the file at ``path``.

    Args:
        path: Local file to upload.
        base_backend_url: Backend base URL for the login and upload endpoints.
        username: Login user name.
        password: Login password.

    Returns:
        The ``data`` field of the upload response.

    Raises:
        RuntimeError: If login yields no access token or the upload response
            has an empty ``data`` field.
    """
    # NOTE(review): the default credentials are hard-coded in the signature;
    # consider sourcing them from configuration instead.
    login_text = post_json(
        f"{base_backend_url}/admin-api/system/auth/login",
        {"username": username, "password": password},
    )
    login_data = json.loads(login_text).get("data") or {}
    token = login_data.get("accessToken")
    if not token:
        raise RuntimeError("后端登录异常")

    body, boundary = multipart(path)
    upload_headers = {
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        # The token is sent as-is, without a scheme prefix.
        "Authorization": token,
    }
    req = urllib.request.Request(
        f"{base_backend_url}/admin-api/infra/file/upload",
        data=body,
        headers=upload_headers,
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        text = resp.read().decode("utf-8", errors="replace")

    res = json.loads(text).get("data")
    if not res:
        raise RuntimeError(f"上传{path}失败 Response text: {text}")
    return res
def fastgpt_openai_chat(url: str, token: str, model: str, chat_id: str, file_url: str, text: str, stream: bool = True) -> str:
    """Send one user message (a file URL plus text) to a chat endpoint and return the reply text.

    Args:
        url: Chat completion endpoint.
        token: Bearer token for the ``Authorization`` header.
        model: Model name placed in the request body.
        chat_id: Value of the ``chatId`` field.
        file_url: URL of the file attached to the message.
        text: Text part of the message.
        stream: When true, the response is read line by line and the
            ``delta.content`` pieces are concatenated; otherwise the single
            ``message.content`` of the first choice is returned.

    Returns:
        The assistant reply as a string.
    """
    user_content = [
        {"type": "file_url", "name": "文件", "url": file_url},
        {"type": "text", "text": text},
    ]
    data = {
        "chatId": chat_id,
        "messages": [{"role": "user", "content": user_content}],
        "model": model,
        "stream": stream,
    }
    request_headers = {"Content-Type": "application/json", "Authorization": f"Bearer {token}"}
    req = urllib.request.Request(url, data=json.dumps(data, ensure_ascii=False).encode(), headers=request_headers, method="POST")
    # NOTE(review): urlopen timeouts are in seconds, so 60000 is roughly 16.7 hours — confirm this is intended.
    with urllib.request.urlopen(req, timeout=60000) as resp:
        if not stream:
            rsp = json.loads(resp.read().decode("utf-8", errors="replace"))
            return rsp.get("choices", [{}])[0].get("message", {}).get("content", "")

        out = ""
        for raw in resp:
            line = raw.decode("utf-8", errors="replace").strip()
            if not line or line == "data: [DONE]":
                continue
            payload = line[6:] if line.startswith("data: ") else line
            try:
                piece = json.loads(payload).get("choices", [{}])[0].get("delta", {}).get("content", "")
                out += piece
            except Exception:
                # Best effort: lines that are not usable JSON chunks are skipped.
                continue
        return out
def main() -> int:
    """Parse the command line and run one file/FastGPT utility.

    Returns:
        0 on success; 1 when the ``download`` command fails with an HTTP error.
        Previously a failed download printed the literal "None" and exited 0.
    """
    p = argparse.ArgumentParser(description="File/FastGPT utilities")
    sub = p.add_subparsers(dest="cmd", required=True)

    a = sub.add_parser("url-replace-fastgpt")
    a.add_argument("origin")
    a.add_argument("--base-fastgpt-url", default=FASTGPT)

    a = sub.add_parser("download")
    a.add_argument("url")
    a.add_argument("path")
    a.add_argument("--base-fastgpt-url", default=FASTGPT)
    a.add_argument("--base-backend-url", default=BACKEND)
    a.add_argument("--outer-backend-url", default=OUTER)
    a.add_argument("--no-input-url-to-inner", action="store_true")

    a = sub.add_parser("upload")
    a.add_argument("path")
    a.add_argument("--base-backend-url", default=BACKEND)
    a.add_argument("--username", default="admin")
    a.add_argument("--password", default="admin@jpai.com")

    a = sub.add_parser("fastgpt-chat")
    a.add_argument("--url", required=True)
    a.add_argument("--token", required=True)
    a.add_argument("--model", required=True)
    a.add_argument("--chat-id", required=True)
    a.add_argument("--file-url", required=True)
    a.add_argument("--text", required=True)
    a.add_argument("--no-stream", action="store_true")

    x = p.parse_args()
    if x.cmd == "url-replace-fastgpt":
        print(url_replace_fastgpt(x.origin, x.base_fastgpt_url))
    elif x.cmd == "download":
        saved = download_file(x.url, x.path, not x.no_input_url_to_inner, x.base_fastgpt_url, x.base_backend_url, x.outer_backend_url)
        if saved is None:
            # download_file already reported the HTTP error on stderr; signal
            # the failure through the exit status instead of printing "None".
            return 1
        print(saved)
    elif x.cmd == "upload":
        print(upload_file(x.path, x.base_backend_url, x.username, x.password))
    elif x.cmd == "fastgpt-chat":
        print(fastgpt_openai_chat(x.url, x.token, x.model, x.chat_id, x.file_url, x.text, not x.no_stream))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone JSON/text utility CLI."""
from __future__ import annotations
import argparse, json, random, re, string, sys
from datetime import datetime
from pathlib import Path
from typing import Any
# Defaults for adjust_single_chunk_size: lower bound, upper bound and page divisor.
MIN_SIZE = 2000
MAX_SIZE = 100000
MAX_PAGE = 10
def random_str(n: int = 5) -> str:
    """Return ``n`` random lowercase letters.

    Up to 26 characters are sampled without repetition; longer strings are
    drawn with repetition.
    """
    alphabet = string.ascii_lowercase
    if n > 26:
        return "".join(random.choice(alphabet) for _ in range(n))
    return "".join(random.sample(alphabet, n))
def format_now() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return f"{datetime.now():%Y-%m-%d %H:%M:%S}"
def extract_url_file(url: str, formats: list[str]) -> str:
    """Return the first file name in ``url`` that ends with one of ``formats``.

    Args:
        url: Text to search.
        formats: Literal suffixes (e.g. ".docx") to look for.

    Raises:
        RuntimeError: If no such file name is found.
    """
    name_chars = r"[\u4e00-\u9fa5()()0-9\w-]+"
    alternatives = [name_chars + re.escape(suffix) for suffix in formats]
    found = re.search("|".join(alternatives), url)
    if found is None:
        raise RuntimeError(f"{formats} not found in url:{url}")
    return found.group()
def adjust_single_chunk_size(length: int, max_page: int = MAX_PAGE, min_size: int = MIN_SIZE, max_size: int = MAX_SIZE) -> int:
    """Return ``length // max_page`` capped at ``max_size`` and raised to at least ``min_size``."""
    per_page = length // max_page
    capped = min(per_page, max_size)
    return max(min_size, capped)
def _loads(text: str) -> Any:
    """Parse ``text`` as JSON, using ``json_repair`` when it is installed.

    Falls back to the standard ``json`` module when ``json_repair`` cannot be
    imported.
    """
    try:
        import json_repair as repair_backend  # type: ignore

        return repair_backend.loads(text, strict=False)
    except ImportError:
        return json.loads(text)
def extract_json(text: str) -> list[Any]:
    """Collect the JSON values embedded in ``text``.

    Strategies are tried in this order, stopping at the first that yields data:
    1. every ```json fenced block;
    2. the whole text;
    3. the first plain ``` fenced block that parses;
    4. every non-greedy ``{...}`` / ``[...]`` span.

    Parsed lists are flattened into the result; any other value is appended.
    """
    source = text or ""

    def _try_add(candidate: str, sink: list[Any]) -> bool:
        # Strip control characters before parsing.
        cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", (candidate or "").strip())
        if not cleaned:
            return False
        try:
            parsed = _loads(cleaned)
        except Exception:
            return False
        if isinstance(parsed, list):
            sink.extend(parsed)
        else:
            sink.extend([parsed])
        return True

    results: list[Any] = []

    for fenced in re.findall(r"```json([\s\S]*?)```", source, re.DOTALL):
        _try_add(fenced, results)
    if results or _try_add(source, results):
        return results

    for fenced in re.findall(r"```([\s\S]*?)```", source, re.DOTALL):
        if _try_add(fenced, results):
            return results

    for span in re.findall(r"(\{[\s\S]*?\}|\[[\s\S]*?\])", source, re.DOTALL):
        _try_add(span, results)
    return results
def remove_duplicates_by_key(items: list[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Drop items whose ``key`` text is contained in the text of an already kept item.

    Items are visited from the longest ``key`` text to the shortest (ties keep
    their input order), so the longest variant survives. The result is in that
    visiting order.
    """
    def text_of(item: dict[str, Any]) -> str:
        return str(item.get(key, ""))

    kept: list[dict[str, Any]] = []
    kept_texts: list[str] = []
    longest_first = sorted(items, key=lambda item: len(text_of(item)), reverse=True)
    for item in longest_first:
        value = text_of(item)
        if any(value in earlier for earlier in kept_texts):
            continue
        kept_texts.append(value)
        kept.append(item)
    return kept
def group_chunk_by_len(items: list[dict[str, Any]], key: str, chunk_len: int) -> list[list[dict[str, Any]]]:
    """Split ``items`` into consecutive groups whose summed ``key`` text length stays within ``chunk_len``.

    A group is closed as soon as adding the next item would exceed
    ``chunk_len``; a single oversized item still forms its own group.
    """
    batches: list[list[dict[str, Any]]] = []
    batch: list[dict[str, Any]] = []
    used = 0
    for item in items:
        size = len(str(item.get(key, "")))
        if batch and used + size > chunk_len:
            batches.append(batch)
            batch = []
            used = 0
        batch.append(item)
        used += size
    if batch:
        batches.append(batch)
    return batches
def read_json_arg(value: str) -> Any:
    """Load JSON from ``value``, which is either a path to a JSON file or inline JSON text.

    Args:
        value: An existing file path (read as UTF-8) or a JSON document.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the text is not valid JSON.
    """
    candidate = Path(value)
    try:
        on_disk = candidate.exists()
    except (OSError, ValueError):
        # Inline JSON can be longer than the OS allows for a file name or hold
        # characters that are illegal in paths; that is not a file on disk.
        on_disk = False
    if on_disk:
        return json.loads(candidate.read_text(encoding="utf-8"))
    return json.loads(value)
def main() -> int:
    """Parse the command line, run one JSON/text utility and print its result.

    ``extract-json`` and ``extract-drop-json-part`` read standard input when
    the positional ``text`` is omitted. Always returns 0.
    """
    parser = argparse.ArgumentParser(description="JSON/text utilities")
    sub = parser.add_subparsers(dest="cmd", required=True)

    cmd = sub.add_parser("random-str")
    cmd.add_argument("-l", "--length", type=int, default=5)

    sub.add_parser("format-now")

    cmd = sub.add_parser("extract-url-file")
    cmd.add_argument("url")
    cmd.add_argument("formats", nargs="+")

    cmd = sub.add_parser("adjust-single-chunk-size")
    cmd.add_argument("all_text_len", type=int)
    cmd.add_argument("--max-chunk-page", type=int, default=MAX_PAGE)
    cmd.add_argument("--min-single-chunk-size", type=int, default=MIN_SIZE)
    cmd.add_argument("--max-single-chunk-size", type=int, default=MAX_SIZE)

    cmd = sub.add_parser("extract-json")
    cmd.add_argument("text", nargs="?")

    cmd = sub.add_parser("remove-duplicates-by-key")
    cmd.add_argument("json_list")
    cmd.add_argument("key")

    cmd = sub.add_parser("extract-drop-json-part")
    cmd.add_argument("text", nargs="?")

    cmd = sub.add_parser("group-chunk-by-len")
    cmd.add_argument("json_list")
    cmd.add_argument("key")
    cmd.add_argument("chunk_len", type=int)

    x = parser.parse_args()

    def text_or_stdin() -> str:
        # The positional text wins; stdin is only read when it is absent.
        return x.text if x.text is not None else sys.stdin.read()

    def as_json(value: Any) -> str:
        return json.dumps(value, ensure_ascii=False, indent=2)

    if x.cmd == "random-str":
        output = random_str(x.length)
    elif x.cmd == "format-now":
        output = format_now()
    elif x.cmd == "extract-url-file":
        output = extract_url_file(x.url, x.formats)
    elif x.cmd == "adjust-single-chunk-size":
        output = adjust_single_chunk_size(x.all_text_len, x.max_chunk_page, x.min_single_chunk_size, x.max_single_chunk_size)
    elif x.cmd == "extract-json":
        output = as_json(extract_json(text_or_stdin()))
    elif x.cmd == "remove-duplicates-by-key":
        output = as_json(remove_duplicates_by_key(read_json_arg(x.json_list), x.key))
    elif x.cmd == "extract-drop-json-part":
        output = re.sub(r"```json([\s\S]*?)```", "", text_or_stdin(), flags=re.DOTALL).strip()
    elif x.cmd == "group-chunk-by-len":
        output = as_json(group_chunk_by_len(read_json_arg(x.json_list), x.key, x.chunk_len))
    else:
        return 0
    print(output)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone contract review orchestration CLI."""
from __future__ import annotations
import argparse, json, subprocess, sys
from pathlib import Path
from typing import Any
# Fourth parent of this file's resolved path (presumably the repository root — TODO confirm).
ROOT = Path(__file__).resolve().parents[3]
# Helper scripts that this orchestrator runs as child Python processes.
DOC = ROOT / "skills/doc-excel-skill/scripts/doc_tool.py"
XLS = ROOT / "skills/doc-excel-skill/scripts/excel_tool.py"
LLM = ROOT / "skills/review-llm-skill/scripts/review_llm_skill.py"
# Maps the normalised rule field names to the spreadsheet column headers read by norm_rules.
COLS = {"id": "ID", "title": "审查项", "rule": "审查规则", "level": "风险等级", "triggers": "触发词", "suggestion_template": "建议模板", "case": "案例", "summary": "摘要项"}
def sh(args: list[str], text: bool = False) -> Any:
    """Run the current Python interpreter with ``args`` and return its stdout.

    Args:
        args: Arguments placed after ``sys.executable``.
        text: When true, return the raw output string; otherwise decode it as
            JSON (empty output decodes to ``None``).

    Raises:
        subprocess.CalledProcessError: If the child exits with a non-zero status.
    """
    command = [sys.executable, *args]
    raw = subprocess.check_output(command, text=True)
    if text:
        return raw
    return json.loads(raw or "null")
def dump(path: Path, data: Any) -> str:
    """Write ``data`` to ``path`` as indented UTF-8 JSON, creating parent directories.

    Returns:
        ``path`` as a string.
    """
    serialised = json.dumps(data, ensure_ascii=False, indent=2)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(serialised, encoding="utf-8")
    return str(path)
def norm_rules(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert spreadsheet rows into rule dicts keyed by the normalised names in ``COLS``.

    Non-dict rows are skipped; missing columns become empty strings.
    """
    rules: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        rules.append({field: row.get(column, "") for field, column in COLS.items()})
    return rules
def titles(items: list[dict[str, Any]]) -> list[str]:
    """Return the stripped, non-empty title of each item.

    ``title`` is preferred; ``rule_title`` is used only when ``title`` is falsy.
    """
    names: list[str] = []
    for item in items:
        name = str(item.get("title") or item.get("rule_title") or "").strip()
        if name:
            names.append(name)
    return names
def pick_rules(all_rules: list[dict[str, Any]], selected: list[str]) -> list[dict[str, Any]]:
    """Return the rules whose ``title`` is in ``selected``.

    An empty ``selected`` means "no filter": the original list is returned as is.
    """
    if selected:
        wanted = set(selected)
        return [rule for rule in all_rules if rule.get("title") in wanted]
    return all_rules
def llm(tool: str, **kw: Any) -> dict[str, Any]:
    """Run sub-command ``tool`` of the LLM script and return its decoded JSON output.

    Each keyword becomes a ``--flag value`` pair (underscores turned into
    dashes). Dicts and lists are passed as JSON text, ``None`` values are
    omitted, everything else is passed through ``str``.
    """
    command = [str(LLM), tool]
    for name, value in kw.items():
        if value is None:
            continue
        option = "--" + name.replace("_", "-")
        if isinstance(value, (dict, list)):
            encoded = json.dumps(value, ensure_ascii=False)
        else:
            encoded = str(value)
        command.extend([option, encoded])
    return sh(command)
def route_segment(text: str, rules: list[dict[str, Any]], party_role: str, mode: str) -> tuple[list[str], list[str], list[dict[str, Any]]]:
    """Choose the rules that apply to one text segment.

    Args:
        text: Segment text handed to the router.
        rules: Candidate rules.
        party_role: Passed through to the router.
        mode: "none" skips routing and keeps every rule; any other value calls
            the "router" LLM tool.

    Returns:
        ``(rule titles, sorted distinct summary names, routed rules)``.
    """
    def summary_names(subset: list[dict[str, Any]]) -> list[str]:
        return sorted({rule.get("summary", "") for rule in subset if rule.get("summary")})

    if mode == "none":
        return titles(rules), summary_names(rules), rules

    answer = llm("router", segment_text=text, rules=rules, party_role=party_role)
    chosen = answer.get("selected_items") or answer.get("routed_rules") or []
    # pick_rules returns every rule when the router selected no titles.
    routed = pick_rules(rules, titles(chosen))
    return titles(routed), summary_names(routed), routed
def reflect_findings(rules: list[dict[str, Any]], facts: list[dict[str, Any]], findings: list[dict[str, Any]], party_role: str) -> list[dict[str, Any]]:
    """Re-check the findings of each rule with the "reflect" LLM tool.

    For every rule that has findings (matched on ``rule_title``), the findings
    are sent together with the facts recorded under the rule's summary name.

    Returns:
        The concatenated ``final_findings`` (or ``findings``) of all calls.
    """
    reflected: list[dict[str, Any]] = []
    for rule in rules:
        rule_name = rule.get("title", "")
        own_findings = [item for item in findings if item.get("rule_title") == rule_name]
        if not own_findings:
            continue

        summary = rule.get("summary", "")
        if summary:
            fact_scope = [{summary: fact.get(summary)} for fact in facts if isinstance(fact, dict) and summary in fact]
        else:
            fact_scope = []

        answer = llm("reflect", rule=rule, findings=own_findings, facts=fact_scope, party_role=party_role)
        reflected.extend(answer.get("final_findings") or answer.get("findings") or [])
    return reflected
def merge_by_segment(findings: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge the failing findings of each segment with the "merger" LLM tool.

    Findings are grouped by integer ``segment_id`` (missing or falsy ids count
    as 0) and processed in ascending id order. Only findings whose stripped
    ``result`` equals "不合格" are sent; segments without any are skipped.
    Merged items lacking a ``segment_id`` receive the segment's id.
    """
    failing_by_segment: dict[int, list[dict[str, Any]]] = {}
    for finding in findings:
        segment = int(finding.get("segment_id", 0) or 0)
        bucket = failing_by_segment.setdefault(segment, [])
        if str(finding.get("result", "")).strip() == "不合格":
            bucket.append(finding)

    combined: list[dict[str, Any]] = []
    for segment in sorted(failing_by_segment):
        failing = failing_by_segment[segment]
        if not failing:
            continue
        answer = llm("merger", payload=failing)
        merged = answer.get("findings") or []
        for item in merged:
            item.setdefault("segment_id", segment)
        combined.extend(merged)
    return combined
def run(file: Path, rules_path: Path, ruleset: str, out_dir: Path, party_role: str, route_by: str, reflect: bool, merge_mode: str, max_chunks: int, dry_run: bool) -> dict[str, Any]:
    """Run the whole review flow for one document and write the results to ``out_dir``.

    Steps: load the document and the rule sheet, then for every chunk route the
    rules, collect facts and findings; optionally reflect on the findings,
    merge them per segment, merge the facts, and finally export
    ``memory.json``, ``review.xlsx`` and (for .docx input) ``commented.docx``.

    Args:
        file: Document to review.
        rules_path: Excel workbook holding the rules.
        ruleset: Sheet name to load from the workbook.
        out_dir: Output directory (created if needed).
        party_role: Passed through to the LLM tools.
        route_by: Routing mode handed to ``route_segment``.
        reflect: Whether to run the reflection pass.
        merge_mode: Passed to the "fact-merge" tool.
        max_chunks: Process at most this many chunks; 0 means all.
        dry_run: Write only the initial ``memory.json`` and stop.

    Returns:
        The in-memory result dictionary.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    info = sh([str(DOC), "doc-load", str(file)])
    rows = sh([str(XLS), "load-excel", str(rules_path), "--sheet-name", ruleset])
    rules = norm_rules(rows)
    chunk_ids = info.get("chunk_ids", [])
    if max_chunks:
        chunk_ids = chunk_ids[:max_chunks]

    # NOTE(review): segment_ids and routes store cid + 1, while findings default
    # their segment_id to cid itself — confirm the mixed numbering is intended.
    memory = {
        "file": str(file),
        "ruleset": ruleset,
        "segment_ids": [i + 1 for i in chunk_ids],
        "rule_titles": titles(rules),
        "summary_names": sorted({r.get("summary", "") for r in rules if r.get("summary")}),
        "facts": [],
        "merge_facts": [],
        "findings": {"review": [], "reflect": [], "merge": []},
    }
    if dry_run:
        dump(out_dir / "memory.json", memory)
        return memory

    stages = memory["findings"]
    for cid in chunk_ids:
        text = sh([str(DOC), "doc-chunk", str(file), str(cid)], text=True)
        routed_titles, routed_summaries, routed_rules = route_segment(text, rules, party_role, route_by)

        summary = llm("summary", segment_text=text, rules=routed_rules, party_role=party_role)
        fact = summary.get("facts", summary)
        if isinstance(fact, dict):
            memory["facts"].append(fact)

        review = llm("review", segment_text=text, rules=routed_rules, party_role=party_role)
        for finding in review.get("findings", []):
            finding.setdefault("segment_id", cid)
            stages["review"].append(finding)

        memory.setdefault("routes", []).append({"segment_id": cid + 1, "routed_rule_titles": routed_titles, "routed_summary_names": routed_summaries})

    if reflect:
        stages["reflect"] = reflect_findings(rules, memory["facts"], stages["review"], party_role)
    source = stages["reflect"] or stages["review"]
    stages["merge"] = merge_by_segment(source)

    fact_res = llm("fact-merge", facts=memory["facts"], summary_names=memory["summary_names"], merge_mode=merge_mode)
    memory["merge_facts"] = [fact_res.get("merge_facts", {})]
    mem_path = Path(dump(out_dir / "memory.json", memory))

    # Export the most processed stage that actually produced findings.
    if stages["merge"]:
        finding_key = "merge"
    elif stages["reflect"]:
        finding_key = "reflect"
    else:
        finding_key = "review"

    sh([str(XLS), "export-findings-excel", "@" + str(mem_path), str(out_dir / "review.xlsx"), "--finding-key", finding_key], text=True)
    if file.suffix.lower() == ".docx":
        sh([str(DOC), "docx-add-comments", str(file), "@" + str(mem_path), str(out_dir / "commented.docx"), "--finding-key", finding_key], text=True)
    return memory
def main() -> int:
    """Parse the command line, run the review flow and print the output directory. Returns 0."""
    parser = argparse.ArgumentParser(description="Contract review flow orchestrator")
    parser.add_argument("file")
    parser.add_argument("--rules", default=str(ROOT / "data/rules.xlsx"))
    parser.add_argument("--ruleset", default="通用")
    parser.add_argument("--out-dir", default="outputs/review-flow")
    parser.add_argument("--party-role", default="")
    parser.add_argument("--route-by", choices=["rule", "none"], default="rule")
    parser.add_argument("--no-reflect", action="store_true")
    parser.add_argument("--merge-mode", choices=["llm", "rule"], default="rule")
    parser.add_argument("--max-chunks", type=int, default=0)
    parser.add_argument("--dry-run", action="store_true")
    opts = parser.parse_args()

    out_dir = Path(opts.out_dir)
    run(
        Path(opts.file),
        Path(opts.rules),
        opts.ruleset,
        out_dir,
        opts.party_role,
        opts.route_by,
        not opts.no_reflect,
        opts.merge_mode,
        opts.max_chunks,
        opts.dry_run,
    )
    print("输出目录:", out_dir.resolve())
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
---
name: doc-excel-skill
description: 文档/表格工具 Skill。用于将 Word/PDF 文档解析为 txt,并读取、修改 Excel,以及把 JSON 写入 Excel sheet。
---
# 文档与 Excel Skill
## 定位
`doc-excel-skill` 负责文件解析和表格读写。它把 Word/PDF 文件转换为 UTF-8 `.txt`,把 Excel sheet 读取为结构化 JSON,也可以把 JSON 数据写入 Excel sheet。
该 Skill 不直接调用 LLM,也不实现业务审查逻辑。它提供的是稳定的文件 I/O 能力,供 `review-llm-skill` 和 `contract-review-flow-skill` 组合使用。
## 适用场景
- 将 `.docx`、`.doc`、`.wps`、`.pdf` 合同文件解析为 `.txt`。
- 读取 `data/rules.xlsx` 中的规则表。
- 列出 Excel sheets,并按条件 dict 搜索 sheet 行数据。
- 按表头 key 追加 Excel 行,或按条件 dict 更新、删除 Excel 行。
- 按列查找 Excel 单元格,或将 Excel 行映射为指定字段。
- 将 JSON 转换为 Excel 的某个 sheet。
## 工具文件
- `scripts/doc_tool.py`:基于 Spire 的 Word/PDF 转 txt CLI。
- `scripts/excel_tool.py`:Excel 读取、sheet 查询、行增删改、JSON 写入 sheet。
## 依赖说明
- Word / PDF 文本解析依赖 Spire:PDF 使用 `PdfDocument`、`PdfTextExtractOptions`、`PdfTextExtractor`;Word 使用 `Document.GetText()`。
- 如果安装了 `openpyxl`,Excel 读取和写入会优先使用它。
- 如果没有 `openpyxl`,部分 `.xlsx` 读取会退回标准库实现,但复杂写入能力会受限。
- 当前 `doc_tool.py` 只做可提取文本解析,不做 OCR、分块和批注写入。
## 主要命令
- `doc_tool.py <file> [output]`:将 Word/PDF 解析为 txt;未传 `output` 时默认输出到同名 `.txt`。
- `doc_tool.py doc-to-txt <file> [output]`:兼容旧调用形式,行为同上。
- `load-excel`:读取 Excel sheet 为 JSON。
- `list-sheets`:列出工作簿中的 sheet。
- `search_rows`:读取某个 sheet,按首行表头作为 key、每一行作为 dict;传入条件 dict 时返回所有匹配行,传 `{}` 时返回全部行。
- `append-row`:传入一个 dict,按 key 匹配表头列,并追加到指定 sheet 末尾。
- `update-rows`:传入条件 dict 和更新 dict,更新所有匹配行。
- `delete-rows`:传入条件 dict,删除所有匹配行。
- `find-value`:按某列匹配值,再返回另一列的值。
- `map-rows`:按字段映射读取 Excel 行。
- `json-to-sheet`:把 JSON 写入 Excel 的指定 sheet;dict key 会直接写入第一行作为表头。
## 输入输出
- 文档解析命令输入本地 Word/PDF 文件路径,输出 UTF-8 `.txt` 文件路径。
- Excel 读取类命令输入 `.xlsx` 路径和 sheet/列参数,输出 JSON。
- Excel 写入类命令输入 `.xlsx` 路径、sheet 名和 JSON dict,直接保存原文件。
- 行搜索、更新和删除的条件 dict 支持多个字段,所有字段都相等时才算匹配。
- `json-to-sheet` 输入 JSON 和输出 `.xlsx` 路径;如果目标 sheet 已存在,会替换该 sheet;第一行固定写表头,第二行开始写数据。
- 支持使用 `@file.json` 形式读取较大的 JSON 参数。
## 使用示例
查看帮助:
```bash
python skills/doc-excel-skill/scripts/doc_tool.py --help
python skills/doc-excel-skill/scripts/excel_tool.py --help
```
解析 Word/PDF 合同为同名 txt:
```bash
python skills/doc-excel-skill/scripts/doc_tool.py demo/example.docx
python skills/doc-excel-skill/scripts/doc_tool.py demo/example.pdf
```
解析 Word/PDF 合同到指定 txt:
```bash
python skills/doc-excel-skill/scripts/doc_tool.py demo/example.docx outputs/example.txt
python skills/doc-excel-skill/scripts/doc_tool.py doc-to-txt demo/example.pdf outputs/example.txt
```
读取规则 Excel:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py load-excel data/rules.xlsx \
--sheet-name 通用
```
列出所有 sheet:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py list-sheets data/rules.xlsx
```
按条件搜索指定 sheet 的行,传 `{}` 返回全部行:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py search_rows data/rules.xlsx \
'{"审查项":"当事人审查"}' \
--sheet-name 通用
```
追加一行:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py append-row data/rules.xlsx \
'{"审查项":"测试","风险等级":"中"}' \
--sheet-name 通用
```
更新一行:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py update-rows data/rules.xlsx \
'{"审查项":"测试","风险等级":"中"}' \
'{"风险等级":"高"}' \
--sheet-name 通用
```
删除一行:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py delete-rows data/rules.xlsx \
'{"审查项":"测试","风险等级":"高"}' \
--sheet-name 通用
```
将 JSON 写入 Excel sheet:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py json-to-sheet \
'[{"name":"张三","amount":100},{"name":"李四","amount":200}]' \
outputs/result.xlsx \
--sheet-name 明细
```
从 JSON 文件写入 Excel sheet:
```bash
python skills/doc-excel-skill/scripts/excel_tool.py json-to-sheet \
@data.json \
outputs/result.xlsx \
--sheet-name 数据
```
## 在合同审查流程中的位置
该 Skill 主要对应文档解析和 Excel 数据读写部分。它可以在处理前把 Word/PDF 转换为 txt,也可以把 Excel sheet 和 JSON 数据在两种结构之间转换。
#!/usr/bin/env python3
"""Compatibility dispatcher for the split doc/excel CLIs."""
from __future__ import annotations
import argparse
import subprocess
import sys
from pathlib import Path
# Sub-commands that main() forwards to excel_tool.py.
EXCEL_COMMANDS = {
    "export-facts-excel",
    "export-findings-excel",
    "find-value",
    "list-sheets",
    "load-excel",
    "map-rows",
}
# Sub-commands that main() forwards to doc_tool.py.
DOC_COMMANDS = {
    "doc-adjust-chunk-size",
    "doc-chunk",
    "doc-info",
    "doc-load",
    "doc-ocr",
    "doc-text",
    "docx-add-comments",
    "is-messy-text",
    "process-string",
}
def _script(name: str) -> str:
    """Return the absolute path of the file called ``name`` that sits next to this module."""
    module_path = Path(__file__).resolve()
    return str(module_path.parent / name)
def main(argv: list[str] | None = None) -> int:
    """Forward a legacy sub-command to ``excel_tool.py`` or ``doc_tool.py``.

    Args:
        argv: Command-line arguments without the program name; ``sys.argv[1:]``
            is used when omitted.

    Returns:
        0 after printing help, 2 for an unknown command, otherwise the exit
        status of the child script.
    """
    args = list(sys.argv[1:]) if argv is None else list(argv)

    if not args or args[0] in {"-h", "--help"}:
        parser = argparse.ArgumentParser(description="Compatibility dispatcher for doc_tool.py and excel_tool.py")
        parser.add_argument("command", nargs="?", choices=sorted(EXCEL_COMMANDS | DOC_COMMANDS))
        parser.print_help()
        print("\nExcel commands:", ", ".join(sorted(EXCEL_COMMANDS)))
        print("Doc commands:", ", ".join(sorted(DOC_COMMANDS)))
        return 0

    command = args[0]
    is_excel = command in EXCEL_COMMANDS
    if not is_excel and command not in DOC_COMMANDS:
        print(f"unknown command: {command}", file=sys.stderr)
        return 2

    target = _script("excel_tool.py") if is_excel else _script("doc_tool.py")
    # The child receives the full argument list, including the command name.
    return subprocess.call([sys.executable, target, *args])


if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone document CLI for text chunks and docx comments."""
"""Convert Word/PDF documents to UTF-8 txt files with Spire."""
from __future__ import annotations
import argparse, json, re, shutil, zipfile
from datetime import datetime, timezone
import argparse
import sys
from pathlib import Path
from typing import Any
from xml.etree import ElementTree as ET
# Chunk sizing limits: MAX_CHUNK is the default of --max-single-chunk-size; adjust_size uses all three.
MAX_CHUNK, MIN_CHUNK, MAX_PAGE = 100000, 2000, 10
# XML namespace prefixes used when querying and building the docx package parts.
NS = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main", "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships", "rel": "http://schemas.openxmlformats.org/package/2006/relationships", "ct": "http://schemas.openxmlformats.org/package/2006/content-types"}
# File suffixes that extract_text routes to the Word extractor.
WORD_SUFFIXES = {".doc", ".docx", ".wps"}
# File suffixes that extract_text routes to the PDF extractor.
PDF_SUFFIXES = {".pdf"}
def _load_json(v: str) -> Any:
    """Decode JSON from ``v``; a leading "@" means "read the JSON from this UTF-8 file"."""
    if v.startswith("@"):
        payload = Path(v[1:]).read_text(encoding="utf-8")
    else:
        payload = v
    return json.loads(payload)
def q(ns: str, tag: str) -> str:
    """Return ``tag`` in ElementTree's ``{namespace-uri}tag`` form for the ``NS`` prefix ``ns``."""
    # Defined at module level: other functions in this file (parse_docx,
    # p_text, add_comments) call q, so it must not be nested inside
    # _extract_pdf_text.
    return f"{{{NS[ns]}}}{tag}"


def _extract_pdf_text(path: str) -> str:
    """Extract the text of every page of a PDF with Spire and join the pages with newlines.

    Args:
        path: Path of the PDF file.

    Returns:
        The page texts separated by "\\n".
    """
    from spire.pdf import PdfDocument, PdfTextExtractOptions, PdfTextExtractor

    pdf = PdfDocument()
    try:
        pdf.LoadFromFile(path)
        extract_options = PdfTextExtractOptions()
        extract_options.IsExtractAllText = True
        pages: list[str] = []
        for page_idx in range(0, pdf.Pages.Count):
            page = pdf.Pages[page_idx]
            pages.append(PdfTextExtractor(page).ExtractText(extract_options))
        return "\n".join(pages)
    finally:
        # Closing is best effort: a failure here must not mask the result or
        # the original exception.
        try:
            pdf.Close()
        except Exception:
            pass
def parse_docx(path: str | Path) -> str:
    """Return the text of a .docx file, one non-empty paragraph per line.

    Reads ``word/document.xml`` from the package; within each paragraph the
    ``w:t`` texts are concatenated and every ``w:tab`` becomes a tab character.
    """
    with zipfile.ZipFile(path) as package:
        root = ET.fromstring(package.read("word/document.xml"))

    text_tag = q("w", "t")
    tab_tag = q("w", "tab")
    lines = []
    for paragraph in root.findall(".//w:p", NS):
        pieces = []
        for node in paragraph.iter():
            if node.tag == text_tag:
                pieces.append(node.text or "")
            elif node.tag == tab_tag:
                pieces.append("\t")
        line = "".join(pieces)
        if line:
            lines.append(line)
    return "\n".join(lines)
def _extract_word_text(path: str) -> str:
    """Extract the text of a Word document with Spire's ``Document.GetText()``.

    Args:
        path: Path of the Word file.
    """
    from spire.doc import Document

    doc = Document()
    try:
        doc.LoadFromFile(path)
        return doc.GetText()
    finally:
        # Closing is best effort, mirroring _extract_pdf_text.
        try:
            doc.Close()
        except Exception:
            pass


def read_text(path: str | Path) -> str:
    """Return the text content of ``path``.

    ``.docx`` files go through ``parse_docx``, ``.pdf`` files through PyMuPDF
    (pages joined with newlines); anything else is read as UTF-8 text with
    undecodable bytes replaced.

    Raises:
        RuntimeError: If a PDF is given but PyMuPDF (``fitz``) is not installed.
    """
    p = Path(path)
    suffix = p.suffix.lower()
    if suffix == ".docx":
        return parse_docx(p)
    if suffix == ".pdf":
        try:
            import fitz  # type: ignore
        except ImportError as exc:
            raise RuntimeError("PyMuPDF is required for PDF text extraction") from exc
        pdf = fitz.open(p)
        try:
            return "\n".join(page.get_text() for page in pdf)
        finally:
            pdf.close()
    return p.read_text(encoding="utf-8", errors="replace")
def chunk_text(text: str, size: int) -> list[dict[str, Any]]:
    """Cut ``text`` into consecutive pieces of at most ``size`` characters.

    Each piece is ``{"text", "start", "end"}`` with ``end`` exclusive. Empty
    text yields a single empty chunk.
    """
    total = len(text)
    chunks: list[dict[str, Any]] = []
    for start in range(0, total, size):
        stop = start + size
        chunks.append({"text": text[start:stop], "start": start, "end": min(stop, total)})
    if not chunks:
        return [{"text": "", "start": 0, "end": 0}]
    return chunks
def adjust_size(text_len: int) -> int:
    """Return ``text_len // MAX_PAGE`` capped at ``MAX_CHUNK`` and raised to at least ``MIN_CHUNK``."""
    per_page = text_len // MAX_PAGE
    capped = min(per_page, MAX_CHUNK)
    return max(MIN_CHUNK, capped)
def process_string(s: str) -> str:
    """Reduce a multi-line string to one representative line.

    * no newline: the string itself;
    * two lines: the longer one (the first on a tie);
    * three or more: the longest of the lines between the first and the last.
    """
    if "\n" not in s:
        return s
    parts = s.split("\n")
    if len(parts) == 2:
        first, second = parts
        return first if len(first) >= len(second) else second
    middle = parts[1:-1]
    candidates = middle or [part for part in parts if part]
    return max(candidates, key=len, default="")
def is_messy_text(text: str, min_chars: int = 40) -> bool:
    """Heuristically decide whether ``text`` looks like garbled extraction output.

    Text shorter than ``min_chars`` (or empty) counts as messy. Otherwise the
    verdict depends on the share of printable characters, the share of symbol
    characters, the longest run of symbols, and the share of CJK characters.
    """
    if not text or len(text) < min_chars:
        return True

    total = len(text)
    cjk_count = 0
    printable_count = 0
    symbol_count = 0
    for ch in text:
        is_cjk = "\u4e00" <= ch <= "\u9fff"
        if is_cjk:
            cjk_count += 1
        if ch.isprintable():
            printable_count += 1
        if not (is_cjk or ch.isalnum() or ch.isspace()):
            symbol_count += 1

    cn = cjk_count / total
    printable = printable_count / total
    sym = symbol_count / total
    symbol_runs = re.findall(r"[^0-9A-Za-z\u4e00-\u9fff\s]+", text)
    longest = max(map(len, symbol_runs), default=0)

    if printable < 0.42 or sym > 0.5 or longest >= 15:
        return True
    return cn < 0.1 and printable < 0.7
def norm_findings(v: Any, key: str | None = None) -> list[dict[str, Any]]:
    """Normalise the accepted findings shapes into a flat list of finding dicts.

    * a list: its dict elements;
    * a dict with "findings": that value (narrowed to ``key`` when it is a dict
      and ``key`` is given), normalised again without a key;
    * a dict containing ``key``: that value, normalised;
    * a dict that looks like one finding (has "rule_title", "issue" or
      "suggestion"): a one-element list;
    * any other dict: the findings of all its values, concatenated;
    * anything else: an empty list.
    """
    if isinstance(v, list):
        return [entry for entry in v if isinstance(entry, dict)]
    if not isinstance(v, dict):
        return []

    if "findings" in v:
        inner = v["findings"]
        if key and isinstance(inner, dict):
            inner = inner.get(key)
        return norm_findings(inner)

    if key and key in v:
        return norm_findings(v[key])

    if any(marker in v for marker in ("rule_title", "issue", "suggestion")):
        return [v]

    flattened: list[dict[str, Any]] = []
    for items in v.values():
        flattened.extend(norm_findings(items))
    return flattened
def p_text(p: ET.Element) -> str:
    """Return the concatenated text of all ``w:t`` descendants of paragraph ``p``."""
    text_tag = q("w", "t")
    pieces = [node.text or "" for node in p.iter(text_tag)]
    return "".join(pieces)
def comment_body(f: dict[str, Any]) -> str:
    """Render a finding as the text of a review comment.

    One "label:value" line is produced per non-empty field, in the order risk
    level, result, issue, suggestion. A fixed placeholder is returned when
    every field is empty.
    """
    labelled = (
        ("风险等级", f.get("risk_level") or f.get("level")),
        ("合格性", f.get("result")),
        ("问题", f.get("issue")),
        ("建议", f.get("suggestion")),
    )
    lines = [f"{label}:{value}" for label, value in labelled if value]
    if lines:
        return "\n".join(lines)
    return "合同审查提示"
def target_para(paras: list[ET.Element], f: dict[str, Any]) -> ET.Element | None:
    """Pick the paragraph a finding's comment should be attached to.

    The first paragraph containing the finding's ``original_text`` (compared
    verbatim, then with all whitespace removed) wins. Otherwise ``segment_id``
    is used as an index into the non-empty paragraphs, clamped to the valid
    range. With no non-empty paragraph the first paragraph is used, and
    ``None`` when there are no paragraphs at all.
    """
    original = str(f.get("original_text") or "").strip()
    if original:
        compact = re.sub(r"\s+", "", original)
        for para in paras:
            content = p_text(para)
            if original in content or compact in re.sub(r"\s+", "", content):
                return para

    non_empty = [para for para in paras if p_text(para).strip()]
    idx = max(int(f.get("segment_id") or 0), 0)
    if non_empty:
        return non_empty[min(idx, len(non_empty) - 1)]
    if paras:
        return paras[0]
    return None
def ensure_rel(root: ET.Element) -> None:
    """Add a relationship to ``comments.xml`` under ``root`` unless one already exists.

    The new relationship gets the id ``rId<N>`` where N is one above the
    highest numeric ``rId`` already present.
    """
    rel_tag = f"{{{NS['rel']}}}Relationship"
    existing = root.findall(rel_tag)
    for rel in existing:
        if rel.attrib.get("Type", "").endswith("/comments"):
            return

    numbers = []
    for rel in existing:
        rel_id = rel.attrib.get("Id", "")
        if rel_id.startswith("rId") and rel_id[3:].isdigit():
            numbers.append(int(rel_id[3:]))
    next_number = (max(numbers) if numbers else 0) + 1

    ET.SubElement(
        root,
        rel_tag,
        {
            "Id": f"rId{next_number}",
            "Type": "http://schemas.openxmlformats.org/officeDocument/2006/relationships/comments",
            "Target": "comments.xml",
        },
    )
def ensure_ct(root: ET.Element) -> None:
    """Add a content-type override for ``/word/comments.xml`` under ``root`` unless one already exists."""
    override_tag = f"{{{NS['ct']}}}Override"
    for override in root.findall(override_tag):
        if override.attrib.get("PartName") == "/word/comments.xml":
            return
    ET.SubElement(
        root,
        override_tag,
        {
            "PartName": "/word/comments.xml",
            "ContentType": "application/vnd.openxmlformats-officedocument.wordprocessingml.comments+xml",
        },
    )
def add_comments(src: str, findings: Any, out: str, key: str | None = None, author: str = "合同审查智能体") -> str:
    """Write a copy of the .docx ``src`` to ``out`` with one comment per finding.

    Args:
        src: Source .docx file.
        findings: Findings in any shape accepted by ``norm_findings``.
        out: Destination path; parent directories are created.
        key: Optional findings key forwarded to ``norm_findings``.
        author: Author name stored on each comment.

    Returns:
        ``out``.

    Raises:
        ValueError: If ``src`` does not have a .docx suffix.
    """
    if Path(src).suffix.lower() != ".docx":
        raise ValueError("docx-add-comments only supports .docx")
    Path(out).parent.mkdir(parents=True, exist_ok=True)

    fs = norm_findings(findings, key)
    if not fs:
        # Nothing to annotate: deliver an unchanged copy.
        shutil.copyfile(src, out)
        return out

    # NOTE(review): only the "w" and "r" prefixes are registered; ElementTree
    # renames every other namespace prefix on output — confirm Word still
    # accepts documents that rely on those prefixes.
    ET.register_namespace("w", NS["w"])
    ET.register_namespace("r", NS["r"])

    with zipfile.ZipFile(src) as zin:
        files = {n: zin.read(n) for n in zin.namelist()}

    doc = ET.fromstring(files["word/document.xml"])
    paras = doc.findall(".//w:p", NS)
    if "word/comments.xml" in files:
        comments = ET.fromstring(files["word/comments.xml"])
    else:
        comments = ET.Element(q("w", "comments"))

    # Continue numbering after the highest existing comment id.
    ids = [int(c.attrib.get(q("w", "id"), -1)) for c in comments.findall("w:comment", NS)]
    cid = (max(ids) if ids else -1) + 1
    ppr_tag = q("w", "pPr")

    for f in fs:
        p = target_para(paras, f)
        if p is None:
            continue

        c = ET.SubElement(comments, q("w", "comment"))
        c.set(q("w", "id"), str(cid))
        c.set(q("w", "author"), author)
        c.set(q("w", "date"), datetime.now(timezone.utc).replace(microsecond=0).isoformat())
        ET.SubElement(ET.SubElement(ET.SubElement(c, q("w", "p")), q("w", "r")), q("w", "t")).text = comment_body(f)

        start = ET.Element(q("w", "commentRangeStart"))
        start.set(q("w", "id"), str(cid))
        end = ET.Element(q("w", "commentRangeEnd"))
        end.set(q("w", "id"), str(cid))
        ref_run = ET.Element(q("w", "r"))
        ET.SubElement(ref_run, q("w", "commentReference")).set(q("w", "id"), str(cid))

        # w:pPr, when present, has to stay the first child of w:p, so the
        # range start goes right after it instead of at index 0.
        insert_at = 1 if len(p) and p[0].tag == ppr_tag else 0
        p.insert(insert_at, start)
        p.append(end)
        p.append(ref_run)
        cid += 1

    rel_path = "word/_rels/document.xml.rels"
    if rel_path in files:
        rel = ET.fromstring(files[rel_path])
    else:
        rel = ET.Element(f"{{{NS['rel']}}}Relationships")
    ct = ET.fromstring(files["[Content_Types].xml"])
    ensure_rel(rel)
    ensure_ct(ct)

    files.update(
        {
            "word/document.xml": ET.tostring(doc, encoding="utf-8", xml_declaration=True),
            "word/comments.xml": ET.tostring(comments, encoding="utf-8", xml_declaration=True),
            rel_path: ET.tostring(rel, encoding="utf-8", xml_declaration=True),
            "[Content_Types].xml": ET.tostring(ct, encoding="utf-8", xml_declaration=True),
        }
    )
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zout:
        for n, data in files.items():
            zout.writestr(n, data)
    return out
doc.Close()
except Exception:
pass
def extract_text(path: str) -> str:
    """Return the text of a PDF or Word document, chosen by file suffix.

    The suffix is compared case-insensitively against PDF_SUFFIXES first and
    WORD_SUFFIXES second.

    Raises:
        ValueError: if the suffix belongs to neither set.
    """
    ext = Path(path).suffix.lower()
    if ext in PDF_SUFFIXES:
        extractor = _extract_pdf_text
    elif ext in WORD_SUFFIXES:
        extractor = _extract_word_text
    else:
        raise ValueError(f"unsupported file type: {ext}")
    return extractor(path)
def doc_to_txt(path: str, output: str | None = None) -> str:
    """Extract a document's text and write it to a UTF-8 .txt file.

    Args:
        path: source document, handed to extract_text().
        output: destination file; when falsy, the source path with its
            suffix replaced by ".txt" is used.

    Returns:
        The destination path as a string.
    """
    content = extract_text(path)
    if output:
        target = Path(output)
    else:
        target = Path(path).with_suffix(".txt")
    # Create missing parent directories so a fresh output folder just works.
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(content, encoding="utf-8")
    return str(target)
def main() -> int:
p = argparse.ArgumentParser(description="Standalone document CLI"); sub = p.add_subparsers(dest="cmd", required=True)
for name in ["doc-load", "doc-text", "doc-adjust-chunk-size"]:
a = sub.add_parser(name); a.add_argument("file"); a.add_argument("--max-single-chunk-size", type=int, default=MAX_CHUNK)
a = sub.add_parser("doc-chunk"); a.add_argument("file"); a.add_argument("chunk_id", type=int); a.add_argument("--max-single-chunk-size", type=int, default=MAX_CHUNK)
a = sub.add_parser("doc-info"); a.add_argument("file"); a.add_argument("chunk_id", type=int); a.add_argument("--max-single-chunk-size", type=int, default=MAX_CHUNK)
a = sub.add_parser("doc-ocr"); a.add_argument("file")
a = sub.add_parser("process-string"); a.add_argument("text")
a = sub.add_parser("is-messy-text"); a.add_argument("text")
a = sub.add_parser("docx-add-comments"); a.add_argument("input"); a.add_argument("findings"); a.add_argument("output"); a.add_argument("--finding-key"); a.add_argument("--author", default="合同审查智能体")
x = p.parse_args()
if x.cmd == "process-string": print(process_string(x.text)); return 0
if x.cmd == "is-messy-text": print(json.dumps(is_messy_text(x.text))); return 0
if x.cmd == "docx-add-comments": print(add_comments(x.input, _load_json(x.findings), x.output, x.finding_key, x.author)); return 0
if x.cmd == "doc-ocr": raise SystemExit("doc-ocr is not implemented here; use ocr-skill")
text = read_text(x.file); size = getattr(x, "max_single_chunk_size", MAX_CHUNK); chunks = chunk_text(text, size)
if x.cmd == "doc-load": print(json.dumps({"tool": "StandaloneDoc", "chunk_num": len(chunks), "chunk_ids": list(range(len(chunks))), "text_len": len(text)}, ensure_ascii=False, indent=2))
elif x.cmd == "doc-text": print(text)
elif x.cmd == "doc-adjust-chunk-size": print(adjust_size(len(text)))
elif x.cmd == "doc-chunk": print(chunks[x.chunk_id]["text"])
elif x.cmd == "doc-info":
c = chunks[x.chunk_id]; t = c["text"]; print(f"文件块id: {x.chunk_id + 1}\n文件块位置: 字符{c['start']}到{c['end']}\n文件块简述: [{t[:20]}]...到... [{t[-20:]}]\n")
parser = argparse.ArgumentParser(description="Convert Word/PDF documents to txt")
parser.add_argument("file")
parser.add_argument("output", nargs="?")
argv = sys.argv[1:]
if argv and argv[0] == "doc-to-txt":
argv = argv[1:]
args = parser.parse_args(argv)
print(doc_to_txt(args.file, args.output))
return 0
......
#!/usr/bin/env python3
"""Standalone Excel CLI for rule tables and review exports."""
"""Standalone Excel CLI for table reads, row edits, and JSON sheet writes."""
from __future__ import annotations
......@@ -106,70 +106,223 @@ def list_sheets(path: str) -> list[str]:
return [n for n, _ in _sheet_map(zf)]
def _norm_findings(v: Any, key: str | None = None) -> list[dict]:
    """Flatten a findings payload of varying shape into a list of dicts.

    Accepted shapes, checked in this order:
      * a list            -> its dict items (other items are dropped);
      * a non-dict value  -> [];
      * {"findings": ...} -> recurse into the value (into value[key] when
        *key* is given and the value is itself a dict);
      * a dict holding *key* -> recurse into v[key];
      * a dict with "rule_title", "issue" or "suggestion" -> that single dict;
      * any other dict    -> the concatenation of recursing into each value.

    Recursive calls never forward *key*.
    """
    if isinstance(v, list):
        return [item for item in v if isinstance(item, dict)]
    if not isinstance(v, dict):
        return []
    if "findings" in v:
        inner = v["findings"]
        if key and isinstance(inner, dict):
            inner = inner.get(key)
        return _norm_findings(inner)
    if key and key in v:
        return _norm_findings(v[key])
    if not {"rule_title", "issue", "suggestion"}.isdisjoint(v):
        return [v]
    collected: list[dict] = []
    for nested in v.values():
        collected.extend(_norm_findings(nested))
    return collected
def _load_workbook_for_write(path: str):
    """Open *path* with openpyxl and return the workbook object.

    openpyxl is imported lazily here rather than at module import time.

    Raises:
        ExcelLoadError: if openpyxl cannot be imported.
    """
    try:
        import openpyxl as xl  # type: ignore
    except ImportError as exc:
        raise ExcelLoadError("openpyxl is required for write operations") from exc
    else:
        return xl.load_workbook(path)
def _get_sheet(wb: Any, sheet: str | None):
    """Return the worksheet named *sheet*, or the active one when *sheet* is falsy."""
    if sheet:
        return wb[sheet]
    return wb.active
def _headers(ws: Any) -> list[str]:
    """Return the first row of *ws* as stripped strings; empty cells become ""."""
    names: list[str] = []
    for cell in ws[1]:
        raw = cell.value
        names.append("" if raw is None else str(raw).strip())
    return names
def _header_map(ws: Any) -> dict[str, int]:
    """Map each non-empty header name to its 1-based column index.

    When a name occurs more than once, the right-most column wins.
    """
    mapping: dict[str, int] = {}
    for position, name in enumerate(_headers(ws), start=1):
        if name:
            mapping[name] = position
    return mapping
def _ensure_header_columns(ws: Any, keys: list[str]) -> dict[str, int]:
    """Make sure every name in *keys* has a header column and return the map.

    Names that are missing from row 1 are appended as new header cells to the
    right of the existing columns.

    Args:
        ws: worksheet whose first row holds the headers.
        keys: header names that must exist.

    Returns:
        Header name -> 1-based column index, including newly added headers.
    """
    header_map = _header_map(ws)
    # openpyxl reports max_row == max_column == 1 for a worksheet with no
    # cells at all, so "max_column + 1" would put the first header in column B
    # and leave column A empty. Start at column 1 for a truly blank sheet.
    sheet_is_blank = (
        not header_map
        and ws.max_row == 1
        and ws.max_column == 1
        and ws.cell(row=1, column=1).value is None
    )
    next_col = 1 if sheet_is_blank else ws.max_column + 1
    for key in keys:
        if key in header_map:
            continue
        ws.cell(row=1, column=next_col, value=key)
        header_map[key] = next_col
        next_col += 1
    return header_map
def _row_dict_from_ws(ws: Any, row_idx: int, headers: list[str]) -> dict[str, Any]:
    """Read row *row_idx* of *ws* into a dict keyed by header name.

    *headers* is positional (first entry is column 1); columns whose header
    is empty are skipped.
    """
    record: dict[str, Any] = {}
    for column, name in enumerate(headers, start=1):
        if not name:
            continue
        record[name] = ws.cell(row=row_idx, column=column).value
    return record
def rows_as_dicts(path: str, sheet: str | None = None) -> list[dict[str, Any]]:
    """Return every row of the sheet as a dict (search_rows with no criteria)."""
    no_filter: dict[str, Any] = {}
    return search_rows(path, sheet, no_filter)
def _require_dict(value: Any, name: str = "row") -> dict[str, Any]:
    """Return *value* unchanged when it is a dict.

    Raises:
        ExcelLoadError: otherwise; *name* identifies the argument in the message.
    """
    if isinstance(value, dict):
        return value
    raise ExcelLoadError(f"{name} must be a JSON object")
def append_row(path: str, sheet: str | None, row_data: dict[str, Any]) -> dict[str, Any]:
    """Append one row to a sheet and save the workbook back to *path*.

    Keys of *row_data* that are not yet headers get new header columns. The
    row is written below the sheet's current last row.

    Returns:
        A summary with the file, sheet title, new row index and inserted data.
    """
    workbook = _load_workbook_for_write(path)
    worksheet = _get_sheet(workbook, sheet)
    columns = _ensure_header_columns(worksheet, [str(name) for name in row_data])
    target_row = worksheet.max_row + 1
    for name, cell_value in row_data.items():
        worksheet.cell(row=target_row, column=columns[str(name)], value=cell_value)
    workbook.save(path)
    summary = {"file": path, "sheet": worksheet.title, "row": target_row, "inserted": row_data}
    return summary
def _norm_facts(v: Any, key: str) -> list[dict]:
    """Normalise a facts payload into a list of dicts.

    A dict payload is first unwrapped through *key* (falling back to the dict
    itself when the key is absent). A resulting dict is wrapped in a list, a
    resulting list is filtered down to its dict items, and anything else
    yields [].
    """
    if isinstance(v, dict):
        v = v.get(key, v)
        if isinstance(v, dict):
            return [v]
    if isinstance(v, list):
        return [entry for entry in v if isinstance(entry, dict)]
    return []
def _matched_row_indices(ws: Any, criteria: dict[str, Any]) -> list[int]:
    """Return the 1-based indices of data rows whose cells equal all criteria.

    Row 1 is the header row, so scanning starts at row 2.

    Raises:
        ExcelLoadError: if a criteria key is not a header of *ws*.
    """
    columns = _header_map(ws)
    unknown = [name for name in criteria if name not in columns]
    if unknown:
        raise ExcelLoadError(f"criteria keys not found in header: {', '.join(unknown)}")
    hits: list[int] = []
    for row_idx in range(2, ws.max_row + 1):
        for name, wanted in criteria.items():
            if not ws.cell(row=row_idx, column=columns[name]).value == wanted:
                break
        else:
            # No criterion failed (also true when criteria is empty).
            hits.append(row_idx)
    return hits
def search_rows(path: str, sheet: str | None, criteria: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the rows of a sheet, as dicts, that equal every criterion.

    Empty *criteria* returns all rows. Criteria keys are validated against
    the keys of the first row only, and only when the sheet has rows.

    Raises:
        ExcelLoadError: if a criteria key is missing from the first row.
    """
    records = []
    for record in load_excel(path, sheet=sheet, header=True):
        if isinstance(record, dict):
            records.append(record)
    if not criteria:
        return records
    if records:
        first = records[0]
        absent = [name for name in criteria if name not in first]
        if absent:
            raise ExcelLoadError(f"criteria keys not found in header: {', '.join(absent)}")

    def _matches(record: dict[str, Any]) -> bool:
        return all(record.get(name) == wanted for name, wanted in criteria.items())

    return [record for record in records if _matches(record)]
def update_rows(path: str, sheet: str | None, criteria: dict[str, Any], row_data: dict[str, Any]) -> dict[str, Any]:
    """Overwrite cells in every row matching *criteria* and save the workbook.

    Keys of *row_data* that are not yet headers get new header columns. When
    no row matches, nothing is written or saved.

    Raises:
        ExcelLoadError: if *criteria* is empty.

    Returns:
        A summary dict; "updated" tells whether any row matched.
    """
    if not criteria:
        raise ExcelLoadError("criteria must not be empty for update_rows")
    workbook = _load_workbook_for_write(path)
    worksheet = _get_sheet(workbook, sheet)
    targets = _matched_row_indices(worksheet, criteria)
    if not targets:
        return {"file": path, "sheet": worksheet.title, "updated": False, "rows": []}
    columns = _ensure_header_columns(worksheet, [str(name) for name in row_data])
    # Resolve each key to its column once instead of once per matched row.
    assignments = [(columns[str(name)], new_value) for name, new_value in row_data.items()]
    for row_idx in targets:
        for column, new_value in assignments:
            worksheet.cell(row=row_idx, column=column, value=new_value)
    workbook.save(path)
    return {"file": path, "sheet": worksheet.title, "updated": True, "rows": targets, "count": len(targets)}
def delete_rows(path: str, sheet: str | None, criteria: dict[str, Any]) -> dict[str, Any]:
    """Delete every row matching *criteria* and save the workbook.

    When no row matches, nothing is saved.

    Raises:
        ExcelLoadError: if *criteria* is empty.

    Returns:
        A summary dict; "rows" holds the indices as they were before deletion.
    """
    if not criteria:
        raise ExcelLoadError("criteria must not be empty for delete_rows")
    workbook = _load_workbook_for_write(path)
    worksheet = _get_sheet(workbook, sheet)
    targets = _matched_row_indices(worksheet, criteria)
    if not targets:
        return {"file": path, "sheet": worksheet.title, "deleted": False, "rows": []}
    # Delete bottom-up so earlier deletions do not shift the remaining indices.
    for row_idx in sorted(targets)[::-1]:
        worksheet.delete_rows(row_idx, 1)
    workbook.save(path)
    return {"file": path, "sheet": worksheet.title, "deleted": True, "rows": targets, "count": len(targets)}
def _cell(v: Any) -> str:
    """Render a value as cell text.

    Dicts and lists become indented JSON with non-ASCII characters kept,
    None becomes "", and everything else goes through str().
    """
    if isinstance(v, (dict, list)):
        return json.dumps(v, ensure_ascii=False, indent=2)
    if v is None:
        return ""
    return str(v)
def export_excel(findings: Any, out: str, facts: Any = None, merge: Any = None, key: str | None = None) -> str:
    """Write findings and facts to a new workbook at *out* and return *out*.

    The first sheet lists the normalised findings, one per row. Two more
    sheets list facts and merged facts as (item, content) pairs, skipping the
    "_meta" and "meta" keys. Header rows are bold, and every cell is
    top-aligned with wrapped text.
    """
    from openpyxl import Workbook  # type: ignore
    from openpyxl.styles import Alignment, Font  # type: ignore

    workbook = Workbook()
    column_titles = ["ID", "规则标题", "分段ID", "原文", "问题描述", "风险等级", "合格性", "建议"]
    findings_sheet = workbook.active
    findings_sheet.title = "审查结果"
    findings_sheet.append(column_titles)
    for finding in _norm_findings(findings, key):
        # "risk_level" wins; "level" is the fallback field name.
        level = finding.get("risk_level") or finding.get("level", "")
        findings_sheet.append([
            finding.get("id", ""),
            finding.get("rule_title", ""),
            finding.get("segment_id", ""),
            finding.get("original_text", ""),
            finding.get("issue", ""),
            level,
            finding.get("result", ""),
            finding.get("suggestion", ""),
        ])

    fact_sheets = [
        ("合同事实", _norm_facts(facts or {}, "facts")),
        ("合并事实", _norm_facts(merge or {}, "merge_facts")),
    ]
    skipped_keys = {"_meta", "meta"}
    for sheet_title, entries in fact_sheets:
        fact_sheet = workbook.create_sheet(sheet_title)
        fact_sheet.append(["提取项", "提取内容"])
        for entry in entries:
            for item_name, item_value in entry.items():
                if str(item_name) in skipped_keys:
                    continue
                fact_sheet.append([item_name, _cell(item_value)])

    for sheet in workbook.worksheets:
        for header_cell in sheet[1]:
            header_cell.font = Font(bold=True)
        for cells in sheet.iter_rows():
            for one_cell in cells:
                one_cell.alignment = Alignment(vertical="top", wrap_text=True)

    Path(out).parent.mkdir(parents=True, exist_ok=True)
    workbook.save(out)
    return out
def _json_rows(data: Any) -> tuple[list[str], list[list[Any]]]:
    """Turn decoded JSON into (headers, rows) for a worksheet.

    * dict                -> keys as headers, one row of values;
    * non-list scalar     -> header "value", one single-cell row;
    * empty list          -> no headers, no rows;
    * list of dicts       -> union of keys in first-seen order, one row per
      dict with None for missing keys;
    * list of lists/tuples -> headers col1..colN, short rows padded with None;
    * any other list      -> header "value", one single-cell row per item.
    """
    if isinstance(data, dict):
        return [str(name) for name in data], [list(data.values())]
    if not isinstance(data, list):
        return ["value"], [[data]]
    if not data:
        return [], []
    if all(isinstance(entry, dict) for entry in data):
        # dict.fromkeys keeps the first occurrence of each key, in order.
        columns = list(dict.fromkeys(str(name) for entry in data for name in entry))
        return columns, [[entry.get(column) for column in columns] for entry in data]
    if all(isinstance(entry, (list, tuple)) for entry in data):
        width = max(map(len, data))
        columns = [f"col{number}" for number in range(1, width + 1)]
        padded = [[*entry, *([None] * (width - len(entry)))] for entry in data]
        return columns, padded
    return ["value"], [[entry] for entry in data]
def json_to_sheet(data: Any, out: str, sheet: str = "Sheet1") -> str:
    """Write decoded JSON into one sheet of the workbook at *out*.

    An existing workbook is opened and extended; otherwise a new one is
    created. A sheet already named *sheet* is replaced at the same position.
    In a workbook whose active sheet is still the untouched default "Sheet",
    that sheet is reused instead of adding another one.

    Raises:
        ExcelLoadError: if openpyxl cannot be imported.

    Returns:
        The output path as a string.
    """
    try:
        from openpyxl import Workbook, load_workbook  # type: ignore
        from openpyxl.styles import Alignment, Font  # type: ignore
    except ImportError as exc:
        raise ExcelLoadError("openpyxl is required for json-to-sheet") from exc

    output_path = Path(out)
    if output_path.exists():
        workbook = load_workbook(output_path)
    else:
        workbook = Workbook()

    if sheet in workbook.sheetnames:
        # Recreate the sheet in place so stale cells never survive.
        position = workbook.sheetnames.index(sheet)
        workbook.remove(workbook[sheet])
        worksheet = workbook.create_sheet(sheet, position)
    else:
        default_sheet = workbook.active
        untouched = (
            default_sheet.title == "Sheet"
            and default_sheet.max_row == 1
            and default_sheet.max_column == 1
            and default_sheet["A1"].value is None
        )
        worksheet = default_sheet if untouched else workbook.create_sheet(sheet)
    worksheet.title = sheet

    headers, rows = _json_rows(data)
    for column, title in enumerate(headers, start=1):
        worksheet.cell(row=1, column=column, value=title).font = Font(bold=True)
    for row_number, values in enumerate(rows, start=2):
        for column, item in enumerate(values, start=1):
            worksheet.cell(row=row_number, column=column, value=_cell(item))

    if headers or rows:
        last_row = max(1, len(rows) + 1)
        last_col = max(1, len(headers))
        for cells in worksheet.iter_rows(min_row=1, max_row=last_row, max_col=last_col):
            for one_cell in cells:
                one_cell.alignment = Alignment(vertical="top", wrap_text=True)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    workbook.save(output_path)
    return str(output_path)
def main() -> int:
    """Parse the command line and run one Excel sub-command.

    Read commands print JSON through _json(); export commands print the path
    of the file they wrote. JSON arguments other than the map-rows column map
    are decoded by _load_json().

    `search-rows` is accepted as an alias of the original `search_rows`
    spelling so the command matches the hyphenated style of its siblings.

    Returns:
        0 once the selected command has run.
    """
    p = argparse.ArgumentParser(description="Standalone Excel CLI")
    sub = p.add_subparsers(dest="cmd", required=True)

    a = sub.add_parser("load-excel")
    a.add_argument("file")
    a.add_argument("--sheet-name")
    a.add_argument("--no-header", action="store_true")

    a = sub.add_parser("list-sheets")
    a.add_argument("file")

    # Keep the historical underscore name; the alias is the consistent spelling.
    a = sub.add_parser("search_rows", aliases=["search-rows"])
    a.add_argument("file")
    a.add_argument("criteria", nargs="?", default="{}")
    a.add_argument("--sheet-name")

    a = sub.add_parser("append-row")
    a.add_argument("file")
    a.add_argument("row")
    a.add_argument("--sheet-name")

    a = sub.add_parser("update-rows")
    a.add_argument("file")
    a.add_argument("criteria")
    a.add_argument("row")
    a.add_argument("--sheet-name")

    a = sub.add_parser("delete-rows")
    a.add_argument("file")
    a.add_argument("criteria")
    a.add_argument("--sheet-name")

    a = sub.add_parser("find-value")
    a.add_argument("file")
    a.add_argument("key_column")
    a.add_argument("key_value")
    a.add_argument("value_column")
    a.add_argument("--sheet-name")

    a = sub.add_parser("map-rows")
    a.add_argument("file")
    a.add_argument("column_map")
    a.add_argument("--sheet-name")

    a = sub.add_parser("export-findings-excel")
    a.add_argument("findings")
    a.add_argument("output")
    a.add_argument("--facts")
    a.add_argument("--merge-facts")
    a.add_argument("--finding-key")

    a = sub.add_parser("export-facts-excel")
    a.add_argument("facts")
    a.add_argument("output")
    a.add_argument("--merge-facts")

    a = sub.add_parser("json-to-sheet")
    a.add_argument("json_data")
    a.add_argument("output")
    a.add_argument("--sheet-name", default="Sheet1")

    x = p.parse_args()
    if x.cmd == "load-excel":
        _json(load_excel(x.file, x.sheet_name, not x.no_header))
    elif x.cmd == "list-sheets":
        _json(list_sheets(x.file))
    elif x.cmd in ("search_rows", "search-rows"):
        # argparse stores whichever spelling the user typed in x.cmd.
        _json(search_rows(x.file, x.sheet_name, _require_dict(_load_json(x.criteria), "criteria")))
    elif x.cmd == "append-row":
        _json(append_row(x.file, x.sheet_name, _require_dict(_load_json(x.row))))
    elif x.cmd == "update-rows":
        _json(update_rows(x.file, x.sheet_name, _require_dict(_load_json(x.criteria), "criteria"), _require_dict(_load_json(x.row))))
    elif x.cmd == "delete-rows":
        _json(delete_rows(x.file, x.sheet_name, _require_dict(_load_json(x.criteria), "criteria")))
    elif x.cmd == "find-value":
        # NOTE(review): key_value arrives as a string, so cells holding
        # numbers will presumably never compare equal — confirm intent.
        _json(next((r.get(x.value_column) for r in load_excel(x.file, x.sheet_name) if isinstance(r, dict) and r.get(x.key_column) == x.key_value), None))
    elif x.cmd == "map-rows":
        # Decode the column map once rather than once per row.
        column_map = json.loads(x.column_map)
        _json([{k: r.get(v) for k, v in column_map.items()} for r in load_excel(x.file, x.sheet_name) if isinstance(r, dict)])
    elif x.cmd == "export-findings-excel":
        print(export_excel(_load_json(x.findings), x.output, _load_json(x.facts) if x.facts else None, _load_json(x.merge_facts) if x.merge_facts else None, x.finding_key))
    elif x.cmd == "export-facts-excel":
        print(export_excel([], x.output, _load_json(x.facts), _load_json(x.merge_facts) if x.merge_facts else None))
    elif x.cmd == "json-to-sheet":
        print(json_to_sheet(_load_json(x.json_data), x.output, x.sheet_name))
    return 0
......
---
name: http-skill
description: HTTP 文件处理 Skill。用于下载远程文件和上传本地文件。上传下载优先使用此技能。
---
# HTTP 文件处理 Skill
## 定位
`http-skill` 负责合同审查流程中的网络文件搬运。它可以把接口传入的远程合同 URL 下载到本地,也可以把审查结果文件上传到后端服务。
该 Skill 使用 Python 标准库实现,不依赖 `requests`、`loguru`、`requests_toolbelt`,也不依赖仓库中的 `utils/`、`core/` 模块。
## 适用场景
- 从接口 URL 下载合同、PDF、Excel 或中间文件。
- 将本地生成的 Excel、docx 批注文件上传到后端文件服务。
- 在离线 CLI 流程中模拟 `main.py` 的文件下载和导出上传环节。
## 工具文件
- `scripts/http_util.py`:HTTP 文件处理 CLI。
## 主要命令
- `download`:下载 URL 到本地文件或目录。
- `upload`:上传本地文件到后端文件服务。
## 通用参数
- `--base-fastgpt-url`:FastGPT 内网基础地址,默认 `http://192.168.252.71:3030`
- `--base-backend-url`:后端内网基础地址,默认 `http://192.168.252.71:1122`
- `--outer-backend-url`:后端外网地址,默认 `https://218.77.58.8:48080`
- `--username`:后端管理员用户名,仅 `upload` 使用,默认 `admin`
- `--password`:后端管理员密码,仅 `upload` 使用,默认 `admin@jpai.com`
## 输入输出
- `download` 输入 URL 和可选目标路径;未传目标路径时默认下载到 `scripts/http_util.py` 同级目录的 `download/` 文件夹。
- `download` 的目标路径是目录时,会自动推断文件名。
- `upload` 输入本地文件路径和后端账号配置;输出后端接口响应。
- `upload` 输入相对路径时,会优先从 `scripts/http_util.py` 同级目录的 `download/` 文件夹查找,找不到再按当前工作目录查找。
## 使用示例
查看帮助:
```bash
python skills/http-skill/scripts/http_util.py --help
```
查看子命令帮助:
```bash
python skills/http-skill/scripts/http_util.py upload --help
python skills/http-skill/scripts/http_util.py download --help
```
上传本地文件:
```bash
python skills/http-skill/scripts/http_util.py upload demo/example.pdf
```
上传本地文件,并覆盖后端地址和账号密码:
```bash
python skills/http-skill/scripts/http_util.py upload \
--base-backend-url http://192.168.252.71:48081 \
--username admin \
--password 'admin@jpai.com' \
demo/example.pdf
```
下载相对路径到默认 `download/` 目录:
```bash
python skills/http-skill/scripts/http_util.py download /api/file/example.pdf
```
下载相对路径到指定目录:
```bash
python skills/http-skill/scripts/http_util.py download \
/api/file/example.pdf \
downloads
```
下载完整 URL,并替换外网后端地址:
```bash
python skills/http-skill/scripts/http_util.py download \
--outer-backend-url https://172.21.107.45:48080 \
--base-backend-url http://172.21.107.45:1122 \
https://172.21.107.45:48080/admin-api/infra/file/get/123 \
downloads/example.pdf
```
## 在合同审查流程中的位置
该 Skill 通常位于流程入口和出口:入口负责把远程合同下载成本地文件,出口负责把审查结果上传并生成可返回给调用方的文件地址。它不解析文档、不调用 LLM,也不保存审查记忆。
......@@ -7,23 +7,29 @@ import argparse, json, mimetypes, random, re, string, sys, time, urllib.error, u
from pathlib import Path
from urllib.parse import unquote, urlparse
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
# DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
# DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
# DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
DEFAULT_OUTER_BACKEND_URL = "https://218.77.58.8:48080"
DEFAULT_BASE_FASTGPT_URL = "http://192.168.252.71:3030"
DEFAULT_BASE_BACKEND_URL = "http://192.168.252.71:1122"
DEFAULT_BACKEND_ADMIN_USERNAME = "admin"
DEFAULT_BACKEND_ADMIN_PASSWORD = "admin@jpai.com"
SCRIPT_DIR = Path(__file__).resolve().parent
DEFAULT_DOWNLOAD_DIR = SCRIPT_DIR / "download"
base_fastgpt_url, base_backend_url, outer_backend_url = DEFAULT_BASE_FASTGPT_URL, DEFAULT_BASE_BACKEND_URL, DEFAULT_OUTER_BACKEND_URL
backend_admin_username, backend_admin_password = DEFAULT_BACKEND_ADMIN_USERNAME, DEFAULT_BACKEND_ADMIN_PASSWORD
def configure_urls(fastgpt_url: str | None = None, backend_url: str | None = None, outer_url: str | None = None) -> None:
def _configure_urls(fastgpt_url: str | None = None, backend_url: str | None = None, outer_url: str | None = None) -> None:
    """Override the module-level base URLs; falsy arguments keep the current value."""
    global base_fastgpt_url, base_backend_url, outer_backend_url
    if fastgpt_url:
        base_fastgpt_url = fastgpt_url
    if backend_url:
        base_backend_url = backend_url
    if outer_url:
        outer_backend_url = outer_url
def configure_login(username: str | None = None, password: str | None = None) -> None:
def _configure_login(username: str | None = None, password: str | None = None) -> None:
    """Override the module-level backend credentials; falsy arguments keep the current value."""
    global backend_admin_username, backend_admin_password
    if username:
        backend_admin_username = username
    if password:
        backend_admin_password = password
......@@ -53,7 +59,16 @@ def _multipart_body(path: str, field: str = "file") -> tuple[bytes, str]:
return bytes(body), boundary
def upload_file(path, input_url_to_inner=True, output_url_to_inner=False) -> str:
def _resolve_upload_path(path: str | Path) -> Path:
    """Resolve the file to upload.

    "~" is expanded. An absolute path is returned as is. A relative path is
    looked up under DEFAULT_DOWNLOAD_DIR first and used from there when it
    exists; otherwise the relative path itself is returned.
    """
    candidate = Path(path).expanduser()
    if not candidate.is_absolute():
        staged = DEFAULT_DOWNLOAD_DIR / candidate
        if staged.exists():
            return staged
    return candidate
def upload_file(path) -> str:
path = _resolve_upload_path(path)
login = _post_json(f"{base_backend_url}/admin-api/system/auth/login", {"username": backend_admin_username, "password": backend_admin_password})
token = (json.loads(login).get("data") or {}).get("accessToken")
if not token:
......@@ -81,14 +96,13 @@ def _resolve_name(url: str, headers) -> str:
return _basename(urlparse(url).path)
def download_file(url, path, input_url_to_inner=True):
if input_url_to_inner and not url.startswith(("http:", "https:")):
def download_file(url, path=None):
if not url.startswith(("http:", "https:")):
url = base_fastgpt_url + url
if input_url_to_inner:
url = url.replace(outer_backend_url, base_backend_url)
url = url.replace(outer_backend_url, base_backend_url)
try:
with urllib.request.urlopen(urllib.request.Request(url, method="GET"), timeout=120) as resp:
target = Path(path)
target = Path(path).expanduser() if path else DEFAULT_DOWNLOAD_DIR / _resolve_name(url, resp.headers)
if target.exists() and target.is_dir():
target = target / _resolve_name(url, resp.headers)
target.parent.mkdir(parents=True, exist_ok=True); target.write_bytes(resp.read())
......@@ -98,37 +112,30 @@ def download_file(url, path, input_url_to_inner=True):
return None
def url_replace_fastgpt(origin: str):
    """Return *origin* unchanged when it starts with "http:" or "https:"; otherwise prefix it with base_fastgpt_url."""
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def add_url_args(p: argparse.ArgumentParser) -> None:
def _add_url_args(p: argparse.ArgumentParser) -> None:
    """Register the three base-URL options on *p*, each defaulting to its module constant."""
    url_options = (
        ("--base-fastgpt-url", DEFAULT_BASE_FASTGPT_URL),
        ("--base-backend-url", DEFAULT_BASE_BACKEND_URL),
        ("--outer-backend-url", DEFAULT_OUTER_BACKEND_URL),
    )
    for flag, fallback in url_options:
        p.add_argument(flag, default=fallback)
def build_arg_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(description="上传、下载或补全 FastGPT/后端文件 URL。")
def _build_arg_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(description="上传或下载文件。")
sub = p.add_subparsers(dest="command", required=True)
u = sub.add_parser("upload"); add_url_args(u); u.add_argument("--username", default=DEFAULT_BACKEND_ADMIN_USERNAME); u.add_argument("--password", default=DEFAULT_BACKEND_ADMIN_PASSWORD); u.add_argument("path")
d = sub.add_parser("download"); add_url_args(d); d.add_argument("url"); d.add_argument("path")
n = sub.add_parser("normalize-url"); add_url_args(n); n.add_argument("url")
u = sub.add_parser("upload"); _add_url_args(u); u.add_argument("--username", default=DEFAULT_BACKEND_ADMIN_USERNAME); u.add_argument("--password", default=DEFAULT_BACKEND_ADMIN_PASSWORD); u.add_argument("path")
d = sub.add_parser("download"); _add_url_args(d); d.add_argument("url"); d.add_argument("path", nargs="?")
return p
def main(argv: list[str] | None = None) -> int:
p = build_arg_parser(); a = p.parse_args(argv)
configure_urls(_strip(a.base_fastgpt_url), _strip(a.base_backend_url), _strip(a.outer_backend_url))
p = _build_arg_parser(); a = p.parse_args(argv)
_configure_urls(_strip(a.base_fastgpt_url), _strip(a.base_backend_url), _strip(a.outer_backend_url))
if a.command == "upload":
configure_login(a.username, a.password); print(upload_file(a.path)); return 0
_configure_login(a.username, a.password); print(upload_file(a.path)); return 0
if a.command == "download":
saved = download_file(a.url, a.path)
if saved is None:
return 1
print(saved); return 0
if a.command == "normalize-url":
print(url_replace_fastgpt(a.url)); return 0
p.error(f"unsupported command: {a.command}"); return 2
......
---
name: ocr-skill
description: OCR Skill。提供基于本地 Tesseract 的图片和 PDF 文本识别 CLI,以及可在 Python 中复用的 `TesseractOCRUtil` 类。
---
# OCR Skill
## 定位
`ocr-skill` 负责处理扫描件、图片和图片型 PDF 的文字识别。它适合在普通文本解析失败、PDF 文本乱码、合同是扫描版或截图版时使用。
该 Skill 只提供 OCR 能力,不负责合同审查、规则匹配、facts 提取或结果导出。识别出的文本可以继续交给 `doc-excel-skill`、`review-llm-skill` 或上层流程使用。
## 适用场景
- 识别合同截图或图片中的文字。
- 识别扫描版 PDF 每一页的文字。
- 在 PDF 直接解析结果为空或乱码时作为兜底方案。
- 在 Python 代码中直接调用 `TesseractOCRUtil` 做本地 OCR。
## 工具文件
- `scripts/ocr_tool.py`:OCR CLI 和 `TesseractOCRUtil` 类。
## 运行要求
- 本机需要安装 `tesseract` 可执行文件,并确保它在 `PATH` 中。
- 中文识别需要安装对应语言包,例如 `chi_sim`
- PDF 转图片依赖 `PyMuPDF`,包名为 `PyMuPDF`,导入名为 `fitz`
- OCR 质量受扫描清晰度、页眉页脚、表格线、印章和图片压缩影响。
## 主要命令
- `image`:识别单张图片,输出纯文本。
- `pdf`:把 PDF 每页转为图片后 OCR,输出每页识别结果 JSON。
## 输入输出
- 图片 OCR 输入图片路径,输出识别文本。
- PDF OCR 输入 PDF 路径,输出包含页码和文本的 JSON。
- 默认语言和 tesseract 可执行路径可通过命令参数覆盖,具体参数以 `--help` 为准。
## 使用示例
查看帮助:
```bash
python skills/ocr-skill/scripts/ocr_tool.py --help
```
识别图片:
```bash
python skills/ocr-skill/scripts/ocr_tool.py image demo/ocr.png
```
识别 PDF:
```bash
python skills/ocr-skill/scripts/ocr_tool.py pdf skills/ocr-skill/example/example.pdf
```
Python 中直接使用:
```python
from pathlib import Path
import sys
sys.path.append(str(Path("skills/ocr-skill/scripts").resolve()))
from ocr_tool import TesseractOCRUtil
util = TesseractOCRUtil(lang="chi_sim+eng", executable="tesseract")
text = util.ocr_image("/path/to/image.png")
print(text)
texts = util.ocr_result_pdf("/path/to/document.pdf")
print(texts)
```
## 在合同审查流程中的位置
该 Skill 通常作为文档解析阶段的兜底能力。当 `doc-excel-skill` 无法直接读取有效文本时,可以先用 OCR 得到页面文字,再进入分段、摘要、审查和导出流程。
---
name: review-llm-skill
description: LLM 动作执行模块。按动作选择系统提示词,并把传入的 rule dict 作为用户提示词发送给模型。
---
# Review LLM Skill
## 定位
`review-llm-skill` 是一个可单独执行的 LLM 动作模块。
它只做两件事:
- 根据 `action` 选择对应的系统提示词。
- 将调用方传入的 `rule` dict 和待处理文本拼成 user prompt。
本模块不负责读取规则、不选择规则、不编排流程、不保存状态。
## 支持动作
- `summary` / `segment_summary` / `摘要` / `总结`
- `router` / `segment_rule_router` / `路由`
- `review` / `审查`
- `reflect` / `反思` / `复核`
- `merge` / `merger` / `segment_merger` / `合并`
## 工具文件
- `scripts/segment_llm_action.py`:主入口,负责动作调度和 LLM 调用。
- `scripts/prompts.py`:系统提示词。
- `scripts/llm_tool.py`:OpenAI 兼容 LLM 调用与 JSON 解析。
- `scripts/config.py`:LLM 配置。
## 输入
- `action`:要执行的动作。
- `--rule`:任意字段的 JSON dict,支持 `@file.json`
- `--text`:直接输入待处理文本。
- `--input-file` + `--chunk-size` + `--chunk-index`:从文本文件中按字符数切片读取待处理文本。
- `--output`:输出目标;默认 `-` 表示直接打印,传入文件路径则追加到 JSON 数组文件。
## Python 接口
```python
from segment_llm_action import run_segment_llm_action
res = run_segment_llm_action(
action="review",
rule={
"title": "付款审查",
"rule": "检查付款期限是否明确",
"context": {"party_role": "甲方"},
},
text="甲方应于合同签订之日起30日内付款。",
)
print(res)
```
## CLI 示例
```bash
python skills/review-llm-skill/scripts/segment_llm_action.py review \
--rule '{"title":"付款审查","rule":"检查付款期限是否明确","context":{"party_role":"甲方"}}' \
--text '甲方应于合同签订之日起30日内付款。'
```
从文件读取指定分段:
```bash
python skills/review-llm-skill/scripts/segment_llm_action.py review \
--rule '{"title":"付款审查","rule":"检查付款期限是否明确"}' \
--input-file skills/review-llm-skill/example/downloaded_file.txt \
--chunk-size 2000 \
--chunk-index 0
```
追加输出到 JSON 文件:
```bash
python skills/review-llm-skill/scripts/segment_llm_action.py review \
--rule '{"title":"付款审查","rule":"检查付款期限是否明确"}' \
--input-file skills/review-llm-skill/example/downloaded_file.txt \
--chunk-size 2000 \
--chunk-index 0 \
--output outputs/review-results.json
```
只打印 messages,不调用模型:
```bash
python skills/review-llm-skill/scripts/segment_llm_action.py review \
--rule '{"title":"付款审查","rule":"检查付款期限是否明确"}' \
--text '甲方应于合同签订之日起30日内付款。' \
--print-messages
```
购销合同
供方:海南金盘智能科技股份有限公司 签订地点: 太原市
需方:山西长缘电力工程有限公司 签订时间: 2026年06月10日
一、货物(服务)名称、商标、型号、厂家、数量、金额 价格单位:(元)
货物名称
规格型号
生产厂家
单位
数量
单价(元)
总金额(元)
变压器
ZLSCLB-1000/10(6)
海南金盘
1
103100
103100
合计人民币金额: 大写 壹拾万零叁仟壹佰元整 小写:¥103100元
含:国标变压器本体、温控、IP20钢板外壳(标准色为RAL7035)、包装运输及13%增值税票等。
图号:DK1457.01.12GZ
技术参数:连接组别:Dyn11; 阻抗:6%;分接范围:±2×5% (变压器外壳与太重挖掘机全焊接抗震性相同)
二、交(提)货时间、地点:合同签订且方案或技术协议签订后 45 日内发到指定地点。
三、质量要求、技术标准:按国家及行业规范,产品交付之日起十八个月,或产品运行之日起十二个月,两者以先到时间为准。在保修(质保)期内如出现产品质量问题由卖方负责免费“三包”;操作、使用或保养不当等造成损坏的或不属产品质量问题的不在“三包”服务之列。
四、运输方式及到达站港和费用负担:由供方负担。
五、合理损耗及计算方法: 无损耗。
六、包装标准、包装物的供应与回收和费用负担:按国家及行业规范包装,包装物不回收。
七、验收标准、方法:按国家及行业规定。
八、异议期限及处理方法:需方收货后 3个月内或在货物安装使用后 6个月内发现货物存在质量问题,提出书面异议,双方协商解决。
九、随机备品、配件工具数量及供应方法:无备品配件,随货带装置说明书。
十、结算方式及期限:1、电汇或一线银行开具的6个月以内银行承兑汇票 2、合同签订后,发货前付清全款,供方开具税率为13%的增值税专用发票。
十一、违约责任:按中国法律。本合同双方签字盖章的扫描件具备与纸质版同等的法律效力。
十二、解决合同纠纷的方式:由双方友好协商;若协商不成则由卖方所在地法院管辖。
需 方
供 方
买受人(章)
山西长缘电力工程有限公司
出卖人(章)
海南金盘智能科技股份有限公司
地址:
山西省太原市小店区平阳路14号26幢20层2001、2002、2003号(太原首信商务秘书有限公司-1144号)集群登记
地址:
海南省海口市南海大道168-39号
法定代表人:
马林俊
法定代表人:
李辉
委托代理人:(签章)
委托代理人:(签章)
电话:
电话:
0898-66811301
开户银行:
中国农业银行太原平阳南路支行
开户银行:
交通银行海口南海支行
帐号:
04138201040004607
帐号:
461602303018010043627
税务登记号:
91140105MAENF9FL7F
税务登记号:
9146010062006446XN
邮政编码:
邮政编码:
"""Standalone CLI scripts for review-llm-skill."""
"""Compact prompt templates kept for compatibility."""
# Prompt templates keyed by action name. Variable slots are written as
# {name}, but most templates also embed literal JSON braces, so calling
# str.format() on them would raise.
# NOTE(review): presumably callers fill the slots with str.replace() or
# similar — confirm before changing how these are rendered.
PROMPTS = {
"review": "基于当前分段和审查规则审查合同,仅输出JSON:{\"overall_conclusion\":\"\",\"findings\":[]}。\n分段:{segment_text}\n立场:{party_role}\n规则:{ruleset_text}",
"summary": "提取当前分段中与规则字段相关的客观事实,仅输出JSON:{\"facts\":{}}。\n分段:{segment_text}\n字段:{rule_fields}",
"router": "从候选规则中选择当前分段应执行的审查项,仅输出JSON:{\"selected_items\":[]}。\n分段:{segment_text}\n记忆:{context_memories_json}\n立场:{party_role}\n候选:{candidate_rules_json}",
"merger": "合并重复或相关的不合格findings,仅输出JSON:{\"findings\":[]}。\n输入:{payload}",
"reflect": "基于规则、已有findings和facts复核、去重、拆分、合并并定稿,仅输出JSON:{\"final_findings\":[]}。\n规则:{rule}\nfindings:{findings_json}\nfacts:{facts_json}\n立场:{party_role}",
"fact-merge": "合并summary_name下多个分段facts,不新增事实,仅输出JSON:{\"merge_facts\":{}}。\nsummary_names:{summary_names_json}\nfacts:{facts_json}",
"ruleset-route": "从候选ruleset_id中按问题选择一个,不得编造,仅输出JSON:{\"ruleset_id\":\"\",\"reason\":\"\"}。\n候选:{ruleset_ids_json}\n问题:{question}",
"party-role": "分析指定公司在合同中的商业角色,不仅按甲乙方判断,仅输出JSON:{\"party_role\":\"demand_side | supplier_side | unclear\",\"reason\":\"\"}。\n公司:{company_name}\n合同:{contract_text}",
"llm": "你是通用LLM助手。",
}
import os
# Model name; the OPENAI_MODEL environment variable overrides the default.
OPENAI_MODEL = os.environ.get("OPENAI_MODEL", "Qwen3.5-122B-A10B-AWQ")
# Base URL of the OpenAI-compatible endpoint; override with OPENAI_BASE_URL.
OPENAI_BASE_URL = os.environ.get("OPENAI_BASE_URL","http://192.168.252.71:9002/v1")
# API key; the default is the non-empty placeholder "none", so a truthiness
# check on the key passes even when no real key is configured.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY","none")
# When True, LLMTool moves a leading system message out of the message list
# and sends it as extra_body["variables"]["system"] instead.
USE_FASTGPT_SYSTEM_VARIABLE = False
# When True, LLMTool adds request fields that switch model "thinking" off.
DISABLE_LLM_THINKING = True
\ No newline at end of file
import os
import re
import json
import urllib.request
from typing import Any, List, Dict
from openai import OpenAI
from tenacity import retry, stop_after_attempt, stop_after_delay, wait_fixed
try:
from .config import (
DISABLE_LLM_THINKING,
OPENAI_API_KEY,
OPENAI_BASE_URL,
OPENAI_MODEL,
USE_FASTGPT_SYSTEM_VARIABLE,
)
except ImportError:
from config import (
DISABLE_LLM_THINKING,
OPENAI_API_KEY,
OPENAI_BASE_URL,
OPENAI_MODEL,
USE_FASTGPT_SYSTEM_VARIABLE,
)
class LLMTool:
def __init__(self, system_prompt: str = ""):
    """Set up the tool from the values imported from the config module.

    Args:
        system_prompt: default system prompt; None or "" is stored as "".
    """
    self.system_prompt = system_prompt or ""
    # The earlier env-derived assignments to model/base_url/api_key were
    # overwritten immediately by these config values, so they are dropped.
    self.model = OPENAI_MODEL
    self.base_url = OPENAI_BASE_URL
    self.api_key = OPENAI_API_KEY
    # No client is built without an API key; run() rejects that state.
    self.client = OpenAI(base_url=self.base_url, api_key=self.api_key) if self.api_key else None
    self.use_fastgpt_system_variable = USE_FASTGPT_SYSTEM_VARIABLE
    self.disable_thinking = DISABLE_LLM_THINKING
def build_messages(self, user_content: str, system_content: str | None = None) -> List[Dict[str, str]]:
msgs = []
......@@ -19,22 +40,37 @@ class LLMTool:
msgs.append({"role": "user", "content": user_content})
return msgs
def _prepare_request(
    self, messages: List[Dict[str, str]]
) -> tuple[List[Dict[str, str]], Dict[str, Any]]:
    """Build the message list and extra_body for one chat request.

    The input list is copied, never mutated. When use_fastgpt_system_variable
    is set and the first message has role "system", that message is removed
    and its content is sent as extra_body["variables"]["system"]. When
    disable_thinking is set, two fields that turn thinking off are added.

    Returns:
        (messages to send, extra_body dict — empty when nothing applies).
    """
    outgoing = list(messages)
    extras: Dict[str, Any] = {}
    if self.use_fastgpt_system_variable and outgoing:
        first, *remaining = outgoing
        if first.get("role") == "system":
            extras["variables"] = {"system": first.get("content", "")}
            outgoing = remaining
    if self.disable_thinking:
        extras.update(
            thinking={"type": "disabled"},
            chat_template_kwargs={"enable_thinking": False},
        )
    return outgoing, extras
@retry(stop=stop_after_delay(600) | stop_after_attempt(3), wait=wait_fixed(1))
def run(self, messages: List[Dict[str, str]]) -> str:
if not self.api_key:
if not self.client:
raise RuntimeError("OPENAI_API_KEY is required")
body = json.dumps({"model": self.model, "messages": messages}, ensure_ascii=False).encode("utf-8")
request = urllib.request.Request(
f"{self.base_url}/chat/completions",
data=body,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
},
method="POST",
)
with urllib.request.urlopen(request, timeout=120) as response:
payload = json.loads(response.read().decode("utf-8"))
return (((payload.get("choices") or [{}])[0].get("message") or {}).get("content")) or ""
request_messages, extra_body = self._prepare_request(messages)
kwargs: Dict[str, Any] = {
"model": self.model,
"messages": request_messages,
}
if extra_body:
kwargs["extra_body"] = extra_body
response = self.client.chat.completions.create(**kwargs)
return response.choices[0].message.content or ""
def chat_async(self, messages: List[Dict[str, str]]) -> str:
    """Return the reply for *messages* by delegating to run().

    Despite the name, this is an ordinary synchronous call.
    """
    reply = self.run(messages)
    return reply
......
"""System prompts for review-llm-skill."""
from __future__ import annotations
# System prompt for the segment review step ("SegmentReview"). It tells the
# model to judge the text under review against one rule and to answer with a
# JSON array of findings (rule_name, result, issue, original_text, suggestion),
# or [] when nothing is sufficiently evidenced.
REVIEW_SYSTEM_PROMPT = """
你是一个专业的合同分段审查智能体(SegmentReview)。
你的任务是:基于给定审查项规则,对“待处理文本”进行审查,识别其中与规则相关且证据充分的条款,并判断其结果为“合格”或“不合格”,输出审查结论及必要的修改建议。
【输入说明】
- 审查项名称:本次要执行的审查项。
- 审查项规则:由 rules.xlsx 加载得到的该审查项规则 dict。
- 上下文信息:调用方提供的合同上下文、角色、事实、已有结论等 dict。
- 待处理文本:本次需要审查的文本分段。
【审查范围】
你只能审查待处理文本自身已经明确体现的内容。
你只能识别合格条款和不合格条款,不得对无关或证据不足内容生成 finding。
【审查原则】
- 严格基于给定审查项规则进行审查,不得脱离规则自行扩展审查标准。
- 可以读取上下文信息辅助理解主体、角色、术语和已有事实,但不得用上下文信息替代待处理文本中的证据。
- 优先识别“确定成立”的合格或不合格结论,不输出模糊怀疑类表述。
- 必须逐句扫描待处理文本,穷举所有证据充分的问题或合格表述。
【单一证据约束】
每一个 finding 必须只对应一个独立判断点和一个最小证据句;若多个句子分别支持不同问题,必须拆分为多个 findings;严禁在 original_text 中拼接多个不连续句子。
【输出约束】
严格输出 JSON 数组;不得输出 JSON 之外的解释性文字。若未发现证据充分的合格或不合格条款,返回 []。
【输出格式】
[
{
"rule_name": "审查项名称",
"result": "合格 或 不合格",
"issue": "基于规则和原文说明为什么合格或不合格",
"original_text": "待处理文本中的最小证据原文",
"suggestion": "合格时填写“无需修改”;不合格时填写具体、可执行的修改建议"
}
]
"""
# System prompt for the reflection step ("ReviewReflection"). It asks the model
# to re-check existing findings within the current rule (dedupe, correct,
# split, merge) and to answer with the final JSON array of findings, or [].
REFLECT_SYSTEM_PROMPT = """
你是一个合同审查反思智能体(ReviewReflection)。
你的任务不是从零重新审查合同,也不是简单删减 findings,而是基于“审查项规则、待处理文本、上下文信息中的已有 findings/facts/角色/全文信息”,对已有 findings 进行规则内复核、去重、校正、拆分、合并与定稿,输出最终 findings 数组。
【输入说明】
- 审查项名称:本次要反思复核的审查项。
- 审查项规则:由 rules.xlsx 加载得到的该审查项规则 dict。
- 上下文信息:调用方提供的已有 findings、facts、合同全文、合同角色等 dict。
- 待处理文本:本次复核对应的文本,可以是分段文本或相关全文片段。
【允许执行的操作】
删除重复、证据不足、引用不当或超出当前审查项规则的 findings;修订 issue、result、original_text 或 suggestion 不准确的 findings;合并多个指向同一问题的 findings;拆分包含多个独立问题的 finding。
【禁止事项】
不得脱离当前审查项规则新增全新的审查维度;不得凭空创造合同中不存在的事实;不得输出无法由合同原文直接支持的结论;不得输出模糊、空泛、不可执行的 suggestion。
【核心判定原则】
final result 必须以“审查项规则 + 待处理文本 + 上下文信息”为准;每条 final finding 必须能被合同原文直接支持;original_text 必须是最小充分证据片段;result 只能为“合格”或“不合格”。
【输出约束】
严格输出 JSON 数组;不得输出任何解释性文字;若反思后无成立 findings,返回 []。
【输出格式】
[
{
"rule_name": "审查项名称",
"result": "合格 或 不合格",
"issue": "复核后的准确风险或合格说明",
"original_text": "合同原文中的最小证据片段",
"suggestion": "可直接替换原文、新增条款措辞,或明确的修改方向"
}
]
"""
# System prompt for the fact-extraction step ("SegmentSummary"). It asks the
# model to extract only rule-relevant facts found verbatim in the segment and
# to answer with a JSON object of the form {"facts": {...}}, marking required
# but absent fields as "未明确".
SUMMARY_SYSTEM_PROMPT = """
你是合同事实提取智能体(SegmentSummary)。
你的任务是:**基于给定的审查规则,从当前合同分段中提取“与该规则直接相关的客观事实”,并结构化输出。**
【核心原则】
你必须严格围绕“规则所需信息”进行提取。
---
【事实定义】
事实必须满足:
1. 可以在当前分段原文中直接找到对应表述;
2. 不得对原文进行抽象、概括或推断;
3. 不得补充未出现的主体、条件或数值;
4. 允许对原文做最小结构化拆分(例如金额、比例、期限)。
---
【规则驱动提取要求(关键)】
- 仅提取“该审查规则执行所需要的信息字段”
- 不得提取与该规则无关的信息(即使这些信息在文本中存在)
- 若规则未涉及某类信息,则不得输出对应字段
- 若规则涉及某字段但文本未出现,需显式标记为 "未明确"
---
【输出结构】
- 输出字段:facts
- facts 是一个对象
- 键必须来自【规则字段定义(rule_fields)】
- 不得使用预设通用维度(如“支付/违约责任”等)
---
【字段填充规则】
- 每个字段值必须是对象或对象列表
- 不得输出字符串作为字段值
- 字段内容必须为原文的最小结构化表达
- 不得改写原文含义
---
【缺失信息处理(非常重要)】
- 若规则要求的字段在当前分段未出现:
→ 必须输出该字段,并标记为:
"未明确"
(用于后续审查判断)
---
【约束】
- 严禁编造信息
- 严禁推断未出现的内容
- 不得输出风险判断或解释
- 严格输出 JSON
【输出格式示例】
```json
{
"facts": {
"支付审查": {"方式": "银行转账", "时间": "验收后30日内"},
"违约责任审查": {"违约金比例": "合同总金额的5%"}
}
}
```
"""
# System prompt for the rule-routing step ("SegmentRuleRouter"). It asks the
# model to pick which candidate rules apply to the current segment and to
# answer with {"selected_items": [{"title": ..., "reason": ...}]}.
ROUTER_SYSTEM_PROMPT = """
你是合同分段规则路由智能体(SegmentRuleRouter)。
你的任务是:基于“当前分段文本”,从候选审查规则中选出“应执行审查”的规则项。
【路由目标】
- 仅做规则适配判断,不输出风险结论、不输出审查建议。
- 高召回优先:只要当前分段与规则存在明确相关性,就应路由命中。
- 若候选规则明显无关,则不要命中。
【判断依据】
- 以当前分段文本为主。
- 可参考上下文记忆辅助理解术语,但不得脱离当前分段文本做臆断。
【输出约束】
- 严格输出 JSON。
- 每个命中规则需给出简短 reason,说明该分段为何与规则相关。
- 若确实没有任何相关规则,返回 {"selected_items": []}。
【输出格式示例】
```json
{
"selected_items": [
{
"title": "规则标题",
"reason": "命中原因(简短)"
}
]
}
```
"""
# System prompt for the merge step. It asks the model to fuse the issue and
# suggestion texts of one group of findings into a single
# {"issue": ..., "suggestion": ...} JSON object grounded in the original text.
MERGE_SYSTEM_PROMPT = """
你将收到同一组 findings 的 issue 与 suggestion 列表,请做信息融合而非机械拼接。
要求:
1. 输入中已经包含同组条款原文`original_text`,请仅将其作为分析依据。
2. `issue`:提炼并合并组内风险点,去重、保留关键信息,语言精炼。
3. `suggestion`:合并为一条可执行建议,必须基于输入原文的具体表述来给出,避免空泛、泛化或与原文脱节,必要时按“先补充条款、再明确标准”这类逻辑组织。
4. 禁止输出与输入无关的信息。
【输出格式示例】
```json
{
"issue": "提炼合并后的风险点",
"suggestion": "提炼合并后的建议"
}
```
"""
#!/usr/bin/env python3
"""Standalone review LLM CLI."""
"""Backward-compatible entry point for segment_llm_action.py."""
from __future__ import annotations
import argparse, json, os, re, urllib.request
from pathlib import Path
from typing import Any
# Prompt templates keyed by tool name. Placeholders such as {segment_text} or
# {party_role} are filled in by run_review_llm; the "llm" entry has none.
# NOTE(review): most templates also embed literal JSON braces (for example
# {"findings":[]}), so they are not safe to pass through str.format unescaped.
PROMPTS = {
"review": "基于当前分段和审查规则审查合同,仅输出JSON:{\"overall_conclusion\":\"\",\"findings\":[]}。\n分段:{segment_text}\n立场:{party_role}\n规则:{ruleset_text}",
"summary": "仅提取当前分段中与规则字段相关的客观事实,仅输出JSON:{\"facts\":{}}。\n分段:{segment_text}\n字段:{rule_fields}",
"router": "从候选规则中选择当前分段应执行的审查项,仅输出JSON:{\"selected_items\":[]}。\n分段:{segment_text}\n记忆:{context_memories_json}\n立场:{party_role}\n候选:{candidate_rules_json}",
"merger": "合并同一分段内重复或相关的不合格findings,仅输出JSON:{\"findings\":[]}。\n输入:{payload}",
"reflect": "基于规则、已有findings和facts复核、去重、拆分、合并并定稿,仅输出JSON:{\"final_findings\":[]}。\n规则:{rule}\nfindings:{findings_json}\nfacts:{facts_json}\n立场:{party_role}",
"fact-merge": "合并summary_name下多个分段facts,不新增事实,仅输出JSON:{\"merge_facts\":{}}。\nsummary_names:{summary_names_json}\nfacts:{facts_json}",
"ruleset-route": "从候选ruleset_id中按问题选择一个,不得编造,仅输出JSON:{\"ruleset_id\":\"\",\"reason\":\"\"}。\n候选:{ruleset_ids_json}\n问题:{question}",
"party-role": "分析指定公司在合同中的商业角色,不仅按甲乙方判断,仅输出JSON:{\"party_role\":\"demand_side | supplier_side | unclear\",\"reason\":\"\"}。\n公司:{company_name}\n合同:{contract_text}",
"llm": "你是通用LLM助手。",
}
class LLMTool:
    """Minimal chat-completions client built on ``urllib``.

    Configuration is read from the environment when the instance is created:
    ``OPENAI_MODEL`` (default ``gpt-4o-mini``), ``OPENAI_BASE_URL`` (default
    ``https://api.openai.com/v1``, trailing slash removed) and
    ``OPENAI_API_KEY``.
    """

    def __init__(self, system_prompt: str = ""):
        self.system_prompt = system_prompt
        self.model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
        self.base_url = (os.environ.get("OPENAI_BASE_URL") or "https://api.openai.com/v1").rstrip("/")
        self.api_key = os.environ.get("OPENAI_API_KEY")

    def build_messages(self, user: str, system: str | None = None) -> list[dict[str, str]]:
        """Return a message list: an optional system message, then the user message."""
        messages: list[dict[str, str]] = []
        if system:
            messages.append({"role": "system", "content": system})
        messages.append({"role": "user", "content": user})
        return messages

    def run(self, messages: list[dict[str, str]]) -> str:
        """POST *messages* to ``<base_url>/chat/completions`` and return the reply text.

        Returns an empty string when the response carries no choices, message
        or content.

        Raises:
            RuntimeError: if no API key is configured.
        """
        if not self.api_key:
            raise RuntimeError("OPENAI_API_KEY is required")
        body = json.dumps({"model": self.model, "messages": messages}, ensure_ascii=False).encode()
        req = urllib.request.Request(
            f"{self.base_url}/chat/completions",
            data=body,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            },
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = json.loads(resp.read().decode())
        return (((data.get("choices") or [{}])[0].get("message") or {}).get("content")) or ""

    def parse_first_json(self, text: str) -> Any:
        """Return the first JSON object or array found in *text*, else ``None``.

        The whole text is tried first. Otherwise the span from the first
        opening brace/bracket to the last matching closer is tried, and if
        that span is not valid JSON (for example because later prose also
        contains a ``}``) the value that starts at that first opener is
        decoded on its own.
        """
        if not text:
            return None
        try:
            return json.loads(text)
        except ValueError:
            pass
        m = re.search(r"(\{.*\}|\[.*\])", text, re.S)
        if not m:
            return None
        try:
            return json.loads(m.group(1))
        except ValueError:
            pass
        try:
            # The greedy span ran past the end of the value; decode just the
            # value beginning at the first opener and ignore what follows.
            value, _end = json.JSONDecoder().raw_decode(text, m.start())
        except ValueError:
            return None
        return value
def jdump(v: Any) -> str:
    """Serialize *v* as human-readable JSON (2-space indent, non-ASCII kept as-is)."""
    rendered = json.dumps(v, indent=2, ensure_ascii=False)
    return rendered
def rules_text(rules: list[dict[str, Any]]) -> str:
    """Render rule dicts as labelled text, five lines per rule.

    Missing keys render as empty values; ``None`` or an empty list yields "".
    """
    labelled_keys = (
        ("标题", "title"),
        ("规则", "rule"),
        ("等级", "level"),
        ("建议", "suggestion_template"),
        ("案例", "case"),
    )
    blocks = []
    for entry in rules or []:
        lines = [f"{label}:{entry.get(key, '')}" for label, key in labelled_keys]
        blocks.append("\n".join(lines))
    return "\n".join(blocks)
def default_rulesets() -> list[str]:
    """Return a fresh list of the built-in ruleset identifiers."""
    builtin_ids = (
        "合同信息提取(合同组)",
        "合同信息提取(技术部)",
        "合同信息提取(采购部)",
        "技术协议提取(合同组)",
        "技术协议提取(技术部)",
    )
    return list(builtin_ids)
def empty_fact(v: Any) -> bool:
    """Tell whether *v* carries no information.

    True for ``None``, for blank strings, for the placeholder "未明确", and for
    empty dicts/lists; False for everything else (including 0 and False).
    """
    if v is None:
        return True
    if isinstance(v, str):
        stripped = v.strip()
        return not stripped or stripped == "未明确"
    if isinstance(v, (dict, list)):
        return not v
    return False
def dedupe(values: list[Any]) -> list[Any]:
    """Drop empty and repeated entries from *values*, keeping first-seen order.

    Dicts and lists are compared by their ``jdump`` text, everything else by
    ``str``; emptiness is decided by ``empty_fact``.
    """
    unique: list[Any] = []
    emitted: set[str] = set()
    for candidate in values:
        if isinstance(candidate, (dict, list)):
            fingerprint = jdump(candidate)
        else:
            fingerprint = str(candidate)
        if empty_fact(candidate) or fingerprint in emitted:
            continue
        emitted.add(fingerprint)
        unique.append(candidate)
    return unique
def merge_facts_rule(facts: list[dict[str, Any]], names: list[str]) -> dict[str, Any]:
    """Merge per-segment facts by name without calling a model.

    For each distinct, non-blank name (stripped, first-seen order) the values
    found under that key in the dict items of *facts* are de-duplicated. A
    single surviving value is stored as-is, several are stored as a list, and
    names with no surviving value are omitted. A ``_meta`` entry records the
    names as given and the number of fact items received.
    """
    fact_items = facts or []

    wanted: dict[str, None] = {}
    for raw_name in names or []:
        cleaned = str(raw_name).strip()
        if cleaned:
            wanted.setdefault(cleaned, None)

    merged: dict[str, Any] = {}
    for name in wanted:
        collected = []
        for item in fact_items:
            if isinstance(item, dict) and name in item:
                collected.append(item.get(name))
        survivors = dedupe(collected)
        if not survivors:
            continue
        merged[name] = survivors if len(survivors) != 1 else survivors[0]

    merged["_meta"] = {"summary_names": names, "source_fact_count": len(fact_items)}
    return merged
def _fill_prompt(template: str, values: dict[str, Any]) -> str:
    """Replace ``{name}`` placeholders in *template* for the keys of *values* only.

    Unlike ``str.format`` this leaves every other brace untouched, so templates
    may contain literal JSON such as ``{"findings":[]}``. Substitution is a
    single pass, so braces inside inserted values are never re-expanded.
    """
    if not values:
        return template
    names = "|".join(re.escape(key) for key in values)
    return re.sub(r"\{(" + names + r")\}", lambda m: str(values[m.group(1)]), template)


def run_review_llm(tool_name: str = "review", segment_id: int = 0, user_prompt: str | None = None, **kw) -> dict[str, Any]:
    """Build the prompt for *tool_name*, call the model and return parsed JSON.

    Args:
        tool_name: One of the keys of ``PROMPTS`` (case-insensitive); falsy
            values mean ``"review"``.
        segment_id: Accepted for interface compatibility; not used here.
        user_prompt: System prompt override for the ``llm`` tool, and fallback
            question for ``ruleset-route``.
        **kw: Tool-specific inputs (``segment_text``, ``rules``, ``findings``,
            ``facts``, ``merge_mode`` and so on).

    Returns:
        The parsed JSON reply; ``{"raw": <reply>}`` when the reply holds no
        parseable JSON; ``{"merge_facts": ...}`` for rule-mode ``fact-merge``
        (no model call); ``{"error": ...}`` for an unknown tool.
    """
    name = (tool_name or "review").lower()
    if name == "fact-merge" and str(kw.get("merge_mode") or "llm").lower() != "llm":
        return {"merge_facts": merge_facts_rule(kw.get("facts") or [], kw.get("summary_names") or [])}

    user = ""
    if name == "llm":
        prompt = user_prompt or PROMPTS["llm"]
        user = kw.get("user_content") or kw.get("segment_text") or ""
    elif name == "review":
        fields = {
            "segment_text": kw.get("segment_text", ""),
            "party_role": kw.get("party_role", ""),
            "ruleset_text": rules_text(kw.get("rules") or []),
        }
    elif name == "summary":
        fields = {
            "segment_text": kw.get("segment_text", ""),
            "rule_fields": jdump([r.get("summary") for r in kw.get("rules") or [] if r.get("summary")]),
        }
    elif name == "router":
        fields = {
            "segment_text": kw.get("segment_text", ""),
            "context_memories_json": jdump(kw.get("context_facts") or []),
            "party_role": kw.get("party_role", ""),
            "candidate_rules_json": jdump([{r.get("title", ""): r.get("rule", "")} for r in kw.get("rules") or []]),
        }
    elif name == "merger":
        fields = {"payload": jdump(kw.get("payload") or kw.get("findings") or [])}
    elif name == "reflect":
        fields = {
            "rule": jdump(kw.get("rule") or {}),
            "findings_json": jdump(kw.get("findings") or []),
            "facts_json": jdump(kw.get("facts") or kw.get("context_facts") or []),
            "party_role": kw.get("party_role", ""),
        }
    elif name == "fact-merge":
        fields = {
            "summary_names_json": jdump(kw.get("summary_names") or []),
            "facts_json": jdump(kw.get("facts") or []),
        }
    elif name == "ruleset-route":
        fields = {
            "question": kw.get("question") or user_prompt or "",
            "ruleset_ids_json": jdump(kw.get("ruleset_ids") or default_rulesets()),
        }
    elif name == "party-role":
        fields = {
            "company_name": kw.get("company_name") or "",
            "contract_text": kw.get("contract_text") or kw.get("segment_text") or "",
        }
    else:
        return {"error": f"unknown tool: {tool_name}"}

    if name != "llm":
        # The templates embed literal JSON braces, which str.format would try
        # to interpret as fields; substitute only the known placeholders.
        prompt = _fill_prompt(PROMPTS[name], fields)

    llm = LLMTool(prompt)
    raw = llm.run(llm.build_messages(user, prompt))
    parsed = llm.parse_first_json(raw)
    # Compare against None so a valid but empty reply ([] or {}) is returned
    # as parsed JSON instead of being wrapped as raw text.
    return parsed if parsed is not None else {"raw": raw}
def load_arg(v: str | None, default: Any) -> Any:
    """Decode a JSON command-line value.

    ``None`` yields *default*; a value starting with ``@`` names a UTF-8 file
    whose content is decoded; anything else is decoded directly.
    """
    if v is None:
        return default
    if v.startswith("@"):
        payload = Path(v[1:]).read_text(encoding="utf-8")
    else:
        payload = v
    return json.loads(payload)
def parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the review LLM tool."""
    cli = argparse.ArgumentParser(description="Standalone review LLM CLI")
    cli.add_argument(
        "tool_name",
        nargs="?",
        default="review",
        choices=["review", "summary", "router", "merger", "reflect", "fact-merge", "ruleset-route", "party-role", "llm"],
    )
    # The segment id is the only typed option; the rest are plain strings.
    cli.add_argument("--segment-id", default=0, type=int)
    string_options = {
        "segment-text": "",
        "segment-text-file": None,
        "party-role": "",
        "rules": "[]",
        "context-facts": "{}",
        "payload": None,
        "findings": "[]",
        "facts": "[]",
        "rule": "{}",
        "summary-names": "[]",
        "question": "",
        "ruleset-ids": None,
        "company-name": "",
        "contract-text": "",
        "contract-text-file": None,
        "user-prompt": None,
        "user-content": None,
    }
    for option, fallback in string_options.items():
        cli.add_argument(f"--{option}", default=fallback)
    cli.add_argument("--merge-mode", choices=["llm", "rule"], default="llm")
    cli.add_argument("--output-raw", action="store_true")
    return cli
def main(argv: list[str] | None = None) -> int:
    """Parse the command line, run the selected tool and print its result.

    File options (``--segment-text-file``, ``--contract-text-file``) take
    precedence over their inline counterparts. The result is printed as
    formatted JSON, or as the plain Python value with ``--output-raw``.
    Always returns 0.
    """
    a = parser().parse_args(argv)

    if a.segment_text_file:
        seg = Path(a.segment_text_file).read_text(encoding="utf-8")
    else:
        seg = a.segment_text
    if a.contract_text_file:
        contract = Path(a.contract_text_file).read_text(encoding="utf-8")
    else:
        contract = a.contract_text

    kw: dict[str, Any] = {}
    kw["segment_text"] = seg
    kw["party_role"] = a.party_role
    kw["rules"] = load_arg(a.rules, [])
    kw["context_facts"] = load_arg(a.context_facts, {})
    kw["payload"] = load_arg(a.payload, []) if a.payload else None
    kw["findings"] = load_arg(a.findings, [])
    kw["facts"] = load_arg(a.facts, [])
    kw["rule"] = load_arg(a.rule, {})
    kw["summary_names"] = load_arg(a.summary_names, [])
    kw["merge_mode"] = a.merge_mode
    kw["question"] = a.question
    if a.ruleset_ids:
        kw["ruleset_ids"] = load_arg(a.ruleset_ids, default_rulesets())
    else:
        kw["ruleset_ids"] = default_rulesets()
    kw["company_name"] = a.company_name
    kw["contract_text"] = contract
    kw["user_content"] = a.user_content

    res = run_review_llm(a.tool_name, a.segment_id, a.user_prompt, **kw)
    if a.output_raw:
        print(res)
    else:
        print(jdump(res))
    return 0
from segment_llm_action import * # noqa: F403
from segment_llm_action import main
if __name__ == "__main__":
......
#!/usr/bin/env python3
"""Run an LLM action with a rule payload."""
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import Any
# Directory that contains this script. It is inserted at the front of sys.path
# (once) -- presumably so the non-relative fallback imports in this file can
# find sibling modules when the script is run directly; verify against usage.
SCRIPT_DIR = Path(__file__).resolve().parent
if str(SCRIPT_DIR) not in sys.path:
sys.path.insert(0, str(SCRIPT_DIR))
try:
from .prompts import (
MERGE_SYSTEM_PROMPT,
REFLECT_SYSTEM_PROMPT,
REVIEW_SYSTEM_PROMPT,
ROUTER_SYSTEM_PROMPT,
SUMMARY_ROUTER_SYSTEM_PROMPT,
SUMMARY_SYSTEM_PROMPT,
)
except ImportError:
from prompts import (
MERGE_SYSTEM_PROMPT,
REFLECT_SYSTEM_PROMPT,
REVIEW_SYSTEM_PROMPT,
ROUTER_SYSTEM_PROMPT,
SUMMARY_ROUTER_SYSTEM_PROMPT,
SUMMARY_SYSTEM_PROMPT,
)
Action = str
def jdump(value: Any) -> str:
    """Return *value* as indented JSON text, leaving non-ASCII characters unescaped."""
    options = {"ensure_ascii": False, "indent": 2}
    return json.dumps(value, **options)
def load_json_arg(value: str | None, default: Any) -> Any:
    """Decode a JSON argument given inline or as ``@path``.

    Returns *default* when *value* is ``None``. A leading ``@`` means the rest
    is the path of a UTF-8 file holding the JSON.
    """
    if value is None:
        return default
    source = Path(value[1:]).read_text(encoding="utf-8") if value.startswith("@") else value
    return json.loads(source)
def pick_text_chunk(text: str, chunk_size: int | None, chunk_index: int) -> str:
    """Return the *chunk_index*-th slice of *text* of length *chunk_size*.

    With ``chunk_size`` of ``None`` the whole text is returned and
    *chunk_index* is ignored. An index past the end yields "".

    Raises:
        ValueError: if *chunk_size* is not positive or *chunk_index* is negative.
    """
    if chunk_size is None:
        return text
    if not chunk_size > 0:
        raise ValueError("--chunk-size must be > 0")
    if not chunk_index >= 0:
        raise ValueError("--chunk-index must be >= 0")
    begin = chunk_size * chunk_index
    end = begin + chunk_size
    return text[begin:end]
def load_review_text(text: str, input_file: str | None, chunk_size: int | None, chunk_index: int) -> str:
    """Choose the text to review.

    When *input_file* is given, its UTF-8 content is read and narrowed with
    ``pick_text_chunk``; otherwise *text* is returned unchanged (chunking is
    applied to file input only).
    """
    if not input_file:
        return text
    contents = Path(input_file).read_text(encoding="utf-8")
    return pick_text_chunk(contents, chunk_size, chunk_index)
def system_prompt_for(action: Action) -> str:
    """Return the system prompt for a canonical action name.

    Raises:
        KeyError: if *action* is not one of review, reflect, summary, router, merge.
    """
    prompts_by_action = dict(
        review=REVIEW_SYSTEM_PROMPT,
        reflect=REFLECT_SYSTEM_PROMPT,
        summary=SUMMARY_SYSTEM_PROMPT,
        router=ROUTER_SYSTEM_PROMPT,
        merge=MERGE_SYSTEM_PROMPT,
    )
    return prompts_by_action[action]
def ensure_rule_dict(rule: Any) -> dict[str, Any]:
    """Return *rule* unchanged if it is a dict.

    Raises:
        ValueError: for any other type.
    """
    if isinstance(rule, dict):
        return rule
    raise ValueError("--rule must be a JSON object")
def build_user_prompt(rule: dict[str, Any], text: str) -> str:
    """Return the user message: a JSON object with the validated rule and the text."""
    payload = {"rule": ensure_rule_dict(rule), "text": text}
    return jdump(payload)
def build_messages(action: Action, rule: dict[str, Any], text: str = "") -> list[dict[str, str]]:
    """Build the two-message conversation (system, then user) for *action*.

    *action* may be any alias accepted by ``normalize_action``.
    """
    canonical = normalize_action(action)
    system_message = {"role": "system", "content": system_prompt_for(canonical)}
    user_message = {"role": "user", "content": build_user_prompt(rule, text)}
    return [system_message, user_message]
def append_json_output(output_file: str, value: Any) -> None:
    """Append *value* to the JSON array stored in *output_file*.

    A missing or zero-length file starts a new array. A list *value* is
    spliced in element by element; anything else is added as one element.
    Parent directories are created as needed.

    Raises:
        ValueError: if the existing file holds JSON that is not an array.
    """
    target = Path(output_file)
    accumulated: list[Any] = []
    has_content = target.exists() and target.stat().st_size > 0
    if has_content:
        accumulated = json.loads(target.read_text(encoding="utf-8"))
        if not isinstance(accumulated, list):
            raise ValueError(f"--output file must contain a JSON array: {output_file}")

    accumulated.extend(value if isinstance(value, list) else [value])

    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(jdump(accumulated) + "\n", encoding="utf-8")
def write_output(output: str, value: Any) -> None:
    """Emit *value*: print it as JSON when *output* is "-", else append it to that file."""
    if output != "-":
        append_json_output(output, value)
        return
    print(jdump(value))
def create_llm_tool(system_prompt: str):
    """Instantiate ``LLMTool`` with *system_prompt*.

    The class is imported lazily, first relative to the package and then as a
    top-level module if that import fails.
    """
    try:
        from .llm_tool import LLMTool as tool_cls
    except ImportError:
        from llm_tool import LLMTool as tool_cls
    tool = tool_cls(system_prompt)
    return tool
def normalize_action(action: str) -> Action:
    """Map an action alias (English or Chinese) to its canonical name.

    Matching ignores surrounding whitespace and case; a falsy *action* means
    ``"review"``.

    Raises:
        ValueError: if the alias is not recognised.
    """
    alias_groups = {
        "review": ("review", "审查"),
        "reflect": ("reflect", "reflection", "反思", "复核"),
        "summary": ("summary", "segment_summary", "summarize", "摘要", "总结"),
        "router": ("router", "route", "segment_rule_router", "路由"),
        "merge": ("merge", "merger", "segment_merger", "合并"),
    }
    requested = (action or "review").strip().lower()
    for canonical, spellings in alias_groups.items():
        if requested in spellings:
            return canonical
    raise ValueError(f"unknown action: {action}")
def run_segment_llm_action(action: Action, rule: dict[str, Any], text: str = "") -> Any:
    """Run one LLM action for *rule* on *text* and return the parsed reply.

    Args:
        action: Action name or alias understood by ``build_messages``.
        rule: Rule payload sent to the model.
        text: Text to process.

    Returns:
        The JSON value parsed from the reply, or ``{"raw": <reply>}`` when the
        parser reports no JSON (``None``).
    """
    messages = build_messages(action, rule, text)
    llm = create_llm_tool(messages[0]["content"])
    raw = llm.run(messages)
    parsed = llm.parse_first_json(raw)
    # An empty array/object is a valid answer (the review prompt asks for []
    # when nothing is found), so only a missing parse falls back to raw text.
    # NOTE(review): assumes parse_first_json returns None on failure -- confirm.
    return parsed if parsed is not None else {"raw": raw}
def parser() -> argparse.ArgumentParser:
    """Build the command-line parser for running one LLM action.

    The accepted ``action`` values are exactly the aliases that
    ``normalize_action`` resolves, so every value that passes argument parsing
    can actually be executed.
    """
    p = argparse.ArgumentParser(description="Run an LLM action with a rule payload")
    p.add_argument(
        "action",
        choices=[
            "review", "reflect", "reflection",
            "summary", "segment_summary", "summarize",
            "router", "route", "segment_rule_router",
            "merge", "merger", "segment_merger",
            "审查", "反思", "复核", "摘要", "总结", "路由", "合并",
        ],
    )
    p.add_argument("--rule", required=True, help="任意字段的 JSON dict;支持 @file.json")
    p.add_argument("--text", "--segment-text", dest="text", default="", help="直接输入待审查文本")
    p.add_argument("--input-file", help="从文本文件读取待审查文本")
    p.add_argument("--chunk-size", type=int, help="从文件读取时的分段大小")
    p.add_argument("--chunk-index", type=int, default=0, help="从文件读取时的分段序号,从 0 开始")
    p.add_argument("--output", default="-", help="输出目标;'-' 直接打印,其他路径则追加到 JSON 数组文件")
    p.add_argument("--print-messages", action="store_true", help="只打印构造后的 messages,不调用模型")
    return p
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: build the messages, call the model, emit the result.

    With ``--print-messages`` the constructed messages are printed and no
    model call is made. Otherwise the parsed reply is printed or appended to
    the ``--output`` file. Always returns 0.
    """
    args = parser().parse_args(argv)
    rule = ensure_rule_dict(load_json_arg(args.rule, {}))
    text = load_review_text(args.text, args.input_file, args.chunk_size, args.chunk_index)
    messages = build_messages(args.action, rule, text)
    if args.print_messages:
        print(jdump(messages))
        return 0
    llm = create_llm_tool(messages[0]["content"])
    raw = llm.run(messages)
    parsed = llm.parse_first_json(raw)
    # Keep a valid empty reply ([] / {}) as JSON; wrapping it as {"raw": ...}
    # would append a bogus entry to the output array.
    # NOTE(review): assumes parse_first_json returns None on failure -- confirm.
    write_output(args.output, parsed if parsed is not None else {"raw": raw})
    return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone JSON-backed review memory CLI."""
from __future__ import annotations
import argparse, json, logging
from dataclasses import asdict, dataclass
from pathlib import Path
from threading import RLock
from typing import Any
from uuid import uuid4
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("review_memory_cli")
@dataclass
class Finding:
    """One review finding attached to a rule and a text segment."""

    rule_title: str
    segment_id: int
    original_text: str
    issue: str
    risk_level: str
    suggestion: str
    id: str = ""
    result: str = ""

    @classmethod
    def from_dict(cls, data: dict) -> "Finding":
        """Build a finding from a mapping, coercing every field.

        Missing text fields become "", a missing or falsy ``segment_id``
        becomes 0, and a falsy *data* is treated as an empty mapping.
        """
        source = data or {}

        def text(field_name: str) -> str:
            return str(source.get(field_name, ""))

        return cls(
            rule_title=text("rule_title"),
            segment_id=int(source.get("segment_id", 0) or 0),
            original_text=text("original_text"),
            issue=text("issue"),
            risk_level=text("risk_level"),
            suggestion=text("suggestion"),
            id=text("id"),
            result=text("result"),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict."""
        return asdict(self)
class MemoryStore:
    """JSON-file-backed store for review facts, merged facts and findings.

    State is kept in ``<parent of this file's directory>/tmp/<storage_name>``
    and rewritten after every mutation. Every public method takes the instance
    lock (re-entrant), so reads see a consistent snapshot.
    """

    def __init__(self, storage_name: str = "default.json") -> None:
        self._storage_path = Path(__file__).resolve().parent.parent / "tmp" / storage_name
        self._storage_path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = RLock()
        self.facts: list[dict[str, Any]] = []
        self.merge_facts: list[dict[str, Any]] = []
        self.findings: dict[str, list[Finding]] = {}
        self._load()

    def _key(self, key: str | None) -> str:
        """Normalize a findings key: trimmed, lower-cased, "review" when blank."""
        return (key or "").strip().lower() or "review"

    def add_fact(self, value: dict[str, Any]) -> list[dict[str, Any]]:
        """Append a fact, persist, and return the stored fact list."""
        with self._lock:
            self.facts.append(value)
            self._persist()
            return self.facts

    def add_merge_fact(self, value: dict[str, Any]) -> list[dict[str, Any]]:
        """Append a merged fact, persist, and return the stored merged-fact list."""
        with self._lock:
            self.merge_facts.append(value)
            self._persist()
            return self.merge_facts

    def get_facts(self) -> list[dict[str, Any]]:
        """Return a shallow copy of the fact list."""
        with self._lock:
            return list(self.facts)

    def add_finding(self, key: str, finding: Finding) -> Finding:
        """Store *finding* under *key*, assigning a random id if it has none."""
        with self._lock:
            if not finding.id:
                finding.id = uuid4().hex
            self.findings.setdefault(self._key(key), []).append(finding)
            self._persist()
            return finding

    def list_findings(self, key: str | None = None) -> dict[str, list[dict[str, Any]]]:
        """Return findings as dicts, for one key or (when *key* is falsy) all keys."""
        with self._lock:
            keys = [self._key(key)] if key else list(self.findings)
            return {k: [f.to_dict() for f in self.findings.get(k, [])] for k in keys}

    def get_findings_by_segment(self, key: str, segment_id: int) -> list[dict[str, Any]]:
        """Return the findings under *key* whose segment id equals *segment_id*."""
        with self._lock:
            return [f.to_dict() for f in self.findings.get(self._key(key), []) if f.segment_id == segment_id]

    def search_findings(self, key: str, rule_title: str = "") -> list[dict[str, Any]]:
        """Return findings under *key*, optionally filtered by case-insensitive rule title."""
        title = (rule_title or "").strip().lower()
        with self._lock:
            return [
                f.to_dict()
                for f in self.findings.get(self._key(key), [])
                if not title or f.rule_title.lower() == title
            ]

    def delete_findings_by_segment(self, key: str, segment_id: int) -> int:
        """Remove the findings of one segment under *key*; return how many were removed."""
        with self._lock:
            k = self._key(key)
            current = self.findings.get(k, [])
            kept = [f for f in current if f.segment_id != segment_id]
            removed = len(current) - len(kept)
            if removed:
                # Only touch the mapping when something changed, so deleting
                # from an unknown key does not create an empty entry.
                self.findings[k] = kept
                self._persist()
            return removed

    def search_facts(self, keywords: list[str]) -> list[Any]:
        """Return ``{name: value}`` for each fact field whose name matches a keyword.

        A field matches when a keyword contains its name or its name contains a
        keyword (case-insensitive). Blank keywords, facts that are not dicts
        and fields with an empty name are ignored.
        """
        keys = [str(k).strip().lower() for k in keywords if str(k).strip()]
        out = []
        with self._lock:
            for item in self.facts:
                if not isinstance(item, dict):
                    # The CLI stores any JSON value as a fact; skip non-mappings.
                    continue
                for name, value in item.items():
                    low = str(name).lower()
                    # An empty name is a substring of everything; never match it.
                    if low and any(k in low or low in k for k in keys):
                        out.append({name: value})
        return out

    def clear(self) -> None:
        """Drop all facts, merged facts and findings, then persist the empty state."""
        with self._lock:
            self.facts.clear()
            self.merge_facts.clear()
            self.findings.clear()
            self._persist()

    def _payload(self) -> dict[str, Any]:
        """Return the JSON-serializable snapshot of the whole store."""
        return {
            "facts": self.facts,
            "merge_facts": self.merge_facts,
            "findings": {k: [f.to_dict() for f in v] for k, v in self.findings.items()},
        }

    def _persist(self) -> None:
        """Write the snapshot to the storage file.

        The text is written to a sibling ``.tmp`` file and then moved over the
        real file, so an interrupted write cannot leave a truncated store.
        """
        serialized = json.dumps(self._payload(), ensure_ascii=False, indent=2)
        scratch = self._storage_path.with_name(self._storage_path.name + ".tmp")
        scratch.write_text(serialized, encoding="utf-8")
        scratch.replace(self._storage_path)

    def _load(self) -> None:
        """Load state from the storage file if it exists.

        Loading is best-effort: any failure is logged and the store keeps its
        current (empty) state. State is assigned only after the whole file has
        been parsed, so a failure cannot leave it half-loaded.
        """
        if not self._storage_path.exists():
            return
        try:
            data = json.loads(self._storage_path.read_text(encoding="utf-8") or "{}")
            facts = data.get("facts") or []
            merge_facts = data.get("merge_facts") or []
            findings = {
                self._key(k): [Finding.from_dict(i) for i in items or []]
                for k, items in (data.get("findings") or {}).items()
            }
        except Exception as exc:  # deliberate best-effort boundary
            logger.error("Failed to load memory store: %s", exc)
            return
        self.facts, self.merge_facts, self.findings = facts, merge_facts, findings

    def export_to_json(self, path: str | None = None) -> str:
        """Write the snapshot to *path* and return the path used.

        Without *path*, the export goes next to the storage file with
        ``_export.json`` replacing a trailing ``.json`` of the file name (or
        appended when the name has no such suffix), so the export never
        overwrites the store itself.
        """
        if path:
            out = path
        else:
            name = self._storage_path.name
            base = name[: -len(".json")] if name.endswith(".json") else name
            out = str(self._storage_path.with_name(base + "_export.json"))
        with self._lock:
            serialized = json.dumps(self._payload(), ensure_ascii=False, indent=2)
        Path(out).write_text(serialized, encoding="utf-8")
        return out
def out(obj: Any) -> None:
    """Print *obj* to stdout as indented JSON with non-ASCII characters kept readable."""
    rendered = json.dumps(obj, indent=2, ensure_ascii=False)
    print(rendered)
def load_json_arg(value: str) -> Any:
    """Decode JSON given inline, or from the UTF-8 file named after a leading ``@``."""
    if value.startswith("@"):
        source = Path(value[1:]).read_text(encoding="utf-8")
    else:
        source = value
    return json.loads(source)
def parser() -> argparse.ArgumentParser:
    """Build the review-memory command-line parser with one sub-command per operation."""
    root = argparse.ArgumentParser(prog="review-memory-cli")
    root.add_argument("--storage", default="default.json")
    commands = root.add_subparsers(dest="cmd")

    # Facts.
    commands.add_parser("list-facts")
    commands.add_parser("add-fact").add_argument("data")
    commands.add_parser("add-merge-fact").add_argument("data")
    commands.add_parser("search-facts").add_argument("keywords", nargs="+")

    # Findings.
    add_finding = commands.add_parser("add-finding")
    add_finding.add_argument("--key", default="review")
    add_finding.add_argument("--rule", required=True)
    add_finding.add_argument("--segment", type=int, default=0)
    for text_option in ("--original", "--issue", "--risk", "--suggest", "--result"):
        add_finding.add_argument(text_option, default="")

    commands.add_parser("list-findings")
    commands.add_parser("list-findings-key").add_argument("key")

    by_segment = commands.add_parser("findings-by-seg")
    by_segment.add_argument("key")
    by_segment.add_argument("segment", type=int)

    search = commands.add_parser("search-findings")
    search.add_argument("key")
    search.add_argument("--rule-title", default="")

    delete = commands.add_parser("delete-findings-seg")
    delete.add_argument("key")
    delete.add_argument("segment", type=int)

    # Maintenance.
    commands.add_parser("clear")
    commands.add_parser("export").add_argument("--out")
    return root
def main(argv: list[str] | None = None) -> int:
    """Run one review-memory command.

    Opens the store named by ``--storage``, executes the sub-command and
    prints its result. Returns 0 on success; when no sub-command is given the
    help text is printed and 1 is returned.
    """
    cli = parser()
    a = cli.parse_args(argv)
    store = MemoryStore(a.storage)
    command = a.cmd

    if command == "list-facts":
        out(store.get_facts())
    elif command == "add-fact":
        store.add_fact(load_json_arg(a.data))
        print("OK")
    elif command == "add-merge-fact":
        store.add_merge_fact(load_json_arg(a.data))
        print("OK")
    elif command == "search-facts":
        out(store.search_facts(a.keywords))
    elif command == "add-finding":
        finding = Finding(a.rule, a.segment, a.original, a.issue, a.risk, a.suggest, result=a.result)
        out(store.add_finding(a.key, finding).to_dict())
    elif command == "list-findings":
        out(store.list_findings())
    elif command == "list-findings-key":
        out(store.list_findings(a.key))
    elif command == "findings-by-seg":
        out(store.get_findings_by_segment(a.key, a.segment))
    elif command == "search-findings":
        out(store.search_findings(a.key, a.rule_title))
    elif command == "delete-findings-seg":
        print(store.delete_findings_by_segment(a.key, a.segment))
    elif command == "clear":
        store.clear()
        print("cleared")
    elif command == "export":
        print(store.export_to_json(a.out))
    else:
        cli.print_help()
        return 1
    return 0
if __name__ == "__main__":
raise SystemExit(main())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment