Commit 37f636e2 by ccran

feat: add skills;

parent 461d5ea7
......@@ -14,7 +14,7 @@ use_docker = False
@dataclass
class LLMConfig:
base_url: str = "http://192.168.252.71:9002/v1"
base_url: str = "http://172.21.107.80:9002/v1"
api_key: str = "none"
model: str = "Qwen2-72B-Instruct"
......@@ -23,8 +23,8 @@ min_single_chunk_size = 2000
max_single_chunk_size = 100000
max_chunk_page = 10
MAX_SINGLE_CHUNK_SIZE = 100000
# MAX_SINGLE_CHUNK_SIZE = 5000
# MAX_SINGLE_CHUNK_SIZE = 100000
MAX_SINGLE_CHUNK_SIZE = 5000
# MAX_SINGLE_CHUNK_SIZE = 2000
MERGE_RULE_PROMPT = False
META_KEY = "META"
......
......@@ -19,7 +19,7 @@ batch_size = 5
if not use_lufa:
SUFFIX = "_麓发迁移"
batch_input_dir_path = "jp-input"
batch_output_dir_path = f"/data/home/htsc/jp-contract/data/benchmark/results/jp-output-lufa-chunk100000"
batch_output_dir_path = f"/data/home/htsc/jp-contract/data/benchmark/results/jp-output-simple"
# 金盘fastgpt接口
url = "http://172.21.107.45:3002/api/v1/chat/completions"
# 金盘迁移麓发合同审查测试token
......
No preview for this file type
#!/usr/bin/env python3
"""Single-file CLI version of utils/common_util.py and utils/http_util.py.
This script mirrors the project utility functions while staying standalone:
it does not import local project modules such as utils.* or core.*. Runtime
defaults that originally came from core.config are CLI arguments here.
"""
"""Compatibility dispatcher for split common CLIs."""
from __future__ import annotations
import argparse
import json
import mimetypes
import random
import re
import string
import sys
import time
import urllib.parse
import urllib.request
from datetime import datetime
import argparse, subprocess, sys
from pathlib import Path
from typing import Any
# Default service endpoints; used as parameter defaults and CLI flag defaults below.
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
# Default bounds and divisor consumed by adjust_single_chunk_size().
DEFAULT_MIN_SINGLE_CHUNK_SIZE = 2000
DEFAULT_MAX_SINGLE_CHUNK_SIZE = 100000
DEFAULT_MAX_CHUNK_PAGE = 10
def random_str(l: int = 5) -> str:
    """Return ``l`` random lowercase ASCII letters.

    The letters are distinct as long as ``l`` does not exceed the alphabet
    size; longer requests are drawn with repetition.
    """
    alphabet = string.ascii_lowercase
    if l <= len(alphabet):
        picked = random.sample(alphabet, l)
    else:
        picked = [random.choice(alphabet) for _ in range(l)]
    return "".join(picked)
def format_now() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return current.strftime("%Y-%m-%d %H:%M:%S")
def extract_url_file(url: str, support_formats: list[str]) -> str:
    """Return the first file name in ``url`` that ends with a supported suffix.

    Args:
        url: Text to search.
        support_formats: Literal suffixes (for example ``".docx"``) that the
            file name must end with.

    Returns:
        The matched file name including its suffix.

    Raises:
        Exception: If ``support_formats`` is empty or nothing in ``url``
            matches.
    """
    # With no formats the alternation would be the empty pattern, which
    # matches at position 0 and would return "" instead of reporting a miss.
    if support_formats:
        pattern = "|".join(
            r"[\u4e00-\u9fa5()()0-9\w-]+" + re.escape(fmt)
            for fmt in support_formats
        )
        search_result = re.search(pattern, url)
        if search_result:
            return search_result.group()
    raise Exception(f"{support_formats} not found in url:{url}")
def adjust_single_chunk_size(
    all_text_len: int,
    max_chunk_page: int = DEFAULT_MAX_CHUNK_PAGE,
    min_single_chunk_size: int = DEFAULT_MIN_SINGLE_CHUNK_SIZE,
    max_single_chunk_size: int = DEFAULT_MAX_SINGLE_CHUNK_SIZE,
) -> int:
    """Return ``all_text_len // max_chunk_page`` clamped to the size bounds.

    The quotient is first capped at ``max_single_chunk_size`` and then raised
    to at least ``min_single_chunk_size``.
    """
    per_page = all_text_len // max_chunk_page
    upper_bounded = min(per_page, max_single_chunk_size)
    return max(min_single_chunk_size, upper_bounded)
def _try_json_loads(text: str) -> Any:
try:
import json_repair # type: ignore
except ImportError:
return json.loads(text)
return json_repair.loads(text, strict=False)
def extract_json(json_str: str) -> list[Any]:
    """Collect JSON values found in ``json_str`` into a flat list.

    Candidates are tried in this order, stopping at the first stage that
    yields anything: every ```json fenced block, the whole text, the first
    parseable generic fenced block, and finally every non-greedy ``{...}`` or
    ``[...]`` fragment. A parsed list contributes its elements; any other
    value is appended as one element.
    """
    source = json_str or ""
    collected: list[Any] = []

    def _absorb(candidate: str) -> bool:
        """Parse one candidate into ``collected``; report whether it parsed."""
        cleaned = (candidate or "").strip()
        if not cleaned:
            return False
        # Drop control characters (tab, LF and CR are kept) before parsing.
        cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", cleaned)
        try:
            parsed = _try_json_loads(cleaned)
        except Exception:
            return False
        if isinstance(parsed, list):
            collected.extend(parsed)
        else:
            collected.append(parsed)
        return True

    # Stage 1: all explicitly tagged json fences.
    for fenced in re.findall(r"```json([\s\S]*?)```", source, re.DOTALL):
        _absorb(fenced)
    if collected:
        return collected

    # Stage 2: the text as a whole.
    if _absorb(source):
        return collected

    # Stage 3: the first generic fence that parses.
    for fenced in re.findall(r"```([\s\S]*?)```", source, re.DOTALL):
        if _absorb(fenced):
            return collected

    # Stage 4: loose object/array fragments.
    for fragment in re.findall(r"(\{[\s\S]*?\}|\[[\s\S]*?\])", source, re.DOTALL):
        _absorb(fragment)
    return collected
