Scraping Nationwide Air Quality Data: A Hands-On Project
Preface
Hi, long time no see. I've been buried in my research project lately, so I've gone from a daily poster to a weekly one. Embarrassing, I know, but the good news is I'm close to a milestone: my graduation thesis finally has a foundation, and it's a proper research project rather than yet another "system design and development" document.
OK, nothing big today, just surfacing to share a hands-on crawler project. I recently ran into Cui Qingcai on Juejin, a blogger who got me started back in my high school days, so a crawler write-up felt fitting. This project is actually a modification of an earlier one of mine (which I built quite a while ago and never got around to writing up...).
OK, let's first look at what the scraper produces.
After crawling, the data is saved into Excel files; the `date` folder is whatever path you configure.
The data inside looks like this:
The overall project structure looks like this:
Pojo Entity Class
How to put this... it's a habit carried over from Java. Still, since saving is involved, defining a class like this makes the later storage step easier.
```python
import datetime


class AIRPojo(object):
    __AQI = None
    __PM25 = None
    __CO = None
    __SO2 = None
    __PM10 = None
    __O3 = None
    __NO2 = None
    __City = None
    __Province = None
    __COUNT = 0
    createTime = str(datetime.datetime.now().strftime("%Y-%m-%d-%X"))

    def get_AQI(self):
        if self.__AQI:
            return self.__AQI

    def get_PM25(self):
        if self.__PM25:
            return self.__PM25

    def get_CO(self):
        if self.__CO:
            return self.__CO

    def get_SO2(self):
        if self.__SO2:
            return self.__SO2

    def get_PM10(self):
        if self.__PM10:
            return self.__PM10

    def get_O3(self):
        if self.__O3:
            return self.__O3

    def get_NO2(self):
        if self.__NO2:
            return self.__NO2

    def get_City(self):
        if self.__City:
            return self.__City

    def get_Province(self):
        if self.__Province:
            return self.__Province

    def set_AQI(self, AQI):
        if self.__COUNT == 0:
            self.__AQI = AQI
            self.__COUNT += 1

    def set_PM25(self, PM25):
        if self.__COUNT == 1:
            self.__PM25 = PM25
            self.__COUNT += 1

    def set_CO(self, CO):
        if self.__COUNT == 2:
            self.__CO = CO
            self.__COUNT += 1

    def set_SO2(self, SO2):
        if self.__COUNT == 3:
            self.__SO2 = SO2
            self.__COUNT += 1

    def set_PM10(self, PM10):
        if self.__COUNT == 4:
            self.__PM10 = PM10
            self.__COUNT += 1

    def set_O3(self, O3):
        if self.__COUNT == 5:
            self.__O3 = O3
            self.__COUNT += 1

    def set_NO2(self, NO2):
        if self.__COUNT == 6:
            self.__NO2 = NO2
            self.__COUNT += 1

    def set_City(self, City):
        if self.__COUNT == 7:
            self.__City = City
            self.__COUNT += 1

    def set_Province(self, Province):
        if self.__COUNT == 8:
            self.__Province = Province
            self.__COUNT += 1

    def __str__(self):
        # all nine setters must have fired (COUNT reaches 9 after Province)
        if self.__COUNT >= 9:
            return ("AQI:" + self.__AQI + "-PM2.5:" + self.__PM25 + "-CO:" + self.__CO
                    + "-SO2:" + self.__SO2 + "-PM10:" + self.__PM10 + "-O3:" + self.__O3
                    + "-NO2:" + self.__NO2 + "-city:" + self.__City + "-Province:" + self.__Province)
        else:
            return "Record not fully populated; cannot print the full result"


if __name__ == '__main__':
    air = AIRPojo()
    print(air)
```
There's nothing special about this class; it just packages the data.
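One quirk worth a second look is the `__COUNT` gate: each setter only takes effect when called in its expected position, so out-of-order calls are silently ignored. A stripped-down sketch of the same idea (the class and field names below are invented for illustration, not from the project):

```python
class OrderedRecord:
    """Miniature version of the AIRPojo idea: setters must fire in order."""

    def __init__(self):
        self.__aqi = None
        self.__pm25 = None
        self.__count = 0

    def set_aqi(self, value):
        if self.__count == 0:      # only valid as the first assignment
            self.__aqi = value
            self.__count += 1

    def set_pm25(self, value):
        if self.__count == 1:      # only valid as the second assignment
            self.__pm25 = value
            self.__count += 1

    def get_pm25(self):
        return self.__pm25


r = OrderedRecord()
r.set_pm25("45")   # ignored: called out of order
r.set_aqi("80")
r.set_pm25("45")   # accepted now
print(r.get_pm25())   # -> 45
```

Whether this guard is a good design is debatable; in practice it just documents the exact order in which the crawler's parsing callback must call the setters.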
Settings
Next come the settings. Nothing fancy here; a few configuration values are pulled out so they can be adjusted in one place.

```python
# Log level: ALL, INFO, NONE
LOG_LEVEL = "ALL"

# Where the air-quality spreadsheets are saved
AIRQualitySavePath = "./date"

# Progress counters for the current save run
AIRQUALITYSAVECURRENT = 0
AIRQUALITYTOTAL = 0
```

That said, there isn't much that needs configuring in this project.
Crawler Implementation
The crawler itself is a bit more involved. Let's start with the asynchronous requests.
Crawl Flow
The crawl flow here is actually quite simple.
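In code form, the flow is: build a province → city map from the index page, then fire one coroutine per city through a semaphore, and parse each response in a done-callback. Here's a toy, network-free sketch of that pipeline (the data and the `fetch_city` stub are made up; the real crawler does this with `aiohttp`):

```python
import asyncio

# Stub index: province -> [(url, city)], shaped like the real get_provinces() output
PROVINCES = {
    "Guangdong": [("/aqi/guangzhou.html", "Guangzhou"),
                  ("/aqi/shenzhen.html", "Shenzhen")],
    "Hunan": [("/aqi/changsha.html", "Changsha")],
}

results = []

async def fetch_city(province, url, city, semaphore):
    async with semaphore:          # caps in-flight requests, like PoolSize
        await asyncio.sleep(0)     # stand-in for the real HTTP GET
        return ("<html>%s</html>" % city, province, city, url)

def on_done(task):
    # completion callback: the real crawler parses and saves here
    data, province, city, url = task.result()
    results.append((province, city))

async def main():
    semaphore = asyncio.Semaphore(2)
    tasks = []
    for province, cities in PROVINCES.items():
        for url, city in cities:
            t = asyncio.ensure_future(fetch_city(province, url, city, semaphore))
            t.add_done_callback(on_done)
            tasks.append(t)
    await asyncio.wait(tasks)

asyncio.run(main())
print(len(results))   # one result per city
```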
Crawl Requests
Once the flow is clear, you know how to issue the crawl requests.

```python
def get_provinces(self):
    response = requests.get(url=self.rootUrl)
    response_data = response.content.decode(self.encoding)
    html = self.parse.HTML(response_data)
    Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
    for Province in Provinces:
        temp = list()
        Province_citys_link = Province.xpath("""./dd/a/@href""")
        Province_citys_name = Province.xpath("""./dd/a/text()""")
        for city_link, city_name in zip(Province_citys_link, Province_citys_name):
            temp.append((self.baseUrl + city_link, city_name))
        province_name = Province.xpath("./dt/b/text()")[0]
        self.all_provinces[province_name] = temp
        save_model = QualitySaveModel(province_name, len(temp))
        self.save_all_models[province_name] = save_model
    if LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO":
        print("Initialization complete; all provinces retrieved")

# This callback acts as an internal hook where data persistence happens
def parse_city_quality(self, task):
    if task.result():
        data, province, city, url = task.result()
        html = self.parse.HTML(data)
        airPojo = AIRPojo()
        AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
        airPojo.set_AQI(AQI)
        ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
        airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
        airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
        airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
        airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
        airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
        airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
        airPojo.set_City(city)
        airPojo.set_Province(province)
        self.TIMES += 1  # record that this city's air quality has been fetched
        if LOG_LEVEL == "ALL":
            print("Task finished:", self.TIMES, airPojo, url)
        # save to file
        QualitySave.SaveQuality(airPojo, self.save_all_models)
    else:
        pass

async def fireCity(self, province, url, city, semaphore):
    # four arguments: the province being crawled, the target url, the city,
    # and the semaphore that caps the request pool size
    async with semaphore:
        timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=timeout) as response:
                    data = await response.text(encoding=self.encoding)
                    return (data, province, city, url)
        except Exception as e:
            self.timeout_task.append((province, url, city, semaphore))
```
This covers the first two steps: request the index page, parse each province's cities, create the asynchronous requests, and let them be processed.
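The fragile part of the detail-page parse is the assumption that every `<li>` reads like `PM2.5: 45`, which is why `split(":")[1].strip()` works. A standalone sketch of that step, with invented sample strings:

```python
# Sample li texts shaped like what the crawler's xpath returns (values invented)
li_texts = ["PM2.5: 45", "CO: 0.7", "SO2: 9", "PM10: 78", "O3: 102", "NO2: 30"]

def parse_metrics(texts):
    """Split each 'name: value' entry on the first colon and strip whitespace."""
    metrics = {}
    for text in texts:
        name, value = text.split(":", 1)
        metrics[name.strip()] = value.strip()
    return metrics

print(parse_metrics(li_texts)["PM2.5"])   # -> 45
```

If the site ever changes the label order or separator, this positional parse breaks silently, which is worth keeping in mind.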
Timeout Handling
One thing worth highlighting, though, is the timeout handling.
```python
def check(self):
    while self.timeout_task:
        if LOG_LEVEL == "ALL":
            print("Retrying timed-out connections:", len(self.timeout_task))
        tasks = []
        while self.timeout_task:
            province, url, city, semaphore = self.timeout_task.pop(0)
            c = self.fireCity(province, url, city, semaphore)
            task = asyncio.ensure_future(c)
            task.add_done_callback(self.parse_city_quality)
            tasks.append(task)
        self.loop.run_until_complete(asyncio.wait(tasks))
```
The key point: whenever a task times out, its arguments are stashed away, and new asynchronous requests are created from them.
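Stripped of the scraping details, the retry logic is just: failed jobs push their arguments onto a queue, and `check()` keeps draining that queue until nothing times out. A minimal self-contained sketch of the same pattern (the flaky job here is simulated, not a real request):

```python
import asyncio

timeout_queue = []   # arguments of jobs that "timed out", awaiting a retry
completed = []
attempts = {}

async def flaky_job(name):
    """Simulated request: 'times out' (returns None) on its first attempt."""
    attempts[name] = attempts.get(name, 0) + 1
    if attempts[name] < 2:
        timeout_queue.append(name)   # park it for the retry pass
        return None
    return name

def on_done(task):
    if task.result():                # only successful responses get parsed/saved
        completed.append(task.result())

async def run_batch(names):
    tasks = []
    for name in names:
        t = asyncio.ensure_future(flaky_job(name))
        t.add_done_callback(on_done)
        tasks.append(t)
    await asyncio.wait(tasks)

async def main():
    await run_batch(["Guangzhou", "Shenzhen"])
    while timeout_queue:             # same loop shape as check(): drain until empty
        retries = timeout_queue.copy()
        timeout_queue.clear()
        await run_batch(retries)

asyncio.run(main())
print(sorted(completed))
```

Note that jobs which fail forever would loop here indefinitely; the real crawler has the same property, so a retry cap would be a sensible addition.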
Thread Handling
The reason for this part: I originally designed the crawler to sit behind a GUI component, so it might end up running inside a UI thread, and the event loop therefore has to be set up in a thread-aware way.
First, a check:
```python
def __now_thread(self) -> bool:
    # is the current thread the main thread?
    current_name = threading.current_thread().getName()
    if current_name == "MainThread":
        return True
    return False
```
If it isn't the main thread, handle it like this:
```python
if self.__now_thread():
    # main thread: reuse the default event loop
    self.loop = asyncio.get_event_loop()
    semaphore = asyncio.Semaphore(self.PoolSize)
else:
    # worker thread: create and register a fresh event loop
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)
    semaphore = asyncio.Semaphore(self.PoolSize)
```
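To see why the branch is needed: only the main thread has an event loop installed for it, so a worker thread must create and register its own. A minimal demonstration (the thread name is invented for illustration):

```python
import asyncio
import threading

results = {}

async def probe():
    # report which thread the event loop is running in
    return threading.current_thread().name

def worker():
    # non-main threads have no pre-installed event loop, so make one
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    results["worker"] = loop.run_until_complete(probe())
    loop.close()

t = threading.Thread(target=worker, name="spider-worker")
t.start()
t.join()
print(results["worker"])   # -> spider-worker
```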
Full Code
```python
import time
import aiohttp
import asyncio
import requests
from lxml import etree
import threading
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Setting.Settings import *
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Utils.QualitySave import QualitySave


class AIR_Quality(object):
    def __init__(self):
        self.all_provinces = {}
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36"
        }
        self.parse = etree
        self.encoding = "gbk"
        self.baseUrl = "http://www.tianqihoubao.com"
        self.rootUrl = "http://www.tianqihoubao.com/aqi"
        self.TIMES = 0
        self.PoolSize = 500  # cap the request pool at 500
        self.tasks = []  # task queue
        self.timeout_task = []  # timeout queue
        self.loop = None
        self.TasKLength = 0
        self.save_all_models = {}

    def __now_thread(self) -> bool:
        # is the current thread the main thread?
        current_name = threading.current_thread().getName()
        if current_name == "MainThread":
            return True
        return False

    def get_provinces(self):
        response = requests.get(url=self.rootUrl)
        response_data = response.content.decode(self.encoding)
        html = self.parse.HTML(response_data)
        Provinces = html.xpath("""//*[@id="content"]/div[2]/dl[position()>1]""")
        for Province in Provinces:
            temp = list()
            Province_citys_link = Province.xpath("""./dd/a/@href""")
            Province_citys_name = Province.xpath("""./dd/a/text()""")
            for city_link, city_name in zip(Province_citys_link, Province_citys_name):
                temp.append((self.baseUrl + city_link, city_name))
            province_name = Province.xpath("./dt/b/text()")[0]
            self.all_provinces[province_name] = temp
            save_model = QualitySaveModel(province_name, len(temp))
            self.save_all_models[province_name] = save_model
        if LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO":
            print("Initialization complete; all provinces retrieved")

    # This callback acts as an internal hook where data persistence happens
    def parse_city_quality(self, task):
        if task.result():
            data, province, city, url = task.result()
            html = self.parse.HTML(data)
            airPojo = AIRPojo()
            AQI = html.xpath("""//*[@id="today-quality"]/div[1]/div[1]/div[1]/text()""")[0].strip()
            airPojo.set_AQI(AQI)
            ALL_Info = html.xpath("""//*[@id="today-quality"]/div[2]/ul/li/text()""")
            airPojo.set_PM25(ALL_Info[0].split(":")[1].strip())
            airPojo.set_CO(ALL_Info[1].split(":")[1].strip())
            airPojo.set_SO2(ALL_Info[2].split(":")[1].strip())
            airPojo.set_PM10(ALL_Info[3].split(":")[1].strip())
            airPojo.set_O3(ALL_Info[4].split(":")[1].strip())
            airPojo.set_NO2(ALL_Info[5].split(":")[1].strip())
            airPojo.set_City(city)
            airPojo.set_Province(province)
            self.TIMES += 1  # record that this city's air quality has been fetched
            if LOG_LEVEL == "ALL":
                print("Task finished:", self.TIMES, airPojo, url)
            # save to file
            QualitySave.SaveQuality(airPojo, self.save_all_models)
        else:
            pass

    async def fireCity(self, province, url, city, semaphore):
        # four arguments: the province being crawled, the target url, the city,
        # and the semaphore that caps the request pool size
        async with semaphore:
            timeout = aiohttp.ClientTimeout(connect=2, sock_connect=1, sock_read=10)
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=timeout) as response:
                        data = await response.text(encoding=self.encoding)
                        return (data, province, city, url)
            except Exception as e:
                self.timeout_task.append((province, url, city, semaphore))

    def check(self):
        while self.timeout_task:
            if LOG_LEVEL == "ALL":
                print("Retrying timed-out connections:", len(self.timeout_task))
            tasks = []
            while self.timeout_task:
                province, url, city, semaphore = self.timeout_task.pop(0)
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                tasks.append(task)
            self.loop.run_until_complete(asyncio.wait(tasks))

    def run(self):
        global AIRQUALITYTOTAL
        start = time.time()
        if not self.all_provinces:
            self.get_provinces()
        semaphore = None
        if self.__now_thread():
            self.loop = asyncio.get_event_loop()
            semaphore = asyncio.Semaphore(self.PoolSize)
        else:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            semaphore = asyncio.Semaphore(self.PoolSize)
        # build the queue of async tasks
        for province in self.all_provinces.keys():
            citys_info = self.all_provinces.get(province)
            for city_info in citys_info:
                url_marks, city = city_info
                url = ""
                for url_mark in url_marks.split():
                    url += url_mark
                c = self.fireCity(province, url, city, semaphore)
                task = asyncio.ensure_future(c)
                task.add_done_callback(self.parse_city_quality)
                self.tasks.append(task)
        self.TasKLength = len(self.tasks)
        AIRQUALITYTOTAL = self.TasKLength
        self.loop.run_until_complete(asyncio.wait(self.tasks))
        self.check()
        if LOG_LEVEL == "ALL" or LOG_LEVEL == "NONE":
            print("Elapsed:", time.time() - start, "seconds")
            print("Total tasks:", self.TasKLength)
            print("Completed:", self.TIMES, "remaining timed-out tasks:", len(self.timeout_task))
        self.loop.close()


if __name__ == '__main__':
    start = time.time()
    air_quality = AIR_Quality()
    air_quality.get_provinces()
    # print(air_quality.all_provinces)
    air_quality.run()
```
Saving and Parsing
Next up is this part.
There isn't much to it, really. The parsing already happened inside the crawler, and the result was packed into a pojo object.
All we have to do here is persist that object.

```python
import datetime
import os
from Spider.Setting.Settings import *
import xlwt


class QualitySaveModel(object):
    def __init__(self, province: str, total: int):
        # holds one province's air-quality rows
        # name of the workbook/sheet
        self.name = str(datetime.datetime.now().strftime("%Y-%m-%d-%X")).replace(":", ".") + province
        # the workbook
        self.save_boke = xlwt.Workbook(encoding='utf-8', style_compression=0)
        # the sheet
        self.sheet_boke = self.save_boke.add_sheet(self.name, cell_overwrite_ok=True)
        # expected number of rows
        self.total = total
        # number of rows written so far
        self.current_row = 0
        self.cols = ["Province", "City", "Time", "AQI", "PM10", "PM2.5", "CO", "NO2", "SO2", "O3"]
        for i in range(len(self.cols)):
            # write the header row
            self.sheet_boke.write(0, i, self.cols[i])

    def save(self):
        if self.current_row >= self.total:
            path_root = AIRQualitySavePath
            if not os.path.exists(path_root):
                os.makedirs(path_root)
            path = path_root + "/" + self.name + ".xls"
            self.save_boke.save(path)
            if LOG_LEVEL == "ALL" or LOG_LEVEL == "INFO":
                print(path)

    def __str__(self):
        return "An Excel save model"
```
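Since responses come back in arbitrary order across provinces, each `QualitySaveModel` only writes its `.xls` to disk once `current_row` reaches `total`. The same bookkeeping can be sketched without `xlwt` (the class below is illustrative, with a plain list standing in for the sheet):

```python
class ProvinceBuffer:
    """Toy stand-in for QualitySaveModel: flush only once every city has arrived."""

    def __init__(self, province, total):
        self.province = province
        self.total = total        # expected number of city rows
        self.rows = []            # stand-in for the xlwt sheet
        self.flushed = False      # stand-in for save_boke.save(path)

    def add_row(self, city, aqi):
        self.rows.append((city, aqi))
        if len(self.rows) >= self.total:   # mirrors current_row >= total
            self.flushed = True


buf = ProvinceBuffer("Guangdong", 2)
buf.add_row("Guangzhou", "80")
print(buf.flushed)   # False: still waiting for the second city
buf.add_row("Shenzhen", "65")
print(buf.flushed)   # True: the whole province can now be written out
```

One consequence of this design: if even one city in a province never succeeds, that province's file is never written, which is why the timeout retry loop matters.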
Then just call it:

```python
from Spider.pojo.AIRPojo import AIRPojo
from Spider.Utils.QualitySaveModel import QualitySaveModel
from Spider.Setting.Settings import *


class QualitySave(object):
    @staticmethod
    def SaveQuality(data, savemodels):
        global AIRQUALITYSAVECURRENT
        savemodel = savemodels.get(data.get_Province())
        savemodel.current_row += 1
        savemodel.sheet_boke.write(savemodel.current_row, 0, data.get_Province())
        savemodel.sheet_boke.write(savemodel.current_row, 1, data.get_City())
        savemodel.sheet_boke.write(savemodel.current_row, 2, data.createTime)
        savemodel.sheet_boke.write(savemodel.current_row, 3, data.get_AQI())
        savemodel.sheet_boke.write(savemodel.current_row, 4, data.get_PM10())
        savemodel.sheet_boke.write(savemodel.current_row, 5, data.get_PM25())
        savemodel.sheet_boke.write(savemodel.current_row, 6, data.get_CO())
        savemodel.sheet_boke.write(savemodel.current_row, 7, data.get_NO2())
        savemodel.sheet_boke.write(savemodel.current_row, 8, data.get_SO2())
        savemodel.sheet_boke.write(savemodel.current_row, 9, data.get_O3())
        if LOG_LEVEL == "ALL":
            print(data.get_City(), "written to the sheet")
        savemodel.save()  # saves once the province is complete
        AIRQUALITYSAVECURRENT += 1
```
You've already seen where this gets called in the crawler code.
Finally, just let the crawler run.
Honestly, at first I wanted to do proper extraction, encapsulation, and architecture like a real spider framework, but in the end it felt like too much hassle, so here we are. Writing Python as if it were Java is exhausting.