Linux内核数据结构:基数树Radix Tree

当我们深入探索计算机内核的奥秘时,会发现其中隐藏着无数令人惊叹的数据结构。而今天,我们将聚焦于一个在内核世界中起着关键作用的神秘存在 —— 基数树(Radix Tree)。

想象一下,在内核这个庞大而复杂的舞台上,数据如同汹涌的河流奔腾不息。在这样的环境下,如何高效地组织、存储和检索数据成为了至关重要的挑战。而基数树,就像是一位默默耕耘的工匠,以其独特的结构和强大的功能,为内核的稳定运行立下汗马功劳。

一、什么是基数树

基数树是一种将指针与长整数键值相关联的机制,存储有效率且可快速查询,常用于指针与整数值的映射、内存管理等。它是一种多叉搜索树,叶子结点是实际的数据条目,每个结点有固定数量的指针指向子结点,并有一个指针指向父结点。

基数树,也被称为 PAT 位树(Patricia Trie or crit bit tree),是通用的字典类型数据结构。Linux 内核使用了数据类型 unsigned long 的固定长度输入的版本,每级代表了输入空间固定位数。

在 Linux 内核中,基数树有广泛的用途,比如用于内存管理。结构 address_space 通过 radix 树跟踪绑定到地址映射上的核心页,该 radix 树允许内存管理代码快速查找标识为 dirty 或 writeback 的页。

基数树为稀疏树提供了有效的存储,代替固定尺寸数组提供了键值到指针的快速查找。例如,Linux radix 树每个结点有 64 个 slot(当配置为每个结点有 2^6 = 64 个 slot 时),与数据类型 long 的位数相同。以一个有 3 级结点的 radix 树为例,每个数据条目可用 3 个 6 位的键值进行索引,键值从左到右分别代表第 1~3 层结点位置。没有孩子的结点在图中不出现。

对于数据量大的场景,比如存储和维护 100 亿个 url 及其属性,可考虑使用 radix 树。例如将 url 分配到多台机器中,每台机器可以将 url 直接放在内存,接将 url 组织成树状结构,对于字符串来说,最长使用的是 Trie tree,由于所占空间由最长 url 决定,在这里绝对不适用,再加上很多 url 拥有相同的属性(如路径等),这样,使用 trie tree 的一个变种 radix tree,相比会非常节省空间,并且不会影响效率。比如对 http://www.baidu.com 以及 http://www.12345.com 这种都可以按 3 - 5 - 3 作为基数树各层比较的前缀位数。

在 Redis 中,基数树被用来存储 stream 消息队列,消息队列中的每一个消息 ID 都是时间戳加序号,有了基数树就能根据 ID 快速定位到具体的消息。它还用来在 cluster 中定位槽和 key 的关系,此时 node 名是由槽位编号和 key 组合而成的,所以能快速找到对应槽位并遍历所有 key。

基数树与 Trie 树(字典树)的思想有点类似,甚至可以把 Trie 树看为一个基为 26 的 Radix 树。(也可以把 Radix 树看做是 Trie 树的变异)Trie 树一般用于字符串到对象的映射,Radix 树一般用于长整数到对象的映射。trie 树主要问题是树的层高,如果要索引的字的拼音很长很变态,我们也要建一个很高很变态的树么?radix 树能固定层高(对于较长的字符串,可以用数学公式计算出其特征值,再用 radix 树存储这些特征值)。Radix 树的原理实际上有点像多层 hash,每一层的 key 只不过是 key 的一段位范围的值,有点像页表映射的过程。

1.1前缀树(Trie)

Trie,又名前缀树(prefix tree)或者字典树,它属于一种 k 路搜索树,主要用于在一组集合当中搜索特定的键。这里的键通常为字符串,不过也可以是其他的数据类型。在 Trie 中,可以为键存储对应的数值,并且这些数值一般保存在叶子节点处。

与二叉搜索树有所不同,Trie 的节点并非保存完整的键,而仅仅是保存单个字符。若要获取节点对应的键,需遵循深度优先原则,从根节点开始一直遍历到相应节点,然后将遍历路径上每个节点的字符连接起来。

从实现角度来看,每个节点都配备一个数组。在节点中,字符通常是隐式存储的。比如以下图为例,每个节点都有一个数组,该数组的索引对应着从 a 到 z 这 26 个不同的字符,数组的成员则指向(如果存在的话)下一层节点。在叶子节点上,存储着键所对应的数值,例如键 “she” 对应数值 0,键 “sells” 对应数值 1,键 “sea” 对应数值 2。

C++代码实现示例:

#include <cstring>
#include <iostream>
#include <conio.h>
using namespace std;
namespace trie
{
    template <class T, size_t CHILD_MAX>
    /*
    Parameter T: Type of reserved data.
    Parameter CHILD_MAX: Size of the array of pointers to childnode.
    */
    struct nod
    {
        T reserved;
        nod<T, CHILD_MAX> *child[CHILD_MAX];
        nod()
        {
            memset(this, 0, sizeof *this);
        }
        ~nod()
        {
            for (unsigned i = 0; i < CHILD_MAX; i++)
                delete child[i];
        }
        void Traversal(char *str, unsigned index)
        {
            unsigned i;
            for (i = 0; i < index; i++)
                cout << str[i];
            cout << '\t' << reserved << endl;
            for (i = 0; i < CHILD_MAX; i++)
            {
                if (child[i])
                {
                    str[index] = i;
                    child[i]->Traversal(str, index + 1);
                }
            }
            return;
        }
    };

    template <class T, size_t CHILD_MAX = 127>
    /*
    Parameter T: Type of reserved data.
    Parameter CHILD_MAX: Size of the array of pointers to childnode.
    */
    class trie
    {
    private:
        nod<T, CHILD_MAX> root;

    public:
        nod<T, CHILD_MAX> *AddStr(char *str);
        nod<T, CHILD_MAX> *FindStr(char *str);
        bool DeleteStr(char *str);
        void Traversal()
        {
            char str[100];
            root.Traversal(str, 0);
        }
    };

    template <class T, size_t CHILD_MAX>
    nod<T, CHILD_MAX> * trie<T, CHILD_MAX>::AddStr(char *str)
    {
        nod<T, CHILD_MAX> *now = &root;
        do
        {
            if (now->child[*str] == NULL)
                now->child[*str] = new nod<T, CHILD_MAX>;
            now = now->child[*str];
        }
        while (*(++str) != '\0');
        return now;
    }

    template <class T, size_t CHILD_MAX>
    nod<T, CHILD_MAX> *trie<T, CHILD_MAX>::FindStr(char *str)
    {
        nod<T, CHILD_MAX> *now = &root;
        do
        {
            if (now->child[*str] == NULL)
                return NULL;
            now = now->child[*str];
        }
        while (*(++str) != '\0');
        return now;
    }

    template <class T, size_t CHILD_MAX>
    bool trie<T, CHILD_MAX>::DeleteStr(char *str)
    {
        nod<T, CHILD_MAX> **nods = new nod<T, CHILD_MAX> *[strlen(str)];
        int snods = 1;
        nod<T, CHILD_MAX> *now = &root;
        nods[0] = &root;
        do
        {
            if (now->child[*str] == NULL)
                return false;
            nods[snods++] = now = now->child[*str];
        }
        while (*(++str) != '\0');
        snods--;
        while (snods > 0)
        {
            for (unsigned i = 0; i < CHILD_MAX; i++)
                if (nods[snods]->child[i] != NULL)
                    return true;
            delete nods[snods];
            nods[--snods]->child[*(--str)] = NULL;
        }
        return true;
    }
} // namespace trie
int main()
{
    //TestProgram
    trie::trie<int> tree;
    while (1)
    {
        cout << "1 for add a string" << endl;
        cout << "2 for find a string" << endl;
        cout << "3 for delete a string" << endl;
        cout << "4 for traversal" << endl;
        cout << "5 for exit" << endl;
        char str[100];
        switch (getch())
        {
        case '1':
            cout << "Input a string: ";
            cin.getline(str, 100);
            cout << "This string has existed for " << tree.AddStr(str)->reserved++ << " times." << endl;
            break;
        case '2':
            cout << "Input a string: ";
            cin.getline(str, 100);
            trie::nod<int, 127> *find;
            find = tree.FindStr(str);
            if (!find)
                cout << "No found." << endl;
            else
                cout << "This string has existed for " << find->reserved << " times." << endl;
            break;
        case '3':
            cout << "Input a string: ";
            cin.getline(str, 100);
            cout << "The action is " << (tree.DeleteStr(str) ? "Successful." : "Unsuccessful.") << endl;
            break;
        case '4':
            cout << "Traversal the tree: " << endl;
            tree.Traversal();
            break;
        case '5':
            cout << "Exit." << endl;
            return 0;
        }
    }
    return 0;
}

1.2 基数树(Radix Tree)

基数树(Radix Tree)也称为压缩前缀树(compact prefix tree)或 compressed trie,是一种更节省空间的前缀树。在基数树中,当父节点只有一个子节点时,子节点会合并到父节点。

基数树可看做是以二进制位串为关键字的trie 树,是一种多叉树形结构,同时又类似多层索引表,每个中间节点包含指向多个子节点的指针数组,叶子节点包含指向实际的对象的指针(由于对象不具备树节点结构,因此将其父节点看做叶节点)[1]。基数树也被设计成多道树,以提高磁盘交互性能。同时,基数树也是按照字典序来组织叶节点的,这种特点使之适合持久化改造,加上它的多道特点,灵活性较强,适合作为区块链的基础数据结构,构建持久性区块时较好地映射各类数据集合上。基数树支持插入、删除、查找操作。查找包括完全匹配、前缀匹配、前驱查找、后继查找。所有这些操作都是O(k)复杂度,其中k是所有字符串中最大的长度。

基数树中的基 r,表示每个节点最多可以拥有 r 个子节点,其中 r 是 2 的 x 次幂, x≥1 。

基数树示例如下:

1.3 内核中的基数树

Linux 内核中,基数树的键(或者说索引)为整数类型,其相关的数据结构和接口主要在 2 个文件中:

include/linux/radix-tree.h
lib/radix-tree.c

radix tree,又称做基数树,是一种适合于构建key与value相关联的数据结构。在linux内核中,radix tree由 include/linux/radix-tree.h和lib/radix-tree.c两个文件实现。

在linux内核中pagecache就是用radix tree构建的page index与page ptr (指向struct page)的关联。详见struct address_space 中的page_tree。这里的(key,value)对, 就是(index, ptr)对。

struct address_space {
	struct inode		*host;		/* owner: inode, block_device */
	struct radix_tree_root	page_tree;
        ...
}

radix tree中的每一个内部节点最多拥有r个child,r=2^n(n>=1), 这里的n被称做基数。 n个bit称作一个bit簇。

在linux内核的radix tree实现代码中,基数最为最关键的参数被定义成6(RADIX_TREE_MAP_SHIFT),也就是说一个bit簇是6个bit。所以linux内核中radix tree的一个内部节点最多可以有64个child。

radix tree的所有叶子节点都在最下面一层(图中红色节点),其他节点都是为了构建radix tree树而创建出来的内部节点。叶子节点包含index-ptr对中的ptr item, 下图中将ptr用红色来标记为item。

假设key值等于0x840FF, 其二进制按照6bit一簇可以写成,000010-000100 -000011 -111111,从左到右的index值分别为2, 4,3, 63。那么根据key值0x840FF找到value的过程就只需要4步:

  • 第一步,在最上层的节点A中找到index为2的slot,其slot[2]指针指向第二层节点中的节点B。

  • 第二步,在节点B中找到index为4的slot,其slot[4]指针指向第三层节点中的节点C。

  • 第三步,在节点C中找到index为3的slot,其slot[3]指针指向第三层节点中的节点D。

  • 第四步,在节点D中找到index为63的slot,其slot[63]指针指向叶子节点item E。

二、基数树工作原理

‌内核基数树(Radix Tree)的工作原理‌主要体现在其数据结构和操作方法上。基数树是一种多叉搜索树,用于高效存储和检索键值对,特别适用于整数键值的映射。

基数树的核心数据结构包括节点和指针。每个节点最多有 2^n2 n 个子节点(n n 为正整数),这意味着每个节点最多有 64 个子节点(当 n=6 n =6 时)。这种结构使得基数树能够高效地管理稀疏的整数集合。每个节点包含一个指针数组,数组中的每个元素称为槽(slot),用于指向子节点。叶子节点包含实际的数据条目‌。

基数树的主要操作包括插入、查找和删除。插入操作时,根据键值的二进制表示逐位确定节点的位置,逐步向下遍历树直到达到叶子节点。查找操作类似,通过键值的二进制表示在树中遍历,直到找到匹配的叶子节点。删除操作则需要处理节点的引用计数和子节点的调整,确保树的完整性‌

2.1数据结构

⑴radix_tree_root

// file: include/linux/radix-tree.h
/* root tags are stored in gfp_mask, shifted by __GFP_BITS_SHIFT */
struct radix_tree_root {
    unsigned int        height;
    gfp_t           gfp_mask;
    struct radix_tree_node  __rcu *rnode;
};

基数树的树根由结构体 radix_tree_root 表示,它包含 3 个成员:

  • height – 树的高度

  • gfp_mask – 内存分配标识

  • rnode – 指向下一层的 radix_tree_node 节点(如果有的话)

⑵radix_tree_node

// file: lib/radix-tree.c
struct radix_tree_node {
    unsigned int    height;     /* Height from the bottom */
    unsigned int    count;
    union {
        struct radix_tree_node *parent; /* Used when ascending tree */
        struct rcu_head rcu_head;   /* Used when freeing node */
    };
    void __rcu  *slots[RADIX_TREE_MAP_SIZE];
    unsigned long   tags[RADIX_TREE_MAX_TAGS][RADIX_TREE_TAG_LONGS];
};

除根节点外,基数树的其它节点使用结构体 radix_tree_node 来表示。该结构体包含如下字段:

height -- 从最底层到当前层的高度
count -- 子节点数量
parent -- 指向父节点的指针
slots -- 指针数组

如果当前节点为位于树的最底层,则数组中的指针指向数据;否则,指向下一级节点。

⑶height_to_maxindex

height_to_maxindex是一个静态 unsigned long 数组,数组的下标对应着树的高度,值对应着该高度下的基数树所能表示的最大索引值。

// file: lib/radix-tree.c
/*
 * The height_to_maxindex array needs to be one deeper than the maximum
 * path as height 0 holds only 1 entry.
 */
static unsigned long height_to_maxindex[RADIX_TREE_MAX_PATH + 1] __read_mostly;

该数组容量为 RADIX_TREE_MAX_PATH + 1。

⑷radix_tree_preloads

radix_tree_preloads 是个 per-cpu 变量,也就是说每个 CPU 都有一份该变量的副本。其定义如下:

// file: lib/radix-tree.c
static DEFINE_PER_CPU(struct radix_tree_preload, radix_tree_preloads) = { 0, };

这是一个 radix_tree_preload 结构体类型的变量,是为节点预分配的缓存池:

// file: lib/radix-tree.c
/*
 * Per-cpu pool of preloaded nodes
 */
struct radix_tree_preload {
    int nr;
    struct radix_tree_node *nodes[RADIX_TREE_PRELOAD_SIZE];
};

radix_tree_preload 结构体包括 2 个成员:

  • nr – 剩余可分配的成员数量

  • nodes – 节点指针数组。

其中,宏 RADIX_TREE_PRELOAD_SIZE 表示数组的大小,该宏定义如下:

// file: lib/radix-tree.c
/*
 * The radix tree is variable-height, so an insert operation not only has
 * to build the branch to its corresponding item, it also has to build the
 * branch to existing items if the size has to be increased (by
 * radix_tree_extend).
 *
 * The worst case is a zero height tree with just a single item at index 0,
 * and then inserting an item at index ULONG_MAX. This requires 2 new branches
 * of RADIX_TREE_MAX_PATH size to be created, with only the root node shared.
 * Hence:
 */
#define RADIX_TREE_PRELOAD_SIZE (RADIX_TREE_MAX_PATH * 2 - 1)

在 x86-64 架构下,宏 RADIX_TREE_MAX_PATH 扩展为 11,所以 RADIX_TREE_PRELOAD_SIZE 扩展为 21。

从注释中不难看出,缓存池中的节点数量 RADIX_TREE_PRELOAD_SIZE (扩展为 21)是按照最坏情况考虑的,即需要一次性分配 21 个节点的内存。 什么情况下要一次性分配这么多节点呢?就是在基数树高度为 0 且只有一个索引为 0 的数据的情况下,要插入一个最大索引 ULONG_MAX 的数据。此时,两条路径都要扩展到最大高度。

⑸基数树示意图

如果树中的索引不大于 63,那么树的深度就等于 1,因为可能存在的 64 个叶子可以都存放在第一层的节点中。

如果保存的数据索引大于 63,比如 131 ,那么树的高度就增加为 2,这样基数树可以查找的最大索引为 4095 。

2.2接口函数

⑴基数树的创建

创建基数树有 2 种方法:宏 RADIX_TREEINIT_RADIX_TREE

①宏 RADIX_TREE

// file: include/linux/radix-tree.h
#define RADIX_TREE(name, mask) \
    struct radix_tree_root name = RADIX_TREE_INIT(mask)

宏 RADIX_TREE 接收 2 个参数:

  • name – 基数树的名称

  • mask – 请求内存时所用的标志掩码

其内部又调用了 RADIX_TREE_INIT 宏,该宏定义如下:

// file: include/linux/radix-tree.h
#define RADIX_TREE_INIT(mask)   {                   \
    .height = 0,                            \
    .gfp_mask = (mask),                     \
    .rnode = NULL,                          \
}

②宏 INIT_RADIX_TREE

// file: include/linux/radix-tree.h
#define INIT_RADIX_TREE(root, mask)                 \
do {                                    \
    (root)->height = 0;                     \
    (root)->gfp_mask = (mask);                  \
    (root)->rnode = NULL;                       \
} while (0)

宏 INIT_RADIX_TREE 接收 2 个参数:

  • root – 基数树的根节点

  • mask – 请求内存时所用的标志掩码

⑵间接指针相关函数

当 root->rnode 指向 radix_tree_node结构而不是数据项时,被称为间接指针(indirect pointer)。间接指针通过将 root->rnode 的最低位设置为 1 来标识。内核文件对此说明如下:

// file: include/linux/radix-tree.h
/*
 * An indirect pointer (root->rnode pointing to a radix_tree_node, rather
 * than a data item) is signalled by the low bit set in the root->rnode
 * pointer.
 *
 * In this case root->height is > 0, but the indirect pointer tests are
 * needed for RCU lookups (because root->height is unreliable). The only
 * time callers need worry about this is when doing a lookup_slot under
 * RCU.
 *
 * Indirect pointer in fact is also used to tag the last pointer of a node
 * when it is shrunk, before we rcu free the node. See shrink code for
 * details.
 */

⑶标记相关函数

①root_gfp_mask()

root_gfp_mask() 函数用来获取根节点中的页分配标识。当基数树需要为新增节点分配内存时,会以此作为分配标识。 GFP 是 Get Free Page 的缩写。

// file: lib/radix-tree.c
static inline gfp_t root_gfp_mask(struct radix_tree_root *root)
{
    return root->gfp_mask & __GFP_BITS_MASK;
}

在基数树中,普通节点有一个专门的字段 tags 来存储状态标记,但根节点没有对应的字段,它的标记存储在 gfp_mask 字段中。在 gfp_mask 中,低 25 位(位 0 ~ 24)已经被占用,对应着页分配的不同标识。从位 25 开始的其余位,可用于其它用途。

// file: include/linux/gfp.h
/* Plain integer GFP bitmasks. Do not use this directly. */
#define ___GFP_DMA      0x01u
#define ___GFP_HIGHMEM      0x02u
#define ___GFP_DMA32        0x04u
#define ___GFP_MOVABLE      0x08u
#define ___GFP_WAIT     0x10u
#define ___GFP_HIGH     0x20u
#define ___GFP_IO       0x40u
#define ___GFP_FS       0x80u
#define ___GFP_COLD     0x100u
#define ___GFP_NOWARN       0x200u
#define ___GFP_REPEAT       0x400u
#define ___GFP_NOFAIL       0x800u
#define ___GFP_NORETRY      0x1000u
#define ___GFP_MEMALLOC     0x2000u
#define ___GFP_COMP     0x4000u
#define ___GFP_ZERO     0x8000u
#define ___GFP_NOMEMALLOC   0x10000u
#define ___GFP_HARDWALL     0x20000u
#define ___GFP_THISNODE     0x40000u
#define ___GFP_RECLAIMABLE  0x80000u
#define ___GFP_KMEMCG       0x100000u
#define ___GFP_NOTRACK      0x200000u
#define ___GFP_NO_KSWAPD    0x400000u
#define ___GFP_OTHER_NODE   0x800000u
#define ___GFP_WRITE        0x1000000u
/* If the above are modified, __GFP_BITS_SHIFT may need updating */

所以,要想获取 gpf 的标识,只需获取 root->gfp_mask 的低 25 位即可。

__GFP_BITS_MASK 用作获取 gpf 标识的掩码,该宏扩展为 (1<<25)−1 ,其定义如下:

// file: include/linux/gfp.h
#define __GFP_BITS_MASK ((__force gfp_t)((1 << __GFP_BITS_SHIFT) - 1))

#define __GFP_BITS_SHIFT 25 /* Room for N __GFP_FOO bits */

其中, gfp_t 类型是 unsigned 类型的别名。

// file: include/linux/gfp.h
typedef unsigned __bitwise__ gfp_t;

②root_tag_get()

基数树中,每项数据可以存储 3 种标记。root_tag_get() 函数用于获取根节点中指定标记的状态。

该函数接收 2 个参数:

  • root – 根节点

  • tag – 标记类型,一共有 0 ~ 2 共三种类型,对应着 radix_tree_node->tags 的索引。

// file: lib/radix-tree.c
static inline int root_tag_get(struct radix_tree_root *root, unsigned int tag)
{
    return (__force unsigned)root->gfp_mask & (1 << (tag + __GFP_BITS_SHIFT));
}

__GFP_BITS_SHIFT 扩展为 25:

// file: include/linux/gfp.h
#define __GFP_BITS_SHIFT 25 /* Room for N __GFP_FOO bits */

从上文已经知道,在 gfp_mask 中,低 25 位(位 0 ~ 24)对应着页分配的不同标识,从位 25 开始,可用于其它用途。在此处,第 25 ~ 27 位,用作基数树根节点的标记位。

③root_tag_set()

static inline void root_tag_set(struct radix_tree_root *root, unsigned int tag)
{
    root->gfp_mask |= (__force gfp_t)(1 << (tag + __GFP_BITS_SHIFT));
}

root_tag_set() 函数,通过按位或操作,将根节点中指定 tag 类型的标记置位。

④root_tag_clear()

static inline void root_tag_clear(struct radix_tree_root *root, unsigned int tag)
{
    root->gfp_mask &= (__force gfp_t)~(1 << (tag + __GFP_BITS_SHIFT));
}

root_tag_clear() 函数,将根节点中指定 tag 类型的标记清除。

⑤root_tag_clear_all()

static inline void root_tag_clear_all(struct radix_tree_root *root)
{
    root->gfp_mask &= __GFP_BITS_MASK;
}

root_tag_clear_all() 函数,将根节点中的 tag 标记全部清除,只保留 gpf 标识。

⑥tag_set()

// file: lib/radix-tree.c
static inline void tag_set(struct radix_tree_node *node, unsigned int tag,
        int offset)
{
    __set_bit(offset, node->tags[tag]);
}

tag_set() 函数调用位图接口 __set_bit(),将中间节点指定 tag 类型标记的位图中 offset 处的比特位设置为 1。

⑦tag_get()

static inline int tag_get(struct radix_tree_node *node, unsigned int tag,
        int offset)
{
    return test_bit(offset, node->tags[tag]);
}

tag_get() 函数调用位图接口 test_bit() ,获取中间节点指定 tag 类型标记的位图中 offset 处比特位的值。

⑧any_tag_set()

any_tag_set() 函数检查指定节点中是否有特定类型的标记被置位。如果任何一个比特位的标记被置位,该函数返回 1;否则,返回 0。

该函数接收 2 个参数:

  • node – 基数树节点

  • tag – 标记类型

/*
 * Returns 1 if any slot in the node has this tag set.
 * Otherwise returns 0.
 */