def remove_duplicates_by_key(data_list: list[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Drop items whose ``key`` text is contained in an already kept item's text.

    Items are visited from longest to shortest stringified value, so a shorter
    value that is a substring of a longer kept one is discarded. The result is
    in that longest-first order.
    """

    def _text(item: dict[str, Any]) -> str:
        return str(item.get(key, ""))

    kept: list[dict[str, Any]] = []
    kept_values: list[str] = []
    for item in sorted(data_list, key=lambda entry: len(_text(entry)), reverse=True):
        value = _text(item)
        if any(value in seen for seen in kept_values):
            continue
        kept_values.append(value)
        kept.append(item)
    return kept
def extract_drop_json_part(json_str: str) -> str:
    """Return ``json_str`` with every ```json fenced block removed, then stripped."""
    fenced_json = re.compile(r"```json([\s\S]*?)```", re.DOTALL)
    remainder = fenced_json.sub("", json_str)
    return remainder.strip()
def group_chunk_by_len(chunk_list: list[dict[str, Any]], key: str, chunk_len: int) -> list[list[dict[str, Any]]]:
    """Pack consecutive chunks into groups whose summed ``key`` length fits ``chunk_len``.

    A new group is started when adding the next chunk would push the running
    length above ``chunk_len``. A single chunk longer than the limit still
    forms its own group.
    """
    groups: list[list[dict[str, Any]]] = []
    bucket: list[dict[str, Any]] = []
    bucket_len = 0
    for chunk in chunk_list:
        size = len(str(chunk.get(key, "")))
        overflow = bucket_len + size > chunk_len
        if bucket and overflow:
            groups.append(bucket)
            bucket, bucket_len = [], 0
        bucket.append(chunk)
        bucket_len += size
    if bucket:
        groups.append(bucket)
    return groups
def _download_basename(filename: str) -> str:
filename = urllib.parse.unquote(filename.strip().strip('"'))
filename = filename.replace("\\", "/")
return Path(filename).name or "downloaded_file"
# Sub-commands forwarded to json_text_tool.py by the dispatcher.
TEXT_COMMANDS = {
    "random-str",
    "format-now",
    "extract-url-file",
    "adjust-single-chunk-size",
    "extract-json",
    "remove-duplicates-by-key",
    "extract-drop-json-part",
    "group-chunk-by-len",
}
# Sub-commands forwarded to file_chat_tool.py by the dispatcher.
FILE_COMMANDS = {
    "url-replace-fastgpt",
    "download",
    "upload",
    "fastgpt-chat",
}
def _resolve_download_filename(url: str, headers: dict[str, str]) -> str:
    """Choose a local file name for a downloaded resource.

    A ``Content-Disposition`` header wins, with ``filename*`` preferred over
    ``filename``; otherwise the last component of the URL path is used.

    Args:
        url: The URL that was requested.
        headers: Response headers as a plain mapping.

    Returns:
        The base name produced by ``_download_basename``.
    """
    # HTTP field names are case-insensitive and a plain dict keeps whatever
    # casing the server sent, so match the header name by its lowercase form.
    content_disposition = next(
        (
            value
            for name, value in headers.items()
            if name.lower() == "content-disposition" and value
        ),
        "",
    )
    if content_disposition:
        patterns = (
            r"filename\*=(?:UTF-8''|utf-8'')?([^;]+)",
            r'filename="?([^";]+)"?',
        )
        for pattern in patterns:
            match = re.search(pattern, content_disposition)
            if match:
                return _download_basename(match.group(1))
    return _download_basename(urllib.parse.urlparse(url).path)
def script(name: str) -> str:
    """Return the path of ``name`` placed next to this file's resolved location."""
    here = Path(__file__).resolve()
    sibling = here.with_name(name)
    return str(sibling)
def url_replace_fastgpt(origin: str, base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL) -> str:
    """Prefix ``origin`` with ``base_fastgpt_url`` unless it starts with ``http:`` or ``https:``."""
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def download_file(
    url: str,
    path: str,
    input_url_to_inner: bool = True,
    base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
    base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
    outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
) -> str | None:
    """Fetch ``url`` with a GET request and write the body under ``path``.

    When ``input_url_to_inner`` is true, a URL without an ``http:``/``https:``
    prefix is prepended with ``base_fastgpt_url`` and occurrences of
    ``outer_backend_url`` are replaced by ``base_backend_url``. If ``path`` is
    an existing directory, the file name is resolved from the response and the
    URL; otherwise ``path`` itself is the target file.

    Returns:
        The written path, or ``None`` when the server answers with an HTTP
        error status (the status is reported on stderr).
    """
    if input_url_to_inner:
        if not url.startswith(("http:", "https:")):
            url = base_fastgpt_url + url
        url = url.replace(outer_backend_url, base_backend_url)

    get_request = urllib.request.Request(url, method="GET")
    try:
        with urllib.request.urlopen(get_request, timeout=120) as response:
            destination = Path(path)
            if destination.exists() and destination.is_dir():
                destination = destination / _resolve_download_filename(url, dict(response.headers))
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_bytes(response.read())
            return str(destination)
    except urllib.error.HTTPError as exc:
        print(f"{url}文件下载失败. HTTP Status Code: {exc.code}", file=sys.stderr)
        return None
def _multipart_body(path: str, field_name: str = "file") -> tuple[bytes, str]:
    """Build a single-part ``multipart/form-data`` body for the file at ``path``.

    Returns:
        ``(body, boundary)`` where ``boundary`` is the delimiter used in
        ``body``, to be echoed in the request's Content-Type header.
    """
    source = Path(path)
    boundary = f"----common-tool-{int(time.time() * 1000)}-{random_str(8)}"
    # Fall back to a generic binary type when the extension is unknown.
    content_type = mimetypes.guess_type(source.name)[0] or "application/octet-stream"
    part_header = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field_name}"; filename="{source.name}"\r\n'
        f"Content-Type: {content_type}\r\n\r\n"
    ).encode()
    payload = source.read_bytes()
    closing = f"\r\n--{boundary}--\r\n".encode()
    return b"".join((part_header, payload, closing)), boundary
def _post_json(url: str, data: dict[str, Any], headers: dict[str, str] | None = None, timeout: int = 120) -> str:
    """POST ``data`` as UTF-8 JSON to ``url`` and return the decoded response text.

    Entries in ``headers`` are merged over the default JSON Content-Type.
    Undecodable response bytes are replaced rather than raising.
    """
    merged_headers = {"Content-Type": "application/json"}
    merged_headers.update(headers or {})
    encoded = json.dumps(data, ensure_ascii=False).encode("utf-8")
    post_request = urllib.request.Request(url, data=encoded, headers=merged_headers, method="POST")
    with urllib.request.urlopen(post_request, timeout=timeout) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace")
def upload_file(
    path: str,
    base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
    username: str = "admin",
    password: str = "admin@jpai.com",
) -> str:
    """Log in to the backend and upload the file at ``path``.

    Returns:
        The ``data`` field of the upload response.

    Raises:
        RuntimeError: If the login response carries no access token.
        Exception: If the upload response has an empty ``data`` field.
    """
    credentials = {"username": username, "password": password}
    login_text = _post_json(f"{base_backend_url}/admin-api/system/auth/login", credentials)
    login_data = json.loads(login_text).get("data") or {}
    token = login_data.get("accessToken")
    if not token:
        raise RuntimeError(f"后端登录异常: {login_text}")

    body, boundary = _multipart_body(path)
    upload_request = urllib.request.Request(
        f"{base_backend_url}/admin-api/infra/file/upload",
        data=body,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Authorization": token,
        },
        method="POST",
    )
    with urllib.request.urlopen(upload_request, timeout=120) as response:
        response_text = response.read().decode("utf-8", errors="replace")

    uploaded = json.loads(response_text).get("data")
    if not uploaded:
        raise Exception(f"上传{path}失败 Response text: {response_text}")
    return uploaded
def fastgpt_openai_chat(url: str, token: str, model: str, chat_id: str, file_url: str, text: str, stream: bool = True) -> str:
    """Send one user message (a file URL plus text) to a chat-completions endpoint.

    With ``stream`` false the reply's first choice message content is
    returned. With ``stream`` true the ``delta.content`` pieces of every
    parseable event line are concatenated; lines that fail to parse or to
    concatenate are skipped.
    """
    user_message = {
        "role": "user",
        "content": [
            {"type": "file_url", "name": "文件", "url": file_url},
            {"type": "text", "text": text},
        ],
    }
    payload = {
        "chatId": chat_id,
        "messages": [user_message],
        "model": model,
        "stream": stream,
    }
    chat_request = urllib.request.Request(
        url,
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
        method="POST",
    )
    with urllib.request.urlopen(chat_request, timeout=60000) as response:
        if not stream:
            reply = json.loads(response.read().decode("utf-8", errors="replace"))
            return reply.get("choices", [{}])[0].get("message", {}).get("content", "")

        answer = ""
        for raw_line in response:
            line = raw_line.decode("utf-8", errors="replace").strip()
            if line in ("", "data: [DONE]"):
                continue
            # Event lines carry a "data: " prefix in front of the JSON payload.
            event_json = line[6:] if line.startswith("data: ") else line
            try:
                event = json.loads(event_json)
                answer += event.get("choices", [{}])[0].get("delta", {}).get("content", "")
            except Exception:
                continue
        return answer
def _read_json_arg(value: str) -> Any:
path = Path(value)
if path.exists():
return json.loads(path.read_text(encoding="utf-8"))
return json.loads(value)
def main() -> int:
    """Parse the command line, run the selected utility and print its result.

    Each sub-command maps to one helper in this module. ``extract-json`` and
    ``extract-drop-json-part`` read stdin when no text argument is given.

    Returns:
        ``0`` once the selected command has run.
    """
    parser = argparse.ArgumentParser(description="Single-file common/http utilities based on utils/")
    sub = parser.add_subparsers(dest="cmd", required=True)

    p = sub.add_parser("random-str")
    p.add_argument("-l", "--length", type=int, default=5)

    sub.add_parser("format-now")

    p = sub.add_parser("extract-url-file")
    p.add_argument("url")
    p.add_argument("formats", nargs="+")

    p = sub.add_parser("adjust-single-chunk-size")
    p.add_argument("all_text_len", type=int)
    p.add_argument("--max-chunk-page", type=int, default=DEFAULT_MAX_CHUNK_PAGE)
    p.add_argument("--min-single-chunk-size", type=int, default=DEFAULT_MIN_SINGLE_CHUNK_SIZE)
    p.add_argument("--max-single-chunk-size", type=int, default=DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    p = sub.add_parser("extract-json")
    p.add_argument("text", nargs="?")

    p = sub.add_parser("remove-duplicates-by-key")
    p.add_argument("json_list")
    p.add_argument("key")

    p = sub.add_parser("extract-drop-json-part")
    p.add_argument("text", nargs="?")

    p = sub.add_parser("group-chunk-by-len")
    p.add_argument("json_list")
    p.add_argument("key")
    p.add_argument("chunk_len", type=int)

    p = sub.add_parser("url-replace-fastgpt")
    p.add_argument("origin")
    p.add_argument("--base-fastgpt-url", default=DEFAULT_BASE_FASTGPT_URL)

    p = sub.add_parser("download")
    p.add_argument("url")
    p.add_argument("path")
    p.add_argument("--base-fastgpt-url", default=DEFAULT_BASE_FASTGPT_URL)
    p.add_argument("--base-backend-url", default=DEFAULT_BASE_BACKEND_URL)
    p.add_argument("--outer-backend-url", default=DEFAULT_OUTER_BACKEND_URL)
    p.add_argument("--no-input-url-to-inner", action="store_true")

    p = sub.add_parser("upload")
    p.add_argument("path")
    p.add_argument("--base-backend-url", default=DEFAULT_BASE_BACKEND_URL)
    p.add_argument("--username", default="admin")
    p.add_argument("--password", default="admin@jpai.com")

    p = sub.add_parser("fastgpt-chat")
    p.add_argument("--url", required=True)
    p.add_argument("--token", required=True)
    p.add_argument("--model", required=True)
    p.add_argument("--chat-id", required=True)
    p.add_argument("--file-url", required=True)
    p.add_argument("--text", required=True)
    p.add_argument("--no-stream", action="store_true")

    args = parser.parse_args()

    if args.cmd == "random-str":
        print(random_str(args.length))
    elif args.cmd == "format-now":
        print(format_now())
    elif args.cmd == "extract-url-file":
        print(extract_url_file(args.url, args.formats))
    elif args.cmd == "adjust-single-chunk-size":
        print(adjust_single_chunk_size(args.all_text_len, args.max_chunk_page, args.min_single_chunk_size, args.max_single_chunk_size))
    elif args.cmd == "extract-json":
        text = args.text if args.text is not None else sys.stdin.read()
        print(json.dumps(extract_json(text), ensure_ascii=False, indent=2))
    elif args.cmd == "remove-duplicates-by-key":
        print(json.dumps(remove_duplicates_by_key(_read_json_arg(args.json_list), args.key), ensure_ascii=False, indent=2))
    elif args.cmd == "extract-drop-json-part":
        text = args.text if args.text is not None else sys.stdin.read()
        print(extract_drop_json_part(text))
    elif args.cmd == "group-chunk-by-len":
        print(json.dumps(group_chunk_by_len(_read_json_arg(args.json_list), args.key, args.chunk_len), ensure_ascii=False, indent=2))
    elif args.cmd == "url-replace-fastgpt":
        print(url_replace_fastgpt(args.origin, args.base_fastgpt_url))
    elif args.cmd == "download":
        print(download_file(args.url, args.path, not args.no_input_url_to_inner, args.base_fastgpt_url, args.base_backend_url, args.outer_backend_url))
    elif args.cmd == "upload":
        print(upload_file(args.path, args.base_backend_url, args.username, args.password))
    elif args.cmd == "fastgpt-chat":
        print(fastgpt_openai_chat(args.url, args.token, args.model, args.chat_id, args.file_url, args.text, not args.no_stream))
    # The signature promises an exit status; return it explicitly instead of
    # falling off the end with None.
    return 0
def main(argv: list[str] | None = None) -> int:
    """Forward the command line to the split CLI that owns the sub-command.

    Args:
        argv: Arguments to dispatch; ``sys.argv[1:]`` when omitted.

    Returns:
        ``0`` after printing help, ``2`` for an unknown command, otherwise the
        exit status of the spawned script.
    """
    args = list(argv) if argv is not None else list(sys.argv[1:])

    if not args or args[0] in {"-h", "--help"}:
        all_commands = sorted(TEXT_COMMANDS | FILE_COMMANDS)
        help_parser = argparse.ArgumentParser(description="Compatibility dispatcher for json_text_tool.py and file_chat_tool.py")
        help_parser.add_argument("command", nargs="?", choices=all_commands)
        help_parser.print_help()
        print("\nText/JSON commands:", ", ".join(sorted(TEXT_COMMANDS)))
        print("File/chat commands:", ", ".join(sorted(FILE_COMMANDS)))
        return 0

    command = args[0]
    if command in TEXT_COMMANDS:
        target = script("json_text_tool.py")
    elif command in FILE_COMMANDS:
        target = script("file_chat_tool.py")
    else:
        print(f"unknown command: {command}", file=sys.stderr)
        return 2
    # The full argument list, including the command name, is passed through.
    return subprocess.call([sys.executable, target, *args])
if __name__ == "__main__":
......
#!/usr/bin/env python3
"""Standalone file and FastGPT chat utility CLI."""
from __future__ import annotations
import argparse, json, mimetypes, random, re, string, sys, time, urllib.error, urllib.parse, urllib.request
from pathlib import Path
from typing import Any
# Default service endpoints; used as parameter defaults and CLI flag defaults below.
FASTGPT = "http://172.21.107.45:3030"
BACKEND = "http://172.21.107.45:1122"
OUTER = "https://172.21.107.45:48080"
def rand(n: int = 8) -> str:
    """Return ``n`` random lowercase ASCII letters (repeats allowed)."""
    letters = string.ascii_lowercase
    chars = [random.choice(letters) for _ in range(n)]
    return "".join(chars)
def url_replace_fastgpt(origin: str, base_fastgpt_url: str = FASTGPT) -> str:
    """Prefix ``origin`` with ``base_fastgpt_url`` unless it starts with ``http:`` or ``https:``."""
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def basename(name: str) -> str:
    """Return the final path component of a percent-decoded file name.

    Surrounding whitespace and double quotes are removed, backslashes count as
    path separators, and ``"downloaded_file"`` is returned when nothing is left.
    """
    unquoted = name.strip().strip('"')
    decoded = urllib.parse.unquote(unquoted)
    leaf = Path(decoded.replace("\\", "/")).name
    if leaf:
        return leaf
    return "downloaded_file"
def resolve_name(url: str, headers: dict[str, str]) -> str:
    """Choose a local file name for a downloaded resource.

    A ``Content-Disposition`` header wins, with ``filename*`` preferred over
    ``filename``; otherwise the last component of the URL path is used.

    Args:
        url: The URL that was requested.
        headers: Response headers as a plain mapping.

    Returns:
        The base name produced by ``basename``.
    """
    # HTTP field names are case-insensitive and a plain dict keeps whatever
    # casing the server sent, so match the header name by its lowercase form.
    cd = next(
        (value for key, value in headers.items() if key.lower() == "content-disposition" and value),
        "",
    )
    for pat in (r"filename\*=(?:UTF-8''|utf-8'')?([^;]+)", r'filename="?([^";]+)"?'):
        m = re.search(pat, cd)
        if m:
            return basename(m.group(1))
    return basename(urllib.parse.urlparse(url).path)
def download_file(
    url: str,
    path: str,
    input_url_to_inner: bool = True,
    base_fastgpt_url: str = FASTGPT,
    base_backend_url: str = BACKEND,
    outer_backend_url: str = OUTER,
) -> str | None:
    """Fetch ``url`` with a GET request and write the body under ``path``.

    When ``input_url_to_inner`` is true, a URL without an ``http:``/``https:``
    prefix is prepended with ``base_fastgpt_url`` and occurrences of
    ``outer_backend_url`` are replaced by ``base_backend_url``. An existing
    directory ``path`` receives a file named from the response and the URL.

    Returns:
        The written path, or ``None`` after an HTTP error status (reported on
        stderr).
    """
    if input_url_to_inner:
        if not url.startswith(("http:", "https:")):
            url = base_fastgpt_url + url
        url = url.replace(outer_backend_url, base_backend_url)
    try:
        get_request = urllib.request.Request(url, method="GET")
        with urllib.request.urlopen(get_request, timeout=120) as resp:
            destination = Path(path)
            if destination.exists() and destination.is_dir():
                destination = destination / resolve_name(url, dict(resp.headers))
            destination.parent.mkdir(parents=True, exist_ok=True)
            destination.write_bytes(resp.read())
            return str(destination)
    except urllib.error.HTTPError as exc:
        print(f"{url}文件下载失败. HTTP Status Code: {exc.code}", file=sys.stderr)
        return None
def post_json(url: str, data: dict[str, Any], headers: dict[str, str] | None = None, timeout: int = 120) -> str:
    """POST ``data`` as JSON to ``url`` and return the decoded response text.

    Entries in ``headers`` are merged over the default JSON Content-Type.
    Undecodable response bytes are replaced rather than raising.
    """
    merged_headers = {"Content-Type": "application/json"}
    merged_headers.update(headers or {})
    encoded = json.dumps(data, ensure_ascii=False).encode()
    req = urllib.request.Request(url, data=encoded, headers=merged_headers, method="POST")
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        raw = resp.read()
    return raw.decode("utf-8", errors="replace")
def multipart(path: str) -> tuple[bytes, str]:
    """Build a single-part ``multipart/form-data`` body (field ``file``) for ``path``.

    Returns:
        ``(body, boundary)`` where ``boundary`` is the delimiter used in
        ``body``.
    """
    source = Path(path)
    boundary = f"----common-tool-{int(time.time() * 1000)}-{rand()}"
    # Fall back to a generic binary type when the extension is unknown.
    ctype = mimetypes.guess_type(source.name)[0] or "application/octet-stream"
    preamble = f'--{boundary}\r\nContent-Disposition: form-data; name="file"; filename="{source.name}"\r\nContent-Type: {ctype}\r\n\r\n'.encode()
    payload = source.read_bytes()
    closing = f"\r\n--{boundary}--\r\n".encode()
    return b"".join((preamble, payload, closing)), boundary
def upload_file(path: str, base_backend_url: str = BACKEND, username: str = "admin", password: str = "admin@jpai.com") -> str:
    """Log in to the backend and upload the file at ``path``.

    Returns:
        The ``data`` field of the upload response.

    Raises:
        RuntimeError: If login yields no access token, or the upload response
            has an empty ``data`` field.
    """
    credentials = {"username": username, "password": password}
    login_text = post_json(f"{base_backend_url}/admin-api/system/auth/login", credentials)
    login_data = json.loads(login_text).get("data") or {}
    token = login_data.get("accessToken")
    if not token:
        raise RuntimeError("后端登录异常")

    body, boundary = multipart(path)
    req = urllib.request.Request(
        f"{base_backend_url}/admin-api/infra/file/upload",
        data=body,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Authorization": token,
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=120) as resp:
        text = resp.read().decode("utf-8", errors="replace")

    uploaded = json.loads(text).get("data")
    if uploaded:
        return uploaded
    raise RuntimeError(f"上传{path}失败 Response text: {text}")
def fastgpt_openai_chat(url: str, token: str, model: str, chat_id: str, file_url: str, text: str, stream: bool = True) -> str:
    """Send one user message (a file URL plus text) to a chat-completions endpoint.

    With ``stream`` false the reply's first choice message content is
    returned. With ``stream`` true the ``delta.content`` pieces of every
    parseable event line are concatenated; lines that fail to parse or to
    concatenate are ignored.
    """
    user_message = {
        "role": "user",
        "content": [
            {"type": "file_url", "name": "文件", "url": file_url},
            {"type": "text", "text": text},
        ],
    }
    data = {"chatId": chat_id, "messages": [user_message], "model": model, "stream": stream}
    req = urllib.request.Request(
        url,
        data=json.dumps(data, ensure_ascii=False).encode(),
        headers={"Content-Type": "application/json", "Authorization": f"Bearer {token}"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60000) as resp:
        if not stream:
            rsp = json.loads(resp.read().decode("utf-8", errors="replace"))
            return rsp.get("choices", [{}])[0].get("message", {}).get("content", "")

        answer = ""
        for raw in resp:
            line = raw.decode("utf-8", errors="replace").strip()
            if line in ("", "data: [DONE]"):
                continue
            # Event lines carry a "data: " prefix in front of the JSON payload.
            event_json = line[6:] if line.startswith("data: ") else line
            try:
                answer += json.loads(event_json).get("choices", [{}])[0].get("delta", {}).get("content", "")
            except Exception:
                pass
        return answer
def main() -> int:
    """Parse the command line, run the selected file/chat utility and print its result.

    Returns:
        ``0`` once the selected command has run.
    """
    parser = argparse.ArgumentParser(description="File/FastGPT utilities")
    commands = parser.add_subparsers(dest="cmd", required=True)

    replace_cmd = commands.add_parser("url-replace-fastgpt")
    replace_cmd.add_argument("origin")
    replace_cmd.add_argument("--base-fastgpt-url", default=FASTGPT)

    download_cmd = commands.add_parser("download")
    download_cmd.add_argument("url")
    download_cmd.add_argument("path")
    download_cmd.add_argument("--base-fastgpt-url", default=FASTGPT)
    download_cmd.add_argument("--base-backend-url", default=BACKEND)
    download_cmd.add_argument("--outer-backend-url", default=OUTER)
    download_cmd.add_argument("--no-input-url-to-inner", action="store_true")

    upload_cmd = commands.add_parser("upload")
    upload_cmd.add_argument("path")
    upload_cmd.add_argument("--base-backend-url", default=BACKEND)
    upload_cmd.add_argument("--username", default="admin")
    upload_cmd.add_argument("--password", default="admin@jpai.com")

    chat_cmd = commands.add_parser("fastgpt-chat")
    for flag in ("--url", "--token", "--model", "--chat-id", "--file-url", "--text"):
        chat_cmd.add_argument(flag, required=True)
    chat_cmd.add_argument("--no-stream", action="store_true")

    opts = parser.parse_args()
    if opts.cmd == "url-replace-fastgpt":
        print(url_replace_fastgpt(opts.origin, opts.base_fastgpt_url))
    elif opts.cmd == "download":
        print(download_file(opts.url, opts.path, not opts.no_input_url_to_inner, opts.base_fastgpt_url, opts.base_backend_url, opts.outer_backend_url))
    elif opts.cmd == "upload":
        print(upload_file(opts.path, opts.base_backend_url, opts.username, opts.password))
    elif opts.cmd == "fastgpt-chat":
        print(fastgpt_openai_chat(opts.url, opts.token, opts.model, opts.chat_id, opts.file_url, opts.text, not opts.no_stream))
    return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone JSON/text utility CLI."""
from __future__ import annotations
import argparse, json, random, re, string, sys
from datetime import datetime
from pathlib import Path
from typing import Any
# Default lower bound, upper bound and divisor used by adjust_single_chunk_size().
MIN_SIZE, MAX_SIZE, MAX_PAGE = 2000, 100000, 10
def random_str(n: int = 5) -> str:
    """Return ``n`` random lowercase ASCII letters.

    Up to 26 letters are drawn without repetition; longer requests are drawn
    with repetition.
    """
    letters = string.ascii_lowercase
    if n > 26:
        picked = [random.choice(letters) for _ in range(n)]
    else:
        picked = random.sample(letters, n)
    return "".join(picked)
def format_now() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return current.strftime("%Y-%m-%d %H:%M:%S")
def extract_url_file(url: str, formats: list[str]) -> str:
    """Return the first file name in ``url`` that ends with one of ``formats``.

    Args:
        url: Text to search.
        formats: Literal suffixes (for example ``".docx"``) that the file name
            must end with.

    Returns:
        The matched file name including its suffix.

    Raises:
        RuntimeError: If ``formats`` is empty or nothing in ``url`` matches.
    """
    # With no formats the alternation would be the empty pattern, which
    # matches at position 0 and would return "" instead of reporting a miss.
    if formats:
        pat = "|".join(r"[\u4e00-\u9fa5()()0-9\w-]+" + re.escape(f) for f in formats)
        m = re.search(pat, url)
        if m:
            return m.group()
    raise RuntimeError(f"{formats} not found in url:{url}")
def adjust_single_chunk_size(length: int, max_page: int = MAX_PAGE, min_size: int = MIN_SIZE, max_size: int = MAX_SIZE) -> int:
    """Return ``length // max_page`` capped at ``max_size`` and raised to at least ``min_size``."""
    per_page = length // max_page
    upper_bounded = min(per_page, max_size)
    return max(min_size, upper_bounded)
def _loads(text: str) -> Any:
try:
import json_repair # type: ignore
return json_repair.loads(text, strict=False)
except ImportError:
return json.loads(text)
def extract_json(text: str) -> list[Any]:
    """Collect JSON values found in ``text`` into a flat list.

    Candidates are tried in this order, stopping at the first stage that
    yields anything: every ```json fenced block, the whole text, the first
    parseable generic fenced block, and finally every non-greedy ``{...}`` or
    ``[...]`` fragment. A parsed list contributes its elements; any other
    value is appended as one element.
    """
    source = text or ""
    collected: list[Any] = []

    def _absorb(candidate: str) -> bool:
        """Parse one candidate into ``collected``; report whether it parsed."""
        stripped = (candidate or "").strip()
        # Drop control characters (tab, LF and CR are kept) before parsing.
        cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", stripped)
        if not cleaned:
            return False
        try:
            parsed = _loads(cleaned)
        except Exception:
            return False
        if isinstance(parsed, list):
            collected.extend(parsed)
        else:
            collected.extend([parsed])
        return True

    # Stage 1: all explicitly tagged json fences.
    for fenced in re.findall(r"```json([\s\S]*?)```", source, re.DOTALL):
        _absorb(fenced)
    if collected:
        return collected

    # Stage 2: the text as a whole.
    if _absorb(source):
        return collected

    # Stage 3: the first generic fence that parses.
    for fenced in re.findall(r"```([\s\S]*?)```", source, re.DOTALL):
        if _absorb(fenced):
            return collected

    # Stage 4: loose object/array fragments.
    for fragment in re.findall(r"(\{[\s\S]*?\}|\[[\s\S]*?\])", source, re.DOTALL):
        _absorb(fragment)
    return collected
def remove_duplicates_by_key(items: list[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Drop items whose ``key`` text is contained in an already kept item's text.

    Items are visited from longest to shortest stringified value; the result
    keeps that longest-first order.
    """

    def _text(item: dict[str, Any]) -> str:
        return str(item.get(key, ""))

    kept: list[dict[str, Any]] = []
    kept_values: list[str] = []
    longest_first = sorted(items, key=lambda entry: len(_text(entry)), reverse=True)
    for item in longest_first:
        value = _text(item)
        if any(value in earlier for earlier in kept_values):
            continue
        kept_values.append(value)
        kept.append(item)
    return kept
def group_chunk_by_len(items: list[dict[str, Any]], key: str, chunk_len: int) -> list[list[dict[str, Any]]]:
    """Pack consecutive items into groups whose summed ``key`` length fits ``chunk_len``.

    A new group starts when adding the next item would exceed ``chunk_len``;
    a single oversized item still forms its own group.
    """
    groups: list[list[dict[str, Any]]] = []
    bucket: list[dict[str, Any]] = []
    bucket_len = 0
    for item in items:
        size = len(str(item.get(key, "")))
        if bucket and bucket_len + size > chunk_len:
            groups.append(bucket)
            bucket = []
            bucket_len = 0
        bucket.append(item)
        bucket_len += size
    if bucket:
        return groups + [bucket]
    return groups + []
def read_json_arg(value: str) -> Any:
    """Load JSON from ``value``, which is either a file path or inline JSON.

    If ``value`` names an existing path, its UTF-8 content is parsed;
    otherwise ``value`` itself is parsed as JSON.

    Raises:
        json.JSONDecodeError: If the selected text is not valid JSON.
    """
    p = Path(value)
    try:
        exists = p.exists()
    except (OSError, ValueError):
        # Inline JSON is often not a usable path at all (for example longer
        # than the OS file-name limit); treat that as "not a file".
        exists = False
    if exists:
        return json.loads(p.read_text(encoding="utf-8"))
    return json.loads(value)
def main() -> int:
    """Parse the command line, run the selected JSON/text utility and print its result.

    ``extract-json`` and ``extract-drop-json-part`` read stdin when no text
    argument is given.

    Returns:
        ``0`` once the selected command has run.
    """
    parser = argparse.ArgumentParser(description="JSON/text utilities")
    commands = parser.add_subparsers(dest="cmd", required=True)

    cmd = commands.add_parser("random-str")
    cmd.add_argument("-l", "--length", type=int, default=5)

    commands.add_parser("format-now")

    cmd = commands.add_parser("extract-url-file")
    cmd.add_argument("url")
    cmd.add_argument("formats", nargs="+")

    cmd = commands.add_parser("adjust-single-chunk-size")
    cmd.add_argument("all_text_len", type=int)
    cmd.add_argument("--max-chunk-page", type=int, default=MAX_PAGE)
    cmd.add_argument("--min-single-chunk-size", type=int, default=MIN_SIZE)
    cmd.add_argument("--max-single-chunk-size", type=int, default=MAX_SIZE)

    cmd = commands.add_parser("extract-json")
    cmd.add_argument("text", nargs="?")

    cmd = commands.add_parser("remove-duplicates-by-key")
    cmd.add_argument("json_list")
    cmd.add_argument("key")

    cmd = commands.add_parser("extract-drop-json-part")
    cmd.add_argument("text", nargs="?")

    cmd = commands.add_parser("group-chunk-by-len")
    cmd.add_argument("json_list")
    cmd.add_argument("key")
    cmd.add_argument("chunk_len", type=int)

    opts = parser.parse_args()
    if opts.cmd == "random-str":
        print(random_str(opts.length))
    elif opts.cmd == "format-now":
        print(format_now())
    elif opts.cmd == "extract-url-file":
        print(extract_url_file(opts.url, opts.formats))
    elif opts.cmd == "adjust-single-chunk-size":
        print(adjust_single_chunk_size(opts.all_text_len, opts.max_chunk_page, opts.min_single_chunk_size, opts.max_single_chunk_size))
    elif opts.cmd == "extract-json":
        source = opts.text if opts.text is not None else sys.stdin.read()
        print(json.dumps(extract_json(source), ensure_ascii=False, indent=2))
    elif opts.cmd == "remove-duplicates-by-key":
        deduplicated = remove_duplicates_by_key(read_json_arg(opts.json_list), opts.key)
        print(json.dumps(deduplicated, ensure_ascii=False, indent=2))
    elif opts.cmd == "extract-drop-json-part":
        source = opts.text if opts.text is not None else sys.stdin.read()
        print(re.sub(r"```json([\s\S]*?)```", "", source, flags=re.DOTALL).strip())
    elif opts.cmd == "group-chunk-by-len":
        grouped = group_chunk_by_len(read_json_arg(opts.json_list), opts.key, opts.chunk_len)
        print(json.dumps(grouped, ensure_ascii=False, indent=2))
    return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone contract review orchestration CLI."""
from __future__ import annotations
import argparse, json, subprocess, sys
from pathlib import Path
from typing import Any
# parents[3] of this file's resolved path; the script locations below are relative to it.
ROOT = Path(__file__).resolve().parents[3]
# Scripts that this orchestrator runs as subprocesses via sh().
DOC = ROOT / "skills/doc-excel-skill/scripts/doc_tool.py"
XLS = ROOT / "skills/doc-excel-skill/scripts/excel_tool.py"
LLM = ROOT / "skills/review-llm-skill/scripts/review_llm_skill.py"
# Maps the normalized rule field name to the row key read from each loaded rule row (see norm_rules()).
COLS = {"id": "ID", "title": "审查项", "rule": "审查规则", "level": "风险等级", "triggers": "触发词", "suggestion_template": "建议模板", "case": "案例", "summary": "摘要项"}
def sh(args: list[str], text: bool = False) -> Any:
    """Run the current Python interpreter with ``args`` and return its stdout.

    With ``text`` true the raw stdout string is returned. Otherwise stdout is
    parsed as JSON, with empty output treated as ``null``.

    Raises:
        subprocess.CalledProcessError: If the child exits with a non-zero status.
    """
    command = [sys.executable]
    command.extend(args)
    stdout = subprocess.check_output(command, text=True)
    if text:
        return stdout
    return json.loads(stdout or "null")
def dump(path: Path, data: Any) -> str:
    """Write ``data`` to ``path`` as indented UTF-8 JSON and return the path as a string.

    Missing parent directories are created first.
    """
    folder = path.parent
    folder.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    path.write_text(serialized, encoding="utf-8")
    return str(path)
def norm_rules(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Re-key each dict row from its ``COLS`` source keys to the normalized field names.

    Non-dict rows are skipped and missing columns become ``""``.
    """
    normalized: list[dict[str, Any]] = []
    for row in rows:
        if not isinstance(row, dict):
            continue
        normalized.append({field: row.get(column, "") for field, column in COLS.items()})
    return normalized
def titles(items: list[dict[str, Any]]) -> list[str]:
    """Return the non-empty stripped ``title`` (or ``rule_title``) of each item, in order."""
    found: list[str] = []
    for item in items:
        label = str(item.get("title") or item.get("rule_title") or "").strip()
        if label:
            found.append(label)
    return found
def pick_rules(all_rules: list[dict[str, Any]], selected: list[str]) -> list[dict[str, Any]]:
    """Return the rules whose ``title`` is in ``selected``.

    An empty ``selected`` returns ``all_rules`` itself, unfiltered.
    """
    if selected:
        wanted = set(selected)
        return [rule for rule in all_rules if rule.get("title") in wanted]
    return all_rules
def llm(tool: str, **kw: Any) -> dict[str, Any]:
    """Run sub-command ``tool`` of the LLM script and return its parsed JSON output.

    Each keyword becomes a ``--flag`` with underscores turned into dashes.
    Dict and list values are passed as JSON text, ``None`` values are omitted,
    and everything else is passed through ``str``.
    """
    command = [str(LLM), tool]
    for name, value in kw.items():
        if value is None:
            continue
        if isinstance(value, (dict, list)):
            rendered = json.dumps(value, ensure_ascii=False)
        else:
            rendered = str(value)
        command += ["--" + name.replace("_", "-"), rendered]
    return sh(command)
def route_segment(text: str, rules: list[dict[str, Any]], party_role: str, mode: str) -> tuple[list[str], list[str], list[dict[str, Any]]]:
    """Select the rules that apply to one segment.

    With ``mode == "none"`` every rule is kept. Otherwise the ``router`` tool
    is asked and its ``selected_items`` (or ``routed_rules``) titles are used
    to filter ``rules``.

    Returns:
        ``(rule titles, sorted distinct non-empty summary names, rules)`` for
        the kept rules.
    """

    def _summaries(source: list[dict[str, Any]]) -> list[str]:
        return sorted({rule.get("summary", "") for rule in source if rule.get("summary")})

    if mode == "none":
        return titles(rules), _summaries(rules), rules

    response = llm("router", segment_text=text, rules=rules, party_role=party_role)
    chosen = response.get("selected_items") or response.get("routed_rules") or []
    routed = pick_rules(rules, titles(chosen))
    return titles(routed), _summaries(routed), routed
def reflect_findings(rules: list[dict[str, Any]], facts: list[dict[str, Any]], findings: list[dict[str, Any]], party_role: str) -> list[dict[str, Any]]:
    """Run the ``reflect`` tool once per rule that has findings and collect the results.

    For each rule, the findings whose ``rule_title`` equals the rule title are
    sent together with the facts that contain the rule's summary name. Rules
    without findings are skipped.
    """
    reflected: list[dict[str, Any]] = []
    for rule in rules:
        title = rule.get("title", "")
        matching = [item for item in findings if item.get("rule_title") == title]
        if matching:
            summary = rule.get("summary", "")
            related_facts: list[dict[str, Any]] = []
            if summary:
                related_facts = [
                    {summary: fact.get(summary)}
                    for fact in facts
                    if isinstance(fact, dict) and summary in fact
                ]
            response = llm("reflect", rule=rule, findings=matching, facts=related_facts, party_role=party_role)
            reflected.extend(response.get("final_findings") or response.get("findings") or [])
    return reflected
def merge_by_segment(findings: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge the findings marked ``不合格`` of each segment with the ``merger`` tool.

    Segments are processed in ascending ``segment_id`` order (a missing or
    falsy id counts as 0). Merged findings that lack a ``segment_id`` receive
    the id of the segment they came from.
    """

    def _segment(finding: dict[str, Any]) -> int:
        return int(finding.get("segment_id", 0) or 0)

    merged_all: list[dict[str, Any]] = []
    for seg in sorted({_segment(finding) for finding in findings}):
        failing = [
            finding
            for finding in findings
            if _segment(finding) == seg and str(finding.get("result", "")).strip() == "不合格"
        ]
        if failing:
            response = llm("merger", payload=failing)
            merged = response.get("findings") or []
            for item in merged:
                item.setdefault("segment_id", seg)
            merged_all.extend(merged)
    return merged_all
def run(file: Path, rules_path: Path, ruleset: str, out_dir: Path, party_role: str, route_by: str, reflect: bool, merge_mode: str, max_chunks: int, dry_run: bool) -> dict[str, Any]:
    """Run the whole review flow for one document and return the collected memory.

    Steps: load the document and the rule sheet, then per chunk route the
    rules, summarise facts and review; afterwards optionally reflect, merge
    findings per segment, merge facts, and write ``memory.json`` plus
    ``review.xlsx`` (and ``commented.docx`` for ``.docx`` inputs) to
    ``out_dir``. With ``dry_run`` only the initial memory is written.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    info = sh([str(DOC), "doc-load", str(file)])
    rows = sh([str(XLS), "load-excel", str(rules_path), "--sheet-name", ruleset])
    rules = norm_rules(rows)
    chunk_ids = info.get("chunk_ids", [])
    if max_chunks:
        chunk_ids = chunk_ids[:max_chunks]

    memory = {
        "file": str(file),
        "ruleset": ruleset,
        "segment_ids": [i + 1 for i in chunk_ids],
        "rule_titles": titles(rules),
        "summary_names": sorted({r.get("summary", "") for r in rules if r.get("summary")}),
        "facts": [],
        "merge_facts": [],
        "findings": {"review": [], "reflect": [], "merge": []},
    }
    memory_file = out_dir / "memory.json"
    if dry_run:
        dump(memory_file, memory)
        return memory

    stages = memory["findings"]
    for cid in chunk_ids:
        text = sh([str(DOC), "doc-chunk", str(file), str(cid)], text=True)
        routed_titles, routed_summaries, routed_rules = route_segment(text, rules, party_role, route_by)

        summary = llm("summary", segment_text=text, rules=routed_rules, party_role=party_role)
        fact = summary.get("facts", summary)
        if isinstance(fact, dict):
            memory["facts"].append(fact)

        review = llm("review", segment_text=text, rules=routed_rules, party_role=party_role)
        for finding in review.get("findings", []):
            finding.setdefault("segment_id", cid)
            stages["review"].append(finding)

        route_entry = {"segment_id": cid + 1, "routed_rule_titles": routed_titles, "routed_summary_names": routed_summaries}
        memory.setdefault("routes", []).append(route_entry)

    if reflect:
        stages["reflect"] = reflect_findings(rules, memory["facts"], stages["review"], party_role)
    # Merge the reflected findings when there are any, else the raw review ones.
    stages["merge"] = merge_by_segment(stages["reflect"] or stages["review"])

    fact_res = llm("fact-merge", facts=memory["facts"], summary_names=memory["summary_names"], merge_mode=merge_mode)
    memory["merge_facts"] = [fact_res.get("merge_facts", {})]

    mem_path = Path(dump(memory_file, memory))
    # Export the most processed stage that actually holds findings.
    if stages["merge"]:
        finding_key = "merge"
    elif stages["reflect"]:
        finding_key = "reflect"
    else:
        finding_key = "review"
    sh([str(XLS), "export-findings-excel", "@" + str(mem_path), str(out_dir / "review.xlsx"), "--finding-key", finding_key], text=True)
    if file.suffix.lower() == ".docx":
        sh([str(DOC), "docx-add-comments", str(file), "@" + str(mem_path), str(out_dir / "commented.docx"), "--finding-key", finding_key], text=True)
    return memory
def main() -> int:
    """Parse the command line, run the contract review flow, and print the output directory.

    Returns:
        The process exit code; 0 once ``run`` has returned without raising.
    """
    parser = argparse.ArgumentParser(description="Contract review flow orchestrator")
    parser.add_argument("file")
    parser.add_argument("--rules", default=str(ROOT / "data/rules.xlsx"))
    parser.add_argument("--ruleset", default="通用")
    parser.add_argument("--out-dir", default="outputs/review-flow")
    parser.add_argument("--party-role", default="")
    parser.add_argument("--route-by", choices=["rule", "none"], default="rule")
    parser.add_argument("--no-reflect", action="store_true")
    parser.add_argument("--merge-mode", choices=["llm", "rule"], default="rule")
    parser.add_argument("--max-chunks", type=int, default=0)
    parser.add_argument("--dry-run", action="store_true")
    opts = parser.parse_args()

    out_dir = Path(opts.out_dir)
    run(
        Path(opts.file),
        Path(opts.rules),
        opts.ruleset,
        out_dir,
        opts.party_role,
        opts.route_by,
        not opts.no_reflect,  # reflection is on unless --no-reflect is given
        opts.merge_mode,
        opts.max_chunks,
        opts.dry_run,
    )
    print("输出目录:", out_dir.resolve())
    return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""Single-file CLI version inspired by utils/doc_util.py and utils/excel_util.py.
This script does not import local project files. It keeps the public shape of
ExcelUtil and a lightweight document chunk reader so the skill can be used from
CLI while staying portable.
"""
"""Compatibility dispatcher for the split doc/excel CLIs."""
from __future__ import annotations
import argparse
import csv
import html
import json
import re
import string
import zipfile
from abc import ABC, abstractmethod
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from xml.etree import ElementTree as ET
DEFAULT_MAX_SINGLE_CHUNK_SIZE = 100000
DEFAULT_MIN_SINGLE_CHUNK_SIZE = 2000
DEFAULT_MAX_CHUNK_PAGE = 10
WORD_NS = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main"}
XLSX_NS = {
"a": "http://schemas.openxmlformats.org/spreadsheetml/2006/main",
"r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships",
"rel": "http://schemas.openxmlformats.org/package/2006/relationships",
EXCEL_COMMANDS = {
"load-excel",
"list-sheets",
"find-value",
"map-rows",
"export-findings-excel",
"export-facts-excel",
}
DOC_COMMANDS = {
"doc-load",
"doc-ocr",
"doc-chunk",
"doc-info",
"doc-adjust-chunk-size",
"doc-text",
"docx-add-comments",
"process-string",
"is-messy-text",
}
class ExcelLoadError(Exception):
    """Error raised by the helpers in this file when a spreadsheet cannot be loaded.

    Mirrors utils.excel_util.ExcelLoadError.
    """
def _column_index(cell_ref: str) -> int:
    """Convert a cell reference such as ``"B7"`` into a zero-based column index.

    Only the ASCII letters of ``cell_ref`` are used (case-insensitively); a
    reference without any letter maps to column 0.
    """
    position = 0
    for char in cell_ref:
        if char not in string.ascii_letters:
            continue
        # Column letters form a bijective base-26 number: A=1 ... Z=26, AA=27.
        position = position * 26 + (ord(char.upper()) - ord("A") + 1)
    return position - 1 if position > 0 else 0
class ExcelUtil:
    """Standalone Excel helper aligned with utils.excel_util.ExcelUtil.

    Instance methods work on ``self.file_path``; the classmethods at the bottom
    are one-shot conveniences that build an instance and delegate to it.
    """

    def __init__(self, file_path: Union[str, Path]):
        self.file_path = Path(file_path)

    @staticmethod
    def _import_openpyxl():
        """Import and return the ``openpyxl`` module, or raise ExcelLoadError if it is missing."""
        try:
            import openpyxl  # type: ignore
        except ImportError as exc:
            raise ExcelLoadError("openpyxl is unavailable") from exc
        return openpyxl

    def _ensure_exists(self) -> None:
        """Raise ExcelLoadError when ``self.file_path`` does not exist."""
        if not self.file_path.exists():
            raise ExcelLoadError(f"File not found: {self.file_path}")

    def _load_with_openpyxl(self, sheet_name: Optional[str], has_header: bool) -> List[Union[Dict[str, object], List[object]]]:
        """Load one sheet through openpyxl (read-only, cached values instead of formulas).

        Raises:
            ExcelLoadError: openpyxl is missing or the workbook cannot be opened.
        """
        openpyxl = self._import_openpyxl()
        try:
            wb = openpyxl.load_workbook(self.file_path, data_only=True, read_only=True)
        except Exception as exc:
            raise ExcelLoadError(f"Failed to open Excel file: {exc}") from exc
        try:
            ws = wb[sheet_name] if sheet_name else wb.active
            # Materialize the rows before closing: read-only worksheets stream from the file.
            rows = list(ws.iter_rows(values_only=True))
        finally:
            # Read-only workbooks keep the file handle open until closed explicitly.
            wb.close()
        return self._rows_to_result(rows, has_header)

    @staticmethod
    def _rows_to_result(rows: list[tuple[Any, ...] | list[Any]], has_header: bool) -> List[Union[Dict[str, object], List[object]]]:
        """Shape raw rows into the public result format.

        Without a header every row becomes a list. With a header the first row
        supplies the dict keys (stripped strings, ``""`` for empty header
        cells); values beyond the header width are keyed ``col<index>``.
        """
        if not rows:
            return []
        if not has_header:
            return [list(row) for row in rows]
        headers = [str(h).strip() if h is not None else "" for h in rows[0]]
        result: List[Dict[str, object]] = []
        for row in rows[1:]:
            result.append({headers[i] if i < len(headers) else f"col{i}": value for i, value in enumerate(row)})
        return result

    def load(self, sheet_name: Optional[str] = None, has_header: bool = True) -> List[Union[Dict[str, object], List[object]]]:
        """Mirror ExcelUtil.load. Uses openpyxl if available, otherwise stdlib xlsx parsing.

        ``.csv``/``.tsv`` files are read with the csv module. For other files an
        ExcelLoadError from the openpyxl path triggers the stdlib fallback, but
        only for ``.xlsx``; any other suffix re-raises.
        """
        self._ensure_exists()
        suffix = self.file_path.suffix.lower()
        if suffix in {".csv", ".tsv"}:
            delimiter = "\t" if suffix == ".tsv" else ","
            return self._rows_to_result(_read_csv_rows(self.file_path, delimiter), has_header)
        try:
            return self._load_with_openpyxl(sheet_name, has_header)
        except ExcelLoadError:
            if suffix != ".xlsx":
                raise
            return _load_xlsx_stdlib(self.file_path, sheet_name, has_header)

    def list_sheets(self) -> List[str]:
        """Mirror ExcelUtil.list_sheets.

        Raises:
            ExcelLoadError: the file is missing, or the sheet names cannot be read.
        """
        self._ensure_exists()
        try:
            openpyxl = self._import_openpyxl()
            wb = openpyxl.load_workbook(self.file_path, read_only=True)
            try:
                return list(wb.sheetnames)
            finally:
                # Release the file handle held by the read-only workbook.
                wb.close()
        except ExcelLoadError:
            if self.file_path.suffix.lower() != ".xlsx":
                raise
            return _list_xlsx_sheets_stdlib(self.file_path)
        except Exception as exc:
            raise ExcelLoadError(f"Failed to read sheet names: {exc}") from exc

    def find_value_by_column(
        self,
        key_column: str,
        key_value: object,
        value_column: str,
        sheet_name: Optional[str] = None,
    ) -> Optional[object]:
        """Mirror ExcelUtil.find_value_by_column.

        Returns ``value_column`` of the first row whose ``key_column`` equals
        ``key_value``, or ``None`` when no row matches.
        """
        for row in self.load(sheet_name=sheet_name, has_header=True):
            if isinstance(row, dict) and row.get(key_column) == key_value:
                return row.get(value_column)
        return None

    def map_rows(self, sheet_name: Optional[str], column_map: Dict[str, str]) -> List[Dict[str, object]]:
        """Mirror ExcelUtil.map_rows.

        ``column_map`` maps each output key to the sheet header it is read
        from; headers absent from a row yield ``None``.
        """
        rows = self.load(sheet_name=sheet_name, has_header=True)
        return [
            {new_key: row.get(header) for new_key, header in column_map.items()}
            for row in rows
            if isinstance(row, dict)
        ]

    @classmethod
    def load_excel(cls, file_path: Union[str, Path], sheet_name: Optional[str] = None, has_header: bool = True):
        """One-shot form of ``load``."""
        return cls(file_path).load(sheet_name=sheet_name, has_header=has_header)

    @classmethod
    def list_excel_sheets(cls, file_path: Union[str, Path]) -> List[str]:
        """One-shot form of ``list_sheets``."""
        return cls(file_path).list_sheets()

    @classmethod
    def find_value_by_column_excel(
        cls,
        file_path: Union[str, Path],
        key_column: str,
        key_value: object,
        value_column: str,
        sheet_name: Optional[str] = None,
    ) -> Optional[object]:
        """One-shot form of ``find_value_by_column``."""
        return cls(file_path).find_value_by_column(key_column, key_value, value_column, sheet_name)

    @classmethod
    def load_mapped_excel(cls, file_path: Union[str, Path], sheet_name: Optional[str], column_map: Dict[str, str]):
        """One-shot form of ``map_rows``."""
        return cls(file_path).map_rows(sheet_name=sheet_name, column_map=column_map)
def _read_csv_rows(path: Path, delimiter: str) -> list[list[str]]:
    """Read every record of a delimited text file into a list of string lists.

    The file is decoded as UTF-8 (a leading BOM is dropped) and undecodable
    bytes are replaced rather than raising.
    """
    records: list[list[str]] = []
    with path.open(newline="", encoding="utf-8-sig", errors="replace") as handle:
        for record in csv.reader(handle, delimiter=delimiter):
            records.append(record)
    return records
def _xlsx_shared_strings(zf: zipfile.ZipFile) -> list[str]:
    """Return the workbook's shared-string table, or ``[]`` when the archive has none.

    Each entry is the concatenation of every ``t`` text node found under one
    ``si`` item.
    """
    try:
        table = ET.fromstring(zf.read("xl/sharedStrings.xml"))
    except KeyError:
        # ZipFile.read raises KeyError for a member that is not in the archive.
        return []
    strings = []
    for item in table.findall(".//a:si", XLSX_NS):
        pieces = [node.text or "" for node in item.findall(".//a:t", XLSX_NS)]
        strings.append("".join(pieces))
    return strings
def _workbook_sheet_map(zf: zipfile.ZipFile) -> list[tuple[str, str]]:
    """Return ``(sheet name, archive member path)`` pairs in workbook order.

    Sheet targets are taken from ``xl/_rels/workbook.xml.rels``. A target
    starting with ``/`` is resolved against the package root; any other target
    is resolved against ``xl/`` unless it already starts with ``xl/``.

    Raises:
        KeyError: ``xl/workbook.xml`` or its relationship part is missing.
    """
    workbook = ET.fromstring(zf.read("xl/workbook.xml"))
    rels = ET.fromstring(zf.read("xl/_rels/workbook.xml.rels"))
    rel_map = {rel.attrib["Id"]: rel.attrib["Target"] for rel in rels.findall("rel:Relationship", XLSX_NS)}
    rel_id_attr = f"{{{XLSX_NS['r']}}}id"
    sheets = []
    for sheet in workbook.findall(".//a:sheets/a:sheet", XLSX_NS):
        target = rel_map.get(sheet.attrib.get(rel_id_attr, ""), "")
        if target.startswith("/"):
            # Absolute target: already relative to the package root, so it must
            # not get a second "xl/" prefix ("/xl/worksheets/..." is common).
            path = target.lstrip("/")
        elif target.startswith("xl/"):
            path = target
        else:
            path = "xl/" + target
        sheets.append((sheet.attrib.get("name", ""), path))
    return sheets
def _list_xlsx_sheets_stdlib(path: Path) -> list[str]:
    """List the sheet names of an ``.xlsx`` file using only the standard library."""
    names = []
    with zipfile.ZipFile(path) as archive:
        for sheet_name, _member in _workbook_sheet_map(archive):
            names.append(sheet_name)
    return names
def _script(name: str) -> str:
    """Return the absolute path of ``name`` as a sibling of this file."""
    this_file = Path(__file__).resolve()
    sibling = this_file.with_name(name)
    return str(sibling)
def _load_xlsx_stdlib(path: Path, sheet_name: Optional[str], has_header: bool):
with zipfile.ZipFile(path) as zf:
shared = _xlsx_shared_strings(zf)
sheets = _workbook_sheet_map(zf)
if not sheets:
return []
sheet_path = sheets[0][1]
if sheet_name:
for name, candidate_path in sheets:
if name == sheet_name:
sheet_path = candidate_path
break
else:
raise ExcelLoadError(f"Sheet not found: {sheet_name}")
root = ET.fromstring(zf.read(sheet_path))
def main(argv: list[str] | None = None) -> int:
args = list(sys.argv[1:] if argv is None else argv)
if not args or args[0] in {"-h", "--help"}:
parser = argparse.ArgumentParser(description="Compatibility dispatcher for doc_tool.py and excel_tool.py")
parser.add_argument("command", nargs="?", choices=sorted(EXCEL_COMMANDS | DOC_COMMANDS))
parser.print_help()
print("\nExcel commands:", ", ".join(sorted(EXCEL_COMMANDS)))
print("Doc commands:", ", ".join(sorted(DOC_COMMANDS)))
return 0
rows: list[list[Any]] = []
for row in root.findall(".//a:sheetData/a:row", XLSX_NS):
values: list[Any] = []
for cell in row.findall("a:c", XLSX_NS):
cell_idx = _column_index(cell.attrib.get("r", ""))
while len(values) < cell_idx:
values.append(None)
cell_type = cell.attrib.get("t")
value_node = cell.find("a:v", XLSX_NS)
inline_node = cell.find("a:is/a:t", XLSX_NS)
raw = value_node.text if value_node is not None else inline_node.text if inline_node is not None else None
if cell_type == "s" and raw is not None:
value = shared[int(raw)] if int(raw) < len(shared) else raw
command = args[0]
if command in EXCEL_COMMANDS:
target = _script("excel_tool.py")
elif command in DOC_COMMANDS:
target = _script("doc_tool.py")
else:
value = raw
values.append(value)
rows.append(values)
return ExcelUtil._rows_to_result(rows, has_header)
class DocBase(ABC):
    """Standalone shape of utils.doc_util.DocBase.

    Abstract interface for chunked documents: subclasses provide loading, chunk
    access and chunk-comment operations; ``to_file`` and ``release`` have
    default implementations.
    """

    def __init__(self, **kwargs):
        self._kwargs = kwargs
        self._doc_path = None
        self._doc_name = None
        # Falls back to the module default when the caller gives no chunk size.
        self._max_single_chunk_size = kwargs.get("max_single_chunk_size", DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    @abstractmethod
    def load(self, doc_path):
        """Load the document at ``doc_path``."""

    @abstractmethod
    def adjust_chunk_size(self):
        """Adjust the chunk size used for this document."""

    @abstractmethod
    async def get_from_ocr(self):
        """Coroutine hook for obtaining content via OCR."""

    @abstractmethod
    def get_chunk_item(self, chunk_id):
        """Return the content of chunk ``chunk_id``."""

    @abstractmethod
    def get_chunk_info(self, chunk_id):
        """Return descriptive information about chunk ``chunk_id``."""

    @abstractmethod
    def get_chunk_location(self, chunk_id):
        """Return the location of chunk ``chunk_id``."""

    @abstractmethod
    def add_chunk_comment(self, chunk_id, comments):
        """Attach ``comments`` to chunk ``chunk_id``."""

    @abstractmethod
    def edit_chunk_comment(self, comments):
        """Edit the given ``comments``."""

    @abstractmethod
    def delete_chunk_comment(self, comments):
        """Delete the given ``comments``."""

    @abstractmethod
    def get_chunk_id_list(self, step=1):
        """Return chunk ids, taking every ``step``-th one."""

    @abstractmethod
    def get_chunk_num(self):
        """Return the number of chunks."""

    @abstractmethod
    def get_all_text(self):
        """Return the full document text."""

    def to_file(self, path, **kwargs):
        """Write ``get_all_text()`` to ``path`` as UTF-8; extra keyword arguments are ignored."""
        Path(path).write_text(self.get_all_text(), encoding="utf-8")

    def release(self):
        """Release resources; the base implementation does nothing."""
def adjust_single_chunk_size(
    all_text_len: int,
    max_chunk_page: int = DEFAULT_MAX_CHUNK_PAGE,
    min_single_chunk_size: int = DEFAULT_MIN_SINGLE_CHUNK_SIZE,
    max_single_chunk_size: int = DEFAULT_MAX_SINGLE_CHUNK_SIZE,
) -> int:
    """Pick a chunk size that splits ``all_text_len`` into about ``max_chunk_page`` chunks.

    The size is ``all_text_len // max_chunk_page`` limited to
    ``max_single_chunk_size`` and then raised to at least
    ``min_single_chunk_size`` (the lower bound wins if the bounds conflict).
    """
    size = all_text_len // max_chunk_page
    if size > max_single_chunk_size:
        size = max_single_chunk_size
    if size < min_single_chunk_size:
        size = min_single_chunk_size
    return size
class StandaloneDoc(DocBase):
    """Lightweight document reader with DocBase-style chunk methods.

    Text is split into fixed-size character windows; every comment operation
    raises NotImplementedError.
    """

    def load(self, doc_path):
        """Read the text of ``doc_path`` and split it into chunks."""
        self._doc_path = str(doc_path)
        self._doc_name = Path(doc_path).name
        self._all_text = read_document_text(doc_path)
        self._chunk_list = self._resolve_doc_chunk()

    def adjust_chunk_size(self):
        """Recompute the chunk size from the text length, re-chunk, and return the new size."""
        new_size = adjust_single_chunk_size(len(self.get_all_text()))
        self._max_single_chunk_size = new_size
        self._chunk_list = self._resolve_doc_chunk()
        return new_size

    async def get_from_ocr(self):
        """OCR is not performed here; always yields an empty string."""
        return ""

    def _resolve_doc_chunk(self):
        """Split the loaded text into ``{"text", "start", "end"}`` dicts of at most the chunk size.

        Always returns at least one chunk; empty text yields a single empty chunk.
        """
        content = self._all_text
        size = self._max_single_chunk_size
        total = len(content)
        chunks = [
            {"text": content[begin : begin + size], "start": begin, "end": min(begin + size, total)}
            for begin in range(0, total, size)
        ]
        if chunks:
            return chunks
        return [{"text": "", "start": 0, "end": 0}]

    def get_chunk_item(self, chunk_id):
        """Return the text of the chunk at zero-based index ``chunk_id``."""
        return self._chunk_list[chunk_id]["text"]

    def get_chunk_info(self, chunk_id):
        """Return a summary of the chunk: one-based id, character span, and head/tail excerpts."""
        chunk = self._chunk_list[chunk_id]
        body = chunk["text"]
        if body:
            tips = f"[{body[:20]}]...到... [{body[-20:]}]"
        else:
            tips = "[]"
        return f"文件块id: {chunk_id + 1}\n文件块位置: 字符{chunk['start']}到{chunk['end']}\n文件块简述: {tips}\n"

    def get_chunk_location(self, chunk_id):
        """Return the character span of the chunk as text."""
        chunk = self._chunk_list[chunk_id]
        return f"字符{chunk['start']}到{chunk['end']}"

    def add_chunk_comment(self, chunk_id, comments):
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError("StandaloneDoc does not modify Word/PDF comments.")

    def edit_chunk_comment(self, comments):
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError("StandaloneDoc does not modify Word/PDF comments.")

    def delete_chunk_comment(self, comments):
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError("StandaloneDoc does not modify Word/PDF comments.")

    def get_chunk_id_list(self, step=1):
        """Return zero-based chunk ids, taking every ``step``-th one."""
        return list(range(0, self.get_chunk_num(), step))

    def get_chunk_num(self):
        """Return the number of chunks."""
        return len(self._chunk_list)

    def get_all_text(self):
        """Return the full text read by ``load``."""
        return self._all_text
def parse_docx(path: Union[str, Path]) -> str:
    """Extract the text of ``word/document.xml`` from a ``.docx`` file.

    Each ``w:p`` element contributes one line built from its ``w:t`` texts,
    with ``w:tab`` rendered as a tab character; paragraphs that produce no
    text are dropped. Lines are joined with newlines.
    """
    text_tag = f"{{{WORD_NS['w']}}}t"
    tab_tag = f"{{{WORD_NS['w']}}}tab"
    with zipfile.ZipFile(path) as archive:
        document = ET.fromstring(archive.read("word/document.xml"))
    lines = []
    for para in document.findall(".//w:p", WORD_NS):
        pieces = []
        for node in para.iter():
            if node.tag == text_tag:
                pieces.append(node.text or "")
            elif node.tag == tab_tag:
                pieces.append("\t")
        line = "".join(pieces)
        if line:
            lines.append(line)
    return "\n".join(lines)
def read_document_text(path: Union[str, Path]) -> str:
    """Return the text of a document, choosing the reader from the file suffix.

    ``.docx`` and ``.pdf`` use their dedicated readers; spreadsheet and
    delimited files are returned as the JSON dump of their header-less rows;
    anything else is read as UTF-8 text with undecodable bytes replaced.
    """
    source = Path(path)
    kind = source.suffix.lower()
    if kind == ".docx":
        return parse_docx(path)
    if kind == ".pdf":
        return read_pdf_text_optional(path)
    if kind in {".xlsx", ".xlsm", ".csv", ".tsv"}:
        rows = ExcelUtil(path).load(has_header=False)
        return json.dumps(rows, ensure_ascii=False)
    return source.read_text(encoding="utf-8", errors="replace")
def read_pdf_text_optional(path: Union[str, Path]) -> str:
    """Extract the text of every PDF page with PyMuPDF, joined by newlines.

    Raises:
        RuntimeError: the ``fitz`` module cannot be imported.
    """
    try:
        import fitz  # type: ignore
    except ImportError as exc:
        raise RuntimeError("PDF text extraction needs PyMuPDF installed; this script does not import project utils.") from exc
    document = fitz.open(path)
    try:
        page_texts = [page.get_text() for page in document]
    finally:
        document.close()
    return "\n".join(page_texts)
def process_string(s: str) -> str:
    """Mirror utils.spire_word_util.process_string.

    Picks one line out of ``s``:

    * no newline: ``s`` itself;
    * one newline: the longer of the two lines (the first on a tie);
    * two or more newlines: the longest line strictly between the first and
      the last (the earliest on a tie; may be ``""``).
    """
    parts = s.split("\n")
    if len(parts) == 1:
        return s
    if len(parts) == 2:
        first, second = parts
        return first if len(first) >= len(second) else second
    # With three or more parts the middle slice is never empty, so no fallback
    # over the outer lines is needed.
    return max(parts[1:-1], key=len)
def is_messy_text(
    text: str,
    min_chars=40,
    chinese_ratio_thresh=0.20,
    printable_ratio_thresh=0.70,
    symbol_ratio_thresh=0.30,
    longest_non_word_run_thresh=10,
    english_word_density_thresh=0.03,
) -> bool:
    """Mirror utils.spire_pdf_util.is_messy_text.

    Returns True for empty text, text shorter than ``min_chars``, or text that
    trips any of the ratio rules below; False otherwise. "Chinese" means the
    U+4E00..U+9FFF range and a "symbol" is any character that is neither
    Chinese, alphanumeric, nor whitespace.
    """
    if not text or len(text) < min_chars:
        return True

    total = len(text)
    chinese = printable = symbols = 0
    for ch in text:
        is_chinese = "\u4e00" <= ch <= "\u9fff"
        chinese += is_chinese
        printable += ch.isprintable()
        symbols += not (is_chinese or ch.isalnum() or ch.isspace())
    chinese_ratio = chinese / total
    printable_ratio = printable / total
    symbol_ratio = symbols / total

    longest_run = 0
    for run in re.findall(r"[^0-9A-Za-z\u4e00-\u9fff\s]+", text):
        longest_run = max(longest_run, len(run))
    word_density = len(re.findall(r"\b[a-zA-Z]{2,}\b", text)) / max(1, total)

    rules = (
        # Mostly unprintable characters.
        printable_ratio < printable_ratio_thresh * 0.6,
        # Dominated by symbols.
        symbol_ratio > max(0.5, symbol_ratio_thresh),
        # A very long unbroken run of symbols.
        longest_run >= longest_non_word_run_thresh * 1.5,
        # Neither Chinese nor English words, yet symbol-heavy.
        chinese_ratio < chinese_ratio_thresh and word_density < english_word_density_thresh and symbol_ratio > symbol_ratio_thresh,
        # Hardly any Chinese and many unprintable characters.
        chinese_ratio < (chinese_ratio_thresh * 0.5) and printable_ratio < printable_ratio_thresh,
    )
    return any(rules)
def _json_print(value: Any) -> None:
    """Print ``value`` as indented JSON, keeping non-ASCII characters unescaped."""
    rendered = json.dumps(value, ensure_ascii=False, indent=2)
    print(rendered)
def main() -> int:
    """Command-line entry point for the single-file doc/excel utilities.

    Builds one sub-command per utility, parses ``sys.argv`` and prints the
    result of the selected command.

    Returns:
        0 after the selected command has been executed.
    """
    parser = argparse.ArgumentParser(description="Single-file doc/excel utilities based on utils/")
    commands = parser.add_subparsers(dest="cmd", required=True)

    load_cmd = commands.add_parser("load-excel")
    load_cmd.add_argument("file")
    load_cmd.add_argument("--sheet-name")
    load_cmd.add_argument("--no-header", action="store_true")

    commands.add_parser("list-sheets").add_argument("file")

    find_cmd = commands.add_parser("find-value")
    for positional in ("file", "key_column", "key_value", "value_column"):
        find_cmd.add_argument(positional)
    find_cmd.add_argument("--sheet-name")

    map_cmd = commands.add_parser("map-rows")
    map_cmd.add_argument("file")
    map_cmd.add_argument("column_map", help="JSON object: {new_key: header_name}")
    map_cmd.add_argument("--sheet-name")

    # Document commands share the chunk-size option; two also take a chunk id.
    for name, takes_chunk_id in (("doc-load", False), ("doc-chunk", True), ("doc-info", True)):
        doc_cmd = commands.add_parser(name)
        doc_cmd.add_argument("file")
        if takes_chunk_id:
            doc_cmd.add_argument("chunk_id", type=int)
        doc_cmd.add_argument("--max-single-chunk-size", type=int, default=DEFAULT_MAX_SINGLE_CHUNK_SIZE)

    commands.add_parser("doc-adjust-chunk-size").add_argument("file")
    for name in ("process-string", "is-messy-text"):
        commands.add_parser(name).add_argument("text")

    args = parser.parse_args()
    cmd = args.cmd

    if cmd == "load-excel":
        _json_print(ExcelUtil.load_excel(args.file, sheet_name=args.sheet_name, has_header=not args.no_header))
        return 0
    if cmd == "list-sheets":
        _json_print(ExcelUtil.list_excel_sheets(args.file))
        return 0
    if cmd == "find-value":
        # The key arrives from the command line, so it is compared as a string.
        value: object = args.key_value
        _json_print(ExcelUtil.find_value_by_column_excel(args.file, args.key_column, value, args.value_column, args.sheet_name))
        return 0
    if cmd == "map-rows":
        _json_print(ExcelUtil.load_mapped_excel(args.file, args.sheet_name, json.loads(args.column_map)))
        return 0
    if cmd == "process-string":
        print(process_string(args.text))
        return 0
    if cmd == "is-messy-text":
        print(json.dumps(is_messy_text(args.text), ensure_ascii=False))
        return 0

    if cmd in {"doc-load", "doc-chunk", "doc-info", "doc-adjust-chunk-size"}:
        # doc-adjust-chunk-size has no chunk-size option, hence the getattr default.
        chunk_size = getattr(args, "max_single_chunk_size", DEFAULT_MAX_SINGLE_CHUNK_SIZE)
        doc = StandaloneDoc(max_single_chunk_size=chunk_size)
        doc.load(args.file)
        if cmd == "doc-load":
            _json_print({"chunk_num": doc.get_chunk_num(), "chunk_ids": doc.get_chunk_id_list(), "text_len": len(doc.get_all_text())})
        elif cmd == "doc-chunk":
            print(doc.get_chunk_item(args.chunk_id))
        elif cmd == "doc-info":
            print(doc.get_chunk_info(args.chunk_id))
        else:
            print(doc.adjust_chunk_size())
    return 0
print(f"unknown command: {command}", file=sys.stderr)
return 2
return subprocess.call([sys.executable, target, *args])
if __name__ == "__main__":
......
#!/usr/bin/env python3
"""Standalone document CLI for text chunks and docx comments."""
from __future__ import annotations
import argparse, json, re, shutil, zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from xml.etree import ElementTree as ET
MAX_CHUNK, MIN_CHUNK, MAX_PAGE = 100000, 2000, 10
NS = {"w": "http://schemas.openxmlformats.org/wordprocessingml/2006/main", "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships", "rel": "http://schemas.openxmlformats.org/package/2006/relationships", "ct": "http://schemas.openxmlformats.org/package/2006/content-types"}
def _load_json(v: str) -> Any:
    """Decode a JSON argument.

    A value starting with ``@`` names a UTF-8 file whose content is decoded;
    any other value is decoded as the JSON text itself.
    """
    if v.startswith("@"):
        payload = Path(v[1:]).read_text(encoding="utf-8")
    else:
        payload = v
    return json.loads(payload)
def q(ns: str, tag: str) -> str:
    """Return ``tag`` qualified as ``{namespace-uri}tag`` for the ``NS`` prefix ``ns``."""
    uri = NS[ns]
    return "{" + uri + "}" + tag
def parse_docx(path: str | Path) -> str:
    """Extract the text of ``word/document.xml`` from a ``.docx`` file.

    Every ``w:p`` yields one line made of its ``w:t`` texts with ``w:tab``
    rendered as a tab; paragraphs without text are skipped and the remaining
    lines are joined with newlines.
    """
    text_tag, tab_tag = q("w", "t"), q("w", "tab")
    with zipfile.ZipFile(path) as archive:
        document = ET.fromstring(archive.read("word/document.xml"))
    lines = []
    for para in document.findall(".//w:p", NS):
        pieces = []
        for node in para.iter():
            tag = node.tag
            if tag == text_tag:
                pieces.append(node.text or "")
            elif tag == tab_tag:
                pieces.append("\t")
        line = "".join(pieces)
        if line:
            lines.append(line)
    return "\n".join(lines)
def read_text(path: str | Path) -> str:
    """Return the text of a document.

    ``.docx`` goes through ``parse_docx``, ``.pdf`` through PyMuPDF, and any
    other file is read as UTF-8 with undecodable bytes replaced.

    Raises:
        RuntimeError: the file is a PDF and ``fitz`` cannot be imported.
    """
    source = Path(path)
    kind = source.suffix.lower()
    if kind == ".docx":
        return parse_docx(source)
    if kind != ".pdf":
        return source.read_text(encoding="utf-8", errors="replace")
    try:
        import fitz  # type: ignore
    except ImportError as exc:
        raise RuntimeError("PyMuPDF is required for PDF text extraction") from exc
    document = fitz.open(source)
    try:
        page_texts = [page.get_text() for page in document]
    finally:
        document.close()
    return "\n".join(page_texts)
def chunk_text(text: str, size: int) -> list[dict[str, Any]]:
    """Split ``text`` into consecutive windows of at most ``size`` characters.

    Each chunk is ``{"text", "start", "end"}`` with ``end`` exclusive. When no
    window is produced (for example for empty text) a single empty chunk is
    returned, so the result is never empty.
    """
    total = len(text)
    pieces = []
    for begin in range(0, total, size):
        pieces.append({"text": text[begin : begin + size], "start": begin, "end": min(begin + size, total)})
    if not pieces:
        pieces.append({"text": "", "start": 0, "end": 0})
    return pieces
def adjust_size(text_len: int) -> int:
    """Return ``text_len // MAX_PAGE`` capped at ``MAX_CHUNK`` and then raised to at least ``MIN_CHUNK``."""
    size = text_len // MAX_PAGE
    if size > MAX_CHUNK:
        size = MAX_CHUNK
    if size < MIN_CHUNK:
        size = MIN_CHUNK
    return size
def process_string(s: str) -> str:
    """Pick one representative line out of ``s``.

    * no newline: ``s`` itself;
    * one newline: the longer of the two lines (the first on a tie);
    * two or more newlines: the longest line strictly between the first and
      the last (the earliest on a tie; may be ``""``).
    """
    parts = s.split("\n")
    if len(parts) == 1:
        return s
    if len(parts) == 2:
        head, tail = parts
        return head if len(head) >= len(tail) else tail
    # Three or more parts always leave a non-empty middle slice, so neither a
    # fallback list nor a ``default`` is needed.
    return max(parts[1:-1], key=len)
def is_messy_text(text: str, min_chars: int = 40) -> bool:
    """Heuristically decide whether ``text`` looks like garbled extraction output.

    Returns True for empty text, text shorter than ``min_chars``, or text that
    trips any rule below. "Chinese" means U+4E00..U+9FFF; a "symbol" is any
    character that is neither Chinese, alphanumeric, nor whitespace.
    """
    if not text or len(text) < min_chars:
        return True
    n = len(text)
    cn = sum(1 for c in text if "\u4e00" <= c <= "\u9fff") / n
    printable = sum(1 for c in text if c.isprintable()) / n
    sym = sum(1 for c in text if not (("\u4e00" <= c <= "\u9fff") or c.isalnum() or c.isspace())) / n
    longest = max((len(s) for s in re.findall(r"[^0-9A-Za-z\u4e00-\u9fff\s]+", text)), default=0)
    word_density = len(re.findall(r"\b[a-zA-Z]{2,}\b", text)) / n
    if printable < 0.42 or sym > 0.5 or longest >= 15:
        return True
    # Neither Chinese nor English words, yet symbol-heavy. This rule is part of
    # the full heuristic this function condenses and must not be dropped.
    if cn < 0.2 and word_density < 0.03 and sym > 0.3:
        return True
    return cn < 0.1 and printable < 0.7
def norm_findings(v: Any, key: str | None = None) -> list[dict[str, Any]]:
    """Normalize the accepted finding payload shapes into a flat list of dicts.

    * list: its dict items;
    * dict with ``"findings"``: that value, narrowed to ``key`` when ``key`` is
      given and the value is a dict;
    * dict containing ``key``: the value under ``key``;
    * dict that looks like a single finding (has ``rule_title``, ``issue`` or
      ``suggestion``): a one-item list;
    * any other dict: the findings of all its values, concatenated;
    * anything else: ``[]``.
    """
    if isinstance(v, list):
        return [item for item in v if isinstance(item, dict)]
    if not isinstance(v, dict):
        return []
    if "findings" in v:
        inner = v["findings"]
        if key and isinstance(inner, dict):
            inner = inner.get(key)
        # The key has been applied (or was not applicable), so recurse without it.
        return norm_findings(inner)
    if key and key in v:
        return norm_findings(v[key])
    if not {"rule_title", "issue", "suggestion"}.isdisjoint(v):
        return [v]
    flattened = []
    for nested in v.values():
        flattened.extend(norm_findings(nested))
    return flattened
def p_text(p: ET.Element) -> str:
    """Return the concatenated text of all ``w:t`` elements under ``p``."""
    text_tag = q("w", "t")
    pieces = [node.text or "" for node in p.iter(text_tag)]
    return "".join(pieces)
def comment_body(f: dict[str, Any]) -> str:
    """Render a finding as comment text, one ``label:value`` line per non-empty field.

    The risk level is read from ``risk_level`` and falls back to ``level``.
    When no field has a value a generic placeholder is returned.
    """
    fields = (
        ("风险等级", f.get("risk_level") or f.get("level")),
        ("合格性", f.get("result")),
        ("问题", f.get("issue")),
        ("建议", f.get("suggestion")),
    )
    lines = []
    for label, content in fields:
        if content:
            lines.append(f"{label}:{content}")
    if not lines:
        return "合同审查提示"
    return "\n".join(lines)
def target_para(paras: list[ET.Element], f: dict[str, Any]) -> ET.Element | None:
    """Choose the paragraph a finding's comment should be attached to.

    The first paragraph containing the finding's ``original_text`` (compared
    verbatim, then with all whitespace removed) wins. Otherwise ``segment_id``
    is used as an index into the non-empty paragraphs, clamped to the valid
    range; a missing or non-numeric ``segment_id`` counts as 0. With no
    non-empty paragraph the first paragraph is used, and ``None`` is returned
    when ``paras`` is empty.
    """
    original = str(f.get("original_text") or "").strip()
    if original:
        compact = re.sub(r"\s+", "", original)
        for p in paras:
            t = p_text(p)
            if original in t or compact in re.sub(r"\s+", "", t):
                return p
    non_empty = [p for p in paras if p_text(p).strip()]
    try:
        idx = max(int(f.get("segment_id") or 0), 0)
    except (TypeError, ValueError):
        # One malformed segment id must not abort the export of every comment.
        idx = 0
    if non_empty:
        return non_empty[min(idx, len(non_empty) - 1)]
    return paras[0] if paras else None
def ensure_rel(root: ET.Element) -> None:
    """Add a comments relationship to a relationships element unless one exists.

    A relationship whose ``Type`` ends with ``/comments`` counts as existing.
    The new relationship gets the id ``rId<max existing numeric rId + 1>`` and
    targets ``comments.xml``.
    """
    rel_tag = f"{{{NS['rel']}}}Relationship"
    existing = root.findall(rel_tag)
    for rel in existing:
        if rel.attrib.get("Type", "").endswith("/comments"):
            return
    numbers = []
    for rel in existing:
        rid = rel.attrib.get("Id", "")
        if rid.startswith("rId") and rid[3:].isdigit():
            numbers.append(int(rid[3:]))
    next_number = (max(numbers) if numbers else 0) + 1
    added = ET.SubElement(root, rel_tag, Id=f"rId{next_number}")
    added.set("Type", "http://schemas.openxmlformats.org/officeDocument/2006/relationships/comments")
    added.set("Target", "comments.xml")
def ensure_ct(root: ET.Element) -> None:
    """Add the ``/word/comments.xml`` content-type override unless it is already declared."""
    override_tag = f"{{{NS['ct']}}}Override"
    for override in root.findall(override_tag):
        if override.attrib.get("PartName") == "/word/comments.xml":
            return
    ET.SubElement(
        root,
        override_tag,
        PartName="/word/comments.xml",
        ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.comments+xml",
    )
def add_comments(src: str, findings: Any, out: str, key: str | None = None, author: str = "合同审查智能体") -> str:
    """Write a copy of a ``.docx`` file with one Word comment per finding.

    Args:
        src: Source ``.docx`` path.
        findings: Finding payload in any shape accepted by ``norm_findings``.
        out: Destination path; parent directories are created.
        key: Optional finding group passed to ``norm_findings``.
        author: Author name stored on each comment.

    Returns:
        ``out``. When there are no findings the source is copied unchanged.

    Raises:
        ValueError: ``src`` does not have a ``.docx`` suffix.
    """
    if Path(src).suffix.lower() != ".docx":
        raise ValueError("docx-add-comments only supports .docx")
    Path(out).parent.mkdir(parents=True, exist_ok=True)
    fs = norm_findings(findings, key)
    if not fs:
        shutil.copyfile(src, out)
        return out
    ET.register_namespace("w", NS["w"])
    ET.register_namespace("r", NS["r"])
    with zipfile.ZipFile(src) as zin:
        files = {n: zin.read(n) for n in zin.namelist()}
    doc = ET.fromstring(files["word/document.xml"])
    paras = doc.findall(".//w:p", NS)
    comments = ET.fromstring(files["word/comments.xml"]) if "word/comments.xml" in files else ET.Element(q("w", "comments"))
    # Continue numbering after the highest existing comment id.
    ids = [int(c.attrib.get(q("w", "id"), -1)) for c in comments.findall("w:comment", NS)]
    cid = (max(ids) if ids else -1) + 1
    stamp = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
    ppr_tag = q("w", "pPr")
    for f in fs:
        p = target_para(paras, f)
        if p is None:
            continue
        c = ET.SubElement(comments, q("w", "comment"))
        c.set(q("w", "id"), str(cid))
        c.set(q("w", "author"), author)
        c.set(q("w", "date"), stamp)
        run = ET.SubElement(ET.SubElement(c, q("w", "p")), q("w", "r"))
        # A newline inside w:t is not rendered as a line break, so emit one
        # w:t per line separated by w:br.
        for line_no, line in enumerate(comment_body(f).split("\n")):
            if line_no:
                ET.SubElement(run, q("w", "br"))
            ET.SubElement(run, q("w", "t")).text = line
        start = ET.Element(q("w", "commentRangeStart"))
        start.set(q("w", "id"), str(cid))
        end = ET.Element(q("w", "commentRangeEnd"))
        end.set(q("w", "id"), str(cid))
        ref_run = ET.Element(q("w", "r"))
        ET.SubElement(ref_run, q("w", "commentReference")).set(q("w", "id"), str(cid))
        # w:pPr has to stay the first child of w:p, so the range start goes
        # right after it when the paragraph has properties.
        first_content = 1 if len(p) and p[0].tag == ppr_tag else 0
        p.insert(first_content, start)
        p.append(end)
        p.append(ref_run)
        cid += 1
    rel_path = "word/_rels/document.xml.rels"
    rel = ET.fromstring(files[rel_path]) if rel_path in files else ET.Element(f"{{{NS['rel']}}}Relationships")
    ct = ET.fromstring(files["[Content_Types].xml"])
    ensure_rel(rel)
    ensure_ct(ct)
    files.update(
        {
            "word/document.xml": ET.tostring(doc, encoding="utf-8", xml_declaration=True),
            "word/comments.xml": ET.tostring(comments, encoding="utf-8", xml_declaration=True),
            rel_path: ET.tostring(rel, encoding="utf-8", xml_declaration=True),
            "[Content_Types].xml": ET.tostring(ct, encoding="utf-8", xml_declaration=True),
        }
    )
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zout:
        for n, data in files.items():
            zout.writestr(n, data)
    return out
def main() -> int:
    """Command-line entry point of the standalone document CLI.

    Returns:
        0 after the selected command has run. ``doc-ocr`` exits through
        SystemExit with an explanatory message instead.
    """
    parser = argparse.ArgumentParser(description="Standalone document CLI")
    sub = parser.add_subparsers(dest="cmd", required=True)

    def add_doc_command(name: str, with_chunk_id: bool = False) -> None:
        # Document commands share the file argument and the chunk-size option.
        cmd_parser = sub.add_parser(name)
        cmd_parser.add_argument("file")
        if with_chunk_id:
            cmd_parser.add_argument("chunk_id", type=int)
        cmd_parser.add_argument("--max-single-chunk-size", type=int, default=MAX_CHUNK)

    for name in ("doc-load", "doc-text", "doc-adjust-chunk-size"):
        add_doc_command(name)
    add_doc_command("doc-chunk", with_chunk_id=True)
    add_doc_command("doc-info", with_chunk_id=True)
    sub.add_parser("doc-ocr").add_argument("file")
    sub.add_parser("process-string").add_argument("text")
    sub.add_parser("is-messy-text").add_argument("text")
    comment_cmd = sub.add_parser("docx-add-comments")
    comment_cmd.add_argument("input")
    comment_cmd.add_argument("findings")
    comment_cmd.add_argument("output")
    comment_cmd.add_argument("--finding-key")
    comment_cmd.add_argument("--author", default="合同审查智能体")

    x = parser.parse_args()
    cmd = x.cmd

    # Commands that do not read a document.
    if cmd == "process-string":
        print(process_string(x.text))
        return 0
    if cmd == "is-messy-text":
        print(json.dumps(is_messy_text(x.text)))
        return 0
    if cmd == "docx-add-comments":
        print(add_comments(x.input, _load_json(x.findings), x.output, x.finding_key, x.author))
        return 0
    if cmd == "doc-ocr":
        raise SystemExit("doc-ocr is not implemented here; use ocr-skill")

    # Remaining commands work on the document text and its chunks.
    text = read_text(x.file)
    size = getattr(x, "max_single_chunk_size", MAX_CHUNK)
    chunks = chunk_text(text, size)
    if cmd == "doc-load":
        summary = {"tool": "StandaloneDoc", "chunk_num": len(chunks), "chunk_ids": list(range(len(chunks))), "text_len": len(text)}
        print(json.dumps(summary, ensure_ascii=False, indent=2))
    elif cmd == "doc-text":
        print(text)
    elif cmd == "doc-adjust-chunk-size":
        print(adjust_size(len(text)))
    elif cmd == "doc-chunk":
        print(chunks[x.chunk_id]["text"])
    elif cmd == "doc-info":
        c = chunks[x.chunk_id]
        t = c["text"]
        print(f"文件块id: {x.chunk_id + 1}\n文件块位置: 字符{c['start']}到{c['end']}\n文件块简述: [{t[:20]}]...到... [{t[-20:]}]\n")
    return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone Excel CLI for rule tables and review exports."""
from __future__ import annotations
import argparse
import csv
import json
import string
import zipfile
from pathlib import Path
from typing import Any
from xml.etree import ElementTree as ET
NS = {"a": "http://schemas.openxmlformats.org/spreadsheetml/2006/main", "r": "http://schemas.openxmlformats.org/officeDocument/2006/relationships", "rel": "http://schemas.openxmlformats.org/package/2006/relationships"}
class ExcelLoadError(Exception):
    """Error raised by this module when a spreadsheet cannot be loaded."""
def _json(v: Any) -> None:
    """Print ``v`` as indented JSON, keeping non-ASCII characters unescaped."""
    rendered = json.dumps(v, ensure_ascii=False, indent=2)
    print(rendered)
def _load_json(v: str) -> Any:
    """Decode a JSON argument.

    A value starting with ``@`` names a UTF-8 file whose content is decoded;
    any other value is decoded as the JSON text itself.
    """
    if v.startswith("@"):
        payload = Path(v[1:]).read_text(encoding="utf-8")
    else:
        payload = v
    return json.loads(payload)
def _col_idx(ref: str) -> int:
    """Convert a cell reference such as ``"B7"`` into a zero-based column index.

    Only the ASCII letters of ``ref`` are used (case-insensitively); a
    reference without letters maps to column 0.
    """
    letters = [c.upper() for c in ref if c in string.ascii_letters]
    position = 0
    for letter in letters:
        # 64 is ord("A") - 1, so A counts as 1 and Z as 26.
        position = position * 26 + ord(letter) - 64
    if position <= 0:
        return 0
    return position - 1
def _rows_to_result(rows: list, header: bool) -> list:
    """Shape raw rows into the result format.

    Without a header every row becomes a list. With a header the first row
    supplies the dict keys (stripped strings, ``""`` for empty header cells)
    and values beyond the header width are keyed ``col<index>``.
    """
    if not rows:
        return []
    if not header:
        return [list(row) for row in rows]
    heads = []
    for title in rows[0]:
        heads.append("" if title is None else str(title).strip())
    records = []
    for row in rows[1:]:
        record = {}
        for idx, value in enumerate(row):
            name = heads[idx] if idx < len(heads) else f"col{idx}"
            record[name] = value
        records.append(record)
    return records
def _sheet_map(zf: zipfile.ZipFile) -> list[tuple[str, str]]:
    """Return ``(sheet name, archive member path)`` pairs in workbook order.

    Sheet targets are taken from ``xl/_rels/workbook.xml.rels``. A target
    starting with ``/`` is resolved against the package root; any other target
    is resolved against ``xl/`` unless it already starts with ``xl/``.

    Raises:
        KeyError: ``xl/workbook.xml`` or its relationship part is missing.
    """
    wb = ET.fromstring(zf.read("xl/workbook.xml"))
    rels = ET.fromstring(zf.read("xl/_rels/workbook.xml.rels"))
    rel_map = {r.attrib["Id"]: r.attrib["Target"] for r in rels.findall("rel:Relationship", NS)}
    rel_id_attr = f"{{{NS['r']}}}id"
    out = []
    for s in wb.findall(".//a:sheets/a:sheet", NS):
        target = rel_map.get(s.attrib.get(rel_id_attr, ""), "")
        if target.startswith("/"):
            # Absolute target: already relative to the package root, so it must
            # not get a second "xl/" prefix ("/xl/worksheets/..." is common).
            member = target.lstrip("/")
        elif target.startswith("xl/"):
            member = target
        else:
            member = "xl/" + target
        out.append((s.attrib.get("name", ""), member))
    return out
def _shared(zf: zipfile.ZipFile) -> list[str]:
    """Return the workbook's shared-string table, or ``[]`` when the archive has none.

    Each entry is the concatenation of every ``t`` text node found under one
    ``si`` item.
    """
    try:
        table = ET.fromstring(zf.read("xl/sharedStrings.xml"))
    except KeyError:
        # ZipFile.read raises KeyError for a member that is not in the archive.
        return []
    strings = []
    for item in table.findall(".//a:si", NS):
        pieces = [node.text or "" for node in item.findall(".//a:t", NS)]
        strings.append("".join(pieces))
    return strings
def _load_std_xlsx(path: Path, sheet: str | None, header: bool) -> list:
    """Load one sheet of an ``.xlsx`` file using only the standard library.

    Args:
        path: Workbook path.
        sheet: Sheet name; the first sheet is used when it is empty or None.
        header: Whether the first row holds column names.

    Returns:
        The rows shaped by ``_rows_to_result``; ``[]`` for a workbook without
        sheets. Cell values are the raw XML strings, with shared-string and
        inline-string cells resolved to their text.

    Raises:
        ExcelLoadError: ``sheet`` is given but not present in the workbook.
    """
    with zipfile.ZipFile(path) as zf:
        shared, sheets = _shared(zf), _sheet_map(zf)
        if not sheets:
            return []
        if sheet:
            sheet_path = next((p for n, p in sheets if n == sheet), None)
            if sheet_path is None:
                # Falling back to the first sheet would return unrelated data.
                raise ExcelLoadError(f"Sheet not found: {sheet}")
        else:
            sheet_path = sheets[0][1]
        root = ET.fromstring(zf.read(sheet_path))
    rows = []
    for r in root.findall(".//a:sheetData/a:row", NS):
        values = []
        for c in r.findall("a:c", NS):
            # Empty cells are omitted from the XML; pad up to this cell's column.
            column = _col_idx(c.attrib.get("r", ""))
            while len(values) < column:
                values.append(None)
            value_node = c.find("a:v", NS)
            if value_node is not None:
                raw = value_node.text
            else:
                # Inline strings keep their text under <is><t> instead of <v>.
                inline_node = c.find("a:is/a:t", NS)
                raw = inline_node.text if inline_node is not None else None
            if c.attrib.get("t") == "s" and raw is not None and int(raw) < len(shared):
                values.append(shared[int(raw)])
            else:
                values.append(raw)
        rows.append(values)
    return _rows_to_result(rows, header)
def load_excel(path: str, sheet: str | None = None, header: bool = True) -> list:
    """Load a table from a CSV/TSV file or an Excel workbook.

    ``.csv``/``.tsv`` files are read with the csv module. Other files are read
    with openpyxl when it can be imported; without openpyxl only ``.xlsx`` is
    supported, through the standard-library reader.

    Args:
        path: File to read.
        sheet: Sheet name; the active/first sheet is used when omitted.
        header: Whether the first row holds column names.

    Raises:
        ExcelLoadError: openpyxl is missing and the file is not ``.xlsx``.
    """
    p = Path(path)
    suffix = p.suffix.lower()
    if suffix in {".csv", ".tsv"}:
        with p.open(newline="", encoding="utf-8-sig", errors="replace") as f:
            return _rows_to_result(list(csv.reader(f, delimiter="\t" if suffix == ".tsv" else ",")), header)
    try:
        import openpyxl  # type: ignore
    except ImportError:
        if suffix != ".xlsx":
            raise ExcelLoadError("openpyxl is required for non-xlsx files")
        return _load_std_xlsx(p, sheet, header)
    wb = openpyxl.load_workbook(p, data_only=True, read_only=True)
    try:
        ws = wb[sheet] if sheet else wb.active
        # Materialize the rows before closing: read-only worksheets stream from the file.
        rows = list(ws.iter_rows(values_only=True))
    finally:
        # Read-only workbooks keep the file handle open until closed explicitly.
        wb.close()
    return _rows_to_result(rows, header)
def list_sheets(path: str) -> list[str]:
    """Return the sheet names of a workbook.

    Uses openpyxl when it can be imported, otherwise reads the workbook
    archive with the standard library.
    """
    try:
        import openpyxl  # type: ignore
    except ImportError:
        with zipfile.ZipFile(path) as zf:
            return [n for n, _ in _sheet_map(zf)]
    wb = openpyxl.load_workbook(path, read_only=True)
    try:
        return list(wb.sheetnames)
    finally:
        # Read-only workbooks keep the file handle open until closed explicitly.
        wb.close()
def _norm_findings(v: Any, key: str | None = None) -> list[dict]:
    """Normalize the accepted finding payload shapes into a flat list of dicts.

    * list: its dict items;
    * dict with ``"findings"``: that value, narrowed to ``key`` when ``key`` is
      given and the value is a dict;
    * dict containing ``key``: the value under ``key``;
    * dict that looks like a single finding (has ``rule_title``, ``issue`` or
      ``suggestion``): a one-item list;
    * any other dict: the findings of all its values, concatenated;
    * anything else: ``[]``.
    """
    if isinstance(v, list):
        return [item for item in v if isinstance(item, dict)]
    if not isinstance(v, dict):
        return []
    if "findings" in v:
        inner = v["findings"]
        if key and isinstance(inner, dict):
            inner = inner.get(key)
        # The key has been applied (or was not applicable), so recurse without it.
        return _norm_findings(inner)
    if key and key in v:
        return _norm_findings(v[key])
    if not {"rule_title", "issue", "suggestion"}.isdisjoint(v):
        return [v]
    flattened = []
    for nested in v.values():
        flattened.extend(_norm_findings(nested))
    return flattened
def _norm_facts(v: Any, key: str) -> list[dict]:
    """Normalise a facts payload into a list of dicts.

    A list keeps only its dict entries. A dict is unwrapped through
    ``key`` when present (otherwise the dict itself is used); the result
    is then filtered if it is a list or wrapped if it is a dict. Every
    other input yields an empty list.
    """
    if isinstance(v, list):
        return [entry for entry in v if isinstance(entry, dict)]
    if not isinstance(v, dict):
        return []
    payload = v.get(key, v)
    if isinstance(payload, list):
        return [entry for entry in payload if isinstance(entry, dict)]
    if isinstance(payload, dict):
        return [payload]
    return []
def _cell(v: Any) -> str:
    """Render a value as cell text: "" for None, pretty JSON for dicts and lists, ``str()`` otherwise."""
    if v is None:
        return ""
    if isinstance(v, (dict, list)):
        return json.dumps(v, ensure_ascii=False, indent=2)
    return str(v)
def export_excel(findings: Any, out: str, facts: Any = None, merge: Any = None, key: str | None = None) -> str:
    """Write findings and facts to an ``.xlsx`` workbook and return ``out``.

    The first sheet lists one row per normalised finding. Two further
    sheets hold key/value rows for the contract facts and the merged
    facts, skipping ``_meta`` / ``meta`` entries. Header rows are bold and
    every cell is top-aligned with text wrapping. The parent directory of
    ``out`` is created when missing. Requires openpyxl.
    """
    from openpyxl import Workbook  # type: ignore
    from openpyxl.styles import Alignment, Font  # type: ignore

    wb = Workbook()
    headers = ["ID", "规则标题", "分段ID", "原文", "问题描述", "风险等级", "合格性", "建议"]
    ws = wb.active
    ws.title = "审查结果"
    ws.append(headers)
    for finding in _norm_findings(findings, key):
        ws.append(
            [
                finding.get("id", ""),
                finding.get("rule_title", ""),
                finding.get("segment_id", ""),
                finding.get("original_text", ""),
                finding.get("issue", ""),
                # "level" is the fallback when "risk_level" is missing or empty.
                finding.get("risk_level") or finding.get("level", ""),
                finding.get("result", ""),
                finding.get("suggestion", ""),
            ]
        )

    fact_sheets = (
        ("合同事实", _norm_facts(facts or {}, "facts")),
        ("合并事实", _norm_facts(merge or {}, "merge_facts")),
    )
    for title, items in fact_sheets:
        fact_ws = wb.create_sheet(title)
        fact_ws.append(["提取项", "提取内容"])
        for item in items:
            for field, value in item.items():
                if str(field) in {"_meta", "meta"}:
                    continue
                fact_ws.append([field, _cell(value)])

    for worksheet in wb.worksheets:
        for header_cell in worksheet[1]:
            header_cell.font = Font(bold=True)
        for cells in worksheet.iter_rows():
            for cell in cells:
                cell.alignment = Alignment(vertical="top", wrap_text=True)

    Path(out).parent.mkdir(parents=True, exist_ok=True)
    wb.save(out)
    return out
def main() -> int:
    """Parse the command line and run one Excel sub-command.

    Sub-commands: ``load-excel``, ``list-sheets``, ``find-value``,
    ``map-rows``, ``export-findings-excel`` and ``export-facts-excel``.
    Query results are emitted through ``_json``; the export commands print
    the output path. Always returns 0.
    """
    cli = argparse.ArgumentParser(description="Standalone Excel CLI")
    commands = cli.add_subparsers(dest="cmd", required=True)

    load_cmd = commands.add_parser("load-excel")
    load_cmd.add_argument("file")
    load_cmd.add_argument("--sheet-name")
    load_cmd.add_argument("--no-header", action="store_true")

    sheets_cmd = commands.add_parser("list-sheets")
    sheets_cmd.add_argument("file")

    find_cmd = commands.add_parser("find-value")
    find_cmd.add_argument("file")
    find_cmd.add_argument("key_column")
    find_cmd.add_argument("key_value")
    find_cmd.add_argument("value_column")
    find_cmd.add_argument("--sheet-name")

    map_cmd = commands.add_parser("map-rows")
    map_cmd.add_argument("file")
    map_cmd.add_argument("column_map")
    map_cmd.add_argument("--sheet-name")

    findings_cmd = commands.add_parser("export-findings-excel")
    findings_cmd.add_argument("findings")
    findings_cmd.add_argument("output")
    findings_cmd.add_argument("--facts")
    findings_cmd.add_argument("--merge-facts")
    findings_cmd.add_argument("--finding-key")

    facts_cmd = commands.add_parser("export-facts-excel")
    facts_cmd.add_argument("facts")
    facts_cmd.add_argument("output")
    facts_cmd.add_argument("--merge-facts")

    args = cli.parse_args()
    cmd = args.cmd
    if cmd == "load-excel":
        _json(load_excel(args.file, args.sheet_name, not args.no_header))
    elif cmd == "list-sheets":
        _json(list_sheets(args.file))
    elif cmd == "find-value":
        # First dict row whose key column equals the given value wins.
        found = None
        for row in load_excel(args.file, args.sheet_name):
            if isinstance(row, dict) and row.get(args.key_column) == args.key_value:
                found = row.get(args.value_column)
                break
        _json(found)
    elif cmd == "map-rows":
        mapped = []
        for row in load_excel(args.file, args.sheet_name):
            if not isinstance(row, dict):
                continue
            # The mapping is parsed per row, so it is never parsed when there are no rows.
            column_map = json.loads(args.column_map)
            mapped.append({target: row.get(source) for target, source in column_map.items()})
        _json(mapped)
    elif cmd == "export-findings-excel":
        facts = _load_json(args.facts) if args.facts else None
        merged = _load_json(args.merge_facts) if args.merge_facts else None
        print(export_excel(_load_json(args.findings), args.output, facts, merged, args.finding_key))
    elif cmd == "export-facts-excel":
        merged = _load_json(args.merge_facts) if args.merge_facts else None
        print(export_excel([], args.output, _load_json(args.facts), merged))
    return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone HTTP upload/download CLI."""
from __future__ import annotations
import argparse, json, mimetypes, random, re, string, sys, time, urllib.error, urllib.request
from pathlib import Path
from urllib.parse import unquote, urlparse
# Default service endpoints; also used as the defaults of the --*-url CLI flags.
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
# NOTE(review): default admin credentials are hard-coded in source; consider
# supplying them only through the environment or --username/--password.
DEFAULT_BACKEND_ADMIN_USERNAME = "admin"
DEFAULT_BACKEND_ADMIN_PASSWORD = "admin@jpai.com"
# Mutable module-level settings, seeded from the defaults above and updated
# through configure_urls() / configure_login().
base_fastgpt_url, base_backend_url, outer_backend_url = DEFAULT_BASE_FASTGPT_URL, DEFAULT_BASE_BACKEND_URL, DEFAULT_OUTER_BACKEND_URL
backend_admin_username, backend_admin_password = DEFAULT_BACKEND_ADMIN_USERNAME, DEFAULT_BACKEND_ADMIN_PASSWORD
def configure_urls(fastgpt_url: str | None = None, backend_url: str | None = None, outer_url: str | None = None) -> None:
    """Override the module-level service URLs; falsy arguments keep the current value."""
    global base_fastgpt_url, base_backend_url, outer_backend_url
    if fastgpt_url:
        base_fastgpt_url = fastgpt_url
    if backend_url:
        base_backend_url = backend_url
    if outer_url:
        outer_backend_url = outer_url
def configure_login(username: str | None = None, password: str | None = None) -> None:
    """Override the module-level backend credentials; falsy arguments keep the current value."""
    global backend_admin_username, backend_admin_password
    if username:
        backend_admin_username = username
    if password:
        backend_admin_password = password
def _strip(url: str | None) -> str | None:
    """Drop trailing slashes from ``url``; falsy values (None, "") are returned unchanged."""
    if not url:
        return url
    return url.rstrip("/")
def _random_str(n: int = 8) -> str:
    """Return ``n`` random lowercase ASCII letters (letters may repeat)."""
    letters = string.ascii_lowercase
    picked = []
    for _ in range(n):
        picked.append(random.choice(letters))
    return "".join(picked)
def _post_json(url: str, data: dict, timeout: int = 120) -> str:
    """POST ``data`` as a JSON body to ``url`` and return the response text (UTF-8, invalid bytes replaced)."""
    payload = json.dumps(data, ensure_ascii=False).encode()
    request = urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace")
def _multipart_body(path: str, field: str = "file") -> tuple[bytes, str]:
    """Build a single-file ``multipart/form-data`` body.

    Returns ``(body, boundary)``. The boundary combines the current time in
    milliseconds with a random suffix; the part's content type is guessed
    from the file name and falls back to ``application/octet-stream``.
    """
    source = Path(path)
    boundary = f"----http-skill-{int(time.time() * 1000)}-{_random_str()}"
    content_type = mimetypes.guess_type(source.name)[0] or "application/octet-stream"
    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field}"; filename="{source.name}"\r\n'
        f"Content-Type: {content_type}\r\n\r\n"
    ).encode()
    tail = f"\r\n--{boundary}--\r\n".encode()
    return head + source.read_bytes() + tail, boundary
def upload_file(path, input_url_to_inner=True, output_url_to_inner=False) -> str:
    """Log in to the backend and upload the file at ``path``.

    Returns the ``data`` field of the upload response. The two
    ``*_url_to_inner`` flags are accepted but not used by this function.

    Raises:
        RuntimeError: the login response carries no access token, or the
            upload response has an empty ``data`` field.
    """
    credentials = {"username": backend_admin_username, "password": backend_admin_password}
    login = _post_json(f"{base_backend_url}/admin-api/system/auth/login", credentials)
    login_data = json.loads(login).get("data") or {}
    token = login_data.get("accessToken")
    if not token:
        raise RuntimeError(f"后端登录异常:{login}")

    body, boundary = _multipart_body(path)
    upload_headers = {
        "Content-Type": f"multipart/form-data; boundary={boundary}",
        "Authorization": token,
    }
    request = urllib.request.Request(
        f"{base_backend_url}/admin-api/infra/file/upload",
        data=body,
        headers=upload_headers,
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=120) as response:
        text = response.read().decode("utf-8", errors="replace")

    uploaded = json.loads(text).get("data")
    if not uploaded:
        raise RuntimeError(f"上传{path}失败 Response text: {text}")
    return uploaded
def _basename(name: str) -> str:
    """Reduce a raw (possibly quoted, percent-encoded, backslash-separated) name to its last path component.

    Falls back to ``"downloaded_file"`` when nothing is left.
    """
    unquoted = name.strip().strip('"')
    decoded = unquote(unquoted)
    leaf = Path(decoded.replace("\\", "/")).name
    return leaf or "downloaded_file"
def _resolve_name(url: str, headers) -> str:
    """Choose a file name for a download.

    The ``Content-Disposition`` header is tried first (``filename*=`` and
    then ``filename=``); if neither is present the name is taken from the
    URL path.
    """
    disposition = headers.get("content-disposition", "") or headers.get("Content-Disposition", "")
    patterns = (
        r"filename\*=(?:UTF-8''|utf-8'')?([^;]+)",
        r'filename="?([^";]+)"?',
    )
    for pattern in patterns:
        found = re.search(pattern, disposition)
        if found is not None:
            return _basename(found.group(1))
    return _basename(urlparse(url).path)
def download_file(url, path, input_url_to_inner=True):
    """Download ``url`` to ``path`` and return the saved path, or None on failure.

    Args:
        url: Absolute URL, or a path relative to the FastGPT base URL.
        path: Target file. If it is an existing directory, the file name
            is resolved from the response headers or the URL.
        input_url_to_inner: When true, relative URLs are prefixed with
            ``base_fastgpt_url`` and the outer backend URL is rewritten to
            the inner one.

    Returns:
        The saved path as a string, or None when the request fails. HTTP
        error statuses and lower-level URL errors (connection refused,
        DNS failure, missing ``file://`` target) are both reported on
        stderr instead of propagating.
    """
    if input_url_to_inner:
        if not url.startswith(("http:", "https:")):
            url = base_fastgpt_url + url
        url = url.replace(outer_backend_url, base_backend_url)
    try:
        with urllib.request.urlopen(urllib.request.Request(url, method="GET"), timeout=120) as resp:
            target = Path(path)
            if target.exists() and target.is_dir():
                target = target / _resolve_name(url, resp.headers)
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(resp.read())
            return str(target)
    except urllib.error.HTTPError as exc:
        print(f"{url}文件下载失败. HTTP Status Code: {exc.code}", file=sys.stderr)
        return None
    except urllib.error.URLError as exc:
        # HTTPError subclasses URLError, so this branch only sees transport-level failures.
        print(f"{url}文件下载失败. {exc.reason}", file=sys.stderr)
        return None
def url_replace_fastgpt(origin: str):
    """Return ``origin`` unchanged when it is an http(s) URL, otherwise prefix it with the FastGPT base URL."""
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def add_url_args(p: argparse.ArgumentParser) -> None:
    """Register the three shared ``--*-url`` options on ``p`` with their module defaults."""
    url_options = (
        ("--base-fastgpt-url", DEFAULT_BASE_FASTGPT_URL),
        ("--base-backend-url", DEFAULT_BASE_BACKEND_URL),
        ("--outer-backend-url", DEFAULT_OUTER_BACKEND_URL),
    )
    for flag, fallback in url_options:
        p.add_argument(flag, default=fallback)
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the ``upload``, ``download`` and ``normalize-url`` sub-commands."""
    cli = argparse.ArgumentParser(description="上传、下载或补全 FastGPT/后端文件 URL。")
    commands = cli.add_subparsers(dest="command", required=True)

    upload = commands.add_parser("upload")
    add_url_args(upload)
    upload.add_argument("--username", default=DEFAULT_BACKEND_ADMIN_USERNAME)
    upload.add_argument("--password", default=DEFAULT_BACKEND_ADMIN_PASSWORD)
    upload.add_argument("path")

    download = commands.add_parser("download")
    add_url_args(download)
    download.add_argument("url")
    download.add_argument("path")

    normalize = commands.add_parser("normalize-url")
    add_url_args(normalize)
    normalize.add_argument("url")

    return cli
def main(argv: list[str] | None = None) -> int:
    """Run the HTTP CLI and return its exit status.

    Returns 0 on success and 1 when a download fails. An unsupported
    command is reported through ``parser.error``.
    """
    cli = build_arg_parser()
    args = cli.parse_args(argv)
    configure_urls(
        _strip(args.base_fastgpt_url),
        _strip(args.base_backend_url),
        _strip(args.outer_backend_url),
    )
    command = args.command
    if command == "upload":
        configure_login(args.username, args.password)
        print(upload_file(args.path))
        return 0
    if command == "download":
        saved = download_file(args.url, args.path)
        if saved is None:
            return 1
        print(saved)
        return 0
    if command == "normalize-url":
        print(url_replace_fastgpt(args.url))
        return 0
    cli.error(f"unsupported command: {args.command}")
    return 2
if __name__ == "__main__":
sys.exit(main())
#!/usr/bin/env python3
"""Single-file CLI version of utils/ocr_util.py, paddle_ocr_util.py and tesseract_ocr_util.py.
"""Minimal OCR module exposing only `TesseractOCRUtil`.
This script keeps the same class names and method names where practical, but
does not import project-local modules. External runtime tools/libraries such
as PyMuPDF or tesseract are optional and loaded only when their commands need
them.
This file was trimmed to keep just the Tesseract utility requested by the
user. It intentionally omits CLI, PaddleOCR, remote OCR helpers, and other
utilities.
"""
from __future__ import annotations
import argparse
import asyncio
import codecs
import argparse
import json
import mimetypes
import os
import random
import re
import string
import subprocess
import time
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any
# Default endpoints, used as parameter defaults by the helpers and OCR classes below.
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
# Default PaddleOCR PDF endpoint (the default ocr_url of PaddleOCRUtil).
DEFAULT_PADDLE_OCR_URL = "http://192.168.252.71:56100/ocr/pdf-robust"
def random_str(l: int = 5) -> str:
    """Return ``l`` random lowercase letters.

    Up to 26 characters are drawn without repetition; longer strings are
    drawn with repetition because the alphabet is exhausted.
    """
    alphabet = string.ascii_lowercase
    if l <= len(alphabet):
        chars = random.sample(alphabet, l)
    else:
        chars = [random.choice(alphabet) for _ in range(l)]
    return "".join(chars)
def url_replace_fastgpt(origin: str, base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL) -> str:
    """Turn a FastGPT-relative path into an absolute URL.

    Absolute ``http:`` and ``https:`` URLs are returned unchanged; anything
    else is prefixed with ``base_fastgpt_url``. Both schemes are checked so
    that an ``https:`` URL is no longer prefixed by mistake.
    """
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def download_file(
    url: str,
    path: str,
    base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
    base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
    outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
) -> str | None:
    """Download ``url`` into ``path`` and return the path as a string.

    Relative URLs are prefixed with ``base_fastgpt_url`` and the outer
    backend URL is rewritten to the inner one before the request. Parent
    directories of ``path`` are created once the response is open.
    """
    is_absolute = url.startswith("http:") or url.startswith("https:")
    if not is_absolute:
        url = base_fastgpt_url + url
    inner_url = url.replace(outer_backend_url, base_backend_url)
    with urllib.request.urlopen(inner_url, timeout=120) as response:
        destination = Path(path)
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_bytes(response.read())
    return str(destination)
def _multipart_post(url: str, file_path: str, field_name: str = "file", timeout: int = 1200, content_type: str | None = None) -> str:
    """POST one file as ``multipart/form-data`` and return the response text.

    The part's content type is ``content_type`` when given, otherwise it is
    guessed from the file name with ``application/octet-stream`` as the
    fallback. The response is decoded as UTF-8 with invalid bytes replaced.
    """
    source = Path(file_path)
    boundary = f"----ocr-tool-{int(time.time() * 1000)}-{random_str(8)}"
    part_type = content_type or mimetypes.guess_type(source.name)[0] or "application/octet-stream"

    head = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="{field_name}"; filename="{source.name}"\r\n'
        f"Content-Type: {part_type}\r\n\r\n"
    ).encode()
    tail = f"\r\n--{boundary}--\r\n".encode()
    payload = head + source.read_bytes() + tail

    post = urllib.request.Request(
        url,
        data=payload,
        headers={
            "Content-Type": f"multipart/form-data; boundary={boundary}",
            "Content-Length": str(len(payload)),
        },
        method="POST",
    )
    with urllib.request.urlopen(post, timeout=timeout) as response:
        raw = response.read()
    return raw.decode("utf-8", errors="replace")
