Alert生成工具

Zss 发表于:

在测试公司的AltaiCare系统时,需要产生大于200条的Alert信息来测试系统每天的数据打包功能是否正常

但是这些Alert信息正常使用下是比较难生成的一次ap的重启可能才两条,而且重启也是需要时间的,所以想到让配置下发失败

这样就可以产生一条Alert信息,那么批量的发送请求就可以让他生成很多的Alert信息了,我就是这么懒,什么,叫我一个个点等着重启??

怎么可能?那么想想怎么让他怎么生成,再通过代码来循环实现就好了,有时间人懒点也可以,至少会去思考更加简便,更加快捷的方法

譬如:什么叫我一个个去重启服务器,重启设备???不行,写个脚本

什么叫我每次一遍遍的安装一个服务器???不行,怎么装怎么部署都可以倒着背了,写个shell自动装

什么叫我一次的上传固件???不行,那不是要点死我??写个批量上传的脚本一次性全给你传了

。。。。

于是发现Python还是真方便,至少在工作中很多地方帮得到忙太多了,这可能就是很多公司为什么需要这门语言吧

简单  粗暴  很快就可以想到解决的办法,使用各个三方库组合来实现你需要的功能

有两个地方需要生成,所以写了一个工具来专门生成这两个地方的Alert,代码写得有点乱,因为实现没有理清楚就开始写了,下次应该先把他理清楚

中间因为涉及到需要获取到多个数据的字段id来判断,所以有点乱了,不过最后还是实现了

希望下次把逻辑理清楚再动手

#coding:gbk
import requests
import json,time
from threading import Thread
import sys
reload(sys)
sys.setdefaultencoding('utf-8')

class Get_care_info(object):

    def get_token(self,care_ip):
        url = 'http://{}/login'.format(str(care_ip))
        headers = {
            'Authorization':'Basic ....=',
            'Host':'{}'.format(str(care_ip)),
            'Referer':'http://{}/login'.format(str(care_ip)),
        }
        rsp = requests.get(url=url,headers=headers).text
        token = rsp[10:][:-13]
        return token

    def get_site_name(self,care_ip):
        site_list = []
        token = self.get_token(care_ip)
        headers = {
        'Host':'{}'.format(str(care_ip)),
        'Referer':'http://{}/sites'.format(str(care_ip)),
        'X-Auth-Token':token
        }
        url = 'http://{}/sites?pageIndex=0&pageSize=1000&sortOrder=asc'.format(str(care_ip))
        rsp = requests.get(url,headers=headers).text
        rsp = json.loads(rsp)
        index = 1
        for i in rsp['siteStatus']:
            site_list.append((index,i['name'],i['projectName'],i['id']))
            index += 1
        return site_list

    def set_domain_alert(self,care_ip):
        domain_list = []
        porject_dict = {}
        token = self.get_token(care_ip)
        url2 = 'http://{}/sites?pageIndex=0&pageSize=1000&sortOrder=asc'.format(str(care_ip))
        url1 = 'http://{}/serviceDomains?pageIndex=0&pageSize=1000&sortOrder=asc'.format(str(care_ip))
        headers = {
                    'Host':'{}'.format(str(care_ip)),
            'Referer':'http://{}/sites'.format(str(care_ip)),
            'X-Auth-Token':token
        }
        rsp2 = requests.get(url2,headers=headers).text
        rsp1 = requests.get(url1,headers=headers).text
        rsp2 = json.loads(rsp2)
        rsp1 = json.loads(rsp1)
        for i in rsp2['siteStatus']:
            porject_dict[i['projectId']] = i['projectName']
        index = 1
        for i in rsp1["serviceDomains"]:
            for w in porject_dict:
                if i['projectId'] == w:
                    domain_list.append((index,i['name'],porject_dict[w],i['id']))
                    index += 1
        return domain_list

def choice():
    ip_list = ['52.74.53.225','52.74.80.211','10.6.161.123','10.6.161.135','10.6.161.20']
    print('-'*15 + '>  Alert 生成工具  <'+'-'*15)
    print('说明:')
    print('Site通过aplog下载失败来产生大量Alert')
    print('Domain通过批量添加user失败来生成大量Alert')
    print('-'*62)
    print('服务器IP如下:')
    index = 1
    for ip in ip_list:
        if ip == '52.74.x.x':
            print('(%d).Alati Care 2.2    ---> %s'%(index,ip))
        elif ip == '52.74.x.x':
            print('(%d).Alati Care 2.1    ---> %s'%(index,ip))
        elif ip == '10.6.161.123':
            print('(%d).On Premise        ---> %s'%(index,ip))
        elif ip == '10.6.161.135':
            print('(%d).On Premise        ---> %s'%(index,ip))
        elif ip == '10.6.161.20':
            print('(%d).Altai Gate        ---> %s'%(index,ip))
        else:pass
        index += 1
    print('-'*62 + '\n')
    choice = raw_input('输入你的选择:')
    if choice == '1':choice = '52.74.x.x'
    elif choice == '2':choice = '52.74.x.x'
    elif choice == '3':choice = '10.6.161.123'
    elif choice == '4':choice = '10.6.161.135'
    elif choice == '5':choice = '10.6.161.20'
    else:pass
    print('\n')
    return choice

def ap_id(site_name_id,ip,token):
    print('-'*62 + '\n')
    print('获取的Site如下:')
    try:
        for i in site_name_id:
            print('(%d). Site:%s  Project:%s'%(i[0],i[1].encode('utf-8'),i[2].encode('utf-8')))
    except:pass
    print('-'*62 + '\n')
    site = raw_input('输入你的选择:')
    for a in site_name_id:
        if site == str(a[0]): site_id = a[3]
        else:pass
    url = 'http://%s/sites/%s/apStatus?orderBy=name&pageIndex=0&pageSize=100&sortOrder=asc'%(ip,site_id)
    headers = {'Host':ip,
        'X-Auth-Token': token,
        'Referer': 'http://%s/aps?active=activeAlert'%ip,
}
    rsp = requests.get(url,headers=headers).text
    rsp = json.loads(rsp)
    for t in rsp['apStatus']:
        if str(t['status'])=='1':
            return (site_id,t['apId'],t['lanIpAddr'])

def login(ap_ip):
    url = 'http://%s/cgi-bin/luci'%(ap_ip)
    data = {
        'username':'admin',
        'password':'admin'
    }
    rsp = requests.post(url=url,data=data,allow_redirects=False)
    cookie1 = rsp.headers['Set-Cookie']
    cookie = cookie1.split(';')
    sysauth = cookie[0]+';'+cookie[2].split(',')[1]
    path = (cookie[1]+';'+cookie[2].split(',')[0])[6:]
    return (sysauth,path)

def site_alert(info):
    headers = {'Host':info[0],
'X-Auth-Token':info[1],
'Referer':'http://%s/aps/dashboard/%s?active=activeAlert'%(info[0],info[3]),}
    url = 'http://%s/sites/%s/aps/%s/log'%(info[0],info[2],info[3])
    for t in range(20):
        requests.get(url,headers=headers)

def reboot_ap(info):
    path = login(info[4])
    headers2 ={'Host':info[4],
'Referer':'http://%s%s/admin/uci/reboot'%(info[4],path[1]),
'Cookie':path[0]
}
    url2 = 'http://%s%s/admin/uci/reboot?reboot=1'%(str(info[4]),path[1])
    print('将重启AP:%s,请稍后...'%str(info[4]))
    requests.get(url2,headers=headers2)

def main1(ip):
    print('正在获取所有的Site,请稍后...')
    care = Get_care_info()
    token = care.get_token(ip)
    site_name_id = care.get_site_name(ip)
    online_ap_id = ap_id(site_name_id,ip,token)
    info = (ip,token,online_ap_id[0],online_ap_id[1],online_ap_id[2])
    T_lsit = []
    print('正在发送请求...')
    for i in range(50):
        T = Thread(target=site_alert,args=(info,))
        T.start()
        T_lsit.append(T)
    for i in T_lsit:
        i.join()
    reboot_ap(info)
    time.sleep(10)
    raw_input('Alert生成大约为:1000条,稍后Alert将生成,回车结束')

def domain_alert(info):
    ip,token,doamin_id,group_id = info
    url = 'http://%s/serviceDomains/%s/users/generate?numOfUser=1'%(ip,doamin_id)
    headers = {'Host':ip,
        'X-Auth-Token':token,
        'Origin':'http://%s'%ip,
        'Content-Type':'application/json;charset=UTF-8',
        'Referer':'http://%s/accountGeneration?active=activeAlert&sortOrder=desc'%ip
        }
    data = {"name":"1","type":1,"userGroupId":int(group_id),"status":'true',"serviceDomainId":int(doamin_id),"detailAccess":{"loginName":"1","allowAuth":["portal","eap"],"nameIndex":1,"allowMac":[""],"nameLength":6,"pwdLength":6,"hasPrefix":'true',"hasIndex":'true',"hasRandom":'false',"isRandom":'false'},"detailService":{"validity":-1,"validityUnit":1,"dataQuotaInMB":-1}}
    print('正在发送请求...')
    number = 0
    for i in range(1000):
        try:
            time.sleep(0.05)
            rsp = requests.post(url,headers=headers,data=json.dumps(data))
            if rsp.status_code == 200:
                number += 1
        except:pass
    raw_input('Alert生成约为:%d条,回车结束'%number)
def main2(ip):
    print('正在获取所有的Domain,请稍后...')
    care = Get_care_info()
    doamin_list = care.set_domain_alert(ip)
    token = care.get_token(ip)
    print('-'*62 + '\n')
    print('获取到的Domain如下:')
    try:
        for i in doamin_list:
            print('(%d). Domain:%s  Project:%s'%(i[0],i[1].encode('utf-8'),i[2].encode('utf-8')))
    except:pass
    print('-'*62 + '\n')
    domain_choice = raw_input('输入你的选择:')
    for a in doamin_list:
        if domain_choice == str(a[0]): doamin_id = a[3]
        else:pass
    group = 'http://%s/serviceDomains/%s/userGroups?orderBy=name&pageIndex=0&pageSize=10&sortOrder=asc'%(ip,doamin_id)
    header1 = {'Host':ip,
    'Accept':'application/json, text/plain, */*',
    'X-Auth-Token':token,
    'Referer':'http://%s/userGroups?active=activeAlert&sortOrder=desc'%ip}
    try:
        rsp = requests.get(group,headers=header1).text
        group_json = json.loads(rsp)
        group_id = group_json['userGroups'][0]['id']
        info = (ip,token,doamin_id,group_id)
        domain_alert(info)
    except:
        raw_input('你选择的Domian中没有一个User group,请创建User group')


if __name__ == '__main__':
    ip = choice()
    print('-'*62)
    print('(1).生成Site-Alert')
    print('(2).生成Domain-Alert')
    print('-'*62 + '\n')
    choice = raw_input('输入你的选择:')
    if choice == '1':
        main1(ip)
    else:main2(ip)