"""Wrapper over youtube-dl for searching and batch downloading bilibili videos."""

import copy
import csv
import functools
import os
import re
import time
import urllib.parse
from collections import OrderedDict
from logging import warning

import natsort
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from utils import retry_wrapper, slugify


class BrowserSearch(object):
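    """Scrape bilibili search result pages with Selenium and collect per-video metadata.

    Each result card yields id, title, link, description, duration, upload time,
    uploader, and a PNG screenshot. Use as a context manager so the browser is
    always shut down.
    """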
_video_base_url = "https://www.bilibili.com/video/"
    def __init__(self, query, search_mode='video', mode="chrome", headless=False):
        # Validate the query first so a bad argument does not leak a browser process.
        if not isinstance(query, str):
            raise TypeError("Query is not a string.")
        if not query:
            warning("Query string is empty!")
        self.query = query
        self.search_base_url = 'https://search.bilibili.com/' + search_mode + '?keyword='
        self.driver = None
        self.set_driver(mode, headless)
def __enter__(self):
return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.driver is not None:
            # quit() also terminates the underlying chromedriver process; close() only closes the window.
            self.driver.quit()
    def set_driver(self, mode, headless):
        """Create the Selenium WebDriver; only Chrome is currently supported."""
        if mode == "chrome":
            options = webdriver.ChromeOptions()
            # Do not expose WebAssembly to pages.
            options.add_argument('--js-flags=--noexpose_wasm')
            if headless:
                options.add_argument("--headless")
            self.driver = webdriver.Chrome(options=options)
        elif mode == "firefox":
            raise NotImplementedError("Firefox support is not implemented.")
        else:
            raise ValueError("Unknown browser mode: " + repr(mode))
    def init_search(self):
        """Open the search results page for self.query and wait for the results to render."""
        url = self.search_base_url + urllib.parse.quote(self.query)
        self.driver.get(url)
        # "搜索结果" means "search results"; its absence from the page title indicates a failed load.
        if "搜索结果" not in self.driver.title:
            raise ConnectionError("Error loading webpage")
        # Wait until the AJAX-rendered result cards are present.
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.video.matrix')))
    def analyze_cur_page(self, secondary_keywords=None, exclude_keywords=None, prev_result=None):
        """Scrape every result card on the current page into an OrderedDict keyed by av-id.

        New entries are merged into a deep copy of prev_result when one is given.
        """
        if prev_result is None:
            links_out = OrderedDict()
        elif not isinstance(prev_result, dict):
            raise TypeError("Result from previous run is not a dictionary.")
        else:
            links_out = copy.deepcopy(prev_result)
        video_links = self.driver.find_elements(By.CSS_SELECTOR, ".video.matrix")
        for video in video_links:
            a_elem = video.find_elements(By.CLASS_NAME, "title")
            if len(a_elem) != 1:
                warning("Incorrect number of <a> elements!")
                if not a_elem:
                    # Nothing to scrape from this card.
                    continue
            title = a_elem[0].get_attribute("title")
            # Keep only titles containing at least one secondary keyword, if given.
            if secondary_keywords is not None:
                if all(keyword not in title for keyword in secondary_keywords):
                    continue
            # Skip titles containing any excluded keyword.
            if exclude_keywords is not None:
                if any(keyword in title for keyword in exclude_keywords):
                    continue
            screenshot = video.screenshot_as_png
            # innerText gives the rendered text (innerHTML / textContent are alternatives).
            vid = video.find_elements(By.CSS_SELECTOR, ".type.avid")[0].get_attribute("innerText")
            # Expect an av-number such as "av12345678".
            assert re.search(r'^av[0-9]{5,8}\Z', vid) is not None
            v_link = BrowserSearch._video_base_url + vid
            description = video.find_elements(By.CSS_SELECTOR, ".des.hide")[0].get_attribute("innerText")
            duration = video.find_elements(By.CSS_SELECTOR, ".so-imgTag_rb")[0].get_attribute("innerText")
            upload_time = video.find_elements(By.CSS_SELECTOR, ".so-icon.time")[0].get_attribute("innerText")
            uploader = video.find_elements(By.CLASS_NAME, "up-name")[0].get_attribute("innerText")
            links_out[vid] = {
                "id": vid,
                "title": title,
                "link": v_link,
                "description": description,
                "duration": duration,
                "upload_time": upload_time,
                "uploader": uploader,
                "screenshot": screenshot,
            }
        return links_out
    def get_page_count(self):
        """Return the total number of result pages."""
        last_page_link = self.driver.find_elements(By.CSS_SELECTOR, ".page-item.last")
        if len(last_page_link) > 0:
            # Middle pages are collapsed; the "last" item carries the total page count.
            page_count = int(last_page_link[0].get_attribute("innerText"))
        else:
            # Paginator is not collapsed: count the numbered page items directly.
            page_links = self.driver.find_elements(By.CSS_SELECTOR, ".page-item:not(.next):not(.prev)")
            page_count = len(page_links)
        return page_count
def nav_retry_wrapper(self, func, mode='refresh'):
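        """Run func once; on failure, refresh (or go back) so a later retry starts from a clean page, then re-raise."""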
try:
func()
except Exception as e:
if mode == 'refresh':
self.driver.refresh()
elif mode == 'back':
self.driver.back()
print(e)
raise e
    def next_page(self):
        """Click through to the next results page, if one exists."""
        next_page_link = self.driver.find_elements(By.CSS_SELECTOR, ".page-item.next")
        if len(next_page_link) > 0:
            next_page_link[0].click()
        else:
            warning("No next page found!")
            return
        # Wait until the AJAX-rendered paginator is present again.
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))
    def prev_page(self):
        """Click back to the previous results page, if one exists."""
        prev_page_link = self.driver.find_elements(By.CSS_SELECTOR, ".page-item.prev")
        if len(prev_page_link) > 0:
            prev_page_link[0].click()
        else:
            warning("No previous page found!")
            return
        # Wait until the AJAX-rendered paginator is present again.
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))
    def goto_page(self, page_num):
        """Jump directly to results page page_num (1-based)."""
        num_pages = self.get_page_count()
        assert page_num <= num_pages
        url = self.search_base_url + urllib.parse.quote(self.query) + "&page=" + str(page_num)
        self.driver.get(url)
        # Wait until the AJAX-rendered paginator is present.
        wait = WebDriverWait(self.driver, timeout=20)
        wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.page-item')))
if __name__ == "__main__":
output_dir = 'Collections/crawl_out/Pyxis'
query_str = "Pyxis的闪亮大作战!"
secondary_keywords = None
exclude_keywords = None
    # Tried repeatedly switching the search between 'all' and 'video' modes,
    # since a single search does not return all results, but it didn't work.
    dict_out = None
    # search_mode = {True: 'all', False: 'video'}
    # all_mode = False
    # for i in range(5):
    with BrowserSearch(query_str, headless=True) as driver:
        retry_wrapper(driver.init_search, max_retry=5, timeout=5)
        num_pages = max(1, driver.get_page_count())
        for j in range(num_pages):
            dict_out = driver.analyze_cur_page(secondary_keywords=secondary_keywords,
                                               exclude_keywords=exclude_keywords,
                                               prev_result=dict_out)
            # Navigate with a refresh-and-retry wrapper; page loads fail transiently.
            retry_wrapper(functools.partial(driver.nav_retry_wrapper, driver.next_page), max_retry=5, timeout=3)
            # all_mode = not all_mode
            # time.sleep(5)
list_out = natsort.natsorted(dict_out.values(), key=lambda v: v['title']) # Natural sort by title
# Write images
img_out_path = os.path.join(output_dir, 'img')
    os.makedirs(img_out_path, exist_ok=True)
for v in list_out:
title = slugify(v["title"])
path = os.path.join(img_out_path, title + os.path.extsep + 'png')
with open(path, 'wb') as f:
f.write(v["screenshot"])
del v["screenshot"]
# Write csv
csv_path = os.path.join(output_dir, 'links.csv')
header_list = list(list_out[0].keys())
header_list[:] = [x for x in header_list if x != "description" and x != "id"]
    with open(csv_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.DictWriter(f, fieldnames=header_list, extrasaction="ignore")
writer.writeheader()
writer.writerows(list_out)
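    # The collected links can then be handed to youtube-dl for the batch download
    # the module docstring promises. A minimal sketch (assumes the youtube-dl CLI
    # is installed and on PATH; the output template is only illustrative):
    #
    #     import subprocess
    #     for v in list_out:
    #         subprocess.run(['youtube-dl', '-o',
    #                         os.path.join(output_dir, '%(title)s.%(ext)s'),
    #                         v['link']], check=False)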