# -*- coding: utf-8 -*-
# @Author: privacy
# @Date: 2022-08-03 18:07:19
# @Last Modified by: privacy
# @Last Modified time: 2022-09-26 16:05:39
#
# Roughly one hundred web pages per hour.
# Added news-content parsing:
#   parses url, title, author, publish_time, content, images.
# JS rendering adds position info for each DOM node, to improve
#   detection of the article body.
# Done.
import time
import re
import platform

import yaml
from pymongo import MongoClient

client = MongoClient("192.168.1.200", 27017)
# collection = client['education']['response']
collection = client['education']['test']

from logger import LoggerHandler

logger = LoggerHandler(name="education")
logger.set_file_handler(filename="education.log")
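
# Documents written to `collection` take this shape (fields are assembled in
# parse_page_from_article below; the values here are illustrative only):
#
#     {
#         "url": "...", "title": "...", "author": "...",
#         "publish_time": "...", "content": "...", "images": [],
#         "metadata": {...},             # <meta name=...> contents
#         "crawl_time": "2022-09-26 16:05:39",
#         "used": 0,                     # 0 if check_useful passed, else 1
#         "hotkey": "..."                # the search query that found the page
#     }
#
# A "page_source" field is added only when GNE extraction fails.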
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.common.exceptions import TimeoutException
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import StaleElementReferenceException
from selenium.common.exceptions import UnexpectedAlertPresentException
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from gne import GeneralNewsExtractor
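
# GNE's GeneralNewsExtractor.extract() takes an HTML string and returns a dict
# with title / author / publish_time / content / images keys; the fallback
# dict built in parse_page_from_article mirrors that shape when extraction fails.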

class AutoSpider(object):
    """Use selenium to pull generic body text from pages found via the Bing search engine."""

    def __init__(self):
        super(AutoSpider, self).__init__()
        # Chromedriver service
        if platform.system() == "Windows":
            service = Service(executable_path="chromedriver.exe")
        else:
            service = Service(executable_path="chromedriver")
        # Browser options
        options = webdriver.ChromeOptions()
        # Run without a visible window
        options.add_argument('--headless')
        # Run with the highest privileges
        options.add_argument('--no-sandbox')
        # Proxy configuration
        # options.add_argument('proxy-server={}'.format(self.proxy_server))
        # Setting the size here is equivalent to set_window_size
        options.add_argument('--window-size={},{}'.format(1920, 1080))
        # Language
        options.add_argument('--lang=en')
        # Disable extensions
        options.add_argument('--disable-extensions')
        # options.add_argument('--disable-infobars')
        # Ignore certificate errors
        options.add_argument('--ignore-certificate-errors')
        # Suppress notifications
        options.add_argument('--disable-notifications')
        options.add_argument('--force-device-scale-factor=1')
        options.add_argument('--disable-dev-shm-usage')
        # Disable browser-side navigation
        options.add_argument('--disable-browser-side-navigation')
        # Skip image loading for speed
        options.add_argument('blink-settings=imagesEnabled=false')
        # info:0 warning:1 error:2 fail:3
        options.add_argument('log-level=2')
        options.add_argument("--disable-blink-features=AutomationControlled")
        # Start in developer mode so the webdriver property reports a normal value
        options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
        options.add_experimental_option('useAutomationExtension', False)
        # Disable browser pop-ups
        prefs = {
            'profile.default_content_setting_values': {
                'notifications': 2
            }
        }
        options.add_experimental_option("prefs", prefs)
        self.driver = webdriver.Chrome(options=options, service=service)
        # Inject the anti-detection (stealth) script
        with open('./stealth.min.js') as fj:
            js = fj.read()
        self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {"source": js})
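        # Page.addScriptToEvaluateOnNewDocument runs the stealth script before
        # any page script on every new document, letting it patch
        # fingerprinting surfaces (e.g. navigator.webdriver) ahead of the page.
        # A minimal sanity check, purely illustrative and not part of the
        # crawl logic, would be:
        #     assert not self.driver.execute_script('return navigator.webdriver')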
        # Page timeouts
        self.driver.set_page_load_timeout(60)
        self.driver.set_script_timeout(60)
        self.extractor = GeneralNewsExtractor()
        self.extract = self.extractor.extract
        with open('render.js') as fr:
            self.render = fr.read()
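        # render.js is not shown here; judging by the header note ("JS
        # rendering adds position info for each DOM node") and the
        # commented-out extract(..., use_visiable_info=True) call below, it
        # presumably tags DOM nodes with their on-screen coordinates so GNE
        # can weight visible blocks when picking the article body. That is an
        # assumption about its contents, not a documented fact.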

    def search_from_bing(self, search_query):
        """Search Bing, restricted to English results."""
        try:
            self.driver.get('https://cn.bing.com/')
        except Exception:
            self.driver.execute_script('window.stop()')
            logger.info("page timeout!")
        WebDriverWait(self.driver, 180, 5).until(EC.visibility_of_element_located((By.ID, 'sb_form_q')))
        self.driver.find_element(By.ID, "sb_form_q").send_keys(search_query + " (language:en)")
        time.sleep(3)
        self.driver.implicitly_wait(3)
        self.driver.find_element(By.ID, "sb_form_q").send_keys(Keys.ENTER)
        time.sleep(4)
        self.driver.implicitly_wait(3)
        WebDriverWait(self.driver, 180, 10).until(EC.visibility_of_element_located((By.ID, 'est_en')))
        # Switch to the English-results tab
        self.driver.find_element(By.ID, "est_en").click()
        time.sleep(3)
        self.driver.implicitly_wait(3)
        return True

    def search_from_google(self, search_query):
        """Search Google."""
        try:
            self.driver.get("https://www.google.com")
        except Exception:
            self.driver.execute_script('window.stop()')
            logger.info("page timeout")
        WebDriverWait(self.driver, 180, 5).until(EC.visibility_of_element_located((By.NAME, "q")))
        self.driver.find_element(By.NAME, "q").send_keys(search_query)
        time.sleep(3)
        self.driver.implicitly_wait(3)
        self.driver.find_element(By.NAME, "q").send_keys(Keys.ENTER)
        time.sleep(4)
        self.driver.implicitly_wait(3)
        return True

    def get_next_page_form_bing(self):
        """Click through to the next page of Bing results."""
        try:
            self.driver.refresh()
            time.sleep(3)
            self.driver.implicitly_wait(3)
            # The "next page" arrow is present
            if self.driver.find_element(By.CSS_SELECTOR, "div.sw_next").text:
                self.driver.find_element(By.CSS_SELECTOR, "div.sw_next").click()
            # No arrow: fall back to the sibling of the current page number
            elif self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").text:
                self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").click()
            # Page failed to load
            else:
                return False
            return True
        except Exception as e:
            logger.error(e)
            return False
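
    # Note on the pagination logic above: find_element raises
    # NoSuchElementException rather than returning a falsy value, so when the
    # "next" arrow is missing entirely, control jumps straight to the except
    # block and the method returns False. The elif branch is reached only when
    # the arrow element exists but has empty text.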

    def get_next_page_form_google(self):
        """Click through to the next page of Google results."""
        try:
            self.driver.refresh()
            time.sleep(3)
            self.driver.implicitly_wait(3)
            # The "next page" link is present
            if self.driver.find_element(By.ID, "pnnext").text:
                self.driver.find_element(By.ID, "pnnext").click()
            # No "next page" link
            # elif self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").text:
            #     self.driver.find_element(By.XPATH, "//a[@class='sb_pagS sb_pagS_bp b_widePag sb_bp']/parent::li/following-sibling::*").click()
            # Page failed to load
            else:
                return False
            return True
        except Exception as e:
            logger.error(e)
            return False

    def click_title_from_bing(self, hotkey):
        """Open every result link on the current results page."""
        for epoch in range(1):
            # Iterate over the result titles
            for index, item in enumerate(self.driver.find_elements(By.CSS_SELECTOR, "#b_results li.b_algo")):
                # Check whether this url has already been fetched
                try:
                    href = item.find_element(By.CSS_SELECTOR, "h2 a").get_attribute("href")
                    # Skip pdf files
                    if href.endswith('pdf'):
                        continue
                except StaleElementReferenceException:
                    # The DOM changed underneath us; try to re-locate the item
                    try:
                        item = self.driver.find_element(By.XPATH, "//*[@id='b_results']//li[{}]".format(index + 1))
                        href = item.find_element(By.CSS_SELECTOR, "h2 a").get_attribute("href")
                    except Exception:
                        continue
                # Already stored: skip this page
                if collection.find_one({"url": href}):
                    continue
                # Try to open the page
                try:
                    time.sleep(10)
                    # Exactly one window handle: safe to click
                    if len(self.driver.window_handles) == 1:
                        try:
                            item.find_element(By.TAG_NAME, "h2").click()
                        except Exception:
                            try:
                                element = item.find_element(By.TAG_NAME, "h2")
                                self.driver.execute_script('arguments[0].click()', element)
                            except Exception as e:
                                logger.error(e)
                    # More than one handle: close the extras first
                    else:
                        for i in range(len(self.driver.window_handles) - 1):
                            # Switch to the extra handle
                            self.driver.switch_to.window(self.driver.window_handles[1])
                            # Close it
                            self.driver.close()
                        # Cleaned up; now click
                        item.find_element(By.TAG_NAME, "h2").click()
                # Opening the page failed
                except Exception as e:
                    logger.error(e)
                # Opening the page succeeded
                else:
                    # A new tab was actually opened
                    if len(self.driver.window_handles) == 2:
                        # Switch to it
                        self.driver.switch_to.window(self.driver.window_handles[1])
                        # Is this an article?
                        if self.check_article():
                            # Parse it
                            self.parse_page_from_article(hotkey)
                            time.sleep(10)
                finally:
                    # Return to the results page
                    self.driver.switch_to.window(self.driver.window_handles[0])
                    self.driver.implicitly_wait(5)
            # List finished; pagination is disabled here
            # else:
            #     # Could not open the next page: stop
            #     if not self.get_next_page_form_bing():
            #         break
            #     # Opened; wait for it to load
            #     self.driver.implicitly_wait(5)
        # Switch back to the first handle
        self.driver.switch_to.window(self.driver.window_handles[0])
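
    # The try/except/else/finally shape above means: errors while opening a
    # result are logged and the result is skipped (except), parsing only runs
    # when the open succeeded (else), and focus always returns to handle 0
    # (finally) so the next iteration starts from the results page.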

    def click_title_from_google(self, hotkey):
        """Open every result link, paging through up to ten result pages."""
        for epoch in range(10):
            # Iterate over the result titles
            for index, item in enumerate(self.driver.find_elements(By.XPATH, '//*[@id="rso"]//div[@class="yuRUbf"]')):
                # Check whether this url has already been fetched
                try:
                    href = item.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
                    # Skip pdf files
                    if href.endswith('pdf'):
                        continue
                except StaleElementReferenceException:
                    # The DOM changed underneath us; try to re-locate the item
                    try:
                        item = self.driver.find_elements(By.XPATH, '//*[@id="rso"]//div[@class="yuRUbf"]')[index]
                        href = item.find_element(By.CSS_SELECTOR, "a").get_attribute("href")
                    except Exception:
                        continue
                # Already stored: skip this page
                if collection.find_one({"url": href}):
                    continue
                # Try to open the page
                try:
                    time.sleep(10)
                    # Exactly one window handle: safe to open a new tab
                    if len(self.driver.window_handles) == 1:
                        try:
                            self.driver.execute_script('window.open("' + item.find_element(By.XPATH, "a").get_attribute("href") + '","_blank");')
                        except Exception as e:
                            logger.error(e)
                        # try:
                        #     element = item.find_element(By.TAG_NAME, "h2")
                        #     self.driver.execute_script('arguments[0].click()', element)
                        # except Exception as e:
                        #     logger.error(e)
                    # More than one handle: close the extras first
                    else:
                        for i in range(len(self.driver.window_handles) - 1):
                            # Switch to the extra handle
                            self.driver.switch_to.window(self.driver.window_handles[1])
                            # Close it
                            self.driver.close()
                        # Cleaned up; now open the new tab
                        self.driver.execute_script('window.open("' + item.find_element(By.XPATH, "a").get_attribute("href") + '","_blank");')
                # Opening the page failed
                except Exception as e:
                    logger.error(e)
                # Opening the page succeeded
                else:
                    # A new tab was actually opened
                    if len(self.driver.window_handles) == 2:
                        # Switch to it
                        self.driver.switch_to.window(self.driver.window_handles[1])
                        # Is this an article?
                        if self.check_article():
                            # Parse it
                            self.parse_page_from_article(hotkey)
                            time.sleep(10)
                finally:
                    # Return to the results page
                    self.driver.switch_to.window(self.driver.window_handles[0])
                    self.driver.implicitly_wait(5)
            # List finished; open the next page
            else:
                # Could not open the next page: stop
                if not self.get_next_page_form_google():
                    break
                # Opened; wait for it to load
                self.driver.implicitly_wait(5)
        # Switch back to the first handle
        self.driver.switch_to.window(self.driver.window_handles[0])

    def parse_page_from_article(self, hotkey):
        """Parse the page body."""
        # Scroll to the bottom
        try:
            height = self.driver.execute_script('return document.body.scrollHeight')
            if height > 1080:
                self.driver.execute_script('window.scrollTo(0, document.body.scrollHeight)')
        except TimeoutException:
            # On timeout, try to stop loading
            try:
                self.driver.execute_script('window.stop ? window.stop() : document.execCommand("Stop");')
            except TimeoutException:
                logger.error('Timeout!')
            except Exception as e:
                logger.error(e)
        except UnexpectedAlertPresentException as e:
            logger.error(e)
            return
        except Exception as e:
            self.driver.close()
            logger.error(e)
            return
        page_source = ""
        url = ""
        for x in range(3):
            try:
                # Grab the page source
                if not page_source:
                    page_source = self.driver.page_source
                # Grab the page url
                if not url:
                    url = self.driver.current_url
            except TimeoutException:
                logger.info('Timeout!')
            except Exception as e:
                logger.error(e)
        if page_source:
            visible = False
            try:
                self.driver.execute_script(self.render)
                visible = True
            except Exception as e:
                logger.error(e)
            try:
                # if visible:
                #     result = self.extract(page_source, use_visiable_info=True)
                # else:
                #     result = self.extract(page_source)
                result = self.extract(page_source)
            except Exception:
                result = {"title": "", "author": "", "publish_time": "", "content": "", "images": []}
                result['page_source'] = page_source
        else:
            self.driver.close()
            return
        if url:
            result['url'] = url
        else:
            self.driver.close()
            return
        if not result['title']:
            # Fall back to the page title
            try:
                result['title'] = self.driver.title
            except Exception as e:
                logger.error(e)
        # Collect <meta> data
        metadata = {}
        for meta in self.driver.find_elements(By.TAG_NAME, "meta"):
            try:
                if meta.get_attribute("name"):
                    metadata[meta.get_attribute("name")] = meta.get_attribute("content")
            except Exception:
                pass
        result['metadata'] = metadata
        if not result['content']:
            # Fall back to the full body text
            try:
                result['content'] = self.driver.find_element(By.XPATH, "//body").text
            except NoSuchElementException:
                self.driver.close()
                return
            except TimeoutException:
                self.driver.close()
                return
            except Exception as e:
                self.driver.close()
                logger.error(e)
                return
        result["crawl_time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        if self.check_useful(result):
            result['used'] = 0
        else:
            result["used"] = 1
        result['hotkey'] = hotkey
        # Store the document
        try:
            logger.info(result)
            # De-duplicate once more before insert
            if not collection.find_one({"url": result["url"]}):
                collection.insert_one(result)
        except Exception as e:
            logger.error(e)
        finally:
            self.driver.close()
            return

    def check_article(self):
        """Decide whether the page is an article (always True for now)."""
        return True

    def check_useful(self, result):
        """Decide whether the page is usable."""
        if (not result['content']
                or not result['title']
                or len(result['content']) < 100
                or re.search(r"[\u4e00-\u9fa5]", result['title'])
                or "403" in result["title"]
                or "404" in result["title"]
                or "502" in result["title"]):
            return False
        else:
            return True
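
    # check_useful rejections by example: an empty title or body, a body under
    # 100 characters, any CJK character in the title (this crawl targets
    # English pages), or an error-page title containing "403"/"404"/"502".
    # Rejected pages are still stored, just with used=1 instead of used=0.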

    def start_crawl(self, query, engine):
        """Run one crawl for a query on the chosen engine."""
        if (engine == 'bing') and self.search_from_bing(query):
            self.click_title_from_bing(query)
        elif (engine == 'google') and self.search_from_google(query):
            self.click_title_from_google(query)

    def close(self):
        """Shut down the crawler."""
        self.driver.close()
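
    # Note: Selenium's close() only closes the current window, while quit()
    # ends the whole session and the chromedriver process; with close() alone
    # the chromedriver process can outlive the script.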

def main():
    # Keyword list
    # with open('querys.txt', 'r', encoding='utf-8') as fp:
    #     querys = fp.read()
    with open("querys.yaml", "r", encoding="utf-8") as fp:
        querys = yaml.safe_load(fp.read())
    # "temp" holds the last finished key as a resume checkpoint
    with open("temp", "r", encoding="utf-8") as ft:
        tk = ft.read()
    if "Adult education" in tk:
        tk = None
    if not tk:
        start = True
    else:
        start = False
    # Start the crawler
    robot = AutoSpider()
    # Walk the keyword tree
    for key in querys.keys():
        for cont in querys[key][0].keys():
            for query in querys[key][0][cont]:
                if start:
                    robot.start_crawl(cont + " " + query, "bing")
                    with open("temp", "w", encoding="utf-8") as ft:
                        ft.write(key + cont + query)
                elif (key + cont + query == tk):
                    # Reached the checkpoint: resume crawling from here
                    start = True
                    robot.start_crawl(cont + " " + query, "bing")
                    with open("temp", "w", encoding="utf-8") as ft:
                        ft.write(key + cont + query)
                else:
                    continue
    # Shut down the crawler
    robot.close()


if __name__ == '__main__':
    main()