http://blog.51cto.com/xpleaf/2093952
1 概述
在不用爬虫框架的情况,经过多方学习,尝试实现了一个分布式爬虫系统,并且可以将数据保存到不同地方,类似MySQL、HBase等。
基于面向接口的编码思想来开发,因此这个系统具有一定的扩展性,有兴趣的朋友直接看一下代码,就能理解其设计思想,虽然代码目前来说很多地方还是比较紧耦合,但只要花些时间和精力,很多都是可抽取出来并且可配置化的。
因为时间的关系,我只写了京东和苏宁易购两个网站的爬虫,但是完全可以实现不同网站爬虫的随机调度,基于其代码结构,再写国美、天猫等的商品爬取,难度不大,但是估计需要花很多时间和精力。因为在解析网页的数据时,实际上需要花很多时间,比如我在爬取苏宁易购商品的价格时,价格是异步获取的,并且其api是一长串的数字组合,我花了几个小时的时间才发现其规律,当然也承认,我的经验不足。
这个系统的设计,除了基本的数据爬取以外,更关注以下几个方面的问题:
- 1.如何实现分布式,同一个程序打包后分发到不同的节点运行时,不影响整体的数据爬取
- 2.如何实现url随机循环调度,核心是针对不同的顶级域名做随机
- 3.如何定时向url仓库中添加种子url,达到不让爬虫系统停下来的目的
- 4.如何实现对爬虫节点程序的监控,并能够发邮件报警
- 5.如何实现一个随机IP代理库,目的跟第2点有点类似,都是为了反反爬虫
下面会针对这个系统来做一个整体的基本介绍,其实我在代码中都有非常详细的注释,有兴趣的朋友可以参考一下代码,最后我会给出一些我爬虫时的数据分析。
另外需要注意的是,这个爬虫系统是基于Java实现的,但是语言本身仍然不是最重要的,有兴趣的朋友可以尝试用Python实现。
2 分布式爬虫系统架构
整体系统架构如下:

所以从上面的架构可以看出,整个系统主要分为三个部分:
爬虫系统就是用来爬取数据的,因为系统设计为分布式,因此,爬虫程序本身可以运行在不同的服务器节点上。
url调度系统核心在于url仓库,所谓的url仓库其实就是用Redis保存了需要爬取的url列表,并且在我们的url调度器中根据一定的策略来消费其中的url,从这个角度考虑,url仓库其实也是一个url队列。
监控报警系统主要是对爬虫节点进行监控,虽然并行执行的爬虫节点中的某一个挂掉了对整体数据爬取本身没有影响(只是降低了爬虫的速度),但是我们还是希望知道能够主动接收到节点挂掉的通知,而不是被动地发现。
下面将会针对以上三个方面并结合部分代码片段来对整个系统的设计思路做一些基本的介绍,对系统完整实现有浓厚兴趣的朋友可以直接参考源代码。
3 爬虫系统

