"""Wrapper over youtube-dl for searching and batch downloading bilibili videos."""

import functools
import json
import logging
import os
import random
import re
import shutil
import sys
import time
import argparse
import ffmpeg
import natsort
import youtube_dl
import utils
from youtube_dl.utils import DownloadError, UnavailableVideoError, MaxDownloadsReached
# Fallback settings: used when a key is absent from the params dict handed
# to Bilibili, and as argparse defaults in the __main__ block below.
DEFAULT_VALUES = dict(
    merge=True,
    delete_flv=False,
    retries=50,
    # ratelimit=1537 * 2 ** 10,
    ratelimit=float("inf"),
    max_retry=50,
    output="%(title)s-|||||||-%(id)s.%(ext)s",
    output_dir=os.getcwd(),
    debug=False,
    debug_tmp_dir_name="",  # Not Implemented
)
class MyLogger(object):
    """Minimal logger handed to youtube-dl via its 'logger' option.

    Debug lines are printed only when debug mode is enabled; warnings are
    routed through the logging module; errors go to stderr.
    """

    def __init__(self, debug=False):
        self._debug = debug

    def debug(self, msg):
        # Suppress youtube-dl's debug chatter unless explicitly enabled.
        if not self._debug:
            return
        print(msg)

    def warning(self, msg):
        logging.warning(msg)

    def error(self, msg):
        print(msg, file=sys.stderr)
