Wrapper over youtube-dl for searching and batch downloading bilibili videos.
4 years ago
import functools
import json
import logging
import os
import random
import re
import shutil
import sys
import time
import argparse
import ffmpeg
import natsort
import youtube_dl
import utils
from youtube_dl.utils import DownloadError, UnavailableVideoError, MaxDownloadsReached
'merge': True,
'delete_flv': False,
'max_retry': 5,
'output': "%(title)s-|||||||-%(id)s.%(ext)s",
'output_dir': os.getcwd(),
'debug': False,
"debug_tmp_dir_name": "", # Not Implemented
class MyLogger(object):
def __init__(self, debug=False):
self._debug = debug
def debug(self, msg):
if self._debug:
def warning(self, msg):
def error(self, msg):
print(msg, file=sys.stderr)
# TODO string replace \r
class Bilibili:
def __init__(self, params, extra_params=None):
self.url = params['url']
self.merge = params.get('merge', DEFAULT_VALUES['merge'])
self.delete_flv = params.get('delete_flv', DEFAULT_VALUES['delete_flv'])
self.max_retry = params.get('max_retry', DEFAULT_VALUES['max_retry'])
self.output_format = params.get('output', DEFAULT_VALUES['output'])
self.output_dir = params.get('output_dir', DEFAULT_VALUES['output_dir'])
self.debug = params.get('debug', DEFAULT_VALUES['debug'])
self.p_start = None
self.p_end = None
if not os.path.exists(self.output_dir) or not os.path.isdir(self.output_dir):
raise FileNotFoundError('Output path does not exist!')
self.options = {
'format': 'bestvideo+bestaudio/best',
'logger': MyLogger(self.debug),
'outtmpl': self.output_format,
# Deal with playlist
if extra_params is not None and extra_params.get('p_start', '') and extra_params.get('p_end', ''):
self.p_start = extra_params['p_start']
self.p_end = extra_params['p_end']
def process_all(self):
cwd = os.getcwd()
ret_code = 0
# Expand playlist
this_url_list = self.expand_playlist_urls(self.url)
for i in range(len(this_url_list)):
url = this_url_list[i]
if self.p_start is not None:
p_num = i + int(self.p_start)
print("Playlist Num: " + str(p_num))
p_num = 0
# Get into temp dir
output_dir = os.path.join(self.output_dir, 'tmp_' + utils.id_generator(8))
if os.path.exists(output_dir):
raise FileExistsError('tmp path already exists!')
# Download, merge
func = functools.partial(self.process_single, url, output_dir, p_num)
ret, res = utils.retry_wrapper(func, max_retry=self.max_retry)
# Get out of tmp folder
# Rename tmp folder
if ret == 0:
target_dir = os.path.join(self.output_dir, self.prepare_output_filename(res, p_num))
if os.path.exists(target_dir):
if not self.delete_flv:
shutil.copytree(output_dir, target_dir)
ret_code = -1
return ret_code, self
def process_single(self, url, output_dir, p_num):
ret, res = self.download(url)
if ret != 0:
raise RuntimeError("Download unsuccessful")
file_list = self.get_file_list(output_dir, res)
if self.merge:
ret = self.concat_videos(file_list, output_dir, res, p_num)
if ret[1] is not None:
raise RuntimeError("Convert/concat unsuccessful")
return res
def download(self, url):
ret = 0
with youtube_dl.YoutubeDL(self.options) as ydl:
res = ydl.extract_info(url, force_generic_extractor=ydl.params.get('force_generic_extractor', False))
except UnavailableVideoError:
print("Failed video URL: " + url)
ydl.report_error('unable to download video')
except MaxDownloadsReached:
print("Failed video URL: " + url)
ydl.to_screen('[info] Maximum number of downloaded files reached.')
except DownloadError as e:
print("Failed video URL: " + url)
raise e
except Exception as e:
print("Failed video URL: " + url)
raise e
if ydl.params.get('dump_single_json', False):
res['output_filename'] = ydl.prepare_filename(res)
ret = ydl._download_retcode
return ret, res
def get_file_list(self, output_dir, res):
ext = res.get('ext', 'flv')
file_list = [f for f in os.listdir(output_dir) if
os.path.isfile(os.path.join(output_dir, f)) and os.path.splitext(f)[-1] == os.path.extsep + ext]
# and os.path.splitext(f)[0].startswith(title)]
# Eliminate ' in filenames
for i in range(len(file_list)):
if "'" in file_list[i]:
old_file_path = os.path.join(output_dir, file_list[i])
file_list[i] = file_list[i].replace("'", '_')
new_file_path = os.path.join(output_dir, file_list[i])
shutil.move(old_file_path, new_file_path)
file_list = natsort.natsorted(file_list)
if len(file_list) == 0:
raise FileNotFoundError("Empty file list")
if self.debug:
return file_list
def concat_videos(self, file_list, output_dir, res, p_num):
# For ffmpeg concat demuxer, mp4 output
ret = 0
tmp_file = os.path.join(output_dir, 'temp_filelist.txt')
with open(tmp_file, 'w') as f:
for file in file_list:
# Deal with space in cmdline
f.write('file ' + "'" + file + "'" + '\n')
if self.debug:
with open(tmp_file, 'r') as f:
stream = ffmpeg.input(tmp_file, format='concat', safe=0)
stream = ffmpeg.output(stream, os.path.join(self.output_dir,
self.prepare_output_filename(res, p_num) + os.path.extsep + 'mp4'),
if self.debug:
print(ffmpeg.compile(stream, overwrite_output=True))
ret = ffmpeg.run(stream, overwrite_output=True)
return ret
def prepare_output_filename(self, res, count):
# title = utils.slugify(res['title'], allow_unicode=True, simple=True)
# title = res['title']
title = os.path.splitext(res['output_filename'])[0]
title = title.split('-|||||||-')[0] # Arbitrary split, since _part_n is in %id
title = utils.slugify(title, allow_unicode=True, simple=True)
if self.p_start is not None and self.p_end is not None:
if count < int(self.p_start) or count > int(self.p_end):
raise RuntimeError("Count number outside playlist range!")
filename = title + '_' + str(count)
filename = title
return filename
def expand_playlist_urls(self, url):
if self.p_start is None or self.p_end is None:
return [url]
_VALID_URL = r'https?://(?:www\.|bangumi\.|)bilibili\.(?:tv|com)/(?:video/av|anime/(?P<anime_id>\d+)/play#)(?P<id>\d+)(?:/?\?p=(?P<page>\d+))?'
mobj = re.match(_VALID_URL, url)
video_id = mobj.group('id')
anime_id = mobj.group('anime_id')
page = mobj.group('page') or 1
url_list = []
pos = mobj.regs[3]
# Get last position of video id, matching group 2
v_pos = mobj.regs[2]
base_url = url[:v_pos[1] + 1]
if not base_url:
raise RuntimeError("Regex matching failed")
for i in range(int(self.p_start), int(self.p_end) + 1):
# We know 'page' is matching group 3
if pos[0] == -1 or pos[1] == -1: # No p= part
new_url = base_url + '/?p=' + str(i)
else: # Has p= part
url_part1 = url[:pos[0]]
url_part2 = url[pos[1]:]
new_url = url_part1 + str(i) + url_part2
return url_list
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Download and merge single video from bilibili')
parser.add_argument('url', nargs='?', type=str, default="", help="url of the webpage")
parser.add_argument('-m', '--merge', action='store_true', help="Whether merge the resulting flv files")
parser.add_argument('-c', '--delete-flv', action='store_true', help="Whether delete intermediate flv files")
parser.add_argument('-o', '--output', type=str, default=DEFAULT_VALUES['output'], help="Output format")
parser.add_argument('-d', '--output-dir', type=str, default=DEFAULT_VALUES['output_dir'], help="Output directory")
parser.add_argument('--max-retry', type=int, default=DEFAULT_VALUES['max_retry'], help="How many times to retry if fails")
parser.add_argument('--debug', action='store_true', help="Debug Mode")
parser.add_argument('--debug_tmp_dir_name', type=str, default=DEFAULT_VALUES['debug_tmp_dir_name'], help="Fixed tmp dir name for debugging")
args = parser.parse_args()
params = {"url": args.url,
"merge": args.merge,
"output": args.output,
"output_dir": args.output_dir,
"delete_flv": args.delete_flv,
"max_retry": args.max_retry,
"debug": args.debug,
"debug_tmp_dir_name": args.debug_tmp_dir_name,
acc = Bilibili(params)