class OCRUtil:
    """Standalone variant of utils.ocr_util.OCRUtil.

    Renders PDF pages to PNG images, posts each image to a remote OCR HTTP
    endpoint and collects the recognised text in page order.
    """

    def __init__(self, ocr_url: str):
        self.ocr_url = ocr_url

    async def ocr_requests_async(self, session, file_path):
        """Upload one file to the OCR endpoint in a worker thread.

        ``session`` is accepted for signature compatibility and discarded.
        Returns ``(response_text, file_path)``.
        """
        del session
        return await asyncio.to_thread(_multipart_post, self.ocr_url, file_path, "file", 600), file_path

    async def ocr_image_async(self, path_list):
        """OCR the page images concurrently and return their texts sorted by page number.

        Responses without a ``"data"`` field are skipped. The page number
        is parsed from each file name (see ``get_pdf_2_img_page_num``).
        """
        responses = await asyncio.gather(*[self.ocr_requests_async(None, file_path) for file_path in path_list])
        res_dict = {}
        for response_text, file_path in responses:
            rsp_json = json.loads(response_text)
            if "data" not in rsp_json:
                continue
            page_num = int(self.get_pdf_2_img_page_num(file_path))
            res_dict[page_num] = rsp_json["data"]["strRes"]
        return [res_dict[key] for key in sorted(res_dict)]

    def set_pdf_2_img_page(self, path, page_idx):
        """Return the PNG path for a zero-based page index (file names are one-based)."""
        return f"{path}_{page_idx + 1}.png"

    def get_pdf_2_img_page_num(self, path):
        """Extract the page number text from a path built by ``set_pdf_2_img_page``."""
        split_path = path.split("_")
        # Strip the trailing ".png" from the last "_"-separated piece.
        return split_path[-1][:-4]

    def pdf_2_img(self, path, zoom_x=1, zoom_y=1):
        """Render every page of the PDF to a PNG next to it and return the PNG paths.

        Raises:
            RuntimeError: PyMuPDF (``fitz``) is not installed.
        """
        try:
            import fitz  # type: ignore
        except ImportError as exc:
            raise RuntimeError("pdf_2_img needs PyMuPDF installed; no project-local imports are used.") from exc
        pdf = fitz.open(path)
        pdf_list = []
        try:
            for pg in range(0, pdf.page_count):
                page = pdf[pg]
                trans = fitz.Matrix(zoom_x, zoom_y)
                pm = page.get_pixmap(matrix=trans, alpha=False)
                dest_png = self.set_pdf_2_img_page(path, pg)
                pm.save(dest_png)
                pdf_list.append(dest_png)
        finally:
            pdf.close()
        return pdf_list

    def ocr_download_path(
        self,
        url,
        base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
        base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
        outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
    ):
        """Download the file behind ``url`` into ``ocr/`` and return the local path.

        The file name comes from the URL's ``filename`` query parameter,
        with a random ``.pdf`` name as the fallback.
        """
        url = url_replace_fastgpt(url, base_fastgpt_url)
        url_parsed = urllib.parse.urlparse(url)
        query_dict = urllib.parse.parse_qs(url_parsed.query)
        filename = query_dict.get("filename", [f"{random_str()}.pdf"])[0]
        # The name is taken from the URL, so keep only its last path
        # component; "../" segments must not escape the ocr/ directory.
        filename = os.path.basename(filename.replace("\\", "/"))
        if filename in {"", ".", ".."}:
            filename = f"{random_str()}.pdf"
        dest_path = f"ocr/{filename}"
        download_file(url, dest_path, base_fastgpt_url, base_backend_url, outer_backend_url)
        return dest_path

    async def ocr_result_pdf(self, dest_path):
        """OCR every page of the PDF and remove the temporary page images afterwards."""
        pdf_list = self.pdf_2_img(dest_path)
        try:
            return await self.ocr_image_async(pdf_list)
        finally:
            for pdf in pdf_list:
                if os.path.exists(pdf):
                    os.remove(pdf)