# TODO string replace \r
class Bilibili:
    """Download one bilibili URL (optionally a playlist part range) with
    youtube-dl, then merge the downloaded segments with ffmpeg.

    Settings come from ``params`` with fallbacks to ``DEFAULT_VALUES``;
    ``extra_params`` may carry ``p_start``/``p_end`` to expand the URL into
    one URL per playlist part.
    """

    def __init__(self, params, extra_params=None):
        # Required key: the page URL to download.
        self.url = params['url']
        self.merge = params.get('merge', DEFAULT_VALUES['merge'])
        # When True, the temp dir with raw segments is NOT copied next to
        # the merged output (see process_all).
        self.delete_flv = params.get('delete_flv', DEFAULT_VALUES['delete_flv'])
        # 'retries' is forwarded to youtube-dl; 'max_retry' wraps the whole
        # download-and-merge pipeline via utils.retry_wrapper.
        self.retries = params.get('retries', DEFAULT_VALUES['retries'])
        self.ratelimit = params.get('ratelimit', DEFAULT_VALUES['ratelimit'])
        self.max_retry = params.get('max_retry', DEFAULT_VALUES['max_retry'])
        self.output_format = params.get('output', DEFAULT_VALUES['output'])
        self.output_dir = params.get('output_dir', DEFAULT_VALUES['output_dir'])
        self.debug = params.get('debug', DEFAULT_VALUES['debug'])
        # Playlist bounds; stay None unless both are supplied (below).
        self.p_start = None
        self.p_end = None
        if not os.path.exists(self.output_dir) or not os.path.isdir(self.output_dir):
            raise FileNotFoundError('Output path does not exist!')
        # Options forwarded verbatim to youtube_dl.YoutubeDL.
        self.options = {
            'format': 'bestvideo+bestaudio/best',
            # 'format': '5+2',
            'logger': MyLogger(self.debug),
            'outtmpl': self.output_format,
            'retries': self.retries,
            'ratelimit': self.ratelimit,
            # 'verbose': True
        }
        # Deal with playlist: the range is honoured only when both ends are
        # present and truthy.
        if extra_params is not None and extra_params.get('p_start', '') and extra_params.get('p_end', ''):
            self.p_start = extra_params['p_start']
            self.p_end = extra_params['p_end']

    def process_all(self):
        """Expand the URL into per-part URLs, download each into a fresh
        temp dir under output_dir, then move successful results into place.

        Returns ``(ret_code, self)`` — ret_code is 0 when every part
        succeeded, -1 when any part failed.
        """
        cwd = os.getcwd()
        ret_code = 0
        # Expand playlist
        this_url_list = self.expand_playlist_urls(self.url)
        for i in range(len(this_url_list)):
            url = this_url_list[i]
            if self.p_start is not None:
                # Part numbering continues from the requested start index.
                p_num = i + int(self.p_start)
                print("Playlist Num: " + str(p_num))
            else:
                p_num = 0
            # Get into temp dir (random suffix — presumably from
            # utils.id_generator; TODO confirm). youtube-dl writes its
            # fragments into the cwd.
            output_dir = os.path.join(self.output_dir, 'tmp_' + utils.id_generator(8))
            if os.path.exists(output_dir):
                raise FileExistsError('tmp path already exists!')
            os.mkdir(output_dir)
            os.chdir(output_dir)
            # Download, merge. NOTE(review): utils.retry_wrapper is assumed
            # to return (0, result) on success and nonzero ret on failure —
            # confirm its contract.
            func = functools.partial(self.process_single, url, '.', p_num)
            ret, res = utils.retry_wrapper(func, max_retry=self.max_retry, timeout=30)
            # Get out of tmp folder
            os.chdir(cwd)
            # Rename tmp folder
            if ret == 0:
                target_path = os.path.join(self.output_dir, self.prepare_output_filename(res, p_num))
                if not res.get("file_exist", False):
                    # Replace any stale copy of the segments directory.
                    if os.path.exists(target_path):
                        shutil.rmtree(target_path)
                    if not self.delete_flv:
                        shutil.copytree(output_dir, target_path)
                else:
                    print(target_path, " already exists, skipping.")
            else:
                ret_code = -1
            # The temp dir is always discarded, success or not.
            shutil.rmtree(output_dir)
        return ret_code, self

    def process_single(self, url, output_dir, p_num):
        """Download one URL into ``output_dir`` (the cwd-relative temp dir)
        and optionally merge its segments.

        Returns youtube-dl's info dict; raises RuntimeError on download or
        merge failure.
        """
        ret, res = self.download(url, p_num)
        if ret == -2:
            # Final output already present on disk — mark and bail out.
            res["file_exist"] = True
            return res  # Skip existing file
        elif ret != 0:
            raise RuntimeError("Download unsuccessful")
        file_list = self.get_file_list(output_dir, res)
        # DASH (.m4s) downloads are expected to arrive already merged into a
        # single file; multiple leftover .m4s fragments is an error.
        if ".m4s" in [os.path.splitext(f)[1] for f in file_list] and len(file_list) != 1:
            raise RuntimeError("Multiple unmerged m4s files!")
        if self.merge:
            ret = self.concat_videos(file_list, output_dir, res, p_num)
            # NOTE(review): concat_videos returns (0, None) on the move path
            # but ffmpeg.run's (stdout, stderr) tuple on the concat paths;
            # ret[1] is None in both success cases — confirm the failure shape.
            if ret[1] is not None:
                raise RuntimeError("Convert/concat unsuccessful")
        return res

    def download(self, url, p_num):
        """Fetch metadata, skip when the merged output already exists in the
        parent directory, otherwise download via youtube-dl.

        Returns ``(ret, info_dict)``; ret is -2 for "already exists",
        otherwise youtube-dl's ``_download_retcode`` (0 on success).
        """
        ret = 0
        with youtube_dl.YoutubeDL(self.options) as ydl:
            try:
                # First pass: metadata only, to compute the output filename.
                res = ydl.extract_info(
                    url, download=False,
                    force_generic_extractor=ydl.params.get('force_generic_extractor', False))
                output_filename = ydl.prepare_filename(res)
                output_filename, output_ext = os.path.splitext(output_filename)
                if output_ext == '.m4s':
                    # Deal with mkv: DASH segments end up merged into .mkv.
                    output_ext = '.mkv'
                else:
                    output_ext = '.mp4'
                output_filename = output_filename + output_ext
                res['output_filename'] = output_filename
                # The merged result lands in the parent of the temp cwd, so
                # probe both possible extensions there before downloading.
                output_file_abs_paths = [os.path.abspath(os.path.join(
                    '..', self.prepare_output_filename(res, p_num) + ext)) for ext in ('.mp4', '.mkv')]
                for abs_path in output_file_abs_paths:
                    if os.path.exists(abs_path):
                        return -2, res
                # Second pass: actually download.
                res = ydl.extract_info(url, force_generic_extractor=ydl.params.get('force_generic_extractor', False))
            except UnavailableVideoError:
                print("Failed video URL: " + url)
                ydl.report_error('unable to download video')
            except MaxDownloadsReached:
                print("Failed video URL: " + url)
                ydl.to_screen('[info] Maximum number of downloaded files reached.')
                raise
            except DownloadError as e:
                print("Failed video URL: " + url)
                raise e
            except Exception as e:
                print("Failed video URL: " + url)
                raise e
            else:
                if ydl.params.get('dump_single_json', False):
                    ydl.to_stdout(json.dumps(res))
                # The second extract_info returned a fresh dict; re-attach
                # the filename computed above.
                res['output_filename'] = output_filename
            ret = ydl._download_retcode
        return ret, res

    def get_file_list(self, output_dir, res):
        """List downloaded media files in ``output_dir``, replace single
        quotes in their names (they would break the quoted entries in
        ffmpeg's concat list), and natural-sort the result.

        Raises FileNotFoundError when nothing matched.
        """
        ext = res.get('ext', 'flv')
        if ext == 'm4s':
            # NOTE(review): precedence parses this as
            # (isfile AND endswith .m4s) OR (endswith .mkv), so the isfile()
            # guard does not apply to .mkv entries — likely wants
            # parentheses around the two extension tests; confirm.
            file_list = [f for f in os.listdir(output_dir) if
                         os.path.isfile(os.path.join(output_dir, f)) and
                         os.path.splitext(f)[-1] == os.path.extsep + "m4s" or
                         os.path.splitext(f)[-1] == os.path.extsep + "mkv"]
        else:
            file_list = [f for f in os.listdir(output_dir) if
                         os.path.isfile(os.path.join(output_dir, f)) and
                         os.path.splitext(f)[-1] == os.path.extsep + ext]
            # and os.path.splitext(f)[0].startswith(title)]
        # Eliminate ' in filenames (renames the files on disk as well).
        for i in range(len(file_list)):
            if "'" in file_list[i]:
                old_file_path = os.path.join(output_dir, file_list[i])
                file_list[i] = file_list[i].replace("'", '_')
                new_file_path = os.path.join(output_dir, file_list[i])
                shutil.move(old_file_path, new_file_path)
        file_list = natsort.natsorted(file_list)
        if len(file_list) == 0:
            raise FileNotFoundError("Empty file list")
        if self.debug:
            print('\n'.join(file_list))
        return file_list

    def concat_videos(self, file_list, output_dir, res, p_num):
        """Merge ``file_list`` into one container in the parent directory.

        Already-merged DASH results (.mkv) are simply moved into place;
        everything else goes through ffmpeg's concat demuxer with stream
        copy. Returns (0, None) on the move path, otherwise ffmpeg.run's
        return value.
        """
        # For ffmpeg concat demuxer, mp4 output
        ret = 0
        exts = [os.path.splitext(f)[1] for f in file_list]
        if res.get('ext') == 'm4s':
            if ".mkv" in exts and ".m4s" not in exts:
                # Copy mkv: youtube-dl already merged the streams; every
                # entry is moved to the same destination name, so only the
                # last one survives if there are several.
                for f in file_list:
                    new_dst = os.path.join('..', self.prepare_output_filename(res, p_num) + os.path.extsep + 'mkv')
                    if os.path.exists(new_dst):
                        os.remove(new_dst)
                    dst = shutil.move(f, new_dst)
                return ret, None
            elif ".m4s" in exts and ".mkv" not in exts:
                # Write the concat demuxer's input list.
                tmp_file = os.path.join(output_dir, 'temp_filelist.txt')
                with open(tmp_file, 'w') as f:
                    for file in file_list:
                        # Deal with space in cmdline
                        f.write('file ' + "'" + file + "'" + '\n')
                if self.debug:
                    with open(tmp_file, 'r') as f:
                        print(''.join(f.readlines()))
                stream = ffmpeg.input(tmp_file, format='concat', safe=0)
                # c='copy': no re-encode, just remux into .mkv.
                stream = ffmpeg.output(stream, os.path.join(
                    '..', self.prepare_output_filename(res, p_num) + os.path.extsep + 'mkv'), c='copy')
                if self.debug:
                    print(ffmpeg.compile(stream, overwrite_output=True))
                ret = ffmpeg.run(stream, overwrite_output=True)
                os.remove(tmp_file)
                return ret
            else:
                raise RuntimeError("m4s and mkv files exists at the same time!")
        else:
            # Non-DASH (e.g. flv) segments: concat into an .mp4 container.
            tmp_file = os.path.join(output_dir, 'temp_filelist.txt')
            with open(tmp_file, 'w') as f:
                for file in file_list:
                    # Deal with space in cmdline
                    f.write('file ' + "'" + file + "'" + '\n')
            if self.debug:
                with open(tmp_file, 'r') as f:
                    print(''.join(f.readlines()))
            stream = ffmpeg.input(tmp_file, format='concat', safe=0)
            stream = ffmpeg.output(stream, os.path.join(
                '..', self.prepare_output_filename(res, p_num) + os.path.extsep + 'mp4'), c='copy')
            if self.debug:
                print(ffmpeg.compile(stream, overwrite_output=True))
            ret = ffmpeg.run(stream, overwrite_output=True)
            os.remove(tmp_file)
            return ret

    def prepare_output_filename(self, res, count):
        """Build the final (extension-less) output name: the slugified title
        portion of the download filename, plus '_<count>' when a playlist
        range is active.

        Raises RuntimeError when ``count`` falls outside [p_start, p_end].
        """
        # title = utils.slugify(res['title'], allow_unicode=True, simple=True)
        # title = res['title']
        title = os.path.splitext(res['output_filename'])[0]
        title = title.split('-|||||||-')[0]  # Arbitrary split, since _part_n is in %id
        title = utils.slugify(title, allow_unicode=True, simple=True)
        if self.p_start is not None and self.p_end is not None:
            if count < int(self.p_start) or count > int(self.p_end):
                raise RuntimeError("Count number outside playlist range!")
            filename = title + '_' + str(count)
        else:
            filename = title
        return filename

    def expand_playlist_urls(self, url):
        """Return ``[url]`` when no playlist range is set; otherwise one URL
        per part, built by splicing/adding a '?p=N' query for each N in
        [p_start, p_end].
        """
        if self.p_start is None or self.p_end is None:
            return [url]
        else:
            # Recognised bilibili URL shapes; (?x) verbose mode ignores the
            # literal whitespace inside the pattern.
            _VALID_URL = r'''(?x)
                https?://
                (?:(?:www|bangumi)\.)?
                bilibili\.(?:tv|com)/
                (?:
                    (?:
                        video/[aA][vV]|
                        anime/(?P<anime_id>\d+)/play\#
                    )(?P<id_bv>\d+)|
                    video/[bB][vV](?P<id>[^/?#&]+)
                )(?:/?\?p=(?P<page>\d+))?
                '''
            mobj = re.match(_VALID_URL, url)
            # NOTE(review): video_id/anime_id/page are computed but unused.
            video_id = mobj.group('id') or mobj.group('id_bv')
            anime_id = mobj.group('anime_id')
            page = mobj.group('page') or 1
            url_list = []
            # regs[0] is the whole match, so regs[4] is the span of group 4
            # ('page') and regs[3] the span of group 3 ('id', the BV id).
            pos = mobj.regs[4]
            # Get last position of video id
            v_pos = mobj.regs[3]
            # NOTE(review): group 3 matches only BV-style URLs; for av/anime
            # URLs regs[3] is (-1, -1), base_url becomes '' and the check
            # below raises — confirm whether av URLs are meant to work here.
            base_url = url[:v_pos[1] + 1]
            if not base_url:
                raise RuntimeError("Regex matching failed")
            for i in range(int(self.p_start), int(self.p_end) + 1):
                if pos[0] == -1 or pos[1] == -1:  # No p= part
                    new_url = base_url + '?p=' + str(i)
                else:  # Has p= part: splice the new page number in place.
                    url_part1 = url[:pos[0]]
                    url_part2 = url[pos[1]:]
                    new_url = url_part1 + str(i) + url_part2
                url_list.append(new_url)
            print()
            return url_list
