Python实现的批量上传Firmware工具

Zss 发表于:

公司里有一些的云端AC的服务器,测试时需要在每个服务器上面上传,AP设备所对应的版本,AP的版本很多,且上传的文件需要经过一些打包修改处理

才可以被正确的上传到服务器上面,所以使用Python来实现这样的一个批量上传Firmware的工具解决平时上传固件的麻烦事,每次都是单个单个的点击,效率非常低

需要实现这个工具,先理清楚我需要干什么?

正常手工上传的时候

1.需要修改一个版本的信息文件

2.将固件和note日志和版本信息文件打包为zip,修改zip文件名为pkg格式

3.在服务器上上传此pkg文件

对于第一个,我可以用过python来自动生成这一个文件,并且获取到需要上传的固件的各个信息一次写入到这个文件中

对于第二个,使用zipfile的包来将其需要的文件打包,使用os模块来修改文件名

对于第三个,这个也就是最重要的,需要模拟浏览器的文件上传那么我第一得模拟登陆成功,成功后要不就是存下cookie要么就是一些其他的身份认证的东西,譬如说哦我们的系统使用的是token,

登陆成功后服务器返回一个token,拿这个token就可以干你想干的事了,再者就是使用requests库来实现表单上传文件,这个过程中需要抓取正常的手工上传时的http包,来分析包的结构组成

然后模拟出这样的一个包发送给服务器,那么模拟的上传最终能够被实现

再次证明不管是测试还是开发,抓包分析是多么重要的一件事情,中间遇到了太多的问题,都是依次百度谷歌来查找解决这些问题才最成功的,所以实际动手和理论上还是有很大的区别的

再者我发现抓包分析的这个过程其实是很有意思的,且能学到更多的东西,其实看着几个步骤还是挺简单的,但是实际动手起来发现总是会出现一些意想不到的问题,当使用requests模拟出了

表单来上传文件时,我就感觉这个小工具快写完了,最终的也就是这个了

在上传失败的时候可能失败,那么使用状态码来判断,200则成功,不为200,则将其返回的消息显示出来判断什么问题,譬如说已经存在了,或者数据库字段限制

实现出来就是这样子:

#coding:gbk
import os,zipfile,requests,time

class Zip_fw():
    def __init__(self):
        """
        设置以当前文件夹下的各个文件为公共路径
        """
        self.fw_bin_path = os.getcwd()+'\\firmware\\'
        self.fw_pkg_path = os.getcwd()+'\\pkg\\'
        self.fw_name_list = []

    def check_folder(self):
        """
        判断两个文件夹是否存在,不存在则创建,存在则pass
        删除旧的pkg文件
        """

        if not os.path.exists(self.fw_bin_path):
            os.makedirs(self.fw_bin_path)
        if not os.path.exists(self.fw_pkg_path):
            os.makedirs(self.fw_pkg_path)
        else:
            old_flie = os.listdir(self.fw_pkg_path)
            for i in old_flie:
                os.remove(self.fw_pkg_path + i)


    def get_fw_name(self):
        """
        获取firmware中的各个bin文件
        """
        fw_name_list = os.listdir(self.fw_bin_path)
        for i in fw_name_list:
            if '.bin' in i:
                self.fw_name_list.append(i)
        if len(self.fw_name_list) == 0:
            print('         firmware文件夹似乎没看到有固件哦! O(∩_∩)O\n')
            print('          请将bin文件放在当前路径的firmware文件夹下')
            raw_input('\n'+'-'*25 + '>  回车结束  <'+'-'*25)
            exit()
        return self.fw_name_list

    def make_fw_meta_note(self,fw_name):
        """
        生成fw.meta文件和note文件
        """
        if os.path.isfile(self.fw_bin_path + 'fw.meta'):
            os.remove(self.fw_bin_path + 'fw.meta')
        with open(self.fw_bin_path + 'fw.meta','a') as f:
            if len(fw_name.split('_')) != 4:
                f.write('version:' + fw_name.split('_')[1] + '\n')
            else:
                f.write('version:' + fw_name.split('_')[2] + '\n')
            f.write('modelCode:' + fw_name.split('_')[0] + '\n')
            f.write('imageFileName:' + fw_name + '\n')
            f.write('releaseNoteFileName:release_note.txt')
            note_data = """ release_note_file """
            if os.path.isfile(self.fw_bin_path + 'release_note.txt'):
                os.remove(self.fw_bin_path + 'release_note.txt')
            with open(self.fw_bin_path + 'release_note.txt','a') as f:
                f.write(note_data)

    def zip_fw(self,fw_name):
        """
        防止重复文件名情况,存在则先删除此文件,再将其文件打包更名
        """

        if os.path.isfile(self.fw_pkg_path + fw_name[:-3] + 'pkg'):
            os.remove(self.fw_pkg_path + fw_name[:-3] + 'pkg')

        file_list = [fw_name,'fw.meta','release_note.txt']
        fw_pkg = zipfile.ZipFile(self.fw_pkg_path + fw_name[:-3] + 'zip','w', zipfile.ZIP_STORED)
        for i in file_list:
            file_path = os.path.join(self.fw_bin_path,i)
            fw_pkg.write(file_path,i)
        fw_pkg.close()
        os.rename(self.fw_pkg_path + fw_name[:-3] + 'zip',self.fw_pkg_path + fw_name[:-3] + 'pkg') #改文件后缀
        print('Firmware:%s ---> 打包成功! p( ^ O ^ )q')%(fw_name[:-3] + 'pkg')
        time.sleep(0.2)

    def main(self):
        """
        固件打包主体
        """
        bin_list = self.get_fw_name()
        print('-'*26 + '>  Firmware 打包  <'+'-'*26+'\n')
        for file_name in bin_list:
            self.make_fw_meta_note(file_name)
            self.zip_fw(file_name)
        print('\n')

