日本综合一区二区|亚洲中文天堂综合|日韩欧美自拍一区|男女精品天堂一区|欧美自拍第6页亚洲成人精品一区|亚洲黄色天堂一区二区成人|超碰91偷拍第一页|日韩av夜夜嗨中文字幕|久久蜜综合视频官网|精美人妻一区二区三区

RELATEED CONSULTING
相關咨詢
選擇下列產(chǎn)品馬上在線溝通
服務時間:8:30-17:00
你可能遇到了下面的問題
關閉右側工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
純干貨!Python在運維中的應用:批量ssh/sftp

環(huán)境:

  • 生產(chǎn):4000+物理服務器,近 3000 臺虛擬機。
  • 開發(fā)環(huán)境:python3.6、redhat7.9,除了paramiko為第三方模塊需要自己安裝,其他的直接import即可。

主要應用方向:

  1. 配置變更:例如服務器上線時需要批量校正系統(tǒng)分區(qū)容量、及掛載數(shù)據(jù)盤。
  2. 配置信息查詢過濾:例如過濾防火墻規(guī)則、過濾網(wǎng)卡配置。
  3. 存活檢測:設置定時任務,定時輪詢服務器的 ssh 狀態(tài)是否正常。
  4. 文件傳輸:多個 ip 同時傳輸目錄/文件。

基本原則:

批量執(zhí)行操作是一把雙刃劍。批量執(zhí)行操作可以提升工作效率,但是隨之而來的風險不可忽略。

站在用戶的角度思考問題,與客戶深入溝通,找到翠屏網(wǎng)站設計與翠屏網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設計與互聯(lián)網(wǎng)技術結合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:做網(wǎng)站、成都網(wǎng)站建設、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣、國際域名空間、網(wǎng)站空間、企業(yè)郵箱。業(yè)務覆蓋翠屏地區(qū)。

風險案例如下:

掛載很多數(shù)據(jù)盤,通常先格式化硬盤,再掛載數(shù)據(jù)盤,最后再寫入將開機掛載信息寫入/etc/fstab文件。在批量lsblk檢查硬盤信息的時候發(fā)現(xiàn)有的系統(tǒng)盤在/sda有的在/sdm,如果不事先檢查機器相關配置是否一致直接按照工作經(jīng)驗去執(zhí)行批量操作,會很容易造成個人難以承受的災難。

在執(zhí)行批量操作時按照慣例:格式化硬盤->掛載->開機掛載的順序去執(zhí)行,假設有的機器因為某些故障導致格式化硬盤沒法正確執(zhí)行。在處理這類問題的時候通常會先提取出失敗的ip,并再按照慣例執(zhí)行操作。運維人員會很容易忽略開機掛載的信息已經(jīng)寫過了,導致復寫(這都是血和淚的教訓)。

所以,為了避免故障,提升工作效率,我認為應當建立團隊在工作上的共識,應當遵守以下原則:

  1. 批量操作前應當準備回退方案。
  2. 批量操作前作前先確定檢查目標服務器相關的配置的一致性。
  3. 批量操作時應當把重復執(zhí)行會影響系統(tǒng)的操作和不影響的分開執(zhí)行。
  4. 批量操作后應當再次驗證操作結果是否符合預期。

當然,代碼的規(guī)范也應當重視起來,不僅是為了便于審計,同時也需要便于溯源。我認為應當注意以下幾點:

  1. 關鍵方法一定要記錄傳入的參數(shù)以及執(zhí)行后的結果。
  2. 為了避免方法返回值不符合預期,該拋異常的地方一定要拋。
  3. 優(yōu)化代碼,刪去不必要的邏輯分支和盡量不要寫重復的代碼,使代碼看起來整潔。
  4. 程序的執(zhí)行情況、結果一定要保留日志。

技術難點

1、ssh no existing session,sftp超時時間設置:

在代碼無錯的情況下大量ip出現(xiàn)No existing session,排查后定位在代碼的寫法上,下面是一個正確的示例。由于最開始沒考慮到ssh連接的幾種情況導致了重寫好幾遍。另外sftp的實例貌似不能直接設置連接超時時間,所以我采用了先建立ssh連接再打開sftp的方法。

import paramiko

username = 'root'
port = 22
pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa') # 導入公鑰
timeout=10


