超硬核!11個非常實用的 Python 和 Shell 拿來就用指令碼例項!

語言: CN / TW / HK

Python 指令碼部分例項:企業微信告警、FTP 客戶端、SSH 客戶端、Saltstack 客戶端、vCenter 客戶端、獲取域名 ssl 證書過期時間、傳送今天的天氣預報以及未來的天氣趨勢圖;

Shell 指令碼部分例項:SVN 完整備份、Zabbix 監控使用者密碼過期、構建本地 YUM 以及上篇文章中有讀者的需求(負載高時,查出佔用比較高的程序指令碼並存儲或推送通知);

篇幅有些長,還請大家耐心翻到文末,畢竟有彩蛋。

Python 指令碼部分

企業微信告警

此指令碼通過企業微信應用,進行微信告警,可用於 Zabbix 監控。

# -*- coding: utf-8 -*-
import requests
import json
class DLF:
    """Minimal WeCom (Enterprise WeChat) application client for alerting.

    The access token is requested once when the object is created.  The
    public ``send_*`` methods never raise: they return the decoded API
    response (a dict carrying ``errcode``/``errmsg``) when the request
    completed, or the error text as a string when it did not.
    """

    def __init__(self, corpid, corpsecret):
        """Store the credentials and fetch an access token.

        :param corpid: WeCom corporation id
        :param corpsecret: secret of the WeCom application
        """
        # HTTPS, because the secret and the token travel in the query string.
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self.timeout = 10  # seconds, applied to every HTTP request
        # Text of the token failure, or None when the token was obtained.
        self.token_error = None
        try:
            self._token = self._get_token()
        except Exception as e:
            # Keep the constructor best-effort, but never store an error
            # message as if it were a token.
            self._token = None
            self.token_error = str(e)

    def _get_token(self):
        """Request an access_token from the WeCom API.

        :return: the access token string
        :raises Exception: on network errors or when the API reports a failure
        """
        res = requests.get(
            self.url + "/gettoken",
            params={"corpid": self.corpid, "corpsecret": self.corpsecret},
            timeout=self.timeout,
        ).json()
        if 'access_token' not in res:
            raise RuntimeError("gettoken failed: %s" % res.get('errmsg', res))
        return res['access_token']

    def _get_media_id(self, file_obj):
        """Upload ``file_obj`` as temporary media and return its media_id.

        :param file_obj: binary file object to upload
        :return: media_id string
        :raises Exception: when no token is available or the upload fails
        """
        if self._token is None:
            raise RuntimeError("no access_token: %s" % self.token_error)
        get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
        res = requests.post(url=get_media_url, files={"media": file_obj}, timeout=self.timeout).json()
        if 'media_id' not in res:
            raise RuntimeError("media upload failed: %s" % res.get('errmsg', res))
        return res['media_id']

    def _send(self, send_data):
        """POST one message payload to ``/message/send``.

        :param send_data: dict describing the message
        :return: decoded API response, or the error text as a string
        """
        if self._token is None:
            return "no access_token: %s" % self.token_error
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        try:
            res = requests.post(send_msg_url, data=json.dumps(send_data), timeout=self.timeout)
            return res.json()
        except Exception as e:
            return str(e)

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a text message.

        :param agentid: id of the WeCom application
        :param content: message text
        :param touser: recipient user id(s)
        :param toparty: recipient department id(s)
        :return: decoded API response, or the error text as a string
        """
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        })

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload ``file_obj`` and send it as an image message.

        :param agentid: id of the WeCom application
        :param file_obj: binary file object holding the image
        :param touser: recipient user id(s)
        :param toparty: recipient department id(s)
        :return: decoded API response, or the error text as a string
        """
        try:
            media_id = self._get_media_id(file_obj)
        except Exception as e:
            # Do not send a message whose media_id is really an error text.
            return str(e)
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        })

FTP 客戶端

通過 ftplib 模組操作 ftp 伺服器,進行上傳下載等操作。

# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy
class FTPClient:
    """Convenience wrapper around :class:`ftplib.FTP`.

    Directory/file management methods return a dict
    ``{'status': bool, 'msg': str}``; validation failures return a plain
    message string, as callers of the original API expect.
    """

    def __init__(self, host, user, passwd, port=21):
        """Remember the connection settings and log in immediately.

        :param host: FTP server host name or address
        :param user: login name
        :param passwd: login password
        :param port: FTP control port
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        # Template for result dicts; always copied, never mutated.
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        # Exception raised while connecting/logging in, or None on success.
        self.login_error = None
        self._login()

    def _login(self):
        """Connect and authenticate.

        :return: the exception when connecting or logging in failed, else None
        """
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            # __init__ ignores the return value, so keep the error reachable.
            self.login_error = e
            return e

    def _run(self, method_name, *args):
        """Call ``self._ftp.<method_name>(*args)`` and wrap the outcome.

        :return: copy of ``self.res`` with ``msg`` set to the server reply,
                 or ``status`` False and ``msg`` set to the error text
        """
        res = copy.deepcopy(self.res)
        try:
            res['msg'] = getattr(self._ftp, method_name)(*args)
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res

    def upload(self, localpath, remotepath=None):
        """Upload a local file.

        :param localpath: local file path
        :param remotepath: remote file path; defaults to the local file's
                           base name in the current remote directory
        :return: a message string when ``localpath`` is empty, else None
        """
        if not localpath:
            return 'Please select a local file. '
        if not remotepath:
            remotepath = path.basename(localpath)
        # storbinary() needs an open binary file object, not a path string.
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        """Download a remote file.

        :param remotepath: remote file path
        :param localpath: local file path or directory; defaults to the
                          remote file's base name in the current directory
        :return: a message string when ``remotepath`` is empty, else None
        """
        if not remotepath:
            return 'Please select a remote file. '
        if not localpath:
            localpath = path.basename(remotepath)
        # A directory target keeps the remote file's base name.
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
        # The context manager closes the file even if the transfer fails.
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        """List a directory.

        :param dir: remote directory
        :return: list of the names found in ``dir``
        """
        return self._ftp.nlst(dir)

    def rmd(self, dir=None):
        """Remove a directory.

        :param dir: directory name
        :return: result dict, or a message string when ``dir`` is empty
        """
        if not dir:
            return 'Please input dirname'
        return self._run('rmd', dir)

    def mkd(self, dir=None):
        """Create a directory.

        :param dir: directory name
        :return: result dict, or a message string when ``dir`` is empty
        """
        if not dir:
            return 'Please input dirname'
        return self._run('mkd', dir)

    def del_file(self, filename=None):
        """Delete a file.

        :param filename: file name
        :return: result dict, or a message string when ``filename`` is empty
        """
        if not filename:
            return 'Please input filename'
        return self._run('delete', filename)

    def get_file_size(self, filenames=None):
        """Report size and kind for each name.

        A name whose SIZE request fails (a directory, or a missing file) is
        reported as type ``'d'`` with size ``'-'`` and a trailing ``/``.

        :param filenames: iterable of remote names
        :return: list of ``{'filename', 'size', 'type'}`` dicts, or a
                 ``{'msg': ...}`` dict when ``filenames`` is empty
        """
        if not filenames:
            return {'msg': 'This is an empty directory'}
        res_l = []
        for name in filenames:
            try:
                size = self._ftp.size(name)
                kind = 'f'
            except Exception:
                size = '-'
                kind = 'd'
                name = name + '/'
            res_l.append({'filename': name, 'size': size, 'type': kind})
        return res_l

    def rename(self, old_name=None, new_name=None):
        """Rename a file or directory.

        :param old_name: current name
        :param new_name: new name
        :return: result dict, or a message string when a name is missing
        """
        if not old_name or not new_name:
            return 'Please input old_name and new_name'
        return self._run('rename', old_name, new_name)

    def close(self):
        """Close the FTP session.

        :return: a message string when the server did not answer QUIT
        """
        try:
            # Ask the server to end the session politely.
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            # Always drop the client side of the connection.
            self._ftp.close()

SSH 客戶端

此指令碼僅用於通過 key 連線,如需要密碼連線,簡單修改下即可。

# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
    """Key-based SSH client built on paramiko."""

    def __init__(self, host, port, user, pkey):
        """Load the private key and connect.

        :param host: remote host name or address
        :param port: SSH port
        :param user: login name
        :param pkey: path to an RSA private key file
        """
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        # Exception raised by the last connection attempt, or None.
        self.connect_error = None
        self._connect()

    def _connect(self):
        """Open the SSH connection.

        :return: ``'ssh connect fail'`` when connecting failed, else None
        """
        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy trusts unknown host keys without
        # verification; load known hosts instead where that matters.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
        except Exception as e:
            # `except Exception` lets KeyboardInterrupt/SystemExit through;
            # the cause is kept because __init__ ignores the return value.
            self.connect_error = e
            return 'ssh connect fail'

    def execute_command(self, command):
        """Run ``command`` on the remote host.

        :param command: command line to execute
        :return: tuple ``(stdout_bytes, stderr_bytes)``
        """
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err

    def close(self):
        """Close the SSH connection."""
        self.ssh.close()

Saltstack 客戶端

通過 api 對 Saltstack 服務端進行操作,執行命令。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests
import json
import copy
class SaltApi:
    """Client for the salt-api REST interface.

    A token is obtained at construction time and sent with every later
    request through the ``X-Auth-Token`` header.
    """

    def __init__(self, url="http://172.85.10.21:8000/", username="saltapi", password="saltapi"):
        """Log in to salt-api.

        The defaults reproduce the values that used to be hard-coded.

        :param url: base URL of salt-api, with a trailing slash
        :param username: eauth user name
        :param password: eauth password
        """
        self.url = url
        self.username = username
        self.password = password
        self.headers = {"Content-type": "application/json"}
        # Template for requests; always copied, never mutated.
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params):
        """POST ``params`` as JSON and return the first ``return`` entry.

        :param url: request URL
        :param params: dict sent as the JSON body
        :return: first element of the response's ``return`` list
        """
        send_data = json.dumps(params)
        request = requests.post(url, data=send_data, headers=self.headers)
        response = request.json()
        result = dict(response)
        return result['return'][0]

    def get_auth_keys(self):
        """List the accepted minion keys.

        :return: the ``minions`` list, or the error text as a string when
                 the response does not have the expected shape
        """
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """Query grains.

        :param tgt: target minion(s); a falsy value means all (``'*'``)
        :param arg: grain name passed to ``grains.item``
        :return: result returned by salt-api
        """
        data = copy.deepcopy(self.params)
        data['tgt'] = tgt if tgt else '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        return self.get_data(self.url, data)

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """Run a Salt module function, like ``salt '*' cmd.run 'command'``.

        :param tgt: target minion(s)
        :param fun: module function to run
        :param arg: argument passed to the function; omitted when empty
        :param tgt_type: targeting type, sent unless ``tgt`` is ``'*'``
        :param salt_async: use the ``local_async`` client when true
        :return: result returned by salt-api, or an error dict when
                 ``tgt`` is empty
        """
        if not tgt:
            return {'status': False, 'msg': 'target host not exist'}
        data = copy.deepcopy(self.params)
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async:
            data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        return self.get_data(self.url, data)

    def jobs(self, fun='detail', jid=None):
        """Query the job system through the runner client.

        :param fun: ``'detail'`` to look up one job, ``'active'`` to list
                    the running jobs
        :param jid: job id, required for ``'detail'``
        :return: result returned by salt-api, or an error dict for a
                 missing ``jid`` or an unknown ``fun``
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid:
                return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # Previously rejected although documented as supported.
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        return self.get_data(self.url, data)