(说明:zookeeper监控属于监控报警系统,url调度器属于URL调度系统)
爬虫系统是一个独立运行的进程,我们把我们的爬虫系统打包成jar包,然后分发到不同的节点上执行,这样并行爬取数据可以提高爬虫的效率。
3.1 随机IP代理器
加入随机IP代理主要是为了反反爬虫,因此如果有一个IP代理库,并且可以在构建http客户端时可以随机地使用不同的代理,那么对我们进行反反爬虫则会有很大的帮助。
在系统中使用IP代理库,需要先在文本文件中添加可用的代理地址信息:
需要注意的是,上面的代理IP是我在西刺代理上拿到的一些代理IP,不一定可用,建议是自己花钱购买一批代理IP,这样可以节省很多时间和精力去寻找代理IP。
然后在构建http客户端的工具类中,当第一次使用工具类时,会把这些代理IP加载进内存中,加载到Java的一个HashMap:
IPProxyRepository = ();
<span class="hljs-comment">/**
- 初次使用时使用静态代码块将IP代理库加载进set中
*/
<span class="hljs-keyword">static {
InputStream in = HttpUtil.class.getClassLoader().getResourceAsStream(<span class="hljs-string">"IPProxyRepository.txt"); <span class="hljs-comment">// 加载包含代理IP的文本
<span class="hljs-comment">// 构建缓冲流对象
InputStreamReader isr = <span class="hljs-keyword">new InputStreamReader(in);
BufferedReader bfr = <span class="hljs-keyword">new BufferedReader(isr);
String line = <span class="hljs-keyword">null;
<span class="hljs-keyword">try {
<span class="hljs-comment">// 循环读每一行,添加进map中
<span class="hljs-keyword">while ((line = bfr.readLine()) != <span class="hljs-keyword">null) {
String[] split = line.split(<span class="hljs-string">":"); <span class="hljs-comment">// 以:作为分隔符,即文本中的数据格式应为192.168.1.1:4893
String host = split[<span class="hljs-number">0];
<span class="hljs-keyword">int port = Integer.valueOf(split[<span class="hljs-number">1]);
IPProxyRepository.put(host,port);
}
Set keys = IPProxyRepository.keySet();
keysArray = keys.toArray(<span class="hljs-keyword">new String[keys.size()]); <span class="hljs-comment">// keysArray是为了方便生成随机的代理对象
} <span class="hljs-keyword">catch (IOException e) {
e.printStackTrace();
}
}
之后,在每次构建http客户端时,都会先到map中看是否有代理IP,有则使用,没有则不使用代理:
随机代理对象则通过下面的方法生成:
这样,通过上面的设计,基本就实现了随机IP代理器的功能,当然,其中还有很多可以完善的地方,比如,当使用这个IP代理而请求失败时,是否可以把这一情况记录下来,当超过一定次数时,再将其从代理库中删除,同时生成日志供开发人员或运维人员参考,这是完全可以实现的,不过我就不做这一步功能了。
3.2 网页下载器
网页下载器就是用来下载网页中的数据,主要基于下面的接口开发:
基于此,在系统中只实现了一个http get的下载器,但是也可以完成我们所需要的功能了:
<span class="hljs-meta">@Override
<span class="hljs-function"><span class="hljs-keyword">public Page <span class="hljs-title">download<span class="hljs-params">(String url) {
Page page = <span class="hljs-keyword">new Page();
String content = HttpUtil.getHttpContent(url); <span class="hljs-comment">// 获取网页数据
page.setUrl(url);
page.setContent(content);
<span class="hljs-keyword">return page;
}
}
3.3 网页解析器
网页解析器就是把下载的网页中我们感兴趣的数据解析出来,并保存到某个对象中,供数据存储器进一步处理以保存到不同的持久化仓库中,其基于下面的接口进行开发:
网页解析器在整个系统的开发中也算是比较重头戏的一个组件,功能不复杂,主要是代码比较多,针对不同的商城不同的商品,对应的解析器可能就不一样了,因此需要针对特别的商城的商品进行开发,因为很显然,京东用的网页模板跟苏宁易购的肯定不一样,天猫用的跟京东用的也肯定不一样,所以这个完全是看自己的需要来进行开发了,只是说,在解析器开发的过程当中会发现有部分重复代码,这时就可以把这些代码抽象出来开发一个工具类了。
目前在系统中爬取的是京东和苏宁易购的手机商品数据,因此与就写了这两个实现类:
<span class="hljs-comment">/**
3.4 数据存储器

数据存储器主要是将网页解析器解析出来的数据对象保存到不同的,而对于本次爬取的手机商品,数据对象是下面一个Page对象:
网页对象,主要包含网页内容和商品数据
*/
<span class="hljs-keyword">public <span class="hljs-class"><span class="hljs-keyword">class <span class="hljs-title">Page {
<span class="hljs-keyword">private String content; <span class="hljs-comment">// 网页内容
<span class="hljs-keyword">private String id; <span class="hljs-comment">// 商品Id
<span class="hljs-keyword">private String source; <span class="hljs-comment">// 商品来源
<span class="hljs-keyword">private String brand; <span class="hljs-comment">// 商品品牌
<span class="hljs-keyword">private String title; <span class="hljs-comment">// 商品标题
<span class="hljs-keyword">private <span class="hljs-keyword">float price; <span class="hljs-comment">// 商品价格
<span class="hljs-keyword">private <span class="hljs-keyword">int commentCount; <span class="hljs-comment">// 商品评论数
<span class="hljs-keyword">private String url; <span class="hljs-comment">// 商品地址
<span class="hljs-keyword">private String imgUrl; <span class="hljs-comment">// 商品图片地址
<span class="hljs-keyword">private String params; <span class="hljs-comment">// 商品规格参数
<span class="hljs-keyword">private List urls = <span class="hljs-keyword">new ArrayList<>(); <span class="hljs-comment">// 解析列表页面时用来保存解析的商品url的容器
}
对应的,在MySQL中,表数据结构如下:
而在HBase中的表结构则为如下:
在HBase shell中查看创建的表
hbase(<span class="hljs-keyword">main):<span class="hljs-number">135:<span class="hljs-number">0> <span class="hljs-keyword">desc <span class="hljs-string">'phone'
<span class="hljs-keyword">Table phone <span class="hljs-keyword">is ENABLED
phone
<span class="hljs-keyword">COLUMN FAMILIES DESCRIPTION
{<span class="hljs-keyword">NAME => <span class="hljs-string">'cf1',BLOOMFILTER => <span class="hljs-string">'ROW',<span class="hljs-keyword">VERSIONS => <span class="hljs-string">'1',IN_MEMORY => <span class="hljs-string">'false',KEEP_DELETED_CELLS => <span class="hljs-string">'FALSE',DATA_BLOCK
_ENCODING => <span class="hljs-string">'NONE',TTL => <span class="hljs-string">'FOREVER',COMPRESSION => <span class="hljs-string">'NONE',MIN_VERSIONS => <span class="hljs-string">'0',BLOCKCACHE => <span class="hljs-string">'true',<span class="hljs-keyword">BLOCKSIZE =>
<span class="hljs-string">'65536',REPLICATION_SCOPE => <span class="hljs-string">'0'}
{<span class="hljs-keyword">NAME => <span class="hljs-string">'cf2',REPLICATION_SCOPE => <span class="hljs-string">'0'}
<span class="hljs-number">2 <span class="hljs-keyword">row(s) <span class="hljs-keyword">in <span class="hljs-number">0.0350 seconds
即在HBase中建立了两个列族,分别为cf1、cf2,其中cf1用来保存id source price comment brand url字段信息,cf2用来保存title params imgUrl字段信息。
不同的数据存储用的是不同的实现类,但是其都是基于下面同一个接口开发的:
然后基于此开发了MySQL的存储实现类、HBase的存储实现类还有控制台的输出实现类,如MySQL的存储实现类,其实就是简单的数据插入语句:
<span class="hljs-meta">@Override
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">store<span class="hljs-params">(Page page) {
String sql = <span class="hljs-string">"insert into phone(id,source,brand,title,price,comment_count,url,img_url,params) values(?,?,?)";
<span class="hljs-keyword">try {
queryRunner.update(sql,page.getId(),page.getSource(),page.getBrand(),page.getTitle(),page.getPrice(),page.getCommentCount(),page.getUrl(),page.getImgUrl(),page.getParams());
} <span class="hljs-keyword">catch (SQLException e) {
e.printStackTrace();
}
}
}
而HBase的存储实现类,则是HBase Java API的常用插入语句代码:
当然,至于要将数据存储在哪个地方,在初始化爬虫程序时,是可以手动选择的:
目前还没有把代码写成可以同时存储在多个地方,按照目前代码的架构,要实现这一点也比较简单,修改一下相应代码就好了。实际上,是可以先把数据保存到MySQL中,然后通过Sqoop导入到HBase中,详细操作可以参考我写的Sqoop文章。
仍然需要注意的是,如果确定需要将数据保存到HBase中,请保证你有可用的集群环境,并且需要将如下配置文档添加到classpath下:
对大数据感兴趣的同学可以折腾一下这一点,如果之前没有接触过的,直接使用MySQL存储就好了,只需要在初始化爬虫程序时注入MySQL存储器即可:
4 URL调度系统

