"""Wrapper over youtube-dl for searching and batch downloading bilibili videos."""
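# Usage sketch (an assumption, not documented in this file): fill in query_str
# and the keyword filters in the __main__ block at the bottom, then run the
# script directly. It saves one PNG screenshot per search result under
# <output_dir>/img and writes a links.csv summary that a separate youtube-dl
# batch step can consume.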
import copy
import csv
import functools
import os
import re
import time
import traceback
import urllib.parse
import multiprocessing as mp
from collections import OrderedDict
from logging import warning
from queue import Empty
import natsort
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.common.keys import Keys
from utils import retry_wrapper, slugify
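# retry_wrapper and slugify come from this repo's utils module (not shown here).
# Assumed behavior: retry_wrapper(func, max_retry=n, timeout=t) calls func up to
# n times, pausing t seconds between failed attempts; slugify(title) turns a
# video title into a filesystem-safe file name.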

class BrowserSearch(object):
_video_base_url = "https://www.bilibili.com/video/"
def __init__(self, query, search_mode='video', mode="chrome", headless=False, queue=None, lock=None):
self.driver = None
self.set_driver(mode, headless)
if not isinstance(query, str):
raise TypeError("Query is not string.")
if not query:
warning("Query string is empty!")
self.query = query
self.search_base_url = 'https://search.bilibili.com/' + search_mode + '?keyword='
if lock is None:
raise RuntimeError("Lock object does not exist")
self.lock = lock
if queue is None:
raise RuntimeError("Queue object does not exist")
self.queue = queue
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.driver is not None:
self.driver.close()
def set_driver(self, mode, headless):
if mode == "chrome":
options = webdriver.ChromeOptions()
options.add_argument('--js-flags=--noexpose_wasm')
if headless:
options.add_argument("--headless")
self.driver = webdriver.Chrome(options=options)
elif mode == "firefox":
raise NotImplementedError
else:
raise NotImplementedError
def init_search(self):
url = self.search_base_url + urllib.parse.quote(self.query)
self.driver.get(url)
if "搜索结果" not in self.driver.title:
raise ConnectionError("Error loading webpage")
# Wait until AJAX finish loading
wait = WebDriverWait(self.driver, timeout=20)
wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.video-item.matrix')))
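    # Selector note: the CSS classes used here and below (.video-item.matrix,
    # .page-item, .multi-page, ...) match bilibili's search markup at the time
    # of writing and will need updating if the site layout changes.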
    def analyze_cur_page(self, secondary_keywords=None, exclude_keywords=None):
        video_links = self.driver.find_elements(By.CSS_SELECTOR, ".video-item.matrix")
        for video in video_links:
            a_elem = video.find_elements(By.CLASS_NAME, "title")
            if len(a_elem) != 1:
                warning("Incorrect number of <a> elements!")
                if not a_elem:
                    continue  # Nothing to extract from this result
            a_elem = a_elem[0]
            title = a_elem.get_attribute("title")
if secondary_keywords is not None:
if all(keyword not in title for keyword in secondary_keywords):
continue
if exclude_keywords is not None:
if any(keyword in title for keyword in exclude_keywords):
continue
            screenshot = video.screenshot_as_png
            link = a_elem.get_attribute('href')
            # Extract the BV id (e.g. BV1xx411c7mD) from the result link
            id_match = re.search(
                r'^(https:)?//www\.bilibili\.com/video/(?P<id>BV[0-9A-Za-z]{9,11})(\?from=search&seid=[0-9]{20})?',
                link)
            if id_match is None:
                raise RuntimeError("ID match not found!")
            video_id = id_match.group('id')  # Renamed to avoid shadowing the id() builtin
            v_link = BrowserSearch._video_base_url + video_id
            description = video.find_elements(By.CSS_SELECTOR, ".des.hide")[0].get_attribute("innerText")
            duration = video.find_elements(By.CSS_SELECTOR, ".so-imgTag_rb")[0].get_attribute("innerText")
            upload_time = video.find_elements(By.CSS_SELECTOR, ".so-icon.time")[0].get_attribute("innerText")
            uploader = video.find_elements(By.CLASS_NAME, "up-name")[0].get_attribute("innerText")
            out = {
                "id": video_id,
"title": title,
"link": v_link,
"description": description,
"duration": duration,
"upload_time": upload_time,
"uploader": uploader,
"screenshot": screenshot,
"pages": 0,
}
with self.lock:
self.queue.put_nowait(out)
    def get_page_count(self):
        last_page_link = self.driver.find_elements(By.CSS_SELECTOR, ".page-item.last")
        if len(last_page_link) > 0:
            # Middle pages are collapsed; the ".last" item carries the total
            page_count = int(last_page_link[0].get_attribute("innerText"))
        else:
            # All page links visible; count them directly
            page_links = self.driver.find_elements(By.CSS_SELECTOR, ".page-item:not(.next):not(.prev)")
            page_count = len(page_links)
        return page_count
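    # Example: with pagination rendered as "1 2 3 ... 50", the ".page-item.last"
    # node carries the total (50); when every page link is visible, counting the
    # numbered items gives the total directly.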
def nav_retry_wrapper(self, func, mode='refresh'):
try:
func()
except Exception as e:
if mode == 'refresh':
self.driver.refresh()
elif mode == 'back':
self.driver.back()
print(e)
raise e
def next_page(self):
        next_page_link = self.driver.find_elements(By.CSS_SELECTOR, ".page-item.next")
if len(next_page_link) > 0:
next_page_link[0].click()
else:
warning("No next page found!")
return
# Wait until AJAX finish loading
wait = WebDriverWait(self.driver, timeout=20)
wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))
def prev_page(self):
        prev_page_link = self.driver.find_elements(By.CSS_SELECTOR, ".page-item.prev")
if len(prev_page_link) > 0:
prev_page_link[0].click()
else:
warning("No previous page found!")
return
# Wait until AJAX finish loading
wait = WebDriverWait(self.driver, timeout=20)
wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))
def goto_page(self, page_num):
        num_pages = self.get_page_count()  # Method lives on self, not on the driver
assert page_num <= num_pages
url = "https://search.bilibili.com/video?keyword=" + urllib.parse.quote(self.query)
url += "&page=" + str(page_num)
self.driver.get(url)
# Wait until AJAX finish loading
wait = WebDriverWait(self.driver, timeout=20)
wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))
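    # goto_page is not called by search_main below (which paginates with
    # next_page); it is presumably kept for jumping straight to a given page.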

class SubPageSearch(object):
def __init__(self, mode="chrome", headless=False, queue=None, dict_out=None, lock=None):
self.driver = None
self.set_driver(mode, headless)
if queue is None:
raise RuntimeError("Queue object does not exist")
self.queue = queue
if dict_out is None:
raise RuntimeError("Dict object does not exist")
self.dict_out = dict_out
if lock is None:
raise RuntimeError("Lock object does not exist")
self.lock = lock
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self.driver is not None:
self.driver.close()
def set_driver(self, mode, headless):
if mode == "chrome":
options = webdriver.ChromeOptions()
options.add_argument('--js-flags=--noexpose_wasm')
if headless:
options.add_argument("--headless")
self.driver = webdriver.Chrome(options=options)
elif mode == "firefox":
raise NotImplementedError
else:
raise NotImplementedError
    def sub_page(self, main_status):
        # Loop until the producer signals completion (main_status == 0) and the
        # queue has been drained
        while True:
            time.sleep(2)
            with self.lock:
                if main_status.value == 0 and self.queue.qsize() == 0:
                    return
            try:
                item = self.queue.get_nowait()
            except Empty:
                warning("Empty queue")
                continue
retry_wrapper(functools.partial(self._sub_page_exec, item), max_retry=5, timeout=5)
def _sub_page_exec(self, item):
v_link = item['link']
self.driver.get(v_link)
if "bilibili" not in self.driver.title:
raise ConnectionError("Error loading webpage")
# Wait until AJAX finish loading
wait = WebDriverWait(self.driver, timeout=20)
wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.bilibili-player')))
        multi_page_elem = self.driver.find_elements(By.CSS_SELECTOR, '.multi-page')
        if len(multi_page_elem) == 1:
            cur_page_elem = multi_page_elem[0].find_elements(By.CSS_SELECTOR, ".cur-page")
            if len(cur_page_elem) != 1:
                warning("Incorrect number of cur-page elements!")
            text = cur_page_elem[0].get_attribute("innerText")
match_out = re.findall(r'([0-9]{1,3})/([0-9]{1,3})', text)
if len(match_out) != 1:
raise RuntimeError("Incorrect number of matches")
item["pages"] = int(match_out[0][1])
elif len(multi_page_elem) > 1:
warning("Incorrect number of multi-page elements!")
with self.lock:
self.dict_out[item["id"]] = item
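    # item["pages"] stays 0 for single-part videos; for multi-part videos the
    # "current/total" indicator (e.g. "1/12") is parsed and the total part
    # count is stored.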

def search_main(headless=False, queue=None, lock=None, status=None):
    if status is None:
        raise RuntimeError("Status object does not exist")
    # query_str, secondary_keywords and exclude_keywords are globals set in the
    # __main__ block; this relies on fork-style multiprocessing to inherit them.
    with BrowserSearch(query_str, headless=headless, queue=queue, lock=lock) as driver:
        retry_wrapper(driver.init_search, max_retry=5, timeout=5)
        page_count = driver.get_page_count()
        for j in range(max(1, page_count)):
            driver.analyze_cur_page(secondary_keywords=secondary_keywords, exclude_keywords=exclude_keywords)
            if j < page_count - 1:  # The last page has no next link to follow
                retry_wrapper(functools.partial(driver.nav_retry_wrapper, driver.next_page), max_retry=5, timeout=3)
        with driver.lock:
            status.value = 0

def search_sub(headless=False, queue=None, dict_out=None, lock=None, status=None):
if status is None:
raise RuntimeError("Status object does not exist")
with SubPageSearch(headless=headless, queue=queue, dict_out=dict_out, lock=lock) as sub:
sub.sub_page(status)

def success_handler():
    # Currently unused; sketch of an apply_async success callback:
    # ret_code = result[0]
    # url = result[1].url
    # if ret_code == 0:
    #     url_dict[url]['status'] = SUCCESS
    # report_status(url_dict)
    print('success')

def error_handler(e):
    print("-->{}<--".format(e.__cause__))
    traceback.print_exception(type(e), e, e.__traceback__)

if __name__ == "__main__":
    num_of_processes = 2
    output_dir = 'Collections/crawl_out/temp'
    query_str = ""  # Fill in the search keyword before running
    secondary_keywords = None
    exclude_keywords = None
    headless = True
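    # Process layout: one search_main producer walks the result pages and puts
    # per-video dicts on the shared queue; the remaining workers run search_sub,
    # pulling items off the queue and opening each video page to count its
    # parts. main_status flips to 0 when the producer finishes, telling the
    # consumers to drain the queue and exit.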
pool = mp.Pool(processes=num_of_processes)
mg = mp.Manager()
main_status = mg.Value('b', 1)
lock = mg.Lock()
queue = mg.Queue()
dict_out = mg.dict()
pool.apply_async(search_main, args=(headless, queue, lock, main_status,), error_callback=error_handler)
# Visit sub-pages
for i in range(max(1, num_of_processes - 1)):
pool.apply_async(search_sub, args=(headless, queue, dict_out, lock, main_status,), error_callback=error_handler)
pool.close()
pool.join()
# Assemble output
dict_out = OrderedDict(dict_out)
list_out = natsort.natsorted(dict_out.values(), key=lambda v: v['title']) # Natural sort by title
# Write images
img_out_path = os.path.join(output_dir, 'img')
    os.makedirs(img_out_path, exist_ok=True)
for v in list_out:
title = slugify(v["title"])
        # Screenshot
        path = os.path.join(img_out_path, title + os.path.extsep + 'png')
with open(path, 'wb') as f:
f.write(v["screenshot"])
del v["screenshot"]
        # Part range: empty for single-part videos, 1..N for multi-part ones
        if v["pages"] == 0:
            v["p_start"] = ''
            v["p_end"] = ''
        else:
            v["p_start"] = '1'
            v["p_end"] = str(v["pages"])
        del v["pages"]
# Write csv
csv_path = os.path.join(output_dir, 'links.csv')
    header_list = [x for x in list_out[0].keys() if x not in ("description", "id")]
    with open(csv_path, 'w', newline='', encoding='utf-8') as f:
        writer = csv.DictWriter(f, fieldnames=header_list, extrasaction="ignore")
writer.writeheader()
writer.writerows(list_out)
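    # The resulting CSV keeps title, link, duration, upload_time, uploader and
    # the p_start/p_end part range; "description" and "id" are excluded from the
    # header, and extrasaction="ignore" skips them when writing rows.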
print()
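
# A downstream batch-download step is not part of this file; a minimal sketch,
# assuming the links.csv written above and a youtube-dl binary on PATH:
#
#   import csv, subprocess
#   with open('Collections/crawl_out/temp/links.csv', newline='', encoding='utf-8') as f:
#       for row in csv.DictReader(f):
#           subprocess.run(['youtube-dl', row['link']], check=True)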