class PaddleOCRUtil:
    """Standalone variant of utils.paddle_ocr_util.PaddleOCRUtil.

    Sends a whole PDF to a PaddleOCR HTTP endpoint and returns the
    recognised text.
    """

    def __init__(self, ocr_url: str = DEFAULT_PADDLE_OCR_URL):
        self.ocr_url = ocr_url

    @staticmethod
    def _decode_text(text):
        """Return ``text`` as a stripped string, decoding literal ``\\uXXXX`` escapes when present."""
        if text is None:
            return ""
        if not isinstance(text, str):
            text = str(text)
        text = text.strip()
        if not text:
            return ""
        if re.search(r"\\u[0-9a-fA-F]{4}", text):
            try:
                text = codecs.decode(text, "unicode_escape")
            except UnicodeDecodeError:
                # Keep the original text when it cannot be decoded.
                pass
        return text

    def _parse_response_text(self, response_text):
        """Validate the service response and return its decoded ``data.text``.

        Raises:
            ValueError: the body is not JSON, or the service did not report
                ``ok`` with ``code == 0``.
        """
        try:
            rsp_json = json.loads(response_text)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid paddle ocr response json: {response_text[:500]}") from exc
        if not rsp_json.get("ok") or rsp_json.get("code") != 0:
            raise ValueError(f"Paddle ocr failed: {rsp_json}")
        data = rsp_json.get("data") or {}
        return self._decode_text(data.get("text", ""))

    async def ocr_requests_async(self, session, file_path):
        """Upload the PDF in a worker thread; ``session`` is accepted and discarded."""
        del session
        return await asyncio.to_thread(_multipart_post, self.ocr_url, file_path, "file", 1200, "application/pdf")

    async def ocr_result_pdf(self, dest_path):
        """OCR the PDF and return a one-element list holding its text."""
        response_text = await self.ocr_requests_async(None, dest_path)
        return [self._parse_response_text(response_text)]

    def ocr_download_path(
        self,
        url,
        base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
        base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
        outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
    ):
        """Download the file behind ``url`` into ``ocr/`` and return the local path.

        The file name comes from the URL's ``filename`` query parameter,
        with a random ``.pdf`` name as the fallback.
        """
        url = url_replace_fastgpt(url, base_fastgpt_url)
        url_parsed = urllib.parse.urlparse(url)
        query_dict = urllib.parse.parse_qs(url_parsed.query)
        filename = query_dict.get("filename", [f"{random_str()}.pdf"])[0]
        # The name is taken from the URL, so keep only its last path
        # component; "../" segments must not escape the ocr/ directory.
        filename = os.path.basename(filename.replace("\\", "/"))
        if filename in {"", ".", ".."}:
            filename = f"{random_str()}.pdf"
        dest_path = f"ocr/{filename}"
        download_file(url, dest_path, base_fastgpt_url, base_backend_url, outer_backend_url)
        return dest_path
