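"""Batch-download a list of video URLs with a multiprocessing pool.

URLs are read from a CSV file (one row per video, keyed by its 'link' column),
dispatched one per task to the processor class registered for the selected
mode (currently bilibili.Bilibili), and their status is tracked in memory.
URLs that have not finished successfully are written back to a companion
"_new" CSV, both at the end of the run and from the signal handler on
SIGABRT/SIGINT/SIGTERM, so an interrupted run can be resumed.
"""
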
import copy
import csv
import functools
import os
import multiprocessing as mp
import signal
import sys
import threading
import time
import traceback
from collections import OrderedDict

from bilibili import Bilibili
import utils

NOT_STARTED = 'Not Started'
IN_PROCESS = 'In Process'
SUCCESS = 'Successful'

MODE_BILIBILI = 'bilibili'
# Map each supported download mode to the class that processes a single URL.
processor = {
    MODE_BILIBILI: Bilibili,
}

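# The processor class registered above is only assumed (from the calls in
# single_url_wrapper below) to take (params, url_info) in its constructor and
# to expose process_all() returning a (ret_code, result) pair whose result has
# a .url attribute. A hypothetical minimal implementation would look like:
#
#     class ExampleProcessor:
#         def __init__(self, params, url_info):
#             self.url = params["url"]   # filled in by single_url_wrapper
#
#         def process_all(self):
#             # download self.url here; 0 means success, negative means failure
#             return 0, self

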
def report_status(url_dict):
    print("Status Report:")
    for k, v in url_dict.items():
        print('"' + k + '": ' + v['status'])


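# The input CSV is read with csv.DictReader, so the first row must be a header.
# get_urls() asserts that 'link' is the second column both in file order and
# after sorting the column names, i.e. exactly one column name sorts before
# 'link'. A layout such as the following (column names other than 'link' are
# only an illustrative guess) satisfies both checks:
#
#     index,link,title
#     1,<video url>,<video title>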
def get_urls(input_file):
    if not os.path.exists(input_file):
        raise FileNotFoundError("input url file does not exist")
    with open(input_file, 'r') as f:
        reader = csv.DictReader(f)
        url_list = [dict(d) for d in reader]

    url_dict = OrderedDict()      # keyed by link, used for status tracking
    url_dict_out = OrderedDict()  # keeps the original column order for write_new_csv
    url_keys = sorted(list(url_list[0].keys()))
    url_keys_out = list(url_list[0].keys())
    assert url_keys[1] == 'link'
    assert url_keys_out[1] == 'link'
    for url_set in url_list:
        key = url_set['link']
        value = {k: url_set[k] for k in url_keys[1:]}
        value_out = {k: url_set[k] for k in url_keys_out}
        value['status'] = NOT_STARTED
        url_dict[key] = value
        url_dict_out[key] = value_out

    return url_dict, url_dict_out


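# success_handler receives the (ret, res) tuple returned by single_url_wrapper
# via pool.apply_async's callback; error_handler receives the exception raised
# there via error_callback. res is whatever process_all() returns and is
# expected to expose a .url attribute.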
def success_handler(result, url_dict=None, url_dict_out=None):
    ret_code = result[0]
    url = result[1].url
    if ret_code == 0:
        url_dict[url]['status'] = SUCCESS
        # Drop finished URLs so they are not written back to the "_new" CSV.
        del url_dict_out[url]
    report_status(url_dict)


def error_handler(e, url_dict=None, url_dict_out=None):
    # print('error')
    # print(dir(e), "\n")
    sys.stdout.write("-->{}<--".format(e.__cause__))
    traceback.print_exception(type(e), e, e.__traceback__)
    sys.stdout.flush()
    sys.stderr.flush()


def single_url_wrapper(mode, url, params, this_url_info):
    # One url per process
    params = copy.deepcopy(params)
    params["url"] = url
    acc = processor[mode](params, copy.deepcopy(this_url_info))
    ret, res = acc.process_all()
    if ret < 0:
        raise RuntimeError("%s download failed" % url)
    return ret, res


def close_pool(pool=None):
    pool.close()
    pool.terminate()
    pool.join()


def write_new_csv(input_file_new, url_dict_out):
    sys.stdout.write("Writing new csv file\n")
    url_list_out = list(url_dict_out.values())
    if len(url_list_out) == 0:
        return

    header_list = list(url_list_out[0].keys())
    # newline='' as recommended by the csv module docs, so no blank lines are
    # inserted between rows on Windows.
    with open(input_file_new, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=header_list, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(url_list_out)


def signal_handler(*args, mp_pool=None, url_dict=None, url_dict_out=None, input_file_new=""):
    sys.stderr.write('\nStopping...')

    if mp_pool is not None:
        stoppool = threading.Thread(target=functools.partial(close_pool, pool=mp_pool))
        stoppool.daemon = True
        stoppool.start()
        sys.stdout.write("Multiprocessing pool terminated\n")
        sys.stdout.flush()

    write_new_csv(input_file_new, url_dict_out)
    print("Signal handler finished")
    print("Received signal: ", signal.Signals(args[0]).name)


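# Driver: read Collections/input_list_<mode>.csv, submit one download task per
# URL to the pool, and write whatever has not completed successfully to
# input_list_<mode>_new.csv (both at normal exit and from the signal handler)
# so the run can be resumed from the leftover list.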
if __name__ == "__main__":
    mode = MODE_BILIBILI
    num_of_processes = 1
    input_dir = 'Collections'
    output_dir = 'Collections/temp_output'

    params = {"merge": True,
              "debug": True,
              "delete_flv": True,
              "output_dir": output_dir,
              # 'output': "%(title)s.%(ext)s",
              }

    input_file = os.path.join(input_dir, 'input_list_' + mode + '.csv')
    input_file_new = os.path.join(input_dir, 'input_list_' + mode + '_new.csv')

    url_dict, url_dict_out = get_urls(input_file)
    # url_list = list(url_dict.keys())

    on_error = functools.partial(error_handler, url_dict=url_dict, url_dict_out=url_dict_out)
    on_success = functools.partial(success_handler, url_dict=url_dict, url_dict_out=url_dict_out)

    pool = mp.Pool(processes=num_of_processes)

    signal_func = functools.partial(signal_handler, mp_pool=pool,
                                    url_dict=url_dict, url_dict_out=url_dict_out,
                                    input_file_new=input_file_new)

    for sig in (signal.SIGABRT, signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, signal_func)

    # with open(os.path.join(output_dir, 'logout.log'), 'w') as sys.stdout:
    # with open(os.path.join(output_dir, 'error.log'), 'w') as sys.stderr:

    for k in url_dict.keys():
        # single_url_wrapper(mode, k, params, url_dict[k])
        url_dict[k]['status'] = IN_PROCESS
        pool.apply_async(single_url_wrapper, args=(mode, k, params, url_dict[k]),
                         error_callback=on_error, callback=on_success)
        time.sleep(1)
    pool.close()
    pool.join()

    write_new_csv(input_file_new, url_dict_out)
    report_status(url_dict)