static inline int any_tag_set(struct radix_tree_node *node, unsigned int tag)
{
    int idx;
    for (idx = 0; idx < RADIX_TREE_TAG_LONGS; idx++) {
        if (node->tags[tag][idx])
            return 1;
    }
    return 0;
}

该函数的实现非常简单。遍历标记对应的数组,如果任何一个数组成员不为 0,则说明有标记被置位,返回 1;否则,返回 0。

宏 RADIX_TREE_TAG_LONGS 表示位图数组大小。

⑨radix_tree_tag_clear()

// file: lib/radix-tree.c
/**
 *  radix_tree_tag_clear - clear a tag on a radix tree node
 *  @root:      radix tree root
 *  @index:     index key
 *  @tag:       tag index
 *
 *  Clear the search tag (which must be < RADIX_TREE_MAX_TAGS)
 *  corresponding to @index in the radix tree.  If
 *  this causes the leaf node to have no tags set then clear the tag in the
 *  next-to-leaf node, etc.
 *
 *  Returns the address of the tagged item on success, else NULL.  ie:
 *  has the same return value and semantics as radix_tree_lookup().
 */
void *radix_tree_tag_clear(struct radix_tree_root *root,
            unsigned long index, unsigned int tag)
{
    struct radix_tree_node *node = NULL;
    struct radix_tree_node *slot = NULL;
    unsigned int height, shift;
    int uninitialized_var(offset);

    height = root->height;
    if (index > radix_tree_maxindex(height))
        goto out;

    shift = height * RADIX_TREE_MAP_SHIFT;
    slot = indirect_to_ptr(root->rnode);

    while (shift) {
        if (slot == NULL)
            goto out;

        shift -= RADIX_TREE_MAP_SHIFT;
        offset = (index >> shift) & RADIX_TREE_MAP_MASK;
        node = slot;
        slot = slot->slots[offset];
    }

    if (slot == NULL)
        goto out;

    while (node) {
        if (!tag_get(node, tag, offset))
            goto out;
        tag_clear(node, tag, offset);
        if (any_tag_set(node, tag))
            goto out;

        index >>= RADIX_TREE_MAP_SHIFT;
        offset = index & RADIX_TREE_MAP_MASK;
        node = node->parent;
    }

    /* clear the root's tag bit */
    if (root_tag_get(root, tag))
        root_tag_clear(root, tag);

out:
    return slot;
}

radix_tree_tag_clear() 函数将索引查找路径上相关节点的标记清除,该函数接收 3 个参数:

root – 基数树的根节点

  • index – 索引

  • tag – 标记类型

如果索引 index 对应的数据的标记被置位,该函数返回对应数据的指针;否则,返回 NULL

struct radix_tree_node *node = NULL;
    struct radix_tree_node *slot = NULL;
    unsigned int height, shift;
    int uninitialized_var(offset);

首先,声明了一批变量,其中 nodeslot 被初始化为 NULL。其中,node

height = root->height;
    if (index > radix_tree_maxindex(height))
        goto out;

如果指定的索引值超出当前高度所能表示的最大索引,说明该索引不存在,直接跳转到 out 标签处。

out:
    return slot;

out 标签处,会返回 slot 的值,此时 slotNULL

shift = height * RADIX_TREE_MAP_SHIFT;
    slot = indirect_to_ptr(root->rnode);

计算初始位偏移 shift 并将 slot 转换成普通指针。

while (shift) {
        if (slot == NULL)
            goto out;

        shift -= RADIX_TREE_MAP_SHIFT;
        offset = (index >> shift) & RADIX_TREE_MAP_MASK;
        node = slot;
        slot = slot->slots[offset];
    }

接下来是一个 while 循环,从 rnode 节点开始,从上向下逐层查找。在循环中,node 指向父节点,slot 指向子节点。每降低一层,shift 的值就会减少 RADIX_TREE_MAP_SHIFT。offset 计算出 index 对应的当前层的槽位。如果 shift 不为 0 时,slot 为 NULL ,说明要查找的索引不存在,不需要清除标记,直接跳转到 out 标签处执行。

if (slot == NULL)
        goto out;

如果查到到最底层,发现 slotNULL,说明该索引没有对应的值,不需要清除标记,直接跳转到 out 标签处执行。

while (node) {
        if (!tag_get(node, tag, offset))
            goto out;
        tag_clear(node, tag, offset);
        if (any_tag_set(node, tag))
            goto out;

        index >>= RADIX_TREE_MAP_SHIFT;
        offset = index & RADIX_TREE_MAP_MASK;
        node = node->parent;
    }

现在,我们已经来到了最底层节点。此时 node 指向最底层节点, slot 指向索引 index 对应的数据, offset 表示索引 index 对应数据在最底层节点的槽位。接下来要做的工作,就是从最底层节点开始,从底向上逐层清除标记,这是在 while 循环中完成的。

我们来看一下 while 循环所做的工作。

调用 tag_get() 函数获取 node 节点对应槽位的标记。如果该值为 0,说明不需要清除,直接跳转到 out 标签处执行。否则,继续执行下述工作。

调用 tag_clear() 函数清除对应槽位的标记。如果该节点中还有其它位置的标记被置位,则不需要处理上层节点了,清除工作结束,直接跳转到 out 标签处执行。

接下来,就是更新 offset 使其等于上一层的槽位,更新 node 使其指向父节点。

然后进入下一循环,开始处理上一层节点。

/* clear the root's tag bit */
    if (root_tag_get(root, tag))
        root_tag_clear(root, tag);

接下来处理 while 循环之后和 out 标签之间的代码。程序走到这里,说明到达了根节点,需要清除根节点的 tag 标记。

out:
    return slot;

最后,到达 out 标签,返回 slot 的值。如果 index 对应的数据的标记被清除,返回该数据的指针;否则,返回 NULL

⑩radix_tree_tag_set()

radix_tree_tag_set() 函数将索引搜索路径上相关节点的标记置位。索引对应的数据必须存在,否则就是 bug。该函数会返回被打上标记的数据的地址。

/**
 *  radix_tree_tag_set - set a tag on a radix tree node
 *  @root:      radix tree root
 *  @index:     index key
 *  @tag:       tag index
 *
 *  Set the search tag (which must be < RADIX_TREE_MAX_TAGS)
 *  corresponding to @index in the radix tree.  From
 *  the root all the way down to the leaf node.
 *
 *  Returns the address of the tagged item.   Setting a tag on a not-present
 *  item is a bug.
 */
void *radix_tree_tag_set(struct radix_tree_root *root,
            unsigned long index, unsigned int tag)
{
    unsigned int height, shift;
    struct radix_tree_node *slot;

    height = root->height;
    BUG_ON(index > radix_tree_maxindex(height));

    slot = indirect_to_ptr(root->rnode);
    shift = (height - 1) * RADIX_TREE_MAP_SHIFT;

    while (height > 0) {
        int offset;

        offset = (index >> shift) & RADIX_TREE_MAP_MASK;
        if (!tag_get(slot, tag, offset))
            tag_set(slot, tag, offset);
        slot = slot->slots[offset];
        BUG_ON(slot == NULL);
        shift -= RADIX_TREE_MAP_SHIFT;
        height--;
    }

    /* set the root's tag bit */
    if (slot && !root_tag_get(root, tag))
        root_tag_set(root, tag);

    return slot;
}

该函数接收 3 个参数:

  • root – 基数树的根

  • index – 索引

  • tag – 标记类型

unsigned int height, shift;
    struct radix_tree_node *slot;

    height = root->height;
    BUG_ON(index > radix_tree_maxindex(height));

如果索引 index 的值超出基数树的最大索引,这是内核 bug,调用 BUG_ON() 进行处理。

slot = indirect_to_ptr(root->rnode);
    shift = (height - 1) * RADIX_TREE_MAP_SHIFT;

slot 初始化为 rnode 节点, shift 初始化为 rnode 节点对应的位偏移。

while (height > 0) {
        int offset;

        offset = (index >> shift) & RADIX_TREE_MAP_MASK;
        if (!tag_get(slot, tag, offset))
            tag_set(slot, tag, offset);
        slot = slot->slots[offset];
        BUG_ON(slot == NULL);
        shift -= RADIX_TREE_MAP_SHIFT;
        height--;
    }

