1.数据库Open流程分析
位置在:db/db_impl.cc
初始化dbimpl对象
尝试对dbimpl进行recover恢复
判断memtable是否为空(为空就创建新的log、memtable)
判断是否需要保存manifest文件,是就更新manifest
如果一切都成功,就删除过时文件尝试compaction操作
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
*dbptr = nullptr;
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
// Recover handles create_if_missing, error_if_exists
bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
if (s.ok() && impl->mem_ == nullptr) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
&lfile);
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile);
impl->mem_ = new MemTable(impl->internal_comparator_);
impl->mem_->Ref();
}
}
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->RemoveObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock();
if (s.ok()) {
assert(impl->mem_ != nullptr);
*dbptr = impl;
} else {
delete impl;
}
return s;
}
2.数据Get流程分析
位置在:db/db_impl.cc
获取get操作对应的sequencenumber
根据key和sequencenumber构造lookupkey
从memtable查询
从immutable memtable查询
从sstable文件查询
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
Status s;
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != nullptr) {
snapshot =
static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
} else {
snapshot = versions_->LastSequence();
}
MemTable* mem = mem_;
MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != nullptr) imm->Ref();
current->Ref();
bool have_stat_update = false;
Version::GetStats stats;
// Unlock while reading from files and memtables
{
mutex_.Unlock();
// First look in the memtable, then in the immutable memtable (if any).
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
// Done
} else if (imm != nullptr && imm->Get(lkey, value, &s)) {
// Done
} else {
s = current->Get(options, lkey, value, &stats);
have_stat_update = true;
}
mutex_.Lock();
}
if (have_stat_update && current->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != nullptr) imm->Unref();
current->Unref();
return s;
}
3.数据Put与Write流程分析
位置在:db/db_impl.cc
put包括插入修改删除操作,write是批量操作,或者说看下面代码put就是只有一个数据的write
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
WriteBatch batch;
batch.Put(key, value);
return Write(opt, &batch);
}
所以这边主要看write操作
构建write对象插入队列
等待队列前面执行完毕
预先分配内存空间
写入log文件
更新memtable
更新sequencenumber
发送signal信号通知该线程执行完毕
提示:多线程编程中的wait和signal
这是c++多线程的用法,信号量唤醒或者阻塞线程
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Writer w(&mutex_);
w.batch = updates;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(updates == nullptr);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && updates != nullptr) { // nullptr batch is for compactions
WriteBatch* write_batch = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(write_batch);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
if (status.ok()) {
status = WriteBatchInternal::InsertInto(write_batch, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (write_batch == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
}
4.快照生成
位置在:db/snapshot.h (虚类在include/leveldb/db.h)
leveldb实际采用多版本并发控制MVCC,快照并不是对所有数据进行完整物理空间备份,而是保存备份时的序列号,快照添加后,该快照对应的数据将不受到compaction的影响,为了保持快照一致性
class LEVELDB_EXPORT Snapshot {
protected:
virtual ~Snapshot();
};
class SnapshotImpl : public Snapshot {
public:
SnapshotImpl(SequenceNumber sequence_number)
: sequence_number_(sequence_number) {}
SequenceNumber sequence_number() const { return sequence_number_; }
private:
friend class SnapshotList;
// SnapshotImpl is kept in a doubly-linked circular list. The SnapshotList
// implementation operates on the next/previous fields directly.
SnapshotImpl* prev_;
SnapshotImpl* next_;
const SequenceNumber sequence_number_;
#if !defined(NDEBUG)
SnapshotList* list_ = nullptr;
#endif // !defined(NDEBUG)
};