利用字符串驻留实现分层字段排序的高效C语言方案

本文详细介绍了一种在处理异构记录序列时,通过字符串驻留和命名空间映射技术实现分层字段智能排序的C语言解决方案。该方法在保持原始顺序的前提下,将分散的子结构字段重新组织为连续块,特别适用于JSON等结构化输出。

利用字符串驻留实现分层字段排序

在最近遇到的一个实际场景中,我需要从缓冲区加载异构记录序列。记录布局在序列之前的头部中定义。每个字段都是数值类型,并拥有由非空字母数字和点分隔符组成的唯一名称,其中分段表示嵌套结构。字段名称是以逗号分隔的列表,按记录布局顺序排列。本文要解决的关键难点是:嵌套结构在原始布局中不一定是连续的。而在转换后的表示中,我需要嵌套结构是连续的。为了说明,这里假设目标输出是JSON格式。我想到并实现了一个有趣的解决方案,使用了之前讨论过的C语言技术。

上面的描述可能单独看有些令人困惑,示例胜千言万语,下面是一个包含7个字段的列表:

timestamp,point.x,point.y,foo.bar.z,point.z,foo.bar.y,foo.bar.x

其中 point 是一个子结构,foobar 也是,但请注意它们在记录中是交错排列的。因此,如果一个记录包含以下值: {1758158348, 1.23, 4.56, -100, 7.89, -200, -300}

其JSON表示将如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
{
  "timestamp": 1758158348,
  "point": {
    "x": 1.23,
    "y": 4.56,
    "z": 7.89
  },
  "foo": {
    "bar": {
      "z": -100,
      "y": -200,
      "x": -300
    }
  }
}

注意 point.z 向上移动,foo.bar.z 向下移动,以便子结构在此表示中是连续的,这是JSON的要求。一个简单的解决方案是按字典顺序对字段名排序,这会将它们分组在一起。然而,作为一个额外约束,我希望尽可能保留原始字段顺序。例如,timestamp 在原始和JSON表示中都是第一个,但排序会将其放在最后。如果所有子结构原本就是连续的,则不应进行任何更改。

基于字符串驻留的解决方案

我的解决方案是驻留段字符串,为每个字符串分配一个唯一的、单调递增的整数令牌,按照观察到的顺序。在我的程序中,零被保留为特殊的“根”令牌,因此第一个字符串的值为1。具体数值并不重要,重要的是它们是单调分配的。

技巧在于,字符串总是在先前令牌的“命名空间”中被驻留。也就是说,我们正在构建一个(令牌,字符串)-> 令牌的映射。对于我们的段来说,该命名空间是父结构的令牌,而顶级字段在保留的“根”命名空间中被驻留。应用于示例时,我们得到以下令牌序列:

1
2
3
4
5
6
7
timestamp  -> 1
point.x    -> 2 3
point.y    -> 2 4
foo.bar.z  -> 5 6 7
point.z    -> 2 8
foo.bar.y  -> 5 6 9
foo.bar.x  -> 5 6 10

我们的映射如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{0, "timestamp"} -> 1
{0, "point"}     -> 2
{2, "x"}         -> 3
{2, "y"}         -> 4
{0, "foo"}       -> 5
{5, "bar"}       -> 6
{6, "z"}         -> 7
{2, "z"}         -> 8
{6, "y"}         -> 9
{6, "x"}         -> 10

注意由于不同的命名空间,"x" 被分配为 3 和 10。这很重要,否则 foo.bar 的字段将按照与 point 相同的顺序排序。命名空间为这些字段赋予了独特的身份。

一旦我们有了令牌表示,就按令牌的字典顺序排序。这将 point.z 拉到其兄弟节点旁边。

1
2
3
4
5
6
7
timestamp  -> 1
point.x    -> 2 3
point.y    -> 2 4
point.z    -> 2 8
foo.bar.z  -> 5 6 7
foo.bar.y  -> 5 6 9
foo.bar.x  -> 5 6 10

现在我们就得到了具有最小重排序的“输出”顺序。如果子结构原本就是连续的,则不会发生任何更改。假设映射合理,时间复杂度为 O(n log n),主要由排序决定。

替代方案

在想到命名空间之前,我最初的想法是驻留整个段前缀。查找序列将是:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
"timestamp"    -> 1  -> {1}
"point"        -> 2
"point.x"      -> 3  -> {2, 3}
"point"        -> 2
"point.y"      -> 4  -> {2, 4}
"foo"          -> 5
"foo.bar"      -> 6
"foo.bar.z"    -> 7  -> {5, 6, 7}
"point"        -> 2
"point.z"      -> 8  -> {2, 8}
"foo"          -> 5
"foo.bar"      -> 6
"foo.bar.y"    -> 9  -> {5, 6, 9}
"foo"          -> 5
"foo.bar"      -> 6
"foo.bar.x"    -> 10 -> {5, 6, 10}

最终它会产生相同的令牌,并且这是一个更直接的字符串 -> 字符串映射。前缀充当了命名空间。然而,我这样写它作为一种直观的证明:注意每个字段字符串形成的直角三角形形状。从面积我们可以看出,将前缀作为字符串处理的时间复杂度是 O(n²)(关于段数量的二次时间)!在我的实际问题中,输入从未大到足以使这成为问题,但我讨厌留下可避免的二次算法。

使用令牌作为命名空间可以将前缀扁平化为常量大小。

另一个选项是为每个命名空间使用不同的映射。因此对于 foo.bar.z,在根(字符串 -> 映射)中查找 "foo" 映射(字符串 -> 映射),然后在其中查找 "bar" 表(字符串 -> 令牌)(因为这是倒数第二个段),然后在该表中驻留 "z" 以获取其令牌。这不会有二次时间复杂度,但似乎比单个扁平的(令牌,字符串)-> 令牌映射复杂得多。

