当千亿级数据遇上毫秒级读写,探探 LSM Tree 存储引擎深度剖析

背景

探探,作为挚文集团旗下一款月活跃用户超千万的社交软件,其部分核心业务数据依赖于LevelDB存储,尤其是用户滑动行为生成的关系链以及各类关系类型的计数系统。该平台具备强大的数据处理能力,支持高效的用户关系搜索与统计功能,单个节点便能承载千亿级别的海量信息。在如此大规模的数据处理下,数据写入操作在晚高峰的平均响应时间仅为0.7毫秒,查询操作的响应时间则控制在10毫秒以内。

作为关系储存系统,探探在多个推荐流程中发挥着至关重要的作用,典型应用场景如下:

  • 检索:借助该系统,可快速定位与特定用户存在某种关联的所有个体。例如,筛选出已建立连接但不包含“被喜欢”或“超级喜欢”状态的对象。
  • 写入:当用户对其他成员表达好感(如普通喜欢、超级喜欢)或相反态度时,这些行为会被实时记录,并同步更新相关的统计数据。
  • 特征分析:基于现有样本估算出相对准确的受欢迎程度指标——POP值。该数值反映在限定范围内(如最近发生的200次事件中),正面反馈(如收到他人好感)的比例。对于新加入的用户,即便其历史互动记录不足(如被动接收行为少于200次),通过对有限数据集的深度挖掘与智能推算,也能提供可靠的结果参考。

一、集群架构

LevelDB集群架构

在滑卡推荐系统中,集群架构采用5副本设计,每个副本又进一步细分为8个数据分片。每个实例根据其所在行列位置命名,比如位于第0行第1列的实例标识为r0c1。同一列内的所有实例互为冗余备份,共同确保数据的一致性和高可用性。

各实例可通过执行online_service命令向Zookeeper注册自身状态,通过offline_service命令注销。一旦某个实例发生故障或不可达,它将自动从Zookeeper维护的服务注册表中移除,以此避免单点故障对整体业务连续性造成潜在威胁。

客户端利用SDK监听并响应Zookeeper上发布的服务注册信息,动态更新本地缓存中的活跃服务节点列表。在发起数据查询请求时,SDK会智能地从当前在线且健康的列中选取最优实例进行交互,确保请求处理的高效与精准。

每个服务实例由以下三个关键功能模块组成:

  1. 服务进程:直接面向终端用户,提供数据存储及检索服务。
  2. 消费者进程:订阅Kafka消息队列中的数据更新事件,并调用底层API实现数据写入操作。
  3. 监控进程:持续收集系统运行时的各项性能指标,并上报至外部监控平台,以便进行分析和预警。

此外,在滑卡推荐系统中,数据更新流程设计为8个独立分区,每一列的服务只需关注与其关联的特定分区。消费者进程负责将各自的数据消费进度(即offset值)记录在本地存储中,worker服务承担汇聚、格式化以及验证原始数据的任务,并最终将符合预定义格式的消息体发布到Kafka主题中。

二、基本概念

1. 什么是LSM树

LSM树(Log-Structured Merge-Tree)是一种优化写入性能的存储结构,广泛应用于LevelDB、RocksDB、HBase、Cassandra和TiDB等数据存储系统。它先将数据写入内存中的数据结构,随后在后台批量刷写到磁盘上,以此实现高效的写入操作。LSM树利用了磁盘顺序写的优势,并通过多层内存和磁盘的合并结构进一步提升性能。这种结构以追加模式写入数据,避免了随机更新操作,显著提高了写入吞吐量。不过,这也致使读取性能有所下降,所以LSM树更适用于写多读少的场景。相较于传统的B+树或ISAM,LSM树在写操作方面表现更为出色。

什么是LSM树

2. 基本组件

WAL(Write-Ahead Log)

WAL是LSMT tree引擎实现数据持久化和恢复机制的关键技术。该机制确保在系统崩溃或出现其他异常情况时,未持久化到磁盘的数据不会丢失。当有写入操作时,LevelDB首先将这些操作顺序写入一个日志文件中,每个写入操作在日志中都有明确标识,包含键值对和时间戳等信息。

    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        static_assert(kHeaderSize == 7, "");
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;

一旦写入操作成功Append到日志文件,LevelDB接下来会将这些数据更新到Memtable中,具体写入过程将在下文详细阐述。即便系统崩溃,由于更改已记录在日志磁盘中,系统重启后可通过重放日志文件恢复数据。当Memtable转换为Immutable Memtable并写入SSTable文件时,当前日志文件会关闭并开始新的日志文件。当日志文件关联的所有数据都成功写入SSTable并确认后,相关日志文件即可删除。

WAL恢复过程

LevelDB重新启动时,系统会检测是否存在未完成的日志文件。若有未完成日志,将执行以下步骤恢复数据:

  1. 日志扫描:系统首先扫描日志文件,读取所有记录的写入操作。
  2. 重建Memtable:根据日志文件中的数据,LevelDB重建Memtable。此过程包括重新执行日志中的所有写入操作,恢复到崩溃时的状态。Memtable重建完成后,LevelDB可继续接受新的写入请求,并按正常流程操作。
// Read all the records and add to a memtable
  std::string scratch;
  Slice record;
  WriteBatch batch;
  int compactions = 0;
  MemTable* mem = nullptr;
  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
    if (record.size() < 12) {
      reporter.Corruption(record.size(),
                          Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    if (mem == nullptr) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                    WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      compactions++;
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
      mem->Unref();
      mem = nullptr;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
    }
  }

  delete file;

ReadRecord将记录读入record和scratch,并检查上一次操作的状态ok,循环将继续;在循环内部,首先检查记录大小是否小于12,若记录大小正常,就将record中的数据设置为batch的内容;创建新的MemTable对象,然后尝试将批处理插入到MemTable中。若插入失败,跳出循环;若插入成功,计算批处理中最后一个操作的序列号;若MemTable的大致内存使用量超过写缓冲区大小,增加压缩计数,将保存清单标志设置为true,并将MemTable写入Level-0表中;若写操作失败,跳出循环;循环结束后,删除文件对象。

WAL提供了数据安全性,但也引入了一些性能开销。此外,日志文件的管理(如何有效切换和清理旧日志文件)也是存储管理中的一个挑战。因此在生产环境中,需合理配置LevelDB的日志大小和切换频率。

MemTable

LSM tree引擎以高效写入性能著称。在LevelDB的写入流程中,数据最先写入Memtable内存表。借助内存的高速访问特性,这一环节的性能远高于磁盘I/O操作。随后,数据会周期性地写入硬盘上的SSTables(Sorted String Tables),这部分机制较为复杂,后续将详细阐述。

Memtable的核心数据结构是Skip List跳表。跳表具有多个层级,每一层级都是有序链表。最底层涵盖所有元素,上层则包含下层部分元素。这种结构能快速实现搜索、插入和删除操作,平均时间复杂度为O(log⁡n) 。其十分契合Memtable的实现需求,既支持快速插入,又能保持元素有序,为后续生成有序的SSTables奠定基础。

Immutable Memtable

当Memtable的大小达到预设阈值(默认4MB)时,它会转变为不可变的Memtable,并异步将其转储到磁盘,生成新的SSTable。Immutable Memtable是一种临时状态,作用在于将内存数据转储到磁盘的SSTables中。它的存在使得在后台将数据异步写入磁盘的同时,新的写入操作能够继续写入新的Memtable,避免阻塞数据库的写入性能。

 // Amount of data to build up in memory (backed by an unsorted log
  // on disk) before converting to a sorted on-disk file.
  //
  // Larger values increase performance, especially during bulk loads.
  // Up to two write buffers may be held in memory at the same time,
  // so you may wish to adjust this parameter to control memory usage.
  // Also, a larger write buffer will result in a longer recovery time
  // the next time the database is opened.
  size_t write_buffer_size = 4 * 1024 * 1024;

进入Immutable状态后,其内容便固定下来,不再接收新的写入操作。这一机制确保了数据的一致性视图。与此同时,系统无缝切换至新的空白MemTable,继续处理实时写请求,保障写操作的连续性。后台进程则负责将Immutable MemTable异步刷写至磁盘,形成持久化且预排序的Sorted String Table。SSTable的特点是内部数据按键值有序排列,优化了后续检索操作的效率。

SSTable

SSTable是一种有序且不可变的数据结构,用于存储键值对的有序序列,也可依据扩展函数自定义排序规则。它是LevelDB实现LSM tree存储引擎的基础结构。

Rep* r = rep_;
  Flush();
  assert(!r->closed);
  r->closed = true;

  BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;

  // Write filter block
  if (ok() && r->filter_block != nullptr) {
    WriteRawBlock(r->filter_block->Finish(), kNoCompression,
                  &filter_block_handle);
  }

  // Write metaindex block
  if (ok()) {
    BlockBuilder meta_index_block(&r->options);
    if (r->filter_block != nullptr) {
      // Add mapping from "filter.Name" to location of filter data
      std::string key = "filter.";
      key.append(r->options.filter_policy->Name());
      std::string handle_encoding;
      filter_block_handle.EncodeTo(&handle_encoding);
      meta_index_block.Add(key, handle_encoding);
    }

    // TODO(postrelease): Add stats and other meta blocks
    WriteBlock(&meta_index_block, &metaindex_block_handle);
  }

  // Write index block
  if (ok()) {
    if (r->pending_index_entry) {
      r->options.comparator->FindShortSuccessor(&r->last_key);
      std::string handle_encoding;
      r->pending_handle.EncodeTo(&handle_encoding);
      r->index_block.Add(r->last_key, Slice(handle_encoding));
      r->pending_index_entry = false;
    }
    WriteBlock(&r->index_block, &index_block_handle);
  }

  // Write footer
  if (ok()) {
    Footer footer;
    footer.set_metaindex_handle(metaindex_block_handle);
    footer.set_index_handle(index_block_handle);
    std::string footer_encoding;
    footer.EncodeTo(&footer_encoding);
    r->status = r->file->Append(footer_encoding);
    if (r->status.ok()) {
      r->offset += footer_encoding.size();
    }
  }

void TableBuilder::WriteRawBlock(const Slice& block_contents,
                                 CompressionType type, BlockHandle* handle) {
  Rep* r = rep_;
  handle->set_offset(r->offset);
  handle->set_size(block_contents.size());
  r->status = r->file->Append(block_contents);
}

一个典型的SSTable包含以下几个部分:

LevelDB-SSTable结构

  • Data Blocks:这是主要的数据部分,用于存储实际的键值对。数据块内的键通常按顺序压缩存储,既能减少磁盘占用空间,又能加快读取操作速度。每个块的默认大小为4KB,可在LevelDB的配置中按需调整。
Shardkey length unshard key length value length unshared key content value

Shardkey length: 与前一条记录key共享部分的长度

unshard key length:与前一条记录key不共享部分长度

value length:value 长度

unshared key content:与前一条记录key非共享的内容

value: value的内容

  • Index Block:该块存储每个数据块的最大键以及指向对应数据块的指针。借助索引块,LevelDB能够快速定位到包含特定键的数据块。

LevelDB-Index Block

  • Bloom Filter Block:用于快速判断某个键是否存在于特定SSTable中,无需实际查找数据块,能显著减少不必要的磁盘I/O操作。

  • Metaindex Block:存储关于其他块的元数据,比如索引块和布隆过滤器块的位置信息。

  • Footer:包含两个BlockHandler,分别指向元索引块和索引块。通过序列化和反序列化的Encode、Decode操作,处理SSTable文件末尾的魔数,以此验证SSTable文件的完整性。

三、数据写入流程

LevelDB的数据写入流程构建了多层次的保护机制,旨在确保数据的持久性与一致性。从写入日志开始,到更新Memtable,再到最终生成SSTable,每一个步骤都经过精心设计,以保障数据的安全存储与高效处理。借助日志和Memtable,LevelDB实现了快速响应的写入性能;而SSTable以及后续的压缩操作,则保证了数据长期存储的效率和管理的便捷性。这种设计让LevelDB成为一个高效且可靠的键值存储系统。

1. 写入 WAL和 MemTable

