机器学习实验规模化:高吞吐量实战指南

本文深入探讨了如何通过重叠实验架构实现机器学习实验的高吞吐量规模化。详细介绍了命名空间隔离、分层分配、互斥组等五种冲突解决模式,并提供了完整的Python代码实现和系统架构设计,帮助团队在保证因果有效性的前提下提升实验并发能力。

机器学习实验规模化:高吞吐量实战指南

从猜测到增长:为什么A/B测试不可或缺

每个产品决策都是在不确定性下的赌注。A/B测试将这些赌注转化为可衡量的因果学习。

通过将用户随机分配到控制组和实验组,你创建了两个平均相同的组群。任何在转化率、留存率、收入或延迟方面的差异都可以归因于变更,而非季节性、营销活动或用户构成变化。随机化提供了可靠的反事实依据。

收益:团队实际获得什么

  • 信心而非共识:决策基于因果证据,而非最大声的意见
  • 更快迭代:发布小变更,衡量影响,保留有效部分,回退无效部分
  • 内置风险管理:护栏和分阶段发布在学习过程中限制下行风险
  • 清晰的ROI关注:指标量化业务影响以优先处理路线图
  • 科学节奏:假设、测试、学习、重复成为运营循环

速度问题:为什么需要重叠实验

一次运行一个实验会限制学习速度。像主页或搜索结果页面这样的高流量界面在等待单个测试达到显著性时会处于未充分利用状态。

解决方案是重叠实验,让多个团队在同一界面上并行学习。

并发实验的蓝图

实用蓝图(遵循谷歌重叠基础设施的精神):

  1. 命名空间隔离:按领域(如搜索、广告、结账)分组实验,防止跨域干扰
  2. 分层随机化:在排名、广告、UI等层中正交分配用户
  3. 请求时合并和优先处理:合并所有活跃实验的参数写入
  4. 记录结果:持久化最终有效参数映射和覆盖原因
  5. 谨慎使用互斥:为少数竞争激烈的入口点保留"排除组"

解开重叠:什么是"冲突实验"?

冲突是指并发处理相互干扰的任何交互,导致至少一个实验的估计产生偏差、稀释或操作风险:

  • 参数冲突:两个测试在同一请求上设置相同键
  • 共享界面约束:处理在高覆盖界面/入口点上竞争有限像素
  • 触发/资格交互:一个测试的定位/注册规则影响另一个测试的进入
  • 上游到下游依赖:上游排名变化在下游测量中引入干扰
  • 遗留效应:早期暴露持续改变后续行为

五种经过验证的冲突解决方式

1. 域和命名空间分区

将流量分割为完全独立的非重叠宇宙。

2. 互斥组

保证冲突的强制解决方案。

3. 带优先级规则的分层分配(主力方法)

在命名空间内将实验组织到有序层中。

4. 条件资格和触发

使用明确的资格规则进行精确控制。

5. (部分)因子设计

通过有意交叉不同实验的变体来测量交互作用。

架构视图:整合上述概念

1. 命名空间选择(广泛隔离)

将每个请求路由到产品命名空间。

2. 互斥检查(安全阀)

检查用户是否属于任何排除组。

3. 资格和触发(定位)

评估谁可以进入以及何时进入。

4. 独立每层分配(正交性)

每层独立随机化。

5. 每参数优先级合并(确定性解决)

收集所有候选参数写入并按优先级应用。

6. 服务和日志记录(可观察性)

服务合并配置并记录有效参数映射。

简单比较表

方法 最适合 如何解决冲突 优点 注意事项
命名空间 跨域隔离 按产品领域的硬边界 简单、组织对齐 不解决域内竞争
互斥 特定界面冲突 每个用户只参与一个冲突测试 统计干净 降低速度;需要手动管理
分层分配 多团队在同一界面 每参数优先级获胜 最大吞吐量;灵活 下层偏差风险;需要日志记录
资格/触发 精确控制 明确规则和基于事件的进入 精细化;可审计 抽样偏差;规则蔓延
(部分)因子设计 学习交互作用 交叉变体;建模交互作用 协同洞察 更多单元;更多流量;复杂性

运行冲突感知分配的代码块

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
# conflict-aware assignment for overlapping A/B tests.

from __future__ import annotations
from typing import Dict, List, Optional, Tuple, Any
import hashlib
from uuid import uuid4

# ---------- Core hashing ----------
def hash_to_bucket(user_id: str, salt: str, modulo: int = 10_000) -> int:
    return int(hashlib.sha256(f"{salt}:{user_id}".encode()).hexdigest()[:8], 16) % modulo

def in_allocation(user_id: str, salt: str, pct: float) -> bool:
    return hash_to_bucket(user_id, salt) < round(pct * 100)

# ---------- Mutual exclusion (pick one per group) ----------
def choose_exclusive(user_id: str, group_id: str, members: List[Dict[str, Any]]) -> Optional[str]:
    total = sum(m.get("weight", 0) for m in members)
    if total <= 0:
        return None
    cut = hash_to_bucket(user_id, f"excl:{group_id}", total)
    acc = 0
    for m in members:
        acc += m["weight"]
        if cut < acc:
            return m["experiment"]
    return None

