Python如何異步發送日誌到遠程服務器?

語言: CN / TW / HK

背景

在Python中使用日誌最常用的方式就是在控制枱和文件中輸出日誌了,logging模塊也很好的提供的相應 的類,使用起來也非常方便,但是有時我們可能會有一些需求,如還需要將日誌發送到遠端,或者直接寫入數 據庫,這種需求該如何實現呢?

StreamHandler和FileHandler

首先我們先來寫一套簡單輸出到cmd和文件中的代碼

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 設置日誌格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 將cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天氣不錯")

先初始化一個logger, 並且設置它的日誌級別是DEBUG,然後添初始化了 cmd_handler和 file_handler,最後將它們添加到logger中, 運行腳本,會在cmd中打印出

[2020-09-23 10:45:56] [DEBUG] 今天天氣不錯
且會寫入到當前目錄下的debug.log文件中

添加HTTPHandler

如果想要在記錄時將日誌發送到遠程服務器上,可以添加一個 HTTPHandler , 在python標準庫logging.handler中,已經為我們定義好了很多handler,有些我們可以直接用,本地使用tornado寫一個接收 日誌的接口,將接收到的參數全都打印出來

# 添加一個httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
結果在服務端我們收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
'funcName': [b '<module>'],
'created': [b '1600831054.8881223'],
'msecs': [b '888.1223201751709'],
'relativeCreated': [b '22.99976348876953'],
'thread': [b '14876'],
'threadName': [b 'MainThread'],
'processName': [b 'MainProcess'],
'process': [b '8648'],
'message': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'asctime': [b '2020-09-23 11:17:34']
}

可以説是信息非常之多,但是卻並不是我們想要的樣子,我們只是想要類似於

[2020-09-23 10:45:56][DEBUG] 今天天氣不錯
這樣的日誌
logging.handlers.HTTPHandler 只是簡單的將日誌所有信息發送給服務端,至於服務端要怎麼組織內 容是由服務端來完成. 所以我們可以有兩種方法,一種是改服務端代碼,根據傳過來的日誌信息重新組織一 下日誌內容, 第二種是我們重新寫一個類,讓它在發送的時候將重新格式化日誌內容發送到服務端。

我們採用第二種方法,因為這種方法比較靈活, 服務端只是用於記錄,發送什麼內容應該是由客户端來決定。

我們需要重新定義一個類,我們可以參考 logging.handlers.HTTPHandler 這個類,重新寫一個httpHandler類

每個日誌類都需要重寫emit方法,記錄日誌時真正要執行是也就是這個emit方法

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    '''
   重寫emit方法,這裏主要是為了把初始化時的baseParam添加進來
   :param record:
   :return:
   '''
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={'log': msg}, headers=headers,
timeout=1)

上面代碼中有一行定義發送的參數 msg = self.format(record)
這行代碼表示,將會根據日誌對象設置的格式返回對應的內容。

之後再將內容通過requests庫進行發送,無論使用get 還是post方式,服務端都可以正常的接收到日誌