当执行写操作时,数据会率先写入MemTable,同时记录在WAL中。下面我们通过DBImpl::Write函数的源码,来深入了解LevelDB写入操作的完整过程。

Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
  Writer w(&mutex_);
  w.batch = my_batch;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(my_batch == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && my_batch != nullptr) {
    WriteBatch* updates = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(updates, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(updates);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since `w` is currently responsible for logging
    // and memtable insertion.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(updates));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(updates, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (updates == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->done = true;
      ready->status = status;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}

操作步骤详细解析

  1. 初始化写操作状态并加入队列
    • 构建一个Writer对象,将其状态初始化为当前写操作的相关信息,涵盖写批次updates和同步选项options.sync。同时,创建一个互斥锁,以此保证操作过程中的线程安全。
    • 把当前写操作添加到写操作队列writers_中,之后等待,直至当前写操作成为队列的首个元素或者操作完成。
  2. 检查写操作状态并确保写入空间
    • 若当前写操作已经完成,函数会直接返回操作的状态w.status
    • 调用MakeRoomForWrite函数,确保有足够的空间来执行写操作。此过程可能会暂时解锁并等待。同时,获取当前版本的最后一个序列号,并将当前写操作标记为最后一个写操作。
  3. 日志记录与内存表更新
    • 将写批次记录到日志中,并依据同步选项决定是否对日志文件进行同步操作,然后把写批次插入到内存表中。
    • 若在同步日志文件时出现错误,函数会记录后台错误,并强制数据库进入一种状态,使得所有未来的写操作都失败。
  4. 处理写操作队列并返回状态
    • 对写操作队列中的其他写操作进行处理,设置它们的状态并标记为已完成。
    • 通知写操作队列的新头部,最后函数返回写操作的最终状态status

2. 内库表MemTable写入满了

当MemTable达到预设大小时,它会转变为Immutable MemTable,同时会新建一个MemTable来接收后续的写入操作。这个转变过程发生在MakeRoomForWrite函数中,下面我们来探究LevelDB是如何确保写操作时有足够内存空间的。

Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, we instead delay each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force && (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      Log(options_.info_log, "Current memtable full; waiting...\n");
      bg_cv_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      bg_cv_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      has_imm_.Release_Store(imm_);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;   // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}

操作步骤详细解析

  1. 初始化与前置检查
    • 确认互斥锁已被持有(mutex_.AssertHeld()),保证操作的线程安全性。
    • 确保写入队列不为空(assert(!writers_.empty()))。
    • 初始化状态变量Status s和延迟标志bool allow_delay = !force
  2. 循环检查内存表空间
    • 进入一个无限循环,若存在后台错误,或者当前内存表有足够空间且不是强制操作,就退出循环。
  3. 等待后台压缩操作
    • 若当前内存表已满,而前一个内存表仍在进行压缩操作,记录日志并等待后台任务完成。
    • 若L0层文件数量超过停止写入的限制,记录日志并等待后台任务完成。
  4. 切换内存表与日志文件
    • 尝试切换到新的内存表,并触发旧表的压缩操作。获取新的日志文件号,创建新的可写日志文件。
    • 若创建文件失败,重用文件号并退出循环。删除旧的日志写入器和日志文件,设置新的日志文件和日志写入器。
    • 将当前内存表标记为不可变(imm),创建新的内存表,并调度压缩操作。

3. 最后内存表刷写到磁盘

Immutable MemTable会在后台线程中被刷写到磁盘,形成SSTable文件。下面我们通过BackgroundCompaction函数来剖析整个过程。

void DBImpl::BackgroundCompaction() {
  mutex_.AssertHeld();

  if (imm_ != nullptr) {
    CompactMemTable();
    return;
  }

void DBImpl::CompactMemTable() {
  mutex_.AssertHeld();
  assert(imm_ != nullptr);

  // Save the contents of the memtable as a new Table
  VersionEdit edit;
  Version* base = versions_->current();
  base->Ref();
  Status s = WriteLevel0Table(imm_, &edit, base);
  base->Unref();

  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
    s = Status::IOError("Deleting DB during memtable compaction");
  }

  // Replace immutable memtable with the generated Table
  if (s.ok()) {
    edit.SetPrevLogNumber(0);
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
    s = versions_->LogAndApply(&edit, &mutex_);
  }

  if (s.ok()) {
    // Commit to the new state
    imm_->Unref();
    imm_ = nullptr;
    has_imm_.store(false, std::memory_order_release);
    RemoveObsoleteFiles();
  } else {
    RecordBackgroundError(s);
  }
}

操作步骤详细解析

  1. 确保线程安全与前置条件
    • mutex_.AssertHeld();:确保互斥锁已被持有,保障线程安全。
    • assert(imm_ != nullptr);:确保不可变内存表(imm_)不为空。
  2. 保存内存表内容为新表文件
    • 创建一个VersionEdit对象edit,用于记录变更信息。
    • 获取当前版本base,并增加其引用计数。
    • 调用WriteLevel0Table函数,将不可变内存表写入到一个新的表文件中,并返回操作状态s
    • 减少当前版本的引用计数。
  3. 检查数据库关闭状态
    • 若写入操作成功,但数据库正在关闭,则将状态设置为I/O错误,表示在内存表压缩过程中删除数据库。
  4. 用生成的表文件替换不可变内存表
    • 若状态仍然为成功,更新版本编辑对象edit,设置前一个日志编号和当前日志编号。
    • 调用versions_->LogAndApply函数,应用版本编辑,更新数据库版本。
  5. 提交新状态或记录错误
    • 若所有操作都成功,减少不可变内存表的引用计数,将其指针置为空,并更新标志has_imm_,表示没有不可变内存表。调用RemoveObsoleteFiles函数,移除不再需要的文件。
    • 若出现任何错误,调用RecordBackgroundError函数,记录后台错误。

4. 数据的更新和删除

前文已对数据写入过程进行了详尽描述,但鉴于LSMT引擎的特殊机制,将数据的更新和删除操作单独展开阐述,能让我们更全面地理解LevelDB的数据写入操作。

为了更直观地说明LevelDB处理各层级SSTable中键更新和删除的具体过程,我们不妨来看一个示例:

假设存在一个数据库,其中包含多个层级的SSTables。在Level N的一个SSTable里,有键KeyA,其对应的值为 valueA。现在我们要更新Level N中键KeyA的值,具体步骤如下:

  1. 写入新键值对:用户将新的键值对 keyA = valueB 写入MemTable。
  2. MemTable转换:当MemTable达到容量上限时,它会被压缩成一个SSTable,并写入Level 0。
  3. 触发合并操作:Level 0中包含更新后 keyA = valueB 的新SSTable,会触发与Level 1的SSTable进行合并。
  4. 键值覆盖:在合并Level 0和Level 1的过程中,新的 keyA = valueB 会覆盖旧的 keyA 值。
  5. 层级传播:随着合并操作的持续推进,更新会不断向更高层级传播,直至到达Level N。

在LevelDB中,删除存储在某个层级SSTable中的键值对,通常会使用“删除标记”(tombstone)。该过程与更新操作类似,主要区别在于,删除操作是标记键为已删除,而非提供一个新值。

四、数据读取流程

LevelDB的数据读取过程涉及多级缓存和多种优化策略,如布隆过滤器和 TableCache,旨在提高读取效率。其源代码在实现这些操作时,高度关注性能和资源利用,确保即便在数据量庞大、负载较高的情况下,仍能保持良好的性能表现。

Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != nullptr) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();
  return s;
}

