"""
🌐 OpenCLAW Literary Agent — Autonomous 24/7 Orchestrator
==========================================================
Manages all platforms, generates content, engages with communities,
reflects on strategy, and self-improves continuously.

Runs on HuggingFace Spaces (Gradio dashboard) + GitHub Actions.
"""

import json
import logging
import os
import random
import threading
import time
from datetime import datetime, timezone, timedelta

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger("agent")

# Ensure state directory
STATE_DIR = os.environ.get("STATE_DIR", "state")
os.makedirs(STATE_DIR, exist_ok=True)
os.environ["STATE_DIR"] = STATE_DIR

# NOTE(review): the project imports below are deliberately kept after the
# STATE_DIR export above — presumably some of them read STATE_DIR at import
# time. Confirm before moving them to the top of the file.
from core import config
from core.content_engine import (
    research_post, novel_post, collaboration_post,
    fiction_research_bridge, agent_networking_post,
    engagement_reply, get_random_content
)
from platforms import moltbook, telegram_bot, reddit_bot, email_bot
from strategy.reflector import record_post, reflect, get_stats, get_current_strategy
from research.arxiv_fetcher import get_papers


# ── Agent State ──────────────────────────────────────────────────────

STATE_FILE = os.path.join(STATE_DIR, "agent_state.json")


def _default_state() -> dict:
    """Return a fresh state dict containing every key the agent relies on."""
    return {
        "total_cycles": 0,
        "total_posts": 0,
        "total_engagements": 0,
        "last_post_time": None,
        "last_engagement_time": None,
        "last_reflection_time": None,
        "boot_time": datetime.now(timezone.utc).isoformat(),
        "log": [],
    }


def load_state() -> dict:
    """Load the persisted agent state from STATE_FILE.

    The stored values are merged over the defaults, so a state file that is
    missing keys still yields a dict with every key the cycle functions
    index directly (``total_cycles``, ``total_posts``, ``log``, ...).

    Returns:
        The merged state, or the default state when the file is absent,
        unreadable, not valid JSON, or does not contain a JSON object.
        This function does not raise for those cases; it logs a warning.
    """
    state = _default_state()
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            loaded = json.load(f)
    except FileNotFoundError:
        # First run: nothing persisted yet.
        return state
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.warning(
            "Could not read state file %s (%s); using default state",
            STATE_FILE, exc,
        )
        return state

    if not isinstance(loaded, dict):
        logger.warning(
            "State file %s does not contain a JSON object; using default state",
            STATE_FILE,
        )
        return state

    state.update(loaded)
    if not isinstance(state.get("log"), list):
        state["log"] = []
    return state


def save_state(state: dict):
    """Persist ``state`` to STATE_FILE as indented JSON.

    The data is written to a temporary file in the same directory and then
    moved over STATE_FILE with ``os.replace``, so a concurrent reader (the
    dashboard callbacks call ``load_state`` while the background loop may be
    saving) never observes a half-written file.

    Values that are not JSON-serializable are stringified (``default=str``).

    Raises:
        OSError: if the temporary file cannot be written or moved.
    """
    os.makedirs(STATE_DIR, exist_ok=True)
    # Unique per process/thread so two writers never share a temp file.
    tmp_path = f"{STATE_FILE}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2, default=str)
        os.replace(tmp_path, STATE_FILE)
    except Exception:
        # Do not leave a stale temp file behind; re-raise the original error.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def log_action(state: dict, action: str, detail: str = ""):
    """Append an entry to the in-memory activity log and echo it to the logger.

    Args:
        state: Agent state dict; its ``"log"`` list is updated in place and
            trimmed to the most recent 100 entries. A missing or non-list
            ``"log"`` value is replaced by a new list.
        action: Short action tag (e.g. ``"POST"``, ``"ERROR"``).
        detail: Free-text detail; stored truncated to 200 characters and
            logged truncated to 120 characters.
    """
    entry = {
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M"),
        "action": action,
        "detail": detail[:200],
    }
    log = state.get("log")
    if not isinstance(log, list):
        log = []
    log.append(entry)
    state["log"] = log[-100:]
    logger.info("[%s] %s", action, detail[:120])