{'log': [b'[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}

將bytes類型轉一下就得到了

[2020-09-23 11:43:50] [DEBUG] 今天天氣不錯

異步的發送遠程日誌

現在我們考慮一個問題,當日志發送到遠程服務器過程中,如果遠程服務器處理的很慢,會耗費一定的時間, 那麼這時記錄日誌就會都變慢修改服務器日誌處理類,讓其停頓5秒鐘,模擬長時間的處理流程

async def post(self):
  print(self.getParam('log'))
  await asyncio.sleep(5)
  self.write({"msg": 'ok'})

此時我們再打印上面的日誌

logger.debug("今天天氣不錯")
logger.debug("是風和日麗的")

得到的輸出為

[2020-09-23 11:47:33] [DEBUG] 今天天氣不錯
[2020-09-23 11:47:38] [DEBUG] 是風和日麗的

我們注意到,它們的時間間隔也是5秒。
那麼現在問題來了,原本只是一個記錄日誌,現在卻成了拖累整個腳本的累贅,所以我們需要異步的來 處理遠程寫日誌。

1使用多線程處理

首先想的是應該是用多線程來執行發送日誌方法

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{'log': msg},

這種方法是可以達到不阻塞主目的,但是每打印一條日誌就需要開啟一個線程,也是挺浪費資源的。我們也 可以使用線程池來處理

2使用線程池處理

python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor類,是線程池和進程池, 就是在初始化的時候先定義幾個線程,之後讓這些線程來處理相應的函數,這樣不用每次都需要新創建線程

線程池的基本使用

exector = ThreadPoolExecutor(max_workers=1) # 初始化一個線程池,只有一個線程
exector.submit(fn, args, kwargs) # 將函數submit到線程池中

如果線程池中有n個線程,當提交的task數量大於n時,則多餘的task將放到隊列中。
再次修改上面的emit函數

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={'log': msg},
headers=headers, timeout=6)

這裏為什麼要只初始化一個只有一個線程的線程池? 因為這樣的話可以保證先進隊列裏的日誌會先被髮 送,如果池子中有多個線程,則不一定保證順序了。

3使用異步aiohttp庫來發送請求

上面的CustomHandler類中的emit方法使用的是requests.post來發送日誌,這個requests本身是阻塞運 行的,也正上由於它的存在,才使得腳本卡了很長時間,所們我們可以將阻塞運行的requests庫替換為異步 的aiohttp來執行get和post方法, 重寫一個CustomHandler中的emit方法

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={'log': msg}) as resp:
          print(await resp.text())

這時代碼執行崩潰了

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
'CustomHandler.emit' was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

服務端也沒有收到發送日誌的請求。
究其原因是由於emit方法中使用 async with session.post 函數,它需要在一個使用async 修飾的函數 裏執行,所以修改emit函數,使用async來修飾,這裏emit函數變成了異步的函數, 返回的是一個 coroutine 對象,要想執行coroutine對象,需要使用await, 但是腳本里卻沒有在哪裏調用 await emit() ,所以崩潰信息 中顯示 coroutine 'CustomHandler.emit' was never awaited。

既然emit方法返回的是一個coroutine對象,那麼我們將它放一個loop中執行

async def main():
  await logger.debug("今天天氣不錯")
  await logger.debug("是風和日麗的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

執行依然報錯

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

意思是需要的是一個coroutine,但是傳進來的對象不是。
這似乎就沒有辦法了,想要使用異步庫來發送,但是卻沒有可以調用await的地方。

解決辦法是有的,我們使用 asyncio.get_event_loop() 獲取一個事件循環對象, 我們可以在這個對象上註冊很多協程對象,這樣當執行事件循環的時候,就是去執行註冊在該事件循環上的協程, 我們通過一個小例子來看一下

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("執行結束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

我們使用 loop = asyncio.get_event_loop() 創建了一個事件循環對象loop, 並且在loop上創建了兩個task, 並且給task1添加了一個回調函數,在task1它執行結束以後,將loop停掉。
注意看上面的代碼,我們並沒有在某處使用await來執行協程,而是通過將協程註冊到某個事件循環對象上, 然後調用該循環的 run_forever() 函數,從而使該循環上的協程對象得以正常的執行。

上面得到的輸出為

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
執行結束, task n is 0

可以看到,使用事件循環對象創建的task,在該循環執行run_forever() 以後就可以執行了如果不執行 loop.run_forever() 函數,則註冊在它上面的協程也不會執行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

上面的代碼將loop.run_forever() 註釋掉,換成time.sleep(5) 停5秒, 這時腳本不會有任何輸出,在停了5秒 以後就中止了,
回到之前的日誌發送遠程服務器的代碼,我們可以使用aiohttp封裝一個發送數據的函數, 然後在emit中將 這個函數註冊到全局的事件循環對象loop中,最後再執行loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封裝發送數據函數
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一個httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天氣不錯")
logger.debug("是風和日麗的")
loop.run_forever()

這時腳本就可以正常的異步執行了

loop.create_task(self.submit(msg)) 也可以使用
asyncio.ensure_future(self.submit(msg), loop=loop) 來代替,目的都是將協程對象註冊到事件循環中。

但這種方式有一點要注意,loop.run_forever() 將會一直阻塞,所以需要有個地方調用 loop.stop() 方法. 可以註冊到某個task的回調中。

以上就是本次分享的所有內容,想要了解更多 python 知識歡迎前往公眾號:Python 編程學習圈 ,發送 “J” 即可免費獲取,每日干貨分享