1. 读取内存表和表文件

当用户请求获取某个键的值时,LevelDB采用分层查找策略,按照性能从高到低的顺序依次进行查询。首先会检查最新的Memtable,接着是不可变的Memtable,最后是磁盘上的SSTables。

2. 查找适当的 SSTable

若在Memtables中未找到目标键,LevelDB会转而在SSTables中进行查找。由于SSTables存在多个层级,且每个层级可能包含多个SSTable文件,LevelDB会从最新的层级(L0)开始,逐层向下搜索,直至找到相应的键或遍历完所有层级。

LevelDB借助一个名为 Version 的数据结构,来维护不同层级SSTables的相关信息。每个SSTable文件通过 FileMetaData 结构进行描述,该结构的两个成员变量分别记录了文件中包含键的最小值和最大值。这些信息有助于快速判断查询的键是否可能存在于该文件中,从而跳过不包含该键的文件,提升查找效率。

3. SSTable 的布隆过滤器

布隆过滤器在LevelDB中发挥着提高查找效率的重要作用。它通过概率性的方式判断一个键是否存在于SSTable中,从而减少不必要的磁盘访问。在检查某个SSTable之前,LevelDB会先使用布隆过滤器(若存在)快速判断键是否可能存在于该SSTable中。

  • 布隆过滤器的检查实现:这一过程通常在 table.ccTable::Get() 方法中完成。若布隆过滤器显示键不在文件中,LevelDB会跳过该文件,继续检查下一个文件。