# ── Platform Dispatcher ──────────────────────────────────────────────

def publish_to_platform(platform: str, content_data: dict) -> bool:
    """Publish content to a specific platform.

    Args:
        platform: Platform name. ``"moltbook"``, ``"telegram"`` and
            ``"reddit"`` are posted through their client modules when the
            client reports itself available.
        content_data: Dict with the text under ``"content"``; for Reddit the
            title is taken from ``"paper"``, then ``"novel"``, then a fixed
            fallback, and shortened to 300 characters.

    Returns:
        False when there is no content or the publish attempt raised.
        For the three API platforms, whether the client result indicates
        success. For every other case — including an API platform whose
        client is currently unavailable — the content is only logged and
        True is returned.
    """
    content = content_data.get("content", "")
    if not content:
        return False

    try:
        if platform == "moltbook" and moltbook.is_available():
            result = moltbook.post(content)
            return "error" not in result

        elif platform == "telegram" and telegram_bot.is_available():
            result = telegram_bot.post(content)
            return result.get("ok", False)

        elif platform == "reddit" and reddit_bot.is_available():
            title = content_data.get("paper", content_data.get("novel", "OpenCLAW Research Update"))
            if len(title) > 300:
                title = title[:297] + "..."
            subreddits = ["artificial", "MachineLearning", "OpenAI", "singularity"]
            sub = random.choice(subreddits)
            result = reddit_bot.post(title, content, sub)
            return "error" not in result

        else:
            # Log content for platforms without API (twitter, fb, linkedin, etc)
            logger.info(f"[{platform}] Content generated (no API): {content[:80]}...")
            return True

    except Exception as e:
        logger.error(f"Publish to {platform} failed: {e}")
        return False


# ── Autonomous Cycle ─────────────────────────────────────────────────

def run_post_cycle(state: dict) -> str:
    """Generate and publish content to a random platform.

    The platform is drawn at random; Moltbook gets weight 3 and Telegram and
    Reddit weight 2 when their clients are available, every other entry
    (including those three when unavailable) gets weight 1.

    Args:
        state: Agent state dict; ``total_posts`` and ``last_post_time`` are
            updated for every attempt, whether or not publishing succeeded,
            and a log entry is appended. The caller is responsible for
            saving the state.

    Returns:
        A one-line summary ``"<platform> | <type> | ✅/❌"``, or
        ``"Error: ..."`` if content generation or bookkeeping raised.
    """
    platforms = ["moltbook", "telegram", "reddit", "twitter", "linkedin",
                 "facebook", "chirper", "instagram", "agentarxiv"]

    # Weight platforms by availability
    available = []
    for p in platforms:
        if p == "moltbook" and moltbook.is_available():
            available.extend([p] * 3)  # Higher weight
        elif p == "telegram" and telegram_bot.is_available():
            available.extend([p] * 2)
        elif p == "reddit" and reddit_bot.is_available():
            available.extend([p] * 2)
        else:
            available.append(p)

    platform = random.choice(available) if available else random.choice(platforms)

    try:
        content_data = get_random_content(platform)
        success = publish_to_platform(platform, content_data)

        record_post({
            "platform": platform,
            "type": content_data.get("type", "unknown"),
            "content_preview": content_data.get("content", "")[:100],
            "success": success,
        })

        state["total_posts"] += 1
        state["last_post_time"] = datetime.now(timezone.utc).isoformat()

        detail = f"{platform} | {content_data.get('type', '?')} | {'✅' if success else '❌'}"
        log_action(state, "POST", detail)
        return detail

    except Exception as e:
        log_action(state, "POST_ERROR", str(e))
        return f"Error: {e}"