接下来是一个 while 循环。在循环内部,首先根据 shift 值计算出 index 对应的当前层槽位 offset;然后检查该槽位对应的标记,如果该槽位没有设置标记,则将对应的标记位置位。将 slot 指向下一层节点,如果该节点为 NULL,说明 index 路径上出现空缺,这是一个bug,调用 BUG_ON() 进行处理。再接着,更新 shift 的值为下一层位偏移,并将高度减一,进入下一层继续执行。

/* set the root's tag bit */
    if (slot && !root_tag_get(root, tag))
        root_tag_set(root, tag);

    return slot;

最后,给根节点打上标记,并返回 slot 的值。

⑷基数树相关函数

①radix_tree_maxindex()

radix_tree_maxindex() 函数根据指定的高度,返回该高度下基数树所能表示的最大索引。

// file: lib/radix-tree.c
/*
 *  Return the maximum key which can be store into a
 *  radix tree with height HEIGHT.
 */
static inline unsigned long radix_tree_maxindex(unsigned int height)
{
    return height_to_maxindex[height];
}

由于变量 height_to_maxindex 中存储了不同高度对应的最大索引值,这里直接获取即可。

②radix_tree_lookup_element()

radix_tree_lookup_element() 查找指定索引 index 对应的元素。

// file: lib/radix-tree.c
/*
 * is_slot == 1 : search for the slot.
 * is_slot == 0 : search for the node.
 */
static void *radix_tree_lookup_element(struct radix_tree_root *root,
                unsigned long index, int is_slot)
{
    unsigned int height, shift;
    struct radix_tree_node *node, **slot;

    node = rcu_dereference_raw(root->rnode);
    if (node == NULL)
        return NULL;

    if (!radix_tree_is_indirect_ptr(node)) {
        if (index > 0)
            return NULL;
        return is_slot ? (void *)&root->rnode : node;
    }
    node = indirect_to_ptr(node);

    height = node->height;
    if (index > radix_tree_maxindex(height))
        return NULL;

    shift = (height-1) * RADIX_TREE_MAP_SHIFT;

    do {
        slot = (struct radix_tree_node **)
            (node->slots + ((index>>shift) & RADIX_TREE_MAP_MASK));
        node = rcu_dereference_raw(*slot);
        if (node == NULL)
            return NULL;

        shift -= RADIX_TREE_MAP_SHIFT;
        height--;
    } while (height > 0);

    return is_slot ? (void *)slot : indirect_to_ptr(node);
}

radix_tree_lookup_element() 函数接收 3 个参数:

  • root – 基数树的根

  • index – 索引值

  • is_slot – 查找标识。0 - 返回数据指针;1 - 返回数据所在槽位的指针。

unsigned int height, shift;
    struct radix_tree_node *node, **slot;

    node = rcu_dereference_raw(root->rnode);
    if (node == NULL)
        return NULL;

在函数内部,首先通过 rcu_dereference_raw(root->rnode) 获取到指针 root->rnode。如果该值为 NULL,说明这是一颗空树,直接返回 NULL。

if (!radix_tree_is_indirect_ptr(node)) {
        if (index > 0)
            return NULL;
        return is_slot ? (void *)&root->rnode : node;
    }

接下来通过 radix_tree_is_indirect_ptr() 函数检测 root->rnode 是否间接指针。

如果不是间接指针,说明 root->rnode 直接指向数据项,此时只有一个数据,index 为 0。如果 index > 0,该索引对应的数据肯定是不存在的,所以直接返回 NULL;否则,根据 is_slot 的值返回不同的指针类型,当 is_slot 为真时,说明要返回是槽位指针,将 &root->rnode 转换成 void *类型返回;否则,说明要返回的是数据指针,直接返回 node,即 root->rnode。

接下来,处理 root->rnode 为间接指针的情况,即 root->rnode 指向中间节点。

node = indirect_to_ptr(node);

首先,将 node 转换成普通指针。此处, node 是根节点的子节点。

height = node->height;
    if (index > radix_tree_maxindex(height))
        return NULL;

通过 node->height 获取到树的高度,并赋值给变量 height

通过 radix_tree_maxindex(height) 获取到当前高度对应的最大索引值。如果给定的索引值 index 大于高度对应的最大索引值,说明要查找的索引超出当前高度能表示的范围,直接返回 NULL

shift = (height-1) * RADIX_TREE_MAP_SHIFT;

我们了解到在 4 级分页下,虚拟地址被分割成 5 个部分,每一部分表示不同层级的索引。为了方便计算每一层级的索引值,内核为他们定义了偏移量。

与线性地址转换类似,在基数树中,索引也会按照层数被分割成不同的部分,每一层也有着对应的位数。第 2 ~ 11 层,每层对应着 6 个比特位,第 1 层对应着 4 个(当树高度为 11 时)或 6 个(当树高度小于 11 时)比特位。

此处变量 shift 就用来表示偏移量。当前处于第 1 层,我们要获取到索引 index 中的最高的 6 位(或 4 位),所以位偏移为 (height-1) * RADIX_TREE_MAP_SHIFT

do {
        slot = (struct radix_tree_node **)
            (node->slots + ((index>>shift) & RADIX_TREE_MAP_MASK));
        node = rcu_dereference_raw(*slot);
        if (node == NULL)
            return NULL;

        shift -= RADIX_TREE_MAP_SHIFT;
        height--;
    } while (height > 0);

接下来,通过 while 循环,从第一层开始,从上向下逐层进行查找。

RADIX_TREE_MAP_MASK 是一个掩码,该掩码的低 6 位全为 1,其定义如下:

// file: lib/radix-tree.c
#define RADIX_TREE_MAP_MASK (RADIX_TREE_MAP_SIZE-1)

在查找的过程中,node 指向父节点,slot 指向子节点所在槽位。查找方式类似于虚拟地址到物理地址的转换。 先从第一层开始,此时 node 是第一层的节点,node->slots 得到当前节点 slots 成员的地址,然后通过 (index>>shift) & RADIX_TREE_MAP_MASK) 获取到 index 在 slots 数组中的对应槽位,node->slots + ((index>>shift) & RADIX_TREE_MAP_MASK)就得到了该索引对应的 slots 数组成员的地址,并赋值给 slot,这是一个二级指针。

然后调用 rcu_dereference_raw() 函数对指针 slot 解引用,获取到指向下一层节点的指针,并赋值给 node。如果 node == NULL,说明基数树中没有该索引对应的数据,直接返回 NULL。否则,执行 shift -= RADIX_TREE_MAP_SHIFT 及 height–,进入下一层继续搜索。

当 height 等于 0 时,搜索执行完毕,并找到了索引 index 对应的数据(未找到的话,会提前返回 NULL)。此时,slot 是数据所在槽位的地址,node 是指向数据的指针。

return is_slot ? (void *)slot : indirect_to_ptr(node);

最后,如果 is_slot为真,需要返回槽位 slot 的指针,则将 slot 转换成 (void *)类型的指针并返回;否则,表示要返回数据指针,则将 node 转换成普通指针并返回。

③ radix_tree_lookup()

// file: lib/radix-tree.c
/**
 *  radix_tree_lookup    -    perform lookup operation on a radix tree
 *  @root:      radix tree root
 *  @index:     index key
 *
 *  Lookup the item at the position @index in the radix tree @root.
 *
 *  This function can be called under rcu_read_lock, however the caller
 *  must manage lifetimes of leaf nodes (eg. RCU may also be used to free
 *  them safely). No RCU barriers are required to access or modify the
 *  returned item, however.
 */
void *radix_tree_lookup(struct radix_tree_root *root, unsigned long index)
{
    return radix_tree_lookup_element(root, index, 0);
}

radix_tree_lookup() 函数从基数树中查找 index 位置处的数据,并返回指向该数据的指针。

其内部调用 radix_tree_lookup_element() 函数来实现查找功能。

④radix_tree_lookup_slot()

// file: lib/radix-tree.c
/**
 *  radix_tree_lookup_slot    -    lookup a slot in a radix tree
 *  @root:      radix tree root
 *  @index:     index key
 *
 *  Returns:  the slot corresponding to the position @index in the
 *  radix tree @root. This is useful for update-if-exists operations.
 *
 *  This function can be called under rcu_read_lock iff the slot is not
 *  modified by radix_tree_replace_slot, otherwise it must be called
 *  exclusive from other writers. Any dereference of the slot must be done
 *  using radix_tree_deref_slot.
 */
