玖叶教程网

前端编程开发入门

BloomFilter(大数据去重)+Redis(持久化)策略

背景

之前在重构一套文章爬虫系统时,其中有块逻辑是根据文章标题去重,原先去重的方式是,插入文章之前检查待插入文章的标题是否在ElasticSearch中存在,这无疑加重了ElasticSearch的负担也势必会影响程序的性能!

BloomFilter算法

简介:布隆过滤器实际上是一个很长的二进制向量一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

原理:当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在

优点

相比于其它的数据结构,布隆过滤器在空间和时间方面都有巨大的优势。布隆过滤器存储空间和插入/查询时间都是常数(O(k))。而且它不存储元素本身,在某些对保密要求非常严格的场合有优势。

缺点:

一定的误识别率和删除困难。

结合以上几点及去重需求(容忍误判,会误判在,在则丢,无妨),决定使用BlomFilter。

思想

位数组k个散列函数

位数组

初始状态时,BloomFilter是一个长度为m的位数组,每一位都置为0。

添加元素(k个独立的hash函数)

添加元素时,对x使用k个哈希函数得到k个哈希值,对m取余,对应的bit位设置为1。

判断元素是否存在

判断y是否属于这个集合,对y使用k个哈希函数得到k个哈希值,对m取余,所有对应的位置都是1,则认为y属于该集合(哈希冲突,可能存在误判),否则就认为y不属于该集合。

图中y1不是集合中的元素,y2属于这个集合或者是一个false positive。

实现

可以使用JDK自带的BitSet来实现,但存在两个问题:OOM和持久化问题

结合Redis的BitMap能够解决,唯一需要注意的是Redis的BitMap只支持2^32大小,对应到内存也就是512MB,数组的下标最大只能是2^32-1。不过这个限制可以通过构建多个Redis的Bitmap通过hash取模的方式分散一下即可。万分之一的误判率,512MB可以放下2亿条数据。

好了,扯了这么多,贴代码!(注:在MagnusS/Java-BloomFilter的基础上加上了Redis持久化的实现)

@Component
publicclassBloomFilter<E>{
 
@Autowired
privateRedisTemplate<String, E> redisTemplate;
 
@Value("${bloomfilter.expireDays}")
privatelong expireDays;
 
// total length of theBloom filter
privateint sizeOfBloomFilter;
// expected (maximum)number of elements to be added
privateint expectedNumberOfFilterElements;
// number of hash functions
privateint numberOfHashFunctions;
// encoding used forstoring hash values as strings
privatefinalCharset charset = Charset.forName("UTF-8");
// MD5 gives good enoughaccuracy in most circumstances. Change to SHA1 if it's needed
privatestaticfinalString hashName ="MD5";
privatestaticfinalMessageDigest digestFunction;
 
// The digest method isreused between instances
static {
MessageDigest tmp;
try{
tmp = java.security.MessageDigest.getInstance(hashName);
}catch(NoSuchAlgorithmException e) {
tmp =null;
}
digestFunction = tmp;
}
 
public BloomFilter() {
this(0.0001,600000);
}
 
/**
* Constructs an empty Bloomfilter.
*
*@paramm is the total length ofthe Bloom filter.
*@paramn is the expected number ofelements the filter will contain.
*@paramk is the number of hashfunctions used.
*/
public BloomFilter(int m, int n, int k) {
this.sizeOfBloomFilter = m;
this.expectedNumberOfFilterElements = n;
this.numberOfHashFunctions = k;
}
 
/**
* Constructs an empty Bloomfilter with a given false positive probability.
* The size of bloom filterand the number of hash functions is estimated
* to match the falsepositive probability.
*
*@paramfalsePositiveProbability isthe desired false positive probability.
*@paramexpectedNumberOfElements isthe expected number of elements in the Bloom filter.
*/
public BloomFilter(double falsePositiveProbability, intexpectedNumberOfElements) {
this((int) Math.ceil((int)Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2))) * expectedNumberOfElements / Math.log(2)),// m = ceil(kn/ln2)
expectedNumberOfElements,
(int) Math.ceil(-(Math.log(falsePositiveProbability) /Math.log(2))));// k = ceil(-ln(f)/ln2)
}
 
/**
* Adds an object to theBloom filter. The output from the object's
* toString() method is usedas input to the hash functions.
*
*@paramelement is an element toregister in the Bloom filter.
*/
public void add(E element) {
add(element.toString().getBytes(charset));
}
 
