WebMagic启动流程的分析

分析WebMagic爬虫框架的执行流程


写在前面

前几天写了一个WebMagic的入门程序,虽然能够跑起来,但是效果并不理想,一方面是数据并没有保存到数据库,而另一方面就是自己对WebMagic的执行过程还不太了解,以至于当我想自定义某些功能的时候,根本无从下手,所以今天决定扒一下源码,深入了解一下WebMagic的执行过程。使用的版本是0.7.3


执行过程

如果你看过WebMagic的官方文档的话,你肯定见过类似于下面这样的代码:

1
Spider.create(new GithubRepoPageProcessor()).addUrl("https://github.com/code4craft").thread(5).run();

这其实就是爬虫的入口或者说爬虫的启动代码,由Spider类调用其静态方法create()创建一个Spider实例,其中参数new GithubRepoPageProcessor()这一部分是我们自己定义的类,这个类需要实现PageProcessor接口并且实现这个接口中的process()方法以及getSite()方法,我们可以看一下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package us.codecraft.webmagic.processor;

import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;

/**
* Interface to be implemented to customize a crawler.<br>
* <br>
* In PageProcessor, you can customize:
* <br>
* start urls and other settings in {@link Site}<br>
* how the urls to fetch are detected <br>
* how the data are extracted and stored <br>
*
* @author code4crafter@gmail.com <br>
* @see Site
* @see Page
* @since 0.1.0
*/
public interface PageProcessor {

/**
* process the page, extract urls to fetch, extract the data and store
*
* @param page page
*/
public void process(Page page);

/**
* get the site settings
*
* @return site
* @see Site
*/
public Site getSite();
}

PageProcessor接口的作用就是方便用户自己定义爬虫的抽取逻辑以及设置站点信息,其实在run()方法中会通过间接调用(pageProcessor.process(page))来执行用户自定义的process()方法。这一点下面会讲到,这里我们先看另外两个方法addUrl()以及thread(),其中thread()方法比较简单,就是设置启动的线程数量,而addUrl()方法是用于添加要爬取的URL地址,此方法可以接收多个RUL地址。其源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Add urls to crawl. <br>
*
* @param urls urls
* @return this
*/
public Spider addUrl(String... urls) {
for (String url : urls) {
addRequest(new Request(url));
}
signalNewUrl();
return this;
}

接下来就是run()方法了,这个是爬虫的核心方法,这个方法会完成爬虫的整个执行的流程,包括检查运行的状态、初始化组件、打印日志信息、处理请求等等一系列的功能。其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public void run() {
// 检查运行状态
checkRunningStat();
// 初始化组件
initComponent();
logger.info("Spider {} started!",getUUID());
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
final Request request = scheduler.poll(this);
if (request == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
// wait until new url added
waitNewUrl();
} else {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
// 处理请求
processRequest(request);
onSuccess(request);
} catch (Exception e) {
onError(request);
logger.error("process request " + request + " error", e);
} finally {
pageCount.incrementAndGet();
signalNewUrl();
}
}
});
}
}
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
logger.info("Spider {} closed! {} pages downloaded.", getUUID(), pageCount.get());
}

首先是checkRunningStat()方法,在调用run()后会先检查爬虫的运行状态,如果爬虫已经启动,再调用run()将会报错,否则会将爬虫的运行状态设置为1。

1
2
3
4
5
6
7
8
9
10
11
private void checkRunningStat() {
while (true) {
int statNow = stat.get();
if (statNow == STAT_RUNNING) {
throw new IllegalStateException("Spider is already running!");
}
if (stat.compareAndSet(statNow, STAT_RUNNING)) {
break;
}
}
}

检查完运行状态之后,会初始化组件,这一部分主要是设置默认的Downloader以及Pipeline,还有就是对线程池以及请求做一些处理,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
protected void initComponent() {
if (downloader == null) {
this.downloader = new HttpClientDownloader();
}
if (pipelines.isEmpty()) {
// 指定默认的Pipeline
pipelines.add(new ConsolePipeline());
}
downloader.setThread(threadNum);
if (threadPool == null || threadPool.isShutdown()) {
if (executorService != null && !executorService.isShutdown()) {
threadPool = new CountableThreadPool(threadNum, executorService);
} else {
threadPool = new CountableThreadPool(threadNum);
}
}
if (startRequests != null) {
for (Request request : startRequests) {
addRequest(request);
}
startRequests.clear();
}
startTime = new Date();
}

可以看到,默认情况下,如果你没有指定Pipeline,那么当你调用putField()时,会将结果输出到控制台。这是因为调用putField()方法时,会将抽取的结果集信息存放到ResultItems类中的fields中,fields是一个LinkedHashMap,而结果的输出实际上是在Spider类中的onDownloadSuccess()方法执行,而run()方法会间接调用这个方法,完成信息的输出。onDownloadSuccess()方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void onDownloadSuccess(Request request, Page page) {
if (site.getAcceptStatCode().contains(page.getStatusCode())){
pageProcessor.process(page);
extractAndAddRequests(page, spawnUrl);
if (!page.getResultItems().isSkip()) {
for (Pipeline pipeline : pipelines) {
// 如果没有指定Pipeline,那么这里调用的就是ConsolePipeline类中的process方法
pipeline.process(page.getResultItems(), this);
}
}
} else {
logger.info("page status code error, page {} , code: {}", request.getUrl(), page.getStatusCode());
}
sleep(site.getSleepTime());
return;
}

初始化完组件之后,就是处理请求了,这一部分主要看两个方法:

1
2
processRequest(request);
onSuccess(request);

首先是processRequest()方法,这个方法会去下载要爬取的页面,并且根据下载成功与否进行不同的处理,如果下载成功,就会去调用上面刚说过的onDownloadSuccess()方法,而在onDownloadSuccess()方法中,在输出结果集之前,会去执行pageProcessor.process(page);这一句代码,而这一句代码实际上调用的就是我们自己定义的抽取逻辑的方法process()。如果下载失败会进行一些重试处理。processRequest()方法源码如下:

1
2
3
4
5
6
7
8
private void processRequest(Request request) {
Page page = downloader.download(request, this);
if (page.isDownloadSuccess()){
onDownloadSuccess(request, page);
} else {
onDownloaderFail(request);
}
}

总结

通过分析源码可以看出,如果我们想要将数据保存到数据库,那么在抽取信息的时候不是调用putField()方法保存到Map集合中了,而是将信息封装到一个对象模型中,并且通过自定义Pipeline接口的实现类,将对象模型中的数据保存到数据库中。


如果您觉得我的文章对您有帮助,请随意赞赏,您的支持将鼓励我继续创作!
0%