def run_engagement_cycle(state: dict) -> str:
    """Engage with other agents/posts on Moltbook.

    Args:
        state: Agent state dict; ``total_engagements`` is increased by the
            number of results returned by Moltbook and
            ``last_engagement_time`` is set. The caller saves the state.

    Returns:
        ``"Moltbook not available"`` when the client is unavailable,
        ``"Engaged with N posts"`` on success, or ``"Error: ..."``.
    """
    if not moltbook.is_available():
        return "Moltbook not available"

    try:
        results = moltbook.engage_with_hot_posts(engagement_reply)
        count = len(results)
        state["total_engagements"] += count
        state["last_engagement_time"] = datetime.now(timezone.utc).isoformat()
        detail = f"Engaged with {count} posts"
        log_action(state, "ENGAGE", detail)
        return detail
    except Exception as e:
        log_action(state, "ENGAGE_ERROR", str(e))
        return f"Error: {e}"


def run_reflection_cycle(state: dict) -> str:
    """Perform strategic self-reflection.

    Args:
        state: Agent state dict; ``last_reflection_time`` is set and a log
            entry is appended. The caller saves the state.

    Returns:
        The first 150 characters of the reflection's ``"analysis"`` field,
        or ``"Error: ..."`` if reflecting raised.
    """
    try:
        reflection = reflect()
        state["last_reflection_time"] = datetime.now(timezone.utc).isoformat()
        analysis = reflection.get("analysis", "")[:150]
        log_action(state, "REFLECT", analysis)
        return analysis
    except Exception as e:
        log_action(state, "REFLECT_ERROR", str(e))
        return f"Error: {e}"


# ── Main Autonomous Loop ─────────────────────────────────────────────

def autonomous_loop():
    """Run forever: post → engage → reflect → repeat.

    Each cycle posts once, engages on every 2nd cycle and reflects on every
    6th cycle (counted since this function started), saves the state and
    then sleeps a random interval between ``config.POST_INTERVAL_MIN_HOURS``
    and ``config.POST_INTERVAL_MAX_HOURS``. After a failed cycle it sleeps
    5 minutes instead. This function does not return.
    """
    state = load_state()
    log_action(state, "BOOT", f"Agent starting. Previous cycles: {state['total_cycles']}")

    # Send boot notification (best effort)
    try:
        email_bot.send_boot_notification()
    except Exception as e:
        logger.warning("Boot notification failed: %s", e)

    # Pre-fetch papers (best effort)
    try:
        papers = get_papers()
        log_action(state, "ARXIV", f"Loaded {len(papers)} papers")
    except Exception as e:
        logger.warning("Paper pre-fetch failed: %s", e)

    save_state(state)

    cycle = 0
    while True:
        try:
            # Re-read the persisted state: the dashboard's manual triggers
            # load/modify/save the same file while this thread sleeps, and
            # saving a stale in-memory copy would discard their updates.
            state = load_state()

            cycle += 1
            state["total_cycles"] += 1

            # ── POST (every cycle) ──
            post_result = run_post_cycle(state)
            logger.info(f"Cycle {cycle}: POST → {post_result}")

            # ── ENGAGE (every 2nd cycle) ──
            if cycle % 2 == 0:
                eng_result = run_engagement_cycle(state)
                logger.info(f"Cycle {cycle}: ENGAGE → {eng_result}")

            # ── REFLECT (every 6th cycle) ──
            if cycle % 6 == 0:
                ref_result = run_reflection_cycle(state)
                logger.info(f"Cycle {cycle}: REFLECT → {ref_result}")

            save_state(state)

            # Sleep a randomized interval between the configured bounds
            sleep_hours = random.uniform(
                config.POST_INTERVAL_MIN_HOURS,
                config.POST_INTERVAL_MAX_HOURS
            )
            sleep_secs = int(sleep_hours * 3600)
            logger.info(f"💤 Sleeping {sleep_hours:.1f}h until next cycle...")
            time.sleep(sleep_secs)

        except Exception as e:
            logger.error(f"Cycle error: {e}")
            # Persisting the error must not be able to end the loop itself.
            try:
                log_action(state, "ERROR", str(e))
                save_state(state)
            except Exception as save_err:
                logger.error("Could not persist error state: %s", save_err)
            time.sleep(300)  # 5 min on error


# ── CLI Mode (for GitHub Actions) ────────────────────────────────────

def run_once():
    """Run a single cycle (for GitHub Actions / cron).

    Posts 2-3 times (5 s apart), runs one engagement cycle, reflects when
    the persisted ``total_cycles`` is a multiple of 6, saves the state and
    prints a summary to stdout.
    """
    state = load_state()

    # Pre-fetch papers (best effort)
    try:
        papers = get_papers()
        log_action(state, "ARXIV", f"Loaded {len(papers)} papers")
    except Exception as e:
        logger.warning("Paper pre-fetch failed: %s", e)

    state["total_cycles"] += 1

    # Post to 2-3 platforms
    for _ in range(random.randint(2, 3)):
        run_post_cycle(state)
        time.sleep(5)

    # Engage
    run_engagement_cycle(state)

    # Reflect every 6th cycle
    if state["total_cycles"] % 6 == 0:
        run_reflection_cycle(state)

    save_state(state)

    # Print summary
    stats = get_stats()
    print(f"\n{'='*60}")
    print(f"🌐 OpenCLAW Literary Agent — Cycle #{state['total_cycles']}")
    print(f" Total posts: {state['total_posts']}")
    print(f" Total engagements: {state['total_engagements']}")
    print(f" Posts by platform: {json.dumps(stats.get('by_platform', {}))}")
    print(f" Posts by type: {json.dumps(stats.get('by_type', {}))}")
    print(f"{'='*60}\n")


# ── Gradio Dashboard (for HF Spaces) ─────────────────────────────────

def _md_cell(text) -> str:
    """Make ``text`` safe to place inside one cell of a Markdown table.

    Pipes are escaped and line breaks flattened; POST log details contain
    ``" | "`` separators that would otherwise split the row into extra cells.
    """
    flattened = str(text).replace("\r", " ").replace("\n", " ")
    return flattened.replace("|", "\\|")


def build_dashboard():
    """Build Gradio dashboard for monitoring and control.

    Returns:
        The ``gr.Blocks`` app, or None when Gradio is not installed.
    """
    try:
        import gradio as gr
    except ImportError:
        logger.warning("Gradio not installed, running CLI mode")
        return None

    def get_status():
        """Render the status panel (counters, stats, papers, platforms) as Markdown."""
        state = load_state()
        stats = get_stats()
        strategy = get_current_strategy()
        # The other call sites treat paper loading as best effort; a failure
        # here must not prevent the dashboard from being built or refreshed.
        try:
            papers = get_papers()
        except Exception as e:
            logger.warning("Could not load papers for dashboard: %s", e)
            papers = []

        lines = [
            "# 🌐 OpenCLAW Literary Agent — Dashboard",
            "",
            "**Status:** 🟢 ONLINE",
            f"**Boot time:** {state.get('boot_time', 'unknown')}",
            f"**Total cycles:** {state.get('total_cycles', 0)}",
            f"**Total posts:** {state.get('total_posts', 0)}",
            f"**Total engagements:** {state.get('total_engagements', 0)}",
            "",
            "## 📊 Posts by Platform",
        ]
        for plat, count in stats.get("by_platform", {}).items():
            lines.append(f"- **{plat}**: {count}")

        lines.extend([
            "",
            "## 📝 Posts by Type",
        ])
        for typ, count in stats.get("by_type", {}).items():
            lines.append(f"- **{typ}**: {count}")

        lines.extend([
            "",
            f"## 📄 ArXiv Papers Loaded: {len(papers)}",
        ])
        for p in papers[:5]:
            lines.append(f"- [{p.get('id','')}] {p.get('title','')[:60]}")

        lines.extend([
            "",
            "## 📡 Platform Status",
            f"- Moltbook: {'🟢' if moltbook.is_available() else '🔴'}",
            f"- Telegram: {'🟢' if telegram_bot.is_available() else '🔴'}",
            f"- Reddit: {'🟢' if reddit_bot.is_available() else '🔴'}",
            f"- Email: {'🟢' if email_bot.is_available() else '🔴'}",
            f"- LLM (NVIDIA): {'🟢' if config.NVIDIA_KEYS else '🔴'}",
            f"- LLM (Groq): {'🟢' if config.GROQ_KEYS else '🔴'}",
            f"- LLM (Gemini): {'🟢' if config.GEMINI_KEYS else '🔴'}",
        ])

        return "\n".join(lines)

    def get_recent_log():
        """Render the 25 most recent log entries, newest first, as a Markdown table."""
        state = load_state()
        log = state.get("log", [])
        if not log:
            return "No activity yet."
        lines = ["| Time | Action | Detail |", "|------|--------|--------|"]
        for entry in reversed(log[-25:]):
            # Truncate first, then escape, so an escape sequence is never cut.
            time_cell = _md_cell(entry.get('time', ''))
            action_cell = _md_cell(entry.get('action', ''))
            detail_cell = _md_cell(str(entry.get('detail', ''))[:80])
            lines.append(f"| {time_cell} | {action_cell} | {detail_cell} |")
        return "\n".join(lines)

    def trigger_post():
        """Run one post cycle on the persisted state and show the result plus log."""
        state = load_state()
        result = run_post_cycle(state)
        save_state(state)
        return f"✅ {result}\n\n{get_recent_log()}"

    def trigger_engage():
        """Run one engagement cycle on the persisted state and show the result plus log."""
        state = load_state()
        result = run_engagement_cycle(state)
        save_state(state)
        return f"✅ {result}\n\n{get_recent_log()}"

    def trigger_reflect():
        """Run one reflection cycle on the persisted state and show the result."""
        state = load_state()
        result = run_reflection_cycle(state)
        save_state(state)
        return f"✅ {result}"

    with gr.Blocks(title="OpenCLAW Literary Agent", theme=gr.themes.Soft()) as app:
        gr.Markdown("# 🌐 OpenCLAW Literary Agent — Autonomous 24/7")
        gr.Markdown(f"**Author:** {config.AUTHOR_NAME} | [Wikipedia]({config.AUTHOR_WIKIPEDIA}) | [GitHub]({config.AUTHOR_GITHUB}) | [Scholar]({config.AUTHOR_SCHOLAR})")

        with gr.Row():
            with gr.Column(scale=2):
                status_md = gr.Markdown(get_status())
                refresh_btn = gr.Button("🔄 Refresh Status", variant="secondary")
                refresh_btn.click(fn=get_status, outputs=status_md)

            with gr.Column(scale=1):
                gr.Markdown("### ⚡ Manual Controls")
                post_btn = gr.Button("📝 Publish Now", variant="primary")
                engage_btn = gr.Button("💬 Engage Now", variant="primary")
                reflect_btn = gr.Button("🧠 Reflect Now", variant="secondary")
                manual_output = gr.Markdown("")

                post_btn.click(fn=trigger_post, outputs=manual_output)
                engage_btn.click(fn=trigger_engage, outputs=manual_output)
                reflect_btn.click(fn=trigger_reflect, outputs=manual_output)

        with gr.Row():
            log_md = gr.Markdown(get_recent_log())
            log_btn = gr.Button("🔄 Refresh Log")
            log_btn.click(fn=get_recent_log, outputs=log_md)

    return app


# ── Entry Point ──────────────────────────────────────────────────────

if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "once":
        # CLI mode for GitHub Actions
        run_once()
    else:
        # Dashboard + background agent
        app = build_dashboard()
        if app:
            # Start autonomous loop in background
            bg = threading.Thread(target=autonomous_loop, daemon=True)
            bg.start()
            # Launch Gradio
            app.launch(server_name="0.0.0.0", server_port=7860, share=False)
        else:
            # No Gradio, run autonomous loop directly
            autonomous_loop()