Commit af61e996 by ccran

feat: 添加合并函数

parent 36fd9cb1
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
This source diff could not be displayed because it is too large. You can view the blob instead.
# TODO: merge the Excel results with the JSON results
\ No newline at end of file
"""
读取Excel并将每行导出为dict。
用法示例:
python merge.py --excel datasets/excel/your_file.xlsx
python merge.py --excel datasets/excel/your_file.xlsx --out rows.json
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any, Iterable
import pandas as pd
# Maps Chinese Excel column headers to the canonical English field names
# used by the rest of the pipeline. Header lookup happens after
# str(...).strip() (see read_excel_rows), so surrounding whitespace in the
# Excel header row is tolerated.
columns_map = {
'审查项':'review_item',
'合同原文':'original_text',
'建议':'suggest'
}
def _normalize_value(value: Any) -> Any:
if pd.isna(value):
return None
return value
def read_excel_rows(excel_path: str | Path, sheet_name: str | int | None = 0) -> list[dict[str, Any]]:
    """Read one Excel sheet and return a dict per row.

    Headers found in ``columns_map`` are renamed to their canonical English
    names; all other headers are kept as-is. NaN cells become ``None``.
    """
    frame = pd.read_excel(excel_path, sheet_name=sheet_name)
    records: list[dict[str, Any]] = []
    for raw_row in frame.to_dict(orient="records"):
        converted: dict[str, Any] = {}
        for key, val in raw_row.items():
            canonical = columns_map.get(str(key).strip(), key)
            converted[canonical] = _normalize_value(val)
        records.append(converted)
    return records
def read_excel_rows_from_path(
    excel_path: str | Path,
    sheet_name: str | int | None = 0,
    ground_truth: str = '不合格',
) -> list[dict[str, Any]]:
    """Read a single Excel file, or every ``*.xlsx`` in a directory.

    Directory mode additionally tags each row with its source file name
    (``__source_file__``) and a ``ground_truth`` label. Single-file mode
    keeps the original behavior and adds neither tag.

    Args:
        excel_path: path to an ``.xlsx`` file or to a directory of them.
        sheet_name: sheet name or index, forwarded to ``read_excel_rows``.
        ground_truth: label stamped on every row in directory mode
            (default ``'不合格'``, preserving the previous hard-coded value).

    Returns:
        A flat list of row dicts.
    """
    path = Path(excel_path)
    # Single file: delegate directly, no tagging (original behavior).
    if not path.is_dir():
        return read_excel_rows(path, sheet_name=sheet_name)
    all_rows: list[dict[str, Any]] = []
    # sorted() keeps the merge order deterministic across filesystems.
    for file_path in sorted(path.glob("*.xlsx")):
        rows = read_excel_rows(file_path, sheet_name=sheet_name)
        for row in rows:
            row["__source_file__"] = file_path.name
            row["ground_truth"] = ground_truth
        all_rows.extend(rows)
    return all_rows
def _print_rows(rows: Iterable[dict[str, Any]]) -> None:
for row in rows:
print(row)
def main() -> None:
    """CLI entry point: read Excel rows and print a summary or dump them to JSON.

    Fixes an IndexError: the original unconditionally printed ``rows[0]``,
    which crashed when the input produced no rows.
    """
    parser = argparse.ArgumentParser(description="读取Excel并导出每行dict")
    parser.add_argument("--excel", default='datasets/excel', help="Excel文件路径或目录")
    parser.add_argument("--sheet", default=0, help="sheet名称或索引,默认0")
    parser.add_argument("--out", help="可选输出JSON文件路径")
    args = parser.parse_args()
    # argparse delivers --sheet as a string; convert pure digits back to an index.
    sheet: str | int | None
    if isinstance(args.sheet, str) and args.sheet.isdigit():
        sheet = int(args.sheet)
    else:
        sheet = args.sheet
    rows = read_excel_rows_from_path(args.excel, sheet_name=sheet)
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding="utf-8")
        return
    print(f'len(rows)={len(rows)}')
    if rows:  # guard: rows may be empty if no files/rows were found
        print(rows[0])


if __name__ == "__main__":
    main()
\ No newline at end of file
"""
读取目录下所有JSON文件并合并为一个大列表。
要求:每个JSON文件必须是 List[Dict]。
用法示例:
python merge_json.py --json-dir datasets/json
python merge_json.py --json-dir datasets/json --out merged.json
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
# Field mapping: target field -> list of candidate source fields.
# When several source fields are listed, the first one present in a row
# wins (see apply_field_mapping); the matched source key is then removed.
FIELD_MAPPING: dict[str, list[str]] = {
"suggest": ["model_suggest", "销服修改意见"],
"review_item": ["Review_Dimension"],
}
def read_json_list(json_path: str | Path) -> list[dict[str, Any]]:
    """Load one JSON file and return its content as a list of dicts.

    Raises:
        ValueError: if the parsed document is not a list of dicts.
    """
    path = Path(json_path)
    data = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(data, list) and all(isinstance(item, dict) for item in data):
        return data
    raise ValueError(f"JSON文件不是 List[Dict]: {path}")
