赞
踩
DEF_SYSLOG_SWITCH_HUAWEI.py
华为交换机日志解析示例
- # -*- coding: utf8 -*-
- import time
- from DEF_COLOR import * ## 终端显示颜色
-
-
def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):
    """Render a Unix timestamp as local-time text.

    时间戳: seconds since the epoch, passed to time.localtime().
    时间文本格式: strftime pattern; the default yields 'YYYY-MM-DD HH:MM:SS'.
    Returns the formatted string.
    """
    return time.strftime(时间文本格式, time.localtime(时间戳))
-
## For devices whose SYSLOG time needs a time-zone offset added before it
## equals local time.
def 日志时间转时间戳(时间文本, 加时区):
    """Convert a 'YYYY-MM-DDTHH:MM:SS' log stamp into a Unix timestamp.

    时间文本: the timestamp text, parsed with time.strptime().
    加时区: number of hours added (as 3600-second units) to the mktime() result.
    Returns the shifted timestamp as a float.
    """
    时间结构 = time.strptime(时间文本, '%Y-%m-%dT%H:%M:%S')
    偏移秒数 = 3600 * 加时区
    return time.mktime(时间结构) + 偏移秒数
-
## Example line:
## 2022-11-28T19:00:51+08:00 hostname %%01SHELL/5/LOGOUT(s)[3918]: The user succeeded in logging out of VTY0. (UserType=SSH, UserName=xxx, Ip=xxx.xxx.xxx.xxx, VpnName=)
def LOG_TYPE(LINE_TEXT):
    """Extract the (module, event, body offset) triple from one log line.

    The type tag is located by the '%%' marker; the text between the four
    characters after the marker start and the next '(' is split on '/', and
    fields 0 and 2 are returned together with the index just after the first
    space that follows the '('.  Lines without '%%' are retried by looking
    for ': OID' and splitting the whitespace-delimited word before it.

    On any failure a message is printed in red and a marker tuple whose third
    element is -1 is returned.  A tag with fewer than three '/' fields is
    reported the same way instead of raising IndexError.
    """
    A = LINE_TEXT.find('%%')
    if A != -1:
        B = LINE_TEXT.find('(', A)
        if B == -1:
            打印_红(f"找位置标志'('失败 {LINE_TEXT}")
            return((LINE_TEXT[:A], '(-1', -1))
        C = LINE_TEXT.find(' ', B)  # marks where the log body starts
        if C == -1:
            打印_红(f"找位置标志' '失败(日志正文开始位) {LINE_TEXT}")
            return((' -1', LINE_TEXT[A:B], -1))
        SP = LINE_TEXT[A+4:B].split('/')
        if len(SP) < 3:
            # Malformed tag: report it rather than crash on SP[2].
            打印_红(f"日志类型字段缺少'/'分隔 {LINE_TEXT}")
            return(('/-1', LINE_TEXT[A:B], -1))
        return((SP[0], SP[2], C+1))

    A2 = LINE_TEXT.find(': OID')
    if A2 != -1:
        WORDS = LINE_TEXT[:A2].split()
        SP = WORDS[-1].split('/') if WORDS else []
        if len(SP) >= 3:
            return((SP[0], SP[2], A2+1))
    # Reached when neither marker is found, or the word before ': OID' is not
    # a three-field tag.
    打印_红(f"找位置标志'%%'失败且查找': OID'也失败 {LINE_TEXT}")
    return(('%-1', LINE_TEXT, -1))
-
- # %%01ACLE/4/ACLLOG(l)[xxx]: Acl 3997 deny GigabitEthernet0/0/10 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx udp x.x.x.x(63877) -> 239.255.255.250(1900) (1 packet).
- # %%01ACLE/4/ACLLOG(l)[xxx]: Acl 3997 deny GigabitEthernet0/0/10 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx igmp x.x.x.x -> 224.0.0.22 (4 packets).
- # %%01ACLE/4/ACLLOG(l)[xxx]: Acl deny GigabitEthernet0/0/14 xxxx-xxxx-xxxx -> xxxx-xxxx-xxxx tcp x.x.x.x(60559) -> x.x.x.x(80) (1 packet).
- def ACLE_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LOG_MSG):
- SP = LOG_MSG.split()
- if len(SP) == 11:
- SP = ['X'] + SP # 偶尔会丢个 acl id 补齐列表长度 'GigabitEthernet0/0/14 Acl'
- ACL_ID = SP[1]
- ACL_ST = SP[2]
- ACL_IF = SP[3]
- SMAC = SP[4]
- DMAC = SP[6]
- PT = SP[7]
- SIP = SP[8].split('(')[0] # 不记录源端口号
- DIP = SP[10].split('(')[0] # 不记录目的端口号
- COUNT = int(SP[11][1:])
- K = (ACL_IF, ACL_ST, PT, f"{SIP}({SMAC})", f"{DIP}({DMAC})", ACL_ID)
- if K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE']:
- D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K] += COUNT
- else:
- D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K] = COUNT
-
# Sample messages:
# SACL/4/ACLLOG(l)[9487]: Acl 3996 applied Interface GigabitEthernet0/0/6 permit (15591 packets).
# SACL/4/ACLLOG(l)[6146]: Acl 3992 applied Interface permit (2 packets).
# SACL/4/ACLLOG(l)[6112]: Acl 3992 applied Interface GigabitEthernet0/0/14 permit (174757578 packets).
def SACL_ACLLOG(D_SYSLOG_SWITCH_HUAWEI, LOG_MSG):
    """Add one SACL/ACLLOG message to the per-ACL packet counters.

    LOG_MSG is split on whitespace.  Seven tokens carry an interface name;
    six tokens do not, and the interface is recorded as 'un'.  The count is
    added to D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'] under the key
    (interface, action, acl id).

    Any other token count is reported once in red and the message is
    skipped.  Other errors (e.g. a non-numeric count) are reported in red
    and swallowed.
    """
    # Split before the try block so SP is always bound for the error message.
    SP = LOG_MSG.split()
    try:
        if len(SP) == 7:
            ACL_ID = SP[1]
            ACL_IF = SP[4]
            ACL_ST = SP[5]
            COUNT = int(SP[6][1:])
        elif len(SP) == 6:
            ACL_ID = SP[1]
            ACL_IF = 'un'
            ACL_ST = SP[4]
            COUNT = int(SP[5][1:])
        else:
            打印_红(f"ERROR LOG_MSG={LOG_MSG} SP={SP} len(SP)={len(SP)}")
            # Nothing was parsed: stop here instead of touching unbound names.
            return

        K = (ACL_IF, ACL_ST, ACL_ID)
        SACL = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL']
        SACL[K] = SACL.get(K, 0) + COUNT
    except Exception as e:
        打印_红(f"ERROR {e} LOG_MSG={LOG_MSG} SP={SP} len(SP)={len(SP)}")