if __name__ == "__main__":
    # Command-line entry point: parse arguments, then run a single job.
    parser = argparse.ArgumentParser(description='Download and merge single video from bilibili')
    parser.add_argument('url', nargs='?', type=str, default="", help="url of the webpage")
    parser.add_argument('-m', '--merge', action='store_true', help="Whether merge the resulting flv files")
    parser.add_argument('-c', '--delete-flv', action='store_true', help="Whether delete intermediate flv files")
    parser.add_argument('-o', '--output', type=str, default=DEFAULT_VALUES['output'], help="Output format")
    parser.add_argument('-d', '--output-dir', type=str, default=DEFAULT_VALUES['output_dir'], help="Output directory")
    parser.add_argument('--max-retry', type=int, default=DEFAULT_VALUES['max_retry'], help="How many times to retry if fails")
    parser.add_argument('--debug', action='store_true', help="Debug Mode")
    parser.add_argument('--debug_tmp_dir_name', type=str, default=DEFAULT_VALUES['debug_tmp_dir_name'], help="Fixed tmp dir name for debugging")
    cli_args = parser.parse_args()
    # argparse normalises '--delete-flv'/'--max-retry' to delete_flv/max_retry,
    # so the namespace attributes line up exactly with the keys Bilibili reads.
    downloader = Bilibili(vars(cli_args))
    downloader.process_all()