URL调度系统是实现整个爬虫系统分布式的桥梁与关键,正是通过URL调度系统的使用,才使得整个爬虫系统可以较为高效(Redis作为存储)随机地获取url,并实现整个系统的分布式。
4.1 URL仓库
通过架构图可以看出,所谓的URL仓库不过是Redis仓库,即在我们的系统中使用Redis来保存url地址列表,正是这样,才能保证我们的程序实现分布式,只要保存了url是唯一的,这样不管我们的爬虫程序有多少个,最终保存下来的数据都是只有唯一一份的,而不会重复,是通过这样来实现分布式的。
同时url仓库中的url地址在获取时的策略是通过队列的方式来实现的,待会通过URL调度器的实现即可知道。
另外,在我们的url仓库中,主要保存了下面的数据:
Redis的数据类型为list。
种子URL是持久化存储的,一定时间后,由URL定时器通过种子URL获取URL,并将其注入到我们的爬虫程序需要使用的高优先级URL队列中,这样就可以保存我们的爬虫程序可以源源不断地爬取数据而不需要中止程序的执行。
Redis的数据类型为set。
什么是高优先级URL队列?其实它就是用来保存列表url的。
那么什么是列表url呢?
说白了就是一个列表中含有多个商品,以京东为列,我们打开一个手机列表为例:

该地址中包含的不是一个具体商品的url,而是包含了多个我们需要爬取的数据(手机商品)的列表,通过对每个高级url的解析,我们可以获取到非常多的具体商品url,而具体的商品url,就是低优先url,其会保存到低优先级URL队列中。
那么以这个系统为例,保存的数据类似如下:
Redis的数据类型为set。
低优先级URL其实就是具体某个商品的URL,如下面一个手机商品:

通过下载该url的数据,并对其进行解析,就能够获取到我们想要的数据。
那么以这个系统为例,保存的数据类似如下:
4.2 URL调度器
所谓url调度器,其实说白了就是url仓库java代码的调度策略,不过因为其核心在于调度,所以将其放到URL调度器中来进行说明,目前其调度基于以下接口开发:
<span class="hljs-comment">/**
* 获取url的方法
* 从仓库中获取url(优先获取高优先级的url,如果没有,再获取低优先级的url)
* <span class="hljs-doctag">@return
*/
<span class="hljs-function"><span class="hljs-keyword">public String <span class="hljs-title">poll<span class="hljs-params">();
<span class="hljs-comment">/**
* 向高优先级列表中添加商品列表url
* <span class="hljs-doctag">@param highUrl
*/
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">offerHigher<span class="hljs-params">(String highUrl);
<span class="hljs-comment">/**
* 向低优先级列表中添加商品url
* <span class="hljs-doctag">@param lowUrl
*/
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">offerLower<span class="hljs-params">(String lowUrl);
}
其基于Redis作为URL仓库的实现如下:
<span class="hljs-comment">/**
* 构造方法
*/
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-title">RandomRedisRepositoryImpl<span class="hljs-params">() {
init();
}
<span class="hljs-comment">/**
* 初始化方法,初始化时,先将redis中存在的高低优先级url队列全部删除
* 否则上一次url队列中的url没有消耗完时,再停止启动跑下一次,就会导致url仓库中有重复的url
*/
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">init<span class="hljs-params">() {
Jedis jedis = JedisUtil.getJedis();
Set<String> domains = jedis.smembers(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY);
String higherUrlKey;
String lowerUrlKey;
<span class="hljs-keyword">for(String domain : domains) {
higherUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX;
lowerUrlKey = domain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX;
jedis.del(higherUrlKey,lowerUrlKey);
}
JedisUtil.returnJedis(jedis);
}
<span class="hljs-comment">/**
* 从队列中获取url,目前的策略是:
* 1.先从高优先级url队列中获取
* 2.再从低优先级url队列中获取
* 对应我们的实际场景,应该是先解析完列表url再解析商品url
* 但是需要注意的是,在分布式多线程的环境下,肯定是不能完全保证的,因为在某个时刻高优先级url队列中
* 的url消耗完了,但实际上程序还在解析下一个高优先级url,此时,其它线程去获取高优先级队列url肯定获取不到
* 这时就会去获取低优先级队列中的url,在实际考虑分析时,这点尤其需要注意
* <span class="hljs-doctag">@return
*/
<span class="hljs-meta">@Override
<span class="hljs-function"><span class="hljs-keyword">public String <span class="hljs-title">poll<span class="hljs-params">() {
<span class="hljs-comment">// 从set中随机获取一个顶级域名
Jedis jedis = JedisUtil.getJedis();
String randomDomain = jedis.srandmember(SpiderConstants.SPIDER_WEBSITE_DOMAINS_KEY); <span class="hljs-comment">// jd.com
String key = randomDomain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX; <span class="hljs-comment">// jd.com.higher
String url = jedis.lpop(key);
<span class="hljs-keyword">if(url == <span class="hljs-keyword">null) { <span class="hljs-comment">// 如果为null,则从低优先级中获取
key = randomDomain + SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX; <span class="hljs-comment">// jd.com.lower
url = jedis.lpop(key);
}
JedisUtil.returnJedis(jedis);
<span class="hljs-keyword">return url;
}
<span class="hljs-comment">/**
* 向高优先级url队列中添加url
* <span class="hljs-doctag">@param highUrl
*/
<span class="hljs-meta">@Override
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">offerHigher<span class="hljs-params">(String highUrl) {
offerUrl(highUrl,SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX);
}
<span class="hljs-comment">/**
* 向低优先url队列中添加url
* <span class="hljs-doctag">@param lowUrl
*/
<span class="hljs-meta">@Override
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">offerLower<span class="hljs-params">(String lowUrl) {
offerUrl(lowUrl,SpiderConstants.SPIDER_DOMAIN_LOWER_SUFFIX);
}
<span class="hljs-comment">/**
* 添加url的通用方法,通过offerHigher和offerLower抽象而来
* <span class="hljs-doctag">@param url 需要添加的url
* <span class="hljs-doctag">@param urlTypeSuffix url类型后缀.higher或.lower
*/
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">offerUrl<span class="hljs-params">(String url,String urlTypeSuffix) {
Jedis jedis = JedisUtil.getJedis();
String domain = SpiderUtil.getTopDomain(url); <span class="hljs-comment">// 获取url对应的顶级域名,如jd.com
String key = domain + urlTypeSuffix; <span class="hljs-comment">// 拼接url队列的key,如jd.com.higher
jedis.lpush(key,url); <span class="hljs-comment">// 向url队列中添加url
JedisUtil.returnJedis(jedis);
}
}
通过代码分析也是可以知道,其核心就在如何调度url仓库(Redis)中的url。
4.3 URL定时器
一段时间后,高优先级URL队列和低优先URL队列中的url都会被消费完,为了让程序可以继续爬取数据,同时减少人为的干预,可以预先在Redis中插入种子url,之后定时让URL定时器从种子url中取出url定存放到高优先级URL队列中,以此达到程序定时不间断爬取数据的目的。
url消费完毕后,是否需要循环不断爬取数据根据个人业务需求而不同,因此这一步不是必需的,只是也提供了这样的操作。因为事实上,我们需要爬取的数据也是每隔一段时间就会更新的,如果希望我们爬取的数据也跟着定时更新,那么这时定时器就有非常重要的作用了。不过需要注意的是,一旦决定需要循环重复爬取数据,则在设计存储器实现时需要考虑重复数据的问题,即重复数据应该是更新操作,目前在我设计的存储器不包括这个功能,有兴趣的朋友可以自己实现,只需要在插入数据前判断数据库中是否存在该数据即可。
另外需要注意的一点是,URL定时器是一个独立的进程,需要单独启动。
定时器基于Quartz实现,下面是其job的代码:
<span class="hljs-comment">// log4j日志记录
<span class="hljs-keyword">private Logger logger = LoggerFactory.getLogger(UrlJob.class);
<span class="hljs-meta">@Override
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">execute<span class="hljs-params">(JobExecutionContext context) <span class="hljs-keyword">throws JobExecutionException {
<span class="hljs-comment">/**
* 1.从指定url种子仓库获取种子url
* 2.将种子url添加进高优先级列表
*/
Jedis jedis = JedisUtil.getJedis();
Set<String> seedUrls = jedis.smembers(SpiderConstants.SPIDER_SEED_URLS_KEY); <span class="hljs-comment">// spider.seed.urls Redis数据类型为set,防止重复添加种子url
<span class="hljs-keyword">for(String seedUrl : seedUrls) {
String domain = SpiderUtil.getTopDomain(seedUrl); <span class="hljs-comment">// 种子url的顶级域名
jedis.sadd(domain + SpiderConstants.SPIDER_DOMAIN_HIGHER_SUFFIX,seedUrl);
logger.info(<span class="hljs-string">"获取种子:{}",seedUrl);
}
JedisUtil.returnJedis(jedis);
<span class="hljs-comment">// System.out.println("Scheduler Job Test...");
}
}
调度器的实现如下:
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-title">UrlJobScheduler<span class="hljs-params">() {
init();
}
<span class="hljs-comment">/**
* 初始化调度器
*/
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">void <span class="hljs-title">init<span class="hljs-params">() {
<span class="hljs-keyword">try {
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
<span class="hljs-comment">// 如果没有以下start方法的执行,则是不会开启任务的调度
scheduler.start();
String name = <span class="hljs-string">"URL_SCHEDULER_JOB";
String group = <span class="hljs-string">"URL_SCHEDULER_JOB_GROUP";
JobDetail jobDetail = <span class="hljs-keyword">new JobDetail(name,group,UrlJob.class);
String cronExpression = <span class="hljs-string">"0 10 1 * * ?";
Trigger trigger = <span class="hljs-keyword">new CronTrigger(name,cronExpression);
<span class="hljs-comment">// 调度任务
scheduler.scheduleJob(jobDetail,trigger);
} <span class="hljs-keyword">catch (SchedulerException e) {
e.printStackTrace();
} <span class="hljs-keyword">catch (ParseException e) {
e.printStackTrace();
}
}
<span class="hljs-function"><span class="hljs-keyword">public <span class="hljs-keyword">static <span class="hljs-keyword">void <span class="hljs-title">main<span class="hljs-params">(String[] args) {
UrlJobScheduler urlJobScheduler = <span class="hljs-keyword">new UrlJobScheduler();
urlJobScheduler.start();
}
<span class="hljs-comment">/**
* 定时调度任务
* 因为我们每天要定时从指定的仓库中获取种子url,并存放到高优先级的url列表中
* 所以是一个不间断的程序,所以不能停止
*/
<span class="hljs-function"><span class="hljs-keyword">private <span class="hljs-keyword">void <span class="hljs-title">start<span class="hljs-params">() {
<span class="hljs-keyword">while (<span class="hljs-keyword">true) {
}
}
}
5 监控报警系统