# ---------- Targeting rules (simple) ----------
def eval_rule(rule: Dict[str, Any], ctx: Dict[str, Any]) -> bool:
    # ctx: {"assignments": set[str], "attrs": dict, "triggers": set[str]}
    if not rule: return True
    if "all" in rule: return all(eval_rule(r, ctx) for r in rule["all"])
    if "any" in rule: return any(eval_rule(r, ctx) for r in rule["any"])
    if "not" in rule: return not eval_rule(rule["not"], ctx)
    if "attrEq" in rule:
        k, v = rule["attrEq"]["key"], rule["attrEq"]["val"]
        return ctx["attrs"].get(k) == v
    if "triggered" in rule: return rule["triggered"] in ctx["triggers"]
    if "hasExp" in rule:   return rule["hasExp"] in ctx["assignments"]
    if "notInExp" in rule: return rule["notInExp"] not in ctx["assignments"]
    return True

# ---------- Per-parameter priority merge (higher layers win) ----------
def merge_by_priority(layer_order: List[str], params_by_layer: Dict[str, Dict[str, Any]]
) -> Tuple[Dict[str, Any], List[Dict[str, str]]]:
    effective: Dict[str, Any] = {}
    overrides: List[Dict[str, str]] = []
    for i, layer in enumerate(layer_order):                 # high -> low
        for k, v in (params_by_layer.get(layer) or {}).items():
            if k in effective:
                winner = next(L for L in layer_order[:i] if k in (params_by_layer.get(L) or {}))
                overrides.append({"param": k, "winner_layer": winner, "loser_layer": layer})
                continue
            effective[k] = v
    return effective, overrides

# ---------- Serve path (one request) ----------
def serve_request(user_id: str, namespace_id: str, ctx: Dict[str, Any], cfg: Dict[str, Any]) -> Dict[str, Any]:
    ns = next(ns for ns in cfg["product"]["namespaces"] if ns["id"] == namespace_id)
    layers: List[str] = ns["layers"]  # e.g., ["ranking","ads","ui"]

    exps   = [e for e in cfg["product"]["experiments"]      if e["namespace"] == namespace_id]
    groups = [g for g in cfg["product"]["exclusion_groups"] if g["namespace"] == namespace_id]

    # 1) Assignment per experiment (after eligibility + trigger)
    assignments: Dict[str, str] = {}
    for e in exps:
        c = {"assignments": set(assignments.values()),
             "attrs": ctx.get("attrs", {}), "triggers": set(ctx.get("triggers", []))}
        eligible  = eval_rule(e.get("eligibility", {}), c)
        triggered = (e.get("trigger") in c["triggers"]) if e.get("trigger") else True
        if not eligible:
            assignments[e["id"]] = "ineligible"
        elif not triggered:
            assignments[e["id"]] = "none"
        else:
            salt = f"exp:{e['id']}"
            variant = "treatment" if in_allocation(user_id, salt, e["allocation_pct"]) else "control"
            assignments[e["id"]] = f"{e['id']}:{variant}"

    # 2) Mutual exclusion (keep chosen; mark others excluded)
    for g in groups:
        chosen = choose_exclusive(user_id, g["id"], g["members"])
        if chosen:
            for m in g["members"]:
                eid = m["experiment"]
                if assignments.get(eid, "").startswith(eid) and eid != chosen:
                    assignments[eid] = "excluded"

    # 3) Build params per layer from final assignments
    params_by_layer: Dict[str, Dict[str, Any]] = {L: {} for L in layers}
    for e in exps:
        a = assignments[e["id"]]
        if not a.startswith(e["id"]):  # ineligible/none/excluded
            continue
        vname = "treatment" if a.endswith(":treatment") else "control"
        for pkey, opts in e.get("parameters", {}).items():
            params_by_layer[e["layer"]][pkey] = opts[vname]

    # 4) Priority merge + compact exposure log
    effective, overrides = merge_by_priority(layers, params_by_layer)
    return {
        "request_id": str(uuid4()),
        "namespace": namespace_id,
        "assignments": assignments,
        "effective_params": effective,
        "override_details": overrides
    }

关键要点

  • 明确所有权:每个参数一个所有者层
  • 正交分配:每层/实验独立盐值避免意外相关
  • 记录实际发布内容:存储每个请求的有效参数映射
  • 谨慎使用互斥:为少数竞争激烈的高覆盖界面保留
  • 注意选择效应:资格/触发规则强大但可能产生偏差
  • 拥有单一指标来源:一致定义防止混淆

参考文献和进一步阅读

  • Google – Tang, D., Agarwal, A., O’Brien, D., Meyer, M. Overlapping Experiment Infrastructure: More, Better, Faster Experimentation. KDD 2010.
  • Meta/Facebook – Bakshy, E., et al. Designing and Deploying Online Field Experiments (PlanOut).
  • LinkedIn – XLNT Platform engineering posts (targeting, segments, assignment at scale).
  • DoorDash – Improving Online Experiment Capacity by 4× with Parallelization and Increased Sensitivity.
  • Booking.com – Democratizing Online Controlled Experiments (culture, scale, meta-experiments).
  • Airbnb – Minerva: Metric Consistency (single source of truth across experiments).
  • Microsoft – Deng, A., et al. CUPED: A Practical Method for Reducing Variance in A/B Testing. WSDM 2013.
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计