在测试公司的AltaiCare系统时,需要产生大于200条的Alert信息来测试系统每天的数据打包功能是否正常
但是这些Alert信息正常使用下是比较难生成的一次ap的重启可能才两条,而且重启也是需要时间的,所以想到让配置下发失败
这样就可以产生一条Alert信息,那么批量的发送请求就可以让他生成很多的Alert信息了,我就是这么懒,什么,叫我一个个点等着重启??
怎么可能?那么想想怎么让他怎么生成,再通过代码来循环实现就好了,有时间人懒点也可以,至少会去思考更加简便,更加快捷的方法
譬如:什么叫我一个个去重启服务器,重启设备???不行,写个脚本
什么叫我每次一遍遍的安装一个服务器???不行,怎么装怎么部署都可以倒着背了,写个shell自动装
什么叫我一次的上传固件???不行,那不是要点死我??写个批量上传的脚本一次性全给你传了
。。。。
于是发现Python还是真方便,至少在工作中很多地方帮得到忙太多了,这可能就是很多公司为什么需要这门语言吧
简单 粗暴 很快就可以想到解决的办法,使用各个三方库组合来实现你需要的功能
有两个地方需要生成,所以写了一个工具来专门生成这两个地方的Alert,代码写得有点乱,因为实现没有理清楚就开始写了,下次应该先把他理清楚
中间因为涉及到需要获取到多个数据的字段id来判断,所以有点乱了,不过最后还是实现了
希望下次把逻辑理清楚再动手
#coding:gbk import requests import json,time from threading import Thread import sys reload(sys) sys.setdefaultencoding('utf-8') class Get_care_info(object): def get_token(self,care_ip): url = 'http://{}/login'.format(str(care_ip)) headers = { 'Authorization':'Basic ....=', 'Host':'{}'.format(str(care_ip)), 'Referer':'http://{}/login'.format(str(care_ip)), } rsp = requests.get(url=url,headers=headers).text token = rsp[10:][:-13] return token def get_site_name(self,care_ip): site_list = [] token = self.get_token(care_ip) headers = { 'Host':'{}'.format(str(care_ip)), 'Referer':'http://{}/sites'.format(str(care_ip)), 'X-Auth-Token':token } url = 'http://{}/sites?pageIndex=0&pageSize=1000&sortOrder=asc'.format(str(care_ip)) rsp = requests.get(url,headers=headers).text rsp = json.loads(rsp) index = 1 for i in rsp['siteStatus']: site_list.append((index,i['name'],i['projectName'],i['id'])) index += 1 return site_list def set_domain_alert(self,care_ip): domain_list = [] porject_dict = {} token = self.get_token(care_ip) url2 = 'http://{}/sites?pageIndex=0&pageSize=1000&sortOrder=asc'.format(str(care_ip)) url1 = 'http://{}/serviceDomains?pageIndex=0&pageSize=1000&sortOrder=asc'.format(str(care_ip)) headers = { 'Host':'{}'.format(str(care_ip)), 'Referer':'http://{}/sites'.format(str(care_ip)), 'X-Auth-Token':token } rsp2 = requests.get(url2,headers=headers).text rsp1 = requests.get(url1,headers=headers).text rsp2 = json.loads(rsp2) rsp1 = json.loads(rsp1) for i in rsp2['siteStatus']: porject_dict[i['projectId']] = i['projectName'] index = 1 for i in rsp1["serviceDomains"]: for w in porject_dict: if i['projectId'] == w: domain_list.append((index,i['name'],porject_dict[w],i['id'])) index += 1 return domain_list def choice(): ip_list = ['52.74.53.225','52.74.80.211','10.6.161.123','10.6.161.135','10.6.161.20'] print('-'*15 + '> Alert 生成工具 <'+'-'*15) print('说明:') print('Site通过aplog下载失败来产生大量Alert') print('Domain通过批量添加user失败来生成大量Alert') print('-'*62) print('服务器IP如下:') index = 1 for ip in ip_list: if ip == '52.74.x.x': print('(%d).Alati Care 2.2 ---> %s'%(index,ip)) elif ip == '52.74.x.x': print('(%d).Alati Care 2.1 ---> %s'%(index,ip)) elif ip == '10.6.161.123': print('(%d).On Premise ---> %s'%(index,ip)) elif ip == '10.6.161.135': print('(%d).On Premise ---> %s'%(index,ip)) elif ip == '10.6.161.20': print('(%d).Altai Gate ---> %s'%(index,ip)) else:pass index += 1 print('-'*62 + '\n') choice = raw_input('输入你的选择:') if choice == '1':choice = '52.74.x.x' elif choice == '2':choice = '52.74.x.x' elif choice == '3':choice = '10.6.161.123' elif choice == '4':choice = '10.6.161.135' elif choice == '5':choice = '10.6.161.20' else:pass print('\n') return choice def ap_id(site_name_id,ip,token): print('-'*62 + '\n') print('获取的Site如下:') try: for i in site_name_id: print('(%d). Site:%s Project:%s'%(i[0],i[1].encode('utf-8'),i[2].encode('utf-8'))) except:pass print('-'*62 + '\n') site = raw_input('输入你的选择:') for a in site_name_id: if site == str(a[0]): site_id = a[3] else:pass url = 'http://%s/sites/%s/apStatus?orderBy=name&pageIndex=0&pageSize=100&sortOrder=asc'%(ip,site_id) headers = {'Host':ip, 'X-Auth-Token': token, 'Referer': 'http://%s/aps?active=activeAlert'%ip, } rsp = requests.get(url,headers=headers).text rsp = json.loads(rsp) for t in rsp['apStatus']: if str(t['status'])=='1': return (site_id,t['apId'],t['lanIpAddr']) def login(ap_ip): url = 'http://%s/cgi-bin/luci'%(ap_ip) data = { 'username':'admin', 'password':'admin' } rsp = requests.post(url=url,data=data,allow_redirects=False) cookie1 = rsp.headers['Set-Cookie'] cookie = cookie1.split(';') sysauth = cookie[0]+';'+cookie[2].split(',')[1] path = (cookie[1]+';'+cookie[2].split(',')[0])[6:] return (sysauth,path) def site_alert(info): headers = {'Host':info[0], 'X-Auth-Token':info[1], 'Referer':'http://%s/aps/dashboard/%s?active=activeAlert'%(info[0],info[3]),} url = 'http://%s/sites/%s/aps/%s/log'%(info[0],info[2],info[3]) for t in range(20): requests.get(url,headers=headers) def reboot_ap(info): path = login(info[4]) headers2 ={'Host':info[4], 'Referer':'http://%s%s/admin/uci/reboot'%(info[4],path[1]), 'Cookie':path[0] } url2 = 'http://%s%s/admin/uci/reboot?reboot=1'%(str(info[4]),path[1]) print('将重启AP:%s,请稍后...'%str(info[4])) requests.get(url2,headers=headers2) def main1(ip): print('正在获取所有的Site,请稍后...') care = Get_care_info() token = care.get_token(ip) site_name_id = care.get_site_name(ip) online_ap_id = ap_id(site_name_id,ip,token) info = (ip,token,online_ap_id[0],online_ap_id[1],online_ap_id[2]) T_lsit = [] print('正在发送请求...') for i in range(50): T = Thread(target=site_alert,args=(info,)) T.start() T_lsit.append(T) for i in T_lsit: i.join() reboot_ap(info) time.sleep(10) raw_input('Alert生成大约为:1000条,稍后Alert将生成,回车结束') def domain_alert(info): ip,token,doamin_id,group_id = info url = 'http://%s/serviceDomains/%s/users/generate?numOfUser=1'%(ip,doamin_id) headers = {'Host':ip, 'X-Auth-Token':token, 'Origin':'http://%s'%ip, 'Content-Type':'application/json;charset=UTF-8', 'Referer':'http://%s/accountGeneration?active=activeAlert&sortOrder=desc'%ip } data = {"name":"1","type":1,"userGroupId":int(group_id),"status":'true',"serviceDomainId":int(doamin_id),"detailAccess":{"loginName":"1","allowAuth":["portal","eap"],"nameIndex":1,"allowMac":[""],"nameLength":6,"pwdLength":6,"hasPrefix":'true',"hasIndex":'true',"hasRandom":'false',"isRandom":'false'},"detailService":{"validity":-1,"validityUnit":1,"dataQuotaInMB":-1}} print('正在发送请求...') number = 0 for i in range(1000): try: time.sleep(0.05) rsp = requests.post(url,headers=headers,data=json.dumps(data)) if rsp.status_code == 200: number += 1 except:pass raw_input('Alert生成约为:%d条,回车结束'%number) def main2(ip): print('正在获取所有的Domain,请稍后...') care = Get_care_info() doamin_list = care.set_domain_alert(ip) token = care.get_token(ip) print('-'*62 + '\n') print('获取到的Domain如下:') try: for i in doamin_list: print('(%d). Domain:%s Project:%s'%(i[0],i[1].encode('utf-8'),i[2].encode('utf-8'))) except:pass print('-'*62 + '\n') domain_choice = raw_input('输入你的选择:') for a in doamin_list: if domain_choice == str(a[0]): doamin_id = a[3] else:pass group = 'http://%s/serviceDomains/%s/userGroups?orderBy=name&pageIndex=0&pageSize=10&sortOrder=asc'%(ip,doamin_id) header1 = {'Host':ip, 'Accept':'application/json, text/plain, */*', 'X-Auth-Token':token, 'Referer':'http://%s/userGroups?active=activeAlert&sortOrder=desc'%ip} try: rsp = requests.get(group,headers=header1).text group_json = json.loads(rsp) group_id = group_json['userGroups'][0]['id'] info = (ip,token,doamin_id,group_id) domain_alert(info) except: raw_input('你选择的Domian中没有一个User group,请创建User group') if __name__ == '__main__': ip = choice() print('-'*62) print('(1).生成Site-Alert') print('(2).生成Domain-Alert') print('-'*62 + '\n') choice = raw_input('输入你的选择:') if choice == '1': main1(ip) else:main2(ip)