context.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. import time
  2. import logging
  3. from dataclasses import dataclass
  4. from typing import Optional
  5. from playwright.async_api import BrowserContext as PlaywrightBrowserContext, Page
  6. from .dom.dom_mutation_observer import handle_navigation_for_mutation_observer, dom_mutation_change_detected
  7. logger = logging.getLogger(__name__)
  @dataclass
  class PageInfo:
      """Information about the current page.

      Instances are built by ``BrowserContext.get_current_page_info`` from the
      page's URL and title; ``screenshot`` is left at its default there.
      """

      # URL of the page at the time the info was collected.
      url: str
      # Document title of the page at the time the info was collected.
      title: str
      # Optional screenshot of the page.
      # NOTE(review): the string format (file path vs. encoded image data)
      # is not established in this module - confirm against the producers.
      screenshot: Optional[str] = None
  @dataclass
  class BrowserContextOptions:
      """Settings consumed by ``BrowserContext`` when it is constructed."""

      # URL that a newly created page navigates to when no page is current.
      home_page: str
      # Directory prefix used to build the path of saved screenshots.
      screenshots_dir: str
      # When False, screenshot requests return without capturing anything.
      screenshot_capture_enabled: bool
  class BrowserContext:
      """Wrap a Playwright browser context and track the page being operated on.

      The wrapper remembers a single "current" page, opens the configured home
      page when no page is selected, and provides helpers for highlighting
      elements, saving screenshots and shutting everything down.
      """

      def __init__(self, context: PlaywrightBrowserContext, options: BrowserContextOptions):
          """Keep a reference to *context* and copy the settings from *options*."""
          self._context = context
          # the current page that is being operated on
          self._current_page = None
          self._home_page = options.home_page
          self._screenshots_dir = options.screenshots_dir
          self._screenshot_capture_enabled = options.screenshot_capture_enabled

      def get_screenshots_dir(self) -> str:
          """Return the directory prefix under which screenshots are saved."""
          return self._screenshots_dir

      async def set_current_page(self, nano_tab_id: Optional[str]) -> Page:
          """Select the page to operate on and return it.

          Args:
              nano_tab_id: Value expected in the ``data-nano-tab-id`` attribute of
                  a page's ``document.body``, or ``None`` to keep the current page.

          Returns:
              The matching page when one is found. Otherwise the existing current
              page (only when *nano_tab_id* is ``None``), or a newly created page
              that has been navigated to the home page.
          """
          # if there is a specific nano_tab_id, try to find the page with that id
          if nano_tab_id is not None:
              for page in self._context.pages:
                  if page.is_closed() or page.url.startswith(("chrome-extension://", "chrome://", "edge://")):
                      continue
                  try:
                      tab_id = await page.evaluate("document.body.getAttribute('data-nano-tab-id')")
                  except Exception as e:
                      # A page that cannot be inspected right now is treated as a
                      # non-match so it does not abort the search of the others.
                      logger.debug(f"\tUnable to read tab id from page {page.url}: {e}")
                      continue
                  logger.debug(f"\tPage ID: {tab_id}, URL: {page.url}")
                  if tab_id == nano_tab_id:
                      await self._setup_handlers(page)
                      self._current_page = page
                      return page
              # No page matched: drop the stale current page so a new one is created.
              self._current_page = None
              logger.warning(f"Page with nano_tab_id {nano_tab_id} not found")

          # if there is a current page, return it
          if self._current_page is not None:
              return self._current_page

          # if there is no current page, create a new one and goto home page
          page = await self._context.new_page()
          logger.debug(f"Creating new page: {page.url}")
          await page.goto(self._home_page)
          await page.bring_to_front()
          await self._setup_handlers(page)
          self._current_page = page
          return page

      async def get_current_page(self) -> Page:
          """Return the current page, creating one on the home page if needed."""
          if self._current_page is None:
              self._current_page = await self.set_current_page(None)
          return self._current_page

      async def get_current_page_info(self) -> PageInfo:
          """Return the URL and title of the current page as a ``PageInfo``."""
          page = await self.get_current_page()
          title = await page.title()
          return PageInfo(url=page.url, title=title)

      async def _setup_handlers(self, page: Page):
          """Register the navigation handler and expose the mutation callback.

          The handler is registered at most once per page object, tracked with a
          marker attribute stored on the page. Exposing the callback is attempted
          on every call; an "already registered" error is ignored and any other
          error is only logged.
          """
          if getattr(page, '_navigation_handler_added', False):
              logger.debug("Navigation handler already exists, skipping addition")
          else:
              logger.debug(f"Adding navigation handler on page: {page.url}")
              page.on("domcontentloaded", handle_navigation_for_mutation_observer)
              # Mark that we've added the handler
              setattr(page, '_navigation_handler_added', True)

          try:
              await page.expose_function("dom_mutation_change_detected", dom_mutation_change_detected)
          except Exception as e:
              # Ignore errors if function is already exposed
              if "already registered" not in str(e):
                  # only log error for now
                  logger.error(f"Error exposing function: {e}")

          logger.debug(f"Navigation handler setup complete for page: {page.url}")

      async def get_current_url(self):
          """Return the URL of the current page."""
          page = await self.get_current_page()
          return page.url

      async def highlight_element(self, selector: str, add_highlight: bool):
          """Add or remove the highlight CSS class on the element at *selector*.

          This is best-effort: any failure is logged at debug level and never
          propagated to the caller.
          """
          try:
              page: Page = await self.get_current_page()
              if add_highlight:
                  # Add the 'agente-ui-automation-highlight' class to the element. This class is used to apply the fading border.
                  # The listener is registered with `once` so repeated highlights do not pile up handlers.
                  await page.eval_on_selector(selector, '''e => {
                      e.classList.add('agente-ui-automation-highlight');
                      e.addEventListener('animationend', () => {
                          e.classList.remove('agente-ui-automation-highlight')
                      }, { once: true });
                  }''')
                  logger.debug(f"Applied pulsating border to element with selector {selector} to indicate text entry operation")
              else:
                  # Remove the 'agente-ui-automation-highlight' class from the element.
                  await page.eval_on_selector(selector, "e => e.classList.remove('agente-ui-automation-highlight')")
                  logger.debug(f"Removed pulsating border from element with selector {selector} after text entry operation")
          except Exception as e:
              # This is not significant enough to fail the operation
              logger.debug(f"Failed to update highlight for element with selector {selector}: {e}")

      async def take_screenshots(self, name: str, page: Optional[Page], full_page: bool = True, include_timestamp: bool = True,
                                 load_state: str = 'domcontentloaded', take_snapshot_timeout: int = 5*1000):
          """Save a PNG screenshot of *page* into the screenshots directory.

          Does nothing when screenshot capture is disabled. Failures while waiting
          for the load state or capturing are logged and not raised.

          Args:
              name: Base file name, without extension.
              page: Page to capture; the current page is used when ``None``.
              full_page: Passed through to Playwright's ``page.screenshot``.
              include_timestamp: Prefix the file name with ``time.time_ns()``.
              load_state: Load state to wait for before capturing.
              take_snapshot_timeout: Timeout passed to both the wait and the capture.
          """
          if not self._screenshot_capture_enabled:
              return

          if page is None:
              page = await self.get_current_page()

          screenshot_name = f"{name}.png"
          if include_timestamp:
              screenshot_name = f"{time.time_ns()}_{screenshot_name}"
          screenshot_path = f"{self.get_screenshots_dir()}/{screenshot_name}"

          try:
              await page.wait_for_load_state(state=load_state, timeout=take_snapshot_timeout)  # type: ignore
              await page.screenshot(path=screenshot_path, full_page=full_page, timeout=take_snapshot_timeout, caret="initial", scale="device")
              logger.debug(f"Screen shot saved to: {screenshot_path}")
          except Exception as e:
              logger.error(f"Failed to take screenshot and save to \"{screenshot_path}\". Error: {e}")

      async def close(self):
          """Close the current page (if any) and then the underlying context.

          References are cleared before closing. The context is closed even when
          closing the page fails, so it is not leaked.

          Raises:
              Exception: Any error from closing the page or the context is logged
                  and re-raised.
          """
          # Clear references first
          current_page, self._current_page = self._current_page, None
          context, self._context = self._context, None
          try:
              try:
                  if current_page is not None:
                      # Wait for any pending operations to complete
                      try:
                          await current_page.wait_for_load_state('load', timeout=5000)
                      except Exception:
                          # Ignore timeout or other errors during wait
                          pass
                      await current_page.close()
              finally:
                  # Handle context cleanup separately, regardless of page errors
                  if context is not None:
                      await context.close()
          except Exception as e:
              logger.error(f"Error while closing browser context: {e}")
              raise