C语言实现

由于标准库对我们没什么帮助,我基于先前建立的定义进行构建,请参考那篇文章了解像 Str 这样的基本定义。首先,令牌将是一个 size 类型的整数,这样我们永远不需要担心令牌计数器溢出——我们会先耗尽内存:

1
typedef ptrdiff Token;

我们正在构建一个(令牌,字符串)-> 令牌的映射,因此我们需要一个针对此类键的哈希函数:

1
2
3
4
5
6
7
8
9
uint64_t hash(Token t, Str s)
{
    uint64_t r = (uint64_t)t << 8;
    for (ptrdiff i = 0; i < s.len; i++) {
        r ^= s.data[i];
        r *= 1111111111111111111u;
    }
    return r;
}

映射本身是一个永远有用的哈希字典树:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
typedef struct Map Map;
struct Map {
    Map  *child[4];
    Token namespace;
    Str   segment;
    Token token;
};

Token *upsert(Map **m, Token namespace, Str segment, Arena *a)
{
    for (uint64_t h = hash(ns, segment); *m; h <<= 2) {
        if (namespace==(*m)->namespace && equals(segment, (*m)->segment)) {
            return &(*m)->token;
        }
        m = &(*m)->child[h>>62];
    }
    *m = new(a, 1, Map);
    (*m)->namespace = namespace;
    (*m)->segment = segment;
    return &(*m)->token;  // caller will assign
}

我们将使用此映射将命名字段的字符串转换为令牌序列,因此需要一个切片。字段在记录内部还有一个偏移量和类型,我们将通过其原始排序来跟踪,我将使用一个索引字段(例如指向原始头部的索引)。同时跟踪原始名称。

1
2
3
4
5
typedef struct {
    Str          name;
    ptrdiff_t    index;
    Slice(Token) tokens;
} Field;

为了对字段排序,我们需要一个比较器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
ptrdiff_t field_compare(Field a, Field b)
{
    ptrdiff_t len = min(a.tokens.len, b.tokens.len);
    for (ptrdiff_t i = 0; i < len; i++) {
        Token d = a.tokens.data[i] - b.tokens.data[i];
        if (d) {
            return d;
        }
    }
    return a.tokens.len - b.tokens.len;
}

因为字段名是唯一的,每个令牌序列也是唯一的,所以我们不需要在比较器中使用索引。

最后进入正题:切分列表并使用已建立的 push 宏构建令牌序列。排序函数没什么特别的,可以简单使用带有上述比较器(和适配器)的 libc qsort,因此我只列出原型。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void field_sort(Slice(Field), Arena scratch);

Slice(Field) parse_fields(Str fieldlist, Arena *a)
{
    Slice(Field) fields  = {};
    Map         *strtab  = 0;
    ptrdiff_t    ntokens = 0;

    for (Cut c = {.tail=fieldlist, .ok=true}; c.ok;) {
        c = cut(c.tail, ',');
        Field field = {};
        field.name  = c.head;
        field.index = fields.len;

        Token prev = 0;
        for (Cut f = {.tail=field.name, .ok=true}; f.ok;) {
            f = cut(f.tail, '.');
            Token *token = upsert(&strtab, prev, f.head, a);
            if (!*token) {
                *token = ++ntokens;
            }
            *push(a, &field.tokens) = *token;
            prev = *token;
        }

        *push(a, &fields) = field;
    }

    field_sort(fields, *a);
    return fields;
}

这里的使用表明 Cut::ok 应该反转为 Cut::done,以便更好地进行零初始化。这是我需要考虑的事情。因为所有内容都是从 arena 分配的,所以不需要析构函数之类的,这就是完整的实现。回到示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
    Str fieldlist = S(
        "timestamp,"
        "point.x,"
        "point.y,"
        "foo.bar.z,"
        "point.z,"
        "foo.bar.y,"
        "foo.bar.x"
    );
    Slice(Field) fields = parse_fields(fieldlist, &scratch);
    for (ptrdiff_t i = 0; i < fields.len; i++) {
        Str name = fields.data[i].name;
        fwrite(name.data, 1, name.len, stdout);
        putchar('\n');
    }

这个程序将打印出正确的输出字段顺序。在实际程序中,我们会保留字符串表,定义一个反向查找以将令牌转换回字符串,并在生成输出时使用它。我在我的探索性程序 rec2json.c 中正是这样做的,写法与上面略有不同。它使用排序后的令牌来编译一个简单的字节码程序,当针对记录运行时,会生成其JSON表示。它将示例编译为:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
OPEN          # print '{'
KEY     1     # print token 1 as a key, i.e. "timestamp:"
READ    0     # print double at record offset 0
COMMA         # print ','
KEY     2     # print token 2 as a key, i.e. "point:"
OPEN
KEY     3
READ    8     # print double at record offset 8
COMMA
KEY     4
READ    16
COMMA
KEY     8
READ    32
CLOSE         # print '}'
COMMA
KEY     5
OPEN
KEY     6
OPEN
KEY     7
READ    24
COMMA
KEY     9
READ    40
COMMA
KEY     10
READ    48
CLOSE
CLOSE
CLOSE

写出来之后,我注意到了更多的改进空间。优化过程可以合并指令,例如,OPEN 然后 KEY 可以在编译时连接成单个字符串,这样只需要一条指令。这个程序可以是 15 条指令而不是 31 条。在我的实际案例中,我不需要这么复杂的东西,但探索起来很有趣。

comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计