void **radix_tree_lookup_slot(struct radix_tree_root *root, unsigned long index)
{
    return (void **)radix_tree_lookup_element(root, index, 1);
}

radix_tree_lookup_slot() 函数从基数树中查找 index 位置处的数据,返回对应的槽位 slot 的指针。其内部调用 radix_tree_lookup_element() 函数来实现查找功能。

⑤radix_tree_node_alloc()

radix_tree_node_alloc() 函数用来为节点申请内存。

/*
 * This assumes that the caller has performed appropriate preallocation, and
 * that the caller has pinned this thread of control to the current CPU.
 */
static struct radix_tree_node *
radix_tree_node_alloc(struct radix_tree_root *root)
{
    struct radix_tree_node *ret = NULL;
    gfp_t gfp_mask = root_gfp_mask(root);

    if (!(gfp_mask & __GFP_WAIT)) {
        struct radix_tree_preload *rtp;

        /*
         * Provided the caller has preloaded here, we will always
         * succeed in getting a node here (and never reach
         * kmem_cache_alloc)
         */
        rtp = &__get_cpu_var(radix_tree_preloads);
        if (rtp->nr) {
            ret = rtp->nodes[rtp->nr - 1];
            rtp->nodes[rtp->nr - 1] = NULL;
            rtp->nr--;
        }
    }
    if (ret == NULL)
        ret = kmem_cache_alloc(radix_tree_node_cachep, gfp_mask);

    BUG_ON(radix_tree_is_indirect_ptr(ret));
    return ret;
}

首先,调用 root_gfp_mask() 函数获取到根节点中存储的页分配标识,并赋值给 gfp_mask。

如果页标识里没有 ___GFP_WAIT 标记,说明在分配内存时不能被打断,那么直接从预分配的缓存池中为节点分配空间。

// file: include/linux/gfp.h
#define __GFP_WAIT  ((__force gfp_t)___GFP_WAIT)    /* Can wait and reschedule? */

__get_cpu_var(radix_tree_preloads) 函数用来获取 per-cpu 变量 radix_tree_preloads,然后将变量地址赋值给 rtp。

获取到 radix_tree_preloads 的地址后,判断内存池中是否还有可用空间。如果 rtp->nr 为真,说明还有可用的空间,则将 nodes 数组中第 rtp->nr - 1 个成员(节点地址)赋值给 ret ,然后将该成员的值设置为 NULL ,表示不可用;最后, rtp->nr 自减一,表示可用的空间又减少一个。

如果 ret == NULL,说明从内存池分配失败,则调用 kmem_cache_alloc() 函数,从 slab 分配器中分配。

最后,将分配的内存地址 ret 返回。

⑹radix_tree_extend()

当树的高度不足以保存指定 index 处的数据时,需要对树进行扩展,也就是要增加高度。radix_tree_extend() 函数用来实现基数树高度的扩展。

// file: lib/radix-tree.c
/*
 *  Extend a radix tree so it can store key @index.
 */
static int radix_tree_extend(struct radix_tree_root *root, unsigned long index)
{
    struct radix_tree_node *node;
    struct radix_tree_node *slot;
    unsigned int height;
    int tag;

    /* Figure out what the height should be.  */
    height = root->height + 1;
    while (index > radix_tree_maxindex(height))
        height++;

    if (root->rnode == NULL) {
        root->height = height;
        goto out;
    }

    do {
        unsigned int newheight;
        if (!(node = radix_tree_node_alloc(root)))
            return -ENOMEM;

        /* Propagate the aggregated tag info into the new root */
        for (tag = 0; tag < RADIX_TREE_MAX_TAGS; tag++) {
            if (root_tag_get(root, tag))
                tag_set(node, tag, 0);
        }

        /* Increase the height.  */
        newheight = root->height+1;
        node->height = newheight;
        node->count = 1;
        node->parent = NULL;
        slot = root->rnode;
        if (newheight > 1) {
            slot = indirect_to_ptr(slot);
            slot->parent = node;
        }
        node->slots[0] = slot;
        node = ptr_to_indirect(node);
        rcu_assign_pointer(root->rnode, node);
        root->height = newheight;
    } while (height > root->height);
out:
    return 0;
}

该函数接收 2 个参数:

  • root – 基数树的根节点

  • index – 要存储的索引

/* Figure out what the height should be.  */
    height = root->height + 1;
    while (index > radix_tree_maxindex(height))
        height++;

首先计算出,为了能够存储索引 index 所需要的树的高度 height

if (root->rnode == NULL) {
        root->height = height;
        goto out;
    }

如果 root->rnode == NULL ,说明这是一棵空树,那么直接将树根的高度设置为 height,跳转到 out 标签处执行。

out:
    return 0;

out 标签处,直接返回 0。

2.3基数树特点

⑴为稀疏树提供有效存储,代替固定尺寸数组提供键值到指针的快速查找。

基数树是一种多叉搜索树,其叶子结点是实际的数据条目,每个结点有固定数量的指针指向子结点。在面对稀疏树的存储需求时,基数树能够发挥出极大的优势。例如,Linux radix 树每个结点有特定数量的 slot(当配置为每个结点有 2^6 = 64 个 slot 时),与数据类型 long 的位数相同。通过这种方式,基数树可以为稀疏树提供有效的存储,避免了使用固定尺寸数组时可能出现的大量空间浪费,同时又能快速地进行键值到指针的查找。

⑵支持插入、删除和查找等操作,时间复杂度均为 O (k),其中 k 是集合中全部元素的最大长度。

无论是插入、删除还是查找操作,基数树都能在相对较短的时间内完成。以对 romane、romanus、romulus、rubens、ruber、rubicon、rubicundus 七个字符串进行插入操作为例,开始插入 romane 时,直接创建一个 “romane” 节点,并将该节点结束标记设为 true,随即完成插入。

接着插入 romanus,从根节点开始逐个字符进行比较,发现两者前缀 “roman” 相同,需要分割原来的 “romane” 节点,先创建一个新的公共前缀 “roman” 节点,然后将原来的 “romane” 节点设为 “e”,并将新的公共前缀节点指向 “e” 子节点,接着继续创建一个新的 “us” 节点,最后将公共前缀 “roman” 节点指向 “us” 子节点,并将 “us” 节点结束标记设为 true。后续的插入操作类似,通过这种方式,基数树能够高效地进行插入操作,且时间复杂度为 O (k)。对于查找操作,基数树从根节点开始,沿着字符路径进行查找,能够快速确定目标字符串是否存在于树中。删除操作也类似,先找到目标节点,然后根据树的结构进行调整,时间复杂度同样为 O (k)。

⑶采用空间换时间的策略,通过标签扩展提高查找效率。

Linux radix 树的独特特征是标签扩展,每个结点的 tags 域是一个二维数组,每个 slot 用 2 位标识,这是典型的用空间换时间的应用。例如在查找 PG_dirty 的页面时,就不需要遍历整个树,而可以跳过那些 tags [0] 为 0 值的子树,这样就提高了查找效率。

基数树在页缓存管理中也有广泛应用,结构 address_space 通过 radix 树跟踪绑定到地址映射上的核心页,该 radix 树允许内存管理代码快速查找标识为 dirty 或 writeback 的页。在 IP 路由检索、文本文档的倒排索引等场景中,基数树也能通过标签扩展等策略,提高查找效率。

2.4实现方式

上面介绍了工作原理及特点,下面用 C++ 来简单实现一个 Radix Tree 的 demo。

▼数据结构

首先定义 RadixTreeNode,我们需要用一个 string 来存储字符串前缀,用一个 bool 变量来标识当前路径是否构成一个完整的字符串,再用一个哈希表来存储所有的子节点(这里不用数组的原因是删除一个节点时,需要偏移多个节点,且查找时需要遍历数组)。

class RadixTreeNode {
public:
  explicit RadixTreeNode(const string &word = "", bool is_end = false)
      : word(word), is_end(is_end) {}

  unordered_set<shared_ptr<RadixTreeNode>> children;
  string word;
  bool is_end;
};

接着定义 RadixTree,首先我们需要存储一个 root 节点的指针,由于 C++ 中没有 GC 机制,为了避免内存泄漏,这里统一用智能指针来进行管理。这里实现了基本的插入、删除、查找函数,以及递归调用的辅助函数。

