RAMBO源码解读

一.从main函数的测试分支开始

没有下完整数据包,这里用test分支的toy test文件,用data/ArtfcKmersToy100.txt作为测试数据,主要部分在以下几个函数

RAMBO myRambo(n_perSet, R_all, B_all, K); 创建Rambo结构

myRambo.createMetaRambo (K, false); 初始化meta数据

myRambo.insertion2 (alllines); 插入数据

myRambo.query(testKeys[i], testKeys[i].size()); 查询数据

所以要先了解Rambo的数据结构以及相关方法

#include <iomanip>
#include <iostream>
#include <fstream>
#include <math.h>
#include <sstream>
#include <string>
#include <string.h>
#include <algorithm>
#include <chrono>
#include "MyBloom.h"
#include "MurmurHash3.h"
#include "Rambo_construction.h"
#include "utils.h"
#include "constants.h"
#include "bitArray.h"
#include <ctime>

using namespace std;

int main(){

//string job(argv[1]);

bool insert  =false;
bool ser =false;
bool test = true;
bool deser = false;

int n_perSet = 1000000000; //cardinality of each set 布隆过滤器的参数
int R_all = 2;	 //table数量	
int B_all = 15;  //bfu数组长度

int K = Ki; // total number of sets,这里ki测试集里为100

float fp_ops;
float ins_time;
float query_time;

// constructor 创建Rambo
RAMBO myRambo(n_perSet, R_all, B_all, K);

//  details of RAMBO set partitioning 
//myRambo.createMetaRambo (K, false);
cout<<"created meta"<<endl;

//......跳过了真实数据那块的测试    
//......
//......
    
if(test){
    // test RAMBO 获取数据
    std::vector<string> alllines = readlines("data/ArtfcKmersToy"+to_string(K)+".txt", 0);

    std::vector<string> testKeys;
    std::vector<int> gt_size;
    for(uint i=0; i<alllines.size(); i++){
          std::vector<string>KeySets =  line2array(alllines[i], ';');//sets for a key
          testKeys.push_back(KeySets[0]);
          gt_size.push_back( line2array(KeySets[1], ',').size() );
    }
    myRambo.createMetaRambo (K, false); //预处理0-99号文件的meta信息
    // cout<<"load: "<<myRambo.Rambo_array[0]->m_bits->getcount();
    cout<<"total number of queries : "<<testKeys.size()<<endl;
    myRambo.insertion2 (alllines);
    cout<<"loaded test keys"<<endl;

    float fp=0;
    std::ofstream FPtestFile; //结果输出到一个文件,不用管
    FPtestFile.open("FPtestFileToy.txt");
    std::clock_t t5_cpu = std::clock();
    chrono::time_point<chrono::high_resolution_clock> t5 = chrono::high_resolution_clock::now();

    for (std::size_t i=0; i<testKeys.size(); i++){
            bitArray MemVec = myRambo.query(testKeys[i], testKeys[i].size()); //对1000个testkey进行查询
            cout<<MemVec.getcount()<<endl;
            cout<<gt_size[i]<<endl;
            fp = fp + (MemVec.getcount())*0.1/((K - gt_size[i])*0.1);
    }

    cout<<"fp rate is: "<<fp/(testKeys.size()); // false positives/(all negatives) 计算假阳性率
    FPtestFile<<"fp rate is: "<<fp/(testKeys.size()); // false positives/(all negatives)

    fp_ops = fp/(testKeys.size());

    cout<<endl;
    
    //计算时间
    std::clock_t t6_cpu = std::clock();
    chrono::time_point<chrono::high_resolution_clock> t6 = chrono::high_resolution_clock::now();
    float QTpt_cpu = 1000.0 * (t6_cpu-t5_cpu)/(CLOCKS_PER_SEC*testKeys.size()); //in ms
    float QTpt = chrono::duration_cast<chrono::nanoseconds>(t6-t5).count()/(1000000.0*testKeys.size());
    cout <<"query time wall clock is :" <<QTpt <<", cpu is :" <<QTpt_cpu<< " milisec\n\n";
    FPtestFile<<"query time wall clock is :" <<QTpt <<", cpu is :" <<QTpt_cpu<< " milisec\n\n";
    query_time = QTpt_cpu;
  }
    
return 0;
}

二.主要代码:Rambo数据结构

2.1 Rambo_construction.h

先不看注释的部分,test中只用了初始化函数,以及query和insertion2两个函数,这两个最重要

类的参数作用见注释部分

class RAMBO{
    public:

        RAMBO(int n, int r1, int b1, int K);
        std::vector<uint> hashfunc( std::string key, int len);
        //void insertion (std::string setID, std::vector<std::string> keys);
        //std::set<int> takeunion(std::set<int> set1, std::set<int> set2);
        //std::set<int> takeIntrsec(std::set<int>* setArray);
        //std::vector <std::string> getdata(std::string filenameSet);
        bitArray query (std::string query_key, int len);//查询一个key对应的集合,集合用bitarray表示
        void createMetaRambo(int K, bool verbose);//预处理meta数据
        //void serializeRAMBO(std::string dir);
        //void deserializeRAMBO(std::vector<std::string> dir);
        void insertion2 (std::vector<std::string> alllines);//从alllines中提取keys并插入布隆过滤器
        //bitArray queryseq (std::string query_key, int len);
        //void insertionRare (std::string setID, std::vector<std::string> keys);

        int R; //table数量
        int B; //每个table的bfu数量
        //int n; 
        float p; //布隆过滤器的false positive rate参数
        int range; //布隆过滤器的range参数
        int k; //document总数量
        //float FPR;
        BloomFiler** Rambo_array; //布隆过滤器二维数组,就是论文中那个B*R的bfu数组,这里用指针形式表达
        std::vector<int>* metaRambo; //每个布隆过滤器的元信息,具体指代文件号的集合(这个布隆过滤器对应哪些文件,后面		  取交集什么要用)
};

参数名大多和论文图例相吻合:

2.2 Rambo_construction.c

2.2.1 初始化函数

初始化函数,初始化布隆过滤器二维数组和meta二维数组

RAMBO::RAMBO(int n, int r1, int b1, int K){
  R = r1;
  B = b1;
  K = K;

  //range = ceil(-(n*log(p))/(log(2)*log(2))); //range
  range = n;
  k = 3; //布隆过滤器的k参数
  std::cout << "range" <<range<< '\n';

  //k = ceil(-log(p)/log(2)); //number of hash, k is 7 for 0.01
  Rambo_array = new BloomFiler*[B*R]; //array of pointers
  metaRambo = new vector<int>[B*R]; //constains set info in it. 初始化meta二维数组
  for(int b=0; b<B; b++){
    for(int r=0; r<R; r++){
      Rambo_array[b + B*r] = new BloomFiler(range, p, k);//初始化布隆过滤器二维数组
    }
  }
}

2.2.2 createMetaRambo函数

提前计算每个布隆过滤器对应的meta数据,这里的meta数据指的是该布隆过滤器对应的文件号集合(比如第一行第一列的布隆过滤器对应集合{64,58},如果这个过滤器亮了说明key可能在集合{64,58}中,后面有些与运算操作),如它注释所说这是个预先运算操作,这里的K就是文件的总数量(这里测试时用的0-99文件所以是K=100)

// one time process- a preprocess step
void RAMBO::createMetaRambo(int K, bool verbose){
  for(int i=0;i<K;i++){
    vector<uint> hashvals = RAMBO::hashfunc(std::to_string(i), std::to_string(i).size()); // R hashvals, each with max value B
    for(int r=0; r<R; r++){
      metaRambo[hashvals[r] + B*r].push_back(i); //文件号加入对应哈希位置的布隆过滤器对应的vector
    }
  }

  //print RAMBO meta deta,这是输出终端debug用的
  if (verbose){
    for(int b=0; b<B; b++){
      for(int r=0; r<R; r++){
        for (auto it=metaRambo[b + B*r].begin(); it != metaRambo[b + B*r].end(); ++it)
          {std::cout << " " << *it;}
        {std::cout << "////";}
      }
      std::cout << '\n';
    }
  }
}

2.2.3 insertion2函数

插入一个key,具体见注释

// given inverted index type arrangement, kmer;files;files;..
void RAMBO::insertion2 (std::vector<string> alllines){
  //make this loop parallel
  //#pragma omp parallel for
  for(std::size_t i=0; i<alllines.size(); ++i){
    char d = ';';
    std::vector<string>KeySets =  line2array(alllines[i], d);//sets for a key

    std::vector<string>KeySet = line2array(KeySets[1], ','); //分词得到key的文件set,比如key1;64,58这一条到这	  里就变成{64,58}这个vector集合
    for (uint j = 0; j<KeySet.size(); j++){//对集合内每个数字号循环
      vector<uint> hashvals = RAMBO::hashfunc(KeySet[j], KeySet[j].size()); // R hashvals R个哈希函数对应R	     个table
      for(int r=0; r<R; r++){
        //这里给key造个哈希存入布隆过滤器,用于验证key是否存在
        vector<uint> temp = myhash(KeySets[0].c_str(), KeySets[0].size() , k, r, range);// i is the key
        Rambo_array[hashvals[r] + B*r]->insert(temp);
      }
    }
  }
}

2.2.4 query函数

查询key出现在哪些文件中,具体见注释

