# -*- coding: utf-8 -*- # @Author: privacy # @Date: 2022-08-03 18:07:19 # @Last Modified by: privacy # @Last Modified time: 2022-08-16 11:03:30 # # 每小时一百个网页 # 解析 url title author publish_time content images # import time from pymongo import MongoClient client = MongoClient("192.168.1.200", 27017) collection = client['education']['response'] from logger import LoggerHandler logger = LoggerHandler(name="education") logger.set_file_handler(filename="education.log") from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.common.exceptions import TimeoutException from selenium.common.exceptions import NoSuchElementException from selenium.common.exceptions import StaleElementReferenceException from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.action_chains import ActionChains from gne import GeneralNewsExtractor class AutoSpider(object): """使用 selenium 在 bing 搜索引擎提取通用 body 文本数据""" def __init__(self): super(AutoSpider, self).__init__() # 服务 service = Service(executable_path="chromedriver.exe") # 选项 options = webdriver.ChromeOptions() # 无界面运行 options.add_argument('--headless') # 以最高权限运行 options.add_argument('--no-sandbox') # 配置代理 # options.add_argument('proxy-server={}'.format(self.proxy_server)) # 直接配置大小和set_window_size一样 options.add_argument('--window-size={},{}'.format(1920, 1080)) # 语言 options.add_argument('--lang=en') # 禁用扩展 options.add_argument('--disable-extensions') # options.add_argument('--disable-infobars') # 忽略证书错误 options.add_argument('--ignore-certificate-errors') # 禁止通知 options.add_argument('--disable-notifications') options.add_argument('--force-device-scale-factor=1') options.add_argument('--disable-dev-shm-usage') # 禁用浏览器侧导航 options.add_argument('--disable-browser-side-navigation') # 不加载图片, 提升速度 options.add_argument('blink-settings=imagesEnabled=false') # info:0 warning:1 error:2 fail:3 options.add_argument('log-level=2') options.add_argument("--disable-blink-features=AutomationControlled") # 设置开发者模式启动,该模式下webdriver属性为正常值 options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging']) options.add_experimental_option('useAutomationExtension', False) # 禁用浏览器弹窗 prefs = { 'profile.default_content_setting_values': { 'notifications': 2 } } options.add_experimental_option("prefs", prefs) self.driver = webdriver.Chrome(options=options, service=service) # 注入防检测脚本 with open('./stealth.min.js') as fj: js = fj.read() self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": js}) # 设置网页超时 self.driver.set_page_load_timeout(60) self.driver.set_script_timeout(60) self.extractor = GeneralNewsExtractor() self.extract = self.extractor.extract def search_from_bing(self, search_query): """使用必应英文搜索""" try: self.driver.get('https://cn.bing.com/') except: self.driver.execute_script('window.stop()') logger.info("page timeout!") WebDriverWait(self.driver, 180, 5).until(EC.visibility_of_element_located((By.ID,'sb_form_q'))) self.driver.find_element(By.ID, "sb_form_q").send_keys(search_query) time.sleep(3) self.driver.implicitly_wait(3) self.driver.find_element(By.ID, "sb_form_q").send_keys(Keys.ENTER) time.sleep(4) self.driver.implicitly_wait(3) WebDriverWait(self.driver, 180, 10).until(EC.visibility_of_element_located((By.ID,'est_en'))) self.driver.find_element(By.ID, "est_en").click() time.sleep(3) self.driver.implicitly_wait(3) return True def get_next_page_form_bing(self): """必应点击下一页""" try: self.driver.refresh() time.sleep(3) self.driver.implicitly_wait(3) # 有下一页按钮 if self.driver.find_element(By.CSS_SELECTOR, "div.sw_next").text: self.driver.find_element(By.CSS_SELECTOR, "div.sw_next").click() # 没有下一页按钮 elif self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").text: self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").click() # 页面加载错误 else: return False # 返回成功 return True except Exception as e: logger.error(e) return False def click_title_from_list(self): """点击当前列表中的所有网页""" for epoch in range(5): # 遍历标题列表 for index, item in enumerate(self.driver.find_elements(By.CSS_SELECTOR, "#b_results li.b_algo")): # 判断 url 是否已获取 try: href = item.find_element(By.CSS_SELECTOR, "h2 a").get_attribute("href") # pdf 文件 if href.endswith('pdf'): continue except StaleElementReferenceException as e: # DOM 树结构改变,尝试重新获取 try: item = self.driver.find_element(By.XPATH, "//*[@id='b_results']//li[{}]".format(index)) href = item.find_element(By.CSS_SELECTOR, "h2 a").get_attribute("href") except Exception as e: continue # 判重成功,跳过页面 if collection.find_one({"url": href}): continue # 尝试打开网页 try: time.sleep(10) # 句柄为 1,可进行点击操作 if len(self.driver.window_handles) == 1: try: item.find_element(By.TAG_NAME, "h2").click() except Exception as e: try: element = item.find_element(By.TAG_NAME, "h2") self.driver.execute_script('arguments[0].click()', element) except Exception as e: logger.error(e) # 句柄不为 1,清理句柄 else: # 遍历句柄 for i in range(len(self.driver.window_handles) - 1): # 切换句柄 self.driver.switch_to.window(self.driver.window_handles[1]) # 关闭句柄 self.driver.close() # 清理完毕,执行点击操作 item.find_element(By.TAG_NAME, "h2").click() # 打开网页失败 except Exception as e: logger.error(e) # 打开网页成功 else: # 判断成功打开标签页 if (len(self.driver.window_handles) == 2): # 切换标签页 self.driver.switch_to.window(self.driver.window_handles[1]) # 判断是否为文章 if self.check_article(): # 读文章 self.parse_page_from_article() time.sleep(10) finally: # 返回原始页 self.driver.switch_to.window(self.driver.window_handles[0]) self.driver.implicitly_wait(5) # 列表读取完毕,打开下一页 else: # 打开下一页失败,直接退出 if not self.get_next_page_form_bing(): break # 打开成功,全局等待加载 self.driver.implicitly_wait(5) # 切换到第一句柄 self.driver.switch_to.window(self.driver.window_handles[0]) def parse_page_from_article(self): """解析网页body""" # 滚动到底层 try: height = self.driver.execute_script('return document.body.scrollHeight') if height > 1080: self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight)') except TimeoutException as e: # 超时尝试停止加载 try: self.driver.execute_script('window.stop ? window.stop() : document.execCommand("Stop");') except TimeoutException as e: logger.error('Timeout!') except Exception as e: logger.error(e) except Exception as e: self.driver.close() logger.error(e) return page_source = "" url = "" for x in range(3): try: # 读取网页源码 if not page_source: page_source = self.driver.page_source # 解析网页 url if not url: url = self.driver.current_url except TimeoutException as e: logger.info('Timeout!') except Exception as e: logger.error(e) if page_source: try: result = self.extract(page_source) except Exception as e: result = {"title":"", "author":"", "publish_time":"", "content":"", "images":[]} result['page_source'] = page_source else: self.driver.close() return if url: result['url'] = url else: self.driver.close() return if not result['title']: # 解析网页标题 try: result['title'] = self.driver.title except Exception as e: logger.error(e) # 元数据 metadata = dict([]) try: for meta in self.driver.find_elements(By.TAG_NAME, "meta"): try: if meta.get_attribute("name"): metadata[meta.get_attribute("name")] = meta.get_attribute("content") except: pass except Exception as e: result['metadata'] = [] else: result['metadata'] = metadata if not result['content']: # 提取正文 try: result['content'] = self.driver.find_element(By.XPATH, "//body").text except NoSuchElementException as e: self.driver.close() return except TimeoutException as e: self.driver.close() return except Exception as e: self.driver.close() logger.error(e) return result["crawl_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) result["used"] = 0 # 存储 try: logger.info(result) collection.insert_one(result) except Exception as e: logger.error(e) finally: self.driver.close() return def check_article(self): """判断网页是否为文章""" return True def start_crawl(self, query): """启动爬虫""" if self.search_from_bing(query): self.click_title_from_list() def close(self): """关闭爬虫""" self.driver.close() def main(): # 关键词列表 with open('querys.txt', 'r', encoding='utf-8') as fp: querys = fp.read() # 启动爬虫 robot = AutoSpider() # 遍历关键词 for query in querys.split('\n'): robot.start_crawl(query) # 关闭爬虫 robot.close() if __name__ == '__main__': main()