vCenter 客戶端

通過官方 SDK 對 vCenter 進行日常操作,此指令碼是我用於 cmdb 平臺的,自動獲取主機資訊,存入資料庫。

from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit
class Vmware:
    """Collect ESXi host and VM inventory from a vCenter for the CMDB."""

    def __init__(self, ip, user, password, port, idc, vcenter_id):
        """Store the connection settings and the CMDB foreign keys.

        :param ip: vCenter address
        :param user: vCenter user name
        :param password: vCenter password
        :param port: vCenter port
        :param idc: id stored as ``idc_id`` on every host record
        :param vcenter_id: id stored as ``vcenter_id`` on every host record
        """
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        """Return the managed objects of the given types as a list.

        :param content: service content returned by ``RetrieveContent()``
        :param vimtype: list of managed object types to collect
        :param name: when given, keep only the objects with this name
        :return: list of matching objects (all of them when ``name`` is None)
        """
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        objects = list(container.view)
        if name is not None:
            # The parameter used to be accepted but silently ignored.
            objects = [obj for obj in objects if obj.name == name]
        return objects

    def get_esxi_info(self):
        """Collect hardware, VM and remaining-capacity data per ESXi host.

        :return: on success a dict keyed by host name plus a
                 ``'connect_status': True`` entry; on connection failure
                 ``{'connect_status': False, 'msg': <text>}``
        """
        esxi_host = {}
        res = {"connect_status": True, "msg": None}
        try:
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            fault_msg = getattr(e, 'msg', None)
            if isinstance(fault_msg, str):
                # Passed as a format argument so a '%' inside the fault text
                # cannot break the formatting.
                res['msg'] = "%s Caught vmodl fault : %s" % (self.ip, fault_msg)
            else:
                res['msg'] = '%s: connection error' % (self.ip)
            return res
        # Make sure the session is released when the interpreter exits.
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        # Reference VM size (4 cores / 8 GB RAM / 100 GB disk) used to
        # estimate how many more VMs a host can take.
        default_configure = {
            'cpu': 4,
            'memory': 8,
            'disk': 100
        }
        for esxi in self.get_obj(content, [vim.HostSystem]):
            info = {}
            esxi_host[esxi.name] = info
            info['idc_id'] = self.idc_id
            info['vcenter_id'] = self.vcenter_id
            info['server_ip'] = esxi.name
            info['manufacturer'] = esxi.summary.hardware.vendor
            info['server_model'] = esxi.summary.hardware.model
            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    info['server_sn'] = i.identifierValue
            # Operating system name
            info['system_name'] = esxi.summary.config.product.fullName
            # Total CPU threads
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            # Total memory in GB
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
            # Total datastore capacity in GB
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024
            info['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0
            # Virtual machines on this host
            for vm in esxi.vm:
                host_info = {}
                host_info['vm_name'] = vm.name
                host_info['power_status'] = vm.runtime.powerState
                host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + '核'
                host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
                host_info['system_info'] = vm.config.guestFullName
                disk_info = ''
                disk_total = 0
                for d in vm.config.hardware.device:
                    if isinstance(d, vim.vm.device.VirtualDisk):
                        disk_gb = d.capacityInKB / 1024 / 1024
                        disk_total += disk_gb
                        disk_info += d.deviceInfo.label + ": " + str(disk_gb) + ' GB' + ','
                host_info['disk_info'] = disk_info
                info['vm_host'].append(host_info)
                # Only powered-on VMs count against the host's capacity.
                if host_info['power_status'] == 'poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)
            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk
            info['cpu_info'] = 'Total: %d核, Free: %d核' % (esxi_cpu_total, esxi_cpu_free)
            info['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
            info['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)
            # The scarcest resource decides how many reference-size VMs
            # still fit; below one full VM the answer is 0.
            if (esxi_cpu_free < default_configure['cpu']
                    or esxi_memory_free < default_configure['memory']
                    or esxi_disk_free < default_configure['disk']):
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min(
                    [
                        esxi_cpu_free / default_configure['cpu'],
                        esxi_memory_free / default_configure['memory'],
                        esxi_disk_free / default_configure['disk']
                    ]
                ))
            info['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host

    def write_to_db(self):
        """Store the collected hosts and their VMs through ``models``.

        :return: the failure dict from ``get_esxi_info`` when the
                 connection failed, else None
        """
        esxi_host = self.get_esxi_info()
        # Connection failed
        if not esxi_host['connect_status']:
            return esxi_host
        del esxi_host['connect_status']
        for machine_ip in esxi_host:
            # Physical host fields
            esxi_host_dict = esxi_host[machine_ip]
            # The VM list is stored in its own table, not on the host row.
            virtual_host = esxi_host_dict.pop('vm_host')
            obj = models.EsxiHost.objects.create(**esxi_host_dict)
            obj.save()
            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                obj2 = models.virtualHost.objects.create(**host_info)
                obj2.save()

獲取域名 ssl 證書過期時間

用於 zabbix 告警

import re
import sys
import time
import subprocess
from datetime import datetime
from io import StringIO
def main(domain):
    """Return the number of days until the TLS certificate of *domain* expires.

    The certificate details are read from the verbose output of ``curl``.

    :param domain: host name to check (without scheme)
    :return: remaining days as an int, or ``999999999`` when the expiry
             date could not be determined
    """
    unknown = 999999999
    if not domain:
        return unknown
    # HTTPS is required: a plain-HTTP request involves no certificate.
    # The argument list avoids running the domain through a shell.
    comm = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
    try:
        proc = subprocess.run(
            comm,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # curl -v prints TLS details on stderr
            universal_newlines=True,
            errors="replace",
        )
    except OSError:
        # curl is not installed or cannot be executed
        return unknown
    # Only the expiry date is needed, so nothing else is required to match.
    m = re.search(r'expire date: (.*?)\n', proc.stdout)
    if m is None:
        return unknown
    try:
        expire_date = datetime.strptime(m.group(1).strip(), "%b %d %H:%M:%S %Y GMT")
    except ValueError:
        return unknown
    # Remaining days
    remaining = (expire_date - datetime.now()).days
    return remaining
if __name__ == "__main__":
    # Exactly one argument, the domain to check, is expected.
    if len(sys.argv) != 2:
        sys.exit("usage: %s <domain>" % sys.argv[0])
    remaining_days = main(sys.argv[1])
    print(remaining_days)

傳送今天的天氣預報以及未來的天氣趨勢圖

此指令碼用於給老婆大人傳送今天的天氣預報以及未來的天氣趨勢圖,現在微信把網頁端禁止了,沒法傳送到微信了,我是通過企業微信進行通知的,需要把你老婆大人拉到企業微信,無興趣的小夥伴跳過即可。

# -*- coding: utf-8 -*-
   import requests
   import json
   import datetime
   def weather(city):
       """Fetch today's forecast for *city* and build the notification text.

       :param city: city name understood by the weather service
       :return: ``{"source_data": <raw data>, "res": <message text>}`` on
                success, or the error text as a string on any failure
       """
       url = "http://wthrcdn.etouch.cn/weather_mini"
       try:
           # The timeout keeps a hung service from blocking the job forever.
           data = requests.get(url, params={"city": city}, timeout=10).json()['data']
           city_name = data['city']
           ganmao = data['ganmao']
           today_weather = data['forecast'][0]
           # Built from adjacent literals so that source indentation does
           # not end up as trailing whitespace in the message.
           res = (
               "老婆今天是{}\n今天天氣概況\n城市: {:<10}\n時間: {:<10}\n高溫: {:<10}\n低溫: {:<10}\n"
               "風力: {:<10}\n風向: {:<10}\n天氣: {:<10}\n\n稍後會發送近期溫度趨勢圖,請注意檢視。"
           ).format(
               ganmao,
               city_name,
               datetime.datetime.now().strftime('%Y-%m-%d'),
               today_weather['high'].split()[1],
               today_weather['low'].split()[1],
               # Takes the text between the inner square brackets.
               today_weather['fengli'].split('[')[2].split(']')[0],
               today_weather['fengxiang'],
               today_weather['type'],
           )
           return {"source_data": data, "res": res}
       except Exception as e:
           return str(e)
   ```
   + 獲取天氣預報趨勢圖
   ```python
   # -*- coding: utf-8 -*-
   import matplotlib.pyplot as plt
   import re
   import datetime
   def Future_weather_states(forecast, save_path, day_num=5):
       """Draw the high/low temperature trend of the coming days.

       :param forecast: list of forecast entries with ``date``, ``high``
                        and ``low`` strings, in chronological order
       :param save_path: file the chart is written to
       :param day_num: maximum number of days to plot
       :return: None; the chart is saved to ``save_path``
       """
       day_labels = []
       high_temperature = []
       low_temperature = []
       # Slicing copes with forecasts shorter than day_num.
       for entry in forecast[:day_num]:
           day_labels.append(int(re.findall(r"\d+", entry["date"])[0]))
           # "-?" keeps the sign of sub-zero temperatures.
           high_temperature.append(int(re.findall(r"-?\d+", entry["high"])[0]))
           low_temperature.append(int(re.findall(r"-?\d+", entry["low"])[0]))
       # Plot against the position in the forecast and use the day of month
       # only as tick label, so a month rollover (30, 31, 1, 2) keeps its
       # chronological order.
       positions = list(range(len(day_labels)))
       current_date = datetime.datetime.now().strftime('%Y-%m')
       plt.rcParams['font.sans-serif'] = ['SimHei']
       plt.rcParams['axes.unicode_minus'] = False
       plt.figure()
       # One plot call after all data is collected, not one per iteration.
       plt.plot(positions, high_temperature, "r", positions, low_temperature, "b")
       plt.xlabel(current_date)
       plt.ylabel("℃")
       plt.legend(["高溫", "低溫"])
       plt.xticks(positions, day_labels)
       plt.title("最近幾天溫度變化趨勢")
       plt.savefig(save_path)
       # Release the figure so repeated calls do not draw on top of it.
       plt.close()
   ```
   + 傳送到企業微信
   ```python
   # -*- coding: utf-8 -*-
   import requests
   import json
   class DLF:
       """Minimal WeCom (Enterprise WeChat) application client for alerting.

       The access token is requested once when the object is created.  The
       public ``send_*`` methods never raise: they return the decoded API
       response (a dict carrying ``errcode``/``errmsg``) when the request
       completed, or the error text as a string when it did not.
       """

       def __init__(self, corpid, corpsecret):
           """Store the credentials and fetch an access token.

           :param corpid: WeCom corporation id
           :param corpsecret: secret of the WeCom application
           """
           # HTTPS, because the secret and the token travel in the query string.
           self.url = "https://qyapi.weixin.qq.com/cgi-bin"
           self.corpid = corpid
           self.corpsecret = corpsecret
           self.timeout = 10  # seconds, applied to every HTTP request
           # Text of the token failure, or None when the token was obtained.
           self.token_error = None
           try:
               self._token = self._get_token()
           except Exception as e:
               # Keep the constructor best-effort, but never store an error
               # message as if it were a token.
               self._token = None
               self.token_error = str(e)

       def _get_token(self):
           """Request an access_token from the WeCom API.

           :return: the access token string
           :raises Exception: on network errors or when the API reports a failure
           """
           res = requests.get(
               self.url + "/gettoken",
               params={"corpid": self.corpid, "corpsecret": self.corpsecret},
               timeout=self.timeout,
           ).json()
           if 'access_token' not in res:
               raise RuntimeError("gettoken failed: %s" % res.get('errmsg', res))
           return res['access_token']

       def _get_media_id(self, file_obj):
           """Upload ``file_obj`` as temporary media and return its media_id.

           :param file_obj: binary file object to upload
           :return: media_id string
           :raises Exception: when no token is available or the upload fails
           """
           if self._token is None:
               raise RuntimeError("no access_token: %s" % self.token_error)
           get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
           res = requests.post(url=get_media_url, files={"media": file_obj}, timeout=self.timeout).json()
           if 'media_id' not in res:
               raise RuntimeError("media upload failed: %s" % res.get('errmsg', res))
           return res['media_id']

       def _send(self, send_data):
           """POST one message payload to ``/message/send``.

           :param send_data: dict describing the message
           :return: decoded API response, or the error text as a string
           """
           if self._token is None:
               return "no access_token: %s" % self.token_error
           send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
           try:
               res = requests.post(send_msg_url, data=json.dumps(send_data), timeout=self.timeout)
               return res.json()
           except Exception as e:
               return str(e)

       def send_text(self, agentid, content, touser=None, toparty=None):
           """Send a text message.

           :param agentid: id of the WeCom application
           :param content: message text
           :param touser: recipient user id(s)
           :param toparty: recipient department id(s)
           :return: decoded API response, or the error text as a string
           """
           return self._send({
               "touser": touser,
               "toparty": toparty,
               "msgtype": "text",
               "agentid": agentid,
               "text": {
                   "content": content
               }
           })

       def send_image(self, agentid, file_obj, touser=None, toparty=None):
           """Upload ``file_obj`` and send it as an image message.

           :param agentid: id of the WeCom application
           :param file_obj: binary file object holding the image
           :param touser: recipient user id(s)
           :param toparty: recipient department id(s)
           :return: decoded API response, or the error text as a string
           """
           try:
               media_id = self._get_media_id(file_obj)
           except Exception as e:
               # Do not send a message whose media_id is really an error text.
               return str(e)
           return self._send({
               "touser": touser,
               "toparty": toparty,
               "msgtype": "image",
               "agentid": agentid,
               "image": {
                   "media_id": media_id
               }
           })
+ main指令碼
# -*- coding: utf-8 -*-
from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os
# WeCom application credentials
corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"
# Where the trend chart is written
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg')
# The chart cannot be saved into a directory that does not exist yet.
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# Fetch the forecast; weather() returns an error string on failure.
content = weather("大興")
if not isinstance(content, dict):
    raise SystemExit("weather lookup failed: %s" % content)
# Send the text message
dlf = DLF(corpid, corpsecret)
dlf.send_text(agentid=agentid, content=content['res'], toparty='1')
# Render the trend chart
Future_weather_states(content['source_data']['forecast'], save_path)
# Send the chart; the context manager closes the file afterwards.
with open(save_path, 'rb') as file_obj:
    dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)

Shell 指令碼部分

SVN 完整備份

通過 hotcopy 進行 SVN 完整備份,備份保留 7 天。

#!/bin/bash
# Filename   :  svn_backup_repos.sh
# Date       :  2020/12/14
# Author     :  JakeTian
# Email      :  [email protected]***.com
# Crontab    :  59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
# Notes      :  Add the script to crontab so that it runs once a day
# Description:  Full SVN backup with "svnadmin hotcopy", keeping 7 days
set -e
SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')
cd "$SRC_PATH"
# Create the backup directory, the log directory and today's directory
mkdir -p "$DST_PATH/logs" "$DST_PATH/$TODAY"
# Back up every repository directory except 'httpd' and 'bak'.
# -printf '%f\n' prints the bare directory name, so names that contain
# dots are kept intact.
find . -mindepth 1 -maxdepth 1 -type d ! -name 'httpd' ! -name 'bak' -printf '%f\n' |
while IFS= read -r repo
do
    # Both commands run inside the 'if' condition, so a failing hotcopy is
    # logged as a failure instead of aborting the script through 'set -e'.
    if $SVN_BACKUP_C "$SRC_PATH/$repo" "$DST_PATH/$TODAY/$repo" \
        && $SVN_LOOK_C "$DST_PATH/$TODAY/$repo"; then
        echo "$TODAY: $repo Backup Success" >> "$LOG_FILE"
    else
        echo "$TODAY: $repo Backup Fail" >> "$LOG_FILE"
    fi
done
# Back up the user password file and the authorization file
cp -p "$SRC_PATH/authz" "$SRC_PATH/access.conf" "$DST_PATH/$TODAY"
# Rotate the log file (it does not exist when no repository was found)
if [ -f "$LOG_FILE" ]; then
    mv "$LOG_FILE" "$LOG_FILE-$TODAY"
fi
# Remove every dated backup that is 7 days old or older, so a missed run
# does not leave stale backups behind. ISO dates sort lexicographically.
seven_days_ago=$(date -d "7 days ago" +'%F')
for backup_dir in "$DST_PATH"/[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]
do
    [ -d "$backup_dir" ] || continue
    if [[ ! "${backup_dir##*/}" > "$seven_days_ago" ]]; then
        rm -rf "$backup_dir"
    fi
done

zabbix 監控使用者密碼過期

用於 Zabbix 監控 Linux 系統使用者(shell 為 /bin/bash 或 /bin/sh)的密碼過期情況:密碼有效期剩餘 7 天以內時觸發告警,並搭配自動發現指令碼取得使用者清單。

#!/bin/bash
# Zabbix low-level discovery: print the accounts from /etc/passwd whose
# login shell ends in /bin/bash or /bin/sh as {"data":[{"{#USER_NAME}":...}]}.
# Both alternatives are matched against the shell field ($NF) only.
users=($(awk -F':' '$NF ~ /\/bin\/(ba)?sh$/ {print $1}' /etc/passwd))
length=${#users[@]}
printf "{\n"
printf '\t"data":['
for ((i = 0; i < length; i++))
do
    # The user name is passed as an argument, never as part of the format.
    printf '\n\t\t{"{#USER_NAME}":"%s"}' "${users[$i]}"
    # Every entry except the last is followed by a comma
    if [ "$i" -lt $((length - 1)) ]; then
        printf ','
    fi
done
printf "\n\t]\n"
printf "}\n"
檢查使用者密碼過期
#!/bin/bash
# Print 1 when the password of user $1 expires within the next 7 days
# (or has already expired), otherwise 0.
export LANG=en_US.UTF-8
user="$1"
# The alert must fire while up to 7 days remain, so the expiry date is
# compared against "now + 7 days".
SEVEN_DAYS_LATER=$(date -d '+7 day' +'%s')
# "Password expires" is reported like "Sep 09, 2018" or "never"
expires_date=$(sudo chage -l "$user" | awk -F':' '/Password expires/{print $NF}' | sed -n 's/^ //p')
if [[ -z "$expires_date" || "$expires_date" == "never" ]]; then
    # Unknown user, no output from chage, or a password that never expires
    echo "0"
elif ! expires_epoch=$(date -d "$expires_date" +'%s' 2>/dev/null); then
    # Not a date (for example "password must be changed")
    echo "0"
elif [ "$expires_epoch" -le "$SEVEN_DAYS_LATER" ]; then
    echo "1"
else
    echo "0"
fi

構建本地YUM

通過 rsync 的方式同步 yum 倉庫,再通過 nginx 製作 http yum 站點;

但是 centos6 的映象最近都不能用了,國內貌似都禁用了,如果找到合適的自行更換地址。

#!/bin/bash
# Sync the local YUM mirrors from an upstream rsync mirror.
# The CentOS 6 repositories are kept as comments because the upstream
# mirrors no longer carry them.
RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
Centos6Epel="$DIR/Centos6/x86_64/Epel"
Centos7Epel="$DIR/Centos7/x86_64/Epel"
Centos6Salt="$DIR/Centos6/x86_64/Salt"
Centos7Salt="$DIR/Centos7/x86_64/Salt"
Centos6Update="$DIR/Centos6/x86_64/Update"
Centos7Update="$DIR/Centos7/x86_64/Update"
Centos6Docker="$DIR/Centos6/x86_64/Docker"
Centos7Docker="$DIR/Centos7/x86_64/Docker"
Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"
# Create every given directory that does not exist yet
check_dir(){
    for dir in "$@"
    do
        test -d "$dir" || mkdir -p "$dir"
    done
}
# Append the outcome of the command that ran immediately before this
# function was called to the log file given as $1
check_rsync_status(){
    if [ $? -eq 0 ];then
        echo "rsync success" >> "$1"
    else
        echo "rsync fail" >> "$1"
    fi
}
# sync_repo <remote path> <local dir> <log file>
# Runs rsync for one repository and records the result in its log.
sync_repo(){
    $RsyncCommand "$MirrorDomain$1" "$2" >> "$3" 2>&1
    # Must stay directly after rsync: check_rsync_status reads its $?
    check_rsync_status "$3"
}
check_dir "$DIR" "$LogDir" "$Centos6Base" "$Centos7Base" "$Centos6Epel" "$Centos7Epel" "$Centos6Salt" "$Centos7Salt" "$Centos6Update" "$Centos7Update" "$Centos6Docker" "$Centos7Docker" "$Centos6Mysql5_7" "$Centos7Mysql5_7" "$Centos6Mysql8_0" "$Centos7Mysql8_0"
# Base yumrepo
# sync_repo /repo/centos/6/os/x86_64/ "$Centos6Base" "$LogDir/centos6Base.log"
sync_repo /repo/centos/7/os/x86_64/ "$Centos7Base" "$LogDir/centos7Base.log"
# Epel yumrepo
# sync_repo /repo/epel/6/x86_64/ "$Centos6Epel" "$LogDir/centos6Epel.log"
sync_repo /repo/epel/7/x86_64/ "$Centos7Epel" "$LogDir/centos7Epel.log"
# SaltStack yumrepo
# sync_repo /repo/salt/yum/redhat/6/x86_64/ "$Centos6Salt" "$LogDir/centos6Salt.log"
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
sync_repo /repo/salt/yum/redhat/7/x86_64/ "$Centos7Salt" "$LogDir/centos7Salt.log"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest
# Docker yumrepo
sync_repo /repo/docker-ce/linux/centos/7/x86_64/stable/ "$Centos7Docker" "$LogDir/centos7Docker.log"
# centos update yumrepo
# sync_repo /repo/centos/6/updates/x86_64/ "$Centos6Update" "$LogDir/centos6Update.log"
sync_repo /repo/centos/7/updates/x86_64/ "$Centos7Update" "$LogDir/centos7Update.log"
# mysql 5.7 yumrepo
# sync_repo /repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" "$LogDir/centos6Mysql5.7.log"
sync_repo /repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" "$LogDir/centos7Mysql5.7.log"
# mysql 8.0 yumrepo
# sync_repo /repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" "$LogDir/centos6Mysql8.0.log"
sync_repo /repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" "$LogDir/centos7Mysql8.0.log"

讀者需求解答

負載高時,查出佔用比較高的程序指令碼並存儲或推送通知

這部分內容是上篇 Shell 指令碼例項中底部讀者留言的需求,如下:

#!/bin/bash
# When the load average exceeds a threshold, record the top CPU/memory
# consumers and a disk I/O sample in ./cpu_high_script_log.

# Number of physical CPUs
physical_cpu_count=$(grep 'physical id' /proc/cpuinfo | sort -u | wc -l)
# Cores per physical CPU (first value reported)
physical_cpu_cores=$(awk '/cpu cores/ {print $NF; exit}' /proc/cpuinfo)
# Total number of cores
total_cpu_cores=$((physical_cpu_count * ${physical_cpu_cores:-0}))
# /proc/cpuinfo may lack these fields; fall back to the number of logical
# processors so the thresholds do not become 0 and fire all the time.
if [ "$total_cpu_cores" -le 0 ]; then
    total_cpu_cores=$(grep -c '^processor' /proc/cpuinfo)
fi
# Thresholds for the 1, 5 and 15 minute load; exceeding any one triggers
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.8}')
fifteen_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.7}')
# 1, 5 and 15 minute load averages, read once and independent of the
# locale-dependent layout of `uptime`
read -r one_min_load five_min_load fifteen_min_load _ < /proc/loadavg
# Write the current CPU, memory and disk I/O information to log files.
# To send a notification or call something else, add it to this function.
get_info(){
    log_dir="cpu_high_script_log"
    test -d "$log_dir" || mkdir "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
    iostat -dx 1 10 > "$log_dir"/disk_io_10.log
}
# awk only does the floating-point comparison; get_info is called from
# bash itself, so it does not depend on /bin/sh being bash.
if awk -v l1="$one_min_load" -v t1="$one_min_load_threshold" \
       -v l5="$five_min_load" -v t5="$five_min_load_threshold" \
       -v l15="$fifteen_min_load" -v t15="$fifteen_min_load_threshold" \
       'BEGIN { exit !((l1 + 0 >= t1 + 0) || (l5 + 0 >= t5 + 0) || (l15 + 0 >= t15 + 0)) }'
then
    get_info
fi

以上,就是今天分享的全部內容了。

希望大家通過這些案例能夠學以致用,結合自身的實際場景進行運用,從而提高自己的工作效率。