iiter->Seek(k);
  if (iiter->Valid()) {
    Slice handle_value = iiter->value();
    FilterBlockReader* filter = rep_->filter;
    BlockHandle handle;
    if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() &&
        !filter->KeyMayMatch(handle.offset(), k)) {
      // Not found
    }

4. 读取和解析 SSTable

若布隆过滤器表明键可能存在于SSTable中,或者该SSTable未配备布隆过滤器,LevelDB会继续在该文件中查找目标键。具体步骤如下:

  • 打开SSTable:利用 TableCache 类打开并缓存SSTable文件,这涉及 table_cache.cc 中的 TableCache::FindTable()TableCache::Get() 方法。
  • 读取索引块:LevelDB首先读取SSTable的索引块,以确定数据块的位置。索引块的位置存储在文件的 Footer 中,通过 Table::ReadMetaTable::ReadBlock 方法进行读取。
  • 定位数据块:依据索引块中的信息,定位包含目标键的数据块。索引块包含指向每个数据块的指针( BlockHandle)以及数据块中最大的键。
  • 读取数据块:确定数据块的位置后,LevelDB通过 Block::Iterate 方法读取该数据块,并在其中查找具体的键。

五、数据合并(Compaction)

在LevelDB体系中,Compaction操作扮演着至关重要的角色,其核心作用在于优化数据库性能、削减存储空间占用,并维持数据于存储层级间的有序状态。该合并操作主要聚焦于将多个SSTable文件整合为单一文件,与此同时,对过时的键值对进行删除或更新处理,这一过程确保了SSTable文件中的key值始终保持有序排列。

LevelDB的合并操作分为两种类型

  • Minor Compaction:此操作通常涉及将内存中的写缓冲(MemTable)转化为不可变的MemTable,随后将其写入磁盘,进而生成一个全新的SSTable。这一过程一般在MemTable达到容量上限,即变满时触发。
  • Major Compaction:该操作涵盖对一个或多个层级数据的重写,常见做法是将某一层级的SSTable合并至下一层级。在此过程中,LevelDB会对键对应的数据进行重写,去除重复记录或已删除的记录,并有可能将数据推送至更低层级。

合并操作触发的情况

VersionSet::PickCompaction方法负责依据不同层级的文件数量与大小,精准挑选合适的合并操作:

  • size_compaction:倘若当前层级的数据量超越阈值( current_->compaction_score_ >= 1),那么就需要开展基于数据量的合并操作。
  • seek_compaction:若存在需要合并的文件( current_->file_to_compact_ != nullptr),则需进行基于查找次数的合并,即查找次数过多的文件会被标记为需要合并。

合并的实现

合并操作的具体实现细节主要集中在db/version_set.cc和db/db_impl.cc文件中。以下是结合源码对LevelDB合并过程的详细步骤阐述:

void DBImpl::MaybeScheduleCompaction() {
      env_->Schedule(&DBImpl::BGWork, this);
}

void DBImpl::BGWork(void* db) {
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
}

void DBImpl::BackgroundCall() {
    if (shutting_down_.load(std::memory_order_acquire)) {
    // No more background work when shutting down.
  } else if (!bg_error_.ok()) {
    // No more background work after a background error.
  } else {
    BackgroundCompaction();
  }
}
void DBImpl::BackgroundCompaction() {
  Compaction* c;
  bool is_manual = (manual_compaction_ != nullptr);
  InternalKey manual_end;
  if (is_manual) {
    ManualCompaction* m = manual_compaction_;
    c = versions_->CompactRange(m->level, m->begin, m->end);
  } else {
    c = versions_->PickCompaction();
  }

  Status status = DoCompactionWork(c);
  if (!status.ok()) {
    // 处理错误
  }
  CleanupCompaction(c);
}

