把布隆过滤器用起来

把布隆过滤器用起来

本文偏应用和代码实践,理论请参考本文末尾参考文章

简介

一句话简介:
过滤器,判断这个元素在与不在,不在则100%不在;在则去查询,确认在不在。

详细简介:
BloomFilter,中文名称叫做布隆过滤器,是1970年由 Bloom 提出的,它可以被用来检测一个元素是否在一个集合中,它的空间利用效率很高,使用它可以大大节省存储空间。BloomFilter 使用位数组表示一个待检测集合,并可以快速地通过概率算法判断一个元素是否存在于这个集合中,所以利用这个算法我们可以实现去重效果。

它的优点是空间效率和查询时间都远远超过一般算法,缺点是有一定的误识别率和删除困难。

场景

1、大量爬虫数据去重

2、保护数据安全:
广告精确投放 :广告主通过设备id,计算hash算法,在数据包(数据提供方)中去查找,如果在存在,则证明该设备id属于目标人群,进行投放广告,同时保证设备id不泄露。数据提供方和广告主都没有暴露自己拥有的设备id。间接用户画像且不违数据安全法。详见:https://zhuanlan.zhihu.com/p/37847480

3、比特币网络转账确认
-w798

SPV节点:SPV是“Simplified Payment Verification”(简单支付验证)的缩写。中本聪论文简要地提及了这一概念,指出:不运行完全节点也可验证支付,用户只需要保存所有的block header就可以了。用户虽然不能自己验证交易,但如果能够从区块链的某处找到相符的交易,他就可以知道网络已经认可了这笔交易,而且得到了网络的多少个确认。

-w507
先去访问布隆过滤器,去判断交易记录是否在某个block(区块)里存在。从海量数据(十亿个区块,每个区块1-2M的交易记录,),快速得到结果。
详见:https://www.youtube.com/watch?v=uC6Q5m0SSQ0

4、分布式系统(Map-Reduce)
把大任务切分成块,分配和验证一个子任务是否在一个子系统上。

必要性

省空间,提升效率

我们首先来回顾一下 ScrapyRedis 的去重机制,它将 Request 的指纹存储到了 Redis 集合中,每个指纹的长度为 40,例如 27adcc2e8979cdee0c9cecbbe8bf8ff51edefb61 就是一个指纹,它的每一位都是 16 进制数。

让我们来计算一下用这种方式耗费的存储空间,每个 16 进制数占用 4b,1 个指纹用 40 个 16 进制数表示,占用空间为 20B,所以 1 万个指纹即占用空间 200KB,1 亿个指纹即占用 2G,所以当我们的爬取数量达到上亿级别时,Redis 的占用的内存就会变得很高,而且这仅仅是指纹的存储,另外 Redis 还存储了爬取队列,内存占用会进一步提高,更别说有多个 Scrapy 项目同时爬取的情况了。所以当爬取达到亿级别规模时 ScrapyRedis 提供的集合去重已经不能满足我们的要求,所以在这里我们需要使用一个更加节省内存的去重算法,它叫做 BloomFilter。

(内存版)Python实现的内存版布隆过滤器pybloom

https://github.com/jaybaird/python-bloomfilter
安装:

1
pip install pybloom

该模块包含两个类实现布隆过滤器功能。
BloomFilter 是定容。
ScalableBloomFilter 可以自动扩容

使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
>>> from pybloom import BloomFilter
>>> f = BloomFilter(capacity=1000, error_rate=0.001) # capacity是容量, error_rate 是能容忍的误报率
>>> f.add('Traim304') # 当不存在该元素,返回False
False
>>> f.add('Traim304') # 若存在,返回 True
True
>>> 'Traim304' in f # 值得注意的是若返回 True。该元素可能存在, 也可能不存在。过滤器能容许存在一定的错误
True
>>> 'Jacob' in f # 但是 False。则必定不存在
False
>>> len(f) # 当前存在的元素
1

>>> f = BloomFilter(capacity=1000, error_rate=0.001)
>>> from pybloom import ScalableBloomFilter
>>> sbf = ScalableBloomFilter(mode=ScalableBloomFilter.SMALL_SET_GROWTH)
>>> # sbf.add() 与 BloomFilter 同

超过误报率时抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>> f = BloomFilter(capacity=1000, error_rate=0.0000001)
>>> for a in range(1000):
... _ = f.add(a)
...
>>> len(a)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: object of type 'int' has no len()
>>> len(f)
1000
>>> f.add(1000)
False
>>> f.add(1001) # 当误报率超过 error_rate 会报错
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/site-packages/pybloom/pybloom.py", line 182, in add
raise IndexError("BloomFilter is at capacity")
IndexError: BloomFilter is at capacity

(持久化)手动实现的redis版布隆过滤器

大数据量,多用Redis持久化版本的布隆过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# 布隆过滤器 redis版本实现
import hashlib
import redis
import six

# 1. 多个hash函数的实现和求值
# 2. hash表实现和实现对应的映射和判断


class MultipleHash(object):
'''根据提供的原始数据,和预定义的多个salt,生成多个hash函数值'''

def __init__(self, salts, hash_func_name="md5"):
self.hash_func = getattr(hashlib, hash_func_name)
if len(salts) < 3:
raise Exception("请至少提供3个salt")
self.salts = salts

def get_hash_values(self, data):
'''根据提供的原始数据, 返回多个hash函数值'''
hash_values = []
for i in self.salts:
hash_obj = self.hash_func()
hash_obj.update(self._safe_data(data))
hash_obj.update(self._safe_data(i))
ret = hash_obj.hexdigest()
hash_values.append(int(ret, 16))
return hash_values

def _safe_data(self, data):
'''
python2 str === python3 bytes
python2 uniocde === python3 str
:param data: 给定的原始数据
:return: 二进制类型的字符串数据
'''
if six.PY3:
if isinstance(data, bytes):
return data
elif isinstance(data, str):
return data.encode()
else:
raise Exception("请提供一个字符串") # 建议使用英文来描述
else:
if isinstance(data, str):
return data
elif isinstance(data, unicode):
return data.encode()
else:
raise Exception("请提供一个字符串") # 建议使用英文来描述


class BloomFilter(object):
''''''
def __init__(self, salts, redis_host="localhost", redis_port=6379, redis_db=0, redis_key="bloomfilter"):
self.redis_host = redis_host
self.redis_port = redis_port
self.redis_db = redis_db
self.redis_key = redis_key
self.client = self._get_redis_client()
self.multiple_hash = MultipleHash(salts)

def _get_redis_client(self):
'''返回一个redis连接对象'''
pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port, db=self.redis_db)
client = redis.StrictRedis(connection_pool=pool)
return client

def save(self, data):
''''''
hash_values = self.multiple_hash.get_hash_values(data)
for hash_value in hash_values:
offset = self._get_offset(hash_value)
self.client.setbit(self.redis_key, offset, 1)
return True

def is_exists(self, data):
hash_values = self.multiple_hash.get_hash_values(data)
for hash_value in hash_values:
offset = self._get_offset(hash_value)
v = self.client.getbit(self.redis_key, offset)
if v == 0:
return False
return True

def _get_offset(self, hash_value):
# 512M长度哈希表
# 2**8 = 256
# 2**20 = 1024 * 1024
# (2**8 * 2**20 * 2*3) 代表hash表的长度 如果同一项目中不能更改
return hash_value % (2**8 * 2**20 * 2*3)
if __name__ == '__main__':

data = ["asdfasdf", "123", "123", "456","asf", "asf"]

bm = BloomFilter(salts=["1","2","3", "4"],redis_host="172.17.0.2")
for d in data:
if not bm.is_exists(d):
bm.save(d)
print("映射数据成功: ", d)
else:
print("发现重复数据:", d)