from typing import List
class TesseractOCRUtil:
"""Standalone variant of utils.tesseract_ocr_util.TesseractOCRUtil."""
"""Minimal, self-contained Tesseract OCR utility.
Methods:
- `ocr_image(file_path) -> str`: run tesseract on an image and return text.
- `ocr_image_async(path_list) -> List[str]`: async wrapper over `ocr_image`.
- `pdf_2_img(pdf_path) -> List[str]`: convert PDF to PNG pages (requires PyMuPDF).
- `ocr_result_pdf(pdf_path) -> List[str]`: OCR all pages from a PDF and clean up.
"""
def __init__(self, lang: str = "chi_sim+eng", executable: str = "tesseract"):
self.lang = lang
self.executable = executable
def ocr_image(self, file_path):
def ocr_image(self, file_path: str) -> str:
result = subprocess.run(
[self.executable, file_path, "stdout", "-l", self.lang],
check=True,
......@@ -232,60 +40,46 @@ class TesseractOCRUtil:
)
return result.stdout
async def ocr_image_async(self, path_list):
async def ocr_image_async(self, path_list: List[str]) -> List[str]:
tasks = [asyncio.to_thread(self.ocr_image, file_path) for file_path in path_list]
responses = await asyncio.gather(*tasks)
res_dict = {}
for file_path, content in zip(path_list, responses):
page_num = int(self.get_pdf_2_img_page_num(file_path))
res_dict[page_num] = content
return [res_dict[key] for key in sorted(res_dict)]
return list(responses)
def set_pdf_2_img_page(self, path, page_idx):
def set_pdf_2_img_page(self, path: str, page_idx: int) -> str:
return f"{path}_{page_idx + 1}.png"
def get_pdf_2_img_page_num(self, path):
@staticmethod
def _page_num_from_png_path(path: str) -> int:
match = re.search(r"_(\d+)\.png$", path)
if not match:
raise ValueError(f"Invalid pdf page image path: {path}")
return match.group(1)
return int(match.group(1))
def get_pdf_2_img_page_num(self, path: str) -> str:
return str(self._page_num_from_png_path(path))
def pdf_2_img(self, path, zoom_x=2, zoom_y=2):
def pdf_2_img(self, path: str, zoom_x: float = 2, zoom_y: float = 2) -> List[str]:
try:
import fitz # type: ignore
except ImportError as exc:
raise RuntimeError("pdf_2_img needs PyMuPDF installed; no project-local imports are used.") from exc
raise RuntimeError("pdf_to_img needs PyMuPDF installed.") from exc
pdf = fitz.open(path)
pdf_list = []
pdf_list: List[str] = []
try:
for pg in range(0, pdf.page_count):
page = pdf[pg]
trans = fitz.Matrix(zoom_x, zoom_y)
pm = page.get_pixmap(matrix=trans, alpha=False)
dest_png = self.set_pdf_2_img_page(path, pg)
pm.save(dest_png)
for page_index in range(pdf.page_count):
page = pdf[page_index]
matrix = fitz.Matrix(zoom_x, zoom_y)
pixmap = page.get_pixmap(matrix=matrix, alpha=False)
dest_png = f"{path}_{page_index + 1}.png"
pixmap.save(dest_png)
pdf_list.append(dest_png)
finally:
pdf.close()
return pdf_list
def ocr_download_path(
self,
url,
base_fastgpt_url: str = DEFAULT_BASE_FASTGPT_URL,
base_backend_url: str = DEFAULT_BASE_BACKEND_URL,
outer_backend_url: str = DEFAULT_OUTER_BACKEND_URL,
):
url = url_replace_fastgpt(url, base_fastgpt_url)
url_parsed = urllib.parse.urlparse(url)
query_dict = urllib.parse.parse_qs(url_parsed.query)
filename = query_dict.get("filename", [f"{random_str()}.pdf"])[0]
dest_path = f"ocr/{filename}"
download_file(url, dest_path, base_fastgpt_url, base_backend_url, outer_backend_url)
return dest_path
async def ocr_result_pdf(self, dest_path):
pdf_list = self.pdf_2_img(dest_path)
async def ocr_result_pdf(self, dest_path: str, zoom_x: float = 2, zoom_y: float = 2) -> List[str]:
pdf_list = self.pdf_2_img(dest_path, zoom_x, zoom_y)
try:
return await self.ocr_image_async(pdf_list)
finally:
......@@ -294,76 +88,40 @@ class TesseractOCRUtil:
os.remove(pdf)
def _json_print(value: Any) -> None:
    """Print ``value`` as indented JSON, keeping non-ASCII characters readable."""
    rendered = json.dumps(value, ensure_ascii=False, indent=2)
    print(rendered)
