FTP稳定性测试

Zss 发表于:

在存放源文件的目录中挑选大文件件和小文件,大文件盒小文件的区分为设置的参数,参数为文件的大小

使用系统自带的复制命令来复制文件,速度比较快,使用python读写的方式较慢,且文件较大时会存在内存不够的情况

当文件夹中存在文件表示光闸ftp同步还未取走文件,所以不会复制文件到此文件夹,当文件夹为空则复制文件

#coding:utf-8
import os,time
from subprocess import PIPE,Popen

class Stable_test():
    def __init__(self):
        #初始化文件目录等路径
        self.os_name = os.name
   config_list = []
        config = \
'''FTP文件目录-->d:\\FTP
FTP带传输文件目录-->file_list
FTP_log-->result.log
FTP最外层文件夹个数-->500
FTP最里层文件集个数-->3
FTP文件夹标识-->test
文件大小控制(默认100M)-->100000000
文件复制间隔时间(单位:s)-->2
log日志开关(1开启,0关闭)-->0'''

        if not os.path.isfile('ftp_config.ini'):
            with open('ftp_config.ini','a+') as f:
                f.write(config)
        with open('ftp_config.ini','r') as f:
            data = f.readlines()
        for i in data:
            if '-->' in i:
                config_list.append(i.split('-->')[1].strip())
        self.ftp_path = config_list[0]
        self.file_list_path = config_list[1]
        self.log_path = config_list[2]
        self.folder_nmuber = int(config_list[3])
        self.folder_index_nmuber = int(config_list[4])
        self.folder_name = config_list[5]
        self.small_size = int(config_list[6])
        self.sleep_time = int(config_list[7])
        self.log = int(config_list[8])
        self.folder_path_list = []
        print '''
-------------------------- Ftp_Config -------------------------
            1.FTP文件目录-->%s
            2.FTP带传输文件目录-->%s
            3.FTP_log-->%s
            4.FTP最外层文件夹个数-->%d
            5.FTP最里层文件集个数-->%d
            6.FTP文件夹标识-->%s
            7.文件大小控制(默认100M)-->%d
            8.文件复制间隔时间(单位:s)-->%d
            9.log日志开关(1开启,0关闭)-->%d
------------------------------------------------------------'''%(self.ftp_path,self.file_list_path,self.log_path,self.folder_nmuber,self.folder_index_nmuber,self.folder_name,self.small_size,self.sleep_time,self.log)
        if not os.path.exists(self.ftp_path):
            print '            FTP_path:%s不存在! '%self.ftp_path
            raw_input('            回车结束!')
            exit()
   if not os.path.exists(self.ftp_path):
            print '            FTP待传输文件夹目录:%s不存在! '%self.file_list_path
            raw_input('            回车结束!')
            exit()
        raw_input('                 回车开始运行!!!')

    def printlog(self,msg):
        if self.log == 1:
            if 'str'not in str(type(msg)):
                text = '(%s).文件生成时间: %s   文件名: %s\n'%(msg[1],time.strftime("%Y-%m-%d-%H:%M:%S",time.localtime()),msg[0])
                with open(self.log_path,'a+') as f:
                    f.write(text)
                print text
            else:
                text = '时间:%s  msg:%s\n'%(time.strftime("%Y-%m-%d-%H:%M:%S",time.localtime()),msg)
                print text
                with open(self.log_path,'a+') as f:
                    f.write(text)
        else:
            if 'str'not in str(type(msg)):
                text = '(%s).文件生成时间: %s   文件名: %s\n'%(msg[1],time.strftime("%Y-%m-%d-%H:%M:%S",time.localtime()),msg[0])
                print text
            else:
                text = '时间:%s  msg:%s\n'%(time.strftime("%Y-%m-%d-%H:%M:%S",time.localtime()),msg)
                print text


    def linux_touch_all_file(self,path):
        #传输所有文件
        file_list = os.listdir(self.file_list_path)
        index = 1
        for i in file_list:
            old_path = self.file_list_path+'/'+i
            new_path = path+'/'+i
            cmd = 'cp %s %s'%(old_path,new_path)
            result = Popen(cmd,shell=True,stdout=PIPE)
            if result.stdout.read() == '':
                self.printlog((i, index))
            else:
                self.printlog('复制失败!')
                self.printlog(cmd)
            index+=1

    def linux_touch_small_file(self,path):
        #传输小文件
        file_list = os.listdir(self.file_list_path)
        index = 1
        for i in file_list:
            file_path = self.file_list_path + '/' + i
            if os.path.getsize(file_path) <= self.small_size:
                old_path = self.file_list_path + '/' + i
                new_path = path + '/' + i
                cmd = 'cp %s %s' % (old_path, new_path)
                result = Popen(cmd, shell=True, stdout=PIPE)
                if result.stdout.read() == '':
                    self.printlog((i, index))
                else:
                    self.printlog('复制失败!')
                    self.printlog(cmd)
                index += 1

    def linux_mkdir_folder(self):
        text = time.strftime("%H-%M-%S")
        for i in range(1, int(self.folder_nmuber)+1):
            folder_name = self.ftp_path + '/%s-%s-%d' % (time.strftime("%Y-%m-%d"), self.folder_name,i)
            for t in range(1,int(self.folder_index_nmuber)+1):
                try:
                    file_folder_path = folder_name + '/%s'%(text)+ '/%s'%t
                    cmd = 'mkdir -p %s' % file_folder_path
                    Popen(cmd, shell=True, stdout=PIPE)
                    self.folder_path_list.append(file_folder_path)
                except:pass

    def nt_touch_all_file(self,path):
        #传输所有文件
        file_list = os.listdir(self.file_list_path)
        index = 1
        for i in file_list:
            old_path = self.file_list_path+'\\'+i
            new_path = path+'\\'+i
            cmd = 'copy %s %s'%(old_path,new_path)
            result = Popen(cmd,shell=True,stdout=PIPE)
            if '1' in result.stdout.readlines()[-1]:
                self.printlog((i, index))
            else:
                self.printlog('复制失败!')
                self.printlog(cmd)
            index+=1

    def nt_touch_small_file(self,path):
        #传输小文件
        file_list = os.listdir(self.file_list_path)
        index = 1
        for i in file_list:
            file_path = self.file_list_path + '\\' + i
            if os.path.getsize(file_path) <=self.small_size:
                old_path = self.file_list_path + '\\' + i
                new_path = path + '\\' + i
                cmd = 'copy %s %s' % (old_path, new_path)
                result = Popen(cmd, shell=True, stdout=PIPE)
                if '1' in result.stdout.readlines()[-1]:
                    self.printlog((i, index))
                else:
                    self.printlog('复制失败!')
                    self.printlog(cmd)
                index += 1


    def nt_mkdir_folder(self):
        text = time.strftime("%H-%M-%S")
        for i in range(1, int(self.folder_nmuber)+1):
            folder_name = self.ftp_path + '\%s-%s-%d' % (time.strftime("%Y-%m-%d"), self.folder_name,i)
            for t in range(1,int(self.folder_index_nmuber)+1):
                try:
                    file_folder_path = folder_name + '\\%s'%(text)+ '\%s'%t
                    print file_folder_path
                    os.makedirs(file_folder_path)
                    self.folder_path_list.append(file_folder_path)
                except:pass


    def nt_main(self):
        self.nt_mkdir_folder()
        index = 1
        self.printlog('第%d次循环复制' % index)
        while 1:
            for i in self.folder_path_list:
                if len(os.listdir(i))==0:
                    self.printlog('%s文件夹为空,复制文件'%i)
                    if i.split('\\')[2].split('-')[-1] == '1':
                        self.printlog('文件夹编号为1,复制大文件')
                        self.nt_touch_all_file(i)
                    else:
                        self.printlog('文件夹编号不为1,复制小文件')
                        self.nt_touch_small_file(i)
                else:self.printlog('%s文件夹不为空,不复制'%i)
                time.sleep(self.sleep_time)
            self.printlog('第%d次循环结束!等待15秒' % index)
            index += 1
            time.sleep(15)

    def linux_main(self):
        self.linux_mkdir_folder()
   index = 1
        while 1:
       self.printlog('第%d次循环复制'%index)
            for i in self.folder_path_list:
                if len(os.listdir(i))==0:
                    self.printlog('%s文件夹为空,复制文件'%i)
                    if i.split('/')[-3].split('-')[-1] == '1':
                        self.printlog('文件夹编号为1,复制大文件')
                        self.linux_touch_all_file(i)
                    else:
                        self.printlog('文件夹编号不为1,复制小文件')
                        self.linux_touch_small_file(i)
                else:self.printlog('%s文件夹不为空,不复制'%i)
                time.sleep(self.sleep_time)
       self.printlog('第%d次循环结束!等待15秒'%index)
       index += 1
            time.sleep(15)

    def main(self):
        if self.os_name == 'nt':
            self.nt_main()
        if self.os_name == 'posix':
            self.linux_main()

if __name__ == '__main__':
    stable_test = Stable_test()
    stable_test.main()