-
## Parse the content of one SYSLOG line
def LINE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT, TIME_LOCAL):
    """Classify one Huawei switch log line and record it in the accumulator.

    LOG_TYPE() supplies the module (TP1), the event (TP2) and the offset X of
    the message body.  Recognised events are appended to / counted in the
    matching entry of D_SYSLOG_SWITCH_HUAWEI; IFNET, IFADP and VTY lines are
    only printed.

    Returns (0, TP1, TP2) when the line was handled and (1, TP1, TP2) when
    its type is not parsed here.
    """
    TP1, TP2, X = LOG_TYPE(LINE_TEXT)
    已处理 = (0, TP1, TP2)
    未处理 = (1, TP1, TP2)   # stop without parsing; report the log type only

    if TP1 in ('ACLE', 'SACL'):
        if TP2 != 'ACLLOG':
            return(未处理)
        # The slice drops the final 10 characters of the line -- presumably
        # the trailing "packet(s)." text plus the newline; confirm on real data.
        处理函数 = ACLE_ACLLOG if TP1 == 'ACLE' else SACL_ACLLOG
        处理函数(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT[X:-10])
        return(已处理)

    if TP1 == 'SHELL':
        if TP2 in ('LOGIN', 'LOGOUT'):
            # LOGIN : The user succeeded in logging in to VTY0. (UserType=SSH, UserName=xxx, AuthenticationMethod="Local-user", Ip=xxx.xxx.xxx.xxx, VpnName=)
            # LOGOUT: The user succeeded in logging out of VTY0. (UserType=SSH, UserName=xxx, Ip=xxx.xxx.xxx.xxx, VpnName=)
            项 = LINE_TEXT[X:-1].split(',')
            UserType = 项[0].split('=')[-1]
            UserName = 项[1].split('=')[-1]
            # LOGIN has an extra AuthenticationMethod field before Ip.
            IP = 项[3 if TP2 == 'LOGIN' else 2].split('=')[-1]
            D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'].append((TIME_LOCAL, TP2, UserType, UserName, IP))
        elif TP2 in ('DISPLAY_CMDRECORD', 'CMDRECORD'):
            # %%01SHELL/6/DISPLAY_CMDRECORD(s)[1869]: Recorded display command information. (Task=VT0, Ip=x.x.x.x, VpnName=, User=xx, AuthenticationMethod="Local-user", Command="display stp brief")
            项 = LINE_TEXT[X:].split(',')
            IP = 项[1].split('=')[-1]
            USER = 项[3].split('=')[-1]
            CMD = 项[5].split('=')[-1][:-2]
            D_SYSLOG_SWITCH_HUAWEI['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))
        elif TP2 == 'CMDCONFIRM_UNIFORMRECORD':
            # %%01SHELL/6/CMDCONFIRM_UNIFORMRECORD(s)[1867]: Record command information. (Task=VT0, IP=x.x.x.x, VpnName=, User=xx, Command="", PromptInfo="The password needs to be changed. Change now? [Y/N]:", UserInput=N)
            项 = LINE_TEXT[X:].split(',')
            IP = 项[1].split('=')[-1]
            USER = 项[3].split('=')[-1]
            PromptInfo = 项[5].split('=')[-1]
            UserInput = 项[6].split('=')[-1][:-2]
            CMD = f"{PromptInfo} {UserInput}"
            D_SYSLOG_SWITCH_HUAWEI['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))
        else:
            return(未处理)
        return(已处理)

    if TP1 == 'IFPDT':
        # event -> (direction, state): discard count crossed the alarm
        # threshold (ABNL) or returned to normal (NL), outbound or inbound.
        丢包事件 = {
            'PKT_OUTDISCARD_ABNL': ('OUT', '丢包超过阈值'),
            'PKT_OUTDISCARD_NL': ('OUT', '恢复'),
            'PKT_INDISCARD_ABNL': ('IN', '丢包超过阈值'),
            'PKT_INDISCARD_NL': ('IN', '恢复'),
        }
        if TP2 in 丢包事件:
            方向, 状态 = 丢包事件[TP2]
            项 = LINE_TEXT[X:].split(',')
            SW_IF = 项[0].split('=')[-1]
            SW_DROP = 项[1].split('=')[-1]
            D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'].append((TIME_LOCAL, SW_IF, SW_DROP, 方向, 状态))
        elif TP2 == 'IF_STATE':
            词 = LINE_TEXT[X:].split()
            IF_ID = 词[1]
            IF_ST = 词[5]
            D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'].setdefault(IF_ID, []).append((TIME_LOCAL, IF_ST))
        else:
            return(未处理)
        return(已处理)

    if TP1 == 'SECE':
        D_SYSLOG_SWITCH_HUAWEI['L_SECE'].append(LINE_TEXT)
        return(已处理)

    if TP1 == 'MSTP':
        D_SYSLOG_SWITCH_HUAWEI['L_STP'].append((TIME_LOCAL, LINE_TEXT[X:-1]))
        return(已处理)

    if TP1 in ('IFNET', 'IFADP'):
        print(TP1, TP2, TIME_LOCAL, LINE_TEXT[X:-1])
        return(已处理)

    if TP1 == 'SSH':
        if TP2 != 'SSH_TRANS_FILE_FINISH':
            return(未处理)
        D_SYSLOG_SWITCH_HUAWEI['L_FTP'].append((TIME_LOCAL, LINE_TEXT[X:-1]))
        return(已处理)

    if TP1 == 'VTY':
        print('VTY', LINE_TEXT)
        return(已处理)

    return(未处理)
-
## Parse a SYSLOG file
def FILE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):
    """Read a Huawei switch log file and feed each in-range line to LINE_HUAWEI.

    The file is read in binary mode and each line decoded as UTF-8.  The
    first 19 characters of a line are taken as its timestamp and converted
    with 日志时间转时间戳(..., 8).  Only lines whose timestamp lies strictly
    between TIME_STAMP_MIN and TIME_STAMP_MAX are parsed.  Lines that
    LINE_HUAWEI does not handle are counted per (TP1, TP2) in
    D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'].

    Lines that cannot be decoded, or whose timestamp cannot be parsed, are
    reported in red and skipped (they still count towards the total).

    Returns (FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, total lines,
    selected lines, parsed lines, elapsed seconds).
    """
    TIME_S = time.time()
    TOT_N = 0
    SELECT_N = 0
    解析数量 = 0
    # The context manager closes the file even if parsing raises.
    with open(FILE_PATH, mode='rb') as LOG_FILE:
        for LINE_BYTES in LOG_FILE:
            TOT_N += 1
            try:
                LINE_TEXT = LINE_BYTES.decode('UTF-8')
            except UnicodeDecodeError as e:
                打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")
                continue
            TIME_UTC = LINE_TEXT[:19]
            try:
                TIME_STAMP = 日志时间转时间戳(TIME_UTC, 8)
            except ValueError as e:
                # A blank or truncated line must not abort the whole file.
                打印_红(f"ERROR {TOT_N} TIME={TIME_UTC!r} {e}")
                continue
            if not (TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX):
                continue
            SELECT_N += 1
            TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))
            ST, TP1, TP2 = LINE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, LINE_TEXT, TIME_LOCAL)
            if ST != 0:
                其他 = D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER']
                其他[(TP1, TP2)] = 其他.get((TP1, TP2), 0) + 1
            else:
                解析数量 += 1
    TIME_RUN = time.time() - TIME_S
    return((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN))