应用在scrapy-redis中

代码已经打包成了一个 Python 包并发布到了 PyPi,链接为:https://pypi.python.org/pypi/scrapy-redis-bloomfilter,因此我们以后如果想使用 ScrapyRedisBloomFilter 直接使用就好了,不需要再自己实现一遍。

我们可以直接使用Pip来安装,命令如下:

1
pip3 install scrapy-redis-bloomfilter

使用的方法和 ScrapyRedis 基本相似,在这里说明几个关键配置:

1
2
3
4
5
6
# 去重类,要使用BloomFilter请替换DUPEFILTER_CLASS
DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"
# 哈希函数的个数,默认为6,可以自行修改
BLOOMFILTER_HASH_NUMBER = 6
# BloomFilter的bit参数,默认30,占用128MB空间,去重量级1亿
BLOOMFILTER_BIT = 30

DUPEFILTER_CLASS 是去重类,如果要使用 BloomFilter需要将 DUPEFILTER_CLASS 修改为该包的去重类。

BLOOMFILTER_HASH_NUMBER 是 BloomFilter 使用的哈希函数的个数,默认为 6,可以根据去重量级自行修改。

BLOOMFILTER_BIT 即前文所介绍的 BloomFilter 类的 bit 参数,它决定了位数组的位数,如果 BLOOMFILTER_BIT 为 30,那么位数组位数为 2 的 30次方,将占用 Redis 128MB 的存储空间,去重量级在 1 亿左右,即对应爬取量级 1 亿左右。如果爬取量级在 10亿、20 亿甚至 100 亿,请务必将此参数对应调高。

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Spider 文件:
from scrapy import Request, Spider

class TestSpider(Spider):
name = 'test'
base_url = 'https://www.baidu.com/s?wd='

def start_requests(self):
for i in range(10):
url = self.base_url + str(i)
yield Request(url, callback=self.parse)

# Here contains 10 duplicated Requests
for i in range(100):
url = self.base_url + str(i)
yield Request(url, callback=self.parse)

def parse(self, response):
self.logger.debug('Response of ' + response.url)

在 start_requests() 方法中首先循环 10 次,构造参数为 0-9 的 URL,然后重新循环了 100 次,构造了参数为 0-99 的 URL,那么这里就会包含 10 个重复的 Request,我们运行项目测试一下:

1
scrapy crawl test

可以看到最后的输出结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{'bloomfilter/filtered': 10,
'downloader/request_bytes': 34021,
'downloader/request_count': 100,
'downloader/request_method_count/GET': 100,
'downloader/response_bytes': 72943,
'downloader/response_count': 100,
'downloader/response_status_count/200': 100,
'finish_reason': 'finished',
'finish_time': datetime.datetime(2017, 8, 11, 9, 34, 30, 419597),
'log_count/DEBUG': 202,
'log_count/INFO': 7,
'memusage/max': 54153216,
'memusage/startup': 54153216,
'response_received_count': 100,
'scheduler/dequeued/redis': 100,
'scheduler/enqueued/redis': 100,
'start_time': datetime.datetime(2017, 8, 11, 9, 34, 26, 495018)}

可以看到最后统计的第一行的结果:

1
'bloomfilter/filtered': 10,

这就是 BloomFilter 过滤后的统计结果,可以看到它的过滤个数为 10 个,也就是它成功将重复的 10 个 Reqeust 识别出来了,测试通过。

原理

本文偏应用,难以描述的原理,最后说。
一个很长的二进制向量和一个映射函数。

-w1209

-w1161

参考资料

1、https://zhuanlan.zhihu.com/p/37847480
2、https://www.youtube.com/watch?v=uC6Q5m0SSQ0
3、《python3网络爬虫开发实战》崔庆才
4、https://www.jianshu.com/p/f57187e2b5b9

子航 Clark wechat
微信公众号"优雅的python",欢迎订阅!
坚持分享,您的支持将鼓励我继续创作!