Scraping Nationwide Air-Quality Data: A Hands-On Project



Preface

Hi, long time no see. I've been buried in my research project lately, so I've gone from a daily poster to a weekly one. Embarrassing, I know. The good news is that I'm close to a milestone: my graduation thesis is finally settled, and it's a proper research project rather than yet another "design document for some system".

OK, nothing big today. I'm just popping in with a hands-on scraping project. On Juejin I ran into a blogger who inspired me back in high school, Cui Qingcai, so I figured I'd write something crawler-related. This project is actually a modification of an earlier one of mine (I built it quite a while ago and just never wrote it up...).

OK, let's first look at what the finished scraper produces.

After scraping, the data is saved to Excel files; the `date` folder is whatever output path you configure.

(screenshot: generated Excel files in the output folder)

The data inside looks like this:

(screenshot: rows of air-quality data in one Excel file)

The overall project structure looks like this:

(screenshot: project directory structure)

The Pojo Entity Class

How should I put this: it's a habit left over from Java. But since storage is involved, I think defining a class like this makes the later data-saving step much easier.

```python
import datetime


class AIRPojo(object):
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2 = None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province = None
    __COUNT = 0
    createTime = str(datetime.datetime.now().strftime("%Y-%m-%d-%X"))

    def get_AQI(self):
        if self.__AQI:
            return self.__AQI

    def get_PM25(self):
        if self.__PM25:
            return self.__PM25

    def get_CO(self):
        if self.__CO:
            return self.__CO

    def get_SO2(self):
        if self.__SO2:
            return self.__SO2

    def get_PM10(self):
        if self.__PM10:
            return self.__PM10

    def get_O3(self):
        if self.__O3:
            return self.__O3

    def get_NO2(self):
        if self.__NO2:
            return self.__NO2

    def get_City(self):
        if self.__City:
            return self.__City

    def get_Province(self):
        if self.__Province:
            return self.__Province

    # The setters are guarded by __COUNT, so they only take effect
    # when called in this fixed order
    def set_AQI(self, AQI):
        if self.__COUNT == 0:
            self.__AQI = AQI
            self.__COUNT += 1

    def set_PM25(self, PM25):
        if self.__COUNT == 1:
            self.__PM25 = PM25
            self.__COUNT += 1

    def set_CO(self, CO):
        if self.__COUNT == 2:
            self.__CO = CO
            self.__COUNT += 1

    def set_SO2(self, SO2):
        if self.__COUNT == 3:
            self.__SO2 = SO2
            self.__COUNT += 1

    def set_PM10(self, PM10):
        if self.__COUNT == 4:
            self.__PM10 = PM10
            self.__COUNT += 1

    def set_O3(self, O3):
        if self.__COUNT == 5:
            self.__O3 = O3
            self.__COUNT += 1

    def set_NO2(self, NO2):
        if self.__COUNT == 6:
            self.__NO2 = NO2
            self.__COUNT += 1

    def set_City(self, City):
        if self.__COUNT == 7:
            self.__City = City
            self.__COUNT += 1

    def set_Province(self, Province):
        if self.__COUNT == 8:
            self.__Province = Province
            self.__COUNT += 1

    def __str__(self):
        if self.__COUNT >= 8:
            return ("AQI:" + self.__AQI + "-PM2.5:" + self.__PM25 + "-CO:" + self.__CO
                    + "-SO2:" + self.__SO2 + "-PM10:" + self.__PM10 + "-O3:" + self.__O3
                    + "-NO2:" + self.__NO2 + "-city:" + self.__City + "-Province:" + self.__Province)
        else:
            return "Record not fully populated yet; cannot print the full result"


if __name__ == '__main__':
    air = AIRPojo()
    print(air)
```

There is nothing special about this class; it just encapsulates the data.
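Purely for comparison (this is not part of the project), the same record could be expressed more idiomatically with a stdlib dataclass; `AirRecord` and the sample values below are hypothetical:

```python
from dataclasses import dataclass, fields


@dataclass
class AirRecord:
    # Hypothetical dataclass mirroring the pojo's fields;
    # no ordered-setter counter is needed here
    province: str
    city: str
    aqi: str
    pm25: str
    pm10: str
    co: str
    so2: str
    no2: str
    o3: str


# sample values, made up for illustration
record = AirRecord("Guangdong", "Guangzhou", "56", "23", "45", "0.7", "6", "18", "90")
print(record.city, record.aqi)
```

The generated `__init__`/`__repr__` replace the hand-written getters, setters, and `__str__` in one stroke.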

Settings

Next come our settings. There isn't much to this part; a few options are simply pulled out into one module so they can be adjusted in one place.

```python
# Log level: ALL, INFO, NONE
LOG_LEVEL = "ALL"

# Where the air-quality files are saved
AIRQualitySavePath = "./date"

# Progress counters for the air-quality saving step
AIRQUALITYSAVECURRENT = 0
AIRQUALITYTOTAL = 0
```

Even so, there isn't much that actually needs configuring in this project.

Implementing the Crawler

The crawler implementation is slightly more involved. At its heart are the asynchronous requests.

Crawl Flow

The crawl flow here is actually quite simple: request the index page, parse out each province's city links, then fire one asynchronous request per city.

(diagram: crawl flow)

Crawl Requests

Once that flow is clear, the crawl requests almost write themselves:

```python
def get_provinces(self):
    response = requests.get(url=self.rootUrl)
    response_data = response.content.decode(self.encoding)
    html = self.parse.HTML(response_data)
    Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

    for Province in Provinces:
        temp = list()
        Province_citys_link = Province.xpath("""./dd/a/@href""")
        Province_citys_name = Province.xpath("""./dd/a/text()""")
        for city_link, city_name in zip(Province_citys_link, Province_citys_name):
            temp.append((self.baseUrl + city_link, city_name))

        province_name = Province.xpath("./dt/b/text()")[0]
        self.all_provinces[province_name] = temp
        save_model = QualitySaveModel(province_name, len(temp))
        self.save_all_models[province_name] = save_model
    if LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO":
        print("Initialization complete; all provinces loaded")


# This callback acts as an internal hook where the data-saving work happens
def parse_city_quality(self, task):
    if task.result():
        data, province, city, url = task.result()
        html = self.parse.HTML(data)
        airPojo = AIRPojo()

        AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
        airPojo.set_AQI(AQI)
        ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

        airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
        airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
        airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
        airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
        airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
        airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
        airPojo.set_City(city)
        airPojo.set_Province(province)
        self.TIMES += 1  # record that this city's air quality has been fetched
        if LOG_LEVEL == "ALL":
            print("Task finished:", self.TIMES, airPojo, url)

        # persist to file
        QualitySave.SaveQuality(airPojo, self.save_all_models)
    else:
        pass


async def fireCity(self, province, url, city, semaphore):
    # four arguments: the province being crawled, the target url,
    # the city name, and the semaphore capping the pool size
    async with semaphore:
        timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=timeout) as response:
                    data = await response.text(encoding=self.encoding)
                    return (data, province, city, url)
        except Exception:
            self.timeout_task.append((province, url, city, semaphore))
```

This covers the first two steps: request the main page, parse each province's cities, then create the async requests and let them do the fetching.
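To make the XPath step concrete, here is a minimal standalone sketch against a hand-written HTML fragment that mimics the page structure the spider assumes (the real page on tianqihoubao.com may of course differ):

```python
from lxml import etree

# Hypothetical fragment shaped like the province/city index this spider parses
sample = """
<div id="content">
  <div>toolbar</div>
  <div>
    <dl><dt>legend (skipped by position()&gt;1)</dt></dl>
    <dl>
      <dt><b>Guangdong</b></dt>
      <dd><a href="/aqi/guangzhou.html">Guangzhou</a></dd>
      <dd><a href="/aqi/shenzhen.html">Shenzhen</a></dd>
    </dl>
  </div>
</div>"""

html = etree.HTML(sample)
# same expression as the spider: every <dl> after the first, in the second div
provinces = html.xpath('//*[@id="content"]/div[2]/dl[position()>1]')
for p in provinces:
    name = p.xpath("./dt/b/text()")[0]
    links = p.xpath("./dd/a/@href")
    cities = p.xpath("./dd/a/text()")
    print(name, list(zip(links, cities)))
```

Each `(link, city)` pair is exactly what `get_provinces` stores per province.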

Timeout Handling

One thing worth highlighting is the timeout handling.

```python

def check(self):
    # Keep re-issuing timed-out requests until the queue drains
    while self.timeout_task:
        if LOG_LEVEL == "ALL":
            print("Retrying timed-out connections:", len(self.timeout_task))
        tasks = []
        while self.timeout_task:
            province, url, city, semaphore = self.timeout_task.pop(0)
            c = self.fireCity(province, url, city, semaphore)
            task = asyncio.ensure_future(c)
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))

```

This is the key piece: whenever a task times out, its parameters are pushed onto a queue, and `check()` keeps rebuilding async requests from that queue until everything has been fetched.
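The retry idea can be sketched in isolation. `flaky_fetch` below is a stand-in for `fireCity` that fails on its first attempt per url, not part of the real spider:

```python
import asyncio

timeout_task = []   # parameters of failed requests, like the spider's queue
results = []
attempts = {}


async def flaky_fetch(url, semaphore):
    # Stand-in for fireCity: simulate one timeout per url, then succeed
    async with semaphore:
        attempts[url] = attempts.get(url, 0) + 1
        try:
            if attempts[url] == 1:
                raise asyncio.TimeoutError("simulated timeout")
            results.append(url)
        except Exception:
            timeout_task.append((url, semaphore))


def check(loop):
    # Same shape as the spider's check(): rebuild tasks until the queue drains
    while timeout_task:
        tasks = []
        while timeout_task:
            url, semaphore = timeout_task.pop(0)
            tasks.append(asyncio.ensure_future(flaky_fetch(url, semaphore)))
        loop.run_until_complete(asyncio.wait(tasks))


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
sem = asyncio.Semaphore(2)
urls = ["u1", "u2", "u3"]
first_wave = [asyncio.ensure_future(flaky_fetch(u, sem)) for u in urls]
loop.run_until_complete(asyncio.wait(first_wave))
check(loop)
loop.close()
print(sorted(results))  # every url eventually succeeds on the retry pass
```

Every url fails once, lands in `timeout_task`, and succeeds on the second pass driven by `check()`.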

Thread Handling

The reason for this part: I originally designed the crawler to sit behind a GUI component, so it may end up running inside a UI thread, and the event-loop setup therefore has to be thread-safe.

First we check whether we are on the main thread:

```python
def __now_thread(self) -> bool:
    # Is the current thread the main thread?
    current_name = threading.current_thread().getName()
    if current_name == "MainThread":
        return True
    return False
```

If not, a fresh event loop is created and registered for the current thread:

```python
if self.__now_thread():
    self.loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(self.PoolSize)
else:
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    semaphore = asyncio.Semaphore(self.PoolSize)
```
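A minimal standalone illustration of the same pattern (not the spider's code): a worker thread gets its own loop via `new_event_loop`/`set_event_loop` and runs a coroutine on it:

```python
import asyncio
import threading

results = {}


async def probe():
    # trivial coroutine that reports which thread ran it
    await asyncio.sleep(0)
    return threading.current_thread().name


def run_in_thread():
    # We are not on the main thread, so create and register
    # a dedicated event loop for this thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        results["worker"] = loop.run_until_complete(probe())
    finally:
        loop.close()


t = threading.Thread(target=run_in_thread, name="UIThread")
t.start()
t.join()
print(results)  # {'worker': 'UIThread'}
```

Without `new_event_loop`, asking for the current loop inside a non-main thread raises an error, which is exactly what the branch above guards against.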

Full Code

```python
import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Utils.QualitySave import QualitySave


class AIR_Quality(object):
    def __init__(self):
        self.all_provinces = {}
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
        }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500  # cap of 500 concurrent requests
        self.tasks = []  # task queue
        self.timeout_task = []  # timeout queue
        self.loop = None
        self.TasKLength = 0

        self.save_all_models = {}

    def __now_thread(self) -> bool:
        # Is the current thread the main thread?
        current_name = threading.current_thread().getName()
        if current_name == "MainThread":
            return True
        return False

    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")

        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link, city_name in zip(Province_citys_link, Province_citys_name):
                temp.append((self.baseUrl + city_link, city_name))

            province_name = Province.xpath("./dt/b/text()")[0]
            self.all_provinces[province_name] = temp
            save_model = QualitySaveModel(province_name, len(temp))
            self.save_all_models[province_name] = save_model
        if LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO":
            print("Initialization complete; all provinces loaded")

    # This callback acts as an internal hook where the data-saving work happens
    def parse_city_quality(self, task):
        if task.result():
            data, province, city, url = task.result()
            html = self.parse.HTML(data)
            airPojo = AIRPojo()

            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")

            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES += 1  # record that this city's air quality has been fetched
            if LOG_LEVEL == "ALL":
                print("Task finished:", self.TIMES, airPojo, url)

            # persist to file
            QualitySave.SaveQuality(airPojo, self.save_all_models)
        else:
            pass

    async def fireCity(self, province, url, city, semaphore):
        # four arguments: the province being crawled, the target url,
        # the city name, and the semaphore capping the pool size
        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)
                        return (data, province, city, url)
            except Exception:
                self.timeout_task.append((province, url, city, semaphore))

    def check(self):
        # Keep re-issuing timed-out requests until the queue drains
        while self.timeout_task:
            if LOG_LEVEL == "ALL":
                print("Retrying timed-out connections:", len(self.timeout_task))
            tasks = []
            while self.timeout_task:
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):
        global AIRQUALITYTOTAL
        start = time.time()

        if not self.all_provinces:
            self.get_provinces()

        semaphore = None
        if self.__now_thread():
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)

        # build the async task queue
        for province in self.all_provinces.keys():
            citys_info = self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks, city = city_info
                url = ""
                for url_mark in url_marks.split():
                    url += url_mark
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)
        AIRQUALITYTOTAL = self.TasKLength

        self.loop.run_until_complete(asyncio.wait(self.tasks))

        self.check()
        if LOG_LEVEL == "ALL" or LOG_LEVEL == "NONE":
            print("Elapsed:", time.time() - start, "seconds")
            print("Total tasks:", self.TasKLength)
            print("Completed:", self.TIMES, "remaining timed-out tasks:", len(self.timeout_task))

        self.loop.close()


if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()
```

Saving and Parsing

Next up is this part.

There's really not much to it: the parsing itself is already done inside the spider, and the results are packed into a pojo object.

(screenshot)
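For reference, the `li` texts on the detail page look roughly like `PM2.5: 35` (the sample values below are made up), so each reading is recovered with a simple split on the colon, just as the spider's `set_*` calls do:

```python
# Hypothetical li texts in the order the spider expects
all_info = [
    "PM2.5: 35",
    "CO: 0.7",
    "SO2: 6",
    "PM10: 58",
    "O3: 102",
    "NO2: 21",
]

# split "name: value" into a dict of readings
readings = {item.split(":")[0].strip(): item.split(":")[1].strip() for item in all_info}
print(readings["PM2.5"])  # 35
```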

What we need to do here is just persist that object.

```python
import datetime
import os

import xlwt

from Spider.Setting.Settings import *


class QualitySaveModel(object):

    def __init__(self, province: str, total: int):
        # Holds one province's air-quality rows
        # name of the workbook/sheet
        self.name = str(datetime.datetime.now().strftime("%Y-%m-%d-%X")).replace(":", ".") + province
        # the workbook
        self.save_boke = xlwt.Workbook(encoding='utf-8', style_compression=0)
        # the sheet
        self.sheet_boke = self.save_boke.add_sheet(self.name, cell_overwrite_ok=True)
        # total number of cities expected
        self.total = total
        # number of rows written so far
        self.current_row = 0

        self.cols = ["Province", "City", "Time", "AQI", "PM10", "PM2.5", "CO", "NO2", "SO2", "O3"]
        for i in range(len(self.cols)):
            # write the header row
            self.sheet_boke.write(0, i, self.cols[i])

    def save(self):
        # Only write the file once every expected row has arrived
        if self.current_row >= self.total:
            path_root = AIRQualitySavePath
            if not os.path.exists(path_root):
                os.makedirs(path_root)
            path = path_root + "/" + self.name + ".xls"
            self.save_boke.save(path)
            if LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO":
                print(path)

    def __str__(self):
        return "An Excel save model"
```

Then just call it:

```python
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Setting.Settings import *


class QualitySave(object):

    @staticmethod
    def SaveQuality(data, savemodels):
        global AIRQUALITYSAVECURRENT

        # pick the save model for this record's province
        savemodel = savemodels.get(data.get_Province())

        savemodel.current_row += 1
        savemodel.sheet_boke.write(savemodel.current_row, 0, data.get_Province())
        savemodel.sheet_boke.write(savemodel.current_row, 1, data.get_City())
        savemodel.sheet_boke.write(savemodel.current_row, 2, data.createTime)
        savemodel.sheet_boke.write(savemodel.current_row, 3, data.get_AQI())
        savemodel.sheet_boke.write(savemodel.current_row, 4, data.get_PM10())
        savemodel.sheet_boke.write(savemodel.current_row, 5, data.get_PM25())
        savemodel.sheet_boke.write(savemodel.current_row, 6, data.get_CO())
        savemodel.sheet_boke.write(savemodel.current_row, 7, data.get_NO2())
        savemodel.sheet_boke.write(savemodel.current_row, 8, data.get_SO2())
        savemodel.sheet_boke.write(savemodel.current_row, 9, data.get_O3())

        if LOG_LEVEL == "ALL":
            print(data.get_City(), "written to sheet")

        savemodel.save()  # writes the file once the province is complete

        AIRQUALITYSAVECURRENT += 1
```

You've already seen this being called from the spider code above.

Finally, just run the crawler and you're done.

Honestly, at first I wanted to build this like a proper spider framework, with clean extraction, encapsulation, and architecture, but in the end it felt like too much hassle. Writing Python as if it were Java is just too tiring.