class RadixTree {
public:
  RadixTree() : root(make_shared<RadixTreeNode>()){};

  virtual ~RadixTree() = default;

  RadixTree(const RadixTree &) = delete;
  RadixTree &operator=(const RadixTree &) = delete;

  void insert(const string &str);

  void erase(const string &str);

  bool search(const string &str);

private:
  shared_ptr<RadixTreeNode> root;

  void insert_helper(const string &str, shared_ptr<RadixTreeNode> node);

  shared_ptr<RadixTreeNode> erase_helper(const string &str,
                                         shared_ptr<RadixTreeNode> node);

  bool search_helper(const string &str, shared_ptr<RadixTreeNode> node);

▼插入

①如果插入的是空字符串,则直接将根节点标记为完整字符串,否则继续往下。

②遍历当前节点的子节点,共有以下三种情况:

  • 当前节点的内容与字符串完全匹配:将当前前缀标记为完整字符串。

  • 当前节点的内容是字符串的前缀:此时将字符串拆分为公共前缀和剩余字符,用剩余字符与该子节点继续递归进行查找,寻找合适的插入位置,继续回到流程 2。

  • 当前节点的内容和字符串具有公共前缀:此时将当前节点的内容拆分为公共前缀,剩余后缀两部分。当前节点保留前缀内容,将后缀作为子节点插入,此时如果字符串还有剩余字符,则将其也作为子节点一同插入。

③子节点中没有与之具有公共前缀的节点:将字符串的内容直接作为新的子节点插入。

void insert(const string &str) {
  if (str.empty()) {
    root->is_end = true;
  } else {
    insert_helper(str, root);
  }
}

void insert_helper(const string &str, shared_ptr<RadixTreeNode> node) {
  // 如果当前没有子节点,则直接作为新的子节点
  if (node->children.empty()) {
    auto new_node = make_shared<RadixTreeNode>(str, true);
    node->children.insert(new_node);
    return;
  }

  bool is_match = false;
  for (auto current : node->children) {
    int i = 0;
    for (; i < str.size() && i < current->word.size(); i++) {
      if (str[i] != current->word[i]) {
        break;
      }
    }
    if (i != 0) {
      is_match = true;
      // 情况一:当前节点的内容与字符串完全匹配,则直接将该前缀标记为完整
      if (i == str.size() && i == current->word.size()) {
        current->is_end = true;
      } else if (i != current->word.size()) {
        // 如果当前节点的内容是字符串的部分前缀,则进行分裂
        auto new_node = make_shared<RadixTreeNode>(current->word.substr(i),
                                                   current->is_end);

        current->word = current->word.substr(0, i);
        current->is_end = (i == str.size()) ? true : false;
        current->children.swap(new_node->children);
        current->children.insert(new_node);

        if (i != str.size()) {
          auto new_node2 = make_shared<RadixTreeNode>(str.substr(i), true);
          current->children.insert(new_node2);
        }
      } else {
        // 如果当前节点已匹配完,则继续往子节点匹配
        insert_helper(str.substr(i), current);
      }
      if (is_match) {
        return;
      }
    }
  }
  // 如果没有找到,则直接插入
  auto new_node = make_shared<RadixTreeNode>(str, true);
  node->children.insert(new_node);
}

▼删除

⑴如果删除的是空字符串,则直接将 root 标记为非完整字符串。

⑵遍历当前节点的子节点:

①当前节点的内容与字符串完全匹配:

  • 当前节点有子节点:将当前节点标记为非完整字符串。

  • 当前节点没子节点:直接删除该节点。此时如果在删除了该节点后,当前节点的父节点仅剩下一个子节点,并且父节点中存储一个不完整字符串,此时可以将当前节点和父节点合并,用于压缩路径。

②当前节点的内容是字符串的前缀:将字符串拆分为公共前缀和剩余后缀,用后缀继续向下递归查找到合适的位置进行删除。

③字符串是当前节点内容的前缀:该字符串一定不在树中,删除结束。

void erase(const string &str) {
  if (str.empty()) {
    root->is_end = false;
  } else {
    erase_helper(str, root);
  }
}

shared_ptr<RadixTreeNode> erase_helper(const string &str,
                                       shared_ptr<RadixTreeNode> node) {
  bool is_match = false;
  for (auto current : node->children) {
    int i = 0;
    for (; i < str.size() && i < current->word.size(); i++) {
      if (str[i] != current->word[i]) {
        break;
      }
    }
    if (i != 0) {
      is_match = true;

      // 情况一:当前节点的内容与字符串完全匹配
      if (i == str.size() && i == current->word.size()) {
        // 如果该节点没有子节点,则将该节点删除。否则将is_end标记为false
        if (current->children.empty()) {
          node->children.erase(current);
        } else {
          current->is_end = false;
        }

        // 如果删除了该节点后,父节点仅剩下一个子节点,且父节点不完整,则将两个节点合并
        if (node->children.size() == 1 && !node->is_end && node != root) {
          auto sub_node = *node->children.begin();
          node->children.erase(sub_node);
          node->is_end = sub_node->is_end;
          node->word.append(sub_node->word);
          node->children = sub_node->children;
          return node;
        }
      }
      // 情况二:当前节点是字符串的前缀
      else if (i == current->word.size()) {
        // 继续向下搜索,如果返回值不为空则说明需要合并节点
        auto sub_node = erase_helper(str.substr(i), current);
        if (sub_node && node->children.size() == 1 && !node->is_end &&
            node != root) {
          auto sub_node = *node->children.begin();
          node->children.erase(sub_node);
          node->is_end = sub_node->is_end;
          node->word.append(sub_node->word);
          node->children = sub_node->children;
        }
      }
      // 情况三:字符串是当前节点的前缀,此时字符串必定不存在,删除结束
      else {
        break;
      }
    }
    if (is_match) {
      return nullptr;
    }
  }
  return nullptr;
}

▼查找

查找实现的逻辑如下:

  1. 从根节点出发,如果字符串为空,判断空字符有没有存储到根节点,没有往下执行。

  2. 遍历当前节点的所有子节点,查找是否存在公共前缀,此时存在以下四种情况:

    • 当前节点的内容与字符串完全匹配:此时根据当前路径是否为完整单词,判断查找是否成功。

    • 当前节点的内容是字符串的前缀:将字符串拆分为前缀和剩余后缀,将后缀继续递归到该节点的子节点处继续查询,重复流程 2

    • 字符串是当前节点内容的前缀:查找失败,这一部分前缀必定没有插入到树中。

    • 无公共前缀:继续遍历下一个子节点,如果已经遍历完,则认为查找失败,该字符串不存在。

bool search(const string &str) {
  if (str.empty()) {
    return root->is_end;
  }
  return search_helper(str, root);
}

bool search_helper(const string &str, shared_ptr<RadixTreeNode> node) {
  for (auto current : node->children) {
    int i = 0;
    for (; i < str.size() && i < current->word.size(); i++) {
      if (str[i] != current->word[i]) {
        break;
      }
    }
    if (i != 0) {
      // 情况一:当前节点的内容与字符串完全匹配,根据是否为完整单词判断结果
      if (i == str.size() && i == current->word.size()) {
        return current->is_end;
      }
      // 情况二:当前节点的内容是字符串的前缀
      else if (i == current->word.size()) {
        return search_helper(str.substr(i), current);
      }
      // 情况三:字符串的内容是当前节点的前缀,直接返回错误
      else {
        return false;
      }
    }
  }
  // 没有找到
  return false;
}

完整代码

//
// Created by orekilee on 2023/3/31.
//

#ifndef RADIX_RADIXTREE_CPP
#define RADIX_RADIXTREE_CPP

#include <iostream>
#include <memory>
#include <string>
#include <unordered_set>

using namespace std;

class RadixTreeNode {
public:
  explicit RadixTreeNode(const string &word = "", bool is_end = false)
      : word(word), is_end(is_end) {}

  unordered_set<shared_ptr<RadixTreeNode>> children;
  string word;
  bool is_end;
};

class RadixTree {
public:
  RadixTree() : root(make_shared<RadixTreeNode>()){};

  virtual ~RadixTree() = default;

  RadixTree(const RadixTree &) = delete;
  RadixTree &operator=(const RadixTree &) = delete;

  void insert(const string &str) {
    if (str.empty()) {
      root->is_end = true;
    } else {
      insert_helper(str, root);
    }
  }

