业务&数据分析

根据业务,分析根据爬取的数据源。
数据源返回的是完整的html页面,并且部分数据是需要页面上执行js之后才能更新。

技术框架

请求-htmlunit

一个可以模拟浏览器请求的java工具包,官网链接,自动处理js业务。

解析-jsoup

一款Java 的HTML解析器,可直接解析某个URL地址、HTML文本内容。它提供了一套非常省力的API,可通过DOM,CSS以及类似于jQuery的操作方法来取出和操作数据。

封禁&代理

封禁方式,短时封禁、填写验证码之后解封、直接永封
封禁规律,频率、人为运维监控
使用代理ip处理封禁,ip代理提供方式:ip池,ip存活时长不一,需要维护本地代理池,失效、超时、封禁后更新。
渠道代理-每次请求自动更换ip为最优解,无需维护代理池。

多线程

使用多线程提高爬虫效率。
需要注意线程池大小的维护,一般电脑默认线程池为处理器核心数*2,初始化过多可能存在项目无法启动的情况。

ThreadPoolExecutor

使用 ThreadPoolExecutor维护线程池时,在启动类中直接注入。

  /**
     * 业务线程池
     *
     * @return 线程池
     */
    @Bean
    ExecutorService businessExecutorPool() {
        //初始化线程池数量
        int nThreads = 5;
        log.info("当前线程池数量:【{}】", nThreads);
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("xxx-web-pool-%d").build();
        return new ThreadPoolExecutor(
                nThreads,
                20,
                100L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024),
                namedThreadFactory,
                new ThreadPoolExecutor.AbortPolicy());
    }

在需要使用的地方直接注入即可,以下为serviceImpl中的注入使用

 @Resource
    @Qualifier(value = "businessExecutorPool")
    private ExecutorService pool;
        
    /**
     * 多线程demo
     */
    public void poolDemo() {
        pool.execute(() -> {
              //子线程执行业务...
        });
    }

注意初始化线程池大小的设置。

CyclicBarrier

根据具体业务,设计逻辑,多线程处理有不同的方式。
使用 CyclicBarrier解决主线程、子线程之间业务同步处理问题。
大体逻辑为,在明确需要开启多少子线程的前提下,在主线程创建 CyclicBarrier,之后业务调用子线程进行多线程处理,主线程暂停,等待子线程全部完成。
在子线程业务处理完成之后,调用cb.await()方法通知主线程。

public void poolJob2FixPage(List<NationalPage> pages) {
        try {
            //创建cb
            CyclicBarrier cb = new CyclicBarrier(pages.size() + 1);
            for (NationalPage page : pages) {
                pool.execute(() -> {
                    try {
                     //业务代码...
                      
                    } catch (Exception e) {
                        log.error(e.getMessage());
                    } finally {
                        try {
                            //子线程执行完成通知
                            cb.await();
                        } catch (InterruptedException | BrokenBarrierException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
          	//主线程暂停,所有子线程完成后主线程重新激活
            cb.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

在所有子线程业务完成之后,主线程会自动继续执行。
业务中一定要注意处理子线程的异常情况,保证cb.await()的执行完成,不然会导致主线程卡死的情况。
使用注意
ThreadPoolExecutorCyclicBarrier结合使用时,注意CyclicBarrier的初始化数量与线程池之间的冲突。
如果初始化线程池不够用,则业务会进入线程池任务列队中,等待有空出来的子线程再排队执行业务。
但是由于使用了CyclicBarrier,子线程的cb.await()重复触发并不会使得主线程继续执行,这样就会导致主线程直接卡死。
此处我提前将需要执行的业务分段,使得每次循环需要调用的子线程数量,与ThreadPoolExecutor初始化的线程池数量保持相同,以此解决这个问题。

    private void completeNormPage(NationalNorm nationalNorm) {
        List<NationalPage> nationalPages = nationalPageMapper.selectList(new QueryWrapper<NationalPage>()
                .eq("norms_id", nationalNorm.getId())
                .isNull("content"));
        //因为线程池有5个线程,因此将数据按5个一组进行投放执行
        int threadNum = 5;
        int i = nationalPages.size() / threadNum;
        if (nationalPages.size() % threadNum > 0) {
            i++;
        }
        for (int j = 0; j < i; j++) {
            int end = (j + 1) * threadNum;
            if (j == i - 1) {
                end = nationalPages.size();
            }
            //5个一组进行处理
            poolJob2FixPage(nationalPages.subList(j * threadNum, end));
        }
       	//其他处理...

    }