def main() -> int:
parser = argparse.ArgumentParser(description="Single-file OCR utilities based on utils/")
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Standalone Tesseract OCR CLI")
parser.add_argument("--lang", default="chi_sim+eng", help="Tesseract language, default: chi_sim+eng")
parser.add_argument("--executable", default="tesseract", help="Tesseract executable path")
sub = parser.add_subparsers(dest="cmd", required=True)
p = sub.add_parser("remote-image")
p.add_argument("ocr_url")
p.add_argument("images", nargs="+")
image = sub.add_parser("image", help="OCR a single image and print text")
image.add_argument("file")
p = sub.add_parser("remote-pdf")
p.add_argument("ocr_url")
p.add_argument("pdf")
pdf = sub.add_parser("pdf", help="Convert a PDF to images, OCR each page, and print JSON")
pdf.add_argument("file")
pdf.add_argument("--zoom-x", type=float, default=2)
pdf.add_argument("--zoom-y", type=float, default=2)
p = sub.add_parser("paddle-pdf")
p.add_argument("pdf")
p.add_argument("--ocr-url", default=DEFAULT_PADDLE_OCR_URL)
return parser
p = sub.add_parser("tesseract-image")
p.add_argument("image")
p.add_argument("--lang", default="chi_sim+eng")
p.add_argument("--executable", default="tesseract")
p = sub.add_parser("tesseract-pdf")
p.add_argument("pdf")
p.add_argument("--lang", default="chi_sim+eng")
p.add_argument("--executable", default="tesseract")
def main(argv: list[str] | None = None) -> int:
parser = build_arg_parser()
args = parser.parse_args(argv)
util = TesseractOCRUtil(lang=args.lang, executable=args.executable)
p = sub.add_parser("pdf-to-img")
p.add_argument("pdf")
p.add_argument("--zoom-x", type=float, default=2)
p.add_argument("--zoom-y", type=float, default=2)
p.add_argument("--mode", choices=["ocr", "tesseract"], default="tesseract")
p = sub.add_parser("download-path")
p.add_argument("url")
p.add_argument("--base-fastgpt-url", default=DEFAULT_BASE_FASTGPT_URL)
p.add_argument("--base-backend-url", default=DEFAULT_BASE_BACKEND_URL)
p.add_argument("--outer-backend-url", default=DEFAULT_OUTER_BACKEND_URL)
p.add_argument("--mode", choices=["ocr", "paddle", "tesseract"], default="ocr")
p.add_argument("--ocr-url", default=DEFAULT_PADDLE_OCR_URL)
if args.cmd == "image":
print(util.ocr_image(args.file))
return 0
args = parser.parse_args()
if args.cmd == "remote-image":
util = OCRUtil(args.ocr_url)
_json_print(asyncio.run(util.ocr_image_async(args.images)))
elif args.cmd == "remote-pdf":
util = OCRUtil(args.ocr_url)
_json_print(asyncio.run(util.ocr_result_pdf(args.pdf)))
elif args.cmd == "paddle-pdf":
_json_print(asyncio.run(PaddleOCRUtil(args.ocr_url).ocr_result_pdf(args.pdf)))
elif args.cmd == "tesseract-image":
print(TesseractOCRUtil(args.lang, args.executable).ocr_image(args.image), end="")
elif args.cmd == "tesseract-pdf":
_json_print(asyncio.run(TesseractOCRUtil(args.lang, args.executable).ocr_result_pdf(args.pdf)))
elif args.cmd == "pdf-to-img":
util = OCRUtil("unused") if args.mode == "ocr" else TesseractOCRUtil()
_json_print(util.pdf_2_img(args.pdf, args.zoom_x, args.zoom_y))
elif args.cmd == "download-path":
if args.mode == "paddle":
util = PaddleOCRUtil(args.ocr_url)
elif args.mode == "tesseract":
util = TesseractOCRUtil()
else:
util = OCRUtil(args.ocr_url)
print(util.ocr_download_path(args.url, args.base_fastgpt_url, args.base_backend_url, args.outer_backend_url))
if args.cmd == "pdf":
texts = asyncio.run(util.ocr_result_pdf(args.file, zoom_x=args.zoom_x, zoom_y=args.zoom_y))
print(json.dumps(texts, ensure_ascii=False, indent=2))
return 0
parser.error(f"unsupported command: {args.cmd}")
return 2
if __name__ == "__main__":
raise SystemExit(main())
"""Standalone CLI scripts for review-llm-skill."""
"""Compact prompt templates kept for compatibility."""
# Prompt templates keyed by tool name. ``{name}`` tokens are placeholders to
# be filled by the caller.
# NOTE(review): the JSON examples inside the templates use literal, unescaped
# braces, so a template cannot be passed to str.format() as-is.
PROMPTS = {
"review": "基于当前分段和审查规则审查合同,仅输出JSON:{\"overall_conclusion\":\"\",\"findings\":[]}。\n分段:{segment_text}\n立场:{party_role}\n规则:{ruleset_text}",
"summary": "提取当前分段中与规则字段相关的客观事实,仅输出JSON:{\"facts\":{}}。\n分段:{segment_text}\n字段:{rule_fields}",
"router": "从候选规则中选择当前分段应执行的审查项,仅输出JSON:{\"selected_items\":[]}。\n分段:{segment_text}\n记忆:{context_memories_json}\n立场:{party_role}\n候选:{candidate_rules_json}",
"merger": "合并重复或相关的不合格findings,仅输出JSON:{\"findings\":[]}。\n输入:{payload}",
"reflect": "基于规则、已有findings和facts复核、去重、拆分、合并并定稿,仅输出JSON:{\"final_findings\":[]}。\n规则:{rule}\nfindings:{findings_json}\nfacts:{facts_json}\n立场:{party_role}",
"fact-merge": "合并summary_name下多个分段facts,不新增事实,仅输出JSON:{\"merge_facts\":{}}。\nsummary_names:{summary_names_json}\nfacts:{facts_json}",
"ruleset-route": "从候选ruleset_id中按问题选择一个,不得编造,仅输出JSON:{\"ruleset_id\":\"\",\"reason\":\"\"}。\n候选:{ruleset_ids_json}\n问题:{question}",
"party-role": "分析指定公司在合同中的商业角色,不仅按甲乙方判断,仅输出JSON:{\"party_role\":\"demand_side | supplier_side | unclear\",\"reason\":\"\"}。\n公司:{company_name}\n合同:{contract_text}",
"llm": "你是通用LLM助手。",
}
import os
import re
import json
import urllib.request
from typing import Any, List, Dict
class LLMTool:
    """Minimal client for an OpenAI-compatible chat-completions endpoint.

    Settings are read from the environment when the instance is created:
    ``OPENAI_MODEL`` (default ``gpt-4o-mini``), ``OPENAI_BASE_URL`` (default
    ``https://api.openai.com/v1``, trailing slashes removed) and
    ``OPENAI_API_KEY``.
    """

    def __init__(self, system_prompt: str = ""):
        self.system_prompt = system_prompt or ""
        self.model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
        self.base_url = (os.environ.get("OPENAI_BASE_URL") or "https://api.openai.com/v1").rstrip("/")
        self.api_key = os.environ.get("OPENAI_API_KEY")

    def build_messages(self, user_content: str, system_content: str | None = None) -> List[Dict[str, str]]:
        """Return the chat message list: an optional system message followed by the user message."""
        msgs = []
        if system_content:
            msgs.append({"role": "system", "content": system_content})
        msgs.append({"role": "user", "content": user_content})
        return msgs

    def run(self, messages: List[Dict[str, str]]) -> str:
        """POST ``messages`` to ``/chat/completions`` and return the first choice's content.

        Returns "" when the response has no choices or no content.

        Raises:
            RuntimeError: ``OPENAI_API_KEY`` is not set.
        """
        if not self.api_key:
            raise RuntimeError("OPENAI_API_KEY is required")
        body = json.dumps({"model": self.model, "messages": messages}, ensure_ascii=False).encode("utf-8")
        request = urllib.request.Request(
            f"{self.base_url}/chat/completions",
            data=body,
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}",
            },
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=120) as response:
            payload = json.loads(response.read().decode("utf-8"))
        return (((payload.get("choices") or [{}])[0].get("message") or {}).get("content")) or ""

    def chat_async(self, messages: List[Dict[str, str]]) -> str:
        """Compatibility alias for ``run``; despite the name it executes synchronously."""
        return self.run(messages)

    def run_with_loop(self, chat_response: str) -> str:
        """Compatibility hook that returns ``chat_response`` unchanged."""
        return chat_response

    def parse_first_json(self, text: str) -> Any:
        """Return the first JSON value found in ``text``, or None.

        The whole text is tried first. Otherwise every ``{`` or ``[`` is
        tried in order as the start of a JSON value, so prose or a second
        object after the JSON (for example ``{"a": 1} see {note}``) no
        longer prevents extraction.
        """
        if not text:
            return None
        try:
            return json.loads(text)
        except (TypeError, ValueError):
            pass
        decoder = json.JSONDecoder()
        for opener in re.finditer(r"[\[{]", text):
            try:
                # raw_decode parses one value and ignores whatever follows it.
                value, _ = decoder.raw_decode(text, opener.start())
            except ValueError:
                continue
            return value
        return None