/**
* Adds an array of bytes tothe Bloom filter.
*
*@parambytes array of bytes to addto the Bloom filter.
*/
public void add(byte[] bytes) {
if(redisTemplate.opsForValue().get(RedisConsts.CRAWLER_BLOOMFILTER)==null) {
redisTemplate.opsForValue().setBit(RedisConsts.CRAWLER_BLOOMFILTER,0,false);
redisTemplate.expire(RedisConsts.CRAWLER_BLOOMFILTER,expireDays, TimeUnit.DAYS);
}
 
int[] hashes = createHashes(bytes, numberOfHashFunctions);
for(int hash : hashes) {
redisTemplate.opsForValue().setBit(RedisConsts.CRAWLER_BLOOMFILTER,Math.abs(hash % sizeOfBloomFilter),true);
}
}
 
/**
* Adds all elements from aCollection to the Bloom filter.
*
*@paramc Collection of elements.
*/
public void addAll(Collection<?extendsE> c) {
for(E element : c) {
add(element);
}
}
 
/**
* Returns true if theelement could have been inserted into the Bloom filter.
* UsegetFalsePositiveProbability() to calculate the probability of this
* being correct.
*
*@paramelement element to check.
*@returntrue if the element couldhave been inserted into the Bloom filter.
*/
public boolean contains(E element) {
returncontains(element.toString().getBytes(charset));
}
 
/**
* Returns true if the arrayof bytes could have been inserted into the Bloom filter.
* UsegetFalsePositiveProbability() to calculate the probability of this
* being correct.
*
*@parambytes array of bytes tocheck.
*@returntrue if the array couldhave been inserted into the Bloom filter.
*/
public boolean contains(byte[] bytes) {
int[] hashes = createHashes(bytes, numberOfHashFunctions);
for(int hash : hashes) {
if(!redisTemplate.opsForValue().getBit(RedisConsts.CRAWLER_BLOOMFILTER,Math.abs(hash % sizeOfBloomFilter))) {
returnfalse;
}
}
returntrue;
}
 
/**
* Returns true if all theelements of a Collection could have been inserted
* into the Bloom filter.Use getFalsePositiveProbability() to calculate the
* probability of this beingcorrect.
*
*@paramc elements to check.
*@returntrue if all the elements inc could have been inserted into the Bloom filter.
*/
public boolean containsAll(Collection<?extendsE> c) {
for(E element : c) {
if(!contains(element)) {
returnfalse;
}
}
returntrue;
}
 
/**
* Generates digests basedon the contents of an array of bytes and splits the result into 4-byte int'sand store them in an array. The
* digest function is calleduntil the required number of int's are produced. For each call to digest a salt
* is prepended to the data.The salt is increased by 1 for each call.
*
*@paramdata specifies input data.
*@paramhashes number ofhashes/int's to produce.
*@returnarray of int-sized hashes
*/
public static int[] createHashes(byte[] data, int hashes) {
int[] result =newint[hashes];
 
int k =0;
byte salt =0;
while(k < hashes) {
byte[] digest;
synchronized (digestFunction) {
digestFunction.update(salt);
salt++;
digest = digestFunction.digest(data);
}
 
for(int i =0; i < digest.length /4&& k < hashes; i++) {
int h =0;
for(int j = (i *4); j < (i *4) +4; j++) {
h <<=8;
h |= ((int) digest[j]) &0xFF;
}
result[k] = h;
k++;
}
}
returnresult;
}
 
public int getSizeOfBloomFilter() {
returnthis.sizeOfBloomFilter;
}
 
public int getExpectedNumberOfElements() {
returnthis.expectedNumberOfFilterElements;
}
 
public int getNumberOfHashFunctions() {
returnthis.numberOfHashFunctions;
}
 
/**
* Compares the contents oftwo instances to see if they are equal.
*
*@paramobj is the object tocompare to.
*@returnTrue if the contents of theobjects are equal.
*/
@Override
public boolean equals(Object obj) {
if(obj ==null) {
returnfalse;
}
if(getClass() != obj.getClass()) {
returnfalse;
}
finalBloomFilter<E> other = (BloomFilter<E>) obj;
if(this.sizeOfBloomFilter!= other.sizeOfBloomFilter) {
returnfalse;
}
if(this.expectedNumberOfFilterElements!= other.expectedNumberOfFilterElements) {
returnfalse;
}
if(this.numberOfHashFunctions!= other.numberOfHashFunctions) {
returnfalse;
}
returntrue;
}
 
/**
* Calculates a hash codefor this class.
*
*@returnhash code representing thecontents of an instance of this class.
*/
@Override
public int hashCode() {
int hash =7;
hash =61* hash +this.sizeOfBloomFilter;
hash =61* hash +this.expectedNumberOfFilterElements;
hash =61* hash +this.numberOfHashFunctions;
returnhash;
}
 
public static void main(String[] args) {
BloomFilter<String> bloomFilter =newBloomFilter<>(0.0001,600000);
System.out.println(bloomFilter.getSizeOfBloomFilter());
System.out.println(bloomFilter.getNumberOfHashFunctions());
}
}

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言