-
-
-
-
- def SYSLOG_SWITCH_HUAWEI(FILE_PATH, SHOW=0): # Parse one Huawei switch log file via FILE_HUAWEI, print a report when SHOW == 1, and return ACL hit totals grouped as PERMIT / DENY / OTHER
- D_SYSLOG_SWITCH_HUAWEI = {} # accumulator passed to FILE_HUAWEI and filled in place
- D_SYSLOG_SWITCH_HUAWEI['D_ACL'] = {'ACLE':{}, 'SACL':{}} # ACL rule match records
- D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'] = {} # interface physical-disconnect records
- D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'] = {} # interface link-disconnect records
- D_SYSLOG_SWITCH_HUAWEI['L_CMD'] = [] # commands executed by users
- D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'] = [] # login information
- D_SYSLOG_SWITCH_HUAWEI['L_STP'] = [] # spanning-tree information
- D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'] = [] # interface packet drops
- D_SYSLOG_SWITCH_HUAWEI['L_SECE'] = [] # security events
- D_SYSLOG_SWITCH_HUAWEI['L_FTP'] = [] # SSH_TRANS_FILE_FINISH records appended by LINE_HUAWEI
- D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'] = {} # counts of log types that were not parsed
-
- TIME_STAMP_MIN = 0
- TIME_STAMP_MAX = time.time() # defaults to the current timestamp
- R = FILE_HUAWEI(D_SYSLOG_SWITCH_HUAWEI, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)
- if SHOW == 1:
- 打印_黄("端口丢包 (D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP'])")
- for TIME, SW_IF, SW_DROP, IN_OUT, PS in D_SYSLOG_SWITCH_HUAWEI['L_IF_DROP']:
- if PS == '恢复':
- 打印_绿(f" {TIME} {SW_IF} {SW_DROP} {IN_OUT:3s} {PS}")
- else:
- 打印_红(f" {TIME} {SW_IF} {SW_DROP} {IN_OUT:3s} {PS}")
-
- 打印_黄("设备安全 (D_SYSLOG_SWITCH_HUAWEI['L_SECE'])")
- for i in D_SYSLOG_SWITCH_HUAWEI['L_SECE']:
- 打印_红(f" {i}")
-
- 打印_黄("备份配置 (D_SYSLOG_SWITCH_HUAWEI['L_FTP'])")
- for i in D_SYSLOG_SWITCH_HUAWEI['L_FTP']:
- 打印_青(f" {i}")
-
- 打印_黄("生成树 (D_SYSLOG_SWITCH_HUAWEI['L_STP'])")
- for i in D_SYSLOG_SWITCH_HUAWEI['L_STP']:
- 打印_红(f" {i}")
-
- 打印_黄("访问规则 (D_SYSLOG_SWITCH_HUAWEI['D_ACL'])")
- for K1 in D_SYSLOG_SWITCH_HUAWEI['D_ACL']:
- if K1 == 'ACLE':
- L_K2 = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1]]
- L_K2.sort() # keys are printed in sorted order
- for K2 in L_K2:
- ACL_IF, ACL_ST, PT, SIPSMAC, DIPDMAC, ACL_ID = K2
- if ACL_ST == 'deny':
- 打印_红(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ID:4s} {ACL_ST:5s} {PT:4s} {SIPSMAC:31s} -> {DIPDMAC}")
- else:
- 打印_绿(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ID:4s} {ACL_ST:5s} {PT:4s} {SIPSMAC:31s} -> {DIPDMAC}")
- elif K1 == 'SACL':
- L_K2 = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1]]
- L_K2.sort()
- for K2 in L_K2:
- ACL_IF, ACL_ST, ACL_ID = K2
- if ACL_ST == 'deny':
- 打印_红(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ST:5s}")
- else:
- 打印_绿(f"{D_SYSLOG_SWITCH_HUAWEI['D_ACL'][K1][K2]:8} {ACL_IF:21} {ACL_ST:5s}")
-
- 打印_黄("接口物理状态 (D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'])")
- for K in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY']:
- print(f" {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][K])}")
- for TIME_LOCAL, IF_ST in D_SYSLOG_SWITCH_HUAWEI['D_IF_PHY'][K]:
- if IF_ST == 'DOWN':
- 打印_红(f" {TIME_LOCAL} DOWN")
- else:
- 打印_绿(f" {TIME_LOCAL} {IF_ST}")
-
- #打印_黄("接口逻辑状态 (D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'])")
- #for K in D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK']:
- # print(f" {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_HUAWEI['D_IF_LINK'][K])}")
-
- 打印_黄("登录登出详细 (D_SYSLOG_SWITCH_HUAWEI['L_LOGIN'])")
- for TIME_LOCAL, ST, UserType, UserName, IP in D_SYSLOG_SWITCH_HUAWEI['L_LOGIN']:
- if ST == 'LOGIN':
- 打印_青(f" {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")
- elif ST == 'LOGOUT':
- 打印_蓝(f" {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")
- else:
- 打印_红(f" {TIME_LOCAL} {IP:15s} {ST:6s} {UserType} {UserName}")
-
- 打印_黄("操作命令 (D_SYSLOG_SWITCH_HUAWEI['L_CMD'])")
- for TIME_LOCAL, IP, USER, CMD in D_SYSLOG_SWITCH_HUAWEI['L_CMD']:
- 打印_青(f" {TIME_LOCAL} {IP:15s} {USER:8s} {CMD}")
-
- 打印_黄("忽略解析日志 (D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'])")
- L_K = [i for i in D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER']]
- L_K.sort()
- for K in L_K:
- print(f"{D_SYSLOG_SWITCH_HUAWEI['D_LOG_OTHER'][K]:5} {K}")
-
- FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN = R # NOTE(review): indentation is lost in this listing, so whether this summary belongs to the `if SHOW == 1` body cannot be determined here -- confirm against the real file
- 时间文本格式 = '%Y-%m-%d %H:%M:%S'
- 打印_红(f"日志路径 : {FILE_PATH}")
- 打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")
- 打印_绿(f"解析/筛选/总数: {解析数量}/{SELECT_N}/{TOT_N} {(解析数量/TOT_N)*100:.2f}(%)/{(SELECT_N/TOT_N)*100:.2f}(%)/100(%) 用时: {TIME_RUN:.2f}秒") # NOTE(review): divides by TOT_N, which stays 0 for an empty file -> ZeroDivisionError
-
- ## Build the return value: packet totals per "interface acl-id", split by ACL action (permit / deny / other)
- D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}
-
- #print(D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'])
- for K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE']:
- ACL_IF = K[0]
- ACL_ST = K[1]
- ACL_ID = K[5]
- KEY_NEW = f"{ACL_IF} {ACL_ID}" # per-flow ACLE counters collapse onto this key and are summed below
- if ACL_ST.upper() == 'PERMIT':
- if KEY_NEW not in D_RETURN['PERMIT']:
- D_RETURN['PERMIT'][KEY_NEW] = 0
- D_RETURN['PERMIT'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]
- elif ACL_ST.upper() == 'DENY':
- if KEY_NEW not in D_RETURN['DENY']:
- D_RETURN['DENY'][KEY_NEW] = 0
- D_RETURN['DENY'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]
- else:
- if KEY_NEW not in D_RETURN['OTHER']:
- D_RETURN['OTHER'][KEY_NEW] = 0
- D_RETURN['OTHER'][KEY_NEW] += D_SYSLOG_SWITCH_HUAWEI['D_ACL']['ACLE'][K]
-
- #print(D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'])
- for K in D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL']:
- ACL_IF, ACL_ST, ACL_ID = K
- KEY_NEW = f"{ACL_IF} {ACL_ID}"
- if ACL_ST.upper() == 'PERMIT':
- D_RETURN['PERMIT'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K] # NOTE(review): assigns instead of adding, so an ACLE total already stored under the same key is replaced
- elif ACL_ST.upper() == 'DENY':
- D_RETURN['DENY'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]
- else:
- D_RETURN['OTHER'][KEY_NEW] = D_SYSLOG_SWITCH_HUAWEI['D_ACL']['SACL'][K]
-
- return(D_RETURN)
-
if __name__ == '__main__':
    # Demo run: parse the file at FILE_PATH with the report enabled
    # (SHOW = 1 prints the important log information) and print the result.
    FILE_PATH, SHOW = '华为交换机日志文件路径', 1
    D_ACL_INFO = SYSLOG_SWITCH_HUAWEI(FILE_PATH, SHOW=SHOW)
    print(D_ACL_INFO)

DEF_SYSLOG_SWITCH_H3C.py
华三交换机日志示例
- # -*- coding: utf8 -*-
- import time
- from DEF_COLOR import * ## 终端显示颜色
-
def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):
    """Return 时间戳 (epoch seconds) formatted in local time.

    时间文本格式 is a strftime pattern; by default 'YYYY-MM-DD HH:MM:SS'.
    """
    本地时间 = time.localtime(时间戳)
    return time.strftime(时间文本格式, 本地时间)
-
def 日志时间转时间戳(时间文本):
    """Parse a 'YYYY-MM-DDTHH:MM:SS' log stamp into a Unix timestamp.

    The text is interpreted by time.strptime() and converted with
    time.mktime(); no offset is applied.
    """
    时间结构 = time.strptime(时间文本, '%Y-%m-%dT%H:%M:%S')
    return time.mktime(时间结构)
-
def 参数时间转时间戳(TEXT):
    """Convert a user-supplied time argument into a Unix timestamp.

    Accepted forms:
      'YYYY-MM-DDTHH', 'YYYY-MM-DDTHH:MM', 'YYYY-MM-DDTHH:MM:SS'
      'HH', 'HH:MM', 'HH:MM:SS'   (today's local date is used)

    TEXT counts as date+time only when splitting on 'T' yields exactly two
    parts; otherwise the whole text is taken as the time of day.

    Raises ValueError when the time part has more than three ':' fields
    (previously an UnboundLocalError) or when strptime() rejects the text.
    """
    SP_DATE = TEXT.split('T')
    if len(SP_DATE) == 2:
        TIME_PART = SP_DATE[1]
        DATE_TIME = TEXT
    else:
        TIME_PART = TEXT
        DATE_TIME = f"{time.strftime('%Y-%m-%d')}T{TEXT}"

    # Pick the strptime pattern from the number of ':'-separated fields.
    M_TIME = {1: '%H', 2: '%H:%M', 3: '%H:%M:%S'}.get(len(TIME_PART.split(':')))
    if M_TIME is None:
        raise ValueError(f"unsupported time format: {TEXT!r}")
    return(time.mktime(time.strptime(DATE_TIME, f"%Y-%m-%dT{M_TIME}")))
-
def 转时间戳(TEXT):
    """Convert a time argument into a Unix timestamp, echoing the parsed text.

    Accepted forms:
      'YYYY-MM-DDTHH', 'YYYY-MM-DDTHH:MM', 'YYYY-MM-DDTHH:MM:SS'
      'HH', 'HH:MM', 'HH:MM:SS'   (today's local date is used)

    TEXT counts as date+time only when splitting on 'T' yields exactly two
    parts; otherwise the whole text is taken as the time of day.  The
    combined date-time string is printed before it is parsed.

    Raises ValueError when the time part has more than three ':' fields
    (previously an UnboundLocalError) or when strptime() rejects the text.
    """
    SP_DATE = TEXT.split('T')
    if len(SP_DATE) == 2:
        TIME_PART = SP_DATE[1]
        DATE_TIME = TEXT
    else:
        TIME_PART = TEXT
        DATE_TIME = f"{time.strftime('%Y-%m-%d')}T{TEXT}"

    # Pick the strptime pattern from the number of ':'-separated fields.
    M_TIME = {1: '%H', 2: '%H:%M', 3: '%H:%M:%S'}.get(len(TIME_PART.split(':')))
    if M_TIME is None:
        raise ValueError(f"unsupported time format: {TEXT!r}")
    print(f"DATE_TIME={DATE_TIME}")
    return(time.mktime(time.strptime(DATE_TIME, f"%Y-%m-%dT{M_TIME}")))
-
def LOG_TYPE(LINE_TEXT):
    """Extract the (module, event, body offset) triple from one H3C log line.

    Example: hostname %%10ACL/6/PFILTER_STATIS_INFO: GigabitEthernet1/0/25 (outbound): Packet-filter ...

    The tag runs from four characters after the '%%' marker to the next ':'
    and is split on '/'.  Field 0 is the module and field 2 the event; a
    trailing '(x)' suffix on the event (e.g. CFGMAN/4/TRAP(t)) is dropped.
    The third element is the index just after that ':'.

    On failure a message is printed in red and a marker tuple whose third
    element is -1 is returned.  Every element of the first two positions is
    a string, so the tuples stay sortable when used as dictionary keys (the
    missing-':' case used to return an int in first position).  A tag with
    fewer than three '/' fields is reported instead of raising IndexError.
    """
    A = LINE_TEXT.find('%%')
    if A == -1:
        打印_红(f"找位置标志'%%' {LINE_TEXT}")
        return(('%%-1', LINE_TEXT, -1))

    B = LINE_TEXT.find(':', A)  # end-of-tag marker
    if B == -1:
        打印_红(f"找位置标志':'失败(日志类型结尾标识) {LINE_TEXT}")
        return((LINE_TEXT[:A], ':-1', -1))

    SP = LINE_TEXT[A+4:B].split('/')  # e.g. CFGMAN/4/TRAP(t)
    if len(SP) < 3:
        打印_红(f"日志类型字段缺少'/'分隔 {LINE_TEXT}")
        return(('/-1', LINE_TEXT[A:B], -1))

    EVENT = SP[2]
    if EVENT.endswith(')'):
        IDX = EVENT.find('(')
        if IDX != -1:
            EVENT = EVENT[:IDX]
    return((SP[0], EVENT, B+1))