  void erase(const string &str) {
    if (str.empty()) {
      root->is_end = false;
    } else {
      erase_helper(str, root);
    }
  }

  bool search(const string &str) {
    if (str.empty()) {
      return root->is_end;
    }
    return search_helper(str, root);
  }

private:
  shared_ptr<RadixTreeNode> root;

  void insert_helper(const string &str, shared_ptr<RadixTreeNode> node) {
    // 如果当前没有子节点,则直接作为新的子节点
    if (node->children.empty()) {
      auto new_node = make_shared<RadixTreeNode>(str, true);
      node->children.insert(new_node);
      return;
    }

    bool is_match = false;
    for (auto current : node->children) {
      int i = 0;
      for (; i < str.size() && i < current->word.size(); i++) {
        if (str[i] != current->word[i]) {
          break;
        }
      }
      if (i != 0) {
        is_match = true;
        // 情况一:当前节点的内容与字符串完全匹配,则直接将该前缀标记为完整
        if (i == str.size() && i == current->word.size()) {
          current->is_end = true;
        } else if (i != current->word.size()) {
          // 如果当前节点的内容是字符串的部分前缀,则进行分裂
          auto new_node = make_shared<RadixTreeNode>(current->word.substr(i),
                                                     current->is_end);

          current->word = current->word.substr(0, i);
          current->is_end = (i == str.size()) ? true : false;
          current->children.swap(new_node->children);
          current->children.insert(new_node);

          if (i != str.size()) {
            auto new_node2 = make_shared<RadixTreeNode>(str.substr(i), true);
            current->children.insert(new_node2);
          }
        } else {
          // 如果当前节点已匹配完,则继续往子节点匹配
          insert_helper(str.substr(i), current);
        }
        if (is_match) {
          return;
        }
      }
    }
    // 如果没有找到,则直接插入
    auto new_node = make_shared<RadixTreeNode>(str, true);
    node->children.insert(new_node);
  }

  shared_ptr<RadixTreeNode> erase_helper(const string &str,
                                         shared_ptr<RadixTreeNode> node) {
    bool is_match = false;
    for (auto current : node->children) {
      int i = 0;
      for (; i < str.size() && i < current->word.size(); i++) {
        if (str[i] != current->word[i]) {
          break;
        }
      }
      if (i != 0) {
        is_match = true;

        // 情况一:当前节点的内容与字符串完全匹配
        if (i == str.size() && i == current->word.size()) {
          // 如果该节点没有子节点,则将该节点删除。否则将is_end标记为false
          if (current->children.empty()) {
            node->children.erase(current);
          } else {
            current->is_end = false;
          }

          // 如果删除了该节点后,父节点仅剩下一个子节点,且父节点不完整,则将两个节点合并
          if (node->children.size() == 1 && !node->is_end && node != root) {
            auto sub_node = *node->children.begin();
            node->children.erase(sub_node);
            node->is_end = sub_node->is_end;
            node->word.append(sub_node->word);
            node->children = sub_node->children;
            return node;
          }
        }
        // 情况二:当前节点是字符串的前缀
        else if (i == current->word.size()) {
          // 继续向下搜索,如果返回值不为空则说明需要合并节点
          auto sub_node = erase_helper(str.substr(i), current);
          if (sub_node && node->children.size() == 1 && !node->is_end &&
              node != root) {
            auto sub_node = *node->children.begin();
            node->children.erase(sub_node);
            node->is_end = sub_node->is_end;
            node->word.append(sub_node->word);
            node->children = sub_node->children;
          }
        }
        // 情况三:字符串是当前节点的前缀,此时必定查询失败
        else {
          break;
        }
      }
      if (is_match) {
        return nullptr;
      }
    }
    return nullptr;
  }

  bool search_helper(const string &str, shared_ptr<RadixTreeNode> node) {
    for (auto current : node->children) {
      int i = 0;
      for (; i < str.size() && i < current->word.size(); i++) {
        if (str[i] != current->word[i]) {
          break;
        }
      }
      if (i != 0) {
        // 情况一:当前节点的内容与字符串完全匹配,根据是否为完整单词判断结果
        if (i == str.size() && i == current->word.size()) {
          return current->is_end;
        }
        // 情况二:当前节点的内容是字符串的前缀
        else if (i == current->word.size()) {
          return search_helper(str.substr(i), current);
        }
        // 情况三:字符串的内容是当前节点的前缀,直接返回错误
        else {
          return false;
        }
      }
    }
    // 没有找到
    return false;
  }
};

#endif // RADIX_RADIXTREE_CPP

三、基数树在内核的应用

3.1文件缓存页管理

早期内核用共同散列表管理文件页缓存,存在并发访问性能问题。在较早版本的内核中(比如 2.4.0),文件页缓存是通过共同的散列表 page_hash_table 组织的,根据缓存页对应的 index 进行 hash,虽然能较快地搜索到指定文件的指定页,且没有太多额外内存消耗,但所有访问文件都通过同一个散列表缓存页,查询时都通过自旋锁 pagecache_lock,降低了多进程的并发访问性能。

2.6 内核中用各文件地址空间自行管理缓存页,采用基数树管理文件页,提高了并发性能。Linux 2.6 内核的文件页是通过基数树管理的,页索引决定了其在树中的位置。文件地址空间对象的数据结构中,page_tree 即指向基数树的根,该指针指向 radix_tree_root 结构。radix_tree_root 中的 rnode 指向根节点,根节点是一个 radix_tree_node 结构,其中 height 为节点的高度,count 指示孩子节点数,slots 为子节点指针数组,对于叶节点则指向对应的页结构,tags 数组使用位图分别指示各子树是否包含有对应标志的页,如脏和写回标志。

3.2进程间通信 ipc 对象管理

较早内核中 ipc 对象用固定数组管理,存在性能问题。较早内核(如 2.6.11)中的 ipc 对象(比如共享内存对象 shm)是用固定数组管理的,对象 id 为数组的下标,当对象数量剧增,原有数组对象数不够时就要通过 grow_ary () 重新分配新的数组,然后进行新旧数组间的内容拷贝,当对象数量变化较大时就要面临数组的频繁分配和释放,这对性能不利。

2.6.24 中 ipc 对象改用基数树管理,提高了动态性能。在 2.6.24 中 ipc 对象改用基数树 idr 管理,虽然通过 idr 定位对象不像数组那么直接(时间复杂度为树的高度),但是换取了很好的动态性能,增加对象时不会面临大规模的内存分配,只需要创建一个或几个(扩展树时)树节点,并且获取空闲 id 的性能比数组要好,这直接影响了插入新对象的速度。idr 结构用于管理包含 ipc 对象的基数树,以对象 id 为索引,其中 idr_layer 是树节点结构,top 指向根节点,layers 是树的高度,id_free 维护一个临时的空闲节点链表,id_free_cnt 指示空闲链表中的节点数。

四、如何理解基数树

作用解决长整型数据映射中 Hash 冲突和表大小设计问题。空间使用灵活,只有需要用到某节点时才创建。

对于长整型数据的映射,Hash 冲突和 Hash 表大小的设计常常让人头疼。基数树针对这种稀疏的长整型数据查找,能够快速且节省空间地完成映射。它可以根据一个长整型(比如一个长 ID)快速查找到其对应的对象指针,比用 hash 映射更简单、更节省空间。因为 hash 映射中 hash 函数难以设计,不恰当的 hash 函数可能增大冲突或浪费空间。

基数树的空间使用非常灵活,只有在需要用到某节点时才会去创建它。例如在 tcmalloc 的 pagemap(key 是释放内存的 pageid,value 是该 pageid 对应的 span)以及内核的页高速缓存中的基数树(key 是相对文件起始位置的第几页,value 是对应的页描述符)中都有所体现。

与 Trie 树的关系思想类似,Radix 树可看作 Trie 树的变种。Trie 树一般用于字符串到对象的映射,Radix 树一般用于长整数到对象的映射。

Radix 树与 Trie 树的思想有点类似,甚至可以把 Trie 树看为一个基为 26 的 Radix 树,也可以把 Radix 树看做是 Trie 树的变异。

Trie 树一般用于字符串到对象的映射,例如在字符串检索、文本预测、词频统计、排序以及字符串最长公共前缀

原文阅读