[新手上路]批处理新手入门导读[视频教程]批处理基础视频教程[视频教程]VBS基础视频教程[批处理精品]批处理版照片整理器
[批处理精品]纯批处理备份&还原驱动[批处理精品]CMD命令50条不能说的秘密[在线下载]第三方命令行工具[在线帮助]VBScript / JScript 在线参考
返回列表 发帖

[原创代码] Python遍历目录,搜索指定文件,复制文件到指定目录(学习笔记)

  1. #coding=utf-8
  2. import os
  3. import pathlib
  4. import re
  5. import shutil
  6. # import gol
  7. def check_dir(des_path):
  8.     my_dir = pathlib.Path(des_path)
  9.     if my_dir.is_dir():
  10.         return 0
  11.     else:
  12.         print('{}目录不存在\n正在创建文件夹')
  13.         os.mkdir(des_path)
  14. #搜索文件目录
  15. test_path = r'd:\gin\c'
  16. #复制到指定目录
  17. des_path = r'd:\gin\test_16'
  18. #存储文件
  19. _clist = []
  20. temp = []
  21. if check_dir(des_path) == 0: print('[{}]目录已经存在'.format(des_path))
  22. else: check_dir(des_path)
  23. for root,dirs,files in os.walk(test_path):
  24.     # print('当前目录为:',root)
  25.     # print('当前目录下的子目录为:',dirs)
  26.     for each_file in files:
  27.         #将以.c结尾的文件添加到列表
  28.         temp=re.findall('.*\.c,os.path.join(root,each_file))
  29.         if temp != []: _clist.append(temp)
  30. # os.chdir(des_path)
  31. for each_file in _clist:
  32.     print('copy :',*each_file,'--->',des_path)
  33.     #复制文件
  34.     shutil.copy(*each_file,des_path)
  35.    
  36. print('Done!')
复制代码
1

评分人数

回复 6# netdzb


    C++不会的,会C,C没有对象,学Python我都还木有弄清楚对象!

TOP

回复 5# Gin_Q

我也要跟上啊,感觉被你超越了。感觉你会c++,而我不会。所以你进步神速。

TOP

回复 3# netdzb


    这几天都在学习。哈哈哈。。。

TOP

回复 2# 小渣飞


    看不懂,我还没有学习多线程!

TOP

回复 1# Gin_Q

楼主厉害了,已经能写出一些小工具了啊.

TOP

把第三个线程的计算md5功能完善下这个脚本还是很有用的

本帖最后由 小渣飞 于 2020-5-17 23:11 编辑

# -*- coding: utf-8 -*-
# @Time    : 2020/4/23 上午2:39
# @Author  : @Jeffrey
# @Email   : Jeffrey2971@outlook.com
# @File    : ThreadCopyBeta.py
# @Software: Ubuntu 18.04.4 LTS python3.8.0

# /home/jeffrey/桌面/IdeaProjects,/home/jeffrey/test,[pdf,md,CHM]
# /home/jeffrey/iPhone2,/home/jeffrey/test,[BMP,GIF,HFR,JPG,MOV,MP4,PNG]



import datetime
import hashlib
import os
import queue
import re
import sys
import threading
import shutil
import time
import uuid
import math

"""global finish1 --> 解决采集线程的速度跟不上复制线程而break"""
thread_1 = False
# thread_2 = False
# thread_3 = False
out = False


class indexError(Exception):
    def __init__(self, error_info):
        super().__init__(self)
        self.error_info = error_info

    def __str__(self):
        global out
        out = True
        return self.error_info


class index(object):

    def __init__(self, data, move_default=False):
        self.data = data
        self.moveDefault = move_default
        self.dataList = []
        self.typeList = []
        self.element = 0
        begin = time.time()

        """处理输入类型文件"""
        typeListRe = re.compile(r"\[(.*?)\]").findall(self.data)
        typeList = list(str(typeListRe).replace("'", "").replace("[", "").replace("]", "").split(","))
        for i in typeList:
            self.typeList.append(i)

        """处理其他输入元素"""
        dataListTmp = self.data.split(",")

        """整理参数"""
        try:
            self.dataList.append(dataListTmp[0])
            self.dataList.append(dataListTmp[1])
            self.dataList.append(typeList)

            if move_default:
                self.dataList.append(True)
            else:
                self.dataList.append(False)

            if backups_default:
                self.dataList.append(True)
            else:
                self.dataList.append(False)

            if self.dataList[0] == "":
                error = "必须提供一个工作路径"
                raise indexError(error)
            elif not os.path.exists(self.dataList[0]):
                error = ("%s不是一个有效的工作路径" % self.dataList[0])
                raise indexError(error)

            if self.dataList[1] == "":
                error = "必须提供一个保存路径"
                raise indexError(error)
            elif not os.path.exists(self.dataList[1]):
                error = "%s不是一个有效的保存路径" % self.dataList[1]
                raise indexError(error)

            if self.dataList[2] == 0:
                error = "必须至少提供一个类型参数"
                raise indexError(error)

            run(self.dataList[0], self.dataList[1], self.dataList[2], self.dataList[3], self.dataList[4], begin)

            """0-->workPath, 1-->savePath, 2-->fileType, 3-->Default, 4-->Default"""

        except Exception as e:
            error = "参数异常:%s" % e
            raise indexError(error)


"""Thread1负责采集资源"""


class Thread1(threading.Thread):
    def __init__(self, work_path, file_type, save_path, data_queue, wait_queue, num_queue, new_md5_list):
        threading.Thread.__init__(self)
        self.workPath = work_path
        self.savePath = save_path
        self.fileType = file_type
        self.dataQueue = data_queue
        self.numQueue = num_queue
        self.waitQueue = wait_queue
        self.checkList = []
        self.fileTypeTmp = []
        self.newMd5List = new_md5_list
        self.oldMd5List = []

    def run(self):
        findNum = 0  # 找到的文件
        global lostNum
        lostNum = 0  # 丢弃的文件
        for x in self.fileType:
            self.fileTypeTmp.append(x)
        while True:
            try:
                if len(self.fileTypeTmp) != 0:
                    for w in self.fileTypeTmp.copy():
                        for home, dirs, files in os.walk(self.workPath):
                            # """计算相差百分比,处理较大类型文件时可暂时睡眠采集线程,将更多的资源留给线程2"""
                            # if findNum != 0 and not self.numQueue.empty():
                            #     if int(("%.0f" % (float(findNum) / float(self.numQueue.qsize())))) > 40:
                            #         print(threading.current_thread().getName() + "等待")
                            #         time.sleep(60)

                            for filename in files:
                                path = os.path.join(home, filename)
                                if os.path.splitext(path)[-1].replace(".", "") in self.fileType:
                                    with open(path, mode="rb") as f:
                                        md5 = str(hashlib.md5(f.read()).hexdigest())
                                        """本次读取到的md5"""
                                        self.newMd5List.append(md5)
                                        if backups_default:
                                            if os.path.exists(save_path + os.sep + "md5.log"):
                                                with open(save_path + os.sep + "md5.log", mode="r") as m:
                                                    # 备份打开,如果有md5文件,则读取文件放入列表中
                                                    self.oldMd5List.append(m)

                                            if path.split(os.sep)[-1] not in self.checkList:

                                                self.dataQueue.put(path)
                                                self.checkList.append(path.split(os.sep)[-1])  # 文件名.后缀名
                                            else:
                                                self.waitQueue.put(path)
                                            findNum += 1
                                            print(threading.current_thread().getName() + ":已找到" + str(findNum) + "个文件")
                                else:
                                    lostNum += 1
                                    print(threading.current_thread().getName() + ":已过滤" + str(lostNum) + "个文件")

                        self.fileTypeTmp.remove(w)
                print("-------------------------------------------------------------")
                print("共找到" + str(findNum) + "个文件,过滤了" + str(lostNum) + "个文件")
                global thread_1
                thread_1 = True  # 需声明采集已经结束
                print("结束线程:", threading.current_thread())
                break  # 找的速度跟不上取的速度会发生异常
            except Exception as e:
                error = "所在线程:%s发生错误:%s" % (threading.current_thread().getName(), e)
                raise indexError(error)


"""Thread2负责复制或移动文件"""


class Thread2(threading.Thread):

    def __init__(self, data_queue, save_path, file_type, default_move, begin, wait_queue, thread_error, num_queue):
        threading.Thread.__init__(self)
        self.dataQueue = data_queue
        self.savePath = save_path
        self.fileType = file_type
        self.defaultMove = default_move
        self.beginTime = begin
        self.waitQueue = wait_queue
        self.threadError = thread_error
        self.numQueue = num_queue
        # self.moveQueue = move_queue
        self.num = 0
        self.rename = 0

    def run(self):
        global file
        size_all = 0  # 计算文件大小
        while True:
            try:
                if self.dataQueue.empty() and thread_1 is True:
                    """处理重名文件"""
                    if self.waitQueue != 0:
                        path = self.waitQueue.get()
                        print("正在处理重名文件" + path + '\n' + "剩余" + str(self.waitQueue.qsize()))
                        self.rename += 1
                        try:
                            if self.defaultMove is True:
                                shutil.move(path,
                                            self.savePath + os.sep + path.split(os.sep)[-1] + "." + str(
                                                uuid.uuid4()) + "." +
                                            path.split(".")[-1])
                                # self.moveQueue.put(i)
                                self.num += 1
                                self.numQueue.put(self.num)
                                # self.numQueue.put(self.num)
                                print("已移动" + str(self.num) + "个文件:" + path)
                            else:
                                shutil.copyfile(path,
                                                self.savePath + os.sep + path.split(os.sep)[-1] + "." + str(
                                                    uuid.uuid4()) + "." +
                                                path.split(".")[-1])
                                # self.moveQueue.put(i)
                                self.num += 1
                                self.numQueue.put(self.num)
                                print("已复制" + str(self.num) + "个文件:" + path)
                        except Exception as e:
                            error = """
                            异常报告:
                            所在线程:%s
                            异常文件:%s
                            发现异常:%s
                            """ % (threading.current_thread().getName(), path, e)
                            self.threadError.append(error)

                    for L in self.fileType:
                        for w in os.listdir(self.savePath):
                            if w.endswith(L):
                                shutil.move(self.savePath + os.sep + w, self.savePath + os.sep + L)
                                print("正在处理类型文件:" + self.savePath + os.sep + w)

                    endTime = time.time()
                    if self.defaultMove is True:
                        mode = "移动"
                    else:
                        mode = "复制"
                    """处理类型文件"""
                    sizeGB = "%.2f" % (size_all / 1024 / 1024 / 1024)
                    sizeMB = "%.2f" % (size_all / 1024 / 1024)
                    total = "%.2f" % (endTime - self.beginTime)
                    minute = float(total) / 60
                    if minute < 1:
                        minute = 0
                    print("-------------------------------------------------------------")
                    global result
                    result = ("已完成,共" + mode + "了" + str(self.num) + "个文件,处理了" + str(self.rename) + "个重名文件,发生了" + str(
                        len(self.threadError)) + "个异常,忽略了" + str(
                        lostNum) + "个文件,总大小为" + sizeGB + "GB," + sizeMB + "MB," + "耗时" + str(
                        math.floor(minute)) + "分" + str(total) + "秒")
                    print(result)
                    print("结束线程:", threading.current_thread())

                    break

                elif not self.dataQueue.empty():  # 假如不为空
                    try:
                        """如果dataQueue没有值,执行取值操作会卡死且不报错"""
                        file = self.dataQueue.get()
                        size_all += os.path.getsize(file)
                        if self.defaultMove is True:
                            shutil.move(file, self.savePath)
                            # self.moveQueue.put(file)
                            self.num += 1
                            self.numQueue.put(self.num)
                            print(threading.current_thread().getName() + ":已移动" + str(self.num) + "个文件:" + file)
                        else:
                            shutil.copy(file, self.savePath)
                            # self.moveQueue.put(file)
                            self.num += 1
                            self.numQueue.put(self.num)
                            print(threading.current_thread().getName() + ":已复制" + str(self.num) + "个文件:" + file)

                    except Exception as e:
                        error = """
                            异常报告:
                            所在线程:%s
                            异常文件:%s
                            发现异常:%s
                            """ % (threading.current_thread().getName(), file, e)
                        self.threadError.append(error)

            except Exception as e:
                error = "所在线程:%s发生错误:%s" % (threading.current_thread().getName(), e)
                raise indexError(error)

            # 未完善
            # class Thread3(threading.Thread):
            #     def __init__(self, all_queue, sava_path):
            #         threading.Thread.__init__(self)
            #         self.allQueue = all_queue
            #         self.savePath = save_path
            #         self.md5List = []
            #
            #     def run(self):
            #         while True:
            #             try:
            #                 path = self.allQueue.get()
            #                 if not os.path.exists(self.savePath + os.sep + "md5.log"):
            #                     with open(path, mode="rb") as f:
            #                         with open(self.savePath + os.sep + "md5.log", mode="a")as m:
            #                             m.write(str(hashlib.md5(f.read()).hexdigest()))
            #                 else:
            #                     if backups_default:
            #                         with open(self.savePath + os.sep + "md5.log", mode="r") as f:
            #                             for l in f:
            #                                 self.md5List.append(l)
            #                         if

            # except Exception as e:
            #     error = "所在线程:%s发生错误:%s" % (threading.current_thread().getName(), e)
            #     raise indexError(error)


def run(work_path, save_path, file_type, move_default, backups_default, begin):
    try:
        """尝试解决内存溢出"""
        sys.setrecursionlimit(1000000)

        """检查输入路径默认"""
        if work_path[-1] == os.sep:
            work_path = work_path.replace(work_path[-1], "")
        elif save_path[-1] == os.sep:
            save_path = save_path.replace(save_path[-1], "")

        """创建时间文件夹"""
        directory = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
        os.mkdir(save_path + os.sep + str(directory))
        save_path = save_path + os.sep + str(directory)

        """创建类型文件夹"""
        for L in file_type:
            if not os.path.exists(save_path + os.sep + L):
                os.mkdir(save_path + os.sep + L)

        """创建队列和列表供全局使用"""
        numQueue = queue.Queue()
        dataQueue = queue.Queue()
        md5_queue = queue.Queue
        new_md5_list = []
        # moveQueue = queue.Queue()
        # waitList = []

        """
        这里一定要使用队列存放重名文件而不是列表,否则会发生多次给同一个文件添加uuid值
        同一个工作目录下测试结果:
        正常:使用WaitQueue存放重名文件:已完成,共复制了81个文件,处理了1个重名文件,发生了0个异常,忽略了95010个文件,总大小为0.26GB,266.50MB,耗时0分31.52秒
        异常:使用WaitList存放重名文件:已完成,共复制了525个文件,忽略了95010个文件,总大小为0.26GB,266.50MB,耗时1分118.15秒
        """
        waitQueue = queue.Queue()
        threadError = []

        """创建并启动线程传递相关参数"""
        T1 = Thread1(work_path, file_type, save_path, dataQueue, waitQueue, numQueue, new_md5_list)
        T2 = Thread2(dataQueue, save_path, file_type, move_default, begin, waitQueue, threadError, numQueue)
        # T3 = Thread3(allQueue, save_path)
        T1.start()
        T2.start()
        # T3.start()

        """逻辑堵塞主线程"""
        while True:
            if out is True:
                print("程序异常将终止!")
                exit()
            if dataQueue.empty() is True and thread_1 is True:
                T1.join()
                T2.join()
                try:
                    with open(save_path + os.sep + "md5.log", mode="a") as f:
                        for i in range(len(new_md5_list)):
                            f.write(new_md5_list + "\n")
                except Exception as e:
                    error = "md5文件生成异常"
                    raise indexError(error)

                try:
                    with open(save_path + os.sep + "report.log", mode="a") as f:
                        for e in threadError:
                            f.write("------错误报告------" + '\n' + e + '\n')
                        f.write("------其他信息------" + '\n' + result + '\n')
                    print("已生成日志:" + save_path + os.sep + "report.log")
                except Exception as e:
                    print("生成日志失败:", e)
                # T3.join()
                print("程序退出", threading.current_thread())
                break

    except Exception as e:
        error = "发生错误%s" % e
        raise indexError(error)


if __name__ == '__main__':
    print("""
        请提供三个必要参数,一个可选参数
        @work_path:程序工作路径
        @save_path:程序的保存路径
        @file_type:指定一个或多个筛选文件类型
        @move_default:[可选参数]是否移动,默认为否
        @backup_default:[可选参数]将上一次备份的数据进行md5计算,并在第二次备份时跳过这些被标记过的数据,从而提高效率,节硬盘空间。
        exp:work_path, save_path, [file_type1, file_type2...], [True]
        注意:Bate版本,请只用于测试。
        """)

    work_path = ""
    save_path = ""
    file_type = "[pdf,xlsx,md]"
    move_default = False
    # backups_default = True 未完善

    data = work_path + "," + save_path + "," + file_type

    # data = input("请按照格式输入参数:")
    index(data)

TOP

返回列表