-
## Sample messages:
##GigabitEthernet1/0/3 (outbound): Packet-filter 2205 rule 97 deny source 192.168.0.0 0.0.255.255 logging 14 packet(s).
##GigabitEthernet1/0/3 (inbound): Packet-filter name in_log rule 40 permit ip destination 192.168.0.0 0.0.255.255 logging 20 packet(s).
def ACL_PFILTER_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LOG_MSG):
    """Add one IPv4 packet-filter statistics message to the hit counters.

    The text before ': Packet-filter ' is the interface (with direction);
    the text between that marker and 'logging' is the rule, with a leading
    'name ' keyword dropped; the number between 'logging ' and ' packet' is
    the count.  The count is added to
    D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][(interface, rule)].

    A message missing any of the three markers is reported in red and
    skipped.  Only the marker lookup is guarded: a non-numeric count still
    raises ValueError.
    """
    MARK = ': Packet-filter '
    try:
        IDX_1 = LOG_MSG.index(MARK)
        IDX_2 = LOG_MSG.index('logging')
        IDX_3 = LOG_MSG.index(' packet')
    except ValueError:
        # str.index raises ValueError when a marker is absent.
        打印_红(f"ERR {LOG_MSG}")
        return

    SW_IF = LOG_MSG[:IDX_1]
    START = IDX_1 + len(MARK)
    if LOG_MSG[START:START+4] == 'name':
        START += 5  # skip 'name '
    RULE = LOG_MSG[START:IDX_2-1]
    NCOUNT = int(LOG_MSG[IDX_2+8:IDX_3])
    统计 = D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']
    统计[(SW_IF, RULE)] = 统计.get((SW_IF, RULE), 0) + NCOUNT
-
## Sample messages:
##GigabitEthernet1/0/5 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 129 packet(s).
##GigabitEthernet1/0/5 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 752 packet(s).
##GigabitEthernet1/0/2 (inbound): Packet-filter IPv6 name ipv6_deny_in rule 0 deny logging 12 packet(s).
def ACL_PFILTER_IPV6_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LOG_MSG):
    """Add one IPv6 packet-filter statistics message to the hit counters.

    The text before ': Packet-filter ' is the interface (with direction).
    The rule starts 21 characters after that marker's start (the length of
    ': Packet-filter IPv6 '), with a leading 'name ' keyword dropped, and
    ends before 'logging'.  The number between 'logging ' and ' packet' is
    the count, added to
    D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][(interface, rule)].

    A message missing any of the three markers is reported in red and
    skipped.  Only the marker lookup is guarded: a non-numeric count still
    raises ValueError.
    """
    try:
        IDX_1 = LOG_MSG.index(': Packet-filter ')
        IDX_2 = LOG_MSG.index('logging')
        IDX_3 = LOG_MSG.index(' packet')
    except ValueError:
        # str.index raises ValueError when a marker is absent.
        打印_红(f"ERR {LOG_MSG}")
        return

    SW_IF = LOG_MSG[:IDX_1]
    START = IDX_1 + len(': Packet-filter IPv6 ')
    if LOG_MSG[START:START+4] == 'name':
        START += 5  # skip 'name '
    RULE = LOG_MSG[START:IDX_2-1]
    NCOUNT = int(LOG_MSG[IDX_2+8:IDX_3])
    统计 = D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6']
    统计[(SW_IF, RULE)] = 统计.get((SW_IF, RULE), 0) + NCOUNT
-
## ARP rate above the configured threshold
##The ARP packet rate(6 pps) exceeded the rate limit(5 pps) on interface GigabitEthernet1/0/19 in the last 60 seconds.
##The ARP packet rate(118 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 seconds.
##The ARP packet rate(100 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 seconds.
def ARP_RATE_EXCEEDED(D_SYSLOG_SWITCH_H3C, LOG_MSG):
    """Accumulate the reported ARP rate per (interface, rate limit).

    Whitespace tokens 3, 8 and 12 of LOG_MSG hold 'rate(<n>', 'limit(<n>'
    and the interface name.  The observed rate is added to
    D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][(interface, limit)].
    """
    词 = LOG_MSG.split()
    速率词, 限制词, 端口 = 词[3], 词[8], 词[12]
    速率 = int(速率词[5:])   # strip 'rate('
    上限 = int(限制词[6:])   # strip 'limit('
    汇总 = D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED']
    汇总[(端口, 上限)] = 汇总.get((端口, 上限), 0) + 速率
-
## Number of times broadcast traffic exceeded its threshold (broadcast storm)
##GigabitEthernet1/0/21 is in normal status, BC flux exceeds its upper threshold 5 pps.
def STORM_CONSTRAIN_EXCEED(D_SYSLOG_SWITCH_H3C, LOG_MSG):
    """Count one storm-constrain event for the interface named first in LOG_MSG.

    The first whitespace token is used as the interface and its counter in
    D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'] is incremented by one.
    """
    端口 = LOG_MSG.split()[0]
    次数表 = D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED']
    次数表[端口] = 次数表.get(端口, 0) + 1
-
## Parse the content of one SYSLOG line
def LINE_H3C(D_SYSLOG_SWITCH_H3C, LINE_TEXT, TIME_LOCAL):
    """Classify one H3C switch log line and record it in the accumulator.

    LOG_TYPE() supplies the module (TP1), the event (TP2) and the offset X
    just after the ':' that ends the tag; the message body starts at X+1.
    Recognised events update the matching entry of D_SYSLOG_SWITCH_H3C.
    SSHS_LOG and STORM_CONSTRAIN_BELOW are accepted but not recorded.

    Returns (0, TP1, TP2) when the line was handled and (1, TP1, TP2) when
    its type is not parsed here.
    """
    TP1, TP2, X = LOG_TYPE(LINE_TEXT)
    已处理 = (0, TP1, TP2)
    未处理 = (1, TP1, TP2)   # stop without parsing; report the log type only

    if TP1 == 'ACL':
        if TP2 == 'PFILTER_STATIS_INFO':
            ACL_PFILTER_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])
        elif TP2 == 'PFILTER_IPV6_STATIS_INFO':
            ACL_PFILTER_IPV6_STATIS_INFO(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])
        else:
            return(未处理)
        return(已处理)

    if TP1 == 'IFNET':
        # event -> (token index of the interface name, accumulator key)
        # PHY_UPDOWN : Physical state on the interface GigabitEthernet1/0/8 changed to down.
        # LINK_UPDOWN: Line protocol state on the interface GigabitEthernet1/0/8 changed to down.
        状态事件 = {'PHY_UPDOWN': (5, 'D_IF_PHY'), 'LINK_UPDOWN': (6, 'D_IF_LINK')}
        if TP2 in 状态事件:
            位置, 键 = 状态事件[TP2]
            词 = LINE_TEXT[X+1:-1].split()
            IF_ID = 词[位置]
            IF_ST = 词[-1]
            D_SYSLOG_SWITCH_H3C[键].setdefault(IF_ID, []).append((TIME_LOCAL, IF_ST))
        elif TP2 == 'STORM_CONSTRAIN_EXCEED':
            STORM_CONSTRAIN_EXCEED(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])
        elif TP2 == 'STORM_CONSTRAIN_BELOW':
            pass
        else:
            return(未处理)
        return(已处理)

    if TP1 == 'LLDP':
        # Nearest bridge agent neighbor created|deleted on port GigabitEthernet1/0/10 (IfIndex 10), neighbor's chassis ID is xxxx-xxxx-xxxx, port ID is xxxx-xxxx-xxxx.
        邻居事件 = {'LLDP_CREATE_NEIGHBOR': 'CREATE', 'LLDP_DELETE_NEIGHBOR': 'DELETE'}
        if TP2 not in 邻居事件:
            return(未处理)
        词 = LINE_TEXT[X+1:-1].split()
        IF_ID = 词[7]
        IF_MAC = 词[14]
        D_SYSLOG_SWITCH_H3C['L_LLDP'].append((TIME_LOCAL, 邻居事件[TP2], IF_ID, IF_MAC))
        return(已处理)

    if TP1 == 'SSHS':
        if TP2 == 'SSHS_LOG':
            # Accepted password for xxx from xxx.xxx.xxx.xxx port 55598.
            pass
        elif TP2 == 'SSHS_SFTP_OPER':
            D_SYSLOG_SWITCH_H3C['L_FTP'].append((TIME_LOCAL, LINE_TEXT[X+1:-1]))
        else:
            return(未处理)
        return(已处理)

    if TP1 == 'SHELL':
        if TP2 == 'SHELL_CMD':
            # -Line=vty0-IPAddr=xxx.xxx.xxx.xxx-User=xxx; Command is dis cu
            起_IP = LINE_TEXT.index('-IPAddr=')
            起_USER = LINE_TEXT.index('-User=')
            IP = LINE_TEXT[起_IP+8:起_USER]
            尾部 = LINE_TEXT[起_USER+6:].rstrip('\n')
            USER = 尾部.split(';')[0]
            CMD = 尾部[len(USER)+13:]   # 13 == len('; Command is ')
            D_SYSLOG_SWITCH_H3C['L_CMD'].append((TIME_LOCAL, IP, USER, CMD))
        elif TP2 in ('SHELL_LOGIN', 'SHELL_LOGOUT'):
            # xxx logged in|out from xxx.xxx.xxx.xxx.
            标签 = 'LOGIN' if TP2 == 'SHELL_LOGIN' else 'LOGOUT'
            词 = LINE_TEXT[X+1:-1].split()
            USER = 词[0]
            IP = 词[4]
            D_SYSLOG_SWITCH_H3C['L_LOGIN'].append((标签, TIME_LOCAL, USER, IP))
        else:
            return(未处理)
        return(已处理)

    if TP1 == 'CFGMAN':
        # -EventIndex=74-CommandSource=snmp-ConfigSource=startup-ConfigDestination=running; Configuration changed.
        if TP2 != 'CFGMAN_CFGCHANGED':
            return(未处理)
        D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'].append(TIME_LOCAL)
        return(已处理)

    if TP1 == 'ARP':
        # %%10ARP/4/ARP_RATE_EXCEEDED: The ARP packet rate(100 pps) exceeded the rate limit(100 pps) on interface GigabitEthernet1/0/25 in the last 60 seconds
        if TP2 != 'ARP_RATE_EXCEEDED':
            return(未处理)
        ARP_RATE_EXCEEDED(D_SYSLOG_SWITCH_H3C, LINE_TEXT[X+1:])
        return(已处理)

    return(未处理)
-
## Parse a SYSLOG file
def FILE_H3C(D_SYSLOG_SWITCH_H3C, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):
    """Read an H3C switch log file and feed each in-range line to LINE_H3C.

    The file is read in binary mode and each line decoded as UTF-8.  The
    first 19 characters of a line are parsed as 'YYYY-MM-DDTHH:MM:SS'.  Only
    lines whose timestamp lies strictly between TIME_STAMP_MIN and
    TIME_STAMP_MAX are parsed.  Lines that LINE_H3C does not handle are
    counted per (TP1, TP2) in D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'].

    Lines that cannot be decoded, or whose timestamp cannot be parsed, are
    reported in red and skipped (they still count towards the total).

    Returns (FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, total lines,
    selected lines, parsed lines, elapsed seconds).
    """
    TIME_S = time.time()
    TOT_N = 0
    SELECT_N = 0
    解析数量 = 0
    # The context manager closes the file even if parsing raises.
    with open(FILE_PATH, mode='rb') as LOG_FILE:
        for LINE_BYTES in LOG_FILE:
            TOT_N += 1
            try:
                LINE_TEXT = LINE_BYTES.decode('UTF-8')
            except UnicodeDecodeError as e:
                打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")
                continue
            TIME_UTC = LINE_TEXT[:19]
            try:
                TIME_STAMP = time.mktime(time.strptime(TIME_UTC, '%Y-%m-%dT%H:%M:%S'))
            except ValueError as e:
                # A blank or truncated line must not abort the whole file.
                打印_红(f"ERROR {TOT_N} TIME={TIME_UTC!r} {e}")
                continue
            if not (TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX):
                continue
            SELECT_N += 1
            TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))
            ST, TP1, TP2 = LINE_H3C(D_SYSLOG_SWITCH_H3C, LINE_TEXT, TIME_LOCAL)
            if ST != 0:
                其他 = D_SYSLOG_SWITCH_H3C['D_LOG_OTHER']
                其他[(TP1, TP2)] = 其他.get((TP1, TP2), 0) + 1
            else:
                解析数量 += 1
    TIME_RUN = time.time() - TIME_S
    return((FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN))
-
-
-
- def SYSLOG_SWITCH_H3C(FILE_PATH, SHOW=0): # Parse one H3C switch log file via FILE_H3C, print a report when SHOW == 1, and return IPv4 ACL hit totals grouped as PERMIT / DENY / OTHER
- D_SYSLOG_SWITCH_H3C = {} # accumulator passed to FILE_H3C and filled in place
- D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'] = {} # key ACL rule match records (IPv4)
- D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'] = {} # key ACL rule match records (IPv6)
- D_SYSLOG_SWITCH_H3C['D_IF_PHY'] = {} # interface physical-disconnect records
- D_SYSLOG_SWITCH_H3C['D_IF_LINK'] = {} # interface link-disconnect records
- D_SYSLOG_SWITCH_H3C['L_LLDP'] = [] # neighbor device records
- D_SYSLOG_SWITCH_H3C['D_IF_MAC'] = {} # per-interface host MAC records
- D_SYSLOG_SWITCH_H3C['L_CMD'] = [] # commands executed by users
- D_SYSLOG_SWITCH_H3C['L_LOGIN'] = [] # login information
- D_SYSLOG_SWITCH_H3C['L_FTP'] = [] # SSHS_SFTP_OPER records appended by LINE_H3C
- D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'] = [] # times at which the configuration was changed
- D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'] = {} # counts of log types that were not parsed
- D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'] = {} # ARP rate-limit records
- D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'] = {} # number of times the broadcast-storm threshold was reached
-
-
- TIME_STAMP_MIN = 0
- TIME_STAMP_MAX = time.time() # defaults to the current timestamp
- R = FILE_H3C(D_SYSLOG_SWITCH_H3C, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX)
- if SHOW == 1:
- 打印_黄("D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'] 匹配ACL日志")
- L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']]
- L_K.sort() # keys are printed in sorted order
- for K in L_K:
- PORT, RULE = K
- SP_RULE = RULE.split()
- if len(SP_RULE) > 2: # NOTE(review): this admits 3 tokens, but SP_RULE[3] below needs 4 -> IndexError on a 3-token rule
- if SP_RULE[3] == 'permit':
- 打印_绿(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]:8} {PORT:33s} {RULE}")
- elif SP_RULE[3] == 'deny':
- 打印_红(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]:8} {PORT:33s} {RULE}")
- else:
- print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")
- else:
- print(f"ERR2 {K} {FILE_PATH}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'] 匹配ACL日志")
- L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6']]
- L_K.sort()
- for K in L_K:
- PORT, RULE = K
- SP_RULE = RULE.split()
- if len(SP_RULE) > 2: # NOTE(review): same 3-token IndexError risk as the IPv4 loop
- if SP_RULE[3] == 'permit':
- 打印_绿(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][K]:8} {PORT:33s} {RULE}")
- elif SP_RULE[3] == 'deny':
- 打印_红(f"{D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv6'][K]:8} {PORT:33s} {RULE}")
- else:
- print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")
- else:
- print(f"ERR2 {K} {FILE_PATH}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'] ARP 发包超过阈值")
- L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED']]
- L_K.sort()
- for K in L_K:
- 合计 = D_SYSLOG_SWITCH_H3C['D_ARP_EXCEEDED'][K]
- 接口,限制值 = K
- 打印_青(f"{合计:8} {接口:22s}({限制值})")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'] 广播风暴发包超过阈值")
- L = [(D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED'][i],i) for i in D_SYSLOG_SWITCH_H3C['D_STORM_CONSTRAIN_EXCEED']]
- L.sort() # (count, interface) pairs, so this orders by count first
- for 次数,接口 in L:
- 打印_红(f"{次数:8} {接口:22s}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['L_LLDP'] 邻居设备变化")
- for TIME_LOCAL, ST, IF_ID, IF_MAC in D_SYSLOG_SWITCH_H3C['L_LLDP']:
- if ST == 'CREATE':
- 打印_绿(f" {TIME_LOCAL} {IF_ID} {IF_MAC} {ST}")
- else:
- 打印_红(f" {TIME_LOCAL} {IF_ID} {IF_MAC} {ST}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['D_IF_PHY'] 端口物理线路变化")
- for K in D_SYSLOG_SWITCH_H3C['D_IF_PHY']:
- 打印_红(f" {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_H3C['D_IF_PHY'][K])}")
-
- #打印_黄('D_SYSLOG_SWITCH_H3C['D_IF_LINK']')
- #for K in D_SYSLOG_SWITCH_H3C['D_IF_LINK']:
- # print(f" {K:21s} DOWN/UP 次数 {len(D_SYSLOG_SWITCH_H3C['D_IF_LINK'][K])}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['L_CMD']")
- for TIME_LOCAL, IP, USER, CMD in D_SYSLOG_SWITCH_H3C['L_CMD']:
- 打印_青(f" {TIME_LOCAL} {IP:15s} {USER:8s} '{CMD}'")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['L_LOGIN'] 登录登出详细")
- for ST, TIME_LOCAL, USER, IP in D_SYSLOG_SWITCH_H3C['L_LOGIN']:
- if ST == 'LOGIN':
- 打印_青(f" {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")
- elif ST == 'LOGOUT':
- 打印_蓝(f" {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")
- else:
- 打印_红(f" {TIME_LOCAL} {IP:15s} {ST:6s} {USER}")
-
- 打印_黄("D_LOGIN 登录汇总")
- D_LOGIN = {} # (IP, USER) -> list of login times, built from the LOGIN entries only
- for ST, TIME_LOCAL, USER, IP in D_SYSLOG_SWITCH_H3C['L_LOGIN']:
- if ST == 'LOGIN':
- if (IP, USER) not in D_LOGIN:
- D_LOGIN[(IP, USER)] = []
- D_LOGIN[(IP, USER)].append(TIME_LOCAL)
- for K in D_LOGIN:
- 打印_红(f" {K} {D_LOGIN[K]}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['L_CFGCHANGED'] 配置被修改时间记录")
- for i in D_SYSLOG_SWITCH_H3C['L_CFGCHANGED']:
- 打印_紫(f" {i}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['L_FTP']")
- for TIME_LOCAL, LOG_TEXT in D_SYSLOG_SWITCH_H3C['L_FTP']:
- 打印_青(f" {TIME_LOCAL} {LOG_TEXT}")
-
- 打印_黄("D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'] 未解析日志记录")
- L_K = [i for i in D_SYSLOG_SWITCH_H3C['D_LOG_OTHER']]
- L_K.sort()
- for K in L_K:
- print(f"{D_SYSLOG_SWITCH_H3C['D_LOG_OTHER'][K]:5} {K}")
-
- FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, 解析数量, TIME_RUN = R # NOTE(review): indentation is lost in this listing, so whether this summary belongs to the `if SHOW == 1` body cannot be determined here -- confirm against the real file
- 时间文本格式 = '%Y-%m-%d %H:%M:%S'
- 打印_红(f"日志路径 : {FILE_PATH}")
- 打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")
- 打印_绿(f"解析/筛选/总数: {解析数量}/{SELECT_N}/{TOT_N} {(解析数量/TOT_N)*100:.2f}(%)/{(SELECT_N/TOT_N)*100:.2f}(%)/100(%) 用时: {TIME_RUN:.2f}秒") # NOTE(review): divides by TOT_N, which stays 0 for an empty file -> ZeroDivisionError
-
- ## Build the return value: packet totals per "port first-rule-token", split by rule action
- D_RETURN = {'PERMIT':{}, 'DENY':{}, 'OTHER':{}}
- #print(D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'])
- for K in D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4']: # only the IPv4 counters are summarised; D_ACL_LOG_IPv6 is not read here, and 'OTHER' is never filled
- PORT, RULE = K
- SP_RULE = RULE.split()
- KEY_NEW = f"{PORT} {SP_RULE[0]}" # NOTE(review): evaluated before the length check below, so an empty RULE raises IndexError
- if len(SP_RULE) > 2:
- if SP_RULE[3] == 'permit':
- if KEY_NEW not in D_RETURN['PERMIT']:
- D_RETURN['PERMIT'][KEY_NEW] = 0
- D_RETURN['PERMIT'][KEY_NEW] += D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]
- elif SP_RULE[3] == 'deny':
- if KEY_NEW not in D_RETURN['DENY']:
- D_RETURN['DENY'][KEY_NEW] = 0
- D_RETURN['DENY'][KEY_NEW] += D_SYSLOG_SWITCH_H3C['D_ACL_LOG_IPv4'][K]
- else:
- print(f"ERR1 SP_RULE[3]={SP_RULE[3]} {K} {FILE_PATH}")
- else:
- print(f"ERR2 {K} {FILE_PATH}")
- return(D_RETURN)
-
if __name__ == '__main__':
    # Demo run: parse the file at FILE_PATH with the report enabled
    # (SHOW = 1 prints the important log information) and print the result.
    FILE_PATH, SHOW = '华三交换机日志文件路径', 1
    D_ACL_INFO = SYSLOG_SWITCH_H3C(FILE_PATH, SHOW=SHOW)
    print(D_ACL_INFO)

DEF_SYSLOG_USG.py
华为USG防火墙
- # -*- coding: utf8 -*-
- import time
- from DEF_COLOR import * ## 终端显示颜色
-
def 时间戳_2_时间文本(时间戳, 时间文本格式='%Y-%m-%d %H:%M:%S'):
    """Turn epoch seconds into a local-time string.

    时间文本格式 is handed to time.strftime(); the default produces
    'YYYY-MM-DD HH:MM:SS'.
    """
    结构化时间 = time.localtime(时间戳)
    文本 = time.strftime(时间文本格式, 结构化时间)
    return 文本
-
def USG_LOG_TYPE(LINE_TEXT):
    """Extract the (module, event, body offset) triple from one USG log line.

    Example: 2022-12-11T16:03:31+08:00 IPS6515E %%01SECIF/6/STREAM(l)[136014]: In Last Five Minutes

    The tag is located by the first '%'; the text from four characters after
    it up to the next '(' is split on '/', giving the module (field 0) and
    the event (field 2).  The offset is the index just after the first ':'
    that follows the '('.  Lines without '%' are recognised only when they
    contain 'DS/4/DATASYNC_CFGCHANGE'.

    On failure a message is printed in red and a marker tuple whose third
    element is -1 is returned.
    """
    起点 = LINE_TEXT.find('%')
    if 起点 == -1:
        备用 = LINE_TEXT.find('DS/4/DATASYNC_CFGCHANGE')
        if 备用 != -1:
            return(('DS', 'DATASYNC_CFGCHANGE', 备用+1))
        打印_红(f"找位置标志'%'失败 {LINE_TEXT}")
        return(('%-1', LINE_TEXT, -1))

    括号 = LINE_TEXT.find('(', 起点)
    if 括号 == -1:
        打印_红(f"找位置标志'('失败 {LINE_TEXT}")
        return((LINE_TEXT[:起点], '(-1', -1))

    冒号 = LINE_TEXT.find(':', 括号)  # marks where the log body starts
    if 冒号 == -1:
        打印_红(f"找位置标志':'失败(日志正文开始位) {LINE_TEXT}")
        return((':-1', LINE_TEXT[起点:括号], -1))

    字段 = LINE_TEXT[起点+4:括号].split('/')
    return((字段[0], 字段[2], 冒号+1))
-
## Parse the content of one SYSLOG line
# POLICY                rule-match log, produced when "policy logging" is configured in the rule
# SECLOG/4/PACKET_DENY  dropped-packet log, produced when "session logging" is configured in the rule
def USG_LOG_LINE(D_LOG_USG, LINE_TEXT, TIME_LOCAL):
    """Classify one USG syslog line and store it in D_LOG_USG.

    D_LOG_USG: result dict. This function appends to its 'L_SEC_RULE',
        'L_SEC_PACKET', 'L_SEC_SESSION' and 'L_LOGIN' lists and to the
        per-interface lists inside 'D_IF_PHY'.
    LINE_TEXT: one decoded log line; a trailing line break is ignored.
    TIME_LOCAL: timestamp text stored with every record except 'L_SEC_RULE'
        entries, which carry the time field found in the log body.

    Returns 0 when the line was recorded. Otherwise returns the pair
    (TP1, TP2) from USG_LOG_TYPE so the caller can count the line as not
    parsed. A line of a known type whose body has too few fields or a
    non-numeric begin/end time is reported the same way instead of raising.
    """
    TP1, TP2, X = USG_LOG_TYPE(LINE_TEXT)
    UNPARSED = (TP1, TP2)
    # Remove only the line break: a final line without '\n' keeps its last
    # character, and a '\r\n' ending does not leave a stray '\r' behind.
    BODY = LINE_TEXT[X:].rstrip('\r\n') if X >= 0 else ''

    if TP1 == 'POLICY':
        if TP2 not in ('POLICYDENY', 'POLICYPERMIT'):
            return UNPARSED
        # The body is a comma separated list of positional "key=value" items.
        SP = [ITEM.split('=')[-1] for ITEM in BODY.split(',')]
        if len(SP) < 10:
            return UNPARSED
        PT, SIP, SPORT, DIP, DPORT, TIME, SZONE, DZONE = SP[1:9]
        RULE_NAME = SP[9][:-1]  # drop the last character of the message
        PT = {'6': 'TCP', '17': 'UDP'}.get(PT, PT)  # protocol number -> name
        # TP2[6:] turns 'POLICYDENY' / 'POLICYPERMIT' into 'DENY' / 'PERMIT'.
        D_LOG_USG['L_SEC_RULE'].append((TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, TP2[6:]))
        return 0

    if TP1 == 'SECLOG':
        if TP2 not in ('PACKET_DENY', 'SESSION_TEARDOWN'):
            return UNPARSED
        # Sample PACKET_DENY body:
        # IPVer=4,Protocol=tcp,SourceIP=89.248.164.165,DestinationIP=183.129.153.43,SourcePort=48397,DestinationPort=8888,DestinationNatIP=192.168.200.112,DestinationNatPort=8888,BeginTime=1671059311,EndTime=1671059311,SourceVpnID=0,DestinationVpnID=0,SourceZone=untrust,DestinationZone=trust,PolicyName=HW,CloseReason=policy-deny.
        SP = [ITEM.split('=')[-1] for ITEM in BODY.split(',')]
        # SESSION_TEARDOWN has four extra fields at positions 10-13, which are
        # not stored, so every later field sits four positions further right.
        SHIFT = 4 if TP2 == 'SESSION_TEARDOWN' else 0
        if len(SP) < 16 + SHIFT:
            return UNPARSED
        IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT = SP[:8]
        try:
            BeginTime = int(SP[8])
            EndTime = int(SP[9])
        except ValueError:
            return UNPARSED
        SZONE, DZONE, PolicyName = SP[12 + SHIFT:15 + SHIFT]
        CloseReason = SP[15 + SHIFT][:-1]  # drop the trailing '.' shown in the sample

        if TP2 == 'PACKET_DENY':
            丢包类型 = {
                'policy-deny': '安全策略丢包',
                'default-policy-deny': '缺省包过滤丢包',
                'session miss': '未命中会话丢包',
                'others': '其他类型丢包',
            }.get(CloseReason, f'未知类型丢包:{CloseReason}')
            D_LOG_USG['L_SEC_PACKET'].append((TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, 丢包类型))
        else:
            D_LOG_USG['L_SEC_SESSION'].append((TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, CloseReason))
        return 0

    if TP1 == 'SHELL':
        if TP2 not in ('LOGIN', 'LOGOUT'):
            return UNPARSED
        # The body is stored unparsed.
        D_LOG_USG['L_LOGIN'].append((TIME_LOCAL, TP1, TP2, BODY))
        return 0

    if TP1 == 'PHY':
        if TP2 not in ('STATUSDOWN', 'STATUSUP'):
            return UNPARSED
        接口号 = BODY.split(':')[0]
        # TP2[6:] turns 'STATUSDOWN' / 'STATUSUP' into 'DOWN' / 'UP'.
        D_LOG_USG['D_IF_PHY'].setdefault(接口号, []).append((TIME_LOCAL, TP2[6:]))
        return 0

    if TP1 == '':
        打印_紫(BODY)
    return UNPARSED
-
## Parse a SYSLOG log file
def USG_LOG_FILE(D_LOG_USG, FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX):
    """Feed every line of a log file that falls inside a time window to USG_LOG_LINE.

    D_LOG_USG: result dict handed on to USG_LOG_LINE. Lines that
        USG_LOG_LINE does not record are counted in D_LOG_USG['D_LOG_OTHER']
        under the key it returns.
    FILE_PATH: path of the log file; it is read as bytes and each line is
        decoded as GB2312.
    TIME_STAMP_MIN, TIME_STAMP_MAX: exclusive bounds (epoch seconds) of the
        window; lines outside it are counted but not parsed.

    Lines that cannot be decoded, or whose first 19 characters are not a
    'YYYY-MM-DDTHH:MM:SS' timestamp, are reported in red and skipped.

    Returns (FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, total_lines,
    selected_lines, elapsed_seconds).
    """
    TIME_S = time.time()
    TOT_N = 0
    SELECT_N = 0
    OTHER = D_LOG_USG.get('D_LOG_OTHER')
    # 'with' guarantees the handle is closed even if parsing raises.
    with open(FILE_PATH, mode='br') as LOG_FILE:
        for LINE_BYTES in LOG_FILE:
            TOT_N += 1
            try:
                LINE_TEXT = LINE_BYTES.decode('GB2312')
            except UnicodeDecodeError as e:
                打印_红(f"ERROR {TOT_N} LINE_BYTES={LINE_BYTES} {e}")
                continue
            try:
                # Only the date-time part is parsed; the '+08:00' suffix is
                # ignored and mktime() interprets the value as local time.
                TIME_STAMP = time.mktime(time.strptime(LINE_TEXT[:19], '%Y-%m-%dT%H:%M:%S'))
            except (ValueError, OverflowError) as e:
                打印_红(f"ERROR {TOT_N} LINE_TEXT={LINE_TEXT!r} {e}")
                continue
            if not (TIME_STAMP_MIN < TIME_STAMP < TIME_STAMP_MAX):
                continue
            SELECT_N += 1
            TIME_LOCAL = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(TIME_STAMP))
            R = USG_LOG_LINE(D_LOG_USG, LINE_TEXT, TIME_LOCAL)
            if R != 0:
                # Line not recorded: count it per returned key.
                D_LOG_USG['D_LOG_OTHER'][R] = D_LOG_USG['D_LOG_OTHER'].get(R, 0) + 1
    del OTHER
    TIME_RUN = time.time() - TIME_S
    return (FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, TIME_RUN)
