Commit d8ec1089 authored by wyykak's avatar wyykak

initial commit

Signed-off-by: wyykak's avatarwyykak <wyy_1414@126.com>
parent a3e7ca67
#!/usr/bin/python3
import hashlib
import multiprocessing as MP
import os
import pathlib
import queue
import re
import traceback
import urllib.parse
import urllib.request as R
from enum import Enum
from http.client import CannotSendHeader
from logging.config import fileConfig
from multiprocessing.connection import wait
# CONFIG BEGIN
# baseURL = 'https://cdn02.moecube.com:444/koishipro/contents/'
# Root URL of the remote mirror; relative paths from the filelist are appended to it.
baseURL = 'https://cdn02.moecube.com:444/ygopro-222DIY/contents/'
# Local directory that mirrors the remote tree; downloaded files are written under it.
targetPath = './'
# Number of scanner worker processes (forced to 1 below when isSSD is False).
scanProcessCount = 2
# Number of downloader worker processes.
downloadProcessCount = 4
# When False (spinning disk), files are fetched into memory first and flushed
# to disk one downloader at a time to reduce concurrent disk writes.
isSSD = True
# Download attempts per file before reporting failure.
retryLimit = 3
# Per-request timeout in seconds passed to urlopen.
downloadTimeout = 120
# regex filters
# A file is scanned only if it matches every includeFilters pattern
# and matches none of the excludeFilters patterns (re.search semantics).
excludeFilters = []
includeFilters = []
# CONFIG END
class Cmd(Enum):
    """Commands sent from a controller to its worker process."""
    QUIT = 0   # terminate the worker's Run loop
    SCAN = 1   # scanner: check one file id for existence / MD5 match
    FETCH = 2  # downloader: download one file id into the in-memory cache
    STORE = 3  # downloader: download if needed, then write one file id to disk
class Reply(Enum):
    """Replies sent from a worker process back to its controller."""
    SKIP = 0          # scanner: file is up to date (or filtered out)
    NEED_UPDATE = 1   # scanner: file missing or MD5 mismatch
    FETCH_DONE = 2    # downloader: file downloaded into memory
    FETCH_FAILED = 3  # downloader: all download attempts failed
    STORE_DONE = 4    # downloader: file written to disk
    STORE_FAILED = 5  # downloader: download or disk write failed
def GetMD5(data: bytes) -> str:
md5 = hashlib.md5()
md5.update(data)
return md5.hexdigest()
def FetchFile(url: str, retry: int, timeout: int, md5: str) -> tuple:
    """Download *url*, retrying up to *retry* times.

    If *md5* is a non-empty hex digest, the downloaded bytes must hash to it;
    a mismatch counts as a failed attempt and is retried.

    Returns:
        (True, data) on success, or (False, last_partial_or_empty_data)
        after all attempts are exhausted.
    """
    retryCount = retry
    data = b''
    while retryCount > 0:
        retryCount -= 1
        print(f'Downloader: Downloading {url}, attempt {retry - retryCount}')
        try:
            # Context manager closes the HTTP response even if read() raises;
            # the original leaked the connection on every attempt.
            with R.urlopen(url, timeout=timeout) as resp:
                data = resp.read()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # propagate instead of being swallowed as a retry.
            print(f'Downloader: {url} failed to download')
            continue
        if len(md5) > 0 and GetMD5(data) != md5:
            print(f'Downloader: {url} returned corrupted data')
            continue
        return (True, data)
    return (False, data)
class WorkerProcess:
    """Base for logic that runs inside a multiprocessing worker.

    Holds the two halves of a request/reply protocol (commands arrive on
    recvQueue, replies go out on sendQueue) plus the parallel lists of
    relative file paths and their expected MD5 digests, indexed by file id.
    """
    def __init__(self, recvQueue: MP.Queue, sendQueue: MP.Queue, pathList: list, md5List: list):
        self.recvQueue = recvQueue  # commands from the controller
        self.sendQueue = sendQueue  # replies back to the controller
        self.pathList = pathList    # relative file path per file id
        self.md5List = md5List      # expected MD5 hex digest per file id
    def Send(self, content):
        """Queue a reply tuple for the controller (blocking put)."""
        self.sendQueue.put(content)
    def Recv(self) -> tuple:
        """Block until the controller's next command tuple arrives."""
        return self.recvQueue.get()
class ScanProcess(WorkerProcess):
    """Worker that checks local files against their expected MD5 digests."""
    def __init__(self, recvQueue: MP.Queue, sendQueue: MP.Queue, pathList: list, md5List: list):
        WorkerProcess.__init__(self, recvQueue, sendQueue, pathList, md5List)
        # Compile the module-level filter patterns once per worker process.
        self.includeFilters = [re.compile(f) for f in includeFilters]
        self.excludeFilters = [re.compile(f) for f in excludeFilters]
    def Filter(self, id: int) -> bool:
        """Return True if the path of file *id* passes the regex filters."""
        # Every include pattern must match somewhere in the path...
        for f in self.includeFilters:
            if f.search(self.pathList[id]) is None:
                return False
        # ...and no exclude pattern may match.
        for f in self.excludeFilters:
            if f.search(self.pathList[id]):
                return False
        return True
    def Scan(self, id: int) -> bool:
        """Return True if file *id* needs to be (re-)downloaded."""
        if not self.Filter(id):
            return False
        fullPath = targetPath + self.pathList[id]
        if not os.path.isfile(fullPath):
            print(f'Scanner: {fullPath} does not exist')
            return True
        try:
            with open(fullPath, 'rb') as f:
                data = f.read()
        except OSError:
            # Narrowed from a bare `except:`.  An unreadable file is treated
            # as up to date (best-effort skip), matching the original logic.
            return False
        if GetMD5(data) != self.md5List[id]:
            print(f'Scanner: {fullPath} has different MD5')
            return True
        return False
    def Run(self):
        """Serve SCAN commands until QUIT arrives."""
        while True:
            cmd = self.Recv()
            if cmd[0] == Cmd.QUIT:
                return
            elif cmd[0] == Cmd.SCAN:
                if self.Scan(cmd[1]):
                    self.Send((Reply.NEED_UPDATE, cmd[1]))
                else:
                    self.Send((Reply.SKIP, cmd[1]))
class DownloadProcess(WorkerProcess):
    """Worker that downloads files and writes them under targetPath."""
    def __init__(self, recvQueue: MP.Queue, sendQueue: MP.Queue, pathList: list, md5List: list):
        WorkerProcess.__init__(self, recvQueue, sendQueue, pathList, md5List)
        self.fileCache = {}  # file id -> downloaded bytes awaiting a STORE
    def Fetch(self, id: int) -> bool:
        """Download file *id* into the in-memory cache; True on success."""
        fullURL = baseURL + urllib.parse.quote(self.pathList[id])
        result, data = FetchFile(
            fullURL, retryLimit, downloadTimeout, self.md5List[id])
        if result:
            self.fileCache[id] = data
            return True
        else:
            return False
    def Store(self, id: int) -> bool:
        """Write cached file *id* to disk, downloading it first if needed."""
        if id not in self.fileCache and not self.Fetch(id):
            return False
        fullPath = targetPath + self.pathList[id]
        try:
            os.makedirs(os.path.dirname(fullPath), exist_ok=True)
            with open(fullPath, 'wb') as f:
                f.write(self.fileCache[id])
        except OSError:
            # Narrowed from a bare `except:` so only filesystem errors are
            # reported as a store failure.
            print(f'Downloader: {fullPath} failed to write')
            return False
        # Drop the cached bytes only after a successful write.
        del self.fileCache[id]
        return True
    def Run(self):
        """Serve FETCH/STORE commands until QUIT arrives."""
        while True:
            cmd = self.Recv()
            if cmd[0] == Cmd.QUIT:
                return
            elif cmd[0] == Cmd.FETCH:
                if self.Fetch(cmd[1]):
                    self.Send(
                        (Reply.FETCH_DONE, cmd[1], len(self.fileCache[cmd[1]])))
                else:
                    self.Send((Reply.FETCH_FAILED, cmd[1]))
            elif cmd[0] == Cmd.STORE:
                if self.Store(cmd[1]):
                    self.Send((Reply.STORE_DONE, cmd[1]))
                else:
                    self.Send((Reply.STORE_FAILED, cmd[1]))
class ScanController:
    """Coordinator-side handle for one ScanProcess worker.

    Tracks each file id through pending -> scanning -> skipped/needUpdate.
    Update() is a non-blocking pump meant to be called repeatedly.
    """
    def __init__(self, pathList: list, md5List: list):
        self.sendQueue = MP.Queue()
        self.recvQueue = MP.Queue()
        self.pathList = pathList
        self.md5List = md5List
        # NOTE(review): a lambda Process target works only with the 'fork'
        # start method; under 'spawn' (Windows / recent macOS default) the
        # lambda is not picklable -- confirm the intended platforms.
        self.process = MP.Process(target=lambda: ScanProcess(
            self.sendQueue, self.recvQueue, self.pathList, self.md5List).Run())
        self.process.start()
        self.pending = set()     # queued locally, not yet sent to the worker
        self.scanning = set()    # sent to the worker, awaiting a reply
        self.skipped = set()     # reported as up to date
        self.needUpdate = set()  # reported as needing a download
    def Scan(self, id: int):
        """Schedule file *id* for scanning."""
        self.pending.add(id)
    def Update(self):
        """Non-blocking pump: push pending ids to the worker, drain replies."""
        delta = set()
        for id in self.pending:
            try:
                self.sendQueue.put_nowait((Cmd.SCAN, id))
            except queue.Full:
                # Queue is full: stop pushing, retry the rest next Update().
                break
            delta.add(id)
        self.pending -= delta
        self.scanning |= delta
        # Drain every reply currently available without blocking.
        while True:
            try:
                reply = self.recvQueue.get_nowait()
            except queue.Empty:
                break
            if reply[0] == Reply.SKIP:
                self.skipped.add(reply[1])
            else:
                self.needUpdate.add(reply[1])
            self.scanning.remove(reply[1])
    def Stop(self):
        """Ask the worker to exit, then wait for it."""
        self.sendQueue.put((Cmd.QUIT,))
        self.process.join()
    def Kill(self):
        """Forcefully terminate the worker process."""
        self.process.terminate()
class DownloadController:
    """Coordinator-side handle for one DownloadProcess worker.

    With autoFlush=True each id goes straight to STORE (download + write).
    With autoFlush=False ids are only FETCHed into the worker's memory cache
    until Flush() re-queues them for storing (HDD-friendly mode).
    """
    def __init__(self, pathList: list, md5List: list, autoFlush: bool):
        self.sendQueue = MP.Queue()
        self.recvQueue = MP.Queue()
        self.pathList = pathList
        self.md5List = md5List
        # NOTE(review): a lambda Process target works only with the 'fork'
        # start method; under 'spawn' it is not picklable -- confirm platforms.
        self.process = MP.Process(target=lambda: DownloadProcess(
            self.sendQueue, self.recvQueue, self.pathList, self.md5List).Run())
        self.process.start()
        self.autoFlush = autoFlush
        self.pending = set()      # queued locally, not yet sent to the worker
        self.downloading = set()  # FETCH sent, awaiting reply
        self.downloaded = set()   # fetched to memory, not yet stored
        self.flushing = set()     # STORE sent, awaiting reply
        self.flushed = set()      # written to disk
        self.failed = set()       # download or write failed
    def Download(self, id: int):
        """Schedule file *id* for downloading."""
        self.pending.add(id)
    def Update(self):
        """Non-blocking pump: push pending ids to the worker, drain replies."""
        delta = set()
        if self.autoFlush:
            # Download and write in one STORE step.
            for id in self.pending:
                try:
                    self.sendQueue.put_nowait((Cmd.STORE, id))
                except queue.Full:
                    break
                delta.add(id)
            self.pending -= delta
            self.flushing |= delta
        else:
            # Memory-only FETCH; disk writes are deferred until Flush().
            for id in self.pending:
                try:
                    self.sendQueue.put_nowait((Cmd.FETCH, id))
                except queue.Full:
                    break
                delta.add(id)
            self.pending -= delta
            self.downloading |= delta
        # Drain every reply currently available without blocking.
        while True:
            try:
                reply = self.recvQueue.get_nowait()
            except queue.Empty:
                break
            if reply[0] == Reply.FETCH_DONE:
                self.downloaded.add(reply[1])
                self.downloading.remove(reply[1])
            elif reply[0] == Reply.FETCH_FAILED:
                self.failed.add(reply[1])
                self.downloading.remove(reply[1])
            elif reply[0] == Reply.STORE_DONE:
                self.flushed.add(reply[1])
                self.flushing.remove(reply[1])
            elif reply[0] == Reply.STORE_FAILED:
                self.failed.add(reply[1])
                self.flushing.remove(reply[1])
    def Flush(self):
        """Switch to store mode and re-queue everything fetched so far."""
        self.autoFlush = True
        self.pending |= self.downloaded
        self.downloaded.clear()
    def IsIdle(self) -> bool:
        """True when nothing is pending, in flight, or awaiting a flush."""
        return len(self.pending) == 0 and len(self.downloading) == 0 and len(self.flushing) == 0 and len(self.downloaded) == 0
    def Stop(self):
        """Ask the worker to exit, then wait for it."""
        self.sendQueue.put((Cmd.QUIT,))
        self.process.join()
    def Kill(self):
        """Forcefully terminate the worker process."""
        self.process.terminate()
if __name__ == '__main__':
    # Fetch the remote filelist: one "<path>\t<md5>" entry per line.
    req = R.urlopen(baseURL + 'update/filelist.txt')
    filelist = req.read().decode('utf-8-sig')
    plist = []
    mlist = []
    for line in filelist.split('\n'):
        info = line.split('\t')
        if len(info) < 2:
            continue
        plist.append(info[0])
        mlist.append(info[1])
    fileCount = len(plist)
    print(f'Coordinator: Filelist downloaded, file count {fileCount}')
    if not isSSD:
        # On a spinning disk, scan with a single process to avoid seeking.
        scanProcessCount = 1
    scanners = []
    downloaders = []
    try:
        for i in range(scanProcessCount):
            scanners.append(ScanController(plist, mlist))
            print(f'Coordinator: Scan process {i} started')
        for i in range(downloadProcessCount):
            downloaders.append(DownloadController(plist, mlist, isSSD))
            print(f'Coordinator: Download process {i} started')
        finished = set()
        failed = set()
        scanned = 0
        currentDownloader = 0
        currentFlushingDownloader = 0
        # Distribute all file ids round-robin over the scanners.
        for id in range(fileCount):
            scanners[id % scanProcessCount].Scan(id)
        # Main event loop: pump every controller until every file is settled.
        while len(finished) < fileCount:
            for scanner in scanners:
                scanner.Update()
                for id in scanner.skipped:
                    finished.add(id)
                    scanned += 1
                scanner.skipped.clear()
                for id in scanner.needUpdate:
                    scanned += 1
                    print(f'Coordinator: {plist[id]} needs update')
                    # Round-robin the work over the downloaders.
                    downloaders[currentDownloader].Download(id)
                    currentDownloader = (currentDownloader +
                                         1) % downloadProcessCount
                scanner.needUpdate.clear()
            for downloader in downloaders:
                downloader.Update()
                for id in downloader.flushed:
                    finished.add(id)
                    print(f'Coordinator: {plist[id]} downloaded and saved')
                downloader.flushed.clear()
                for id in downloader.failed:
                    finished.add(id)
                    failed.add(id)
                    print(f'Coordinator: {plist[id]} download failed')
                downloader.failed.clear()
            if not isSSD and scanned == fileCount and currentFlushingDownloader < downloadProcessCount:
                # HDD mode: flush downloaders one at a time to avoid
                # concurrent disk writes.  BUG FIX: the original advanced
                # currentFlushingDownloader BEFORE flushing, so downloader 0
                # was never flushed (its non-empty `downloaded` set kept
                # IsIdle() False forever -> deadlock) and the final increment
                # indexed past the end of the list.  Flush the current one,
                # then advance only once it is fully idle.
                downloaders[currentFlushingDownloader].Flush()
                if downloaders[currentFlushingDownloader].IsIdle():
                    currentFlushingDownloader += 1
        for scanner in scanners:
            scanner.Stop()
        for downloader in downloaders:
            downloader.Stop()
        print('Coordinator: All worker processes stopped')
        print('Coordinator: =====SUMMARY=====')
        print(f'Coordinator: Update finished, file count {fileCount}')
        print(f'Coordinator: {len(failed)} error(s) detected')
        for id in failed:
            print(f'Coordinator: {plist[id]} failed')
    except BaseException:
        # Report the failure instead of exiting silently (the original bare
        # `except:` hid every error), then hard-kill the workers; os._exit
        # skips interpreter cleanup that could hang on live queues.
        traceback.print_exc()
        for scanner in scanners:
            scanner.Kill()
        for downloader in downloaders:
            downloader.Kill()
        os._exit(1)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment