Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
ccran
/
lufa-contract
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Members
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
3bb9ff31
authored
Mar 25, 2026
by
ccran
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
feat: 增加触发词逻辑;修改批注逻辑;
parent
6b4d3476
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
998 additions
and
89 deletions
+998
-89
core/__pycache__/config.cpython-312.pyc
+0
-0
core/config.py
+1
-1
core/tools/segment_rule_router.py
+25
-35
data/batch/batch.py
+4
-2
data/benchmark/eval.py
+3
-3
data/rules.xlsx
+0
-0
utils/__pycache__/spire_word_util.cpython-312.pyc
+0
-0
utils/spire_word_util copy.py
+765
-0
utils/spire_word_util.py
+200
-48
No files found.
core/__pycache__/config.cpython-312.pyc
View file @
3bb9ff31
No preview for this file type
core/config.py
View file @
3bb9ff31
...
...
@@ -16,7 +16,7 @@ MAX_SINGLE_CHUNK_SIZE=5000
META_KEY
=
"META"
DEFAULT_RULESET_ID
=
"通用"
ALL_RULESET_IDS
=
[
"通用"
,
"借款"
,
"担保"
,
"财务口"
,
"金盘"
,
"金盘简化"
]
use_lufa
=
Tru
e
use_lufa
=
Fals
e
if
use_lufa
:
outer_backend_url
=
"http://znkf.lgfzgroup.com:48081"
base_fastgpt_url
=
"http://192.168.252.71:18089"
...
...
core/tools/segment_rule_router.py
View file @
3bb9ff31
...
...
@@ -141,16 +141,27 @@ class SegmentRuleRouterTool(LLMTool):
for
item
in
llm_selected
if
item
.
get
(
"title"
)
}
trigger_titles
=
self
.
_match_trigger_titles
(
segment_text
=
segment_text
,
rules
=
rules
)
merged_titles
=
selected_titles
|
trigger_titles
if
not
select
ed_titles
:
return
self
.
_fallback_route
(
segment_text
=
segment_text
,
rules
=
rules
)
if
not
merg
ed_titles
:
return
[]
title_to_rule
=
{
str
(
r
.
get
(
"title"
,
""
))
.
strip
():
r
for
r
in
rules
if
r
.
get
(
"title"
)}
routed_rules
:
List
[
Dict
]
=
[]
for
title
in
selected_tit
les
:
rule
=
title_to_rule
.
get
(
title
)
if
not
rule
:
for
rule
in
ru
les
:
title
=
str
(
rule
.
get
(
"title"
,
""
))
.
strip
(
)
if
not
title
or
title
not
in
merged_titles
:
continue
llm_reason
=
selected_reasons
.
get
(
title
,
""
)
trigger_matched
=
title
in
trigger_titles
if
llm_reason
and
trigger_matched
:
reason
=
f
"llm+trigger: {llm_reason}"
elif
llm_reason
:
reason
=
llm_reason
else
:
reason
=
"trigger matched"
routed_rules
.
append
(
{
"id"
:
rule
.
get
(
"id"
,
""
),
...
...
@@ -158,42 +169,21 @@ class SegmentRuleRouterTool(LLMTool):
"level"
:
rule
.
get
(
"level"
,
""
),
"rule"
:
rule
.
get
(
"rule"
,
""
),
"triggers"
:
rule
.
get
(
"triggers"
,
""
),
"reason"
:
selected_reasons
.
get
(
title
,
""
)
,
"reason"
:
reason
,
}
)
return
routed_rules
or
self
.
_fallback_route
(
segment_text
=
segment_text
,
rules
=
rules
)
return
routed_rules
def
_
fallback_route
(
self
,
segment_text
:
str
,
rules
:
List
[
Dict
])
->
List
[
Dict
]:
def
_
match_trigger_titles
(
self
,
segment_text
:
str
,
rules
:
List
[
Dict
])
->
set
[
str
]:
text
=
segment_text
or
""
routed
:
List
[
Dict
]
=
[]
matched_titles
:
set
[
str
]
=
set
()
for
r
in
rules
:
triggers
=
self
.
_parse_triggers
(
str
(
r
.
get
(
"triggers"
,
""
)))
if
triggers
and
any
(
t
in
text
for
t
in
triggers
):
routed
.
append
(
{
"id"
:
r
.
get
(
"id"
,
""
),
"title"
:
r
.
get
(
"title"
,
""
),
"level"
:
r
.
get
(
"level"
,
""
),
"rule"
:
r
.
get
(
"rule"
,
""
),
"triggers"
:
r
.
get
(
"triggers"
,
""
),
"reason"
:
"fallback: trigger matched"
,
}
)
# 兜底策略:若触发词也未命中,返回全部规则,保证召回不漏审。
if
not
routed
:
for
r
in
rules
:
routed
.
append
(
{
"id"
:
r
.
get
(
"id"
,
""
),
"title"
:
r
.
get
(
"title"
,
""
),
"level"
:
r
.
get
(
"level"
,
""
),
"rule"
:
r
.
get
(
"rule"
,
""
),
"triggers"
:
r
.
get
(
"triggers"
,
""
),
"reason"
:
"fallback: conservative full recall"
,
}
)
return
routed
title
=
str
(
r
.
get
(
"title"
,
""
))
.
strip
()
if
title
:
matched_titles
.
add
(
title
)
return
matched_titles
def
_parse_triggers
(
self
,
trigger_text
:
str
)
->
List
[
str
]:
parts
=
re
.
split
(
r"[,,、;;\s/|]+"
,
trigger_text
or
""
)
...
...
data/batch/batch.py
View file @
3bb9ff31
...
...
@@ -12,9 +12,9 @@ from loguru import logger
from
utils.common_util
import
random_str
from
utils.http_util
import
upload_file
,
fastgpt_openai_chat
,
download_file
SUFFIX
=
'_麓发
改进
'
SUFFIX
=
'_麓发
迁移
'
batch_input_dir_path
=
'jp-input'
batch_output_dir_path
=
'jp-output-lufa-simple'
batch_output_dir_path
=
'jp-output-lufa-simple
-new
'
batch_size
=
5
# 麓发fastgpt接口
# url = 'http://192.168.252.71:18089/api/v1/chat/completions'
...
...
@@ -24,6 +24,8 @@ url = 'http://192.168.252.71:18088/api/v1/chat/completions'
# token = 'fastgpt-ek3Z6PxI6sXgYc0jxzZ5bVGqrxwM6aVyfSmA6JVErJYBMr2KmYxrHwEUOIMSYz'
# 金盘迁移麓发合同审查测试token
token
=
'fastgpt-vykT6qs07g7hR4tL2MNJE6DdNCIxaQjEu3Cxw9nuTBFg8MAG3CkByvnXKxSNEyMK7'
# 人机交互测试(测试环境)
# token = 'fastgpt-p189K5zoTX5wjp0dBybFCwsbWm3juIwlJxt2wTGyiaOWOANI5Y10pKEZzyt'
# 人机交互测试(生产环境)
# token = 'fastgpt-ry4jIjgNwmNgufMr5jR0ncvJVmSS4GZl4bx2ItsNPoncdQzW9Na3IP1Xrankr'
# 提取后审查测试
...
...
data/benchmark/eval.py
View file @
3bb9ff31
...
...
@@ -121,7 +121,7 @@ def _parse_args() -> argparse.Namespace:
parser
.
add_argument
(
"--datasets-dir"
,
type
=
Path
,
default
=
base
/
"results"
/
"jp-output-
renji
"
,
default
=
base
/
"results"
/
"jp-output-
lufa-simple-new
"
,
help
=
"Directory containing Word files with annotations."
,
)
parser
.
add_argument
(
...
...
@@ -133,13 +133,13 @@ def _parse_args() -> argparse.Namespace:
parser
.
add_argument
(
"--val-dir"
,
type
=
Path
,
default
=
base
/
"results"
/
"jp-output-
renji
-extracted"
,
default
=
base
/
"results"
/
"jp-output-
lufa-simple-new
-extracted"
,
help
=
"Directory to store extracted xlsx files for comparison."
,
)
parser
.
add_argument
(
"--strip-suffixes"
,
nargs
=
"*"
,
default
=
[
'_麓发改进'
,
'_人机交互'
],
default
=
[
'_麓发改进'
,
'_人机交互'
,
'_麓发迁移'
],
help
=
(
"Optional filename suffixes to strip from generated val xlsx stems before "
"comparison, e.g. --strip-suffixes _v1 _审阅版"
...
...
data/rules.xlsx
View file @
3bb9ff31
No preview for this file type
utils/__pycache__/spire_word_util.cpython-312.pyc
View file @
3bb9ff31
No preview for this file type
utils/spire_word_util copy.py
0 → 100644
View file @
3bb9ff31
from
spire.doc
import
Document
,
Paragraph
,
Table
,
Comment
,
CommentMark
,
CommentMarkType
from
loguru
import
logger
import
re
from
thefuzz
import
fuzz
from
utils.doc_util
import
DocBase
from
utils.common_util
import
adjust_single_chunk_size
import
os
def
extract_table_cells_text
(
table
,
joiner
=
"
\n
"
):
"""
从 Spire.Doc 的 Table 对象中提取每个单元格文本,并按行主序返回扁平列表:
["r0c0_text", "r0c1_text", "r1c0_text", ...]
joiner: 用于连接单元格内多段落或嵌套表行的分隔符(默认换行)
注意:不对文本做任何清洗或 strip,保持原有格式
"""
def
_para_text
(
para
):
# 优先使用 para.Text(保留原样),否则尝试从 para.ChildObjects 收集 Text-like 字段
try
:
if
hasattr
(
para
,
"Text"
):
return
para
.
Text
if
para
.
Text
is
not
None
else
""
except
Exception
:
pass
parts
=
[]
try
:
for
idx
in
range
(
para
.
ChildObjects
.
Count
):
obj
=
para
.
ChildObjects
[
idx
]
if
hasattr
(
obj
,
"Text"
):
parts
.
append
(
obj
.
Text
if
obj
.
Text
is
not
None
else
""
)
except
Exception
:
pass
return
""
.
join
(
parts
)
def
_extract_cell_text
(
cell
):
parts
=
[]
# 收集单元格内所有段落文本(保持原样,不做 strip)
try
:
for
p_idx
in
range
(
cell
.
Paragraphs
.
Count
):
para
=
cell
.
Paragraphs
[
p_idx
]
parts
.
append
(
_para_text
(
para
))
except
Exception
:
pass
# 处理嵌套表格(若存在),把嵌套表每一行合并为一条字符串,并按行加入 parts
try
:
if
hasattr
(
cell
,
"Tables"
)
and
cell
.
Tables
.
Count
>
0
:
for
t_idx
in
range
(
cell
.
Tables
.
Count
):
nested
=
cell
.
Tables
[
t_idx
]
nested_rows
=
[]
for
nr
in
range
(
nested
.
Rows
.
Count
):
nested_row_cells
=
[]
for
nc
in
range
(
nested
.
Rows
[
nr
]
.
Cells
.
Count
):
try
:
# 取嵌套单元格的所有段落并用 joiner 连接(保留原样)
nc_parts
=
[]
for
np_idx
in
range
(
nested
.
Rows
[
nr
]
.
Cells
[
nc
]
.
Paragraphs
.
Count
):
nc_parts
.
append
(
_para_text
(
nested
.
Rows
[
nr
]
.
Cells
[
nc
]
.
Paragraphs
[
np_idx
]
)
)
nested_row_cells
.
append
(
joiner
.
join
(
nc_parts
))
except
Exception
:
nested_row_cells
.
append
(
""
)
nested_rows
.
append
(
joiner
.
join
(
nested_row_cells
))
parts
.
append
(
joiner
.
join
(
nested_rows
))
else
:
# 有时嵌套表格会放在 cell.ChildObjects 中,兼容处理
try
:
for
idx
in
range
(
cell
.
ChildObjects
.
Count
):
ch
=
cell
.
ChildObjects
[
idx
]
if
hasattr
(
ch
,
"Rows"
)
and
getattr
(
ch
,
"Rows"
)
is
not
None
:
nested
=
ch
nested_rows
=
[]
for
nr
in
range
(
nested
.
Rows
.
Count
):
nested_row_cells
=
[]
for
nc
in
range
(
nested
.
Rows
[
nr
]
.
Cells
.
Count
):
try
:
nc_parts
=
[]
for
np_idx
in
range
(
nested
.
Rows
[
nr
]
.
Cells
[
nc
]
.
Paragraphs
.
Count
):
nc_parts
.
append
(
_para_text
(
nested
.
Rows
[
nr
]
.
Cells
[
nc
]
.
Paragraphs
[
np_idx
]
)
)
nested_row_cells
.
append
(
joiner
.
join
(
nc_parts
))
except
Exception
:
nested_row_cells
.
append
(
""
)
nested_rows
.
append
(
joiner
.
join
(
nested_row_cells
))
parts
.
append
(
joiner
.
join
(
nested_rows
))
except
Exception
:
pass
except
Exception
:
pass
# 把单元格内收集到的片段用 joiner 连接成最终字符串(不做任何 trim/clean)
return
joiner
.
join
(
parts
)
flat
=
[]
for
r
in
range
(
table
.
Rows
.
Count
):
row
=
table
.
Rows
[
r
]
for
c
in
range
(
row
.
Cells
.
Count
):
cell
=
row
.
Cells
[
c
]
cell_text
=
_extract_cell_text
(
cell
)
# 保持原样,空单元格返回空字符串
flat
.
append
(
cell_text
)
return
flat
def
process_string
(
s
):
# 统计换行符数量
newline_count
=
s
.
count
(
"
\n
"
)
# 情况1:没有换行符
if
newline_count
==
0
:
return
s
# 情况2:只有一个换行符
elif
newline_count
==
1
:
# 分割成两部分
parts
=
s
.
split
(
"
\n
"
,
1
)
# 比较前后部分长度
return
parts
[
0
]
if
len
(
parts
[
0
])
>=
len
(
parts
[
1
])
else
parts
[
1
]
# 情况3:多个换行符
else
:
# 分割所有部分
parts
=
s
.
split
(
"
\n
"
)
# 找出中间部分(排除首尾)
middle_parts
=
parts
[
1
:
-
1
]
if
len
(
parts
)
>
2
else
[]
# 如果没有有效中间部分
if
not
middle_parts
:
# 返回最长的一段(排除空字符串)
non_empty_parts
=
[
p
for
p
in
parts
if
p
]
return
max
(
non_empty_parts
,
key
=
len
)
if
non_empty_parts
else
""
# 返回最长的中间部分
return
max
(
middle_parts
,
key
=
len
,
default
=
""
)
def
_score_target_against_query
(
target_text
:
str
,
query_text
:
str
):
"""对单个候选文本与查询文本打分,并返回最适合落批注的匹配片段。"""
if
not
target_text
or
not
query_text
:
return
None
,
0
if
query_text
in
target_text
:
return
query_text
,
100
# partial_ratio 负责召回,ratio 负责精度;组合分用于排序
def
_combined_score
(
text_a
:
str
,
text_b
:
str
):
ratio_score
=
fuzz
.
ratio
(
text_a
,
text_b
)
partial_score
=
fuzz
.
partial_ratio
(
text_a
,
text_b
)
combined
=
int
(
round
(
0.4
*
ratio_score
+
0.6
*
partial_score
))
return
combined
best_text
=
target_text
best_score
=
_combined_score
(
target_text
,
query_text
)
# 对长句按常见中文分隔符做子句拆分,避免整句比较被噪声稀释。
for
clause
in
target_text
.
replace
(
"。"
,
";"
)
.
replace
(
","
,
";"
)
.
split
(
";"
):
clause
=
clause
.
strip
()
if
not
clause
:
continue
clause_score
=
_combined_score
(
clause
,
query_text
)
if
clause_score
>
best_score
:
best_score
=
clause_score
best_text
=
clause
return
best_text
,
best_score
def
_build_narrowed_queries
(
text
:
str
,
min_len
=
12
):
"""对文本做一步缩窄,生成下一轮候选。"""
if
not
text
:
return
[]
text
=
text
.
strip
()
if
len
(
text
)
<=
min_len
:
return
[]
next_queries
=
[]
cut
=
max
(
1
,
len
(
text
)
//
8
)
left_cut
=
text
[
cut
:]
right_cut
=
text
[:
-
cut
]
center_cut
=
text
[
cut
:
-
cut
]
if
len
(
text
)
>
2
*
cut
else
""
for
item
in
(
left_cut
,
right_cut
,
center_cut
):
item
=
item
.
strip
()
if
len
(
item
)
>=
min_len
:
next_queries
.
append
(
item
)
simplified
=
process_string
(
text
)
if
simplified
and
len
(
simplified
)
>=
min_len
:
next_queries
.
append
(
simplified
.
strip
())
parts
=
[
p
.
strip
()
for
p
in
re
.
split
(
r"[。;;,,\n]"
,
text
)
if
p
.
strip
()]
if
len
(
parts
)
>
1
:
longest_part
=
max
(
parts
,
key
=
len
)
if
len
(
longest_part
)
>=
min_len
:
next_queries
.
append
(
longest_part
)
if
len
(
parts
)
>
2
:
mid_join
=
""
.
join
(
parts
[
1
:
-
1
])
.
strip
()
if
len
(
mid_join
)
>=
min_len
:
next_queries
.
append
(
mid_join
)
deduped
=
[]
seen
=
set
()
for
item
in
next_queries
:
if
item
not
in
seen
:
seen
.
add
(
item
)
deduped
.
append
(
item
)
return
deduped
def
_find_best_match_in_texts
(
target_texts
,
original_text
):
"""在候选文本列表中查找与 original_text 最相近的一条(支持递进缩窄查询)。"""
if
not
target_texts
or
not
original_text
:
return
None
,
-
1
best_match
=
None
best_score
=
-
1
# beam_size: 每轮仅保留得分最高的前 N 个查询继续扩展,控制搜索分支爆炸。
beam_size
=
5
# max_rounds: 递进缩窄的最大轮数,避免异常文本导致无限尝试。
max_rounds
=
8
min_query_len
=
12
active_queries
=
[
original_text
.
strip
()]
seen_queries
=
set
(
active_queries
)
for
_
in
range
(
max_rounds
):
if
not
active_queries
:
break
query_best_scores
=
[]
for
query
in
active_queries
:
local_best
=
-
1
for
target_text
in
target_texts
:
match_text
,
score
=
_score_target_against_query
(
target_text
,
query
)
if
score
>
best_score
:
best_match
=
match_text
best_score
=
score
if
score
>
local_best
:
local_best
=
score
query_best_scores
.
append
((
query
,
local_best
))
if
best_score
>=
100
:
break
# 先保留当前轮最有希望的查询,再基于它们生成下一轮缩窄查询。
query_best_scores
.
sort
(
key
=
lambda
x
:
x
[
1
],
reverse
=
True
)
top_queries
=
[
q
for
q
,
_
in
query_best_scores
[:
beam_size
]]
next_queries
=
[]
for
query
in
top_queries
:
for
narrowed
in
_build_narrowed_queries
(
query
,
min_len
=
min_query_len
):
if
narrowed
not
in
seen_queries
:
seen_queries
.
add
(
narrowed
)
next_queries
.
append
(
narrowed
)
active_queries
=
next_queries
return
best_match
,
best_score
# spire doc解析
class
SpireWordDoc
(
DocBase
):
def
load
(
self
,
doc_path
,
**
kwargs
):
# License.SetLicenseFileFullPath(f"{root_path}/license.elic.python.xml")
self
.
_doc_path
=
doc_path
self
.
_doc_name
=
os
.
path
.
basename
(
doc_path
)
self
.
_doc
=
Document
()
self
.
_doc
.
LoadFromFile
(
doc_path
)
self
.
_chunk_list
=
self
.
_resolve_doc_chunk
()
return
self
def
_ensure_loaded
(
self
):
if
not
self
.
_doc
:
raise
RuntimeError
(
"Document not loaded. Call load() first."
)
def
adjust_chunk_size
(
self
):
self
.
_ensure_loaded
()
all_text_len
=
len
(
self
.
get_all_text
())
self
.
_max_single_chunk_size
=
adjust_single_chunk_size
(
all_text_len
)
logger
.
info
(
f
"SpireWordDoc adjust _max_single_chunk_size to {self._max_single_chunk_size}"
)
self
.
_chunk_list
=
self
.
_resolve_doc_chunk
()
return
self
.
_max_single_chunk_size
async
def
get_from_ocr
(
self
):
pass
# 把文档分割成chunk
def
_resolve_doc_chunk
(
self
):
self
.
_ensure_loaded
()
chunk_list
=
[]
# 单个chunk
single_chunk
=
""
# 单个chunk的位置信息
single_chunk_location
=
[]
# 遍历每个节
for
section_idx
in
range
(
self
.
_doc
.
Sections
.
Count
):
current_section
=
self
.
_doc
.
Sections
.
get_Item
(
section_idx
)
# 遍历节里面每个子对象
for
section_child_idx
in
range
(
current_section
.
Body
.
ChildObjects
.
Count
):
# 获取子对象
child_obj
=
current_section
.
Body
.
ChildObjects
.
get_Item
(
section_child_idx
)
# 段落处理
current_child_text
=
""
if
isinstance
(
child_obj
,
Paragraph
):
paragraph
=
child_obj
current_child_text
=
paragraph
.
Text
# 表格处理
elif
isinstance
(
child_obj
,
Table
):
table
=
child_obj
current_child_text
=
self
.
_resolve_table
(
table
)
# 跳过其他非文本子对象
else
:
continue
# 添加新对象
if
(
len
(
single_chunk
)
+
len
(
current_child_text
)
>
self
.
_max_single_chunk_size
):
chunk_list
.
append
(
{
"chunk_content"
:
single_chunk
,
"chunk_location"
:
single_chunk_location
,
}
)
single_chunk
=
""
single_chunk_location
=
[]
single_chunk
+=
current_child_text
+
"
\n
"
single_chunk_location
.
append
(
{
"section_idx"
:
section_idx
,
"section_child_idx"
:
section_child_idx
}
)
if
len
(
single_chunk
):
chunk_list
.
append
(
{
"chunk_content"
:
single_chunk
,
"chunk_location"
:
single_chunk_location
}
)
return
chunk_list
# 表格解析为markdown
def
_resolve_table
(
self
,
table
):
table_data
=
""
for
i
in
range
(
0
,
table
.
Rows
.
Count
):
# 遍历行的单元格(cells)
cell_list
=
[]
for
j
in
range
(
0
,
table
.
Rows
.
get_Item
(
i
)
.
Cells
.
Count
):
# 获取每一个单元格(cell)
cell
=
table
.
Rows
.
get_Item
(
i
)
.
Cells
.
get_Item
(
j
)
cell_content
=
""
for
para_idx
in
range
(
cell
.
Paragraphs
.
Count
):
paragraph_text
=
cell
.
Paragraphs
.
get_Item
(
para_idx
)
.
Text
cell_content
+=
paragraph_text
cell_list
.
append
(
cell_content
)
# table_data += "|" + "|".join(cell_list) + "|"
# table_data += "\n"
table_data
+=
' '
.
join
(
cell_list
)
+
'
\n
'
if
i
==
0
:
# table_data += "|" + "|".join(["--- " for _ in cell_list]) + "|\n"
table_data
=
' '
.
join
(
cell_list
)
+
'
\n
'
return
table_data
def
get_chunk_info
(
self
,
chunk_id
):
chunk
=
self
.
_chunk_list
[
chunk_id
]
chunk_content
=
chunk
[
"chunk_content"
]
chunk_location
=
chunk
[
"chunk_location"
]
from_location
=
f
"[第{chunk_location[0]['section_idx'] + 1}节的第{chunk_location[0]['section_child_idx'] + 1}段落]"
to_location
=
f
"[第{chunk_location[-1]['section_idx'] + 1}节的第{chunk_location[-1]['section_child_idx'] + 1}段落]"
chunk_content_tips
=
(
"["
+
chunk_content
[:
20
]
+
"]...到...["
+
chunk_content
[
-
20
:]
+
"]"
)
return
f
"文件块id: {chunk_id + 1}
\n
文件块位置: 从{from_location}到{to_location}
\n
文件块简述: {chunk_content_tips}
\n
"
def
get_chunk_location
(
self
,
chunk_id
):
return
self
.
get_chunk_info
(
chunk_id
)
def
get_chunk_num
(
self
):
self
.
_ensure_loaded
()
return
len
(
self
.
_chunk_list
)
def
get_chunk_item
(
self
,
chunk_id
):
self
.
_ensure_loaded
()
return
self
.
_chunk_list
[
chunk_id
][
"chunk_content"
]
# 根据locations获取数据
def
get_sub_chunks
(
self
,
chunk_id
):
if
chunk_id
>=
len
(
self
.
_chunk_list
):
logger
.
error
(
f
"get_sub_chunks_error:{chunk_id}"
)
return
[]
chunk
=
self
.
_chunk_list
[
chunk_id
]
chunk_locations
=
chunk
[
"chunk_location"
]
return
[
self
.
_doc
.
Sections
.
get_Item
(
loc
[
"section_idx"
])
.
Body
.
ChildObjects
.
get_Item
(
loc
[
"section_child_idx"
]
)
for
loc
in
chunk_locations
]
def
format_comment_author
(
self
,
comment
):
return
"{}|{}"
.
format
(
str
(
comment
[
"id"
]),
comment
[
"key_points"
])
def
_decorate_author_with_match_type
(
self
,
author
,
match_type
):
if
match_type
==
"exact"
:
return
f
"(精确){author}"
if
match_type
==
"fuzzy"
:
return
f
"(模糊){author}"
return
author
def
_normalize_author_prefix
(
self
,
author
):
# 去掉匹配来源前缀后再比对,确保“精确/模糊”两种作者标签都能命中同一条批注。
if
not
author
:
return
author
for
prefix
in
(
"(精确)"
,
"(模糊)"
):
if
author
.
startswith
(
prefix
):
return
author
[
len
(
prefix
)
:]
return
author
def
remove_comment_prefix
(
self
,
):
for
i
in
range
(
self
.
_doc
.
Comments
.
Count
):
current_comment
=
self
.
_doc
.
Comments
.
get_Item
(
i
)
comment_author
=
current_comment
.
Format
.
Author
split_author
=
comment_author
.
split
(
"|"
)
if
len
(
split_author
)
==
2
:
current_comment
.
Format
.
Author
=
comment_author
.
split
(
"|"
)[
1
]
def
_insert_comment_by_text_range
(
self
,
text_range
,
author
,
comment_content
):
if
text_range
is
None
:
return
False
paragraph
=
text_range
.
OwnerParagraph
if
paragraph
is
None
:
return
False
comment
=
Comment
(
self
.
_doc
)
comment
.
Body
.
AddParagraph
()
.
Text
=
comment_content
comment
.
Format
.
Author
=
author
paragraph
.
ChildObjects
.
Insert
(
paragraph
.
ChildObjects
.
IndexOf
(
text_range
)
+
1
,
comment
)
# Word 批注需要成对的起止标记;两者共享同一个 CommentId。
comment_start
=
CommentMark
(
self
.
_doc
,
CommentMarkType
.
CommentStart
)
comment_end
=
CommentMark
(
self
.
_doc
,
CommentMarkType
.
CommentEnd
)
comment_start
.
CommentId
=
comment
.
Format
.
CommentId
comment_end
.
CommentId
=
comment
.
Format
.
CommentId
paragraph
.
ChildObjects
.
Insert
(
paragraph
.
ChildObjects
.
IndexOf
(
text_range
),
comment_start
)
paragraph
.
ChildObjects
.
Insert
(
paragraph
.
ChildObjects
.
IndexOf
(
text_range
)
+
1
,
comment_end
)
return
True
def
_update_comment_content
(
self
,
comment_idx
,
suggest
):
self
.
_doc
.
Comments
.
get_Item
(
comment_idx
)
.
Body
.
Paragraphs
.
get_Item
(
0
)
.
Text
=
suggest
def
_try_add_comment_in_paragraphs
(
self
,
paragraphs
,
target_text
,
author
,
suggest
):
if
not
target_text
:
return
False
for
paragraph
in
paragraphs
:
text_sel
=
paragraph
.
Find
(
target_text
,
False
,
True
)
if
text_sel
and
self
.
set_comment_by_text_selection
(
text_sel
,
author
,
suggest
):
return
True
return
False
def
_try_add_comment_by_exact
(
self
,
sub_chunks
,
find_key
,
author
,
suggest
):
for
obj
in
sub_chunks
:
if
isinstance
(
obj
,
Paragraph
):
try
:
text_sel
=
obj
.
Find
(
find_key
,
False
,
True
)
if
text_sel
and
self
.
set_comment_by_text_selection
(
text_sel
,
author
,
suggest
):
return
True
except
Exception
as
e
:
print
(
f
"段落批注添加失败: {str(e)}"
)
elif
isinstance
(
obj
,
Table
):
try
:
if
self
.
add_table_comment
(
obj
,
find_key
,
suggest
,
author
):
return
True
except
Exception
as
e
:
print
(
f
"表格批注添加失败: {str(e)}"
)
return
False
def
_try_add_comment_by_fuzzy
(
self
,
sub_chunks
,
comment
,
author
,
suggest
):
original_text
=
comment
.
get
(
"original_text"
,
""
)
candidates
=
[]
# 段落与表格同权:统一加入候选池,按最高分排序后尝试落批注
for
order
,
obj
in
enumerate
(
sub_chunks
):
if
isinstance
(
obj
,
Paragraph
):
match_text
,
score
=
_find_best_match_in_texts
([
obj
.
Text
],
original_text
)
candidates
.
append
(
{
"kind"
:
"paragraph"
,
"obj"
:
obj
,
"match_text"
:
match_text
,
"score"
:
score
,
"order"
:
order
,
}
)
elif
isinstance
(
obj
,
Table
):
table_data
=
extract_table_cells_text
(
obj
)
match_text
,
score
=
_find_best_match_in_texts
(
table_data
,
original_text
)
candidates
.
append
(
{
"kind"
:
"table"
,
"obj"
:
obj
,
"match_text"
:
match_text
,
"score"
:
score
,
"order"
:
order
,
}
)
# 过滤无效候选后按分数降序、原文档顺序升序尝试,优先高分且靠前的位置。
candidates
=
[
item
for
item
in
candidates
if
item
.
get
(
"match_text"
)
and
item
.
get
(
"score"
,
-
1
)
>=
0
]
candidates
.
sort
(
key
=
lambda
x
:
(
-
x
[
"score"
],
x
[
"order"
]))
for
item
in
candidates
:
match_text
=
item
[
"match_text"
]
processed_text
=
process_string
(
match_text
)
if
match_text
else
""
if
item
[
"kind"
]
==
"paragraph"
:
paragraph
=
item
[
"obj"
]
# 先尝试原匹配片段,再尝试 process_string 压缩后的片段,提高落点成功率。
if
self
.
_try_add_comment_in_paragraphs
(
[
paragraph
],
match_text
,
author
,
suggest
):
return
True
if
self
.
_try_add_comment_in_paragraphs
(
[
paragraph
],
processed_text
,
author
,
suggest
):
return
True
else
:
table
=
item
[
"obj"
]
# 表格同样使用“原片段 -> 压缩片段”的两阶段策略。
if
self
.
add_table_comment
(
table
,
match_text
,
suggest
,
author
):
return
True
if
processed_text
and
self
.
add_table_comment
(
table
,
processed_text
,
suggest
,
author
):
return
True
return
False
# 根据text_selection批注
def
set_comment_by_text_selection
(
self
,
text_sel
,
author
,
comment_content
):
if
text_sel
is
None
:
return
False
text_range
=
text_sel
.
GetAsOneRange
()
return
self
.
_insert_comment_by_text_range
(
text_range
,
author
,
comment_content
)
# 设置chunk批注
def
add_table_comment
(
self
,
table
,
target_text
,
comment_text
,
author
=
"审阅助手"
,
initials
=
"AI"
):
"""
在表格中添加批注
返回是否成功添加
"""
added
=
False
# 遍历表格所有单元格
for
i
in
range
(
table
.
Rows
.
Count
):
row
=
table
.
Rows
[
i
]
for
j
in
range
(
row
.
Cells
.
Count
):
cell
=
row
.
Cells
[
j
]
# 遍历单元格中的段落
for
k
in
range
(
cell
.
Paragraphs
.
Count
):
para
=
cell
.
Paragraphs
[
k
]
# 在段落中查找目标文本
selection
=
para
.
Find
(
target_text
,
False
,
True
)
if
selection
:
text_range
=
selection
.
GetAsOneRange
()
if
self
.
_insert_comment_by_text_range
(
text_range
,
author
,
comment_text
):
added
=
True
# print(f"表格批注添加成功: '{target_text[:20]}...'")
# 添加成功后跳出内层循环
break
# 如果已经添加,跳出单元格循环
if
added
:
break
# 如果已经添加,跳出行循环
if
added
:
break
return
added
def
add_chunk_comment
(
self
,
chunk_id
,
comments
):
"""
为 chunk 添加批注(保证每条评论只批注一次)。
执行顺序:
1) 过滤非“不合格”项;
2) 先按作者标识查重,命中则更新内容;
3) 未命中时先精确匹配,再模糊匹配;
4) 仍失败则记录日志。
"""
for
comment
in
comments
:
if
comment
.
get
(
"result"
)
!=
"不合格"
:
continue
# update chunk_id
comment_chunk_id
=
comment
.
get
(
"chunk_id"
,
-
1
)
# 优先使用comments里提供的chunk_id,如果没有或无效则使用外部传入的chunk_id,如果都没有则异常处理
sub_chunks
=
self
.
get_sub_chunks
(
comment_chunk_id
)
if
comment_chunk_id
!=
-
1
\
and
comment_chunk_id
<
self
.
get_chunk_num
()
else
self
.
get_sub_chunks
(
chunk_id
)
author
=
self
.
format_comment_author
(
comment
)
suggest
=
comment
.
get
(
"suggest"
,
""
)
find_key
=
comment
[
"original_text"
]
.
strip
()
or
comment
[
"key_points"
]
# 先检查是否已有同一“规则ID|要点”的批注,避免重复插入。
existing_comment_idx
=
self
.
find_comment
(
author
)
if
existing_comment_idx
is
not
None
:
# 已存在批注,则更新内容
self
.
_update_comment_content
(
existing_comment_idx
,
suggest
)
# print(f"批注已存在,更新内容: '{find_key[:20]}...'")
continue
exact_author
=
self
.
_decorate_author_with_match_type
(
author
,
"exact"
)
fuzzy_author
=
self
.
_decorate_author_with_match_type
(
author
,
"fuzzy"
)
# 优先精确匹配,成功则不再进入模糊匹配。
matched
=
self
.
_try_add_comment_by_exact
(
sub_chunks
,
find_key
,
exact_author
,
suggest
)
if
not
matched
:
try
:
# 精确失败后走模糊匹配(段落/表格统一候选池评分)。
matched
=
self
.
_try_add_comment_by_fuzzy
(
sub_chunks
,
comment
,
fuzzy_author
,
suggest
)
except
Exception
as
e
:
print
(
f
"模糊匹配失败: {str(e)}"
)
# ---------- 3. 匹配最终失败 ----------
if
not
matched
:
logger
.
error
(
f
"未找到可批注位置: '{find_key[:20]}...'"
)
# 根据作者名称查找批注
def
find_comment
(
self
,
author
):
# 比较前去掉“(精确)/(模糊)”前缀,只按真实作者键(id|key_points)识别唯一批注。
normalized_author
=
self
.
_normalize_author_prefix
(
author
)
for
i
in
range
(
self
.
_doc
.
Comments
.
Count
):
current_comment
=
self
.
_doc
.
Comments
.
get_Item
(
i
)
comment_author
=
self
.
_normalize_author_prefix
(
current_comment
.
Format
.
Author
)
if
comment_author
==
normalized_author
:
return
i
return
None
def
delete_chunk_comment
(
self
,
comments
):
"""
删除指定作者批注
"""
for
comment
in
comments
:
author
=
self
.
format_comment_author
(
comment
)
author_comment_idx
=
self
.
find_comment
(
author
)
if
author_comment_idx
is
not
None
:
self
.
_doc
.
Comments
.
RemoveAt
(
author_comment_idx
)
print
(
f
"删除批注: '{author}'"
)
def
edit_chunk_comment
(
self
,
comments
):
"""
编辑chunk批注:删除已合格的批注,修改存在的批注,不存在则新增
"""
for
comment
in
comments
:
author
=
self
.
format_comment_author
(
comment
)
review_answer
=
comment
[
"result"
]
existing_comment_idx
=
self
.
find_comment
(
author
)
if
review_answer
==
"合格"
:
# 删除批注
if
existing_comment_idx
is
not
None
:
self
.
_doc
.
Comments
.
RemoveAt
(
existing_comment_idx
)
# print(f"已删除合格批注: '{author}'")
else
:
# 不合格,更新或新增
suggest
=
comment
.
get
(
"suggest"
,
""
)
if
existing_comment_idx
is
not
None
:
self
.
_update_comment_content
(
existing_comment_idx
,
suggest
)
# print(f"更新已有批注: '{author}'")
else
:
# chunk_id要从comment中获取
self
.
add_chunk_comment
(
comment
[
"chunk_id"
]
-
1
,
[
comment
])
def
get_chunk_id_list
(
self
,
step
=
1
):
self
.
_ensure_loaded
()
return
[
idx
for
idx
in
range
(
0
,
self
.
get_chunk_num
(),
step
)]
def
get_all_text
(
self
):
self
.
_ensure_loaded
()
return
self
.
_doc
.
GetText
()
def
to_file
(
self
,
path
,
remove_prefix
=
False
):
self
.
_ensure_loaded
()
if
remove_prefix
:
self
.
remove_comment_prefix
()
self
.
_doc
.
SaveToFile
(
path
)
def
release
(
self
):
# 关闭文件
if
self
.
_doc
:
self
.
_doc
.
Close
()
super
()
.
release
()
def
__del__
(
self
):
pass
# self.release()
if
__name__
==
"__main__"
:
doc
=
SpireWordDoc
()
doc
.
load
(
r"/home/ccran/lufa-contract/demo/今麦郎合同审核.docx"
)
print
(
doc
.
_doc_name
)
print
(
"附件2《技术协议》"
in
doc
.
get_all_text
())
# doc.add_chunk_comment(
# 0,
# [
# {
# "id": "1",
# "key_points": "日期审查",
# "original_text": "承诺",
# "details": "1111",
# "chunk_id": 0,
# "result": "不合格",
# "suggest": "这是测试建议",
# }
# ],
# )
# doc.to_file("/home/ccran/lufa-contract/demo/今麦郎合同审核_test.docx", True)
\ No newline at end of file
utils/spire_word_util.py
View file @
3bb9ff31
...
...
@@ -149,6 +149,163 @@ def process_string(s):
return
max
(
middle_parts
,
key
=
len
,
default
=
""
)
def
_normalize_whitespace
(
text
:
str
):
"""将不同空白统一为单空格,便于容错匹配。"""
if
not
text
:
return
""
return
re
.
sub
(
r"\s+"
,
" "
,
text
.
replace
(
"
\u3000
"
,
" "
))
.
strip
()
def
_remove_all_whitespace
(
text
:
str
):
"""移除全部空白字符,用于处理仅空格差异的场景。"""
if
not
text
:
return
""
return
re
.
sub
(
r"\s+"
,
""
,
text
.
replace
(
"
\u3000
"
,
" "
))
def
_split_query_clauses
(
text
:
str
,
min_len
=
6
):
"""按标点切分 original_text,得到用于初筛的有效子句。"""
if
not
text
:
return
[]
raw_parts
=
re
.
split
(
r"[。!?!?.;;,,、\n]"
,
text
)
clauses
=
[
p
.
strip
()
for
p
in
raw_parts
if
p
and
p
.
strip
()]
return
[
p
for
p
in
clauses
if
len
(
_remove_all_whitespace
(
p
))
>=
min_len
]
def
_contains_clause
(
candidate_text
:
str
,
clause
:
str
):
"""空白宽容包含判断:原文包含或去空白后包含。"""
if
not
candidate_text
or
not
clause
:
return
False
if
clause
in
candidate_text
:
return
True
return
_remove_all_whitespace
(
clause
)
in
_remove_all_whitespace
(
candidate_text
)
def
_prefilter_candidates
(
target_texts
,
original_text
):
"""召回阶段:按子句包含命中数筛选候选,并保留命中统计。"""
clauses
=
_split_query_clauses
(
original_text
)
if
not
clauses
:
return
[(
idx
,
text
,
0
)
for
idx
,
text
in
enumerate
(
target_texts
)]
# 至少命中部分子句,避免仅靠一个高频短语触发误召回。
min_hits
=
max
(
1
,
int
(
len
(
clauses
)
*
0.3
))
selected
=
[]
best_partial_hits
=
0
for
idx
,
text
in
enumerate
(
target_texts
):
if
not
text
:
continue
hit_count
=
sum
(
1
for
clause
in
clauses
if
_contains_clause
(
text
,
clause
))
best_partial_hits
=
max
(
best_partial_hits
,
hit_count
)
if
hit_count
>=
min_hits
:
selected
.
append
((
idx
,
text
,
hit_count
))
if
selected
:
selected
.
sort
(
key
=
lambda
x
:
(
-
x
[
2
],
x
[
0
]))
return
selected
# 严格阈值无命中时,降级保留“命中子句最多”的候选,避免空结果。
if
best_partial_hits
>
0
:
fallback
=
[]
for
idx
,
text
in
enumerate
(
target_texts
):
if
not
text
:
continue
hit_count
=
sum
(
1
for
clause
in
clauses
if
_contains_clause
(
text
,
clause
))
if
hit_count
==
best_partial_hits
:
fallback
.
append
((
idx
,
text
,
hit_count
))
fallback
.
sort
(
key
=
lambda
x
:
x
[
0
])
return
fallback
return
[(
idx
,
text
,
0
)
for
idx
,
text
in
enumerate
(
target_texts
)
if
text
]
def
_window_similarity
(
window_text
:
str
,
query_text
:
str
):
"""重排阶段打分:组合 ratio/partial/token_set,并兼容空白差异。"""
if
not
window_text
or
not
query_text
:
return
0
norm_window
=
_normalize_whitespace
(
window_text
)
norm_query
=
_normalize_whitespace
(
query_text
)
if
norm_query
and
norm_query
in
norm_window
:
return
100
ratio_score
=
fuzz
.
ratio
(
norm_window
,
norm_query
)
partial_score
=
fuzz
.
partial_ratio
(
norm_window
,
norm_query
)
token_score
=
fuzz
.
token_set_ratio
(
norm_window
,
norm_query
)
base_score
=
int
(
round
(
0.3
*
ratio_score
+
0.5
*
partial_score
+
0.2
*
token_score
))
# 去空白后再算一轮,专门修正“仅差空格”导致的评分偏低。
nospace_window
=
_remove_all_whitespace
(
norm_window
)
nospace_query
=
_remove_all_whitespace
(
norm_query
)
if
nospace_query
and
nospace_query
in
nospace_window
:
return
100
nospace_score
=
int
(
round
(
0.3
*
fuzz
.
ratio
(
nospace_window
,
nospace_query
)
+
0.7
*
fuzz
.
partial_ratio
(
nospace_window
,
nospace_query
)
)
)
return
max
(
base_score
,
nospace_score
)
def
_iter_sliding_windows
(
text
:
str
,
query_len
:
int
):
"""生成长度区间窗口,覆盖 0.7L~1.3L,并对长文本使用较大步长降本。"""
if
not
text
:
return
text_len
=
len
(
text
)
if
text_len
==
0
:
return
min_w
=
max
(
8
,
int
(
query_len
*
0.7
))
max_w
=
max
(
min_w
,
int
(
query_len
*
1.3
))
mid_w
=
max
(
min_w
,
query_len
)
candidate_lengths
=
sorted
(
{
min_w
,
int
(
mid_w
*
0.85
),
mid_w
,
int
(
mid_w
*
1.15
),
max_w
,
}
)
for
window_len
in
candidate_lengths
:
if
window_len
<=
0
:
continue
if
window_len
>=
text_len
:
yield
text
continue
step
=
max
(
1
,
window_len
//
20
)
for
start
in
range
(
0
,
text_len
-
window_len
+
1
,
step
):
yield
text
[
start
:
start
+
window_len
]
def
_best_window_match
(
target_text
:
str
,
original_text
:
str
):
"""在单个候选文本上做滑窗重排,返回最佳片段与分数。"""
if
not
target_text
or
not
original_text
:
return
None
,
0
query
=
original_text
.
strip
()
query_len
=
max
(
1
,
len
(
_remove_all_whitespace
(
query
)))
best_text
=
target_text
best_score
=
_window_similarity
(
target_text
,
query
)
for
window
in
_iter_sliding_windows
(
target_text
,
query_len
):
score
=
_window_similarity
(
window
,
query
)
if
score
>
best_score
:
best_text
=
window
best_score
=
score
if
best_score
>=
100
:
break
return
best_text
,
best_score
def
_score_target_against_query
(
target_text
:
str
,
query_text
:
str
):
"""对单个候选文本与查询文本打分,并返回最适合落批注的匹配片段。"""
if
not
target_text
or
not
query_text
:
...
...
@@ -167,6 +324,7 @@ def _score_target_against_query(target_text: str, query_text: str):
best_text
=
target_text
best_score
=
_combined_score
(
target_text
,
query_text
)
# 对长句按常见中文分隔符做子句拆分,避免整句比较被噪声稀释。
for
clause
in
target_text
.
replace
(
"。"
,
";"
)
.
replace
(
","
,
";"
)
.
split
(
";"
):
clause
=
clause
.
strip
()
if
not
clause
:
...
...
@@ -225,52 +383,32 @@ def _build_narrowed_queries(text: str, min_len=12):
def _find_best_match_in_texts(target_texts, original_text):
    """Two-stage match: clause-level prefilter recall, then window rerank.

    Stage 1 runs a cheap clause-hit prefilter over all candidates; stage 2
    reranks each shortlisted candidate with a sliding-window similarity.
    Candidates with more prefilter clause hits get a small score boost
    (at most +6, capped at 100 overall). On tied scores the shorter snippet
    wins so a later document Find call is more likely to land.

    Args:
        target_texts: Candidate texts (paragraph/table-cell strings).
        original_text: Query text whose best-matching snippet is wanted.

    Returns:
        tuple: ``(best_snippet_or_None, score)`` with score in [0, 100],
        or ``(None, -1)`` when either input is empty.
    """
    if not target_texts or not original_text:
        return None, -1
    best_match = None
    best_score = -1
    # Stage 1: prefilter yields (document_order, candidate_text, hit_count).
    shortlisted = _prefilter_candidates(target_texts, original_text)
    for order, target_text, hit_count in shortlisted:
        # Stage 2: sliding-window rerank on this shortlisted candidate.
        match_text, score = _best_window_match(target_text, original_text)
        # More clause hits in the prefilter earn a slight edge without
        # breaking the absolute 100-point ceiling.
        score = min(100, score + min(6, hit_count * 2))
        if score > best_score:
            best_match = match_text
            best_score = score
        elif score == best_score and best_match and match_text:
            # Tie-break: prefer the shorter snippet for the later Find step.
            if len(match_text) < len(best_match):
                best_match = match_text
        if best_score >= 100:
            break
    return best_match, best_score
...
...
@@ -424,6 +562,7 @@ class SpireWordDoc(DocBase):
return
author
def
_normalize_author_prefix
(
self
,
author
):
# 去掉匹配来源前缀后再比对,确保“精确/模糊”两种作者标签都能命中同一条批注。
if
not
author
:
return
author
for
prefix
in
(
"(精确)"
,
"(模糊)"
):
...
...
@@ -455,6 +594,7 @@ class SpireWordDoc(DocBase):
paragraph
.
ChildObjects
.
IndexOf
(
text_range
)
+
1
,
comment
)
# Word 批注需要成对的起止标记;两者共享同一个 CommentId。
comment_start
=
CommentMark
(
self
.
_doc
,
CommentMarkType
.
CommentStart
)
comment_end
=
CommentMark
(
self
.
_doc
,
CommentMarkType
.
CommentEnd
)
comment_start
.
CommentId
=
comment
.
Format
.
CommentId
...
...
@@ -528,6 +668,7 @@ class SpireWordDoc(DocBase):
}
)
# 过滤无效候选后按分数降序、原文档顺序升序尝试,优先高分且靠前的位置。
candidates
=
[
item
for
item
in
candidates
...
...
@@ -541,6 +682,7 @@ class SpireWordDoc(DocBase):
if
item
[
"kind"
]
==
"paragraph"
:
paragraph
=
item
[
"obj"
]
# 先尝试原匹配片段,再尝试 process_string 压缩后的片段,提高落点成功率。
if
self
.
_try_add_comment_in_paragraphs
(
[
paragraph
],
match_text
,
author
,
suggest
):
...
...
@@ -551,6 +693,7 @@ class SpireWordDoc(DocBase):
return
True
else
:
table
=
item
[
"obj"
]
# 表格同样使用“原片段 -> 压缩片段”的两阶段策略。
if
self
.
add_table_comment
(
table
,
match_text
,
suggest
,
author
):
return
True
if
processed_text
and
self
.
add_table_comment
(
...
...
@@ -611,7 +754,12 @@ class SpireWordDoc(DocBase):
def
add_chunk_comment
(
self
,
chunk_id
,
comments
):
"""
为chunk添加批注(保证每条评论只批注一次)
为 chunk 添加批注(保证每条评论只批注一次)。
执行顺序:
1) 过滤非“不合格”项;
2) 先按作者标识查重,命中则更新内容;
3) 未命中时先精确匹配,再模糊匹配;
4) 仍失败则记录日志。
"""
for
comment
in
comments
:
if
comment
.
get
(
"result"
)
!=
"不合格"
:
...
...
@@ -625,7 +773,7 @@ class SpireWordDoc(DocBase):
suggest
=
comment
.
get
(
"suggest"
,
""
)
find_key
=
comment
[
"original_text"
]
.
strip
()
or
comment
[
"key_points"
]
# 先检查是否已
经有批注
# 先检查是否已
有同一“规则ID|要点”的批注,避免重复插入。
existing_comment_idx
=
self
.
find_comment
(
author
)
if
existing_comment_idx
is
not
None
:
# 已存在批注,则更新内容
...
...
@@ -636,12 +784,14 @@ class SpireWordDoc(DocBase):
exact_author
=
self
.
_decorate_author_with_match_type
(
author
,
"exact"
)
fuzzy_author
=
self
.
_decorate_author_with_match_type
(
author
,
"fuzzy"
)
# 优先精确匹配,成功则不再进入模糊匹配。
matched
=
self
.
_try_add_comment_by_exact
(
sub_chunks
,
find_key
,
exact_author
,
suggest
)
if
not
matched
:
try
:
# 精确失败后走模糊匹配(段落/表格统一候选池评分)。
matched
=
self
.
_try_add_comment_by_fuzzy
(
sub_chunks
,
comment
,
fuzzy_author
,
suggest
)
...
...
@@ -654,6 +804,7 @@ class SpireWordDoc(DocBase):
# 根据作者名称查找批注
def
find_comment
(
self
,
author
):
# 比较前去掉“(精确)/(模糊)”前缀,只按真实作者键(id|key_points)识别唯一批注。
normalized_author
=
self
.
_normalize_author_prefix
(
author
)
for
i
in
range
(
self
.
_doc
.
Comments
.
Count
):
current_comment
=
self
.
_doc
.
Comments
.
get_Item
(
i
)
...
...
@@ -729,18 +880,19 @@ if __name__ == "__main__":
r"/home/ccran/lufa-contract/demo/今麦郎合同审核.docx"
)
print
(
doc
.
_doc_name
)
doc
.
add_chunk_comment
(
0
,
[
{
"id"
:
"1"
,
"key_points"
:
"日期审查"
,
"original_text"
:
"承诺"
,
"details"
:
"1111"
,
"chunk_id"
:
0
,
"result"
:
"不合格"
,
"suggest"
:
"这是测试建议"
,
}
],
)
doc
.
to_file
(
"/home/ccran/lufa-contract/demo/今麦郎合同审核_test.docx"
,
True
)
\ No newline at end of file
print
(
"附件2《技术协议》"
in
doc
.
get_all_text
())
# doc.add_chunk_comment(
# 0,
# [
# {
# "id": "1",
# "key_points": "日期审查",
# "original_text": "承诺",
# "details": "1111",
# "chunk_id": 0,
# "result": "不合格",
# "suggest": "这是测试建议",
# }
# ],
# )
# doc.to_file("/home/ccran/lufa-contract/demo/今麦郎合同审核_test.docx", True)
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment