Scraping Nationwide Air Quality Data in Practice



Preface

Hi, long time no see. I've been buried in a research project lately, which has turned me from a daily blogger into a weekly one. Embarrassing, I know. The good news is that I'm close to a milestone: my graduation thesis finally has a foundation, and it's a proper research project rather than yet another "system design and development" document.

OK, nothing major today, just popping in with a hands-on crawler project. I recently ran into Cui Qingcai (崔庆才) on Juejin, a blogger who got me started back in high school, so I figured I'd write something crawler-related. This project is actually a modification of an earlier one of mine (I built it quite a while ago and never wrote it up...).

OK, let's first look at what the scraper produces.

After the crawl finishes, the data is saved to Excel. The date folder is whatever you configure as the save path.

[screenshot: generated Excel files under the date folder]

And the data inside looks like this:

[screenshot: scraped air quality data in an Excel sheet]

The overall project structure looks like this:

[screenshot: project structure]

Pojo Entity Class

How should I put it... this is a habit carried over from Java. But since saving is involved, defining a class like this makes the data storage later on much more convenient.

```python
import datetime


class AIRPojo(object):
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2 = None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province = None
    __COUNT = 0
    createTime = str(datetime.datetime.now().strftime("%Y-%m-%d-%X"))

    def get_AQI(self):
        if (self.__AQI):
            return self.__AQI

    def get_PM25(self):
        if (self.__PM25):
            return self.__PM25

    def get_CO(self):
        if (self.__CO):
            return self.__CO

    def get_SO2(self):
        if (self.__SO2):
            return self.__SO2

    def get_PM10(self):
        if (self.__PM10):
            return self.__PM10

    def get_O3(self):
        if (self.__O3):
            return self.__O3

    def get_NO2(self):
        if (self.__NO2):
            return self.__NO2

    def get_City(self):
        if (self.__City):
            return self.__City

    def get_Province(self):
        if (self.__Province):
            return self.__Province

    # The setters only take effect in a fixed order, enforced by __COUNT
    def set_AQI(self, AQI):
        if (self.__COUNT == 0):
            self.__AQI = AQI
            self.__COUNT += 1

    def set_PM25(self, PM25):
        if (self.__COUNT == 1):
            self.__PM25 = PM25
            self.__COUNT += 1

    def set_CO(self, CO):
        if (self.__COUNT == 2):
            self.__CO = CO
            self.__COUNT += 1

    def set_SO2(self, SO2):
        if (self.__COUNT == 3):
            self.__SO2 = SO2
            self.__COUNT += 1

    def set_PM10(self, PM10):
        if (self.__COUNT == 4):
            self.__PM10 = PM10
            self.__COUNT += 1

    def set_O3(self, O3):
        if (self.__COUNT == 5):
            self.__O3 = O3
            self.__COUNT += 1

    def set_NO2(self, NO2):
        if (self.__COUNT == 6):
            self.__NO2 = NO2
            self.__COUNT += 1

    def set_City(self, City):
        if (self.__COUNT == 7):
            self.__City = City
            self.__COUNT += 1

    def set_Province(self, Province):
        if (self.__COUNT == 8):
            self.__Province = Province
            self.__COUNT += 1

    def __str__(self):
        if (self.__COUNT >= 8):
            return "AQI:" + self.__AQI + "-PM2.5:" + self.__PM25 + "-CO:" + self.__CO + "-SO2:" + self.__SO2 + "-PM10:" + self.__PM10 + "-O3:" + self.__O3 + "-NO2:" + self.__NO2 + "-city:" + self.__City + "-Province:" + self.__Province
        else:
            return "Data not fully populated yet, cannot print the full record"


if __name__ == '__main__':
    air = AIRPojo()
    print(air)
```

There's nothing special about this class; it just wraps the data.
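To make the ordering rule concrete, here's a minimal usage sketch (hypothetical values, not from the project): because of the __COUNT guard, the setters only take effect in the fixed order AQI → PM2.5 → CO → SO2 → PM10 → O3 → NO2 → City → Province.

```python
air = AIRPojo()

air.set_PM25("35")     # ignored: __COUNT is still 0, so only set_AQI works
print(air.get_PM25())  # None

air.set_AQI("50")      # accepted, __COUNT advances to 1
air.set_PM25("35")     # now accepted
print(air.get_PM25())  # 35
```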

Settings

Next come the settings. There's not much to them; a few values are pulled out here so they can be adjusted in one place.

```python
# Log level: ALL, INFO, NONE
LOG_LEVEL = "ALL"

# Where the air quality data is saved
AIRQualitySavePath = "./date"

# Track the current save progress
AIRQUALITYSAVECURRENT = 0
AIRQUALITYTOTAL = 0
```

That said, there isn't much that needs configuring in this project.
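The rest of the project pulls these in with a star import, so consuming them looks roughly like this (a minimal sketch):

```python
from Spider.Setting.Settings import *

# the settings then read like module-level constants
if LOG_LEVEL == "ALL":
    print("Verbose logging enabled; workbooks go to", AIRQualitySavePath)
```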

Crawler Implementation

The crawler implementation is slightly more involved. First up: our asynchronous requests.

Crawl Flow

The crawl flow itself is actually quite simple:

[diagram: crawl flow — request the index page, parse each province's city links, fire an async request per city, parse and save the result in a callback, then retry any timeouts]

Crawl Requests

Once that flow is clear, the crawl requests follow naturally:

```python
def get_provinces(self):
    response = requests.get(url=self.rootUrl)
    response_data = response.content.decode(self.encoding)
    html = self.parse.HTML(response_data)
    Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

    for Province in Provinces:
        temp = list()
        Province_citys_link = Province.xpath("""./dd/a/@href""")
        Province_citys_name = Province.xpath("""./dd/a/text()""")
        for city_link, city_name in zip(Province_citys_link, Province_citys_name):
            temp.append((self.baseUrl + city_link, city_name))

        province_name = Province.xpath("./dt/b/text()")[0]
        self.all_provinces[province_name] = temp
        save_model = QualitySaveModel(province_name, len(temp))

        self.save_all_models[province_name] = save_model
    if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
        print("Initialization complete, all provinces fetched")

# An internal hook that takes care of saving the data
def parse_city_quality(self, task):
    if (task.result()):
        data, province, city, url = task.result()
        html = self.parse.HTML(data)
        airPojo = AIRPojo()

        AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

        airPojo.set_AQI(AQI)
        ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

        airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
        airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
        airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
        airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
        airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
        airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
        airPojo.set_City(city)
        airPojo.set_Province(province)
        self.TIMES += 1  # record that this city's air quality has been fetched
        if (LOG_LEVEL == "ALL"):
            print("Task finished:", self.TIMES, airPojo, url)

        # save to file
        QualitySave.SaveQuality(airPojo, self.save_all_models)
    else:
        pass

async def fireCity(self, province, url, city, semaphore):
    # Four parameters: the province being crawled, the target url,
    # the city, and the semaphore that caps the pool size

    async with semaphore:
        timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=timeout) as response:
                    data = await response.text(encoding=self.encoding)

                    return (data, province, city, url)
        except Exception as e:
            # on timeout (or any error), queue the task for a retry
            self.timeout_task.append((province, url, city, semaphore))
```
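If you're curious what those XPaths expect, here's a tiny lxml round trip against sample markup reconstructed from the selectors themselves (an assumption about the page layout, not copied from the live site):

```python
from lxml import etree

# Implied structure: the second <div> under #content holds one <dl> per
# province (the first <dl> is a header, skipped by position()>1);
# <dt><b> carries the province name, each <dd><a> a city link.
sample = """
<div id="content">
  <div>sidebar</div>
  <div>
    <dl><dt>header</dt></dl>
    <dl>
      <dt><b>ProvinceA</b></dt>
      <dd><a href="/aqi/city1.html">City1</a></dd>
      <dd><a href="/aqi/city2.html">City2</a></dd>
    </dl>
  </div>
</div>
"""

html = etree.HTML(sample)
for dl in html.xpath('//*[@id="content"]/div[2]/dl[position()>1]'):
    print(dl.xpath("./dt/b/text()")[0], dl.xpath("./dd/a/@href"))
# -> ProvinceA ['/aqi/city1.html', '/aqi/city2.html']
```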

That covers the first two steps: request the index page, parse each province's cities, then create the async requests and let them do the rest.
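The task-plus-callback machinery used above, boiled down to a self-contained sketch (dummy names and URLs, same Python-3.7-era asyncio style as the project): a semaphore caps concurrency, asyncio.ensure_future wraps each coroutine in a Task, and add_done_callback fires once the Task finishes.

```python
import asyncio

async def fetch(url, semaphore):
    async with semaphore:
        await asyncio.sleep(0.1)  # stand-in for the aiohttp request
        return url, "<html>...</html>"

def on_done(task):
    # runs after the Task completes, just like parse_city_quality
    if task.result():
        url, data = task.result()
        print("parsed", url)

loop = asyncio.get_event_loop()
semaphore = asyncio.Semaphore(2)  # at most 2 fetches in flight
tasks = []
for u in ["http://example.com/a", "http://example.com/b", "http://example.com/c"]:
    task = asyncio.ensure_future(fetch(u, semaphore))
    task.add_done_callback(on_done)
    tasks.append(task)
loop.run_until_complete(asyncio.wait(tasks))
```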

Timeout Handling

One thing worth calling out is the timeout handling.

```python
def check(self):
    # keep retrying until the timeout queue is empty
    while (self.timeout_task):
        if (LOG_LEVEL == "ALL"):
            print("Retrying timed-out connections:", len(self.timeout_task))
        tasks = []
        while (self.timeout_task):
            province, url, city, semaphore = self.timeout_task.pop(0)
            c = self.fireCity(province, url, city, semaphore)
            task = asyncio.ensure_future(c)
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))
```

The idea: whenever a task times out, its parameters are stashed in timeout_task, and later the async requests are simply re-created and re-run.
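One caveat: check() only stops once the timeout queue drains, so a URL that never responds would be retried forever. If that worries you, a capped variant might look like this (a hypothetical tweak, not part of the project):

```python
def check(self, max_rounds=3):
    # hypothetical: give up after max_rounds retry passes
    rounds = 0
    while (self.timeout_task and rounds < max_rounds):
        rounds += 1
        if (LOG_LEVEL == "ALL"):
            print("Retry round", rounds, "- timed-out connections:", len(self.timeout_task))
        tasks = []
        while (self.timeout_task):
            province, url, city, semaphore = self.timeout_task.pop(0)
            task = asyncio.ensure_future(self.fireCity(province, url, city, semaphore))
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))
```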

Thread Handling

The reason for this: I originally designed the crawler to sit behind a GUI component, so it might end up running inside a UI thread rather than the main thread, and that calls for some thread-safety handling.

First a check for whether we're on the main thread:

```python
def __now_thread(self) -> bool:
    # Is the current thread the main thread?
    current_name = threading.current_thread().getName()
    if (current_name == "MainThread"):
        return True
    return False
```

If not, the event loop is set up like this:

```python
if (self.__now_thread()):
    self.loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(self.PoolSize)
else:
    # not on the main thread: create and register a fresh event loop
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    semaphore = asyncio.Semaphore(self.PoolSize)
```
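So, launched from a worker thread (a hypothetical usage sketch, e.g. behind a GUI button), the else branch is what keeps asyncio happy:

```python
import threading

def background_crawl():
    # __now_thread() returns False here, so run() builds its own event loop
    air_quality = AIR_Quality()
    air_quality.run()

worker = threading.Thread(target=background_crawl, name="CrawlerWorker")
worker.start()
worker.join()
```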

Full Code

```python
import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Utils.QualitySave import QualitySave


class AIR_Quality(object):

    def __init__(self):
        self.all_provinces = {}
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
        }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500  # cap the pool at 500
        self.tasks = []  # task queue
        self.timeout_task = []  # timeout queue
        self.loop = None
        self.TasKLength = 0

        self.save_all_models = {}

    def __now_thread(self) -> bool:
        # Is the current thread the main thread?
        current_name = threading.current_thread().getName()
        if (current_name == "MainThread"):
            return True
        return False

    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link, city_name in zip(Province_citys_link, Province_citys_name):
                temp.append((self.baseUrl + city_link, city_name))

            province_name = Province.xpath("./dt/b/text()")[0]
            self.all_provinces[province_name] = temp
            save_model = QualitySaveModel(province_name, len(temp))

            self.save_all_models[province_name] = save_model
        if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
            print("Initialization complete, all provinces fetched")

    # An internal hook that takes care of saving the data
    def parse_city_quality(self, task):
        if (task.result()):
            data, province, city, url = task.result()
            html = self.parse.HTML(data)
            airPojo = AIRPojo()

            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()

            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES += 1  # record that this city's air quality has been fetched
            if (LOG_LEVEL == "ALL"):
                print("Task finished:", self.TIMES, airPojo, url)

            # save to file
            QualitySave.SaveQuality(airPojo, self.save_all_models)
        else:
            pass

    async def fireCity(self, province, url, city, semaphore):
        # Four parameters: the province being crawled, the target url,
        # the city, and the semaphore that caps the pool size

        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)

                        return (data, province, city, url)
            except Exception as e:
                # on timeout (or any error), queue the task for a retry
                self.timeout_task.append((province, url, city, semaphore))

    def check(self):
        # keep retrying until the timeout queue is empty
        while (self.timeout_task):
            if (LOG_LEVEL == "ALL"):
                print("Retrying timed-out connections:", len(self.timeout_task))
            tasks = []
            while (self.timeout_task):
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):
        global AIRQUALITYTOTAL
        start = time.time()

        if (not self.all_provinces):
            self.get_provinces()

        semaphore = None

        if (self.__now_thread()):
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)

        # build the async task queue
        for province in self.all_provinces.keys():

            citys_info = self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks, city = city_info
                url = ""
                for url_mark in url_marks.split():  # strip stray whitespace from the url
                    url += url_mark
                c = self.fireCity(province, url, city, semaphore)

                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)
        AIRQUALITYTOTAL = self.TasKLength

        self.loop.run_until_complete(asyncio.wait(self.tasks))

        self.check()
        if (LOG_LEVEL == "ALL" or LOG_LEVEL == "NONE"):
            print("Elapsed:", time.time() - start, "seconds")
            print("Total tasks:", self.TasKLength)
            print("Completed:", self.TIMES, "remaining timed-out tasks:", len(self.timeout_task))

        self.loop.close()


if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()
```

Storage and Parsing

Next up is this part.

There's not much to it either. The parsing is already done inside the crawler, and the result is wrapped into a pojo object.

[screenshot: the pojo object holding the parsed data]

All that's left to do here is persist that object:

```python
import datetime
import os

import xlwt

from Spider.Setting.Settings import *


class QualitySaveModel(object):

    def __init__(self, province: str, total: int):
        # Used for saving air quality data
        # Name of the workbook
        self.name = str(datetime.datetime.now().strftime("%Y-%m-%d-%X")).replace(":", ".") + province
        # The workbook to save into
        self.save_boke = xlwt.Workbook(encoding='utf-8', style_compression=0)
        # The sheet inside the workbook
        self.sheet_boke = self.save_boke.add_sheet(self.name, cell_overwrite_ok=True)
        # Total number of rows expected
        self.total = total
        # Number of rows saved so far
        self.current_row = 0

        self.cols = ["Province", "City", "Time", "AQI", "PM10", "PM2.5", "CO", "NO2", "SO2", "O3"]
        for i in range(len(self.cols)):
            # write the header row
            self.sheet_boke.write(0, i, self.cols[i])

    def save(self):
        if (self.current_row >= self.total):
            path_root = AIRQualitySavePath
            if (not os.path.exists(path_root)):
                os.makedirs(path_root)
            path = path_root + "/" + self.name + ".xls"
            self.save_boke.save(path)
            if (LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO"):
                print(path)

    def __str__(self):
        return "This is an Excel storage object"
```
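A quick usage sketch with made-up values shows the intent of that current_row >= total check in save(): the workbook only hits disk once every expected row has been written.

```python
model = QualitySaveModel("TestProvince", total=2)

model.current_row += 1
model.sheet_boke.write(model.current_row, 0, "TestProvince")
model.save()  # nothing on disk yet: current_row (1) < total (2)

model.current_row += 1
model.sheet_boke.write(model.current_row, 0, "TestProvince")
model.save()  # now writes ./date/<timestamp>TestProvince.xls
```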

Then it's just a matter of calling it:

```python
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Setting.Settings import *


class QualitySave(object):

    @staticmethod
    def SaveQuality(data, savemodels):
        global AIRQUALITYSAVECURRENT

        # look up the workbook for this record's province
        savemodel = savemodels.get(data.get_Province())

        savemodel.current_row += 1
        savemodel.sheet_boke.write(savemodel.current_row, 0, data.get_Province())
        savemodel.sheet_boke.write(savemodel.current_row, 1, data.get_City())
        savemodel.sheet_boke.write(savemodel.current_row, 2, data.createTime)
        savemodel.sheet_boke.write(savemodel.current_row, 3, data.get_AQI())
        savemodel.sheet_boke.write(savemodel.current_row, 4, data.get_PM10())
        savemodel.sheet_boke.write(savemodel.current_row, 5, data.get_PM25())
        savemodel.sheet_boke.write(savemodel.current_row, 6, data.get_CO())
        savemodel.sheet_boke.write(savemodel.current_row, 7, data.get_NO2())
        savemodel.sheet_boke.write(savemodel.current_row, 8, data.get_SO2())
        savemodel.sheet_boke.write(savemodel.current_row, 9, data.get_O3())

        if (LOG_LEVEL == "ALL"):
            print(data.get_City(), "written to the sheet")

        savemodel.save()  # save (only flushes once the province is complete)

        AIRQUALITYSAVECURRENT += 1
```

As for calling it, you've already seen that in the crawler section: parse_city_quality hands each finished pojo to QualitySave.SaveQuality.

Finally, just run the crawler and you're done.

To be honest, at first I wanted to do a proper job of it, with real extraction, encapsulation, and architecture like a full spider framework, but in the end it felt like too much trouble. Writing Python as if it were Java is exhausting.