def read_json_lists_from_dir(json_dir: str | Path) -> list[dict[str, Any]]:
    """Merge every ``*.json`` file under *json_dir* into one flat list.

    Each row is tagged with its source file name (``__source_file__``),
    and ``review_item`` is overwritten with the part of the file name
    before the first ``-``.

    Raises:
        ValueError: if *json_dir* is not a directory.
    """
    path = Path(json_dir)
    if not path.is_dir():
        raise ValueError(f"不是目录: {path}")
    merged: list[dict[str, Any]] = []
    # sorted() keeps the merged order stable across runs.
    for json_file in sorted(path.glob("*.json")):
        for entry in read_json_list(json_file):
            entry['review_item'] = json_file.name.split('-')[0]
            entry["__source_file__"] = json_file.name
            merged.append(entry)
    return merged
def apply_field_mapping(
    rows: list[dict[str, Any]],
    mapping: dict[str, list[str]] | None = None,
) -> list[dict[str, Any]]:
    """Normalize source fields to target fields, in place.

    For each target key, the first source key present in a row wins: its
    value is copied to the target key and the source key is removed
    (unless source and target are the same key). Remaining candidates for
    that target are skipped.

    Args:
        rows: row dicts to rewrite; mutated in place.
        mapping: target field -> candidate source fields. Defaults to the
            module-level ``FIELD_MAPPING`` (previous hard-coded behavior).

    Returns:
        The same ``rows`` list, for chaining.
    """
    if mapping is None:
        mapping = FIELD_MAPPING
    for row in rows:
        for target_key, source_keys in mapping.items():
            for source_key in source_keys:
                if source_key in row:
                    row[target_key] = row[source_key]
                    if source_key != target_key:
                        row.pop(source_key, None)
                    break
    return rows
def show_ds(rows):
    """Print a frequency count of rows per ``review_item`` value."""
    from collections import Counter
    counts = Counter(row.get('review_item', '未知') for row in rows)
    print(counts)
def main() -> None:
    """CLI entry point: merge all JSON files in a directory, then either
    dump the merged rows to a file or print a summary."""
    parser = argparse.ArgumentParser(description="读取目录下所有JSON并合并")
    parser.add_argument("--json-dir", default='datasets/json', help="JSON目录路径")
    parser.add_argument("--out", help="可选输出JSON文件路径")
    args = parser.parse_args()
    rows = apply_field_mapping(read_json_lists_from_dir(args.json_dir))
    if args.out:
        out_path = Path(args.out)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_text(json.dumps(rows, ensure_ascii=False, indent=2), encoding="utf-8")
    else:
        print(f'len(rows)={len(rows)}')
        show_ds(rows)


if __name__ == "__main__":
    main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment