"""Wrapper over youtube-dl for searching and batch downloading bilibili videos."""
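
# A rough pipeline sketch: one producer process (search_main) walks the search
# result pages and pushes one record per video into a shared queue; the other
# pool processes (search_sub) pop records, open each video page to count its
# parts, and store the finished record in a shared dict. The resulting
# links.csv can then be handed to youtube-dl/yt-dlp for batch downloading.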

import csv
import functools
import os
import re
import time
import traceback
import urllib.parse
import multiprocessing as mp
from collections import OrderedDict
from logging import warning
from queue import Empty

import natsort
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait

from utils import retry_wrapper, slugify
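
# `retry_wrapper` and `slugify` are project-local helpers from utils. Based on
# the call sites below, a minimal retry_wrapper equivalent would be (an
# assumption; the real helper may differ):
#
#   def retry_wrapper(func, max_retry=5, timeout=5):
#       for attempt in range(max_retry):
#           try:
#               return func()
#           except Exception:
#               if attempt == max_retry - 1:
#                   raise
#               time.sleep(timeout)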


class BrowserSearch(object):
    """Producer: walks bilibili search result pages and queues per-video metadata."""

    _video_base_url = "https://www.bilibili.com/video/"

    def __init__(self, query, search_mode='video', mode="chrome", headless=False, queue=None, lock=None):
        self.driver = None
        self.set_driver(mode, headless)
        if not isinstance(query, str):
            raise TypeError("Query is not a string.")
        if not query:
            warning("Query string is empty!")
        self.query = query
        self.search_base_url = 'https://search.bilibili.com/' + search_mode + '?keyword='
        if lock is None:
            raise RuntimeError("Lock object does not exist")
        self.lock = lock
        if queue is None:
            raise RuntimeError("Queue object does not exist")
        self.queue = queue

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.driver is not None:
            self.driver.close()

    def set_driver(self, mode, headless):
        if mode == "chrome":
            options = webdriver.ChromeOptions()
            options.add_argument('--js-flags=--noexpose_wasm')
            if headless:
                options.add_argument("--headless")
            self.driver = webdriver.Chrome(options=options)
        elif mode == "firefox":
            raise NotImplementedError
        else:
            raise NotImplementedError

    def init_search(self):
        url = self.search_base_url + urllib.parse.quote(self.query)
        self.driver.get(url)
        if "搜索结果" not in self.driver.title:  # "search results" in the page title
            raise ConnectionError("Error loading webpage")
        # Wait until AJAX finishes loading the result list
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.video-item.matrix')))

    def analyze_cur_page(self, secondary_keywords=None, exclude_keywords=None):
        video_links = self.driver.find_elements_by_css_selector(".video-item.matrix")
        for video in video_links:
            a_elem = video.find_elements_by_class_name("title")
            if len(a_elem) != 1:
                warning("Incorrect number of <a> elements!")
                if not a_elem:
                    continue
            a_elem = a_elem[0]
            title = a_elem.get_attribute("title")
            # Keep only titles containing at least one secondary keyword and
            # none of the excluded keywords
            if secondary_keywords is not None:
                if all(keyword not in title for keyword in secondary_keywords):
                    continue
            if exclude_keywords is not None:
                if any(keyword in title for keyword in exclude_keywords):
                    continue
            screenshot = video.screenshot_as_png
            link = a_elem.get_attribute('href')
            id_match = re.search(
                r'^(https:)?//www\.bilibili\.com/video/(?P<id>BV[0-9A-Za-z]{9,11})(\?from=search&seid=[0-9]{20})?',
                link)
            if id_match is None:
                raise RuntimeError("ID match not found!")
            video_id = id_match.group('id')
            v_link = BrowserSearch._video_base_url + video_id
            description = video.find_elements_by_css_selector(".des.hide")[0].get_attribute("innerText")
            duration = video.find_elements_by_css_selector(".so-imgTag_rb")[0].get_attribute("innerText")
            upload_time = video.find_elements_by_css_selector(".so-icon.time")[0].get_attribute("innerText")
            uploader = video.find_elements_by_class_name("up-name")[0].get_attribute("innerText")
            out = {
                "id": video_id,
                "title": title,
                "link": v_link,
                "description": description,
                "duration": duration,
                "upload_time": upload_time,
                "uploader": uploader,
                "screenshot": screenshot,
                "pages": 0,  # filled in later by SubPageSearch
            }
            with self.lock:
                self.queue.put_nowait(out)
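
    # Each record pushed to the queue is shaped roughly like this (values are
    # illustrative, not real data):
    #   {"id": "BV1xx411c7mD", "title": "...", "link": ".../video/BV1xx411c7mD",
    #    "description": "...", "duration": "12:34", "upload_time": "2020-01-01",
    #    "uploader": "...", "screenshot": b"<PNG bytes>", "pages": 0}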

    def get_page_count(self):
        last_page_link = self.driver.find_elements_by_css_selector(".page-item.last")
        if len(last_page_link) > 0:
            # Middle page buttons are collapsed; the "last" button holds the total
            page_count = int(last_page_link[0].get_attribute("innerText"))
        else:
            # All page buttons are visible; count them, excluding next/prev arrows
            page_links = self.driver.find_elements_by_css_selector(".page-item:not(.next):not(.prev)")
            page_count = len(page_links)
        return page_count

    def nav_retry_wrapper(self, func, mode='refresh'):
        try:
            func()
        except Exception as e:
            if mode == 'refresh':
                self.driver.refresh()
            elif mode == 'back':
                self.driver.back()
            print(e)
            raise
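
    # On failure, nav_retry_wrapper restores the browser to a usable state
    # (refresh or back) and re-raises so the outer retry_wrapper can try the
    # navigation again.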

    def next_page(self):
        next_page_link = self.driver.find_elements_by_css_selector(".page-item.next")
        if len(next_page_link) > 0:
            next_page_link[0].click()
        else:
            warning("No next page found!")
            return
        # Wait until AJAX finishes loading
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))

    def prev_page(self):
        prev_page_link = self.driver.find_elements_by_css_selector(".page-item.prev")
        if len(prev_page_link) > 0:
            prev_page_link[0].click()
        else:
            warning("No previous page found!")
            return
        # Wait until AJAX finishes loading
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))

    def goto_page(self, page_num):
        num_pages = self.get_page_count()
        assert page_num <= num_pages
        url = "https://search.bilibili.com/video?keyword=" + urllib.parse.quote(self.query)
        url += "&page=" + str(page_num)
        self.driver.get(url)
        # Wait until AJAX finishes loading
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))
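
    # goto_page navigates directly via the page query parameter, e.g.
    #   https://search.bilibili.com/video?keyword=<query>&page=3
    # whereas next_page/prev_page click through the pager controls.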


class SubPageSearch(object):
    """Consumer: pops queued videos and visits each page to count its parts."""

    def __init__(self, mode="chrome", headless=False, queue=None, dict_out=None, lock=None):
        self.driver = None
        self.set_driver(mode, headless)
        if queue is None:
            raise RuntimeError("Queue object does not exist")
        self.queue = queue
        if dict_out is None:
            raise RuntimeError("Dict object does not exist")
        self.dict_out = dict_out
        if lock is None:
            raise RuntimeError("Lock object does not exist")
        self.lock = lock

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.driver is not None:
            self.driver.close()

    def set_driver(self, mode, headless):
        if mode == "chrome":
            options = webdriver.ChromeOptions()
            options.add_argument('--js-flags=--noexpose_wasm')
            if headless:
                options.add_argument("--headless")
            self.driver = webdriver.Chrome(options=options)
        elif mode == "firefox":
            raise NotImplementedError
        else:
            raise NotImplementedError

    def sub_page(self, main_status):
        # Poll until the producer reports done (main_status == 0) and the queue is drained
        while True:
            time.sleep(2)
            with self.lock:
                if main_status.value == 0 and self.queue.qsize() == 0:
                    return
                try:
                    item = self.queue.get_nowait()
                except Empty:
                    warning("Empty queue")
                    continue
            retry_wrapper(functools.partial(self._sub_page_exec, item), max_retry=5, timeout=5)
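
    # Shutdown protocol: search_main flips the shared status flag to 0 after the
    # last search page is scraped, so a worker only exits once the flag is down
    # AND the queue is empty; a transiently empty queue just means this consumer
    # got ahead of the producer.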

    def _sub_page_exec(self, item):
        v_link = item['link']
        self.driver.get(v_link)
        if "bilibili" not in self.driver.title:
            raise ConnectionError("Error loading webpage")
        # Wait until AJAX finishes loading the player
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.bilibili-player')))
        multi_page_elem = self.driver.find_elements_by_css_selector('.multi-page')
        if len(multi_page_elem) == 1:
            cur_page_elem = multi_page_elem[0].find_elements_by_css_selector(".cur-page")
            if len(cur_page_elem) != 1:
                warning("Incorrect number of cur-page elements!")
            text = cur_page_elem[0].get_attribute("innerText")
            # The element reads e.g. "1/3"; the second capture is the part count
            match_out = re.findall(r'([0-9]{1,3})/([0-9]{1,3})', text)
            if len(match_out) != 1:
                raise RuntimeError("Incorrect number of matches")
            item["pages"] = int(match_out[0][1])
        elif len(multi_page_elem) > 1:
            warning("Incorrect number of multi-page elements!")
        with self.lock:
            self.dict_out[item["id"]] = item
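
# Note: the find_elements_by_* calls above use the Selenium 3 API; Selenium 4
# removed them in favour of driver.find_elements(By.CSS_SELECTOR, ...).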


def search_main(headless=False, queue=None, lock=None, status=None):
    # query_str, secondary_keywords, and exclude_keywords are module-level
    # globals assigned in the __main__ block below
    if status is None:
        raise RuntimeError("Status object does not exist")
    with BrowserSearch(query_str, headless=headless, queue=queue, lock=lock) as driver:
        retry_wrapper(driver.init_search, max_retry=5, timeout=5)
        for _ in range(max(1, driver.get_page_count())):
            driver.analyze_cur_page(secondary_keywords=secondary_keywords, exclude_keywords=exclude_keywords)
            retry_wrapper(functools.partial(driver.nav_retry_wrapper, driver.next_page), max_retry=5, timeout=3)
        # Signal the consumers that no more items will be queued
        with driver.lock:
            status.value = 0


def search_sub(headless=False, queue=None, dict_out=None, lock=None, status=None):
    if status is None:
        raise RuntimeError("Status object does not exist")
    with SubPageSearch(headless=headless, queue=queue, dict_out=dict_out, lock=lock) as sub:
        sub.sub_page(status)


def success_handler():
    print('success')


def error_handler(e):
    print("-->{}<--".format(e.__cause__))
    traceback.print_exception(type(e), e, e.__traceback__)


if __name__ == "__main__":
    num_of_processes = 2
    output_dir = 'Collections/crawl_out/temp'
    query_str = ""
    secondary_keywords = None
    exclude_keywords = None
    headless = True

    pool = mp.Pool(processes=num_of_processes)
    mg = mp.Manager()
    main_status = mg.Value('b', 1)  # 1 while the producer is still scraping
    lock = mg.Lock()
    queue = mg.Queue()
    dict_out = mg.dict()
    # One producer walks the search pages...
    pool.apply_async(search_main, args=(headless, queue, lock, main_status), error_callback=error_handler)
    # ...and the remaining processes visit the video sub-pages
    for _ in range(max(1, num_of_processes - 1)):
        pool.apply_async(search_sub, args=(headless, queue, dict_out, lock, main_status), error_callback=error_handler)
    pool.close()
    pool.join()

    # Assemble output
    dict_out = OrderedDict(dict_out)
    list_out = natsort.natsorted(dict_out.values(), key=lambda v: v['title'])  # Natural sort by title

    # Write screenshots
    img_out_path = os.path.join(output_dir, 'img')
    os.makedirs(img_out_path, exist_ok=True)
    for v in list_out:
        title = slugify(v["title"])
        path = os.path.join(img_out_path, title + os.path.extsep + 'png')
        with open(path, 'wb') as f:
            f.write(v["screenshot"])
        del v["screenshot"]
        # Page range for multi-part videos; single-part videos get empty bounds
        if v["pages"] == 0:
            v["p_start"] = ''
            v["p_end"] = ''
        else:
            v["p_start"] = '1'
            v["p_end"] = str(v["pages"])
        del v["pages"]

    # Write csv (the "description" and "id" fields are kept out of the header)
    csv_path = os.path.join(output_dir, 'links.csv')
    header_list = [x for x in list_out[0].keys() if x not in ("description", "id")]
    with open(csv_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=header_list, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(list_out)
    print("Wrote {} entries to {}".format(len(list_out), csv_path))
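
    # The CSV can then be post-processed or fed to a downloader; a sketch
    # (assumes yt-dlp is installed and the "link" column name is unchanged):
    #   import csv
    #   with open(csv_path, newline='', encoding='utf-8') as f:
    #       links = [row["link"] for row in csv.DictReader(f)]
    #   # write the links to urls.txt, then:  yt-dlp -a urls.txt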