bitArray RAMBO::query (string query_key, int len){
  // set<int> resUnion[R]; //constains union results in it.
  bitArray bitarray_K(Ki);//ki默认100表示全集为文件0-99,测试用的数据在这个范围内
  // bitset<Ki> bitarray_K;
  // set<int> res;
  float count=0.0;//计算时间的,不用管
  vector<uint> check;
  for(int r=0; r<R; r++){
    check = myhash(query_key, len , k, r,range); //hash values correspondign to the keys
    bitArray bitarray_K1(Ki);
    // bitset<Ki> bitarray_K1;
    for(int b=0; b<B; b++){
        if (Rambo_array[b + B*r]->test(check)){//检查改布隆过滤器是否有这个key
          chrono::time_point<chrono::high_resolution_clock> t5 = chrono::high_resolution_clock::now();
          for (uint j=0; j<metaRambo[b + B*r].size(); j++){//从meta中取出文件号在bitarray对应位置取1
            bitarray_K1.SetBit(metaRambo[b + B*r][j]);
        }
        chrono::time_point<chrono::high_resolution_clock> t6 = chrono::high_resolution_clock::now();
        count+=((t6-t5).count()/1000000000.0);
      }
    }
    if (r ==0){
      bitarray_K = bitarray_K1;//第一个table不用求交集
    }
    else{
      bitarray_K.ANDop(bitarray_K1.A); //这一步是一个bitarray的合并,table之间得到的答案求交集
    }
  }
  vector<uint>().swap(check);
  return bitarray_K;
}

2.2.5 辅助函数

hashfunc函数返回r个(r个table)哈希函数的结果(每个结果在0-B之间,对应论文的construct图的箭头)

vector<uint> RAMBO:: hashfunc( std::string key, int len){
  // int hashvals[k];
  vector <uint> hashvals; //hashvals的长度等于R(就是重复的数量,论文图上table数)
  uint op;
  for (int i=0; i<R; i++){
    MurmurHash3_x86_32(key.c_str(), len, 10, &op); //seed i
    hashvals.push_back(op%B); //经典取余函数,范围为0-(B-1)
  }
  return hashvals;
}

三.其他代码

3.1 util.h/cpp

一些常用函数,包括读写的readline函数以及分词的line2array函数

3.2 bitArray.h/cpp

包装的bitarray代码,类似std的bitset,作用是生成0和1构成的set,用于记录最后得到的答案,比如最后答案是55和64,则[000000…..10000000…….100000]表示第55和64位为1,用bitset的好处是做与运算求交集时简单

3.3 MyBloom.h/cpp

包装的布隆过滤器代码,作用是实现布隆过滤器的创建插入等操作

3.4 MurmurhHash3.h/cpp

随机hash函数的生成代码,直接用的现成的包,作用是生成随机哈希

四.流程总结

用一个简单例子描述下流程:

假设key是account账户,num表示块号,格式如 key;num1,num2,num3

数据如下:

account1;11,22,44 (表示account1在11、22、44号块出现)

account2;11,33,55 (表示account2在11、33、55号块出现)

……

4.1 初始化Rambo

假设R = 2,B = 9(如果参数忘记是什么意思见论文图或者第二部分中的代码注释,这里就是布隆过滤器二维数组是2*9的规模)

4.2 预处理块号集合

假设K=5,就总共就只有11,22,33,44,55号块,用一个大小与布隆过滤器数组同样的二维vector数组记录每个布隆过滤器的元信息(一个列表)

预处理每个块在每个table上的哈希,假设22和33在table1哈希到同一个位置

预处理的哈希如图,每个方块代表对应布隆过滤器的一个vector列表

(这个哈希结果这里为了说明乱写的,实际由哈希函数生成)

4.3 插入key

以插入 account1;11,22,44 为例:

将account1后面对应的数字,以4.2中相同的哈希对应布隆过滤器,在这些布隆过滤器中加入account1这个key的哈希后结果

4.4 查找key

以查找 account1为例,对account1这个key做哈希,对每个table的布隆过滤器顺序查找验证,发现table1的0,2,5和table2的1,3,5号布隆过滤器命中。(就是4.3中插入的那些)

对每个table,将选中的布隆过滤器对应的meta数据得到并集,再求每个table这个集合的交集即为答案。

对于这个例子来说,table1得到[{11},{22,33},{44}]==>{11,22,33,44},table2得到[{11},{22},{44}]==>{11,22,44}。(这些数据对应4.2中的图)

这里只有两个table,将{11,22,33,44}与{11,22,44}取交集得到答案{11,22,44}。(多个table循环取交集即可,用的bitarray取交集很简单)

所以account1在11,22,44出现,本做法可能有假阳性但是也可以容忍只要去验证一下就行。