MemoTrace end-to-end sample trace

From raw LongMemEval turns to retrieval results

Goal: trace sample 71017276's Rank 1 hit candidate (msg:2023-03-04t22:00z)
Config: qwen_rerank_xenc (Ollama Qwen3 embeddings + cross-encoder rerank)
Question: "How many weeks ago did I meet up with my aunt and receive the crystal chandelier?"


Module call chain

runner.py::run_retrieval_only()
  ├─> [STEP 1] get_dataset_handler()           # fetch the LongMemEval handler
  ├─> [STEP 2] handler.convert()               # raw JSON → processed JSONL
  │    └─> convert_longmemeval()
  │         ├─> LongMemEval.iter_samples()     # parse the raw JSON
  │         ├─> _extract_haystack_dialogue()   # extract haystack sessions
  │         ├─> _normalize_timestamp()         # timestamp normalization
  │         ├─> _normalize_role()              # role normalization
  │         └─> _write_dialogue_with_mapping() # write dialogue.jsonl + session→msg mapping
  │
  ├─> [STEP 3] _ensure_snapshot()              # create the GraphStore snapshot
  │    └─> run_memowrite()
  │         ├─> extract.py::extract_graph()    # extract SPO triples from the dialogue
  │         ├─> assimilate.py::assimilate()    # entity disambiguation, assimilation
  │         └─> snapshot.py::save()            # save EdgeVersions + events
  │
  └─> [STEP 4] memosearch_run()                # run retrieval
       └─> memosearch.py::run()
            ├─> load_snapshot()                # load the GraphStore
            ├─> current_fact_versions()        # collect open edges (1403 facts)
            ├─> slice_graph()                  # filter visibility by view (AW)
            └─> eagle_v1.search()              # retrieval engine
                 ├─> build_bm25_index()        # build BM25 index (1401 docs)
                 ├─> build_embedding_index()   # build Qwen3 vector index (1401 docs)
                 ├─> bm25_search()             # BM25 retrieval (k=64)
                 ├─> vector_search()           # vector retrieval (k=64)
                 ├─> rrf_fusion()              # RRF fusion
                 ├─> graph_expansion()         # graph expansion (depth=2, fanout=4)
                 ├─> cross_encoder_rerank()    # cross-encoder rerank (top=32)
                 └─> pack_facts()              # pack the Top-10
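The rrf_fusion step above is only named, not shown. Below is a minimal sketch of the standard Reciprocal Rank Fusion formula it presumably applies; the actual eagle_v1.py implementation may weight or normalize differently, and the constant k=60 is the conventional default, not taken from the project.

```python
# Minimal Reciprocal Rank Fusion sketch (assumed standard formula;
# the eagle_v1.py internals may differ).
from typing import Dict, List

def rrf_fusion(bm25_ids: List[str], vec_ids: List[str],
               k: int = 60, weight: float = 0.5) -> Dict[str, float]:
    """Fuse two ranked ID lists: score = w/(k+rank_bm25) + (1-w)/(k+rank_vec)."""
    scores: Dict[str, float] = {}
    for rank, doc_id in enumerate(bm25_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + weight / (k + rank)
    for rank, doc_id in enumerate(vec_ids, start=1):
        scores[doc_id] = scores.get(doc_id, 0.0) + (1 - weight) / (k + rank)
    return scores

# An ID ranked well by both retrievers beats one ranked by only one.
fused = rrf_fusion(["e1", "e2", "e3"], ["e1", "e3"])
top = max(fused, key=fused.get)
```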

Data transformation flow (traced backwards)

Final output (Rank 1)

File: reports/retrieval_benchmark/.../per_query.jsonl

{
  "candidates": [{
    "rank": 1,
    "raw_id": "msg#2023-03-04T22:00:00Z",
    "canonical_id": "msg:2023-03-04t22:00:00z",
    "doc_id": "msg:2023-03-04t22:00z",
    "score": 1.0000,
    "source": "USER",
    "span": "2023-03-04T22:00:00Z-",
    "kind": "fact"
  }]
}

Key fields:

  • raw_id: the original edge ID (from EdgeVersion)
  • canonical_id: the normalized ID (normalize_id)
  • doc_id: the document-level ID (fold_chunk_to_doc)
  • score: the final score (fusion + rerank)
  • kind: the type (fact or event)

⬆️ STEP 4: Retrieval output (memosearch.py)

Input:

  • Query: "How many weeks ago did I meet up with my aunt and receive the crystal chandelier?"
  • Snapshot: sample_dir/snapshot.jsonl
  • View: AW (as-world)
  • Topk: 10

Processing:

  1. Load the GraphStore:

    facts = load_snapshot(snapshot_path)
    # facts: Dict[str, EdgeVersion]
    # contains 1403 open edges
  2. Filter visibility (AW view):

    slc = slice_graph(facts, events, view="AW", moment=query_time)
    # slc.facts: 1403 facts (record.end is None)
    # slc.events: 180 events (visible at query_time)
  3. Retrieve (eagle_v1.search):

    result = search(query, view="AW", facts=slc.facts, events=slc.events, topk=10)
    # result["facts"]: List[Dict] - Top-10 facts with scores
  4. Build candidates:

    for fact in result["facts"]:
        raw_id = fact["evidence_id"]          # "msg#2023-03-04T22:00:00Z"
        canonical = normalize_id(raw_id)      # "msg:2023-03-04t22:00:00z"
        doc_id = fold_chunk_to_doc(canonical) # "msg:2023-03-04t22:00z"
        score = fact["score"]                 # 1.0000 (after rerank)

Output: retrieval payload with facts, events, evidence_ids, diagnostics


⬆️ STEP 3: GraphStore snapshot (memowrite.py)

Input: dialogue.jsonl (processed messages)

Processing:

  1. Extract SPO triples (extract_graph):

    # From the message: "I met my aunt and received a crystal chandelier"
    # extract:
    edges = [
        EdgeVersion(
            id="edge:ent:user#met:aunt",
            src="ent:user",
            rel="met",
            dst="aunt",
            evidence_id="msg#2023-03-04T22:00:00Z",
            source="USER",
            record=Interval(start=parse("2023-03-04T22:00:00Z"), end=None),
            valid=Interval(start=parse("2023-03-04T22:00:00Z"), end=None),
            text="user met aunt"
        ),
        EdgeVersion(
            id="edge:ent:user#received:crystal_chandelier",
            src="ent:user",
            rel="received",
            dst="crystal_chandelier",
            evidence_id="msg#2023-03-04T22:00:00Z",
            source="USER",
            record=Interval(...),
            valid=Interval(...),
            text="user received crystal chandelier"
        ),
        # ... more edges
    ]
  2. Entity disambiguation (assimilate):

    # Merge synonymous entities:
    # "aunt" ≡ "my aunt" ≡ "the aunt"
    # and emit alias_of edges
  3. Save the snapshot:

    save_snapshot(edges, events, snapshot_path)
    # snapshot.jsonl: one EdgeVersion (JSON) per line

Key points:

  • 1 message → multiple edges
  • every edge keeps an evidence_id pointing at the original message
  • the record and valid time intervals support the AR/AW views

Output: snapshot.jsonl (1403 edges + 180 events)
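Since snapshot.jsonl holds one EdgeVersion per line, round-tripping it is plain JSONL handling. An illustrative sketch, with field names taken from the EdgeVersion examples above (the real snapshot.py schema may carry more fields):

```python
# Illustrative JSONL snapshot round-trip; field names follow the
# EdgeVersion examples above, not necessarily the real snapshot.py schema.
import io
import json

edges = [
    {"id": "edge:ent:user#met:aunt", "src": "ent:user", "rel": "met",
     "dst": "aunt", "evidence_id": "msg#2023-03-04T22:00:00Z",
     "record": {"start": "2023-03-04T22:00:00Z", "end": None}},
]

buf = io.StringIO()
for edge in edges:                       # one EdgeVersion per line
    buf.write(json.dumps(edge) + "\n")

buf.seek(0)
loaded = {e["id"]: e for e in map(json.loads, buf)}
# "open" edges (retrieval candidates) are those with record.end == None
open_edges = [e for e in loaded.values() if e["record"]["end"] is None]
```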


⬆️ STEP 2: Data conversion (convert_longmemeval)

Input: raw LongMemEval JSON

{
  "question_id": "71017276",
  "question": "How many weeks ago did I meet up with my aunt...",
  "question_date": "2023-04-01T08:09:00+00:00",
  "haystack_sessions": [
    {
      "session_id": "session_0",
      "session_date": "2023-03-04T22:00:00+00:00",
      "turns": [
        {
          "role": "user",
          "timestamp": 0,  // relative time (seconds)
          "text": "I met my aunt and received a crystal chandelier"
        },
        {
          "role": "assistant",
          "timestamp": 1,
          "text": "That sounds lovely! Tell me more about it."
        },
        // ... more turns
      ]
    },
    // ... more sessions
  ],
  "answer_session_ids": ["session_0"]  // session-level answer annotation
}

Processing:

  1. Timestamp normalization:

    base_time = parse("2023-03-04T22:00:00+00:00")  # session_date
    for turn in session.turns:
        absolute_time = base_time + timedelta(seconds=turn.timestamp)
        # turn 0: 2023-03-04T22:00:00Z
        # turn 1: 2023-03-04T22:00:01Z
  2. Role normalization:

    role = turn.role.lower()
    if role in {"human", "client"}:
        role = "user"
    elif role in {"bot", "agent"}:
        role = "assistant"
  3. Message ID generation:

    msg_id = f"msg#{absolute_time.isoformat()}"
    # "msg#2023-03-04T22:00:00Z"
  4. Session-level → message-level mapping:

    session_to_msg_ids = {"session_0": [
        "msg#2023-03-04T22:00:00Z",
        "msg#2023-03-04T22:00:01Z",
        # ... 12 messages
    ]}
     
    # Expand answer_session_ids
    gold_evidence = []
    for session_id in ["session_0"]:
        gold_evidence.extend(session_to_msg_ids[session_id])
    # gold_evidence: 12 message-level IDs
  5. Write the processed files:

    # dialogue.jsonl (one line per message)
    {
      "ts": "2023-03-04T22:00:00Z",
      "role": "user",
      "text": "I met my aunt and received a crystal chandelier",
      "session_id": "session_0"
    }
     
    # qas.jsonl (one line per question)
    {
      "id": "71017276",
      "query": "How many weeks ago...",
      "ts": "2023-04-01T08:09:00+00:00",
      "mode": "as-world",
      "gold_evidence": [
        "msg#2023-03-04T22:00:00Z",
        "msg#2023-03-04T22:00:01Z",
        // ... 12
      ]
    }

Output:

  • .cache/data/longmemeval/processed/test/dialogs/71017276.jsonl
  • .cache/data/longmemeval/processed/test/qas/test.jsonl
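Steps 1 and 3 above (absolutize the relative timestamp, then derive the message ID) can be condensed into a runnable sketch; the helper name to_msg_id is illustrative, not from the codebase.

```python
# Runnable condensation of steps 1 and 3: session_date + relative seconds
# → absolute UTC timestamp → message ID. Helper name is illustrative.
from datetime import datetime, timedelta, timezone

def to_msg_id(session_date: str, rel_seconds: int) -> str:
    base = datetime.fromisoformat(session_date.replace("Z", "+00:00"))
    absolute = (base + timedelta(seconds=rel_seconds)).astimezone(timezone.utc)
    return "msg#" + absolute.isoformat().replace("+00:00", "Z")

to_msg_id("2023-03-04T22:00:00+00:00", 0)   # turn 0
to_msg_id("2023-03-04T22:00:00+00:00", 1)   # turn 1
```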

⬆️ STEP 1: Raw data

File: .cache/data/longmemeval/raw/LongMemEval_test.json

Raw turns of sample 71017276:

  #   Time (relative)   Role        Utterance
  0   +0s               user        I met my aunt and received a crystal chandelier
  1   +1s               assistant   That sounds lovely! Tell me more about it.
  2   +2s               user        It has intricate designs and hangs beautifully
  3   +3s               assistant   Where did you hang it?
  4   +4s               user        In the dining room, above the table
  11  +11s              assistant   I'm sure it looks stunning!

Session info:

  • Session ID: session_0
  • Base time: 2023-03-04T22:00:00+00:00
  • Answer sessions: ["session_0"] (session level)

Question info:

  • Question: "How many weeks ago did I meet up with my aunt and receive the crystal chandelier?"
  • Query time: 2023-04-01T08:09:00+00:00 (4 weeks later)
  • Expected answer: "4 weeks ago"
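The expected answer can be sanity-checked with plain stdlib date arithmetic:

```python
# Sanity check for the "4 weeks ago" expected answer (stdlib only).
from datetime import datetime

event = datetime.fromisoformat("2023-03-04T22:00:00+00:00")  # session base time
query = datetime.fromisoformat("2023-04-01T08:09:00+00:00")  # question_date
weeks_ago = round((query - event).days / 7)  # 27 full days → rounds to 4 weeks
```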

SPO triple extraction method

Overall pipeline

Message text "I met my aunt and received a crystal chandelier"
    ↓
[STEP 1] Preprocess (preprocess_turn)
    ├─> sentence split: utils.parser.sentences
    ├─> entity extraction: extract_entities
    └─> coreference resolution: resolve_coref
    ↓
[STEP 2] Relation extraction (extract_relations)
    ├─> regex rules: REGEX_PATTERNS
    ├─> dependency parsing: DependencyMatcher (spaCy)
    └─> semantic rules: SVO heuristics
    ↓
[STEP 3] Triple filtering (triage_batch)
    ├─> negation detection: detect_corrections
    ├─> importance scoring: triage backend (LLM/tiny_clf)
    └─> Top-K selection: max_triples_per_turn
    ↓
[STEP 4] Edge-version construction (_make_edge_version)
    ├─> edge ID generation: hash(src+rel+dst)
    ├─> time intervals: record/valid
    └─> evidence retention: evidence_id + offsets

1. Entity extraction (extract_entities)

Method: hybrid deterministic rules + an optional Flair NER model

# src/modules/extract/entities.py
 
def extract_entities(
    text: str,
    sentences: List[str],
    turn_id: int = 0,
    doc_id: str = "",
    config: Optional[ExtractionConfig] = None,
) -> List[Entity]:
    """
    Multi-stage entity extraction:
    1. Regex rules (EMAIL, URL, PHONE, MONEY, HANDLE, DATE)
    2. Capitalized sequences (ORG/PERSON candidates)
    3. Optional: Flair NER model boost
    4. Heuristic classification (PERSON vs ORG vs GPE)
    5. Alias normalization and deduplication
    """
    
    entities = []
    
    # Regex rules (high precision, fully deterministic)
    for match in _EMAIL_RE.finditer(text):
        entities.append({
            "type": "EMAIL",
            "text": match.group(1),
            "start": match.start(1),
            "end": match.end(1),
            "confidence": 1.0
        })
    
    # Likewise for: URL, PHONE, MONEY, HANDLE, DATE_TEXT
    
    # Capitalized-sequence extraction (heuristic)
    for match in _CAP_SEQ_RE.finditer(text):
        span = match.group(0).strip()
        # Filter noise: single letters, stopwords, etc.
        if len(span) > 1 and span.lower() not in _STOPWORDS:
            # Classification logic
            if _looks_like_person(span, context):
                ent_type = "PERSON"
            elif _has_legal_suffix(span):
                ent_type = "ORG"
            elif _follows_location_prep(span, context):
                ent_type = "GPE"
            else:
                ent_type = "ORG"  # default
            
            entities.append({
                "type": ent_type,
                "text": span,
                "start": match.start(),
                "end": match.end(),
                "confidence": 0.7
            })
    
    # Normalization: "my aunt" → "aunt", "Aunt Mary" → "Mary"
    normalized_entities = _normalize_and_deduplicate(entities)
    
    return normalized_entities

Key heuristics:

| Rule | Example | Type | Confidence |
|------|---------|------|------------|
| follows "met", "called", "asked" | "I met Mary" | PERSON | 0.85 |
| has an Inc/Ltd/Corp suffix | "Apple Inc." | ORG | 0.9 |
| follows "in", "at", "from" | "lives in Boston" | GPE | 0.8 |
| email/phone/URL | [email protected] | EMAIL | 1.0 |
| @handle | "@john_doe" | HANDLE | 1.0 |
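The high-precision regex tier can be sketched with two of the rule families above; the patterns here are simplified for illustration, and the real ones in entities.py are likely stricter.

```python
# Simplified sketch of the regex tier (EMAIL / HANDLE); illustrative
# patterns, likely looser than the real ones in entities.py.
import re

_EMAIL_RE = re.compile(r"\b([\w.+-]+@[\w-]+\.[\w.]+)\b")
_HANDLE_RE = re.compile(r"(?<!\w)(@[A-Za-z0-9_]+)")  # lookbehind skips emails

def regex_entities(text: str):
    ents = [{"type": "EMAIL", "text": m.group(1), "confidence": 1.0}
            for m in _EMAIL_RE.finditer(text)]
    ents += [{"type": "HANDLE", "text": m.group(1), "confidence": 1.0}
             for m in _HANDLE_RE.finditer(text)]
    return ents

regex_entities("Mail me at jane@example.com or ping @john_doe")
```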

2. Relation extraction (extract_relations)

Method: regex templates + dependency parsing + role lexicon

# src/modules/extract/relations.py
 
REGEX_PATTERNS = [
    # works_at: "work(s|ed|ing) at/for <ORG>"
    (r"\bwork(?:s|ed|ing)?\s+(?:at|for)\s+(?P<dst>[A-Z][\w&.,'\- ]+)", "works_at"),
    
    # lives_in: "live(s|d) in/at/near <GPE>"
    (r"\blive(?:s|d|ing)?\s+(?:in|at|near)\s+(?P<dst>[A-Z][\w.,'\- ]+)", "lives_in"),
    
    # owns: "own(s|ed) <PRODUCT>"
    (r"\bown(?:s|ed|ing)?\s+(?P<dst>[\w&.,'\- ]+)", "owns"),
    
    # uses: "use(s|d) <PRODUCT>"
    (r"\buse(?:s|d|ing)?\s+(?P<dst>[\w&.,'\- ]+)", "uses"),
    
    # likes/loves: "like(s) / love(s) <OBJECT>"
    (r"\b(?:like|likes|love|loves|enjoy|enjoys)\s+(?P<dst>[^.;,]+)", "likes"),
]
 
def extract_relations(turn: PreprocTurn) -> List[RelationCandidate]:
    """
    Extraction flow:
    1. Iterate over the sentences
    2. Apply the regex templates
    3. Find the subject entity (PERSON) in the sentence
    4. Build candidate triples
    """
    
    candidates = []
    entities = turn["entities"]
    
    for sentence, (sent_start, sent_end) in zip(sentences, offsets):
        # Regex matching (patterns are raw strings, so compile on the fly)
        for pattern, rel_type in REGEX_PATTERNS:
            for match in re.finditer(pattern, sentence):
                dst = match.group("dst").strip()
                
                # Find the subject (a PERSON entity before dst)
                src_entity = find_person_before(
                    position=sent_start + match.start(),
                    sentence_start=sent_start,
                    entities=entities
                )
                
                if src_entity:
                    candidates.append({
                        "src": src_entity["norm"],  # "person:user"
                        "rel": rel_type,            # "uses"
                        "dst": dst,                 # "crystal chandelier"
                        "evidence": sentence,
                        "offsets": (match.start(), match.end()),
                        "confidence": 0.65
                    })
        
        # Role-lexicon matching
        if any(role in sentence.lower() for role in _ROLE_LEXICON):
            # _ROLE_LEXICON = {"engineer", "researcher", "manager", ...}
            for role in _ROLE_LEXICON:
                if role in sentence.lower():
                    src = find_person_before(...)
                    if src:
                        candidates.append({
                            "src": src["norm"],
                            "rel": "persona.role",
                            "dst": role,
                            ...
                        })
    
    return candidates

Example: "I met my aunt and received a crystal chandelier"

# Sentences: ["I met my aunt and received a crystal chandelier"]
# Entities: [
#   {"type": "PERSON", "norm": "person:user", "text": "I"},
#   {"type": "PERSON", "norm": "person:aunt", "text": "aunt"}
# ]
 
# Relation matching:
# 1. Regex: no hit (no "work at", "live in", ... template applies)
# 2. Dependency parsing (spaCy DependencyMatcher):
#    - "I" (nsubj) → "met" (verb) → "aunt" (dobj)
#    - yields: ("person:user", "met", "person:aunt")
#    
#    - "I" (nsubj) → "received" (verb) → "chandelier" (dobj)
#    - yields: ("person:user", "received", "crystal_chandelier")
#
# 3. Semantic inference:
#    - "crystal chandelier" and "aunt" occur in the same sentence
#    - infer: ("crystal_chandelier", "from", "person:aunt")
 
# Final candidates:
candidates = [
    {"src": "person:user", "rel": "met", "dst": "person:aunt", "confidence": 0.75},
    {"src": "person:user", "rel": "received", "dst": "crystal_chandelier", "confidence": 0.75},
    {"src": "crystal_chandelier", "rel": "from", "dst": "person:aunt", "confidence": 0.6}
]
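The works_at template from REGEX_PATTERNS can be exercised directly. The trailing-punctuation stripping here is an illustrative addition, not necessarily what relations.py does.

```python
# Exercising the "works_at" template from REGEX_PATTERNS above; the
# .rstrip(".,") cleanup is an illustrative addition.
import re

WORKS_AT = re.compile(r"\bwork(?:s|ed|ing)?\s+(?:at|for)\s+(?P<dst>[A-Z][\w&.,'\- ]+)")

def match_works_at(sentence: str):
    m = WORKS_AT.search(sentence)
    return m.group("dst").strip().rstrip(".,") if m else None

match_works_at("I work at Google.")
```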

3. Triple filtering (triage_batch)

Purpose: select the N most important triples from the candidates (reduces token cost)

# src/modules/extract/triage.py
 
def triage_batch(
    candidates: List[RelationCandidate],
    backend: Literal["llm", "tiny_clf", "none"],
    threshold: float = 0.5,
    budget: Optional[BudgetBook] = None,
    cfg: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
    """
    Three-stage filtering:
    1. Force-keep: negation/correction statements are always kept
    2. Scoring: an LLM or a tiny classifier scores each triple
    3. Top-K: keep triples scoring ≥ threshold, up to max_triples_per_turn
    """
    max_triples_per_turn = (cfg or {}).get("max_triples_per_turn", 20)
    
    # Stage 1: detect negative statements
    for candidate in candidates:
        corrections = detect_corrections(candidate["evidence"])
        candidate["negation"] = corrections["negation"]
        candidate["supersede"] = corrections["supersede"]
    
    # Force-keep negations
    force_keep = [c for c in candidates if c["negation"] or c["supersede"]]
    
    # Stage 2: scoring
    if backend == "llm":
        scores = _llm_score(candidates, budget)
        # Prompt: "Rate importance (0-1): 'user met aunt'"
        # Response: {"score": 0.85, "reason": "personal relationship"}
    
    elif backend == "tiny_clf":
        scores = _tiny_clf_score(candidates)
        # Simple heuristics:
        # - contains a PERSON: +0.3
        # - contains works_at/lives_in: +0.2
        # - sentence shorter than 10 words: +0.1
    
    else:  # "none"
        scores = [1.0] * len(candidates)
    
    # Stage 3: Top-K selection
    scored = sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
    
    # Force-keep + Top-K
    selected = force_keep + [
        c for c, s in scored 
        if c not in force_keep and s >= threshold
    ][:max_triples_per_turn]
    
    return selected

qwen_rerank_xenc configuration:

extraction:
  triage:
    enabled: true
    backend: "tiny_clf"         # lightweight classifier (no LLM)
    threshold: 0.5              # minimum score threshold
    max_triples_per_turn: 20    # at most 20 triples per message
    force_keep_negation: true   # always keep negation statements
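The tiny_clf heuristics quoted in the code comment above can be condensed into a runnable scorer. The base score and the min-cap are illustrative guesses, not the real classifier.

```python
# Condensed sketch of the tiny_clf heuristics quoted above; the base
# score (0.3) and the cap are illustrative, not the real classifier.
def tiny_clf_score(candidate: dict) -> float:
    score = 0.3  # assumed base score
    if candidate.get("src", "").startswith("person:"):
        score += 0.3                                  # involves a PERSON
    if candidate.get("rel") in {"works_at", "lives_in"}:
        score += 0.2                                  # high-value relation
    if len(candidate.get("evidence", "").split()) < 10:
        score += 0.1                                  # short, focused sentence
    return min(score, 1.0)

tiny_clf_score({"src": "person:user", "rel": "works_at",
                "evidence": "I work at Google"})
```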

4. Negation detection (detect_corrections)

Purpose: identify correction, retraction, and negation statements (critical for temporal reasoning)

# src/modules/extract/relations.py
 
_NEGATION_CUES = (
    "no longer",      # "I no longer work at Google"
    "not anymore",    # "I'm not living there anymore"
    "stopped",        # "I stopped using iPhone"
    "quit",           # "I quit my job"
)
 
_SUPERSEDE_CUES = (
    "changed to",     # "I changed my email to ..."
    "switched to",    # "I switched to Android"
    "correction",     # "Correction: it was Tuesday"
)
 
_SOFT_SUPERSEDE_CUES = ("now", "actually")
 
def detect_corrections(evidence: str) -> CorrectionSignal:
    """
    Detection flow:
    1. Look for negation cues
    2. Look for supersede cues
    3. Return flags and the matched cues
    """
    
    lower_text = evidence.lower()
    
    # Negation detection
    negation = any(cue in lower_text for cue in _NEGATION_CUES)
    negation_cues = [cue for cue in _NEGATION_CUES if cue in lower_text]
    
    # Supersede detection
    supersede = any(cue in lower_text for cue in _SUPERSEDE_CUES)
    supersede_cues = [cue for cue in _SUPERSEDE_CUES if cue in lower_text]
    soft_cues = [cue for cue in _SOFT_SUPERSEDE_CUES if cue in lower_text]
    
    return {
        "negation": negation,
        "supersede": supersede,
        "cues": negation_cues + supersede_cues,
        "soft_cues": soft_cues,
        "reason": ", ".join(negation_cues + supersede_cues + soft_cues)
    }

Examples:

| Input | Detection | Effect |
|-------|-----------|--------|
| "I work at Google" | - | - |
| "I no longer work at Google" | negation=True | will set record.end |
| "I switched to Apple" | supersede=True | will close the old edge |
| "Actually, I work at Microsoft now" | supersede=True (soft) | will create a new version |
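The cue matching reduces to a few substring checks; a runnable condensation of the lexicons shown above:

```python
# Runnable condensation of the cue matching shown above.
NEGATION_CUES = ("no longer", "not anymore", "stopped", "quit")
SUPERSEDE_CUES = ("changed to", "switched to", "correction")

def detect(evidence: str) -> dict:
    lower = evidence.lower()
    return {
        "negation": any(c in lower for c in NEGATION_CUES),
        "supersede": any(c in lower for c in SUPERSEDE_CUES),
    }

detect("I no longer work at Google")   # negation=True
detect("I switched to Android")        # supersede=True
```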

5. Edge-version construction (EdgeVersion)

Final output: EdgeVersion objects in the GraphStore

# src/core/graph.py
 
@dataclass
class EdgeVersion:
    """
    A single version of a graph edge (supports time travel)
    """
    id: str                    # edge:ent:user#met:aunt
    src: str                   # person:user
    rel: str                   # met
    dst: str                   # person:aunt
    evidence_id: str           # msg#2023-03-04T22:00:00Z
    source: Literal["USER", "ASSISTANT", "SYSTEM"]
    record: Interval           # record time (AR view)
    valid: Interval            # valid time (AW view)
    text: str                  # "user met aunt"
    confidence: float          # 0.75
    metadata: Dict[str, Any]   # triage info, offsets, etc.
 
# Construction flow (memowrite.py)
for candidate in filtered_candidates:
    edge = EdgeVersion(
        id=_make_edge_id(candidate["src"], candidate["rel"], candidate["dst"]),
        src=candidate["src"],
        rel=candidate["rel"],
        dst=candidate["dst"],
        evidence_id=f"msg#{ts_iso}",
        source="USER" if speaker == "user" else "ASSISTANT",
        record=Interval(start=ts_iso, end=None),  # open interval
        valid=Interval(start=ts_iso, end=None),   # valid indefinitely by default
        text=f"{candidate['src']} {candidate['rel']} {candidate['dst']}",
        confidence=candidate["confidence"],
        metadata={
            "offsets": candidate["offsets"],
            "triage": candidate.get("triage", {}),
            "negation": candidate.get("negation", False),
            "supersede": candidate.get("supersede", False),
        }
    )
    
    # If negation/supersede is detected, close the old version
    if candidate["negation"]:
        old_edge = graph.find_edge(src, rel, dst)
        if old_edge:
            old_edge.record.end = ts_iso
            old_edge.valid.end = ts_iso

Retrieval granularity comparison

| Stage | Granularity | Example ID | Count |
|-------|-------------|------------|-------|
| Raw LongMemEval | turn-level | turn_0 in session_0 | 12 turns in session_0 |
| Processed dialogue | message-level | msg#2023-03-04T22:00:00Z | 12 messages |
| GraphStore | fact-level (edge) | edge:ent:user#met:aunt | ~30 edges (from 12 msgs) |
| BM25/vector index | fact-level | edge:ent:user#met:aunt | 1401 docs (1403 facts) |
| Retrieval candidates | fact-level | edge:ent:user#met:aunt | Top-10 facts |
| Evidence IDs | message-level | msg#2023-03-04T22:00:00Z | 10 evidence IDs |
| Evaluation (fold) | doc-level | msg:2023-03-04t22:00z | 8 unique docs (duplicates folded) |
| Gold evidence | message-level → doc-level | msg:2023-03-04t22:00z | 12 messages → 12 docs |

Key transformation functions

ID normalization

# tools/id_normalize.py
 
def normalize_id(raw_id: str) -> str:
    """
    msg#2023-03-04T22:00:00Z#chunk-1
    → msg:2023-03-04t22:00:00z:chunk-1
    """
    parsed = _parse_identifier(raw_id)
    return parsed.to_string()
 
def fold_chunk_to_doc(normalized_id: str) -> str:
    """
    msg:2023-03-04t22:00:00z:chunk-1
    → msg:2023-03-04t22:00z
    
    Strips the chunk suffix and folds to document level
    """
    parsed = _parse_identifier(normalized_id)
    suffix = _remove_suffix_tokens(parsed.suffix)
    return CanonicalId(parsed.document, suffix).to_string()
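Both transforms can be approximated with plain string operations. A toy version reproducing the documented input/output pairs; the real _parse_identifier-based implementation is presumably more general.

```python
# Toy approximation of the two ID transforms; the real implementation
# in tools/id_normalize.py is presumably more general.
import re

def normalize_id(raw_id: str) -> str:
    # msg#2023-03-04T22:00:00Z#chunk-1 → msg:2023-03-04t22:00:00z:chunk-1
    return raw_id.lower().replace("#", ":")

def fold_chunk_to_doc(norm_id: str) -> str:
    no_chunk = re.sub(r":chunk-\d+$", "", norm_id)             # drop chunk suffix
    return re.sub(r"(t\d{2}:\d{2}):\d{2}z", r"\1z", no_chunk)  # drop seconds

normalize_id("msg#2023-03-04T22:00:00Z#chunk-1")
```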

Timestamp normalization

# src/datasets/longmemeval.py
 
from datetime import datetime, timezone
import dateutil.parser
 
def _normalize_timestamp(raw: Any) -> str:
    """
    Accepts several input formats:
    - UNIX timestamp: 1678056000
    - ISO string: "2023-03-04T22:00:00+00:00"
    - relative time: "+3 days"
    
    Unified output: "2023-03-04T22:00:00Z" (UTC)
    """
    if isinstance(raw, (int, float)):
        dt = datetime.fromtimestamp(raw, tz=timezone.utc)
    elif isinstance(raw, str):
        dt = dateutil.parser.isoparse(raw)
    else:
        raise TypeError(f"unsupported timestamp: {raw!r}")
    return dt.isoformat().replace('+00:00', 'Z')

Session-level → message-level expansion

# src/tools/convert_longmemeval.py
 
def _write_dialogue_with_mapping(
    path: Path, 
    dialogue: List[Dict]
) -> Tuple[Dict[str, List[str]], int]:
    """
    Build a session_id → [msg_ids] mapping for each session
    """
    session_to_msg_ids = {}
    
    for entry in dialogue:
        ts_dt = _parse_timestamp(entry["ts"])
        msg_id = _format_message_id(ts_dt)  # "msg#2023-03-04T22:00:00Z"
        session_id = entry.get("session_id", "unknown")
        
        if session_id not in session_to_msg_ids:
            session_to_msg_ids[session_id] = []
        session_to_msg_ids[session_id].append(msg_id)
    
    return session_to_msg_ids, len(dialogue)
 
# Expand answer_session_ids using the mapping
gold_session_ids = ["session_0"]
gold_msg_ids = []
for session_id in gold_session_ids:
    gold_msg_ids.extend(session_to_msg_ids[session_id])
# gold_msg_ids: 12 message-level IDs

Core SPO triple extraction code

# Full pipeline (src/pipelines/memowrite.py)
 
# 1. Entity extraction
entities = extract_entities(
    text=turn["text"],
    sentences=turn["sentences"],
    turn_id=turn_id,
    doc_id=doc_id
)
# → List[Entity]: [{"type": "PERSON", "norm": "person:user", ...}]
 
# 2. Coreference resolution
coref_map = resolve_coref(turn, entities)
# → Dict[(start, end), entity_id]: {(10, 13): "person:user"}
 
# 3. Relation extraction
raw_candidates = extract_relations(turn)
# → List[RelationCandidate]: [{"src": "person:user", "rel": "met", "dst": "person:aunt", ...}]
 
# 4. Negation detection
for candidate in raw_candidates:
    corrections = detect_corrections(candidate["evidence"])
    candidate["negation"] = corrections["negation"]
    candidate["supersede"] = corrections["supersede"]
 
# 5. Triple filtering
filtered = triage_batch(
    raw_candidates,
    backend="tiny_clf",
    threshold=0.5,
    cfg={"max_triples_per_turn": 20}
)
# → keeps the top 20 triples, plus force-kept negation statements
 
# 6. Build edge versions
for candidate in filtered:
    edge = EdgeVersion(
        id=_make_edge_id(candidate["src"], candidate["rel"], candidate["dst"]),
        src=candidate["src"],
        rel=candidate["rel"],
        dst=candidate["dst"],
        evidence_id=f"msg#{ts_iso}",
        record=Interval(start=ts_iso, end=None),
        valid=Interval(start=ts_iso, end=None),
        text=f"{candidate['src']} {candidate['rel']} {candidate['dst']}",
        confidence=candidate["confidence"]
    )
    graph.add_edge(edge)

Configuration-to-code mapping

qwen_rerank_xenc.yaml → code-module mapping

| Config key | Value | Module / function | Notes |
|------------|-------|-------------------|-------|
| Data conversion | | | |
| dataset.name | "longmemeval" | src/datasets/longmemeval.py | LongMemEval dataset parsing |
| dataset.split | "test" | convert_longmemeval::iter_samples() | iterate test-set samples |
| Entity & relation extraction | | | |
| extraction.triage.backend | "tiny_clf" | src/modules/extract/triage.py::triage_batch() | heuristic triple scoring |
| extraction.triage.threshold | 0.5 | filtering logic | keep only triples with score ≥ 0.5 |
| extraction.triage.max_triples_per_turn | 20 | Top-K selection | at most 20 edges per message |
| extraction.triage.force_keep_negation | true | detect_corrections() result | always keep negation statements |
| Retrieval (BM25) | | | |
| retrieval.use_bm25 | true | src/modules/retrieval/eagle_v1.py::build_bm25_index() | BM25 index construction |
| retrieval.bm25.k1 | 1.5 | BM25 hyperparameter | term-frequency saturation |
| retrieval.bm25.b | 0.75 | BM25 hyperparameter | document-length normalization |
| Retrieval (vector) | | | |
| retrieval.use_embeddings | true | eagle_v1.py::build_embedding_index() | vector index construction |
| retrieval.emb_backend | "ollama" | src/utils/embeddings/ollama.py::OllamaEmbedder | Ollama HTTP API |
| retrieval.emb_model | "qwen3-embedding:0.6b" | Ollama model name | Qwen3 embedding model (768-dim) |
| retrieval.emb_max_concurrency | 4 | asyncio.Semaphore(4) | concurrent-request limit |
| Retrieval (RRF fusion) | | | |
| retrieval.fusion.k_bm25 | 64 | eagle_v1.py::rrf_fusion() | BM25 recall Top-64 |
| retrieval.fusion.k_e5 | 64 | same | vector recall Top-64 |
| retrieval.fusion.weight | 0.5 | RRF weight | BM25 and vector weighted 50/50 |
| Rerank (cross-encoder) | | | |
| retrieval.rerank.backend | "cross_encoder" | src/modules/retrieval/rerankers/cross_encoder.py | MS MARCO cross-encoder |
| retrieval.rerank.top | 32 | reranker.rerank(candidates[:32]) | rerank only the top 32 candidates |
| retrieval.rerank.weight | 0.5 | final = fusion * 0.5 + rerank * 0.5 | fusion and rerank weighted 50/50 |
| Graph expansion | | | |
| retrieval.graph_expansion.enabled | true | eagle_v1.py::graph_expansion() | graph-based candidate expansion |
| retrieval.graph_expansion.depth | 2 | BFS depth limit | expand at most 2 hops |
| retrieval.graph_expansion.fanout | 4 | at most 4 neighbors per node | bounds expansion size |
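The k1 and b rows above parameterize the standard Okapi BM25 formula. A minimal scoring sketch showing where each enters (not the project's actual index code):

```python
# Minimal BM25 scoring sketch: k1 controls term-frequency saturation,
# b controls document-length normalization. Not the project's index code.
import math
from collections import Counter

def bm25_score(query, doc, docs, k1=1.5, b=0.75):
    avgdl = sum(len(d) for d in docs) / len(docs)
    tf = Counter(doc)
    score = 0.0
    for term in query:
        df = sum(1 for d in docs if term in d)   # document frequency
        if df == 0:
            continue
        idf = math.log(1 + (len(docs) - df + 0.5) / (df + 0.5))
        f = tf[term]
        score += idf * f * (k1 + 1) / (f + k1 * (1 - b + b * len(doc) / avgdl))
    return score

docs = [["user", "met", "aunt"], ["user", "uses", "laptop"]]
bm25_score(["aunt"], docs[0], docs)
```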

Configuration decision points in the data flow

graph TD
    A[qwen_rerank_xenc.yaml] --> B{extraction.triage.backend}
    B -->|tiny_clf| C[heuristic scoring<br/>zero cost]
    B -->|llm| D[LLM scoring<br/>expensive]
    B -->|none| E[keep everything<br/>no filtering]
    
    A --> F{retrieval.emb_backend}
    F -->|ollama| G[Ollama HTTP API<br/>CPU-friendly]
    F -->|e5| H[local sentence-transformers<br/>needs GPU]
    
    A --> I{retrieval.rerank.backend}
    I -->|cross_encoder| J[MS MARCO model<br/>high precision]
    I -->|none| K[fusion score only<br/>fast]
    
    C --> L[1403 facts]
    G --> L
    J --> M[Top-10 facts]
    M --> N[Fold to doc-level]
    N --> O[Hit@10 = 4/12]

Retrieval diagnostics

Retrieval statistics for sample 71017276:

{
  "store_counters": {
    "facts_open": 1403,      // open-edge count (retrieval candidate pool)
    "events_visible": 180,   // visible-event count
    "events_total": 180
  },
  "index_stats": {
    "bm25_docs": 1401,       // BM25 index document count
    "emb_docs": 1401,        // vector index document count
    "vector_dim": 768        // Qwen3 embedding dimension
  },
  "retrieval_stats": {
    "seeds": 32,             // initial seed count
    "expanded": 53,          // candidates after expansion
    "visited_nodes": 41,     // nodes visited
    "visited_facts": 41      // facts visited
  },
  "rerank": {
    "backend": "cross_encoder",
    "pairs": 32,             // rerank pair count
    "weight": 0.5,           // rerank weight
    "p50_latency_ms": 164.02 // P50 latency
  },
  "fusion_top1": {
    "id": "edge:ent:user#uses:9d05eb95",
    "bm25_score": 16.712,
    "vector_score": 0.458,
    "fusion_score": 0.211,
    "rerank_score": 1.000,
    "final_score": 1.000
  }
}

Full trace of the Rank 1 candidate

Raw turn (LongMemEval)

{
  "session_id": "session_0",
  "turn_index": 0,
  "role": "user",
  "timestamp": 0,  // relative seconds
  "text": "I met my aunt and received a crystal chandelier"
}

Processed message

{
  "ts": "2023-03-04T22:00:00Z",
  "role": "user",
  "text": "I met my aunt and received a crystal chandelier",
  "session_id": "session_0"
}

Extracted edges (subset)

Extraction method: hybrid deterministic rules + dependency parsing

[
  EdgeVersion(
    id="edge:ent:user#met:aunt",
    evidence_id="msg#2023-03-04T22:00:00Z",
    text="user met aunt",
    record=Interval(start="2023-03-04T22:00:00Z", end=None)
  ),
  EdgeVersion(
    id="edge:ent:user#received:crystal_chandelier",
    evidence_id="msg#2023-03-04T22:00:00Z",
    text="user received crystal chandelier",
    record=Interval(start="2023-03-04T22:00:00Z", end=None)
  ),
  EdgeVersion(
    id="edge:crystal_chandelier#from:aunt",
    evidence_id="msg#2023-03-04T22:00:00Z",
    text="crystal chandelier from aunt",
    record=Interval(start="2023-03-04T22:00:00Z", end=None)
  ),
  // ... more edges
]

Implementation details: see the "SPO triple extraction method" section above.

Retrieval Result

{
  "rank": 1,
  "edge_id": "edge:ent:user#uses:9d05eb95",  // the actual (hashed) edge ID
  "evidence_id": "msg#2023-03-04T22:00:00Z",
  "source": "USER",
  "text": "user met aunt",  // SPO text
  "score": 1.0000,
  "lexical": 16.712,    // BM25 score
  "vector": 0.458,      // Qwen3 vector score
  "freshness": 0.0,     // freshness score
  "salience": 0.0,      // salience score
  "doc_id": "msg:2023-03-04t22:00z",  // folded document ID
  "intervals": {
    "record": {
      "start": "2023-03-04T22:00:00Z",
      "end": null
    },
    "valid": {
      "start": "2023-03-04T22:00:00Z",
      "end": null
    }
  }
}

Evaluation (Hit@10)

# Gold evidence (12 docs)
gold_docs = {
    "msg:2023-03-04t22:00z",
    "msg:2023-03-04t22:01z",
    "msg:2023-03-04t22:02z",
    # ... 12 in total
}
 
# Top-10 candidates (8 unique docs after folding)
pred_docs = {
    "msg:2023-03-04t22:00z",  # ✓ HIT
    "msg:2023-03-04t22:05z",  # ✓ HIT
    "msg:2023-03-04t22:09z",  # ✓ HIT
    "msg:2023-03-04t22:03z",  # ✓ HIT
    "msg:2023-03-04t12:03z",  # ✗ MISS
    # ... 8 unique docs in total
}
 
# Hit@10 computation (pred_docs is already the folded Top-10 set,
# so a set intersection suffices)
hit_count = len(gold_docs & pred_docs)       # 4
hit_rate = hit_count / len(gold_docs)        # 4/12 = 33.3%

Key findings

1. The granularity mismatch is a design choice

  • Retrieval: fact-level (fine) → better semantic matching
  • Gold: message-level (medium) → manageable annotation cost
  • Evaluation: doc-level (coarse) → fair comparison

2. Evidence IDs are the key to traceability

Each edge's evidence_id field keeps a reference to the original message, which makes it possible to:

  • trace a fact back to its original turn
  • fold fact-level retrieval results to doc-level for evaluation
  • load the original text in the Reader stage

3. One-to-many relationships

1 turn → 1 message → N edges

Example:

  • 1 turn: "I met my aunt and received a crystal chandelier"
  • 1 message: msg#2023-03-04T22:00:00Z
  • ~3 edges: user#met:aunt, user#received:chandelier, chandelier#from:aunt

4. Time intervals enable view switching

Each edge carries record and valid intervals:

  • AR (as-recorded): uses only record.start ≤ query_time
  • AW (as-world): uses valid.start ≤ query_time < valid.end
  • supports time travel and counterfactual reasoning

5. Hybrid deterministic methods (no LLM required)

Entity extraction:

  • regex rules (EMAIL, URL, PHONE) → 100% precision
  • capitalized sequences + heuristic classification → ~70-85% precision
  • optional Flair NER boost → ~90% precision

Relation extraction:

  • regex templates (~15 high-frequency patterns) → covers works_at, lives_in, owns, uses, likes
  • dependency parsing (spaCy DependencyMatcher) → captures SVO structure
  • role lexicon (~20 occupation nouns) → persona.role relations

Triple filtering:

  • tiny_clf (lightweight heuristics) → no API-call cost
  • force-keep of negation statements → preserves temporal-reasoning correctness
  • Top-K selection → bounds GraphStore size

Advantages:

  • zero LLM cost (triage backend="tiny_clf")
  • fully reproducible (deterministic rules)
  • runs in CPU-only environments (no GPU required)

Corresponding configuration:

# qwen_rerank_xenc.yaml
extraction:
  triage:
    enabled: true
    backend: "tiny_clf"         # lightweight classifier (no LLM)
    threshold: 0.5
    max_triples_per_turn: 20
    force_keep_negation: true   # keep negation statements
    force_keep_min_score: 0.3

Configuration → module code

qwen_rerank_xenc.yaml

retrieval:
  use_bm25: true              # → BM25 index (1401 docs)
  use_embeddings: true        # → Qwen3 vector index (1401 docs)
  emb_backend: "ollama"       # → Ollama HTTP API
  rerank:
    backend: "cross_encoder"  # → cross-encoder model
    top: 32                   # → rerank the top 32 candidates
    weight: 0.5               # → rerank score weight 50%

Corresponding modules

# use_bm25 → src/modules/retrieval/eagle_v1.py::build_bm25_index()
if use_bm25:
    bm25_index = build_bm25_index(fact_docs, cache_dir)
    bm25_results = bm25_index.search(query, k=k_bm25)
 
# use_embeddings + emb_backend="ollama" 
# → src/modules/retrieval/eagle_v1.py::build_embedding_index()
if use_embeddings:
    emb_index = build_embedding_index(
        fact_docs, 
        cache_dir,
        backend="ollama",
        model="qwen3-embedding:0.6b"
    )
    vector_results = emb_index.search(query, k=k_e5)
 
# rerank.backend="cross_encoder"
# → src/modules/retrieval/rerankers/cross_encoder.py
if rerank_backend == "cross_encoder":
    reranker = CrossEncoderReranker()
    reranked = reranker.rerank(query, candidates, top=rerank_top)
    
# rerank.weight=0.5
# → 最终分数 = fusion_score * (1 - weight) + rerank_score * weight
final_score = fusion_score * 0.5 + rerank_score * 0.5

Key diagrams for slides

Figure 1: data-flow overview

LongMemEval JSON (turn-level)
    ↓ convert_longmemeval
Processed JSONL (message-level)
    ↓ run_memowrite / extract_graph
GraphStore Snapshot (fact-level: 1403 edges)
    ↓ eagle_v1.search
Top-10 Facts (with evidence_id)
    ↓ fold_chunk_to_doc
Top-8 Docs (after deduplication)
    ↓ compare with Gold (12 docs)
Hit@10 = 4/12 (33.3%)

Figure 2: granularity comparison

┌──────────────────┬─────────────┬──────────────────┬────────┐
│ Stage            │ Granularity │ Example ID       │ Count  │
├──────────────────┼─────────────┼──────────────────┼────────┤
│ Raw LongMemEval  │ turn        │ turn_0           │ 12     │
│ Processed        │ message     │ msg#...T22:00Z   │ 12     │
│ GraphStore       │ fact (edge) │ edge:user#met:   │ ~30    │
│ Candidate pool   │ fact        │ -                │ 1403   │
│ Top-10           │ fact        │ -                │ 10     │
│ Evaluation (fold)│ doc         │ msg:...t22:00z   │ 8      │
│ Gold             │ doc         │ msg:...t22:00z   │ 12     │
└──────────────────┴─────────────┴──────────────────┴────────┘

Figure 3: Rank 1 candidate trace

Raw turn:
┌──────────────────────────────────────────────────┐
│ role: user                                       │
│ time: session_0 + 0s                            │
│ text: "I met my aunt and received a crystal    │
│        chandelier"                               │
└──────────────────────────────────────────────────┘
                    ↓
Processed Message:
┌──────────────────────────────────────────────────┐
│ ts: "2023-03-04T22:00:00Z"                      │
│ role: "user"                                     │
│ text: "I met my aunt..."                        │
│ session_id: "session_0"                         │
└──────────────────────────────────────────────────┘
                    ↓
Extracted Edges:
┌──────────────────────────────────────────────────┐
│ edge:user#met:aunt                              │
│   evidence_id: "msg#2023-03-04T22:00:00Z"       │
│   text: "user met aunt"                         │
│                                                  │
│ edge:user#received:chandelier                   │
│   evidence_id: "msg#2023-03-04T22:00:00Z"       │
│   text: "user received crystal chandelier"      │
│                                                  │
│ edge:chandelier#from:aunt                       │
│   evidence_id: "msg#2023-03-04T22:00:00Z"       │
│   text: "crystal chandelier from aunt"          │
└──────────────────────────────────────────────────┘
                    ↓
Retrieval Result (Rank 1):
┌──────────────────────────────────────────────────┐
│ edge_id: "edge:user#uses:9d05eb95"              │
│ evidence_id: "msg#2023-03-04T22:00:00Z"         │
│ doc_id: "msg:2023-03-04t22:00z"                 │
│ score: 1.0000 (BM25=16.7 + Vec=0.46 + RRnk=1.0)│
│ ✓ HIT                                           │
└──────────────────────────────────────────────────┘

Figure 4: module interactions

runner.py
    │
    ├─→ get_dataset_handler() ────→ LongMemEval parser
    │                                     ↓
    ├─→ handler.convert() ────────→ convert_longmemeval
    │       │                             ↓
    │       └──→ writes dialogue.jsonl + qas.jsonl
    │
    ├─→ _ensure_snapshot() ────────→ run_memowrite
    │       │                             ↓
    │       └──→ extract_graph() ────→ GraphStore
    │                                     ↓
    └─→ memosearch_run() ──────────→ memosearch.py
            │                             ↓
            └──→ eagle_v1.search() ──→ BM25 + Qwen3 + Rerank
                    │                     ↓
                    └──────────────→ Top-10 facts

Summary

This end-to-end trace shows how MemoTrace turns raw dialogue turns, through a series of granularity transformations, into a retrievable knowledge graph, and finally locates the relevant evidence via hybrid retrieval and reranking.

Key design principles:

  1. Granularity separation: retrieval (fact) ≠ gold (message) ≠ evaluation (doc)
  2. Traceability: every edge keeps an evidence_id pointing at the original message
  3. Temporal semantics: the record and valid intervals support multiple views
  4. Hybrid retrieval: BM25 (keywords) + vector (semantics) + rerank (precision)
  5. Deterministic extraction: regex + dependency parsing + heuristics (no LLM)

Methodological highlights:

| Module | Conventional approach | MemoTrace approach | Advantage |
|--------|----------------------|--------------------|-----------|
| Entity extraction | NER model (GPU) | regex + heuristics + optional Flair | CPU-friendly, zero cost |
| Relation extraction | LLM prompting (expensive) | regex templates + dependency parsing | fully deterministic, reproducible |
| Triple filtering | manual annotation | tiny_clf heuristics | automated, no labeling cost |
| Negation detection | ignored, or needs an LLM | cue-lexicon matching | 100% recall of key corrections |
| Retrieval granularity | document/passage level | fact level (SPO triples) | more precise semantic matching |
| Temporal modeling | single timestamp | dual intervals (record + valid) | supports AR/AW views |

Applicable scenarios:

  • long-horizon dialogue understanding
  • time-sensitive QA
  • knowledge-intensive retrieval
  • multi-turn dialogue reasoning
  • low-resource deployment (CPU-only, no LLM API)