import os
import logging
import queue
import threading
import time
import watchdog.observers as observers
import watchdog.events as events
import paramiko
logger = logging.getLogger(__name__)
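# Sentinel value that could be pushed onto the queue to tell the worker threads to stop (not used below)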
SENTINEL = None
# Get the number of CPU cores, used to size the worker pool
def get_CPU_NumberOfCores():
    # Parse the output of "wmic cpu get NumberOfCores" (Windows only)
    content = os.popen("wmic cpu get NumberOfCores").readlines()
    CPU_NumberOfCores = 0
    for line in content:
        line = line.strip()
        if line == "" or line == "NumberOfCores":
            continue
        CPU_NumberOfCores += int(line)
    return CPU_NumberOfCores
# Push every filesystem event onto the shared queue
class MyEventHandler(events.FileSystemEventHandler):
    def __init__(self, queue):
        super(MyEventHandler, self).__init__()
        self.queue = queue

    def on_any_event(self, event):
        super(MyEventHandler, self).on_any_event(event)
        self.queue.put(event)
# Handle each event according to its type
def process(queue):
    while True:
        event = queue.get()
        logger.info(event)
        # Map the local Windows path to the Cygwin-style path used by rsync
        _current_file = event.src_path.replace(rsyncSrc, '/cygdrive/d/Debug_Log')
        current_file = _current_file.replace('\\', '/')
        # A file was created or modified: sync it to the rsync server
        if event.event_type in ("created", "modified") and not event.is_directory:
            rsync_cmd = ('rsync -avz -R -d --port=873 --delete --progress "' + current_file + '" '
                         + rsyncDes + ' --password-file="/cygdrive/d/ServerCheck/rsync/pass.txt"')
            os.popen(rsync_cmd)
        # A local file was deleted: delete the corresponding file on the NAS server
        elif event.event_type == "deleted":
            # Run the remote delete over SSH with paramiko
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(hostname='192.168.123.42', port=22, username='administrator', password='XXX')
            stdin, stdout, stderr = client.exec_command('rm -rf "/volume1/Real-time' + current_file + '"')
            client.close()
        # A file was renamed: delete the old file, then sync the new one
        elif event.event_type == "moved":
            # Delete the old file on the NAS server
            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            client.connect(hostname='192.168.123.42', port=22, username='administrator', password='XXX')
            stdin, stdout, stderr = client.exec_command('rm -rf "/volume1/Real-time' + current_file + '"')
            client.close()
            # Build the destination path of the rename and sync it
            _new_file = event.dest_path.replace(rsyncSrc, '/cygdrive/d/Debug_Log')
            new_file = _new_file.replace('\\', '/')
            rsync_cmd = ('rsync -avz -R -d --port=873 --delete --progress "' + new_file + '" '
                         + rsyncDes + ' --password-file="/cygdrive/d/ServerCheck/rsync/pass.txt"')
            os.popen(rsync_cmd)
        # Wait 7 seconds between events to keep CPU and memory usage down
        time.sleep(7)
if __name__ == '__main__':
    rsyncSrc = r'D:\Debug_Log'
    rsyncDes = 'rsync_backup@192.168.123.42::realtime'
    # Configure logging
    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s %(threadName)s] %(message)s',
                        datefmt='%H:%M:%S')
    # Queue shared by the watchdog handler and the worker threads
    event_queue = queue.Queue()
    # Size the worker pool by the number of CPU cores
    num_workers = get_CPU_NumberOfCores()
    # Worker threads pull events off the queue and process them one by one
    pool = [threading.Thread(target=process, args=(event_queue,)) for i in range(num_workers)]
    for t in pool:
        t.daemon = True
        t.start()
    # The watchdog observer records file-change events into the queue
    event_handler = MyEventHandler(event_queue)
    observer = observers.Observer()
    observer.schedule(
        event_handler,
        path=rsyncSrc,
        recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()