Status DBImpl::DoCompactionWork(Compaction* compaction) {
  Iterator* input = versions_->MakeInputIterator(compaction);
  input->SeekToFirst();

  while (input->Valid()) {
    // 处理每个键值对,可能会写入到新的 SSTable
  }

  // 完成合并,更新元数据,删除旧的文件等
  versions_->InstallCompactionResults(compaction);
  return Status::OK();
}
  1. 数据读取与合并:在明确了需要合并的文件后,LevelDB会创建专门的Compaction对象,用以执行实际的合并操作。这一过程的核心逻辑封装于DBImpl::DoCompactionWork方法之中。该方法通过遍历所有选定的SSTable文件,逐一读取并合并其中所包含的键值对。
  2. 写入新的SSTable:在合并进程中,读取到的数据会经过精心排序与整合,在此期间,过期或重复的键值对将被剔除。经过这番细致处理后,合并后的数据将被写入一个或多个全新的SSTable文件。此步骤同样在DBImpl::DoCompactionWork方法内部完成,具体借助TableBuilder类来构建并写入这些新生成的SSTable文件。
  3. 更新元数据并清理旧文件:随着新SSTable文件的成功写入,LevelDB需要对其内部的元数据结构进行更新,以此确保系统能够精确反映当前的文件布局状况。与此同时,那些已经完成合并且不再需要的旧SSTable文件将被及时删除。这些关键操作均在DBImpl::DoCompactionWork方法的尾声部分开展,涉及对VersionSet的更新以及对冗余文件的清理工作。

LevelDB凭借上述精心设计,得以高效地管理和维护海量数据。在保障数据一致性的前提下,持续对存储空间的使用效率进行优化。

六、数据备份

方案概述

为切实保障数据的安全性与完整性,数据备份流程会指定一台负载余量最为充裕的机器作为数据源,在另一台具备充足存储空间的目标机器上留存备份数据。借助rsync命令,实现从源机器向目标机器的数据同步,最终将备份数据上传至对象存储服务。

逻辑流程

  1. 服务暂停:将源机器的服务下线,并停止filter进程,同时明确指定目标机器、源机器目录以及目标机器目录。具体操作如下:
    • 全面停止源机器上的所有相关服务,确保数据处于一致性状态。
    • 详尽记录当前系统状态,以便在恢复服务时能够精准还原。
    • 及时通知相关人员,使其充分了解当前的操作状态。
  2. 数据同步:登录目标机器,触发rsync命令执行数据同步操作,该过程支持增量更新。
    • 待rsync命令执行完毕后,对目标机器上的数据完整性与准确性进行验证。
    • 仔细检查日志文件,确认无任何错误或警告信息。
  3. 恢复服务:启动源机器上的filter进程。若源机器服务原本处于在线状态,则在数据更新完成后重新上线服务。
  4. 生成元数据:生成数据同步的元数据文件,该文件涵盖以下信息:
    • 文件列表及其MD5值
    • 若为全量备份:所有需要同步的文件及其MD5值
    • 若为增量备份:新增或删除的文件列表及其MD5值
    • 无论是全量还是增量备份,kafka topic的offset文件均在备份范围内
  5. 上传至OSS:将备份数据及相应的元数据文件上传至云对象存储,所有数据均采用标准存储模式,以确保数据的可靠性和访问性能。

运维要求

  • 故障报警:一旦数据备份过程中出现任何异常状况,系统应即刻触发报警机制,以便能够及时采取应对措施。具体的报警机制如下:
    • 通过邮件、短信或即时通讯工具发送报警通知。
    • 详细记录报警日志,便于后续深入分析和处理。
    • 设置多级报警,依据问题的严重程度做出不同级别的响应。
  • 自动化执行:整个数据备份流程需实现完全自动化,最大程度减少人工干预,确保备份任务能够高效、可靠地完成。具体实现方式如下:
    • 编写脚本自动化执行上述步骤。
    • 使用定时任务调度工具(如Cron)定期执行备份任务。
    • 集成监控系统,实时监控备份任务的状态和性能。

七、数据恢复

