# -*- coding: utf-8 -*-
# @Author: privacy
# @Date: 2022-08-03 18:07:19
# @Last Modified by: privacy
# @Last Modified time: 2022-08-16 11:03:30
#
# Crawls roughly one hundred pages per hour.
# Extracts: url, title, author, publish_time, content, images.
#
import time

from pymongo import MongoClient
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from gne import GeneralNewsExtractor

from logger import LoggerHandler

client = MongoClient("192.168.1.200", 27017)
collection = client['education']['response']

logger = LoggerHandler(name="education")
logger.set_file_handler(filename="education.log")
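
# Not in the original script: the dedup check later does
# collection.find_one({"url": href}) for every search result, so an index
# on `url` keeps that lookup fast as the collection grows. A minimal
# sketch, assuming the MongoDB user may create indexes; create_index is
# idempotent, so rerunning it is harmless.
try:
    collection.create_index("url")
except Exception as exc:
    logger.error("could not create url index: {}".format(exc))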


class AutoSpider(object):
    """Use selenium to extract generic body text from Bing search results."""

    def __init__(self):
        super(AutoSpider, self).__init__()
        # Driver service
        service = Service(executable_path="chromedriver.exe")
        # Chrome options
        options = webdriver.ChromeOptions()
        # Run headless (no UI)
        options.add_argument('--headless')
        # Run with highest privileges (needed in some sandboxed environments)
        options.add_argument('--no-sandbox')
        # Proxy configuration (disabled)
        # options.add_argument('proxy-server={}'.format(self.proxy_server))
        # Setting the size here is equivalent to calling set_window_size
        options.add_argument('--window-size={},{}'.format(1920, 1080))
        # Browser language
        options.add_argument('--lang=en')
        # Disable extensions
        options.add_argument('--disable-extensions')
        # options.add_argument('--disable-infobars')
        # Ignore certificate errors
        options.add_argument('--ignore-certificate-errors')
        # Suppress notifications
        options.add_argument('--disable-notifications')
        options.add_argument('--force-device-scale-factor=1')
        options.add_argument('--disable-dev-shm-usage')
        # Disable browser-side navigation
        options.add_argument('--disable-browser-side-navigation')
        # Skip image loading to speed things up
        options.add_argument('blink-settings=imagesEnabled=false')
        # Log level: info:0 warning:1 error:2 fatal:3
        options.add_argument('log-level=2')
        options.add_argument('--disable-blink-features=AutomationControlled')
        # Developer mode: navigator.webdriver reports a normal value
        options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        options.add_experimental_option('useAutomationExtension', False)
        # Disable browser pop-ups
        prefs = {
            'profile.default_content_setting_values': {
                'notifications': 2
            }
        }
        options.add_experimental_option("prefs", prefs)
        self.driver = webdriver.Chrome(options=options, service=service)
        # Inject the anti-detection script
        with open('./stealth.min.js', encoding='utf-8') as fj:
            js = fj.read()
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": js})
        # Page and script timeouts
        self.driver.set_page_load_timeout(60)
        self.driver.set_script_timeout(60)
        self.extractor = GeneralNewsExtractor()
        self.extract = self.extractor.extract
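
    # Not part of the original class: search_from_bing repeats a
    # get-then-window.stop() pattern for slow pages. A minimal helper
    # sketch of that pattern, assuming a page that hits the load timeout
    # is still usable once loading is stopped.
    def safe_get(self, url):
        """Open a URL; on page-load timeout, stop loading and carry on."""
        try:
            self.driver.get(url)
        except TimeoutException:
            self.driver.execute_script('window.stop()')
            logger.info("page timeout!")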

    def search_from_bing(self, search_query):
        """Search Bing and switch to English-only results."""
        try:
            self.driver.get('https://cn.bing.com/')
        except TimeoutException:
            self.driver.execute_script('window.stop()')
            logger.info("page timeout!")
        WebDriverWait(self.driver, 180, 5).until(EC.visibility_of_element_located((By.ID, 'sb_form_q')))
        self.driver.find_element(By.ID, "sb_form_q").send_keys(search_query)
        time.sleep(3)
        self.driver.implicitly_wait(3)
        self.driver.find_element(By.ID, "sb_form_q").send_keys(Keys.ENTER)
        time.sleep(4)
        self.driver.implicitly_wait(3)
        # Switch to the "English only" results tab
        WebDriverWait(self.driver, 180, 10).until(EC.visibility_of_element_located((By.ID, 'est_en')))
        self.driver.find_element(By.ID, "est_en").click()
        time.sleep(3)
        self.driver.implicitly_wait(3)
        return True

    def get_next_page_from_bing(self):
        """Click through to the next page of Bing results."""
        try:
            self.driver.refresh()
            time.sleep(3)
            self.driver.implicitly_wait(3)
            # A "next page" button is present
            if self.driver.find_element(By.CSS_SELECTOR, "div.sw_next").text:
                self.driver.find_element(By.CSS_SELECTOR, "div.sw_next").click()
            # No "next page" button: click the page number after the current one
            elif self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").text:
                self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").click()
            # Page failed to load
            else:
                return False
            # Pagination succeeded
            return True
        except Exception as e:
            logger.error(e)
            return False
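
    # Not in the original: Bing also paginates via the `first` query
    # parameter (1, 11, 21, ...), which avoids the fragile class-name
    # XPath above. A hypothetical alternative sketch; the parameter is
    # observed Bing behavior, so verify it against the live site.
    def goto_bing_page(self, query, page):
        """Jump straight to result page `page` (1-based) for `query`."""
        from urllib.parse import quote_plus  # local import keeps the sketch self-contained
        offset = (page - 1) * 10 + 1
        self.driver.get('https://cn.bing.com/search?q={}&first={}'.format(quote_plus(query), offset))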

    def click_title_from_list(self):
        """Open every result on the current list page."""
        for epoch in range(5):
            # Walk the list of result titles
            for index, item in enumerate(self.driver.find_elements(By.CSS_SELECTOR, "#b_results li.b_algo")):
                # Check whether this url has already been fetched
                try:
                    href = item.find_element(By.CSS_SELECTOR, "h2 a").get_attribute("href")
                    # Skip pdf files
                    if href.endswith('.pdf'):
                        continue
                except StaleElementReferenceException:
                    # The DOM changed under us; try to re-locate the item
                    # (XPath positions are 1-based, enumerate is 0-based)
                    try:
                        item = self.driver.find_element(By.XPATH, "//*[@id='b_results']//li[{}]".format(index + 1))
                        href = item.find_element(By.CSS_SELECTOR, "h2 a").get_attribute("href")
                    except Exception:
                        continue
                # Already stored: skip this page
                if collection.find_one({"url": href}):
                    continue
                # Try to open the page
                try:
                    time.sleep(10)
                    # Exactly one window handle: safe to click
                    if len(self.driver.window_handles) == 1:
                        try:
                            item.find_element(By.TAG_NAME, "h2").click()
                        except Exception:
                            # Native click failed; fall back to a JS click
                            try:
                                element = item.find_element(By.TAG_NAME, "h2")
                                self.driver.execute_script('arguments[0].click()', element)
                            except Exception as e:
                                logger.error(e)
                    # More than one handle: close the extras first
                    else:
                        # Walk the surplus handles
                        for _ in range(len(self.driver.window_handles) - 1):
                            # Switch to the second handle
                            self.driver.switch_to.window(self.driver.window_handles[1])
                            # Close it
                            self.driver.close()
                        # Handles cleaned up; perform the click
                        item.find_element(By.TAG_NAME, "h2").click()
                # Opening the page failed
                except Exception as e:
                    logger.error(e)
                # Opening the page succeeded
                else:
                    # The click opened a new tab
                    if len(self.driver.window_handles) == 2:
                        # Switch to the new tab
                        self.driver.switch_to.window(self.driver.window_handles[1])
                        # Only parse pages that look like articles
                        if self.check_article():
                            # Parse the article
                            self.parse_page_from_article()
                        time.sleep(10)
                finally:
                    # Return to the results page
                    self.driver.switch_to.window(self.driver.window_handles[0])
                    self.driver.implicitly_wait(5)
            # List exhausted: open the next page
            else:
                # Pagination failed: bail out
                if not self.get_next_page_from_bing():
                    break
                # Pagination succeeded: wait for the page to load
                self.driver.implicitly_wait(5)
        # Switch back to the first handle
        self.driver.switch_to.window(self.driver.window_handles[0])
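
    # Not in the original: the handle juggling above is the subtlest part
    # of the flow, so here it is isolated as a sketch. After each close()
    # the window_handles list shrinks, which is why the loop can always
    # target index 1 while index 0 (the results page) survives.
    def close_extra_tabs(self):
        """Close every tab except the first and switch back to it."""
        while len(self.driver.window_handles) > 1:
            self.driver.switch_to.window(self.driver.window_handles[1])
            self.driver.close()
        self.driver.switch_to.window(self.driver.window_handles[0])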

    def parse_page_from_article(self):
        """Parse the page body."""
        # Scroll to the bottom so lazy content loads
        try:
            height = self.driver.execute_script('return document.body.scrollHeight')
            if height > 1080:
                self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
        except TimeoutException:
            # On timeout, try to stop the page from loading
            try:
                self.driver.execute_script('window.stop ? window.stop() : document.execCommand("Stop");')
            except TimeoutException:
                logger.error('Timeout!')
            except Exception as e:
                logger.error(e)
        except Exception as e:
            self.driver.close()
            logger.error(e)
            return
        page_source = ""
        url = ""
        # Up to three attempts to read the source and the url
        for x in range(3):
            try:
                # Read the page source
                if not page_source:
                    page_source = self.driver.page_source
                # Read the page url
                if not url:
                    url = self.driver.current_url
            except TimeoutException:
                logger.info('Timeout!')
            except Exception as e:
                logger.error(e)
        if page_source:
            try:
                result = self.extract(page_source)
            except Exception:
                # Extraction failed: keep the raw source for later processing
                result = {"title": "", "author": "", "publish_time": "", "content": "", "images": []}
                result['page_source'] = page_source
        else:
            self.driver.close()
            return
        if url:
            result['url'] = url
        else:
            self.driver.close()
            return
        if not result['title']:
            # Fall back to the document title
            try:
                result['title'] = self.driver.title
            except Exception as e:
                logger.error(e)
        # Collect <meta> tags
        metadata = {}
        try:
            for meta in self.driver.find_elements(By.TAG_NAME, "meta"):
                try:
                    if meta.get_attribute("name"):
                        metadata[meta.get_attribute("name")] = meta.get_attribute("content")
                except Exception:
                    pass
        except Exception:
            result['metadata'] = {}
        else:
            result['metadata'] = metadata
        if not result['content']:
            # Fall back to the raw body text
            try:
                result['content'] = self.driver.find_element(By.XPATH, "//body").text
            except NoSuchElementException:
                self.driver.close()
                return
            except TimeoutException:
                self.driver.close()
                return
            except Exception as e:
                self.driver.close()
                logger.error(e)
                return
        result["crawl_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        result["used"] = 0
        # Store the document
        try:
            logger.info(result)
            collection.insert_one(result)
        except Exception as e:
            logger.error(e)
        finally:
            self.driver.close()
        return

    def check_article(self):
        """Decide whether the current page is an article (stub: accept all)."""
        return True
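
    # Not in the original: check_article always returns True. A hedged
    # heuristic sketch of what it could do instead, assuming article pages
    # expose an <article> element or an og:type=article meta tag; both
    # selectors are illustrative, not validated against the crawled sites.
    def check_article_heuristic(self):
        """Guess whether the current page is an article."""
        try:
            if self.driver.find_elements(By.TAG_NAME, "article"):
                return True
            for meta in self.driver.find_elements(By.XPATH, "//meta[@property='og:type']"):
                if meta.get_attribute("content") == "article":
                    return True
        except Exception as e:
            logger.error(e)
        return False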

    def start_crawl(self, query):
        """Run one search-and-crawl pass for a query."""
        if self.search_from_bing(query):
            self.click_title_from_list()

    def close(self):
        """Shut the crawler down."""
        self.driver.close()


def main():
    # Load the keyword list
    with open('querys.txt', 'r', encoding='utf-8') as fp:
        querys = fp.read()
    # Start the crawler
    robot = AutoSpider()
    # Iterate over the keywords
    for query in querys.split('\n'):
        robot.start_crawl(query)
    # Shut the crawler down
    robot.close()


if __name__ == '__main__':
    main()
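
# Usage notes (assumed, not stated in the original): the working directory
# needs three files before this script will run —
#   chromedriver.exe   a ChromeDriver build matching the installed Chrome
#   stealth.min.js     the anti-detection script injected in __init__
#   querys.txt         one search keyword per line, e.g.:
#                          education policy 2022
#                          higher education reform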