-
-
def 秒数转时长表示(总秒数):
    """Render a number of seconds as 'days:hours:minutes:seconds'.

    总秒数: total number of seconds.

    Returns the four components joined by ':', each formatted with a
    minimum width of two and zero padding, e.g. 90061 -> '01:01:01:01'.
    """
    天数, 余下 = divmod(总秒数, 86400)
    时数, 余下 = divmod(余下, 3600)
    分数, 秒数 = divmod(余下, 60)
    return f"{天数:02}:{时数:02}:{分数:02}:{秒数:02}"
-
-
def USG(PATH_SYSLOG_FILE, SHOW=0):
    """Parse a Huawei USG firewall syslog file and count security-rule hits.

    PATH_SYSLOG_FILE: path of the log file, passed to USG_LOG_FILE.
    SHOW: when 1, also print a coloured report of the parsed records (rule
        hits, sessions, dropped packets, interface up/down events, logins,
        unparsed log types and a summary line).

    Returns {'PERMIT': {...}, 'DENY': {...}, 'OTHER': {...}}, where each inner
    dict maps a rule name to the number of 'L_SEC_RULE' records with that
    action.
    """
    D_LOG_USG = {
        'L_SEC_RULE': [],
        'L_SEC_PACKET': [],
        'L_SEC_SESSION': [],
        'D_IF_PHY': {},     # physical interface down/up records
        'D_IF_LINK': {},    # interface link down/up records
        'L_CMD': [],        # commands executed by users
        'L_LOGIN': [],      # login information
        'D_LOG_OTHER': {},  # log types that were not parsed
    }

    TIME_STAMP_MIN = 0
    TIME_STAMP_MAX = time.time()  # defaults to the current timestamp
    R = USG_LOG_FILE(D_LOG_USG, PATH_SYSLOG_FILE, TIME_STAMP_MIN, TIME_STAMP_MAX)

    if SHOW == 1:
        打印_黄("L_SEC_RULE")
        D_SEC_RULE = {}
        for TIME, PT, SIP, SPORT, DIP, DPORT, SZONE, DZONE, RULE_NAME, DoP in D_LOG_USG['L_SEC_RULE']:
            if DoP == 'DENY':
                打印_红(f" {TIME} {RULE_NAME:8s} {DoP:6s} {PT:4s} {SIP:15s} {SPORT:5s} -> {DIP:15s} {DPORT:5s} {SZONE} -> {DZONE}")
            KEY = (RULE_NAME, DoP)
            D_SEC_RULE[KEY] = D_SEC_RULE.get(KEY, 0) + 1
        for KEY, COUNT in D_SEC_RULE.items():
            PRINT_FN = 打印_红 if KEY[1] == 'DENY' else 打印_绿
            PRINT_FN(f" {KEY[0]:8s} {KEY[1]:6s} {COUNT}")

        打印_黄("L_SEC_SESSION")
        for TIME_LOCAL, IPVer, Protocol, SIP, DIP, SPORT, DPORT, D_NAT_IP, D_NAT_PORT, BeginTime, EndTime, SZONE, DZONE, PolicyName, CloseReason in D_LOG_USG['L_SEC_SESSION']:
            会话秒数 = EndTime - BeginTime
            # Sessions shorter than 30 seconds are shown in grey, longer ones in purple.
            PRINT_FN = 打印_灰 if 会话秒数 < 30 else 打印_紫
            # Use this record's own TIME_LOCAL; the name TIME only exists as a
            # leftover of the L_SEC_RULE loop (and not at all when that list is empty).
            PRINT_FN(f" {TIME_LOCAL} {PolicyName:8s} {Protocol:4s} {SIP:15s} {SPORT:5s} -> 用时:{秒数转时长表示(会话秒数)}(天:时:分:秒) {会话秒数}秒")

        打印_黄("L_SEC_PACKET")
        D_SEC_PACKET = {}
        for RECORD in D_LOG_USG['L_SEC_PACKET']:
            KEY = (RECORD[13], RECORD[14])  # (PolicyName, drop type)
            D_SEC_PACKET[KEY] = D_SEC_PACKET.get(KEY, 0) + 1
        for KEY, COUNT in D_SEC_PACKET.items():
            打印_红(f" {KEY[0]:8s} {KEY[1]} {COUNT}")

        打印_黄("D_IF_PHY")
        for K, EVENTS in D_LOG_USG['D_IF_PHY'].items():
            print(f" {K:21s} DOWN/UP 次数 {len(EVENTS)}")
            for TIME_LOCAL, IF_ST in EVENTS:
                if IF_ST == 'DOWN':
                    打印_红(f" {TIME_LOCAL} DOWN")
                else:
                    打印_绿(f" {TIME_LOCAL} {IF_ST}")

        打印_黄("L_LOGIN")
        for i in D_LOG_USG['L_LOGIN']:
            打印_红(f" {' '.join(i)}")

        打印_黄("D_LOG_OTHER")
        for K in sorted(D_LOG_USG['D_LOG_OTHER']):
            print(f"{D_LOG_USG['D_LOG_OTHER'][K]:5} {K}")

        FILE_PATH, TIME_STAMP_MIN, TIME_STAMP_MAX, TOT_N, SELECT_N, TIME_RUN = R
        时间文本格式 = '%Y-%m-%d %H:%M:%S'
        # An empty log file has TOT_N == 0; report 0% instead of dividing by zero.
        PERCENT = (SELECT_N / TOT_N) * 100 if TOT_N else 0.0
        打印_紫(f"日志路径 : {FILE_PATH}")
        打印_青(f"开始/结束: {时间戳_2_时间文本(TIME_STAMP_MIN, 时间文本格式)} / {时间戳_2_时间文本(TIME_STAMP_MAX, 时间文本格式)}")
        打印_绿(f"解析/总数: {SELECT_N}/{TOT_N} {PERCENT:.2f}(%) 用时: {TIME_RUN:.2f}秒")

    ## Build the return value (per-rule statistics of permitted and denied traffic)
    D_RETURN = {'PERMIT': {}, 'DENY': {}, 'OTHER': {}}
    for RECORD in D_LOG_USG['L_SEC_RULE']:
        RULE_NAME, DoP = RECORD[8], RECORD[9]
        BUCKET = D_RETURN[DoP if DoP in ('PERMIT', 'DENY') else 'OTHER']
        BUCKET[RULE_NAME] = BUCKET.get(RULE_NAME, 0) + 1
    return D_RETURN