#!/usr/bin/env python3
"""Standalone review LLM CLI."""
from __future__ import annotations
import argparse, json, os, re, urllib.request
from pathlib import Path
from typing import Any
# Prompt templates keyed by tool name. ``{name}`` tokens are placeholders to
# be filled by the caller.
# NOTE(review): the JSON examples inside the templates use literal, unescaped
# braces, so a template cannot be passed to str.format() as-is.
PROMPTS = {
"review": "基于当前分段和审查规则审查合同,仅输出JSON:{\"overall_conclusion\":\"\",\"findings\":[]}。\n分段:{segment_text}\n立场:{party_role}\n规则:{ruleset_text}",
"summary": "仅提取当前分段中与规则字段相关的客观事实,仅输出JSON:{\"facts\":{}}。\n分段:{segment_text}\n字段:{rule_fields}",
"router": "从候选规则中选择当前分段应执行的审查项,仅输出JSON:{\"selected_items\":[]}。\n分段:{segment_text}\n记忆:{context_memories_json}\n立场:{party_role}\n候选:{candidate_rules_json}",
"merger": "合并同一分段内重复或相关的不合格findings,仅输出JSON:{\"findings\":[]}。\n输入:{payload}",
"reflect": "基于规则、已有findings和facts复核、去重、拆分、合并并定稿,仅输出JSON:{\"final_findings\":[]}。\n规则:{rule}\nfindings:{findings_json}\nfacts:{facts_json}\n立场:{party_role}",
"fact-merge": "合并summary_name下多个分段facts,不新增事实,仅输出JSON:{\"merge_facts\":{}}。\nsummary_names:{summary_names_json}\nfacts:{facts_json}",
"ruleset-route": "从候选ruleset_id中按问题选择一个,不得编造,仅输出JSON:{\"ruleset_id\":\"\",\"reason\":\"\"}。\n候选:{ruleset_ids_json}\n问题:{question}",
"party-role": "分析指定公司在合同中的商业角色,不仅按甲乙方判断,仅输出JSON:{\"party_role\":\"demand_side | supplier_side | unclear\",\"reason\":\"\"}。\n公司:{company_name}\n合同:{contract_text}",
"llm": "你是通用LLM助手。",
}
class LLMTool:
    """Minimal client for an OpenAI-compatible chat-completions endpoint.

    Settings are read from the environment when the instance is created:
    ``OPENAI_MODEL`` (default ``gpt-4o-mini``), ``OPENAI_BASE_URL`` (default
    ``https://api.openai.com/v1``, trailing slashes removed) and
    ``OPENAI_API_KEY``.
    """

    def __init__(self, system_prompt: str = ""):
        self.system_prompt = system_prompt
        self.model = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
        self.base_url = (os.environ.get("OPENAI_BASE_URL") or "https://api.openai.com/v1").rstrip("/")
        self.api_key = os.environ.get("OPENAI_API_KEY")

    def build_messages(self, user: str, system: str | None = None) -> list[dict[str, str]]:
        """Return the chat message list: an optional system message followed by the user message."""
        return ([{"role": "system", "content": system}] if system else []) + [{"role": "user", "content": user}]

    def run(self, messages: list[dict[str, str]]) -> str:
        """POST ``messages`` to ``/chat/completions`` and return the first choice's content.

        Returns "" when the response has no choices or no content.

        Raises:
            RuntimeError: ``OPENAI_API_KEY`` is not set.
        """
        if not self.api_key:
            raise RuntimeError("OPENAI_API_KEY is required")
        body = json.dumps({"model": self.model, "messages": messages}, ensure_ascii=False).encode()
        req = urllib.request.Request(
            f"{self.base_url}/chat/completions",
            data=body,
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=120) as resp:
            data = json.loads(resp.read().decode())
        return (((data.get("choices") or [{}])[0].get("message") or {}).get("content")) or ""

    def parse_first_json(self, text: str) -> Any:
        """Return the first JSON value found in ``text``, or None.

        The whole text is tried first. Otherwise every ``{`` or ``[`` is
        tried in order as the start of a JSON value, so prose or a second
        object after the JSON (for example ``{"a": 1} see {note}``) no
        longer prevents extraction.
        """
        if not text:
            return None
        try:
            return json.loads(text)
        except (TypeError, ValueError):
            pass
        decoder = json.JSONDecoder()
        for opener in re.finditer(r"[\[{]", text):
            try:
                # raw_decode parses one value and ignores whatever follows it.
                value, _ = decoder.raw_decode(text, opener.start())
            except ValueError:
                continue
            return value
        return None
def jdump(v: Any) -> str:
    """Serialise ``v`` as indented JSON without escaping non-ASCII characters."""
    rendered = json.dumps(v, indent=2, ensure_ascii=False)
    return rendered
def rules_text(rules: list[dict[str, Any]]) -> str:
    """Render each rule as a five-line block (title, rule, level, suggestion, case) joined by newlines.

    Missing fields render as empty strings; a falsy ``rules`` yields "".
    """
    rendered = []
    for r in rules or []:
        rendered.append(
            f"标题:{r.get('title','')}\n规则:{r.get('rule','')}\n等级:{r.get('level','')}\n建议:{r.get('suggestion_template','')}\n案例:{r.get('case','')}"
        )
    return "\n".join(rendered)
def default_rulesets() -> list[str]:
    """Return a fresh list of the built-in ruleset ids."""
    rulesets = [
        "合同信息提取(合同组)",
        "合同信息提取(技术部)",
        "合同信息提取(采购部)",
        "技术协议提取(合同组)",
        "技术协议提取(技术部)",
    ]
    return rulesets
def empty_fact(v: Any) -> bool:
    """Tell whether a fact value carries no information.

    Empty means None, a blank string, the string "未明确" (after
    stripping), or an empty dict or list.
    """
    if v is None:
        return True
    if isinstance(v, str):
        text = v.strip()
        return not text or text == "未明确"
    if isinstance(v, (dict, list)):
        return not v
    return False
def dedupe(values: list[Any]) -> list[Any]:
    """Return the non-empty values in first-seen order, without duplicates.

    Dicts and lists are compared by their JSON rendering, everything else
    by ``str()``.
    """
    unique: list[Any] = []
    seen_keys: set[str] = set()
    for item in values:
        fingerprint = jdump(item) if isinstance(item, (dict, list)) else str(item)
        if empty_fact(item) or fingerprint in seen_keys:
            continue
        seen_keys.add(fingerprint)
        unique.append(item)
    return unique
def merge_facts_rule(facts: list[dict[str, Any]], names: list[str]) -> dict[str, Any]:
    """Merge per-segment facts by summary name without calling the LLM.

    For each distinct, non-blank name the de-duplicated values from all
    fact dicts are collected: one value is stored as-is, several as a
    list, and names with no values are omitted. A ``_meta`` entry records
    the requested names and the number of source facts.
    """
    wanted: dict[str, None] = {}
    for raw_name in names or []:
        cleaned = str(raw_name).strip()
        if cleaned:
            # Dict keys keep first-seen order and drop repeats.
            wanted[cleaned] = None

    merged: dict[str, Any] = {}
    for name in wanted:
        candidates = []
        for item in facts or []:
            if isinstance(item, dict) and name in item:
                candidates.append(item.get(name))
        vals = dedupe(candidates)
        if not vals:
            continue
        merged[name] = vals[0] if len(vals) == 1 else vals
    merged["_meta"] = {"summary_names": names, "source_fact_count": len(facts or [])}
    return merged
def _fill_prompt(template: str, **values: Any) -> str:
    """Substitute ``{name}`` placeholders in ``template`` in a single pass.

    Only tokens whose name is a key of ``values`` are replaced. The literal
    braces of the JSON examples embedded in the templates are left alone,
    which ``str.format`` cannot do (it raises KeyError on them).
    """
    def substitute(found: re.Match) -> str:
        name = found.group(1)
        return str(values[name]) if name in values else found.group(0)

    return re.sub(r"\{(\w+)\}", substitute, template)


def run_review_llm(tool_name: str = "review", segment_id: int = 0, user_prompt: str | None = None, **kw) -> dict[str, Any]:
    """Run one review tool against the LLM and return its parsed JSON reply.

    Args:
        tool_name: Case-insensitive key of ``PROMPTS``; falsy means "review".
        segment_id: Accepted for interface compatibility; not used here.
        user_prompt: System prompt override for the "llm" tool and fallback
            question for "ruleset-route".
        **kw: Tool inputs (``segment_text``, ``rules``, ``facts``, ...).

    Returns:
        The first JSON value in the reply, or ``{"raw": reply}`` when none
        is found (or it is falsy). "fact-merge" with a ``merge_mode`` other
        than "llm" is answered locally by ``merge_facts_rule``. An unknown
        tool yields ``{"error": ...}``.
    """
    name = (tool_name or "review").lower()
    if name == "fact-merge" and str(kw.get("merge_mode") or "llm").lower() != "llm":
        return {"merge_facts": merge_facts_rule(kw.get("facts") or [], kw.get("summary_names") or [])}

    rules = kw.get("rules") or []
    user = ""
    if name == "llm":
        prompt = user_prompt or PROMPTS["llm"]
        user = kw.get("user_content") or kw.get("segment_text") or ""
    elif name == "review":
        prompt = _fill_prompt(
            PROMPTS[name],
            segment_text=kw.get("segment_text", ""),
            party_role=kw.get("party_role", ""),
            ruleset_text=rules_text(rules),
        )
    elif name == "summary":
        prompt = _fill_prompt(
            PROMPTS[name],
            segment_text=kw.get("segment_text", ""),
            rule_fields=jdump([r.get("summary") for r in rules if r.get("summary")]),
        )
    elif name == "router":
        prompt = _fill_prompt(
            PROMPTS[name],
            segment_text=kw.get("segment_text", ""),
            context_memories_json=jdump(kw.get("context_facts") or []),
            party_role=kw.get("party_role", ""),
            candidate_rules_json=jdump([{r.get("title", ""): r.get("rule", "")} for r in rules]),
        )
    elif name == "merger":
        prompt = _fill_prompt(PROMPTS[name], payload=jdump(kw.get("payload") or kw.get("findings") or []))
    elif name == "reflect":
        prompt = _fill_prompt(
            PROMPTS[name],
            rule=jdump(kw.get("rule") or {}),
            findings_json=jdump(kw.get("findings") or []),
            facts_json=jdump(kw.get("facts") or kw.get("context_facts") or []),
            party_role=kw.get("party_role", ""),
        )
    elif name == "fact-merge":
        prompt = _fill_prompt(
            PROMPTS[name],
            summary_names_json=jdump(kw.get("summary_names") or []),
            facts_json=jdump(kw.get("facts") or []),
        )
    elif name == "ruleset-route":
        prompt = _fill_prompt(
            PROMPTS[name],
            question=kw.get("question") or user_prompt or "",
            ruleset_ids_json=jdump(kw.get("ruleset_ids") or default_rulesets()),
        )
    elif name == "party-role":
        prompt = _fill_prompt(
            PROMPTS[name],
            company_name=kw.get("company_name") or "",
            contract_text=kw.get("contract_text") or kw.get("segment_text") or "",
        )
    else:
        return {"error": f"unknown tool: {tool_name}"}

    # The filled template is sent as the system message; only the "llm"
    # tool supplies separate user content.
    llm = LLMTool(prompt)
    raw = llm.run(llm.build_messages(user, prompt))
    return llm.parse_first_json(raw) or {"raw": raw}
def load_arg(v: str | None, default: Any) -> Any:
    """Decode a JSON-valued command-line argument.

    Args:
        v: Raw argument text. ``None`` means the option was not supplied. A
            value starting with ``@`` names a UTF-8 file whose content is the
            JSON document; any other value is parsed as inline JSON.
        default: Returned unchanged when ``v`` is ``None``.

    Returns:
        The decoded JSON value, or ``default``.

    Raises:
        json.JSONDecodeError: If the text is not valid JSON.
        OSError: If the ``@`` file cannot be read.
    """
    if v is None:
        return default
    if v.startswith("@"):
        document = Path(v[1:]).read_text(encoding="utf-8")
    else:
        document = v
    return json.loads(document)
def parser() -> argparse.ArgumentParser:
    """Build the argument parser for the standalone review LLM CLI.

    The optional positional ``tool_name`` selects the tool (default
    ``review``). Every other input is a ``--option`` whose default is listed
    in the table below; ``--segment-id`` is the only one converted to ``int``.

    Returns:
        The configured parser.
    """
    cli = argparse.ArgumentParser(description="Standalone review LLM CLI")
    tool_choices = [
        "review",
        "summary",
        "router",
        "merger",
        "reflect",
        "fact-merge",
        "ruleset-route",
        "party-role",
        "llm",
    ]
    cli.add_argument("tool_name", nargs="?", default="review", choices=tool_choices)

    # (option name without the leading dashes, default value)
    option_defaults = [
        ("segment-id", 0),
        ("segment-text", ""),
        ("segment-text-file", None),
        ("party-role", ""),
        ("rules", "[]"),
        ("context-facts", "{}"),
        ("payload", None),
        ("findings", "[]"),
        ("facts", "[]"),
        ("rule", "{}"),
        ("summary-names", "[]"),
        ("question", ""),
        ("ruleset-ids", None),
        ("company-name", ""),
        ("contract-text", ""),
        ("contract-text-file", None),
        ("user-prompt", None),
        ("user-content", None),
    ]
    for option, fallback in option_defaults:
        extra = {"type": int} if option == "segment-id" else {}
        cli.add_argument(f"--{option}", default=fallback, **extra)

    cli.add_argument("--merge-mode", choices=["llm", "rule"], default="llm")
    cli.add_argument("--output-raw", action="store_true")
    return cli
