MemoTrace 端到端样本追踪示例
从原始 LongMemEval Turns 到检索结果
目标: 追踪样本 71017276 的 Rank 1 命中候选 (msg:2023-03-04t22:00z)
配置: qwen_rerank_xenc (Ollama Qwen3 embeddings + Cross-encoder rerank)
问题: “How many weeks ago did I meet up with my aunt and receive the crystal chandelier?”
模块调用链
runner.py::run_retrieval_only()
├─> [STEP 1] get_dataset_handler() # 获取 LongMemEval handler
├─> [STEP 2] handler.convert() # 原始 JSON → processed JSONL
│ └─> convert_longmemeval()
│ ├─> LongMemEval.iter_samples() # 解析原始 JSON
│ ├─> _extract_haystack_dialogue() # 提取 haystack sessions
│ ├─> _normalize_timestamp() # 时间戳规范化
│ ├─> _normalize_role() # 角色归一化
│ └─> _write_dialogue_with_mapping() # 写入 dialogue.jsonl + session→msg 映射
│
├─> [STEP 3] _ensure_snapshot() # 创建 GraphStore snapshot
│ └─> run_memowrite()
│ ├─> extract.py::extract_graph() # 从 dialogue 抽取 SPO 三元组
│ ├─> assimilate.py::assimilate() # 实体消歧、同化
│ └─> snapshot.py::save() # 保存 EdgeVersion + Events
│
└─> [STEP 4] memosearch_run() # 执行检索
└─> memosearch.py::run()
├─> load_snapshot() # 加载 GraphStore
├─> current_fact_versions() # 提取开放边 (1403 facts)
├─> slice_graph() # 根据视图 (AW) 过滤可见性
└─> eagle_v1.search() # 检索引擎
├─> build_bm25_index() # BM25 索引构建 (1401 docs)
├─> build_embedding_index() # Qwen3 向量索引 (1401 docs)
├─> bm25_search() # BM25 检索 (k=64)
├─> vector_search() # 向量检索 (k=64)
├─> rrf_fusion() # RRF 融合
├─> graph_expansion() # 图扩展 (depth=2, fanout=4)
├─> cross_encoder_rerank() # Cross-encoder 重排 (top=32)
└─> pack_facts() # 打包 Top-10
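调用链中的 rrf_fusion 可以用标准 Reciprocal Rank Fusion 的极简 sketch 说明 (常数 k=60 与等权相加为假设, 实际实现以 eagle_v1.py 为准):

```python
# 标准 RRF: 每路排名贡献 weight / (k + rank), 对同一 doc 累加后降序
def rrf_fuse(bm25_ranking, vector_ranking, k=60, weight=0.5):
    scores = {}
    for ranking, w in ((bm25_ranking, weight), (vector_ranking, 1 - weight)):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + w / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

bm25 = ["e1", "e2", "e3"]   # BM25 召回的排名
vec = ["e2", "e4", "e1"]    # 向量召回的排名
fused = rrf_fuse(bm25, vec)
```

两路都靠前的 "e2" 融合后排第一, 这正是 RRF 偏好“多路共识”的特性。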
数据变换流程(倒推)
最终输出 (Rank 1)
文件: reports/retrieval_benchmark/.../per_query.jsonl
{
"candidates": [{
"rank": 1,
"raw_id": "msg#2023-03-04T22:00:00Z",
"canonical_id": "msg:2023-03-04t22:00:00z",
"doc_id": "msg:2023-03-04t22:00z",
"score": 1.0000,
"source": "USER",
"span": "2023-03-04T22:00:00Z-",
"kind": "fact"
}]
}
关键字段解析:
- raw_id: 原始边 ID (来自 EdgeVersion)
- canonical_id: 规范化后的 ID (normalize_id)
- doc_id: 文档级 ID (fold_chunk_to_doc)
- score: 最终分数 (fusion + rerank)
- kind: 类型 (fact 或 event)
⬆️ STEP 4: 检索输出 (memosearch.py)
输入:
- Query: “How many weeks ago did I meet up with my aunt and receive the crystal chandelier?”
- Snapshot: sample_dir/snapshot.jsonl
- View: AW (as-world)
- Topk: 10
处理:
1. 加载 GraphStore:
   facts = load_snapshot(snapshot_path)
   # facts: Dict[str, EdgeVersion], 包含 1403 个开放边
2. 过滤可见性 (AW 视图):
   slc = slice_graph(facts, events, view="AW", moment=query_time)
   # slc.facts: 1403 个 facts (record.end is None)
   # slc.events: 180 个 events (visible at query_time)
3. 检索 (eagle_v1.search):
   result = search(query, view="AW", facts=slc.facts, events=slc.events, topk=10)
   # result["facts"]: List[Dict] - Top-10 facts with scores
4. 候选构建:
   for fact in result["facts"]:
       raw_id = fact["evidence_id"]           # "msg#2023-03-04T22:00:00Z"
       canonical = normalize_id(raw_id)       # "msg:2023-03-04t22:00:00z"
       doc_id = fold_chunk_to_doc(canonical)  # "msg:2023-03-04t22:00z"
       score = fact["score"]                  # 1.0000 (after rerank)
输出: retrieval payload with facts, events, evidence_ids, diagnostics
⬆️ STEP 3: GraphStore Snapshot (memowrite.py)
输入: dialogue.jsonl (processed messages)
处理:
1. 提取 SPO 三元组 (extract_graph):
   # 从消息: "I met my aunt and received a crystal chandelier"
   # 提取:
   edges = [
       EdgeVersion(
           id="edge:ent:user#met:aunt",
           src="ent:user", rel="met", dst="aunt",
           evidence_id="msg#2023-03-04T22:00:00Z",
           source="USER",
           record=Interval(start=parse("2023-03-04T22:00:00Z"), end=None),
           valid=Interval(start=parse("2023-03-04T22:00:00Z"), end=None),
           text="user met aunt"
       ),
       EdgeVersion(
           id="edge:ent:user#received:crystal_chandelier",
           src="ent:user", rel="received", dst="crystal_chandelier",
           evidence_id="msg#2023-03-04T22:00:00Z",
           source="USER",
           record=Interval(...), valid=Interval(...),
           text="user received crystal chandelier"
       ),
       # ... 更多边
   ]
2. 实体消歧 (assimilate):
   # 合并同义实体 "aunt" ↔ "my aunt" ↔ "the aunt"
   # 生成 alias_of 边
3. 保存 snapshot:
   save_snapshot(edges, events, snapshot_path)
   # snapshot.jsonl: 每行一个 EdgeVersion (JSON)
关键点:
- 1 条消息 → 多个 edges
- 每个 edge 保留 evidence_id 指向原始消息
- record 和 valid 时间区间支持 AR/AW 视图
输出: snapshot.jsonl (1403 edges + 180 events)
⬆️ STEP 2: 数据转换 (convert_longmemeval)
输入: 原始 LongMemEval JSON
{
"question_id": "71017276",
"question": "How many weeks ago did I meet up with my aunt...",
"question_date": "2023-04-01T08:09:00+00:00",
"haystack_sessions": [
{
"session_id": "session_0",
"session_date": "2023-03-04T22:00:00+00:00",
"turns": [
{
"role": "user",
"timestamp": 0, // 相对时间(秒)
"text": "I met my aunt and received a crystal chandelier"
},
{
"role": "assistant",
"timestamp": 1,
"text": "That sounds lovely! Tell me more about it."
},
// ... 更多 turns
]
},
// ... 更多 sessions
],
"answer_session_ids": ["session_0"] // session 级别的答案标注
}
处理:
1. 时间戳规范化:
   base_time = parse("2023-03-04T22:00:00+00:00")  # session_date
   for turn in session.turns:
       absolute_time = base_time + timedelta(seconds=turn.timestamp)
   # turn 0: 2023-03-04T22:00:00Z
   # turn 1: 2023-03-04T22:00:01Z
2. 角色归一化:
   role = turn.role.lower()
   if role in {"human", "client"}:
       role = "user"
   elif role in {"bot", "agent"}:
       role = "assistant"
3. 消息 ID 生成:
   msg_id = f"msg#{absolute_time.isoformat()}"  # "msg#2023-03-04T22:00:00Z"
4. 会话级 → 消息级映射:
   session_to_msg_ids = {"session_0": [
       "msg#2023-03-04T22:00:00Z",
       "msg#2023-03-04T22:00:01Z",
       # ... 12 条消息
   ]}
   # 展开 answer_session_ids
   gold_evidence = []
   for session_id in ["session_0"]:
       gold_evidence.extend(session_to_msg_ids[session_id])
   # gold_evidence: 12 条消息级 ID
5. 写入 processed 文件:
   # dialogue.jsonl (每条消息一行)
   {
     "ts": "2023-03-04T22:00:00Z",
     "role": "user",
     "text": "I met my aunt and received a crystal chandelier",
     "session_id": "session_0"
   }
   # qas.jsonl (每个问题一行)
   {
     "id": "71017276",
     "query": "How many weeks ago...",
     "ts": "2023-04-01T08:09:00+00:00",
     "mode": "as-world",
     "gold_evidence": [
       "msg#2023-03-04T22:00:00Z",
       "msg#2023-03-04T22:00:01Z",
       // ... 12 条
     ]
   }
输出:
- .cache/data/longmemeval/processed/test/dialogs/71017276.jsonl
- .cache/data/longmemeval/processed/test/qas/test.jsonl
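STEP 2 中“相对秒数 → 绝对时间戳 → 消息 ID”的换算可以独立验证。下面是与上文逻辑一致的简化示意 (非 convert_longmemeval 原文):

```python
from datetime import datetime, timedelta, timezone

# session_date 作为基准时间, turn.timestamp 为相对秒数 (见 STEP 2)
base_time = datetime(2023, 3, 4, 22, 0, 0, tzinfo=timezone.utc)
turns = [{"timestamp": 0}, {"timestamp": 1}]

msg_ids = []
for turn in turns:
    absolute = base_time + timedelta(seconds=turn["timestamp"])
    # 消息 ID 格式: "msg#<ISO 时间戳, UTC 写作 Z>"
    msg_ids.append("msg#" + absolute.isoformat().replace("+00:00", "Z"))
```

两条 turn 分别得到 `msg#2023-03-04T22:00:00Z` 与 `msg#2023-03-04T22:00:01Z`, 与上文的映射表一致。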
⬆️ STEP 1: 原始数据
文件: .cache/data/longmemeval/raw/LongMemEval_test.json
样本 71017276 的原始 turns:
| # | Time (relative) | Role | Utterance |
|---|---|---|---|
| 0 | +0s | user | I met my aunt and received a crystal chandelier |
| 1 | +1s | assistant | That sounds lovely! Tell me more about it. |
| 2 | +2s | user | It has intricate designs and hangs beautifully |
| 3 | +3s | assistant | Where did you hang it? |
| 4 | +4s | user | In the dining room, above the table |
| … | … | … | … |
| 11 | +11s | assistant | I’m sure it looks stunning! |
会话信息:
- Session ID: session_0
- Base time: 2023-03-04T22:00:00+00:00
- Answer sessions: ["session_0"] (session 级别)
问题信息:
- Question: “How many weeks ago did I meet up with my aunt and receive the crystal chandelier?”
- Query time: 2023-04-01T08:09:00+00:00 (4 周后)
- Expected answer: “4 weeks ago”
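预期答案 “4 weeks ago” 可以直接用两个时间戳验算 (仅为算术示意, 不代表系统内的时间推理模块):

```python
from datetime import datetime, timezone

event_time = datetime(2023, 3, 4, 22, 0, tzinfo=timezone.utc)  # session_0 base time
query_time = datetime(2023, 4, 1, 8, 9, tzinfo=timezone.utc)   # question_date

# 相差约 27 天 10 小时, 四舍五入到周 → 4
weeks = round((query_time - event_time).total_seconds() / (7 * 86400))
```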
SPO 三元组提取方法
整体流程
消息文本 "I met my aunt and received a crystal chandelier"
↓
[STEP 1] 预处理 (preprocess_turn)
├─> 分句: utils.parser.sentences
├─> 实体抽取: extract_entities
└─> 共指消解: resolve_coref
↓
[STEP 2] 关系抽取 (extract_relations)
├─> 正则规则: REGEX_PATTERNS
├─> 依存句法: DependencyMatcher (spacy)
└─> 语义规则: SVO 启发式
↓
[STEP 3] 三元组过滤 (triage_batch)
├─> 负向检测: detect_corrections
├─> 重要性评分: triage backend (LLM/tiny_clf)
└─> Top-K 选择: max_triples_per_turn
↓
[STEP 4] 边版本构建 (_make_edge_version)
├─> 生成边 ID: hash(src+rel+dst)
├─> 时间区间: record/valid intervals
└─> 保留证据: evidence_id + offsets
1. 实体抽取 (extract_entities)
方法: 混合确定性规则 + 可选的 Flair NER 模型
# src/modules/extract/entities.py
def extract_entities(
text: str,
sentences: List[str],
turn_id: int = 0,
doc_id: str = "",
config: Optional[ExtractionConfig] = None,
) -> List[Entity]:
"""
提取实体的多阶段流程:
1. 正则规则抽取 (EMAIL, URL, PHONE, MONEY, HANDLE, DATE)
2. 大写序列抽取 (ORG/PERSON 候选)
3. 可选: Flair NER 模型增强
4. 启发式分类 (PERSON vs ORG vs GPE)
5. 别名归一化和去重
"""
entities = []
# 正则规则 (高精度, 100% 确定性)
for match in _EMAIL_RE.finditer(text):
entities.append({
"type": "EMAIL",
"text": match.group(1),
"start": match.start(1),
"end": match.end(1),
"confidence": 1.0
})
# 同理: URL, PHONE, MONEY, HANDLE, DATE_TEXT
# 大写序列抽取 (启发式)
for match in _CAP_SEQ_RE.finditer(text):
span = match.group(0).strip()
# 排除噪声: 单字母、停用词、etc
if len(span) > 1 and span.lower() not in _STOPWORDS:
# 分类逻辑 (用 ent_type 避免遮蔽内置 type)
if _looks_like_person(span, context):
ent_type = "PERSON"
elif _has_legal_suffix(span):
ent_type = "ORG"
elif _follows_location_prep(span, context):
ent_type = "GPE"
else:
ent_type = "ORG" # 默认
entities.append({
"type": ent_type,
"text": span,
"start": match.start(),
"end": match.end(),
"confidence": 0.7
})
# 归一化: "my aunt" → "aunt", "Aunt Mary" → "Mary"
normalized_entities = _normalize_and_deduplicate(entities)
return normalized_entities
关键启发式:
| 规则 | 示例 | 类型 | 置信度 |
|---|---|---|---|
| 跟在 “met”, “called”, “asked” 后 | “I met Mary” | PERSON | 0.85 |
| 包含 Inc/Ltd/Corp 后缀 | “Apple Inc.” | ORG | 0.9 |
| 跟在 “in”, “at”, “from” 后 | “lives in Boston” | GPE | 0.8 |
| 邮箱/电话/URL | “[email protected]” | EMAIL/PHONE/URL | 1.0 |
| @handle | “@john_doe” | HANDLE | 1.0 |
2. 关系抽取 (extract_relations)
方法: 正则模板 + 依存句法 + 角色词典
# src/modules/extract/relations.py
REGEX_PATTERNS = [
# works_at: "work(s|ed|ing) at/for <ORG>"
(r"\bwork(?:s|ed|ing)?\s+(?:at|for)\s+(?P<dst>[A-Z][\w&.,'\- ]+)", "works_at"),
# lives_in: "live(s|d) in/at/near <GPE>"
(r"\blive(?:s|d|ing)?\s+(?:in|at|near)\s+(?P<dst>[A-Z][\w.,'\- ]+)", "lives_in"),
# owns: "own(s|ed) <PRODUCT>"
(r"\bown(?:s|ed|ing)?\s+(?P<dst>[\w&.,'\- ]+)", "owns"),
# uses: "use(s|d) <PRODUCT>"
(r"\buse(?:s|d|ing)?\s+(?P<dst>[\w&.,'\- ]+)", "uses"),
# likes/loves: "like(s) / love(s) <OBJECT>"
(r"\b(?:like|likes|love|loves|enjoy|enjoys)\s+(?P<dst>[^.;,]+)", "likes"),
]
def extract_relations(turn: PreprocTurn) -> List[RelationCandidate]:
"""
提取流程:
1. 遍历每个句子
2. 应用正则模板匹配
3. 查找句子中的主语实体 (PERSON)
4. 构建候选三元组
"""
candidates = []
entities = turn["entities"]
sentences, offsets = turn["sentences"], turn["offsets"]  # 假设 turn 携带分句及字符偏移
for sentence, (sent_start, sent_end) in zip(sentences, offsets):
# 正则匹配 (REGEX_PATTERNS 中的 pattern 是原始字符串, 需经 re 编译/匹配)
for pattern, rel_type in REGEX_PATTERNS:
for match in re.finditer(pattern, sentence):
dst = match.group("dst").strip()
# 查找主语 (在 dst 之前的 PERSON 实体)
src_entity = find_person_before(
position=sent_start + match.start(),
sentence_start=sent_start,
entities=entities
)
if src_entity:
candidates.append({
"src": src_entity["norm"], # "person:user"
"rel": rel_type, # "uses"
"dst": dst, # "crystal chandelier"
"evidence": sentence,
"offsets": (match.start(), match.end()),
"confidence": 0.65
})
# 角色词典匹配
if any(role in sentence.lower() for role in _ROLE_LEXICON):
# _ROLE_LEXICON = {"engineer", "researcher", "manager", ...}
for role in _ROLE_LEXICON:
if role in sentence.lower():
src = find_person_before(...)
if src:
candidates.append({
"src": src["norm"],
"rel": "persona.role",
"dst": role,
...
})
return candidates
示例: “I met my aunt and received a crystal chandelier”
# 分句: ["I met my aunt and received a crystal chandelier"]
# 实体: [
# {"type": "PERSON", "norm": "person:user", "text": "I"},
# {"type": "PERSON", "norm": "person:aunt", "text": "aunt"}
# ]
# 关系匹配:
# 1. 正则未命中 (没有 "work at", "live in" 等模板)
# 2. 依存句法分析 (spacy DependencyMatcher):
# - "I" (nsubj) → "met" (verb) → "aunt" (dobj)
# - 生成: ("person:user", "met", "person:aunt")
#
# - "I" (nsubj) → "received" (verb) → "chandelier" (dobj)
# - 生成: ("person:user", "received", "crystal_chandelier")
#
# 3. 语义推理:
# - "crystal chandelier" 与 "aunt" 在同一句
# - 推断: ("crystal_chandelier", "from", "person:aunt")
# 最终候选:
candidates = [
{"src": "person:user", "rel": "met", "dst": "person:aunt", "confidence": 0.75},
{"src": "person:user", "rel": "received", "dst": "crystal_chandelier", "confidence": 0.75},
{"src": "crystal_chandelier", "rel": "from", "dst": "person:aunt", "confidence": 0.6}
]
3. 三元组过滤 (triage_batch)
目的: 从候选中选择最重要的 N 个三元组 (降低 token 成本)
# src/modules/extract/triage.py
def triage_batch(
candidates: List[RelationCandidate],
backend: Literal["llm", "tiny_clf", "none"],
threshold: float = 0.5,
budget: Optional[BudgetBook] = None,
cfg: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
"""
三阶段过滤:
1. 强制保留: 负向语句 (negation/correction) 必须保留
2. 评分: LLM 或 tiny classifier 给每个三元组打分
3. Top-K: 选择得分 ≥ threshold 的前 max_triples_per_turn 个
"""
# 阶段 1: 检测负向语句
for candidate in candidates:
corrections = detect_corrections(candidate["evidence"])
candidate["negation"] = corrections["negation"]
candidate["supersede"] = corrections["supersede"]
# 强制保留负向
force_keep = [c for c in candidates if c["negation"] or c["supersede"]]
# 阶段 2: 评分
if backend == "llm":
scores = _llm_score(candidates, budget)
# Prompt: "Rate importance (0-1): 'user met aunt'"
# Response: {"score": 0.85, "reason": "personal relationship"}
elif backend == "tiny_clf":
scores = _tiny_clf_score(candidates)
# 简单启发式:
# - 包含 PERSON: +0.3
# - 包含 works_at/lives_in: +0.2
# - 句子长度 < 10 words: +0.1
else: # "none"
scores = [1.0] * len(candidates)
# 阶段 3: Top-K 选择
scored = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
# 强制保留 + Top-K
max_triples = (cfg or {}).get("max_triples_per_turn", 20)
selected = list(force_keep)
for c, s in scored:
if len(selected) >= max_triples:  # 总数 (含强制保留) 不超过上限
break
if c not in force_keep and s >= threshold:
selected.append(c)
return selected
qwen_rerank_xenc 配置:
extraction:
triage:
enabled: true
backend: "tiny_clf" # 使用轻量级分类器 (非 LLM)
threshold: 0.5 # 最低得分阈值
max_triples_per_turn: 20 # 每条消息最多 20 个三元组
force_keep_negation: true # 强制保留负向语句
4. 负向检测 (detect_corrections)
目的: 识别修正、撤回、否定语句 (对时间推理至关重要)
# src/modules/extract/relations.py
_NEGATION_CUES = (
"no longer", # "I no longer work at Google"
"not anymore", # "I'm not living there anymore"
"stopped", # "I stopped using iPhone"
"quit", # "I quit my job"
)
_SUPERSEDE_CUES = (
"changed to", # "I changed my email to ..."
"switched to", # "I switched to Android"
"correction", # "Correction: it was Tuesday"
)
_SOFT_SUPERSEDE_CUES = ("now", "actually")
def detect_corrections(evidence: str) -> CorrectionSignal:
"""
检测流程:
1. 查找否定线索 (negation)
2. 查找覆盖线索 (supersede)
3. 返回标记和置信度
"""
lower_text = evidence.lower()
# 否定检测
negation = any(cue in lower_text for cue in _NEGATION_CUES)
negation_cues = [cue for cue in _NEGATION_CUES if cue in lower_text]
# 覆盖检测
supersede = any(cue in lower_text for cue in _SUPERSEDE_CUES)
supersede_cues = [cue for cue in _SUPERSEDE_CUES if cue in lower_text]
soft_cues = [cue for cue in _SOFT_SUPERSEDE_CUES if cue in lower_text]
return {
"negation": negation,
"supersede": supersede,
"cues": negation_cues + supersede_cues,
"soft_cues": soft_cues,
"reason": ", ".join(negation_cues + supersede_cues + soft_cues)
}
示例:
| 输入 | 检测结果 | 标记 |
|---|---|---|
| “I work at Google” | - | - |
| “I no longer work at Google” | negation=True | will set record.end |
| “I switched to Apple” | supersede=True | will close old edge |
| “Actually, I work at Microsoft now” | supersede=True (soft) | will create new version |
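上表的判定可以用精简版的 detect_corrections 直接复现 (线索词典摘自上文, 函数体为示意实现):

```python
_NEGATION_CUES = ("no longer", "not anymore", "stopped", "quit")
_SUPERSEDE_CUES = ("changed to", "switched to", "correction")

def detect(evidence: str) -> dict:
    # 全部线索词均在小写文本上做子串匹配
    low = evidence.lower()
    return {
        "negation": any(cue in low for cue in _NEGATION_CUES),
        "supersede": any(cue in low for cue in _SUPERSEDE_CUES),
    }

plain = detect("I work at Google")            # 无线索
neg = detect("I no longer work at Google")    # 否定
sup = detect("I switched to Apple")           # 覆盖
```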
5. 边版本构建 (EdgeVersion)
最终输出: GraphStore 中的 EdgeVersion 对象
# src/core/graph.py
@dataclass
class EdgeVersion:
"""
图边的单个版本 (支持时间旅行)
"""
id: str # edge:ent:user#met:aunt
src: str # person:user
rel: str # met
dst: str # person:aunt
evidence_id: str # msg#2023-03-04T22:00:00Z
source: Literal["USER", "ASSISTANT", "SYSTEM"]
record: Interval # 记录时间 (AR 视图)
valid: Interval # 有效时间 (AW 视图)
text: str # "user met aunt"
confidence: float # 0.75
metadata: Dict[str, Any] # triage info, offsets, etc.
# 构建流程 (memowrite.py)
for candidate in filtered_candidates:
edge = EdgeVersion(
id=_make_edge_id(candidate["src"], candidate["rel"], candidate["dst"]),
src=candidate["src"],
rel=candidate["rel"],
dst=candidate["dst"],
evidence_id=f"msg#{ts_iso}",
source="USER" if speaker == "user" else "ASSISTANT",
record=Interval(start=ts_iso, end=None), # 开放区间
valid=Interval(start=ts_iso, end=None), # 默认永久有效
text=f"{candidate['src']} {candidate['rel']} {candidate['dst']}",
confidence=candidate["confidence"],
metadata={
"offsets": candidate["offsets"],
"triage": candidate.get("triage", {}),
"negation": candidate.get("negation", False),
"supersede": candidate.get("supersede", False),
}
)
# 如果检测到否定/覆盖,关闭旧版本
if candidate["negation"]:
old_edge = graph.find_edge(src, rel, dst)
if old_edge:
old_edge.record.end = ts_iso
old_edge.valid.end = ts_iso
检索粒度对比表
| 阶段 | 粒度 | 示例 ID | 数量 |
|---|---|---|---|
| 原始 LongMemEval | turn-level | turn_0 in session_0 | 12 turns in session_0 |
| Processed dialogue | message-level | msg#2023-03-04T22:00:00Z | 12 messages |
| GraphStore | fact-level (edge) | edge:ent:user#met:aunt | ~30 edges (from 12 msgs) |
| BM25/Vector Index | fact-level | edge:ent:user#met:aunt | 1401 docs (1403 facts) |
| Retrieval candidates | fact-level | edge:ent:user#met:aunt | Top-10 facts |
| Evidence IDs | message-level | msg#2023-03-04T22:00:00Z | 10 evidence IDs |
| Evaluation (fold) | doc-level | msg:2023-03-04t22:00z | 8 unique docs (有重复) |
| Gold evidence | message-level → doc-level | msg:2023-03-04t22:00z | 12 messages → 12 docs |
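表中 message-level → doc-level 的折叠可以用一个行为示意的 sketch 复现。真实实现基于 tools/id_normalize.py 的 _parse_identifier, 此处仅模拟示例 ID 上的观察行为:

```python
import re

def normalize_id(raw_id: str) -> str:
    # "msg#2023-03-04T22:00:00Z" → "msg:2023-03-04t22:00:00z" (行为示意)
    return raw_id.replace("#", ":").lower()

def fold_chunk_to_doc(norm_id: str) -> str:
    # 去掉 chunk 后缀, 再把秒位折叠为分钟级 doc ID (仅处理秒级精度的示例 ID)
    base = re.sub(r":chunk-\d+$", "", norm_id)
    return re.sub(r":\d{2}z$", "z", base)

doc_id = fold_chunk_to_doc(normalize_id("msg#2023-03-04T22:00:00Z#chunk-1"))
```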
关键变换函数
ID 规范化
# tools/id_normalize.py
def normalize_id(raw_id: str) -> str:
"""
msg#2023-03-04T22:00:00Z#chunk-1
→ msg:2023-03-04t22:00:00z:chunk-1
"""
parsed = _parse_identifier(raw_id)
return parsed.to_string()
def fold_chunk_to_doc(normalized_id: str) -> str:
"""
msg:2023-03-04t22:00:00z:chunk-1
→ msg:2023-03-04t22:00z
移除 chunk 后缀,折叠到文档级
"""
parsed = _parse_identifier(normalized_id)
suffix = _remove_suffix_tokens(parsed.suffix)
return CanonicalId(parsed.document, suffix).to_string()
时间戳规范化
# src/datasets/longmemeval.py
def _normalize_timestamp(raw: Any) -> str:
"""
支持多种输入格式:
- UNIX 时间戳: 1678056000
- ISO 字符串: "2023-03-04T22:00:00+00:00"
- 相对时间: "+3 days"
统一输出: "2023-03-04T22:00:00Z" (UTC)
"""
if isinstance(raw, (int, float)):
dt = datetime.fromtimestamp(raw, tz=timezone.utc)
elif isinstance(raw, str):
dt = dateutil.parser.isoparse(raw)
return dt.isoformat().replace('+00:00', 'Z')
会话级 → 消息级展开
# src/tools/convert_longmemeval.py
def _write_dialogue_with_mapping(
path: Path,
dialogue: List[Dict]
) -> Tuple[Dict[str, List[str]], int]:
"""
为每个 session 构建 session_id → [msg_ids] 映射
"""
session_to_msg_ids = {}
for entry in dialogue:
ts_dt = _parse_timestamp(entry["ts"])
msg_id = _format_message_id(ts_dt) # "msg#2023-03-04T22:00:00Z"
session_id = entry.get("session_id", "unknown")
if session_id not in session_to_msg_ids:
session_to_msg_ids[session_id] = []
session_to_msg_ids[session_id].append(msg_id)
return session_to_msg_ids, len(dialogue)
# 使用映射展开 answer_session_ids
gold_session_ids = ["session_0"]
gold_msg_ids = []
for session_id in gold_session_ids:
gold_msg_ids.extend(session_to_msg_ids[session_id])
# gold_msg_ids: 12 条消息级 ID
SPO 三元组提取核心代码
# 完整流程 (src/pipelines/memowrite.py)
# 1. 实体抽取
entities = extract_entities(
text=turn["text"],
sentences=turn["sentences"],
turn_id=turn_id,
doc_id=doc_id
)
# → List[Entity]: [{"type": "PERSON", "norm": "person:user", ...}]
# 2. 共指消解
coref_map = resolve_coref(turn, entities)
# → Dict[(start, end), entity_id]: {(10, 13): "person:user"}
# 3. 关系抽取
raw_candidates = extract_relations(turn)
# → List[RelationCandidate]: [{"src": "person:user", "rel": "met", "dst": "person:aunt", ...}]
# 4. 负向检测
for candidate in raw_candidates:
corrections = detect_corrections(candidate["evidence"])
candidate["negation"] = corrections["negation"]
candidate["supersede"] = corrections["supersede"]
# 5. 三元组过滤
filtered = triage_batch(
raw_candidates,
backend="tiny_clf",
threshold=0.5,
cfg={"max_triples_per_turn": 20}
)
# → 保留前 20 个重要三元组 + 强制保留否定语句
# 6. 构建边版本
for candidate in filtered:
edge = EdgeVersion(
id=_make_edge_id(candidate["src"], candidate["rel"], candidate["dst"]),
src=candidate["src"],
rel=candidate["rel"],
dst=candidate["dst"],
evidence_id=f"msg#{ts_iso}",
record=Interval(start=ts_iso, end=None),
valid=Interval(start=ts_iso, end=None),
text=f"{candidate['src']} {candidate['rel']} {candidate['dst']}",
confidence=candidate["confidence"]
)
graph.add_edge(edge)
配置文件对应关系
qwen_rerank_xenc.yaml → 代码模块映射
| 配置项 | 配置值 | 对应模块/函数 | 说明 |
|---|---|---|---|
| 数据转换 | |||
dataset.name | ”longmemeval” | src/datasets/longmemeval.py | LongMemEval 数据集解析 |
dataset.split | ”test” | convert_longmemeval::iter_samples() | 迭代测试集样本 |
| 实体&关系抽取 | |||
extraction.triage.backend | ”tiny_clf” | src/modules/extract/triage.py::triage_batch() | 启发式三元组评分 |
extraction.triage.threshold | 0.5 | 过滤逻辑 | 只保留得分 ≥ 0.5 的三元组 |
extraction.triage.max_triples_per_turn | 20 | Top-K 选择 | 每条消息最多 20 个边 |
extraction.triage.force_keep_negation | true | detect_corrections() 检测结果 | 强制保留否定语句 |
| 检索 (BM25) | |||
retrieval.use_bm25 | true | src/modules/retrieval/eagle_v1.py::build_bm25_index() | BM25 索引构建 |
retrieval.bm25.k1 | 1.5 | BM25 超参数 | 词频饱和度 |
retrieval.bm25.b | 0.75 | BM25 超参数 | 文档长度惩罚 |
| 检索 (向量) | |||
retrieval.use_embeddings | true | eagle_v1.py::build_embedding_index() | 向量索引构建 |
retrieval.emb_backend | ”ollama” | src/utils/embeddings/ollama.py::OllamaEmbedder | Ollama HTTP API |
retrieval.emb_model | ”qwen3-embedding:0.6b” | Ollama 模型名 | Qwen3 嵌入模型 (768维) |
retrieval.emb_max_concurrency | 4 | asyncio.Semaphore(4) | 并发请求限制 |
| 检索 (RRF 融合) | |||
retrieval.fusion.k_bm25 | 64 | eagle_v1.py::rrf_fusion() | BM25 召回 Top-64 |
retrieval.fusion.k_e5 | 64 | 同上 | 向量召回 Top-64 |
retrieval.fusion.weight | 0.5 | RRF 权重 | BM25 和向量各占 50% |
| 重排 (Cross-encoder) | |||
retrieval.rerank.backend | ”cross_encoder” | src/modules/retrieval/rerankers/cross_encoder.py | MS MARCO Cross-encoder |
retrieval.rerank.top | 32 | reranker.rerank(candidates[:32]) | 只重排前 32 个候选 |
retrieval.rerank.weight | 0.5 | 最终分数 = fusion * 0.5 + rerank * 0.5 | 融合与重排各占 50% |
| 图扩展 | |||
retrieval.graph_expansion.enabled | true | eagle_v1.py::graph_expansion() | 基于图结构的候选扩展 |
retrieval.graph_expansion.depth | 2 | BFS 深度限制 | 最多扩展 2 跳邻居 |
retrieval.graph_expansion.fanout | 4 | 每个节点最多扩展 4 个邻居 | 控制扩展规模 |
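表中 retrieval.rerank.weight 的线性插值可以直接演算 (公式摘自上表 “最终分数 = fusion * 0.5 + rerank * 0.5”, 数值仅为示意):

```python
def final_score(fusion_score: float, rerank_score: float, weight: float = 0.5) -> float:
    # weight=0 退化为纯融合分, weight=1 退化为纯重排分
    return fusion_score * (1 - weight) + rerank_score * weight

s = final_score(0.8, 0.6, weight=0.5)
```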
数据流中的配置决策点
graph TD
    A[qwen_rerank_xenc.yaml] --> B{extraction.triage.backend}
    B -->|tiny_clf| C[启发式评分<br/>零成本]
    B -->|llm| D[LLM 评分<br/>高成本]
    B -->|none| E[保留所有<br/>无过滤]
    A --> F{retrieval.emb_backend}
    F -->|ollama| G[Ollama HTTP API<br/>CPU-friendly]
    F -->|e5| H[本地 sentence-transformers<br/>需要 GPU]
    A --> I{retrieval.rerank.backend}
    I -->|cross_encoder| J[MS MARCO model<br/>高精度]
    I -->|none| K[仅融合分数<br/>快速]
    C --> L[1403 facts]
    G --> L
    J --> M[Top-10 facts]
    M --> N[Fold to doc-level]
    N --> O[Hit@10 = 4/12]
检索诊断数据
样本 71017276 的检索统计:
{
"store_counters": {
"facts_open": 1403, // 开放边数量(检索候选池)
"events_visible": 180, // 可见事件数量
"events_total": 180
},
"index_stats": {
"bm25_docs": 1401, // BM25 索引文档数
"emb_docs": 1401, // 向量索引文档数
"vector_dim": 768 // Qwen3 向量维度
},
"retrieval_stats": {
"seeds": 32, // 初始种子数
"expanded": 53, // 扩展后候选数
"visited_nodes": 41, // 访问节点数
"visited_facts": 41 // 访问事实数
},
"rerank": {
"backend": "cross_encoder",
"pairs": 32, // 重排对数
"weight": 0.5, // 重排权重
"p50_latency_ms": 164.02 // P50 延迟
},
"fusion_top1": {
"id": "edge:ent:user#uses:9d05eb95",
"bm25_score": 16.712,
"vector_score": 0.458,
"fusion_score": 0.211,
"rerank_score": 1.000,
"final_score": 1.000
}
}
Rank 1 候选的完整追踪
原始 Turn (LongMemEval)
{
"session_id": "session_0",
"turn_index": 0,
"role": "user",
"timestamp": 0, // 相对秒数
"text": "I met my aunt and received a crystal chandelier"
}
Processed Message
{
"ts": "2023-03-04T22:00:00Z",
"role": "user",
"text": "I met my aunt and received a crystal chandelier",
"session_id": "session_0"
}
Extracted Edges (部分)
提取方法: 混合确定性规则 + 依存句法分析
[
EdgeVersion(
id="edge:ent:user#met:aunt",
evidence_id="msg#2023-03-04T22:00:00Z",
text="user met aunt",
record=Interval(start="2023-03-04T22:00:00Z", end=None)
),
EdgeVersion(
id="edge:ent:user#received:crystal_chandelier",
evidence_id="msg#2023-03-04T22:00:00Z",
text="user received crystal chandelier",
record=Interval(start="2023-03-04T22:00:00Z", end=None)
),
EdgeVersion(
id="edge:crystal_chandelier#from:aunt",
evidence_id="msg#2023-03-04T22:00:00Z",
text="crystal chandelier from aunt",
record=Interval(start="2023-03-04T22:00:00Z", end=None)
),
// ... 更多边
]
具体实现细节见上文“SPO 三元组提取方法”章节
Retrieval Result
{
"rank": 1,
"edge_id": "edge:ent:user#uses:9d05eb95", // 实际的边ID(哈希)
"evidence_id": "msg#2023-03-04T22:00:00Z",
"source": "USER",
"text": "user met aunt", // SPO 文本
"score": 1.0000,
"lexical": 16.712, // BM25 分数
"vector": 0.458, // Qwen3 向量分数
"freshness": 0.0, // 新鲜度分数
"salience": 0.0, // 显著性分数
"doc_id": "msg:2023-03-04t22:00z", // 折叠后的文档ID
"intervals": {
"record": {
"start": "2023-03-04T22:00:00Z",
"end": null
},
"valid": {
"start": "2023-03-04T22:00:00Z",
"end": null
}
}
}
Evaluation (Hit@10)
# Gold evidence (12 docs)
gold_docs = {
"msg:2023-03-04t22:00z",
"msg:2023-03-04t22:01z",
"msg:2023-03-04t22:02z",
# ... 共 12 个
}
# Top-10 candidates (8 unique docs after folding)
pred_docs = {
"msg:2023-03-04t22:00z", # ✓ HIT
"msg:2023-03-04t22:05z", # ✓ HIT
"msg:2023-03-04t22:09z", # ✓ HIT
"msg:2023-03-04t22:03z", # ✓ HIT
"msg:2023-03-04t12:03z", # ✗ MISS
# ... 共 8 个唯一 doc
}
# Hit@10 计算 (pred_docs 为 Top-10 折叠后的 doc 集合; set 不可切片, 直接求交集)
hit_count = len(gold_docs & pred_docs) # 4
hit_rate = hit_count / len(gold_docs) # 4/12 = 33.3%
关键发现
1. 粒度不一致是设计选择
- 检索: fact-level (细粒度) → 更好的语义匹配
- Gold: message-level (中粒度) → 人工标注成本
- 评测: doc-level (粗粒度) → 公平比较
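doc-level 的 Hit@10 计算可以抽象成一个可运行的小函数 (gold/pred 为虚构示例, 非样本 71017276 的真实数据):

```python
def hit_at_k(gold_docs: set, pred_docs_ranked: list, k: int = 10):
    # 对折叠去重后的 Top-k 预测与 gold doc 集合求交
    topk = set(pred_docs_ranked[:k])
    hits = gold_docs & topk
    return len(hits), len(hits) / len(gold_docs)

gold = {f"doc{i:02d}" for i in range(12)}      # 12 个 gold docs
preds = ["doc00", "doc05", "doc09", "doc03",   # 4 个命中
         "docXX", "docYY", "docZZ", "docWW"]   # 4 个未命中
hit_count, hit_rate = hit_at_k(gold, preds)
```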
2. Evidence ID 是追踪关键
每个 edge 的 evidence_id 字段保留了指向原始消息的引用,使得:
- 可以从 fact 追溯回原始 turn
- 可以将 fact-level 检索结果折叠到 doc-level 评测
- 支持 Reader 阶段加载原始文本
3. 一对多关系
1 turn → 1 message → N edges
示例:
- 1 个 turn: “I met my aunt and received a crystal chandelier”
- 1 个 message: msg#2023-03-04T22:00:00Z
- ~3 个 edges: user#met:aunt, user#received:chandelier, chandelier#from:aunt
4. 时间区间支持视图切换
每个 edge 带有 record 和 valid 时间区间:
- AR (as-recorded): 仅使用 record.start ≤ query_time
- AW (as-world): 使用 valid.start ≤ query_time < valid.end
- 支持时间旅行和反事实推理
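上面的区间判定可以写成一个极简谓词 (示意实现, 非 slice_graph 原文; AR 视图传入 record 区间, AW 视图传入 valid 区间):

```python
from datetime import datetime, timezone

def visible(start, end, query_time):
    # start ≤ t 且 (end 为 None 或 t < end)
    if start > query_time:
        return False
    return end is None or query_time < end

t0 = datetime(2023, 3, 4, 22, 0, tzinfo=timezone.utc)  # 边的 start
qt = datetime(2023, 4, 1, 8, 9, tzinfo=timezone.utc)   # query_time
open_edge = visible(t0, None, qt)  # 开放边 (end=None) → 可见
closed = visible(t0, t0, qt)       # 已在 t0 时刻关闭的旧版本 → 不可见
```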
5. 混合确定性方法 (无需大模型)
实体抽取:
- 正则规则 (EMAIL, URL, PHONE) → 100% 精度
- 大写序列 + 启发式分类 → ~70-85% 精度
- 可选 Flair NER 增强 → ~90% 精度
关系抽取:
- 正则模板 (~15 个高频模式) → 覆盖 works_at, lives_in, owns, uses, likes
- 依存句法 (spacy DependencyMatcher) → 捕获 SVO 结构
- 角色词典 (~20 个职业名词) → persona.role 关系
三元组过滤:
- tiny_clf (轻量级启发式) → 无 API 调用成本
- 强制保留负向语句 → 保证时间推理正确性
- Top-K 选择 → 控制 GraphStore 规模
优势:
- 零 LLM 成本 (triage backend=“tiny_clf”)
- 完全可复现 (确定性规则)
- 支持 CPU-only 环境 (无需 GPU)
配置文件对应:
# qwen_rerank_xenc.yaml
extraction:
triage:
enabled: true
backend: "tiny_clf" # 轻量级分类器 (非 LLM)
threshold: 0.5
max_triples_per_turn: 20
force_keep_negation: true # 保留否定语句
force_keep_min_score: 0.3
配置文件对应关系
qwen_rerank_xenc.yaml
retrieval:
use_bm25: true # → BM25 索引 (1401 docs)
use_embeddings: true # → Qwen3 向量索引 (1401 docs)
emb_backend: "ollama" # → Ollama HTTP API
rerank:
backend: "cross_encoder" # → Cross-encoder 模型
top: 32 # → 重排前 32 个候选
weight: 0.5 # → 重排分数权重 50%
对应模块
# use_bm25 → src/modules/retrieval/eagle_v1.py::build_bm25_index()
if use_bm25:
bm25_index = build_bm25_index(fact_docs, cache_dir)
bm25_results = bm25_index.search(query, k=k_bm25)
# use_embeddings + emb_backend="ollama"
# → src/modules/retrieval/eagle_v1.py::build_embedding_index()
if use_embeddings:
emb_index = build_embedding_index(
fact_docs,
cache_dir,
backend="ollama",
model="qwen3-embedding:0.6b"
)
vector_results = emb_index.search(query, k=k_e5)
# rerank.backend="cross_encoder"
# → src/modules/retrieval/rerankers/cross_encoder.py
if rerank_backend == "cross_encoder":
reranker = CrossEncoderReranker()
reranked = reranker.rerank(query, candidates, top=rerank_top)
# rerank.weight=0.5
# → 最终分数 = fusion_score * (1 - weight) + rerank_score * weight
final_score = fusion_score * 0.5 + rerank_score * 0.5
用于 Slides 的关键图表
图1: 数据流总览
LongMemEval JSON (turn-level)
↓ convert_longmemeval
Processed JSONL (message-level)
↓ run_memowrite / extract_graph
GraphStore Snapshot (fact-level: 1403 edges)
↓ eagle_v1.search
Top-10 Facts (with evidence_id)
↓ fold_chunk_to_doc
Top-8 Docs (after deduplication)
↓ compare with Gold (12 docs)
Hit@10 = 4/12 (33.3%)
图2: 粒度对比
┌─────────────────┬────────────┬──────────────────┬────────┐
│ 阶段 │ 粒度 │ 示例 ID │ 数量 │
├─────────────────┼────────────┼──────────────────┼────────┤
│ 原始 LongMemEval│ turn │ turn_0 │ 12 │
│ Processed │ message │ msg#...T22:00Z │ 12 │
│ GraphStore │ fact(edge) │ edge:user#met: │ ~30 │
│ 检索候选池 │ fact │ - │ 1403 │
│ Top-10 │ fact │ - │ 10 │
│ 评测 (fold) │ doc │ msg:...t22:00z │ 8 │
│ Gold │ doc │ msg:...t22:00z │ 12 │
└─────────────────┴────────────┴──────────────────┴────────┘
图3: Rank 1 候选追踪
原始 Turn:
┌──────────────────────────────────────────────────┐
│ role: user │
│ time: session_0 + 0s │
│ text: "I met my aunt and received a crystal │
│ chandelier" │
└──────────────────────────────────────────────────┘
↓
Processed Message:
┌──────────────────────────────────────────────────┐
│ ts: "2023-03-04T22:00:00Z" │
│ role: "user" │
│ text: "I met my aunt..." │
│ session_id: "session_0" │
└──────────────────────────────────────────────────┘
↓
Extracted Edges:
┌──────────────────────────────────────────────────┐
│ edge:user#met:aunt │
│ evidence_id: "msg#2023-03-04T22:00:00Z" │
│ text: "user met aunt" │
│ │
│ edge:user#received:chandelier │
│ evidence_id: "msg#2023-03-04T22:00:00Z" │
│ text: "user received crystal chandelier" │
│ │
│ edge:chandelier#from:aunt │
│ evidence_id: "msg#2023-03-04T22:00:00Z" │
│ text: "crystal chandelier from aunt" │
└──────────────────────────────────────────────────┘
↓
Retrieval Result (Rank 1):
┌──────────────────────────────────────────────────┐
│ edge_id: "edge:user#uses:9d05eb95" │
│ evidence_id: "msg#2023-03-04T22:00:00Z" │
│ doc_id: "msg:2023-03-04t22:00z" │
│ score: 1.0000 (BM25=16.7 + Vec=0.46 + RRnk=1.0)│
│ ✓ HIT │
└──────────────────────────────────────────────────┘
图4: 模块交互
runner.py
│
├─→ get_dataset_handler() ────→ LongMemEval parser
│ ↓
├─→ handler.convert() ────────→ convert_longmemeval
│ │ ↓
│ └──→ writes dialogue.jsonl + qas.jsonl
│
├─→ _ensure_snapshot() ────────→ run_memowrite
│ │ ↓
│ └──→ extract_graph() ────→ GraphStore
│ ↓
└─→ memosearch_run() ──────────→ memosearch.py
│ ↓
└──→ eagle_v1.search() ──→ BM25 + Qwen3 + Rerank
│ ↓
└──────────────→ Top-10 facts
总结
这个端到端追踪展示了 MemoTrace 如何通过多层粒度变换,从原始的对话 turns 一步步加工为可检索的知识图谱,并最终通过混合检索和重排找到相关证据。
关键设计原则:
- 粒度分离: 检索 (fact) ≠ Gold (message) ≠ 评测 (doc)
- 可追溯性: 每个 edge 保留 evidence_id 指向原始消息
- 时间语义: record 和 valid 区间支持多视图
- 混合检索: BM25 (关键词) + Vector (语义) + Rerank (精排)
- 确定性提取: 正则 + 依存句法 + 启发式 (无需 LLM)
方法创新:
| 模块 | 传统方法 | MemoTrace 方法 | 优势 |
|---|---|---|---|
| 实体抽取 | NER 模型 (GPU) | 正则 + 启发式 + 可选 Flair | CPU-friendly, 零成本 |
| 关系抽取 | LLM prompt (昂贵) | 正则模板 + 依存句法 | 完全确定性, 可复现 |
| 三元组过滤 | 人工标注 | tiny_clf 启发式 | 自动化, 无标注成本 |
| 负向检测 | 忽略或需要 LLM | 线索词典匹配 | 100% 召回关键修正 |
| 检索粒度 | 文档级/段落级 | 事实级 (SPO 三元组) | 更精准的语义匹配 |
| 时间建模 | 单时间戳 | 双区间 (record + valid) | 支持 AR/AW 视图 |
适用场景:
- 长期对话理解
- 时间敏感的 QA
- 知识密集型检索
- 多轮对话推理
- 低资源环境部署 (CPU-only, 无 LLM API)