-
-
if __name__ == '__main__':
    # Demo entry point: parse one USG firewall log file and print the per-rule statistics.
    FILE_PATH = '华为防火墙日志文件路径'  # placeholder text; replace with a real log file path
    SHOW = 1  # 1 = show the important log information
    # Pass FILE_PATH: the name PATH_SYSLOG_FILE is only a parameter of USG()
    # and is not defined at module level.
    D_RULE_INFO = USG(FILE_PATH, SHOW)
    print(D_RULE_INFO)

DEF_COLOR.py
给点颜色看看
- # -*- coding: utf8 -*-
- import os
-
## Terminal text colours
#
# Every printer has the signature  打印_xxx(TEXT, SHOW=1)  and writes TEXT plus
# a newline in the given colour when SHOW == 1 (nothing otherwise).
# Both platform branches define the same set of printer names, so
# `from DEF_COLOR import *` offers the same API everywhere.
if os.name == 'nt':  # Windows: colours are set through the console API
    import ctypes
    import sys
    STD_OUTPUT_HANDLE = -11

    # Windows CMD text colors
    黑字 = 0x00  # black.
    暗蓝字 = 0x01  # dark blue.
    暗绿字 = 0x02  # dark green.
    暗青字 = 0x03  # dark skyblue.
    暗红字 = 0x04  # dark red.
    暗紫字 = 0x05  # dark pink.
    暗黄字 = 0x06  # dark yellow.
    暗白字 = 0x07  # dark white.
    灰字 = 0x08  # dark gray.
    蓝字 = 0x09  # blue.
    绿字 = 0x0a  # green.
    青字 = 0x0b  # skyblue.
    红字 = 0x0c  # red.
    紫字 = 0x0d  # pink.
    黄字 = 0x0e  # yellow.
    白字 = 0x0f  # white.

    # Windows CMD background colors
    暗蓝底 = 0x10  # dark blue.
    暗绿底 = 0x20  # dark green.
    暗青底 = 0x30  # dark skyblue.
    暗红底 = 0x40  # dark red.
    暗紫底 = 0x50  # dark pink.
    暗黄底 = 0x60  # dark yellow.
    暗白底 = 0x70  # dark white.
    灰底 = 0x80  # dark gray.
    蓝底 = 0x90  # blue.
    绿底 = 0xa0  # green.
    青底 = 0xb0  # skyblue.
    红底 = 0xc0  # red.
    紫底 = 0xd0  # pink.
    黄底 = 0xe0  # yellow.
    白底 = 0xf0  # white.

    std_out_handle = ctypes.windll.kernel32.GetStdHandle(STD_OUTPUT_HANDLE)

    def set_cmd_text_color(color, handle=std_out_handle):
        """Set the console text attribute; returns the API call's result."""
        Bool = ctypes.windll.kernel32.SetConsoleTextAttribute(handle, color)
        return Bool

    def resetColor():
        """Switch the console back to 红字 | 绿字 | 蓝字."""
        set_cmd_text_color(红字 | 绿字 | 蓝字)

    def _make_printer(ATTR):
        """Build a printer that writes TEXT with console attribute ATTR."""
        def PRINTER(TEXT, SHOW=1):
            if SHOW == 1:
                set_cmd_text_color(ATTR)
                sys.stdout.write(f"{TEXT}\n")
                resetColor()
        return PRINTER

    打印_黑 = _make_printer(黑字)
    打印_灰 = _make_printer(灰字)
    打印_蓝 = _make_printer(蓝字)
    打印_绿 = _make_printer(绿字)
    打印_青 = _make_printer(青字)
    打印_红 = _make_printer(红字)
    打印_紫 = _make_printer(紫字)
    打印_黄 = _make_printer(黄字)
    打印_白 = _make_printer(白字)

    打印_暗蓝 = _make_printer(暗蓝字)
    打印_暗绿 = _make_printer(暗绿字)
    打印_暗青 = _make_printer(暗青字)
    打印_暗红 = _make_printer(暗红字)
    打印_暗紫 = _make_printer(暗紫字)
    打印_暗黄 = _make_printer(暗黄字)
    打印_暗白 = _make_printer(暗白字)

    打印_白底黑字 = _make_printer(黑字 | 白底)
    打印_白底灰字 = _make_printer(灰字 | 白底)
    打印_白底红字 = _make_printer(红字 | 白底)
    打印_白底绿字 = _make_printer(绿字 | 白底)
    打印_白底黄字 = _make_printer(黄字 | 白底)
    打印_白底蓝字 = _make_printer(蓝字 | 白底)
    打印_白底紫字 = _make_printer(紫字 | 白底)
    打印_白底青字 = _make_printer(青字 | 白底)

    打印_灰底红字 = _make_printer(红字 | 灰底)
    打印_灰底蓝字 = _make_printer(蓝字 | 灰底)
    打印_灰底绿字 = _make_printer(绿字 | 灰底)
    打印_灰底青字 = _make_printer(青字 | 灰底)

    打印_蓝底黄字 = _make_printer(黄字 | 蓝底)
    打印_蓝底白字 = _make_printer(白字 | 蓝底)
else:  # Linux and other platforms: ANSI escape sequences
    # Format: print('\033[<mode>;<foreground>;<background>m ... \033[0m')
    #   e.g.  print('\033[31;42m 123\033[0m')
    #
    # Mode: 0 default, 1 highlight, 4 underline, 5 blink, 7 reverse, 8 invisible
    #
    # Colour   foreground  background
    # black    30          40
    # red      31          41
    # green    32          42
    # yellow   33          43
    # blue     34          44
    # purple   35          45
    # cyan     36          46
    # white    37          47
    #
    # NOTE(review): background 100 (bright black, used for the grey
    # backgrounds) is a 16-colour extension that is not in the table above;
    # confirm the target terminals support it.

    def _make_printer(SGR):
        """Build a printer that wraps TEXT in the escape sequence for SGR."""
        def PRINTER(TEXT, SHOW=1):
            if SHOW == 1:
                print(f"\033[{SGR}m{TEXT}\033[0m")
        return PRINTER

    打印_灰 = _make_printer('0;30;1')
    打印_红 = _make_printer('0;31;1')
    打印_绿 = _make_printer('0;32;1')
    打印_黄 = _make_printer('0;33;1')
    打印_蓝 = _make_printer('0;34;1')
    打印_紫 = _make_printer('0;35;1')
    打印_青 = _make_printer('0;36;1')
    打印_白 = _make_printer('0;37;1')

    打印_白底黑字 = _make_printer('0;30;47')
    打印_白底红字 = _make_printer('0;31;47')
    打印_白底绿字 = _make_printer('0;32;47')
    打印_白底黄字 = _make_printer('0;33;47')
    打印_白底蓝字 = _make_printer('0;34;47')
    打印_白底紫字 = _make_printer('0;35;47')
    打印_白底青字 = _make_printer('0;36;47')

    # Printers that previously existed only in the Windows branch.
    # The "dark" variants are the plain (non-highlighted) foreground colours.
    打印_黑 = _make_printer('0;30')
    打印_暗蓝 = _make_printer('0;34')
    打印_暗绿 = _make_printer('0;32')
    打印_暗青 = _make_printer('0;36')
    打印_暗红 = _make_printer('0;31')
    打印_暗紫 = _make_printer('0;35')
    打印_暗黄 = _make_printer('0;33')
    打印_暗白 = _make_printer('0;37')

    打印_白底灰字 = _make_printer('0;30;1;47')

    打印_灰底红字 = _make_printer('0;31;1;100')
    打印_灰底蓝字 = _make_printer('0;34;1;100')
    打印_灰底绿字 = _make_printer('0;32;1;100')
    打印_灰底青字 = _make_printer('0;36;1;100')

    打印_蓝底黄字 = _make_printer('0;33;1;44')
    打印_蓝底白字 = _make_printer('0;37;1;44')

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。