def main(argv: list[str] | None = None) -> int:
    """Run one review LLM tool from the command line and print its result.

    Segment and contract text come from the ``--*-file`` option when it is
    given, otherwise from the inline option. JSON-valued options are decoded
    with ``load_arg``. The result is printed through ``jdump`` unless
    ``--output-raw`` is set, in which case the object is printed as-is.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        Always ``0``.
    """
    options = parser().parse_args(argv)

    if options.segment_text_file:
        segment_text = Path(options.segment_text_file).read_text(encoding="utf-8")
    else:
        segment_text = options.segment_text
    if options.contract_text_file:
        contract_text = Path(options.contract_text_file).read_text(encoding="utf-8")
    else:
        contract_text = options.contract_text

    tool_kwargs = {
        "segment_text": segment_text,
        "party_role": options.party_role,
        "rules": load_arg(options.rules, []),
        "context_facts": load_arg(options.context_facts, {}),
        "payload": load_arg(options.payload, []) if options.payload else None,
        "findings": load_arg(options.findings, []),
        "facts": load_arg(options.facts, []),
        "rule": load_arg(options.rule, {}),
        "summary_names": load_arg(options.summary_names, []),
        "merge_mode": options.merge_mode,
        "question": options.question,
    }
    # The default ruleset list is used both as the fallback and as the value
    # when --ruleset-ids is absent or empty.
    fallback_rulesets = default_rulesets()
    if options.ruleset_ids:
        tool_kwargs["ruleset_ids"] = load_arg(options.ruleset_ids, fallback_rulesets)
    else:
        tool_kwargs["ruleset_ids"] = fallback_rulesets
    tool_kwargs["company_name"] = options.company_name
    tool_kwargs["contract_text"] = contract_text
    tool_kwargs["user_content"] = options.user_content

    result = run_review_llm(
        options.tool_name, options.segment_id, options.user_prompt, **tool_kwargs
    )
    if options.output_raw:
        print(result)
    else:
        print(jdump(result))
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
#!/usr/bin/env python3
"""Standalone JSON-backed review memory CLI."""
from __future__ import annotations
import argparse, json, logging
from dataclasses import asdict, dataclass
from pathlib import Path
from threading import RLock
from typing import Any
from uuid import uuid4
# Configure the root logger so that only WARNING and above are emitted.
logging.basicConfig(level=logging.WARNING)
# Module-level logger; MemoryStore._load reports load failures through it.
logger = logging.getLogger("review_memory_cli")
@dataclass
class Finding:
    """One review finding recorded against a contract segment.

    All fields except ``segment_id`` are stored as text; the visible code does
    not constrain their values.
    """

    rule_title: str  # matched case-insensitively by MemoryStore.search_findings
    segment_id: int  # used by MemoryStore to look up / delete findings per segment
    original_text: str
    issue: str
    risk_level: str
    suggestion: str
    id: str = ""  # filled with a uuid4 hex by MemoryStore.add_finding when empty
    result: str = ""

    @classmethod
    def from_dict(cls, data: dict) -> "Finding":
        """Build a finding from a plain mapping, tolerating missing values.

        Missing or ``None`` text fields become ``""`` (a ``None`` is not turned
        into the literal string ``"None"``); a missing, ``None`` or otherwise
        falsy ``segment_id`` becomes ``0``.

        Args:
            data: Mapping with any subset of the field names; ``None`` or an
                empty mapping yields an all-default finding.

        Returns:
            The constructed ``Finding``.

        Raises:
            ValueError: If ``segment_id`` cannot be converted to ``int``.
        """
        source = data or {}

        def text(key: str) -> str:
            value = source.get(key)
            return "" if value is None else str(value)

        return cls(
            rule_title=text("rule_title"),
            segment_id=int(source.get("segment_id", 0) or 0),
            original_text=text("original_text"),
            issue=text("issue"),
            risk_level=text("risk_level"),
            suggestion=text("suggestion"),
            id=text("id"),
            result=text("result"),
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the finding as a plain ``dict`` keyed by field name."""
        return asdict(self)
class MemoryStore:
    """JSON-file-backed store for review facts, merged facts and findings.

    State is loaded from the storage file on construction and the whole file
    is rewritten after every mutation. Every method that reads or changes the
    in-memory state does so while holding a re-entrant lock.
    """

    def __init__(self, storage_name: str = "default.json") -> None:
        """Open (creating the directory if needed) the store file.

        Args:
            storage_name: File name inside the ``tmp`` directory that sits
                next to this script's parent directory.
        """
        self._storage_path = Path(__file__).resolve().parent.parent / "tmp" / storage_name
        self._storage_path.parent.mkdir(parents=True, exist_ok=True)
        self._lock = RLock()
        self.facts: list[dict[str, Any]] = []
        self.merge_facts: list[dict[str, Any]] = []
        self.findings: dict[str, list[Finding]] = {}
        self._load()

    def _key(self, key: str | None) -> str:
        """Normalize a findings bucket name; blank or ``None`` maps to ``"review"``."""
        return (key or "").strip().lower() or "review"

    def add_fact(self, value: dict[str, Any]) -> list[dict[str, Any]]:
        """Append a fact, persist, and return the store's fact list."""
        with self._lock:
            self.facts.append(value)
            self._persist()
            return self.facts

    def add_merge_fact(self, value: dict[str, Any]) -> list[dict[str, Any]]:
        """Append a merged fact, persist, and return the store's merged-fact list."""
        with self._lock:
            self.merge_facts.append(value)
            self._persist()
            return self.merge_facts

    def get_facts(self) -> list[dict[str, Any]]:
        """Return a shallow copy of the stored facts."""
        with self._lock:
            return list(self.facts)

    def add_finding(self, key: str, finding: Finding) -> Finding:
        """Store ``finding`` under bucket ``key`` and persist.

        A finding without an ``id`` is given a fresh uuid4 hex id (the passed
        object is modified in place).

        Returns:
            The same ``finding`` object.
        """
        with self._lock:
            if not finding.id:
                finding.id = uuid4().hex
            self.findings.setdefault(self._key(key), []).append(finding)
            self._persist()
            return finding

    def list_findings(self, key: str | None = None) -> dict[str, list[dict[str, Any]]]:
        """Return findings as dicts, for one bucket (``key``) or for all buckets."""
        with self._lock:
            keys = [self._key(key)] if key else list(self.findings)
            return {k: [f.to_dict() for f in self.findings.get(k, [])] for k in keys}

    def get_findings_by_segment(self, key: str, segment_id: int) -> list[dict[str, Any]]:
        """Return the findings of bucket ``key`` whose ``segment_id`` matches."""
        # Hold the lock like the other readers so a concurrent mutation cannot
        # be observed halfway.
        with self._lock:
            bucket = self.findings.get(self._key(key), [])
            return [f.to_dict() for f in bucket if f.segment_id == segment_id]

    def search_findings(self, key: str, rule_title: str = "") -> list[dict[str, Any]]:
        """Return the findings of bucket ``key``, optionally filtered by rule title.

        The title comparison is exact but case-insensitive; an empty
        ``rule_title`` matches every finding in the bucket.
        """
        title = (rule_title or "").strip().lower()
        with self._lock:
            bucket = self.findings.get(self._key(key), [])
            return [f.to_dict() for f in bucket if not title or f.rule_title.lower() == title]

    def delete_findings_by_segment(self, key: str, segment_id: int) -> int:
        """Remove the findings of bucket ``key`` for ``segment_id``.

        The store is persisted only when something was actually removed.

        Returns:
            The number of removed findings.
        """
        with self._lock:
            k = self._key(key)
            current = list(self.findings.get(k, []))
            self.findings[k] = [f for f in current if f.segment_id != segment_id]
            removed = len(current) - len(self.findings[k])
            if removed:
                self._persist()
            return removed

    def search_facts(self, keywords: list[str]) -> list[Any]:
        """Return ``{name: value}`` pairs whose name matches any keyword.

        A name matches when, case-insensitively, the keyword is contained in
        the name or the name is contained in the keyword. Blank keywords are
        ignored. Stored facts that are not mappings (``add-fact`` accepts any
        JSON value) have no field names and are skipped.
        """
        needles = [str(k).strip().lower() for k in keywords if str(k).strip()]
        matches: list[Any] = []
        with self._lock:
            for item in self.facts:
                if not isinstance(item, dict):
                    continue
                for name, value in item.items():
                    low = str(name).lower()
                    if any(k in low or low in k for k in needles):
                        matches.append({name: value})
        return matches

    def clear(self) -> None:
        """Drop all facts, merged facts and findings, then persist the empty state."""
        with self._lock:
            self.facts.clear()
            self.merge_facts.clear()
            self.findings.clear()
            self._persist()

    def _payload(self) -> dict[str, Any]:
        """Return the JSON-serializable snapshot that is written to disk."""
        return {
            "facts": self.facts,
            "merge_facts": self.merge_facts,
            "findings": {k: [f.to_dict() for f in v] for k, v in self.findings.items()},
        }

    def _persist(self) -> None:
        """Rewrite the storage file with the current state (callers hold the lock)."""
        self._storage_path.write_text(
            json.dumps(self._payload(), ensure_ascii=False, indent=2), encoding="utf-8"
        )

    def _load(self) -> None:
        """Load state from the storage file if it exists.

        Loading is best-effort: any failure is logged and the store keeps the
        state it had before the call. The file is decoded completely before
        anything is assigned, so a failure cannot leave a half-loaded store.
        """
        if not self._storage_path.exists():
            return
        try:
            data = json.loads(self._storage_path.read_text(encoding="utf-8") or "{}")
            facts = data.get("facts") or []
            merge_facts = data.get("merge_facts") or []
            findings = {
                self._key(k): [Finding.from_dict(i) for i in items or []]
                for k, items in (data.get("findings") or {}).items()
            }
        except Exception as exc:
            logger.error("Failed to load memory store: %s", exc)
            return
        self.facts, self.merge_facts, self.findings = facts, merge_facts, findings

    def export_to_json(self, path: str | None = None) -> str:
        """Write a snapshot of the store to ``path`` and return that path.

        Args:
            path: Destination file. When omitted, the snapshot goes next to
                the storage file as ``<name>_export.json``, where ``<name>``
                is the storage file name without its ``.json`` suffix. Only
                the file name is rewritten, never a directory component, and
                the storage file itself is never the default target.

        Returns:
            The path that was written, as a string.
        """
        if path:
            out_path = path
        else:
            store = self._storage_path
            base = store.stem if store.suffix == ".json" else store.name
            out_path = str(store.with_name(f"{base}_export.json"))
        with self._lock:
            snapshot = json.dumps(self._payload(), ensure_ascii=False, indent=2)
        Path(out_path).write_text(snapshot, encoding="utf-8")
        return out_path
def out(obj: Any) -> None:
    """Print ``obj`` to stdout as 2-space-indented JSON, keeping non-ASCII text readable."""
    rendered = json.dumps(obj, ensure_ascii=False, indent=2)
    print(rendered)
def load_json_arg(value: str) -> Any:
    """Decode a JSON command-line argument.

    Args:
        value: Inline JSON text, or ``@<path>`` to read the JSON document
            from a UTF-8 file.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If the text is not valid JSON.
        OSError: If the ``@`` file cannot be read.
    """
    if value.startswith("@"):
        document = Path(value[1:]).read_text(encoding="utf-8")
    else:
        document = value
    return json.loads(document)
def parser() -> argparse.ArgumentParser:
    """Build the argument parser for the review memory CLI.

    The global ``--storage`` option names the store file; the sub-command is
    recorded in ``cmd`` and is optional (``cmd`` is ``None`` when omitted).

    Returns:
        The configured parser.
    """
    root = argparse.ArgumentParser(prog="review-memory-cli")
    root.add_argument("--storage", default="default.json")
    commands = root.add_subparsers(dest="cmd")

    # Facts
    commands.add_parser("list-facts")
    commands.add_parser("add-fact").add_argument("data")
    commands.add_parser("add-merge-fact").add_argument("data")
    commands.add_parser("search-facts").add_argument("keywords", nargs="+")

    # Findings
    add_finding = commands.add_parser("add-finding")
    add_finding.add_argument("--key", default="review")
    add_finding.add_argument("--rule", required=True)
    add_finding.add_argument("--segment", type=int, default=0)
    for text_flag in ("--original", "--issue", "--risk", "--suggest", "--result"):
        add_finding.add_argument(text_flag, default="")

    commands.add_parser("list-findings")
    commands.add_parser("list-findings-key").add_argument("key")

    by_segment = commands.add_parser("findings-by-seg")
    by_segment.add_argument("key")
    by_segment.add_argument("segment", type=int)

    search = commands.add_parser("search-findings")
    search.add_argument("key")
    search.add_argument("--rule-title", default="")

    delete = commands.add_parser("delete-findings-seg")
    delete.add_argument("key")
    delete.add_argument("segment", type=int)

    # Maintenance
    commands.add_parser("clear")
    commands.add_parser("export").add_argument("--out")
    return root
def main(argv: list[str] | None = None) -> int:
    """Execute one review memory CLI command.

    The store named by ``--storage`` is opened first, then the sub-command is
    dispatched. Query commands print JSON; mutating commands print a short
    confirmation.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` after a command ran, ``1`` (after printing help) when no
        command was given.
    """
    args = parser().parse_args(argv)
    store = MemoryStore(args.storage)

    def add_fact() -> None:
        store.add_fact(load_json_arg(args.data))
        print("OK")

    def add_merge_fact() -> None:
        store.add_merge_fact(load_json_arg(args.data))
        print("OK")

    def add_finding() -> None:
        finding = Finding(
            args.rule,
            args.segment,
            args.original,
            args.issue,
            args.risk,
            args.suggest,
            result=args.result,
        )
        out(store.add_finding(args.key, finding).to_dict())

    def clear() -> None:
        store.clear()
        print("cleared")

    handlers = {
        "list-facts": lambda: out(store.get_facts()),
        "add-fact": add_fact,
        "add-merge-fact": add_merge_fact,
        "search-facts": lambda: out(store.search_facts(args.keywords)),
        "add-finding": add_finding,
        "list-findings": lambda: out(store.list_findings()),
        "list-findings-key": lambda: out(store.list_findings(args.key)),
        "findings-by-seg": lambda: out(store.get_findings_by_segment(args.key, args.segment)),
        "search-findings": lambda: out(store.search_findings(args.key, args.rule_title)),
        "delete-findings-seg": lambda: print(store.delete_findings_by_segment(args.key, args.segment)),
        "clear": clear,
        "export": lambda: print(store.export_to_json(args.out)),
    }
    handler = handlers.get(args.cmd)
    if handler is None:
        parser().print_help()
        return 1
    handler()
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
import argparse
import json
import re
import sys
from pathlib import Path
from urllib.parse import unquote, urlparse
import requests
from loguru import logger
# Default service endpoints. Each one is the default of the matching
# --outer-backend-url / --base-fastgpt-url / --base-backend-url CLI option.
DEFAULT_OUTER_BACKEND_URL = "https://172.21.107.45:48080"
DEFAULT_BASE_FASTGPT_URL = "http://172.21.107.45:3030"
DEFAULT_BASE_BACKEND_URL = "http://172.21.107.45:1122"
# Default backend login used by upload_file.
# NOTE(review): these credentials are hard-coded in source and the password is
# also echoed in the `upload` help text — consider reading them from the
# environment instead.
DEFAULT_BACKEND_ADMIN_USERNAME = "admin"
DEFAULT_BACKEND_ADMIN_PASSWORD = "admin@jpai.com"
# Effective runtime settings. They start at the defaults above and are
# replaced by configure_urls() / configure_login().
base_fastgpt_url = DEFAULT_BASE_FASTGPT_URL
base_backend_url = DEFAULT_BASE_BACKEND_URL
outer_backend_url = DEFAULT_OUTER_BACKEND_URL
backend_admin_username = DEFAULT_BACKEND_ADMIN_USERNAME
backend_admin_password = DEFAULT_BACKEND_ADMIN_PASSWORD
def configure_urls(
    fastgpt_url: str | None = None,
    backend_url: str | None = None,
    outer_url: str | None = None,
):
    """Override the module-level service URLs.

    Args:
        fastgpt_url: New value for ``base_fastgpt_url``.
        backend_url: New value for ``base_backend_url``.
        outer_url: New value for ``outer_backend_url``.

    An argument left as ``None`` keeps the current setting; any other value
    (including an empty string) replaces it.
    """
    requested = {
        "base_fastgpt_url": fastgpt_url,
        "base_backend_url": backend_url,
        "outer_backend_url": outer_url,
    }
    globals().update(
        {setting: value for setting, value in requested.items() if value is not None}
    )
def configure_login(username: str | None = None, password: str | None = None):
    """Override the module-level backend login used by ``upload_file``.

    Args:
        username: New value for ``backend_admin_username``.
        password: New value for ``backend_admin_password``.

    An argument left as ``None`` keeps the current setting; any other value
    (including an empty string) replaces it.
    """
    requested = {
        "backend_admin_username": username,
        "backend_admin_password": password,
    }
    globals().update(
        {setting: value for setting, value in requested.items() if value is not None}
    )
def _strip_trailing_slash(url: str | None) -> str | None:
    """Return ``url`` without any trailing ``/`` characters; ``None`` passes through."""
    return None if url is None else url.rstrip("/")
def upload_file(path, input_url_to_inner=True, output_url_to_inner=False) -> str:
    """Log in to the backend and upload a local file.

    The function first posts the configured admin credentials to the login
    endpoint, then sends the file as a multipart upload authorized with the
    returned access token.

    Args:
        path: Local path of the file to upload.
        input_url_to_inner: Not used by this implementation.
        output_url_to_inner: Not used by this implementation.

    Returns:
        The ``data`` field of the upload response (presumably the stored
        file's URL — confirm against the backend API).

    Raises:
        requests.HTTPError: If the login or upload request returns an error status.
        Exception: If the login response cannot be read, or the upload
            response carries no ``data``.
    """
    from requests_toolbelt import MultipartEncoder

    credentials = json.dumps(
        {
            "username": backend_admin_username,
            "password": backend_admin_password,
        }
    )
    login_response = requests.post(
        url=f"{base_backend_url}/admin-api/system/auth/login",
        headers={"Content-Type": "application/json"},
        data=credentials,
    )
    login_response.raise_for_status()
    try:
        token = json.loads(login_response.text).get("data").get("accessToken")
    except Exception as e:
        logger.error(f"后端登录异常:{e}")
        raise

    # The encoder streams from the open file, so the request must be sent
    # before the file is closed.
    with open(path, "rb") as file_obj:
        multipart = MultipartEncoder(fields={"file": (Path(path).name, file_obj)})
        upload_response = requests.post(
            url=f"{base_backend_url}/admin-api/infra/file/upload",
            headers={"Content-Type": multipart.content_type, "Authorization": token},
            data=multipart,
        )
    upload_response.raise_for_status()

    uploaded = json.loads(upload_response.text).get("data")
    if not uploaded:
        raise Exception(f"上传{path}失败 Response text: {upload_response.text}")
    return uploaded
def _download_basename(filename: str) -> str:
    """Reduce a server- or URL-supplied file name to a bare base name.

    Surrounding whitespace and double quotes are removed, percent-escapes are
    decoded and backslashes are treated as path separators, so only the last
    path component is kept.

    Returns:
        The base name, or ``"downloaded_file"`` when nothing is left.
    """
    unquoted = filename.strip().strip('"')
    decoded = unquote(unquoted)
    posix_style = decoded.replace("\\", "/")
    basename = Path(posix_style).name
    return basename if basename else "downloaded_file"
def _resolve_download_filename(url: str, response: requests.Response) -> str:
    """Choose a local file name for a downloaded response.

    The name is taken, in order of preference, from the ``filename*``
    parameter of the Content-Disposition header, from its plain ``filename``
    parameter, and finally from the last path component of ``url``.

    Args:
        url: The URL that was requested.
        response: The HTTP response whose headers are inspected.

    Returns:
        A bare file name. ``_download_basename`` supplies
        ``"downloaded_file"`` when no usable name is found, so the result is
        never empty.
    """
    content_disposition = response.headers.get("content-disposition", "")
    if content_disposition:
        # Header parameter names are case-insensitive, so match
        # "Filename=" / "FILENAME*=" as well. The extended form is tried
        # first because it carries the non-ASCII name.
        patterns = (
            r"filename\*=(?:UTF-8'')?([^;]+)",
            r'filename="?([^";]+)"?',
        )
        for pattern in patterns:
            match = re.search(pattern, content_disposition, flags=re.IGNORECASE)
            if match:
                return _download_basename(match.group(1))
    return _download_basename(urlparse(url).path)
def download_file(url, path, input_url_to_inner=True):
    """Download ``url`` and save it under ``path``.

    A value that does not start with ``http:`` or ``https:`` is treated as
    relative and prefixed with the FastGPT base URL. The outer backend
    address is then replaced by the inner one before the request is made.

    Args:
        url: Absolute URL or relative path.
        path: Destination file, or an existing directory in which case the
            file name is resolved from the response / URL.
        input_url_to_inner: Not used by this implementation.

    Returns:
        The saved file path as a string, or ``None`` when the server did not
        answer with HTTP 200.
    """
    if not url.startswith(("http:", "https:")):
        url = base_fastgpt_url + url
    url = url.replace(outer_backend_url, base_backend_url)
    logger.info(f"url准备下载:{url}")

    response = requests.get(url)
    if response.status_code != 200:
        logger.error(f"{url}文件下载失败. HTTP Status Code: {response.status_code}")
        return None

    destination = Path(path)
    if destination.exists() and destination.is_dir():
        destination = destination / _resolve_download_filename(url, response)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with open(destination, "wb") as sink:
        sink.write(response.content)
    logger.info(f"{url}文件下载成功,保存到{destination}")
    return str(destination)
def url_replace_fastgpt(origin: str):
    """Return ``origin`` as an absolute URL.

    Args:
        origin: An absolute ``http:``/``https:`` URL, or a path relative to
            the FastGPT base URL.

    Returns:
        ``origin`` unchanged when it is already absolute, otherwise the
        configured ``base_fastgpt_url`` followed by ``origin``.
    """
    # Both schemes count as absolute (the same rule download_file applies);
    # checking only "http:" would prepend the base URL to https links and
    # produce an unusable address.
    if origin.startswith(("http:", "https:")):
        return origin
    return base_fastgpt_url + origin
def _add_common_url_args(parser: argparse.ArgumentParser):
    """Add the three service-URL options shared by every sub-command.

    Args:
        parser: The (sub)parser that receives ``--base-fastgpt-url``,
            ``--base-backend-url`` and ``--outer-backend-url``; each defaults
            to the matching ``DEFAULT_*`` constant.
    """
    url_options = (
        (
            "--base-fastgpt-url",
            DEFAULT_BASE_FASTGPT_URL,
            f"FastGPT 内网基础地址,默认:{DEFAULT_BASE_FASTGPT_URL}",
        ),
        (
            "--base-backend-url",
            DEFAULT_BASE_BACKEND_URL,
            f"后端内网基础地址,默认:{DEFAULT_BASE_BACKEND_URL}",
        ),
        (
            "--outer-backend-url",
            DEFAULT_OUTER_BACKEND_URL,
            f"后端外网地址,下载时会替换为内网地址,默认:{DEFAULT_OUTER_BACKEND_URL}",
        ),
    )
    for flag, default_url, help_text in url_options:
        parser.add_argument(flag, default=default_url, help=help_text)
def build_arg_parser() -> argparse.ArgumentParser:
    """Build the parser for the upload / download / normalize-url CLI.

    A sub-command is required and is recorded in ``command``. Every
    sub-command accepts the shared service-URL options; ``upload`` also
    accepts the backend login options.

    Returns:
        The configured parser.
    """
    root = argparse.ArgumentParser(
        description="单文件上传/下载工具:通过后端接口上传文件,或下载 FastGPT/后端文件 URL。"
    )
    root.set_defaults(command=None)
    commands = root.add_subparsers(dest="command", required=True)

    # upload <path>
    upload_cmd = commands.add_parser("upload", help="上传本地文件。")
    _add_common_url_args(upload_cmd)
    login_options = (
        (
            "--username",
            DEFAULT_BACKEND_ADMIN_USERNAME,
            f"后端管理员用户名,默认:{DEFAULT_BACKEND_ADMIN_USERNAME}",
        ),
        (
            "--password",
            DEFAULT_BACKEND_ADMIN_PASSWORD,
            f"后端管理员密码,默认:{DEFAULT_BACKEND_ADMIN_PASSWORD}",
        ),
    )
    for flag, default_value, help_text in login_options:
        upload_cmd.add_argument(flag, default=default_value, help=help_text)
    upload_cmd.add_argument("path", help="要上传的本地文件路径。")

    # download <url> <path>
    download_cmd = commands.add_parser("download", help="下载 URL 到本地路径。")
    _add_common_url_args(download_cmd)
    download_cmd.add_argument("url", help="HTTP URL 或 FastGPT/后端相对路径。")
    download_cmd.add_argument(
        "path", help="输出文件路径;如果是已存在目录,则自动解析文件名。"
    )

    # normalize-url <url>
    normalize_cmd = commands.add_parser(
        "normalize-url", help="把 FastGPT 相对路径补全为绝对 URL。"
    )
    _add_common_url_args(normalize_cmd)
    normalize_cmd.add_argument("url", help="HTTP URL 或 FastGPT 相对路径。")

    return root
def main(argv: list[str] | None = None) -> int:
    """Run the upload / download / normalize-url CLI.

    The service URLs from the command line (trailing slashes removed) are
    applied first; ``upload`` additionally applies the login options.

    Args:
        argv: Argument list; ``None`` lets argparse read ``sys.argv``.

    Returns:
        ``0`` on success, ``1`` when a download fails. An unsupported command
        is reported through ``parser.error``.
    """
    cli = build_arg_parser()
    options = cli.parse_args(argv)

    configure_urls(
        fastgpt_url=_strip_trailing_slash(options.base_fastgpt_url),
        backend_url=_strip_trailing_slash(options.base_backend_url),
        outer_url=_strip_trailing_slash(options.outer_backend_url),
    )

    command = options.command
    if command == "upload":
        configure_login(username=options.username, password=options.password)
        print(upload_file(options.path))
        return 0
    elif command == "download":
        saved_path = download_file(options.url, options.path)
        if saved_path is None:
            return 1
        print(saved_path)
        return 0
    elif command == "normalize-url":
        print(url_replace_fastgpt(options.url))
        return 0

    cli.error(f"unsupported command: {options.command}")
    return 2
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
......@@ -26,6 +26,7 @@ class OpenAITool:
msg = msg[1:]
# deepseek专用关闭思考
extra_body["thinking"] = {"type": "disabled"}
extra_body["chat_template_kwargs"] = {"enable_thinking": False}
try:
response = await self.client.chat.completions.create(
model=self.llm_config.model, messages=msg, extra_body=extra_body
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment