| |
| from __future__ import annotations |
|
|
| import html |
| import shlex |
| from typing import List, Optional |
|
|
| import requests |
| from bs4 import BeautifulSoup |
| from duckduckgo_search import DDGS |
|
|
| try: |
| import trafilatura |
| except Exception: |
| trafilatura = None |
|
|
|
|
| def _parse_search_args(args: str) -> dict: |
| """ |
| /search termo [--max N] [--news] [--safesearch off|moderate|strict] [--site dominio] |
| """ |
| tokens = shlex.split(args, posix=False) |
| out = { |
| "term": None, |
| "max": 10, |
| "news": False, |
| "safesearch": "off", |
| "site": None, |
| } |
| i = 0 |
| |
| while i < len(tokens): |
| t = tokens[i] |
| if not t.startswith("--") and out["term"] is None: |
| out["term"] = t |
| i += 1 |
| break |
| i += 1 |
|
|
| while i < len(tokens): |
| t = tokens[i] |
| if t == "--max" and i + 1 < len(tokens): |
| try: |
| out["max"] = int(tokens[i + 1]) |
| except: |
| pass |
| i += 2 |
| elif t == "--news": |
| out["news"] = True |
| i += 1 |
| elif t == "--safesearch" and i + 1 < len(tokens): |
| level = tokens[i + 1].lower() |
| if level in {"off", "moderate", "strict"}: |
| out["safesearch"] = level |
| i += 2 |
| elif t == "--site" and i + 1 < len(tokens): |
| out["site"] = tokens[i + 1] |
| i += 2 |
| else: |
| i += 1 |
| return out |
|
|
|
|
async def handle_search(args: str, block: str) -> str:
    """
    /search termo [--max N] [--news] [--safesearch off|moderate|strict] [--site dominio]

    Examples:
        /search rtx 4060 --max 5
        /search "python venv windows" --safesearch moderate
        /search chainlit --site github.com
        /search OpenAI --news --max 3

    Runs a DuckDuckGo text (or news) search and returns a markdown-formatted
    result list. `block` is unused here (handler signature convention —
    presumably required by the command dispatcher; confirm against caller).
    """
    cfg = _parse_search_args(args)
    term = cfg["term"]
    if not term:
        return "Uso: /search <termo> [--max N] [--news] [--safesearch off|moderate|strict] [--site dominio]"

    # Restrict results to one domain via DuckDuckGo's site: operator.
    if cfg["site"]:
        term = f"site:{cfg['site']} {term}"

    results: List[str] = []
    try:
        with DDGS() as ddgs:
            # --news switches to the news vertical; both calls share options.
            if cfg["news"]:
                src = ddgs.news(term, max_results=cfg["max"], safesearch=cfg["safesearch"])
            else:
                src = ddgs.text(term, max_results=cfg["max"], safesearch=cfg["safesearch"])

            for r in src or []:
                title = (r.get("title") or "").strip()
                # text results use "href"/"body"; news results use "url"/"snippet".
                href = r.get("href") or r.get("url") or ""
                body = (r.get("body") or r.get("snippet") or "").strip()
                # Truncate to keep each entry compact in the chat UI.
                if title:
                    title = title[:120]
                if body:
                    body = body[:200]
                item = []
                if title:
                    item.append(f"**{title}**")
                if body:
                    item.append(body)
                if href:
                    item.append(href)
                if item:
                    results.append("\n".join(item))

    except Exception as e:  # top-level boundary: surface the failure to the user
        return f"💥 Erro na busca: {e}"

    if not results:
        # BUGFIX: escape the term here too, consistently with the success path
        # below — the raw term could contain markup-breaking characters.
        return f"❌ Sem resultados para `{html.escape(term)}`."
    return f"🔎 Resultados para `{html.escape(term)}`:\n\n" + "\n\n".join(results)
|
|
|
|
| def _parse_scrape_args(args: str) -> dict: |
| """ |
| /scrape URL [--max N] [--headers] [--links] [--raw] |
| """ |
| tokens = shlex.split(args, posix=False) |
| out = {"url": None, "max": 2000, "headers": False, "links": False, "raw": False, "timeout": 30} |
| i = 0 |
| while i < len(tokens): |
| t = tokens[i] |
| if not t.startswith("--") and out["url"] is None: |
| out["url"] = t |
| i += 1 |
| break |
| i += 1 |
| while i < len(tokens): |
| t = tokens[i] |
| if t == "--max" and i + 1 < len(tokens): |
| try: |
| out["max"] = int(tokens[i + 1]) |
| except: |
| pass |
| i += 2 |
| elif t == "--headers": |
| out["headers"] = True |
| i += 1 |
| elif t == "--links": |
| out["links"] = True |
| i += 1 |
| elif t == "--raw": |
| out["raw"] = True |
| i += 1 |
| elif t == "--timeout" and i + 1 < len(tokens): |
| try: |
| out["timeout"] = int(tokens[i + 1]) |
| except: |
| pass |
| i += 2 |
| else: |
| i += 1 |
| return out |
|
|
|
|
def _clean_with_trafilatura(html_text: str, url: Optional[str]) -> Optional[str]:
    """Extract the main readable text from *html_text* using trafilatura.

    Returns the extracted text, or None when trafilatura is not installed
    or extraction fails for any reason (callers fall back to raw text).
    """
    if trafilatura is None:
        # Optional dependency missing — signal "no extraction available".
        return None
    extract_opts = {
        "include_links": False,
        "include_comments": False,
        "include_tables": False,
        "favor_precision": True,
        "include_formatting": False,
    }
    try:
        return trafilatura.extract(html_text, url=url, **extract_opts)
    except Exception:
        # Best-effort: any extraction error degrades to None, never raises.
        return None
|
|
|
|
async def handle_scrape(args: str, block: str) -> str:
    """
    /scrape URL [--max N] [--headers] [--links] [--raw] [--timeout S]

    Examples:
        /scrape https://example.com
        /scrape https://news.ycombinator.com/ --max 1200 --links
        /scrape https://httpbin.org/html --headers --raw

    Fetches the URL, extracts readable text (trafilatura when available,
    otherwise BeautifulSoup's visible text) and returns a markdown report.
    `block` is unused here (handler signature convention — presumably
    required by the command dispatcher; confirm against caller).
    """
    cfg = _parse_scrape_args(args)
    url = cfg["url"]
    if not url:
        return "Uso: /scrape <URL> [--max N] [--headers] [--links] [--raw] [--timeout S]"

    try:
        r = requests.get(url, timeout=cfg["timeout"])
        r.raise_for_status()
    except requests.exceptions.Timeout:
        return f"⏳ Timeout após {cfg['timeout']}s em `{url}`"
    except requests.exceptions.HTTPError as e:
        code = getattr(e.response, "status_code", "N/A")
        return f"❌ HTTP {code}: {e}"
    except Exception as e:  # top-level boundary: report any other failure
        return f"💥 Erro requisitando `{url}`: {e}"

    soup = BeautifulSoup(r.text, "html.parser")
    title_tag = soup.find("title")
    title = title_tag.get_text(strip=True) if title_tag else "(sem título)"

    # --raw skips readability extraction and dumps all visible text.
    if cfg["raw"]:
        text = soup.get_text(" ", strip=True)
    else:
        extracted = _clean_with_trafilatura(r.text, url)
        # Fall back to raw visible text when extraction yields nothing.
        text = extracted if (extracted and extracted.strip()) else soup.get_text(" ", strip=True)

    preview = text[: cfg["max"]]
    extra = "" if len(text) <= cfg["max"] else "\n...(truncado)"

    # BUGFIX: len(r.text) counts decoded *characters*; len(r.content) is the
    # actual byte size of the response body, matching the "bytes" label.
    lines: List[str] = [f"🕷️ `{title}`", f"HTTP {r.status_code} • {len(r.content)} bytes (HTML)"]
    if cfg["headers"]:
        lines.append("**Headers de resposta:**")
        for k, v in r.headers.items():
            lines.append(f"- {k}: {v}")
    if cfg["links"]:
        links = []
        # Collect up to 20 non-empty hrefs, in document order.
        for a in soup.find_all("a"):
            href = a.get("href")
            if href and href.strip():
                links.append(href.strip())
            if len(links) >= 20:
                break
        if links:
            lines.append("**Links (até 20):**")
            for link in links:
                lines.append(f"- {link}")

    lines.append("**Texto (preview):**")
    lines.append(f"```\n{preview}\n```{extra}")
    return "\n".join(lines)
|
|