Source: Jay's IT Journey
This time I'll share a few scripts that I use in my work. They fall into two groups: Python and Shell.
Python scripts: enterprise WeChat alerts, an FTP client, an SSH client, a SaltStack client, a vCenter client, fetching the expiration time of a domain's SSL certificate, and sending today's weather forecast together with a chart of the coming days' temperature trend.
Shell scripts: full SVN backup, Zabbix monitoring of user password expiration, building a local YUM mirror, and a script requested by readers of the previous article (when the load is high, find the processes with the highest resource usage and store the result or push a notification).
The article is a little long, so please read patiently through to the end.
This script sends alerts through an enterprise WeChat application and can be used for Zabbix monitoring.
# -*- coding: utf-8 -*-
import requests
import json
class DLF:
    """Send text and image alerts through the enterprise WeChat HTTP API.

    The access token is requested once, while the instance is created, and is
    reused by every later request.
    """

    # Seconds to wait for the API. Without a timeout ``requests`` can block
    # forever when the server stops responding.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()

    def _get_token(self):
        '''
        Get the access_token of the enterprise WeChat API.
        :return: the token; on failure the error text is returned instead
                 (best effort, kept for compatibility with existing callers)
        '''
        token_url = self.url + "/gettoken"
        # Passing the values as params lets requests URL-encode them.
        query = {"corpid": self.corpid, "corpsecret": self.corpsecret}
        try:
            res = requests.get(token_url, params=query, timeout=self.timeout).json()
            return res['access_token']
        except Exception as e:
            return str(e)

    def _get_media_id(self, file_obj):
        '''
        Upload a file and get the media_id that refers to it.
        :param file_obj: file object opened in binary mode
        :return: the media_id; on failure the error text is returned instead
        '''
        get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
        data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, files=data, timeout=self.timeout)
            return res.json()['media_id']
        except Exception as e:
            return str(e)

    def _send(self, send_data):
        '''
        Post one message payload to the message/send endpoint.
        :param send_data: dict that is serialized to JSON as the request body
        :return: None on success, otherwise the error text
        '''
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        try:
            reply = requests.post(send_msg_url, data=json.dumps(send_data), timeout=self.timeout).json()
            # NOTE(review): assumes the API reports failures through a
            # non-zero "errcode" plus "errmsg" in the JSON reply - confirm
            # against the enterprise WeChat documentation.
            if reply.get('errcode'):
                return str(reply.get('errmsg'))
        except Exception as e:
            return str(e)
        return None

    def send_text(self, agentid, content, touser=None, toparty=None):
        '''
        Send a text message.
        :param agentid: id of the application that sends the message
        :param content: message text
        :param touser: receiving user(s)
        :param toparty: receiving department(s)
        :return: None on success, otherwise the error text
        '''
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        })

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        '''
        Upload an image and send it as an image message.
        :param agentid: id of the application that sends the message
        :param file_obj: image file object opened in binary mode
        :param touser: receiving user(s)
        :param toparty: receiving department(s)
        :return: None on success, otherwise the error text
        '''
        media_id = self._get_media_id(file_obj)
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        })
Uses the ftplib module to work with an FTP server (upload, download and basic file management).
# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy
class FTPClient:
    """Convenience wrapper around :class:`ftplib.FTP`.

    The connection is opened and the user is logged in while the instance is
    created. A failed login does not raise; the exception is kept in
    ``login_error`` so callers can inspect it.
    """

    def __init__(self, host, user, passwd, port=21):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        # Template of the result dict returned by the management methods;
        # every call works on its own copy.
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self.login_error = None
        self._login()

    def _login(self):
        '''
        Log in to the FTP server.
        :return: the exception when connecting or logging in fails, else None
        '''
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            # __init__ ignores the return value, so remember the failure.
            self.login_error = e
            return e

    def _run(self, method_name, *args):
        '''
        Call one ftplib method and wrap the outcome in a result dict.
        :param method_name: name of the method of the underlying FTP object
        :param args: positional arguments passed to that method
        :return: {'status': bool, 'msg': server reply or error text}
        '''
        res = copy.deepcopy(self.res)
        try:
            res['msg'] = getattr(self._ftp, method_name)(*args)
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res

    def upload(self, localpath, remotepath=None):
        '''
        Upload a file.
        :param localpath: local file path
        :param remotepath: remote file path; defaults to the local file name
                           in the current remote directory
        :return: an error text when localpath is empty, else None
        '''
        if not localpath:
            return 'Please select a local file. '
        if not remotepath:
            remotepath = path.basename(localpath)
        # storbinary() reads from a binary file object, not from a path.
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        '''
        Download a file.
        :param remotepath: remote file path
        :param localpath: local file path or directory; defaults to the remote
                          file name in the current local directory
        :return: an error text when remotepath is empty, else None
        '''
        if not remotepath:
            return 'Please select a remote file. '
        if not localpath:
            localpath = path.basename(remotepath)
        # A directory target gets the remote file name appended.
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
        # The context manager closes the file even when the transfer fails.
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        '''
        List the contents of a directory.
        :param dir: directory name
        :return: all entries of the directory as a list
        '''
        return self._ftp.nlst(dir)

    def rmd(self, dir=None):
        '''
        Delete a directory.
        :param dir: directory name
        :return: result dict, or an error text when dir is empty
        '''
        if not dir:
            return 'Please input dirname'
        return self._run('rmd', dir)

    def mkd(self, dir=None):
        '''
        Create a directory.
        :param dir: directory name
        :return: result dict, or an error text when dir is empty
        '''
        if not dir:
            return 'Please input dirname'
        return self._run('mkd', dir)

    def del_file(self, filename=None):
        '''
        Delete a file.
        :param filename: file name
        :return: result dict, or an error text when filename is empty
        '''
        if not filename:
            return 'Please input filename'
        return self._run('delete', filename)

    def get_file_size(self, filenames=None):
        '''
        Get the size (in bytes) and the type of each entry.
        :param filenames: iterable of file or directory names
        :return: list of {'filename', 'size', 'type'} dicts, or a message dict
                 when no names are given
        '''
        if not filenames:
            return {'msg': 'This is an empty directory'}
        res_l = []
        for name in filenames:
            try:
                size = self._ftp.size(name)
                kind = 'f'
            except Exception:
                # size() failed: the entry is reported as a directory, with
                # '-' as its size and a trailing '/' on its name.
                size = '-'
                kind = 'd'
                name = name + '/'
            res_l.append({'filename': name, 'size': size, 'type': kind})
        return res_l

    def rename(self, old_name=None, new_name=None):
        '''
        Rename a file or directory.
        :param old_name: old file or directory name
        :param new_name: new file or directory name
        :return: result dict, or an error text when a name is missing
        '''
        if not old_name or not new_name:
            return 'Please input old_name and new_name'
        return self._run('rename', old_name, new_name)

    def close(self):
        '''
        Log out and close the connection.
        :return: an error text when the server does not answer QUIT, else None
        '''
        try:
            # Ask the server to end the session.
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            # Close the client side of the connection in every case.
            self._ftp.close()
