- 帖子
- 137
- 积分
- 184
- 技术
- 0
- 捐助
- 0
- 注册时间
- 2017-12-23
|
2楼
发表于 2020-5-17 23:09
| 只看该作者
把第三个线程的计算md5功能完善下这个脚本还是很有用的
本帖最后由 小渣飞 于 2020-5-17 23:11 编辑
# -*- coding: utf-8 -*-
# @Time : 2020/4/23 上午2:39
# @Author : @Jeffrey
# @Email : Jeffrey2971@outlook.com
# @File : ThreadCopyBeta.py
# @Software: Ubuntu 18.04.4 LTS python3.8.0
# /home/jeffrey/桌面/IdeaProjects,/home/jeffrey/test,[pdf,md,CHM]
# /home/jeffrey/iPhone2,/home/jeffrey/test,[BMP,GIF,HFR,JPG,MOV,MP4,PNG]
import datetime
import hashlib
import os
import queue
import re
import sys
import threading
import shutil
import time
import uuid
import math
"""global finish1 --> 解决采集线程的速度跟不上复制线程而break"""
thread_1 = False
# thread_2 = False
# thread_3 = False
out = False
class indexError(Exception):
def __init__(self, error_info):
super().__init__(self)
self.error_info = error_info
def __str__(self):
global out
out = True
return self.error_info
class index(object):
def __init__(self, data, move_default=False):
self.data = data
self.moveDefault = move_default
self.dataList = []
self.typeList = []
self.element = 0
begin = time.time()
"""处理输入类型文件"""
typeListRe = re.compile(r"\[(.*?)\]").findall(self.data)
typeList = list(str(typeListRe).replace("'", "").replace("[", "").replace("]", "").split(","))
for i in typeList:
self.typeList.append(i)
"""处理其他输入元素"""
dataListTmp = self.data.split(",")
"""整理参数"""
try:
self.dataList.append(dataListTmp[0])
self.dataList.append(dataListTmp[1])
self.dataList.append(typeList)
if move_default:
self.dataList.append(True)
else:
self.dataList.append(False)
if backups_default:
self.dataList.append(True)
else:
self.dataList.append(False)
if self.dataList[0] == "":
error = "必须提供一个工作路径"
raise indexError(error)
elif not os.path.exists(self.dataList[0]):
error = ("%s不是一个有效的工作路径" % self.dataList[0])
raise indexError(error)
if self.dataList[1] == "":
error = "必须提供一个保存路径"
raise indexError(error)
elif not os.path.exists(self.dataList[1]):
error = "%s不是一个有效的保存路径" % self.dataList[1]
raise indexError(error)
if self.dataList[2] == 0:
error = "必须至少提供一个类型参数"
raise indexError(error)
run(self.dataList[0], self.dataList[1], self.dataList[2], self.dataList[3], self.dataList[4], begin)
"""0-->workPath, 1-->savePath, 2-->fileType, 3-->Default, 4-->Default"""
except Exception as e:
error = "参数异常:%s" % e
raise indexError(error)
"""Thread1负责采集资源"""
class Thread1(threading.Thread):
def __init__(self, work_path, file_type, save_path, data_queue, wait_queue, num_queue, new_md5_list):
threading.Thread.__init__(self)
self.workPath = work_path
self.savePath = save_path
self.fileType = file_type
self.dataQueue = data_queue
self.numQueue = num_queue
self.waitQueue = wait_queue
self.checkList = []
self.fileTypeTmp = []
self.newMd5List = new_md5_list
self.oldMd5List = []
def run(self):
findNum = 0 # 找到的文件
global lostNum
lostNum = 0 # 丢弃的文件
for x in self.fileType:
self.fileTypeTmp.append(x)
while True:
try:
if len(self.fileTypeTmp) != 0:
for w in self.fileTypeTmp.copy():
for home, dirs, files in os.walk(self.workPath):
# """计算相差百分比,处理较大类型文件时可暂时睡眠采集线程,将更多的资源留给线程2"""
# if findNum != 0 and not self.numQueue.empty():
# if int(("%.0f" % (float(findNum) / float(self.numQueue.qsize())))) > 40:
# print(threading.current_thread().getName() + "等待")
# time.sleep(60)
for filename in files:
path = os.path.join(home, filename)
if os.path.splitext(path)[-1].replace(".", "") in self.fileType:
with open(path, mode="rb") as f:
md5 = str(hashlib.md5(f.read()).hexdigest())
"""本次读取到的md5"""
self.newMd5List.append(md5)
if backups_default:
if os.path.exists(save_path + os.sep + "md5.log"):
with open(save_path + os.sep + "md5.log", mode="r") as m:
# 备份打开,如果有md5文件,则读取文件放入列表中
self.oldMd5List.append(m)
if path.split(os.sep)[-1] not in self.checkList:
self.dataQueue.put(path)
self.checkList.append(path.split(os.sep)[-1]) # 文件名.后缀名
else:
self.waitQueue.put(path)
findNum += 1
print(threading.current_thread().getName() + ":已找到" + str(findNum) + "个文件")
else:
lostNum += 1
print(threading.current_thread().getName() + ":已过滤" + str(lostNum) + "个文件")
self.fileTypeTmp.remove(w)
print("-------------------------------------------------------------")
print("共找到" + str(findNum) + "个文件,过滤了" + str(lostNum) + "个文件")
global thread_1
thread_1 = True # 需声明采集已经结束
print("结束线程:", threading.current_thread())
break # 找的速度跟不上取的速度会发生异常
except Exception as e:
error = "所在线程:%s发生错误:%s" % (threading.current_thread().getName(), e)
raise indexError(error)
"""Thread2负责复制或移动文件"""
class Thread2(threading.Thread):
def __init__(self, data_queue, save_path, file_type, default_move, begin, wait_queue, thread_error, num_queue):
threading.Thread.__init__(self)
self.dataQueue = data_queue
self.savePath = save_path
self.fileType = file_type
self.defaultMove = default_move
self.beginTime = begin
self.waitQueue = wait_queue
self.threadError = thread_error
self.numQueue = num_queue
# self.moveQueue = move_queue
self.num = 0
self.rename = 0
def run(self):
global file
size_all = 0 # 计算文件大小
while True:
try:
if self.dataQueue.empty() and thread_1 is True:
"""处理重名文件"""
if self.waitQueue != 0:
path = self.waitQueue.get()
print("正在处理重名文件" + path + '\n' + "剩余" + str(self.waitQueue.qsize()))
self.rename += 1
try:
if self.defaultMove is True:
shutil.move(path,
self.savePath + os.sep + path.split(os.sep)[-1] + "." + str(
uuid.uuid4()) + "." +
path.split(".")[-1])
# self.moveQueue.put(i)
self.num += 1
self.numQueue.put(self.num)
# self.numQueue.put(self.num)
print("已移动" + str(self.num) + "个文件:" + path)
else:
shutil.copyfile(path,
self.savePath + os.sep + path.split(os.sep)[-1] + "." + str(
uuid.uuid4()) + "." +
path.split(".")[-1])
# self.moveQueue.put(i)
self.num += 1
self.numQueue.put(self.num)
print("已复制" + str(self.num) + "个文件:" + path)
except Exception as e:
error = """
异常报告:
所在线程:%s
异常文件:%s
发现异常:%s
""" % (threading.current_thread().getName(), path, e)
self.threadError.append(error)
for L in self.fileType:
for w in os.listdir(self.savePath):
if w.endswith(L):
shutil.move(self.savePath + os.sep + w, self.savePath + os.sep + L)
print("正在处理类型文件:" + self.savePath + os.sep + w)
endTime = time.time()
if self.defaultMove is True:
mode = "移动"
else:
mode = "复制"
"""处理类型文件"""
sizeGB = "%.2f" % (size_all / 1024 / 1024 / 1024)
sizeMB = "%.2f" % (size_all / 1024 / 1024)
total = "%.2f" % (endTime - self.beginTime)
minute = float(total) / 60
if minute < 1:
minute = 0
print("-------------------------------------------------------------")
global result
result = ("已完成,共" + mode + "了" + str(self.num) + "个文件,处理了" + str(self.rename) + "个重名文件,发生了" + str(
len(self.threadError)) + "个异常,忽略了" + str(
lostNum) + "个文件,总大小为" + sizeGB + "GB," + sizeMB + "MB," + "耗时" + str(
math.floor(minute)) + "分" + str(total) + "秒")
print(result)
print("结束线程:", threading.current_thread())
break
elif not self.dataQueue.empty(): # 假如不为空
try:
"""如果dataQueue没有值,执行取值操作会卡死且不报错"""
file = self.dataQueue.get()
size_all += os.path.getsize(file)
if self.defaultMove is True:
shutil.move(file, self.savePath)
# self.moveQueue.put(file)
self.num += 1
self.numQueue.put(self.num)
print(threading.current_thread().getName() + ":已移动" + str(self.num) + "个文件:" + file)
else:
shutil.copy(file, self.savePath)
# self.moveQueue.put(file)
self.num += 1
self.numQueue.put(self.num)
print(threading.current_thread().getName() + ":已复制" + str(self.num) + "个文件:" + file)
except Exception as e:
error = """
异常报告:
所在线程:%s
异常文件:%s
发现异常:%s
""" % (threading.current_thread().getName(), file, e)
self.threadError.append(error)
except Exception as e:
error = "所在线程:%s发生错误:%s" % (threading.current_thread().getName(), e)
raise indexError(error)
# 未完善
# class Thread3(threading.Thread):
# def __init__(self, all_queue, sava_path):
# threading.Thread.__init__(self)
# self.allQueue = all_queue
# self.savePath = save_path
# self.md5List = []
#
# def run(self):
# while True:
# try:
# path = self.allQueue.get()
# if not os.path.exists(self.savePath + os.sep + "md5.log"):
# with open(path, mode="rb") as f:
# with open(self.savePath + os.sep + "md5.log", mode="a")as m:
# m.write(str(hashlib.md5(f.read()).hexdigest()))
# else:
# if backups_default:
# with open(self.savePath + os.sep + "md5.log", mode="r") as f:
# for l in f:
# self.md5List.append(l)
# if
# except Exception as e:
# error = "所在线程:%s发生错误:%s" % (threading.current_thread().getName(), e)
# raise indexError(error)
def run(work_path, save_path, file_type, move_default, backups_default, begin):
try:
"""尝试解决内存溢出"""
sys.setrecursionlimit(1000000)
"""检查输入路径默认"""
if work_path[-1] == os.sep:
work_path = work_path.replace(work_path[-1], "")
elif save_path[-1] == os.sep:
save_path = save_path.replace(save_path[-1], "")
"""创建时间文件夹"""
directory = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
os.mkdir(save_path + os.sep + str(directory))
save_path = save_path + os.sep + str(directory)
"""创建类型文件夹"""
for L in file_type:
if not os.path.exists(save_path + os.sep + L):
os.mkdir(save_path + os.sep + L)
"""创建队列和列表供全局使用"""
numQueue = queue.Queue()
dataQueue = queue.Queue()
md5_queue = queue.Queue
new_md5_list = []
# moveQueue = queue.Queue()
# waitList = []
"""
这里一定要使用队列存放重名文件而不是列表,否则会发生多次给同一个文件添加uuid值
同一个工作目录下测试结果:
正常:使用WaitQueue存放重名文件:已完成,共复制了81个文件,处理了1个重名文件,发生了0个异常,忽略了95010个文件,总大小为0.26GB,266.50MB,耗时0分31.52秒
异常:使用WaitList存放重名文件:已完成,共复制了525个文件,忽略了95010个文件,总大小为0.26GB,266.50MB,耗时1分118.15秒
"""
waitQueue = queue.Queue()
threadError = []
"""创建并启动线程传递相关参数"""
T1 = Thread1(work_path, file_type, save_path, dataQueue, waitQueue, numQueue, new_md5_list)
T2 = Thread2(dataQueue, save_path, file_type, move_default, begin, waitQueue, threadError, numQueue)
# T3 = Thread3(allQueue, save_path)
T1.start()
T2.start()
# T3.start()
"""逻辑堵塞主线程"""
while True:
if out is True:
print("程序异常将终止!")
exit()
if dataQueue.empty() is True and thread_1 is True:
T1.join()
T2.join()
try:
with open(save_path + os.sep + "md5.log", mode="a") as f:
for i in range(len(new_md5_list)):
f.write(new_md5_list + "\n")
except Exception as e:
error = "md5文件生成异常"
raise indexError(error)
try:
with open(save_path + os.sep + "report.log", mode="a") as f:
for e in threadError:
f.write("------错误报告------" + '\n' + e + '\n')
f.write("------其他信息------" + '\n' + result + '\n')
print("已生成日志:" + save_path + os.sep + "report.log")
except Exception as e:
print("生成日志失败:", e)
# T3.join()
print("程序退出", threading.current_thread())
break
except Exception as e:
error = "发生错误%s" % e
raise indexError(error)
if __name__ == '__main__':
print("""
请提供三个必要参数,一个可选参数
@work_path:程序工作路径
@save_path:程序的保存路径
@file_type:指定一个或多个筛选文件类型
@move_default:[可选参数]是否移动,默认为否
@backup_default:[可选参数]将上一次备份的数据进行md5计算,并在第二次备份时跳过这些被标记过的数据,从而提高效率,节硬盘空间。
exp:work_path, save_path, [file_type1, file_type2...], [True]
注意:Bate版本,请只用于测试。
""")
work_path = ""
save_path = ""
file_type = "[pdf,xlsx,md]"
move_default = False
# backups_default = True 未完善
data = work_path + "," + save_path + "," + file_type
# data = input("请按照格式输入参数:")
index(data) |
|