class Care():
    def __init__(self):
        """
        设置公共路径
        """
        self.fw_pkg_path = os.getcwd()+'\\pkg\\'

    def get_token(self,ip):
        """
        模拟登陆care,获取token
        """
        url = 'http://{}/login'.format(str(ip))
        headers = {
        'Authorization':'Basic 。。。。。',
        'Host':'{}'.format(str(ip)),
        'Referer':'http://{}/login'.format(str(ip))
        }
        rsp = requests.get(url=url,headers=headers).text
        token = rsp[10:][:-13]
        return token

    def up_fw(self,ip,token,fw_pkg_name):
        """
        将不同的pkg文件依次上传到单台服务器
        """
        headers = {'Host':ip,
        'Connection':'keep-alive',
        'Referer':'http://%s/firmwares'%ip,
        'X-Auth-Token':token
        }
        file_data = open(self.fw_pkg_path + fw_pkg_name,'rb')
        if len(fw_pkg_name.split('_')) == 4:
            care_fw_name = 'Auto_'+fw_pkg_name.split('_')[0] + '_' + fw_pkg_name.split('_')[2]
        else:care_fw_name = 'Auto_'+fw_pkg_name.split('_')[0] + '_' + fw_pkg_name.split('_')[1]

        file_list = {"projectId": (None,"-1"),"siteId": (None,"-1"),"name": (None,care_fw_name),"file": (fw_pkg_name,file_data.read(),"application/octet-stream")}
        url = 'http://%s/firmwares'%ip
        print('Firmware:%s ---> 正在上传到 %s Server 请稍后...'%(care_fw_name,ip))
        rsp = requests.post(url=url,headers=headers,files=file_list)
        if rsp.status_code == 200:print('Firmware:%s ---> 上传成功!p( ^ O ^ )q\n'%care_fw_name)
        else:
            print('Firmware:%s ---> 上传失败! +﹏+')%care_fw_name
            print('Firmware:{} ---> 错误消息为:{}\n'.format(care_fw_name,rsp.text))

def main(ip):
    """
    给单台服务器进行上传固件操作
    """
    up_fw = Care()
    token = up_fw.get_token(ip)
    print('-'*18 + '>  Firmware 上传 ---> %s <'%ip+'-'*18+'\n')
    for pkg in os.listdir(up_fw.fw_pkg_path):
        up_fw.up_fw(ip,token,pkg)



def choice():
    """
    用户选择列表,支持多选来分别给不同的服务器上传固件
    """
    ip_list = ['52.74.53.225','52.74.80.211','10.6.161.123','10.6.161.135','10.6.161.20']
    choice_list = []
    print('-'*15 + '>  欢迎使用 Firmware 上传工具  <'+'-'*15)
    print('使用说明:')
    print('1:所有需要上传的bin文件,请放置在此脚本同目录的firmware文件夹下')
    print('2:note文件和meta文件不需要,会自动生成')
    print('3:请勿随意修改bin文件的文件名,默认格式就好,格式为:型号_版本_日期.bin')
    print('4:可以选择多个服务器来上传,但若服务器不存在请不选择此服务器')
    print('5:选择完服务器后请输入0,开始进行上传')
    print('6:默认使用。。。。账号上传,若账号错误则上传失败')
    print('7:固件的作用域为:All Projects 和 All Sites')
    print('-'*62)
    print('服务器地址如下:')
    index = 1
    print('(0).开始上传')
    for ip in ip_list:
        if ip == '52.74.53.225':
            print('(%d).Alati Care 2.2    ---> %s'%(index,ip))
        elif ip == '52.74.80.211':
            print('(%d).Alati Care 2.1    ---> %s'%(index,ip))
        elif ip == '10.6.161.123':
            print('(%d).On Premise        ---> %s'%(index,ip))
        elif ip == '10.6.161.135':
            print('(%d).On Premise        ---> %s'%(index,ip))
        elif ip == '10.6.161.20':
            print('(%d).Altai Gate        ---> %s'%(index,ip))
        else:pass
        index += 1
    print('-'*62 + '\n')
    while 1:
        choice = raw_input('输入您的选择:')
        if choice == '0':break
        elif choice == '1':choice_list.append('52.74.53.225')
        elif choice == '2':choice_list.append('52.74.80.211')
        elif choice == '3':choice_list.append('10.6.161.123')
        elif choice == '4':choice_list.append('10.6.161.135')
        elif choice == '5':choice_list.append('10.6.161.20')
        else:pass
    print('\n')
    return choice_list

if __name__ == '__main__':
    check = Zip_fw()
    check.check_folder()
    ip_list = choice()
    make_pkg = Zip_fw()
    make_pkg.main()
    for ip in ip_list:
        main(ip)
    raw_input('\n'+'-'*25 + '>  回车结束  <'+'-'*25)