def ssh_client( ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient() # 實例化ssh
client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 配置ssh互信
if auth == 'id_rsa': # 公鑰認證
client.connect(ip, port, username, pkey=pkey, banner_timeout=60, timeout=timeout)
elif auth == 'noAuth': # 用戶名密碼認證
if user is not None and passwd is not None:
client.connect(ip, port, user, passwd, banner_timeout=60, timeout=timeout)
else:
raise ValueError('傳入的用戶名密碼不能為空')
else:
raise NameError('不存在此%s認證方式' % auth)
return client


def sftp_client(ip, user=None, passwd=None, auth='id_rsa'):
ssh = ssh_client(ip, user, passwd, auth)
sftp = ssh.open_sftp()
return sftp

2、sftp中的get()和put()方法僅能傳文件,不支持直接傳目錄:

不能直接傳目錄,那換個思路,遍歷路徑中的目錄和文件,先創(chuàng)建目錄再傳文件就能達到一樣的效果了。在paramiko的sftp中sftp.listdir_attr()方法可以獲取遠程路徑中的文件、目錄信息。那么我們可以寫一個遞歸來遍歷遠程路徑中的所有文件和目錄(傳入一個列表是為了接收遞歸返回的值)。

def check_remote_folders(sftp, remote_path, remote_folders: list):
# 遞歸遍歷遠程路徑中的目錄,并添加到remote_folders中
for f in sftp.listdir_attr(remote_path):
# 檢查路徑狀態(tài)是否為目錄
if stat.S_ISDIR(f.st_mode):
# 遞歸調用自己
check_remote_folders(sftp, remote_path + '/' + f.filename, remote_folders)
# 添加遍歷到的目錄信息到列表中
remote_folders.append(remote_path + '/' + f.filename)

python自帶的os模塊中的os.walk()方法可以遍歷到本地路徑中的目錄和文件。

  local_files_path = []
local_directory = []
for root, dirs, files in os.walk(local_path):
local_directory.append(root)
for file in files:if root[-1] != '/':
local_files_path.append(root + '/' + file)
elif root[-1] == '/':
local_files_path.append(root + file)

3、多線程多個ip使用sftp.get()方法時無法并發(fā)。

改成多進程即可。

 def batch_sftp_get(ip, remote_path, local_path, user=None, passwd=None, auth='id_rsa'):
pool = multiprocessing.Pool(5)for i in ip:
pool.apply_async(sftp_get, (i, remote_path, local_path, user, passwd, auth,))
pool.close()
pool.join()

4、多個ip需要執(zhí)行相同命令或不同的命令。

由于是日常使用的場景不會很復雜,所以借鑒了ansible的playbook,讀取提前準備好的配置文件即可,然后再整合到之前定義的ssh函數(shù)中。

# 配置文件大概是這樣
192.168.0.100:df -Th | grep xfs; lsblk | grep disk | wc -l
192.168.0.101:ip a | grep team | wc -l
192.168.0.102:route -n
...
from concurrent.futures import ThreadPoolExecutor, as_completed
import time, json


def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
pool = ThreadPoolExecutor(self.workers)
task = []
if cmd_type == 'one':
task = [pool.submit(self.ssh_exec, i, cmd, user, passwd, auth) for i in ip]
elif cmd_type == 'many':
if isinstance(ip, list):
for i in ip:
separator = ''if ':' in i:
separator = ':'elif ',' in i:
separator = ','if separator != '':
data = i.split(separator)
task.append(pool.submit(self.ssh_client, data[0], data[1], user, passwd))
else:
return '請檢查ip和命令間的分隔符'else:
return 'ip的類型為%s, 請傳入一個正確的類型' % type(ip)
else:
return 'cmd_type不存在%s值, 請傳入一個正確的參數(shù)' % cmd_type
self.logger.debug('檢查變量task:%s' % task)
results = {}
for future in as_completed(task):
res = future.result().split(':')
results[res[1]] = {res[0]: res[2]}
if 'success' in future.result():
print('\033[32;1m%s\033[0m' % future.result().replace('success:', ''))
elif 'failed' in future.result():
print('\033[31;1m%s\033[0m' % future.result().replace('failed:', ''))
pool.shutdown()
json_results = {
'task_name': task_name,
'task_sn': self.task_sn,
'start_time': self.now_time,
'cost_time': '%.2fs' % (time.perf_counter() - self.s),
'results': results
}
self.logger.info('json_results:%s' % json_results)
with open(self.log_path + 'task_%s_%s.log' % (task_name, self.task_sn), 'a') as f:
f.write(json.dumps(json_results))
return json_results

同時,我們還衍生出一個需求,既然都要讀取配置,那同樣也可以提前把ip地址準備在文件里。正好也能讀取我們返回的執(zhí)行程序的結果。

import os
import json


def get_info(self, path):
self.logger.debug('接收參數(shù)path:%s'.encode('utf8') % path.encode('utf8'))
if os.path.exists(path):
info_list = [i.replace('\n', '') for i in open(path, 'r', encoding='utf8').readlines()]
return info_list
else:
self.logger.warning('%s不存在,請傳入一個正確的目錄' % path)
raise ValueError('%s不存在,請傳入一個正確的目錄' % path)


def log_analysis(filename):if os.path.exists(filename):
try:
data = json.load(open(filename, 'r', encoding='utf8'))
return data
except Exception as e:
print('%s無法解析該類型文件' % filename + ' ' + str(e))
raise TypeError('%s無法解析該類型文件' % filename + ' ' + str(e))
else:
raise ValueError('該%s文件路徑不存在,請傳入一個正確的文件路徑' % filename)


def show_log(self, filename, mode=None):
data: dict = self.log_analysis(filename)
if isinstance(data, dict):
for key, value in data["results"].items():
if 'success' in value.keys():
if mode == 'success':
print(key)
elif mode is None:
print('%s:%s' % (key, value['success'].replace('\r\n', '')))
elif 'failed' in value.keys():
if mode == 'failed':
print(key)
elif mode is None:
print('%s:%s' % (key, value['failed'].replace('\r\n', '')))

完整代碼展示:

from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
import os
import re
import time
import stat
import json
import random
import logging
import asyncio
import argparse
import paramiko


class TaskManager:def __init__(self, timeout=10, workers=15, system='linux'):
self.username = 'root'
self.port = 22
self.datetime = time.strftime("%Y-%m-%d", time.localtime())
self.timeout = timeout
self.workers = workers
self.now_time = time.strftime("%Y/%m/%d %H:%M:%S", time.localtime())
self.s = time.perf_counter()
self.task_sn = self.sn_random_generator()

if system == 'linux':
self.pkey = paramiko.RSAKey.from_private_key_file('/root/.ssh/id_rsa')
self.log_path = '/tmp/TaskManager/log/'
self.log_debug_path = '/tmp/TaskManager/debug/'elif system == 'windows':
self.pkey = paramiko.RSAKey.from_private_key_file(r'C:\Users\001\.ssh\id_rsa')
self.log_path = r'D:\tmp\TaskManager\log\\'
self.log_debug_path = r'D:\tmp\TaskManager\debug\\'

if os.path.exists(self.log_path) is False:
os.makedirs(self.log_path, exist_ok=True)
if os.path.exists(self.log_debug_path) is False:
os.makedirs(self.log_debug_path, exist_ok=True)

self.logger = logging.getLogger(__name__)
self.logger.setLevel(level=logging.DEBUG)
self.handler = logging.FileHandler(self.log_debug_path + '%s_%s.log' % (self.datetime, self.task_sn))
self.formatter = logging.Formatter("%(asctime)s[%(levelname)s][%(funcName)s]%(message)s ")
self.handler.setFormatter(self.formatter)
self.logger.addHandler(self.handler)
self.logger.info('初始化完成'.encode(encoding='utf8'))

def ssh_client(self, ip, user=None, passwd=None, auth='id_rsa'):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
if auth == 'id_rsa':
self.logger.info('正在 SSH 連接%s'.encode('utf8') % str(ip).encode('utf8'))
client.connect(ip, self.port, self.username, pkey=self.pkey, banner_timeout=60, timeout=self.timeout)
elif auth == 'noAuth':
if user is not None and passwd is not None:
client.connect(ip, self.port, user, passwd, banner_timeout=60, timeout=self.timeout)
# allow_agent=False, look_for_keys=False# No existing session 解決辦法 else:raise ValueError('傳入的用戶名密碼不能為空')else:raise NameError('不存在此%s 認證方式' % auth)return client

def ssh_exec(self, ip, cmd, user=None, passwd=None, auth='id_rsa'):try:
ssh = self.ssh_client(ip, user, passwd, auth)
stdin, stdout, stderr = ssh.exec_command(command=cmd, get_pty=True)
self.logger.debug('%s:stdin 輸入:%s' % (ip, stdin))
self.logger.debug('%s:stderr 錯誤:%s' % (ip, stderr))
self.logger.debug('%s:stdout 輸出:%s' % (ip, stdout))
result = stdout.read().decode('utf-8')
ssh.close()
return 'success:' + ip + ':' + str(result)
except Exception as e:
return 'failed:' + ip + ':' + str(e)

def batch_ssh(self, ip, cmd=None, user=None, passwd=None, cmd_type='one', task_name='default', auth='id_rsa'):
self.logger.debug('接收參數(shù) ip:%s, cmd:%s, user:%s passwd:%s cmd_type:%s, task_name:%s' %
(ip, cmd, user, passwd, cmd_type, task_name))
print('\033[35;1m-------------------Task is set up! Time:%s ----------------------\033[0m' % self.now_time)
pool = http://www.dlmjj.cn/article/dpogish.html