数据恢复流程

  1. 备份数据的选择:明确将要使用的备份数据集是整个数据恢复过程中的关键环节。备份数据集可分为全量备份和增量备份两种类型。全量备份囊括了系统在某一时间点的所有数据,而增量备份仅记录自上次备份以来的数据变更情况。
  2. 确定服务恢复所需的机器资源,并初始化环境、清理无用数据。
  3. 服务初始化:完成上述准备工作后,随即启动相关服务的集群架构。此阶段的目标是在目标环境中搭建起能够承载即将导入数据的服务框架。
  4. 从OSS下载数据
    • 若采用全量备份方式进行恢复,直接从对象存储服务(如阿里云OSS)中下载完整的备份文件即可。
    • 若选择增量备份方案,则除获取基础的全量备份包外,还需额外下载最新的增量更新内容。随后,通过同步元数据并回放这些增量信息,最终生成包含所有最新变更的完整数据集。
  5. 数据完整性验证:为确保恢复过程中的数据准确性与完整性,在正式导入数据之前,必须执行严格的校验程序。常用的验证方法包括哈希值比对、文件大小检查以及业务读写逻辑一致性测试。只有当所有检验项目均通过后,方可继续下一步操作。此外,此环节需生成详细的检验报告并作为文档记录留存。
  6. 恢复updater服务 offset 文件:updater服务负责跟踪处理队列中的消息状态变化情况,因此其对应的offset文件对于维持整个系统的连续性和一致性极为关键。在确认所有核心组件均已恢复正常运作后,需特别留意正确恢复该文件内容,确保不会丢失任何已处理过的消息记录。
  7. 启动filter服务:紧接着,开启filter服务,以开始处理即将到来的数据流。这标志着系统已开始逐步恢复正常运作模式。
  8. 启动关联的updater服务:最后一步是激活所有相关的updater服务,使整个应用程序能够无缝地继续其预定的功能和服务提供。在此过程中,需密切监控各个服务之间的通信状况,及时发现并解决可能出现的问题。一旦所有服务都成功启动且相互之间能够正常协作,即认为本次数据恢复工作已顺利完成。

运维要求

  • 标准SOP制定:整个恢复过程应遵循一套详尽且规范的标准操作程序(SOP),以便每位参与者都能清晰明确各自的角色与责任。SOP应详细描述每一步骤的具体操作方法、所需工具材料清单以及预期达到的效果等内容,同时还应涵盖异常情况下的应对措施指南。这不仅有助于提升工作效率,还能有效降低因操作失误导致的风险。
  • 定期演练安排:为提升团队应对突发状况的能力与效率,需定期组织模拟恢复演练活动。参与人员涵盖直接负责该服务运维的技术支持人员、系统可靠性工程师(SRE)等关键角色;同时,中台部门相关人员视情况加入其中,共同提升整体协作水平。通过反复练习,能够在实践中发现问题,并不断优化改进现有的应急预案,使其更贴合真实场景下的需求。
  • 数据校验机制:在整个恢复流程中,应高度重视对数据完整性的检查工作。从业务服务的用户滑动行为事件日志中,抽取10000条用户(用户id满足待验证shard分片要求)滑动行为,组合出其滑动、被滑动的全部其他用户的列表,验证其是否为从线上数据库通过search接口查询结果的子集;与大数据Hive集群中的数据进行抽样对比,统计Diff数据信息。

八、总结

本文聚焦于 LevelDB 这一存储系统,深入探讨了其核心数据操作流程。在数据合并方面,介绍了 Minor Compaction 和 Major Compaction 两种类型,阐述了合并操作的触发情况以及详细的实现过程,包括数据读取与合并、写入新 SSTable、更新元数据并清理旧文件等步骤,通过这些操作优化数据库性能、减少存储空间。数据备份部分,给出了通过选定负载余量小的机器为源、充足存储机器为目标,利用 rsync 同步并上传至对象存储的方案,说明了服务暂停、数据同步、恢复服务、生成元数据和上传至 OSS 的逻辑流程以及故障报警和自动化执行的运维要求。数据恢复流程涵盖备份数据选择、环境准备、服务初始化、从 OSS 下载数据、数据完整性验证、恢复 updater 服务 offset 文件、启动 filter 服务及关联 updater 服务等环节,并强调了制定标准 SOP、定期演练和建立数据校验机制的运维要求,全面展现了 LevelDB 在数据管理各阶段的运行机制和保障措施 。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件举报,一经查实,本站将立刻删除。

文章由技术书栈整理,本文链接:https://study.disign.me/article/202510/20.leveldb-lsmtree-engine.md

发布时间: 2025-03-07