监控报警系统的加入主要是为了让使用者可以主动发现节点宕机,而不是被动地发现,因为实际中爬虫程序可能是持续不断运行的,并且我们会在多个节点上部署我们的爬虫程序,因此很有必要对节点进行监控,并且在节点出现问题时可以及时发现并修正,需要注意的是,监控报警系统是一个独立的进程,需要单独启动。
5.1 基本原理
首先需要先在zookeeper中创建一个/ispider 节点:
监控报警系统的开发主要依赖于zookeeper实现,监控程序对zookeeper下面的这个节点目录进行监听:
爬虫程序启动时会在该节点目录下注册一个临时节点目录:
当节点出现宕机时,该临时节点目录就会被zookeeper删除
同时因为我们监听了节点目录/ispider ,所以当zookeeper删除其下的节点目录时(或增加一个节点目录),zookeeper会给我们的监控程序发送通知,即我们的监控程序会得到回调,这样便可以在回调程序中执行报警的系统动作,从而完成监控报警的功能。
5.2 zookeeper Java API使用说明
可以使用zookeeper原生的Java API,我在另外写的一个RPC框架(底层基于Netty实现远程通信)中就是使用原生的API,不过显然代码会复杂很多,并且本身需要对zookeeper有更多的学习和了解,这样用起来才会容易一些。
所以为了降低开发的难度,这里使用第三方封装的API,即curator,来进行zookeeper客户端程序的开发。
5.3 爬虫系统zookeeper注册
在启动爬虫系统时,我们的程序都会启动一个zookeeper客户端来向zookeeper来注册自身的节点信息,主要是ip地址,并在/ispider 节点目录以创建一个以该爬虫程序所在的节点IP地址命名的节点,如/ispider/192.168.43.116 ,实现的代码如下:
应该注意到的是,我们创建的节点为临时节点,要想实现监控报警功能,必须要为临时节点。
5.4 监控程序
首先需要先监听zookeeper中的一个节点目录,在我们的系统中,设计是监听/ispider 这个节点目录:
在上面注册了zookeeper中的watcher ,也就是接收通知的回调程序,在该程序中,执行我们报警的逻辑:
currentNodes = curator.getChildren().usingWatcher( previousNodesSet = new HashSet<>(previousNodes);
previousNodes.size()) {
当然,判断节点是否挂掉,上面的逻辑还是存在一定的问题的,按照上面的逻辑,假如某一时刻新增节点和删除节点事件同时发生,那么其就不能判断出来,所以如果需要更精准的话,可以将上面的程序代码修改一下。
5.5 邮件发送模块
使用模板代码就可以了,不过需要注意的是,在使用时,发件人的信息请使用自己的邮箱。
下面是爬虫节点挂掉时接收到的邮件:

实际上,如果购买了短信服务,那么通过短信API也可以向我们的手机发送短信。
6 实战:爬取京东、苏宁易购全网手机商品数据
因为前面在介绍这个系统的时候也提到了,我只写了京东和苏宁易购的网页解析器,所以接下来也就是爬取其全网的手机商品数据。
6.1 环境说明
需要确保Redis、Zookeeper服务可用,另外如果需要使用HBase来存储数据,需要确保Hadoop集群中的HBase可用,并且相关配置文件已经加入到爬虫程序的classpath中。
还有一点需要注意的是,URL定时器和监控报警系统是作为单独的进程来运行的,并且也是可选的。
6.2 爬虫结果
进行了两次爬取,分别尝试将数据保存到MySQL和HBase中,给出如下数据情况。
6.2.1 保存到MySQL
select count(*) from phone;
+----------+
| count(*) |
+----------+
| 12052 |
+----------+
1 row in set
mysql> select count() from phone where source='jd.com';
+----------+
| count() |
+----------+
| 9578 |
+----------+
1 row in set
mysql> select count() from phone where source='suning
.com';
+----------+
| count() |
+----------+
| 2474 |
+----------+
1 row in set
在可视化工具中查看数据情况: 
6.2.2 保存到HBase
=> <span class="hljs-number">12348
在HDFS中查看数据情况:

6.2.3 数据量与实际情况分析

京东手机的列表大概有160多页,每个列表有60个商品数据,所以总量在9600左右,我们的数据基本是符合的,后面通过日志分析其实可以知道,一般丢失的数据为连接超时导致的,所以在选取爬虫的环境时,更建议在网络环境好的主机上进行,同时如果可以有IP代理地址库就更好了,另外对于连接超时的情况,其实是可以进一步在我们的程序中加以控制,一旦出现爬取数据失败的url,可以将其加入到重试url队列中,目前这一点功能我是没有做,有兴趣的同学可以试一下。
再来看看苏宁的,其有100页左右的手机列表,每页也是60个商品数据,所以总量在6000左右。但可以看到,我们的数据却只有3000这样的数量级(缺少的依然是频繁爬取造成的连接失败问题),这是为什么呢?
这是因为,打开苏宁的某个列表页面后,其是先加载30个商品,当鼠标向下滑动时,才会通过另外的API去加载其它的30个商品数据,每一个列表页面都是如此,所以,实际上,我们是缺少了一半的商品数据没有爬取。知道这个原因之后,实现也不难,但是因为时间关系,我就没有做了,有兴趣的朋友折腾一下吧。
6.3 通过日志分析爬虫系统的性能
在我们的爬虫系统中,每个关键的地方,如网页下载、数据解析等都是有打logger的,所以通过日志,可以大概分析出相关的时间参数。
平均下来,下载一个商品网页数据的时间在200~500毫秒不等,当然这个还需要取决于当时的网络情况。
另外,如果想要真正计算爬取一个商品的数据,可以通过日志下面的数据来计算:
- 下载一个商品页面数据的时间
- 获取价格数据的时间
- 获取评论数据的时间
在我的主机上(CPU:E5 10核心,内存:32GB,分别开启1个虚拟机和3个虚拟机),情况如下:
可以看到,当使用3个节点时,时间并不会相应地缩小为原来的1/3,这是因为此时影响爬虫性能的问题主要是网络问题,节点数量多,线程数量大,网络请求也多,但是带宽一定,并且在没有使用代理的情况,请求频繁,连接失败的情况也会增多,对时间也有一定的影响,如果使用随机代理库,情况将会好很多。
但可以肯定的是,在横向扩展增加爬虫节点之后,确实可以大大缩小我们的爬虫时间,这也是分布式爬虫系统的好处。
7 爬虫系统中使用的反反爬虫策略
在整个爬虫系统的设计中,主要使用下面的策略来达到反反爬虫的目的:
- 使用代理来访问-->IP代理库,随机IP代理
- 随机顶级域名url访问-->url调度系统
- 每个线程每爬取完一条商品数据sleep一小段时间再进行爬取
8 总结
需要说明的是,本系统是基于Java实现的,但个人觉得,语言本身依然不是问题,核心在于对整个系统的设计上以及理解上,写此文章是希望分享这样一种分布式爬虫系统的架构给大家,如果对源代码感兴趣,可以到我的GitHub上查看。
GitHub: (编辑:李大同)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|