This script only connects with a private key; if you need password authentication, it is easy to modify.
# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
    """Run commands on a remote host over SSH with private-key authentication.

    The connection is opened while the instance is created. The instance can
    also be used as a context manager, which closes the connection on exit.
    """

    def __init__(self, host, port, user, pkey):
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        # pkey is the path of an RSA private key file.
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self._connect()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _connect(self):
        '''
        Open the SSH connection.
        :return: an error text when connecting fails, else None
        '''
        self.ssh = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification - confirm this is acceptable for the target hosts.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
        except Exception:
            # "except Exception" instead of a bare except, so that
            # KeyboardInterrupt and SystemExit are not swallowed.
            return 'ssh connect fail'

    def execute_command(self, command):
        '''
        Execute a command on the remote host.
        :param command: command line to run
        :return: tuple (stdout content, stderr content) as read from the channel
        '''
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err

    def close(self):
        '''Close the SSH connection.'''
        self.ssh.close()
Operates a SaltStack server through its API to execute commands.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests
import json
import copy
class SaltApi:
    """
    Client for the salt-api HTTP interface.

    A token is requested at initialization and sent with every later request
    in the X-Auth-Token header.
    """

    def __init__(self, url="http://172.85.10.21:8000/", username="saltapi", password="saltapi", timeout=30):
        """
        :param url: base URL of the salt-api service, ending with "/"
        :param username: account used for the login request
        :param password: password of that account
        :param timeout: seconds to wait for each HTTP request
        """
        # The defaults reproduce the values that used to be hard-coded here.
        self.url = url
        self.username = username
        self.password = password
        self.timeout = timeout
        self.headers = {"Content-type": "application/json"}
        # Template of the request body; every call works on its own copy.
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': 'pam'}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params):
        '''
        Post a request and return the data.
        :param url: requested URL
        :param params: dict sent as the JSON request body
        :return: first element of the "return" list of the JSON reply
        '''
        send_data = json.dumps(params)
        # The timeout keeps an unresponsive server from blocking forever.
        request = requests.post(url, data=send_data, headers=self.headers, timeout=self.timeout)
        result = dict(request.json())
        return result['return'][0]

    def get_auth_keys(self):
        '''
        Get all accepted minion keys.
        :return: the accepted minions, or the error text when the reply does
                 not have the expected structure
        '''
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except Exception as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """
        Get basic system information (grains).
        :param tgt: target host; every host ('*') when empty
        :param arg: name of the grain item to query
        :return: result of grains.item
        """
        data = copy.deepcopy(self.params)
        data['tgt'] = tgt if tgt else '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        return self.get_data(self.url, data)

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """
        Execute a salt module function, similar to salt '*' cmd.run 'command'.
        :param tgt: target host
        :param fun: module function
        :param arg: argument passed to the function; may be empty
        :param tgt_type: target type, used unless tgt is '*'
        :param salt_async: use the local_async client instead of local
        :return: execution result
        """
        if not tgt:
            return {'status': False, 'msg': 'target host not exist'}
        data = copy.deepcopy(self.params)
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async:
            data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        return self.get_data(self.url, data)

    def jobs(self, fun='detail', jid=None):
        """
        Query jobs through the runner client.
        :param fun: 'active' for the running jobs, 'detail' for one job
        :param jid: job id, required for 'detail'
        :return: job information, or an error dict for invalid input
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid:
                return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # 'active' was documented but used to be rejected together with
            # the unknown values.
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        return self.get_data(self.url, data)
Performs routine vCenter operations through the official SDK. This script is used by a CMDB platform to collect host information automatically and store it in the database.
from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit
class Vmware:
    """Collect ESXi host and virtual machine information from a vCenter and
    store it through the ``asset.models`` classes."""

    def __init__(self, ip, user, password, port, idc, vcenter_id):
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        '''
        Return the managed objects of the given types as a list.
        :param content: service content returned by RetrieveContent()
        :param vimtype: list of vim types to look for
        :param name: when given, only objects with this name are returned
        :return: list of matching objects
        '''
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        try:
            obj = list(container.view)
        finally:
            # Release the server-side view once its content has been copied.
            container.Destroy()
        if name is not None:
            obj = [view for view in obj if view.name == name]
        return obj

    def _vm_info(self, vm):
        '''
        Build the information dict of one virtual machine.
        :param vm: virtual machine object
        :return: tuple (information dict, total virtual disk size in GB)
        '''
        host_info = {}
        host_info['vm_name'] = vm.name
        host_info['power_status'] = vm.runtime.powerState
        host_info['cpu_total_kernel'] = str(vm.config.hardware.numCPU) + ' nucleus '
        host_info['memory_total'] = str(vm.config.hardware.memoryMB) + 'MB'
        host_info['system_info'] = vm.config.guestFullName
        disk_info = ''
        disk_total = 0
        for d in vm.config.hardware.device:
            if isinstance(d, vim.vm.device.VirtualDisk):
                # capacityInKB / 1024 / 1024 converts KB to GB
                disk_total += d.capacityInKB / 1024 / 1024
                disk_info += d.deviceInfo.label + ": " + str((d.capacityInKB) / 1024 / 1024) + ' GB' + ','
        host_info['disk_info'] = disk_info
        return host_info, disk_total

    def get_esxi_info(self):
        '''
        Connect to the vCenter and collect every ESXi host with its VMs.
        :return: on success a dict keyed by host name plus the key
                 'connect_status' (True); on connection failure
                 {'connect_status': False, 'msg': error text}
        '''
        esxi_host = {}
        res = {"connect_status": True, "msg": None}
        try:
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            res['connect_status'] = False
            try:
                res['msg'] = ("%s Caught vmodl fault : " + e.msg) % (self.ip)
            except Exception:
                # The exception carries no usable "msg" attribute.
                res['msg'] = '%s: connection error' % (self.ip)
            return res
        # Disconnect when the interpreter exits.
        atexit.register(Disconnect, si)
        content = si.RetrieveContent()
        # Size of one "standard" VM (4 cores, 8 GB memory, 100 GB disk); the
        # number of VMs that can still be allocated is computed from it.
        default_configure = {
            'cpu': 4,
            'memory': 8,
            'disk': 100
        }
        for esxi in self.get_obj(content, [vim.HostSystem]):
            host = esxi_host[esxi.name] = {}
            host['idc_id'] = self.idc_id
            host['vcenter_id'] = self.vcenter_id
            host['server_ip'] = esxi.name
            host['manufacturer'] = esxi.summary.hardware.vendor
            host['server_model'] = esxi.summary.hardware.model
            for i in esxi.summary.hardware.otherIdentifyingInfo:
                if isinstance(i, vim.host.SystemIdentificationInfo):
                    host['server_sn'] = i.identifierValue
            # System name
            host['system_name'] = esxi.summary.config.product.fullName
            # Total number of CPU threads
            esxi_cpu_total = esxi.summary.hardware.numCpuThreads
            # Total memory in GB
            esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 / 1024
            # Total datastore capacity in GB
            esxi_disk_total = 0
            for ds in esxi.datastore:
                esxi_disk_total += ds.summary.capacity / 1024 / 1024 / 1024
            host['vm_host'] = []
            vm_usage_total_cpu = 0
            vm_usage_total_memory = 0
            vm_usage_total_disk = 0
            # Virtual machine information
            for vm in esxi.vm:
                host_info, disk_total = self._vm_info(vm)
                host['vm_host'].append(host_info)
                # Only powered-on machines count as allocated resources.
                if host_info['power_status'] == 'poweredOn':
                    vm_usage_total_cpu += vm.config.hardware.numCPU
                    vm_usage_total_disk += disk_total
                    vm_usage_total_memory += (vm.config.hardware.memoryMB / 1024)
            # Available capacity of the host: total - allocated
            esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
            esxi_memory_free = esxi_memory_total - vm_usage_total_memory
            esxi_disk_free = esxi_disk_total - vm_usage_total_disk
            host['cpu_info'] = 'Total: %d nucleus , Free: %d nucleus ' % (esxi_cpu_total, esxi_cpu_free)
            host['memory_info'] = 'Total: %dGB, Free: %dGB' % (esxi_memory_total, esxi_memory_free)
            host['disk_info'] = 'Total: %dGB, Free: %dGB' % (esxi_disk_total, esxi_disk_free)
            # The scarcest resource decides how many standard VMs still fit.
            if (esxi_cpu_free < default_configure['cpu']
                    or esxi_memory_free < default_configure['memory']
                    or esxi_disk_free < default_configure['disk']):
                free_allocation_vm_host = 0
            else:
                free_allocation_vm_host = int(min(
                    esxi_cpu_free / default_configure['cpu'],
                    esxi_memory_free / default_configure['memory'],
                    esxi_disk_free / default_configure['disk'],
                ))
            host['free_allocation_vm_host'] = free_allocation_vm_host
        esxi_host['connect_status'] = True
        return esxi_host

    def write_to_db(self):
        '''
        Collect the information and create one EsxiHost record per host and
        one virtualHost record per virtual machine.
        :return: the error dict when the connection failed, else None
        '''
        esxi_host = self.get_esxi_info()
        # The connection failed
        if not esxi_host['connect_status']:
            return esxi_host
        del esxi_host['connect_status']
        for machine_ip in esxi_host:
            # Physical machine information
            esxi_host_dict = esxi_host[machine_ip]
            # Virtual machine information, stored in its own records
            virtual_host = esxi_host_dict.pop('vm_host')
            obj = models.EsxiHost.objects.create(**esxi_host_dict)
            obj.save()
            for host_info in virtual_host:
                host_info['management_host_id'] = obj.id
                obj2 = models.virtualHost.objects.create(**host_info)
                obj2.save()
Used for Zabbix alerting.
import re
import sys
import time
import subprocess
from datetime import datetime
from io import StringIO
def parse_expire_date(curl_output):
    """Extract the certificate expiration time from ``curl -v`` output.

    Only the "expire date:" line is required, so the output of curl builds
    that do not print "common name:" or "issuer: CN=" lines is accepted too.

    :param curl_output: combined stdout/stderr text of ``curl -Ivs``
    :return: naive datetime of the expiration (the text is labelled GMT), or
             None when no parsable expire date is found
    """
    m = re.search(r'expire date:\s*(.+?)\s*$', curl_output, re.M)
    if m is None:
        return None
    try:
        return datetime.strptime(m.group(1), "%b %d %H:%M:%S %Y GMT")
    except ValueError:
        return None


def main(domain):
    """Return the number of days until the SSL certificate of *domain* expires.

    :param domain: host name to check
    :return: remaining days (negative when already expired), or 999999999
             when the expiration date cannot be determined
    """
    from datetime import timezone

    unknown = 999999999
    # An argument list (no shell) keeps the domain from being interpreted as
    # shell syntax.
    comm = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
    try:
        result = subprocess.run(
            comm,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # curl -v writes the certificate info to stderr
            universal_newlines=True,
            errors="replace",
            timeout=60,
        )
    except (OSError, subprocess.SubprocessError):
        return unknown
    expire_date = parse_expire_date(result.stdout)
    if expire_date is None:
        return unknown
    # The expiration time is GMT, so compare it with the current UTC time.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return (expire_date - now).days
if __name__ == "__main__":
    # Exit with a usage message instead of an IndexError when the domain
    # argument is missing.
    if len(sys.argv) < 2:
        sys.exit(f"Usage: {sys.argv[0]} <domain>")
    domain = sys.argv[1]
    remaining_days = main(domain)
    print(remaining_days)
This script sends today's weather forecast and a chart of the upcoming temperature trend to my wife. WeChat has now blocked its web interface, so the message can no longer be sent to personal WeChat directly; I send the notification through enterprise WeChat instead, which means your wife has to be added to your enterprise WeChat. If you are not interested, you can skip this part.
# -*- coding: utf-8 -*-
import requests
import json
import datetime
def weather(city):
    """Fetch today's weather for a city and format it as a text message.

    :param city: city name passed to the weather service
    :return: {"source_data": raw data of the reply, "res": formatted text};
             on any failure the error text is returned as a string instead
    """
    url = "http://wthrcdn.etouch.cn/weather_mini?city=%s" % city
    try:
        # The timeout keeps an unresponsive service from blocking forever.
        data = requests.get(url, timeout=10).json()['data']
        city = data['city']
        ganmao = data['ganmao']
        today_weather = data['forecast'][0]
        # 'fengli' wraps its value in nested square brackets; take the text
        # between the second '[' and the following ']'.
        wind_force = today_weather['fengli'].split('[')[2].split(']')[0]
        res = " My wife is today {}\n Today's weather \n City : {:<10}\n Time : {:<10}\n The high temperature : {:<10}\n low temperature : {:<10}\n wind : {:<10}\n wind direction : {:<10}\n The weather : {:<10}\n\n The recent temperature trend chart will be sent later , Please check .".format(
            ganmao,
            city,
            datetime.datetime.now().strftime('%Y-%m-%d'),
            # 'high' and 'low' hold a label and a value separated by
            # whitespace; the value is the second part.
            today_weather['high'].split()[1],
            today_weather['low'].split()[1],
            wind_force,
            today_weather['fengxiang'],
            today_weather['type'],
        )
        return {"source_data": data, "res": res}
    except Exception as e:
        return str(e)
```
+ Get weather forecast trends
```python
# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import re
import datetime
def extract_temperature_series(forecast, day_num=5):
    """Pull the day numbers and the high/low temperatures out of a forecast.

    The entries are kept in forecast order, so a series that crosses the end
    of a month (30, 31, 1, ...) stays chronological.

    :param forecast: list of dicts with the keys "date", "high" and "low"
    :param day_num: maximum number of entries to use
    :return: tuple (days, highs, lows) of integer lists
    """
    days, highs, lows = [], [], []
    for entry in forecast[:day_num]:
        days.append(int(re.findall(r"\d+", entry["date"])[0]))
        # "-?" keeps the sign of temperatures below zero.
        highs.append(int(re.findall(r"-?\d+", entry["high"])[0]))
        lows.append(int(re.findall(r"-?\d+", entry["low"])[0]))
    return days, highs, lows


def Future_weather_states(forecast, save_path, day_num=5):
    '''
    Draw the temperature trend chart of the forecast and save it as an image.
    :param forecast: weather forecast data
    :param save_path: path of the image file to write
    :param day_num: number of forecast days to plot
    :return: None
    '''
    days, high_temperature, low_temperature = extract_temperature_series(forecast, day_num)
    # Plot against the position in the series and label the ticks with the
    # day of the month, so the x axis never runs backwards at a month change.
    positions = list(range(len(days)))
    current_date = datetime.datetime.now().strftime('%Y-%m')
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False
    # Use a fresh figure so repeated calls do not draw over earlier charts.
    plt.figure()
    plt.plot(positions, high_temperature, "r", positions, low_temperature, "b")
    plt.xlabel(current_date)
    plt.ylabel("℃")
    plt.legend([" The high temperature ", " low temperature "])
    plt.xticks(positions, days)
    plt.title(" Temperature trends in recent days ")
    plt.savefig(save_path)
    plt.close()
```
+ Send to enterprise wechat
```python
# -*- coding: utf-8 -*-
import requests
import json
class DLF:
    """Send text and image alerts through the enterprise WeChat HTTP API.

    The access token is requested once, while the instance is created, and is
    reused by every later request.
    """

    # Seconds to wait for the API. Without a timeout ``requests`` can block
    # forever when the server stops responding.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        self._token = self._get_token()

    def _get_token(self):
        '''
        Get the access_token of the enterprise WeChat API.
        :return: the token; on failure the error text is returned instead
                 (best effort, kept for compatibility with existing callers)
        '''
        token_url = self.url + "/gettoken"
        # Passing the values as params lets requests URL-encode them.
        query = {"corpid": self.corpid, "corpsecret": self.corpsecret}
        try:
            res = requests.get(token_url, params=query, timeout=self.timeout).json()
            return res['access_token']
        except Exception as e:
            return str(e)

    def _get_media_id(self, file_obj):
        '''
        Upload a file and get the media_id that refers to it.
        :param file_obj: file object opened in binary mode
        :return: the media_id; on failure the error text is returned instead
        '''
        get_media_url = self.url + "/media/upload?access_token={}&type=file".format(self._token)
        data = {"media": file_obj}
        try:
            res = requests.post(url=get_media_url, files=data, timeout=self.timeout)
            return res.json()['media_id']
        except Exception as e:
            return str(e)

    def _send(self, send_data):
        '''
        Post one message payload to the message/send endpoint.
        :param send_data: dict that is serialized to JSON as the request body
        :return: None on success, otherwise the error text
        '''
        send_msg_url = self.url + "/message/send?access_token=%s" % (self._token)
        try:
            reply = requests.post(send_msg_url, data=json.dumps(send_data), timeout=self.timeout).json()
            # NOTE(review): assumes the API reports failures through a
            # non-zero "errcode" plus "errmsg" in the JSON reply - confirm
            # against the enterprise WeChat documentation.
            if reply.get('errcode'):
                return str(reply.get('errmsg'))
        except Exception as e:
            return str(e)
        return None

    def send_text(self, agentid, content, touser=None, toparty=None):
        '''
        Send a text message.
        :param agentid: id of the application that sends the message
        :param content: message text
        :param touser: receiving user(s)
        :param toparty: receiving department(s)
        :return: None on success, otherwise the error text
        '''
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "text",
            "agentid": agentid,
            "text": {
                "content": content
            }
        })

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        '''
        Upload an image and send it as an image message.
        :param agentid: id of the application that sends the message
        :param file_obj: image file object opened in binary mode
        :param touser: receiving user(s)
        :param toparty: receiving department(s)
        :return: None on success, otherwise the error text
        '''
        media_id = self._get_media_id(file_obj)
        return self._send({
            "touser": touser,
            "toparty": toparty,
            "msgtype": "image",
            "agentid": agentid,
            "image": {
                "media_id": media_id
            }
        })
+ main Script
# -*- coding: utf-8 -*-
from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os
# Enterprise WeChat related information
corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"
# Path where the weather forecast trend chart is saved
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path ,'./tmp/weather_forecast.jpg')
# Make sure the target directory exists before the chart is written
os.makedirs(os.path.dirname(save_path), exist_ok=True)
# Get the weather forecast information
content = weather(" Daxing ")
# weather() returns the error text (a str) instead of a dict when it fails
if not isinstance(content, dict):
    raise SystemExit("weather lookup failed: %s" % content)
# Send the text message
dlf = DLF(corpid, corpsecret)
dlf.send_text(agentid=agentid, content=content['res'], toparty='1')
# Generate the weather forecast trend chart
Future_weather_states(content['source_data']['forecast'], save_path)
# Send the picture message; the file is closed once the upload is done
with open(save_path, 'rb') as file_obj:
    dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)
Performs a full SVN backup with hotcopy; backups are retained for 7 days.
#!/bin/bash
# Filename : svn_backup_repos.sh
# Date : 2020/12/14
# Author : JakeTian
# Email : [email protected]***.com
# Crontab : 59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/null 2>&1
# Notes : Add script to crontab in , Regular execution every day
# Description: SVN Full backup
# "set -e" is deliberately not used: every step checks its own exit status,
# so a failing repository is logged instead of aborting the whole run.
SRC_PATH="/opt/svndata"
DST_PATH="/data/svnbackup"
LOG_FILE="$DST_PATH/logs/svn_backup.log"
SVN_BACKUP_C="/bin/svnadmin hotcopy"
SVN_LOOK_C="/bin/svnlook youngest"
TODAY=$(date +'%F')
# Number of days a dated backup directory is kept
RETENTION_DAYS=7

cd "$SRC_PATH" || { echo "cannot enter $SRC_PATH" >&2; exit 1; }

# Create the backup directory and the script log directory
mkdir -p "$DST_PATH/logs" "$DST_PATH/$TODAY" || exit 1

# Back up every repository directory except 'httpd' and 'bak'.
# -printf '%f\n' prints the bare directory name, so names containing dots
# are kept intact.
while IFS= read -r repo
do
    # The backup counts as complete when hotcopy succeeds and svnlook can
    # read the youngest revision of the copy.
    if $SVN_BACKUP_C "$SRC_PATH/$repo" "$DST_PATH/$TODAY/$repo" \
        && $SVN_LOOK_C "$DST_PATH/$TODAY/$repo"; then
        echo "$TODAY: $repo Backup Success" >> "$LOG_FILE"
    else
        echo "$TODAY: $repo Backup Fail" >> "$LOG_FILE"
    fi
done < <(find . -mindepth 1 -maxdepth 1 -type d ! -name 'httpd' ! -name 'bak' -printf '%f\n')

# Back up the user password file and the permission file
if ! cp -p authz access.conf "$DST_PATH/$TODAY"; then
    echo "$TODAY: authz/access.conf Backup Fail" >> "$LOG_FILE"
fi

# Rotate the log file (only when this run wrote one)
if [ -f "$LOG_FILE" ]; then
    mv "$LOG_FILE" "$LOG_FILE-$TODAY"
fi

# Delete the backups that are RETENTION_DAYS days old or older. The
# directory names are ISO dates, so string order equals date order; older
# leftovers from skipped runs are removed as well.
cutoff=$(date -d "$RETENTION_DAYS days ago" +'%F')
for backup_dir in "$DST_PATH"/[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]
do
    [ -d "$backup_dir" ] || continue
    backup_name=${backup_dir##*/}
    if [[ "$backup_name" < "$cutoff" || "$backup_name" == "$cutoff" ]]; then
        rm -rf -- "$backup_dir"
    fi
done
Used by Zabbix to monitor password expiration for Linux system users (those whose shell is /bin/bash or /bin/sh); the trigger fires when a password has 7 days of validity left. The first script performs automatic user discovery.
#!/bin/bash
# Zabbix low-level discovery: print every user whose login shell is bash or
# sh as JSON with the {#USER_NAME} macro.
# The pattern is applied to the shell field ($NF) only and is anchored at the
# end, so other fields of the passwd line cannot produce a match.
mapfile -t users < <(awk -F':' '$NF ~ /\/bin\/(ba)?sh$/ {print $1}' /etc/passwd)

printf '{\n\t"data":['
separator=''
for user in "${users[@]}"
do
    # The user name is passed as an argument, never as part of the format.
    printf '%s\n\t\t{"{#USER_NAME}":"%s"}' "$separator" "$user"
    separator=','
done
printf '\n\t]\n}\n'
Check for user password expiration
#!/bin/bash
# Print 1 when the password of the given user expires within WARN_DAYS days
# (or has already expired), otherwise 0.
export LANG=en_US.UTF-8
# Number of remaining valid days at which the check starts to report 1
WARN_DAYS=7
user="$1"
# Unix time WARN_DAYS days from now. The comparison has to look ahead:
# comparing with a time in the past would only fire after the password had
# already been expired for a week.
warn_before=$(date -d "+${WARN_DAYS} day" +'%s')
# Value of the "Password expires" line, e.g. "Sep 09, 2018" or "never"
expires_date=$(sudo chage -l "$user" | awk -F':' '/Password expires/{sub(/^[ \t]+/, "", $NF); print $NF}')
if [[ -z "$expires_date" || "$expires_date" == "never" ]];then
    # No expiration configured, or the value could not be read
    echo "0"
else
    # Convert the date to unix time
    expires_date=$(date -d "$expires_date" +'%s')
    if [ "$expires_date" -le "$warn_before" ];then
        echo "1"
    else
        echo "0"
    fi
fi
Synchronizes the yum repositories with rsync and serves them as an HTTP yum site with nginx.
However, the CentOS 6 mirror has been unavailable recently (it seems to be blocked in China); change the address if you find a working one.
#!/bin/bash
# to update yum Mirror image
RsyncCommand="rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000"
DIR="/app/yumData"
LogDir="$DIR/logs"
Centos6Base="$DIR/Centos6/x86_64/Base"
Centos7Base="$DIR/Centos7/x86_64/Base"
Centos6Epel="$DIR/Centos6/x86_64/Epel"
Centos7Epel="$DIR/Centos7/x86_64/Epel"
Centos6Salt="$DIR/Centos6/x86_64/Salt"
Centos7Salt="$DIR/Centos7/x86_64/Salt"
Centos6Update="$DIR/Centos6/x86_64/Update"
Centos7Update="$DIR/Centos7/x86_64/Update"
Centos6Docker="$DIR/Centos6/x86_64/Docker"
Centos7Docker="$DIR/Centos7/x86_64/Docker"
Centos6Mysql5_7="$DIR/Centos6/x86_64/Mysql/Mysql5.7"
Centos7Mysql5_7="$DIR/Centos7/x86_64/Mysql/Mysql5.7"
Centos6Mysql8_0="$DIR/Centos6/x86_64/Mysql/Mysql8.0"
Centos7Mysql8_0="$DIR/Centos7/x86_64/Mysql/Mysql8.0"
MirrorDomain="rsync://rsync.mirrors.ustc.edu.cn"

# Create every given directory that does not exist yet
check_dir(){
    for dir in "$@"
    do
        test -d "$dir" || mkdir -p "$dir"
    done
}

# Synchronize one repository and append the result to its log file.
#   $1 - path on the mirror (appended to $MirrorDomain)
#   $2 - local target directory
#   $3 - log file
# rsync is run inside the "if", so the logged result always belongs to this
# rsync call and never to an unrelated earlier command.
sync_repo(){
    local remote_path="$1" local_dir="$2" log_file="$3"
    if $RsyncCommand "$MirrorDomain$remote_path" "$local_dir" >> "$log_file" 2>&1; then
        echo "rsync success" >> "$log_file"
    else
        echo "rsync fail" >> "$log_file"
    fi
}

check_dir "$DIR" "$LogDir" "$Centos6Base" "$Centos7Base" "$Centos6Epel" "$Centos7Epel" "$Centos6Salt" "$Centos7Salt" "$Centos6Update" "$Centos7Update" "$Centos6Docker" "$Centos7Docker" "$Centos6Mysql5_7" "$Centos7Mysql5_7" "$Centos6Mysql8_0" "$Centos7Mysql8_0"

# The CentOS 6 synchronizations are disabled; uncomment them to enable.

# Base yumrepo
# sync_repo /repo/centos/6/os/x86_64/ "$Centos6Base" "$LogDir/centos6Base.log"
sync_repo /repo/centos/7/os/x86_64/ "$Centos7Base" "$LogDir/centos7Base.log"

# Epel yumrepo
# sync_repo /repo/epel/6/x86_64/ "$Centos6Epel" "$LogDir/centos6Epel.log"
sync_repo /repo/epel/7/x86_64/ "$Centos7Epel" "$LogDir/centos7Epel.log"

# SaltStack yumrepo
# sync_repo /repo/salt/yum/redhat/6/x86_64/ "$Centos6Salt" "$LogDir/centos6Salt.log"
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
sync_repo /repo/salt/yum/redhat/7/x86_64/ "$Centos7Salt" "$LogDir/centos7Salt.log"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest

# Docker yumrepo
sync_repo /repo/docker-ce/linux/centos/7/x86_64/stable/ "$Centos7Docker" "$LogDir/centos7Docker.log"

# centos update yumrepo
# sync_repo /repo/centos/6/updates/x86_64/ "$Centos6Update" "$LogDir/centos6Update.log"
sync_repo /repo/centos/7/updates/x86_64/ "$Centos7Update" "$LogDir/centos7Update.log"

# mysql 5.7 yumrepo
# sync_repo /repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ "$Centos6Mysql5_7" "$LogDir/centos6Mysql5.7.log"
sync_repo /repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ "$Centos7Mysql5_7" "$LogDir/centos7Mysql5.7.log"

# mysql 8.0 yumrepo
# sync_repo /repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ "$Centos6Mysql8_0" "$LogDir/centos6Mysql8.0.log"
sync_repo /repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ "$Centos7Mysql8_0" "$LogDir/centos7Mysql8.0.log"
#!/bin/bash
# Number of physical CPUs
physical_cpu_count=$(grep 'physical id' /proc/cpuinfo | sort -u | wc -l)
# Number of cores of one physical CPU (first value only, so the arithmetic
# below never receives several lines)
physical_cpu_cores=$(awk -F': *' '/^cpu cores/ {print $2; exit}' /proc/cpuinfo)
# Total number of cores
total_cpu_cores=$(( physical_cpu_count * ${physical_cpu_cores:-0} ))
# Fall back to the number of online processors when /proc/cpuinfo has no
# "physical id" or "cpu cores" lines; a total of 0 would make every
# threshold 0 and the script would always trigger.
if [ "$total_cpu_cores" -lt 1 ]; then
    total_cpu_cores=$(getconf _NPROCESSORS_ONLN)
fi

# Load thresholds for one, five and fifteen minutes; the collection is
# triggered as soon as one of them is reached
one_min_load_threshold="$total_cpu_cores"
five_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.8}')
fifteen_min_load_threshold=$(awk -v cores="$total_cpu_cores" 'BEGIN {print cores * 0.7}')

# One, five and fifteen minute load averages, read in a single step so the
# three values belong to the same moment
read -r one_min_load five_min_load fifteen_min_load _ < /proc/loadavg

# Collect the current cpu, memory and disk io information into log files.
# If you need to send a message or call something else, write your own
# function.
get_info(){
    log_dir="cpu_high_script_log"
    test -d "$log_dir" || mkdir "$log_dir"
    ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > "$log_dir"/cpu_top10.log
    ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > "$log_dir"/mem_top10.log
    iostat -dx 1 10 > "$log_dir"/disk_io_10.log
}

# awk only does the floating point comparison; get_info is called from this
# shell, because awk's system() runs /bin/sh, which cannot see a bash
# function unless /bin/sh itself is bash.
if awk -v l1="$one_min_load" -v t1="$one_min_load_threshold" \
       -v l5="$five_min_load" -v t5="$five_min_load_threshold" \
       -v l15="$fifteen_min_load" -v t15="$fifteen_min_load_threshold" \
       'BEGIN { exit !(l1 >= t1 || l5 >= t5 || l15 >= t15) }'; then
    get_info
fi
That's all I have to share today.
I hope these examples help you apply what you have learned to your own real-world scenarios and improve your work efficiency.
Reference resources :IDEA Conn
When reviewing today, I saw a
報錯內容如下:TemplateSyntaxError at