一次简单手写爬虫遇到的问题
业务&数据分析
根据业务,分析根据爬取的数据源。
数据源返回的是完整的html页面,并且部分数据是需要页面上执行js之后才能更新。
技术框架
请求-htmlunit
一个可以模拟浏览器请求的java工具包,官网链接,自动处理js业务。
解析-jsoup
一款Java 的HTML解析器,可直接解析某个URL地址、HTML文本内容。它提供了一套非常省力的API,可通过DOM,CSS以及类似于jQuery的操作方法来取出和操作数据。
封禁&代理
封禁方式,短时封禁、填写验证码之后解封、直接永封
封禁规律,频率、人为运维监控
使用代理ip处理封禁,ip代理提供方式:ip池,ip存活时长不一,需要维护本地代理池,失效、超时、封禁后更新。
渠道代理-每次请求自动更换ip为最优解,无需维护代理池。
多线程
使用多线程提高爬虫效率。
需要注意线程池大小的维护,一般电脑默认线程池为处理器核心数*2,初始化过多可能存在项目无法启动的情况。
ThreadPoolExecutor
使用 ThreadPoolExecutor
维护线程池时,在启动类中直接注入。
/**
* 业务线程池
*
* @return 线程池
*/
@Bean
ExecutorService businessExecutorPool() {
//初始化线程池数量
int nThreads = 5;
log.info("当前线程池数量:【{}】", nThreads);
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("xxx-web-pool-%d").build();
return new ThreadPoolExecutor(
nThreads,
20,
100L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
namedThreadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
在需要使用的地方直接注入即可,以下为serviceImpl
中的注入使用
@Resource
@Qualifier(value = "businessExecutorPool")
private ExecutorService pool;
/**
* 多线程demo
*/
public void poolDemo() {
pool.execute(() -> {
//子线程执行业务...
});
}
注意初始化线程池大小的设置。
CyclicBarrier
根据具体业务,设计逻辑,多线程处理有不同的方式。
使用 CyclicBarrier
解决主线程、子线程之间业务同步处理问题。
大体逻辑为,在明确需要开启多少子线程的前提下,在主线程创建 CyclicBarrier
,之后业务调用子线程进行多线程处理,主线程暂停,等待子线程全部完成。
在子线程业务处理完成之后,调用cb.await()
方法通知主线程。
public void poolJob2FixPage(List<NationalPage> pages) {
try {
//创建cb
CyclicBarrier cb = new CyclicBarrier(pages.size() + 1);
for (NationalPage page : pages) {
pool.execute(() -> {
try {
//业务代码...
} catch (Exception e) {
log.error(e.getMessage());
} finally {
try {
//子线程执行完成通知
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
});
}
//主线程暂停,所有子线程完成后主线程重新激活
cb.await();
} catch (Exception e) {
e.printStackTrace();
}
}
在所有子线程业务完成之后,主线程会自动继续执行。
业务中一定要注意处理子线程的异常情况,保证cb.await()的执行完成,不然会导致主线程卡死的情况。
使用注意
ThreadPoolExecutor
与CyclicBarrier
结合使用时,注意CyclicBarrier
的初始化数量与线程池之间的冲突。
如果初始化线程池不够用,则业务会进入线程池任务列队中,等待有空出来的子线程再排队执行业务。
但是由于使用了CyclicBarrier
,子线程的cb.await()
重复触发并不会使得主线程继续执行,这样就会导致主线程直接卡死。
此处我提前将需要执行的业务分段,使得每次循环需要调用的子线程数量,与ThreadPoolExecutor
初始化的线程池数量保持相同,以此解决这个问题。
private void completeNormPage(NationalNorm nationalNorm) {
List<NationalPage> nationalPages = nationalPageMapper.selectList(new QueryWrapper<NationalPage>()
.eq("norms_id", nationalNorm.getId())
.isNull("content"));
//因为线程池有5个线程,因此将数据按5个一组进行投放执行
int threadNum = 5;
int i = nationalPages.size() / threadNum;
if (nationalPages.size() % threadNum > 0) {
i++;
}
for (int j = 0; j < i; j++) {
int end = (j + 1) * threadNum;
if (j == i - 1) {
end = nationalPages.size();
}
//5个一组进行处理
poolJob2FixPage(nationalPages.subList(j * threadNum, end));
}
//其他处理...
}