从头开始构建 OpenClaw , 不到 500 行代码理解 OpenClaw

  • dabit3
  • 发布于 17小时前
  • 阅读 54

本文深入浅出地介绍了如何从零开始构建一个类似OpenClaw的个人AI助手。OpenClaw是一个能够连接AI代理到多个消息应用,并赋予其与计算机交互能力的平台,它通过构建一个简单的Telegram机器人,逐步引入持久会话、个性化设置、工具集成、权限控制、多通道网关、上下文压缩、长期记忆、命令队列、定时任务和多代理等关键组件,最终实现了一个功能完善的个人AI助手。

图片

你本可以发明 OpenClaw, 代码在这里

让 OpenClaw 强大的地方出乎意料地简单:它是一个网关,将 AI 代理连接到你的消息应用程序,赋予它与你的计算机交互的工具,并让它记住你在对话中的身份。

复杂性来自于同时处理多个通道、管理持久会话、协调多个代理,并使整个系统足够可靠,可以在你的机器上 24/7 运行。

在这篇文章中,我将从头开始,逐步构建 OpenClaw 的架构,展示你如何能够从第一性原理出发,仅使用消息 API、LLM 和使 AI 真正 在聊天窗口之外 有用的愿望,自己发明它。

最终目标:了解持久 AI 助手如何工作,这样你就可以构建自己的助手。

首先,让我们确定问题

当你在浏览器中使用 ChatGPT 或 Claude 时,存在几个限制:

它是无状态的。每次对话都从零开始。它不知道你的名字、你的偏好、你昨天问了什么,或者你正在进行什么项目。你不断地重新解释上下文。

它是被动的。你去找它。它永远不会来找你。它无法在早上 7 点醒来并向你简要介绍你的日历、监控你的电子邮件或运行重复性任务。它只在你坐在它面前时才工作。

它是隔离的。它无法在你的机器上运行命令、为你浏览网络、控制你的应用程序或代表你发送消息。它生活在一个没有手的文本框中。

它是单通道的。你的真实生活发生在 WhatsApp、Telegram、Discord、Slack、iMessage 等,但 AI 生活在它自己单独的标签中。没有办法在你已经使用的应用程序中给它发短信,更不用说让它在所有这些界面上保持一个连续的记忆。

如果相反,你有一个 AI:

  • 生活在你已经使用的消息应用程序中 - 所有这些应用程序,具有共享内存

  • 记住你的偏好、你的项目以及你在会话中的过往对话

  • 可以在你的计算机上运行命令、浏览网络并控制真实的浏览器

  • 按计划醒来以处理重复性任务,而无需被要求

  • 在你自己硬件上运行 - 你的笔记本电脑、VPS、Mac Mini - 始终开启,在你的控制之下

这就是 OpenClaw 所做的。它不是一个聊天机器人 - 它是一个个人 AI 助手,具有持久的身份、工具以及在你使用的每个通道中的存在。

让我们从头开始构建一个。

最简单的 Bot

让我们从绝对最小值开始:一个响应 Telegram 消息的 AI。

import telebot

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN" # 获取你的 Telegram bot token
bot = telebot.TeleBot(TOKEN)

@bot.message_handler(func=lambda m: True)
def echo_all(message):
    bot.reply_to(message, message.text)

bot.infinity_polling()

运行它,在 Telegram 上发送消息,AI 会响应。简单。

但此版本基本上比 Claude Web 界面更糟糕。每条消息都是独立的。没有记忆。没有工具。没有人格。

如果我们赋予它记忆呢?

目标:持久会话

我们简单 Bot 的一个问题是无状态。每条消息都是一次全新的对话。问它“我之前说了什么?”,它一无所知。

解决方法是会话。为每个用户保留一个对话历史记录。

import telebot
import json

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}

def load_session(user_id):
    # 加载会话
    if user_id not in SESSIONS:
        try:
            with open(f"session-{user_id}.jsonl", "r") as f:
                SESSIONS[user_id] = [json.loads(line) for line in f]
        except FileNotFoundError:
            SESSIONS[user_id] = []
    return SESSIONS[user_id]

def save_session(user_id):
    # 保存会话
    with open(f"session-{user_id}.jsonl", "w") as f:
        for item in SESSIONS[user_id]:
            f.write(json.dumps(item) + "\n")

@bot.message_handler(func=lambda m: True)
def handle_message(message):
    # 处理消息
    user_id = message.from_user.id
    session = load_session(user_id)
    session.append({"role": "user", "content": message.text})
    response = call_llm(session) # 替换为你的 LLM 调用
    session.append({"role": "assistant", "content": response})
    save_session(user_id)
    bot.reply_to(message, response)

bot.infinity_polling()

现在你可以进行实际的对话:

User: Hi
Bot: Hi
User: What did I say earlier?
Bot: You said hi.

关键的见解是 JSONL 格式。每一行都是一条消息。仅追加。如果进程在写入过程中崩溃,你最多会丢失一行。这正是 OpenClaw 用于会话记录的方式:

{"role": "user", "content": "Hi"}
{"role": "assistant", "content": "Hi"}
{"role": "user", "content": "What did I say earlier?"}
{"role": "assistant", "content": "You said hi."}

每个会话映射到一个文件。每个文件都是一个对话。重新启动进程,一切仍然存在。

但我们会遇到一个问题:对话会增长。最终它们将超过模型的上下文窗口。我们会回过头来解决这个问题。

目标:添加人格(SOUL.md

我们的 Bot 可以工作,但它没有人格。它是一个通用的 AI 助手。如果我们想让它成为某个人 呢?

OpenClaw 通过 SOUL.md 解决这个问题:一个 markdown 文件,用于定义代理的身份、行为和界限。

现在,你不是在与一个通用的助手交谈,而是在与 Jarvis 交谈。SOUL 作为系统提示注入到每个 API 调用中。

在 OpenClaw 中,SOUL.md 位于代理的工作区中:

它会在会话开始时加载并注入到系统提示中。你可以在其中写入任何你想要的内容。给代理一个起源故事。定义其核心哲学。列出其行为规则。

你的 SOUL 越具体,代理的行为就越一致。“有帮助”很模糊。“做你真正 想与之交谈的助手。必要时简洁,重要时详尽。不是企业无人机。不是马屁精。只是……好。” - 这给了模型一些可以使用的东西。

目标:添加工具

一个只能说话的 Bot 是有限的。如果它可以做事情 呢?

核心思想:给 AI 结构化的工具,让它决定何时使用它们。

import telebot
import json
import subprocess

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
TOOLS = [
    {
        "name": "run_command",
        "description": "Run a shell command and return the output",
        "parameters": {
            "command": {"type": "string", "description": "The command to run"}
        },
    }
]

def call_llm(session, tools=None):
    # 调用 LLM - 注意:你需要能够传递工具
    prompt = "\n".join([item['content'] for item in session])
    if tools:
        prompt += "\n\nAvailable tools:\n" + json.dumps(tools)
    return "pretend this is an LLM response to: " + prompt

def execute_tool(tool_name, parameters):
    # 执行特定工具
    if tool_name == "run_command":
        result = subprocess.run(parameters["command"], shell=True, capture_output=True, text=True)
        return result.stdout + "\n" + result.stderr
    return "Unknown tool"

@bot.message_handler(func=lambda m: True)
def handle_message(message):
    # 处理消息
    user_id = message.from_user.id
    session = load_session(user_id)
    session.append({"role": "user", "content": message.text})
    response = run_agent_turn(session, TOOLS) # 使用“代理循环”
    session.append({"role": "assistant", "content": response})
    save_session(user_id)
    bot.reply_to(message, response)

def run_agent_turn(session, tools):
    # 代理循环:调用 LLM,执行工具,然后重复
    llm_response = call_llm(session, tools)
    session.append({"role": "assistant", "content": llm_response})
    if "run_command" in llm_response: # 假设 LLM 请求一个工具
        tool_name = "run_command" # 在实际情况中,从 LLM 响应中解析出来
        parameters = {"command": "date"} # 再次,从 LLM 响应中解析出来
        tool_result = execute_tool(tool_name, parameters)
        session.append({"role": "tool", "content": tool_result})
        return call_llm(session, tools) # 循环
    return llm_response

bot.infinity_polling()

现在我们需要代理循环。当 AI 想要使用工具时,我们执行它并将结果反馈回去:

现在我们更新 handle_message 以使用代理循环,而不是直接调用 API:

现在你可以给你的 Bot 发短信:

User: What's the date
Bot: pretend this is an LLM response to: ... Available tools: [{"name": "run_command", "description": "Run a shell command and return the output", "parameters": {"command": {"type": "string", "description": "The command to run"}}}]

Mon Feb 12 23:03:19 PST 2024

AI 决定使用哪些工具,以什么顺序使用,并将结果综合成自然的响应。所有这些都通过 Telegram 消息完成。

OpenClaw 的生产工具目录更大 - 浏览器自动化、代理间消息传递、子代理生成等等 - 但每个工具都遵循完全相同的模式:一个模式、一个描述和一个执行函数。

目标:权限控制

我们正在从 Telegram 消息执行命令。这太可怕了。如果有人访问了你的 Telegram 帐户并告诉 Bot rm -rf / 怎么办?

我们需要一个权限系统。OpenClaw 的方法:一个批准允许列表,用于记住你已批准的内容。

我们添加这些助手以及我们现有的代码,然后更新 execute_tool 中的 run_command 案例以使用它们:

import telebot
import json
import subprocess

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
EXEC_APPROVALS = {} # 保存我们的批准
TOOLS = [...]

def load_session(user_id):
    # 加载会话
    if user_id not in SESSIONS:
        try:
            with open(f"session-{user_id}.jsonl", "r") as f:
                SESSIONS[user_id] = [json.loads(line) for line in f]
        except FileNotFoundError:
            SESSIONS[user_id] = []
    return SESSIONS[user_id]

def save_session(user_id):
    # 保存会话
    with open(f"session-{user_id}.jsonl", "w") as f:
        for item in SESSIONS[user_id]:
            f.write(json.dumps(item) + "\n")

def load_approvals():
    # 加载 exec-approvals.json
    global EXEC_APPROVALS
    try:
        with open("exec-approvals.json", "r") as f:
            EXEC_APPROVALS = json.load(f)
    except FileNotFoundError:
        EXEC_APPROVALS = {}

def save_approvals():
    # 保存 exec-approvals.json
    with open("exec-approvals.json", "w") as f:
        json.dump(EXEC_APPROVALS, f)

def is_command_safe(command):
    # 检查命令是否安全
    if "rm -rf" in command:
        return False
    return True

def is_command_approved(user_id, command):
    # 检查命令是否以前批准过
    load_approvals()
    return EXEC_APPROVALS.get(user_id, {}).get(command, False)

def approve_command(user_id, command):
    # 批准命令
    load_approvals()
    if user_id not in EXEC_APPROVALS:
        EXEC_APPROVALS[user_id] = {}
    EXEC_APPROVALS[user_id][command] = True
    save_approvals()

def execute_tool(tool_name, parameters, user_id=None):
    # 执行特定工具
    if tool_name == "run_command":
        command = parameters["command"]
        if is_command_safe(command) or is_command_approved(user_id, command):
            result = subprocess.run(command, shell=True, capture_output=True, text=True)
            return result.stdout + "\n" + result.stderr
        else:
            return "Permission denied"
    return "Unknown tool"

@bot.message_handler(func=lambda m: True)
def handle_message(message):
    # 处理消息
    user_id = message.from_user.id
    session = load_session(user_id)
    session.append({"role": "user", "content": message.text})
    response = run_agent_turn(session, TOOLS, user_id) # 传递 user_id
    session.append({"role": "assistant", "content": response})
    save_session(user_id)
    bot.reply_to(message, response)

def run_agent_turn(session, tools, user_id):
    # 代理循环:调用 LLM,执行工具,然后重复
    llm_response = call_llm(session, tools)
    session.append({"role": "assistant", "content": llm_response})
    if "run_command" in llm_response: # 假设 LLM 请求一个工具
        tool_name = "run_command" # 在实际情况中,从 LLM 响应中解析出来
        parameters = {"command": "date"} # 再次,从 LLM 响应中解析出来
        tool_result = execute_tool(tool_name, parameters, user_id) # 传递 user_id
        session.append({"role": "tool", "content": tool_result})
        return call_llm(session, tools) # 循环
    return llm_response

load_approvals()
bot.infinity_polling()

现在更新 execute_tool 中的 run_command 案例以在执行前检查权限:

def execute_tool(tool_name, parameters, user_id=None):
    # 执行特定工具
    if tool_name == "run_command":
        command = parameters["command"]
        if is_command_safe(command) or is_command_approved(user_id, command):
            result = subprocess.run(command, shell=True, capture_output=True, text=True)
            return result.stdout + "\n" + result.stderr
        else:
            return "Permission denied"
    return "Unknown tool"

当命令安全或以前批准时,它会立即运行。当命令不安全或以前未批准时,代理会收到“权限被拒绝”的通知,并且可以尝试其他方法。批准会持久保存在 exec-approvals.json 中,因此你永远不会为同一命令被要求两次。

OpenClaw 使用 glob 模式(一次批准 git *)和一个三层模型来扩展这一点:“询问”(提示用户)、“记录”(记录但允许)和“忽略”(自动允许)。

目标:网关

这里变得有趣了。到目前为止,我们有一个 Telegram Bot。但是如果你也想在 Discord 上使用 AI 呢?还有 WhatsApp?还有 Slack?

你可以为每个平台编写单独的 Bot。但是那样的话,你将会有单独的会话、单独的内存、单独的配置。 Telegram 上的 AI 将不知道你在 Discord 上讨论了什么。

解决方案:一个网关。一个管理所有通道的中心进程。

看看我们已经拥有的东西。我们的 run_agent_turn 函数不了解任何关于 Telegram 的信息。它接收消息并返回文本。这是关键 - 代理逻辑已经与通道分离。

为了证明这一点,让我们添加第二个接口。我们将在我们的 Telegram Bot 旁边添加一个简单的 HTTP API,两者都与同一个代理和同一个会话通信:

import telebot
import json
import subprocess
from flask import Flask, request

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
EXEC_APPROVALS = {}
TOOLS = [...]

app = Flask(__name__)

#(其余所有 Bot 代码)

@app.route('/chat', methods=['POST'])
def chat_endpoint():
    # HTTP 聊天端点
    user_id = request.json['user_id']
    message = request.json['message']
    session = load_session(user_id)
    session.append({"role": "user", "content": message})
    response = run_agent_turn(session, TOOLS, user_id)
    session.append({"role": "assistant", "content": response})
    save_session(user_id)
    return response

if __name__ == '__main__':
    #  使用线程同时运行 Flask 应用程序和 Telegram Bot
    import threading
    threading.Thread(target=app.run, kwargs={'port': 5000}).start()
    bot.infinity_polling()

试用一下:在 Telegram 上告诉 Bot 你的名字。然后使用相同的用户 ID(你的 Telegram 用户 ID)通过 HTTP 查询,以证明会话是共享的:

curl -X POST -H "Content-Type: application/json" -d '{"user_id": "12345", "message": "What's my name?"}' http://localhost:5000/chat

相同的代理,相同的会话,相同的内存。两个不同的接口。这就是网关模式。

下一步是使此配置驱动 - 一个 JSON 文件,用于指定要启动哪些通道以及如何对其进行身份验证。

这就是 OpenClaw 所做的:它的网关通过单个配置文件管理 Telegram、Discord、WhatsApp、Slack、Signal、iMessage 等。它还支持可配置的会话范围 - 每个用户、每个通道或单个共享会话 - 因此同一个人可以在不同通道获得统一的体验。我们将暂时保留我们简单的用户 ID 作为会话密钥的方法。

目标:上下文压缩

还记得我们之前标记的会话增长问题吗?在与你的 Bot 聊天数周后,会话文件有数千条消息。总 Token 数超过了模型的上下文窗口。现在怎么办?

解决方案:总结旧消息,保留最近的消息。在现有代码旁边添加这两个函数:

import telebot
import json
import subprocess
from flask import Flask, request

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
EXEC_APPROVALS = {}
TOOLS = [...]
MAX_SESSION_TOKENS = 1000 #  为测试而设定的较低值

app = Flask(__name__)

#(其余所有 Bot 代码)

def count_tokens(text):
    # 计算文本中的 Token 数 - 粗略的近似值
    return len(text.split())

def compact_session(session):
    # 总结旧消息,保留最近的消息
    summary = call_llm(session[:len(session)//2] + [{"role": "user", "content": "Summarize the above"}]) # 总结第一部分
    return [{"role": "system", "content": "Previous conversation summary: " + summary}] + session[len(session)//2:] # 连接摘要与其余部分

现在在 handle_message 的顶部添加压缩检查,就在加载会话之后:

@bot.message_handler(func=lambda m: True)
def handle_message(message):
    # 处理消息
    user_id = message.from_user.id
    session = load_session(user_id)
    if count_tokens("\n".join([item['content'] for item in session])) > MAX_SESSION_TOKENS:
        session = compact_session(session)
    session.append({"role": "user", "content": message.text})
    response = run_agent_turn(session, TOOLS, user_id)
    session.append({"role": "assistant", "content": response})
    save_session(user_id)
    bot.reply_to(message, response)

试用一下:为了在不聊天数小时的情况下测试压缩,暂时降低阈值:

MAX_SESSION_TOKENS = 100 # 为测试而设定的较低值

进行 10-15 条消息的对话,然后观看旧消息被摘要替换。Bot 仍然记得关键事实,但会话文件要小得多。

OpenClaw 的压缩更加复杂 - 它按 Token 数将消息分成块,分别总结每个块,并包括一个用于估计不准确的安全边际 - 但核心思想是相同的。

目标:长期记忆

会话历史记录为你提供对话记忆。但是当你重置会话或开始新会话时会发生什么?一切都消失了。

我们需要一个单独的记忆系统 - 持久的知识,可以在会话重置后幸存下来。方法:给代理工具来保存和搜索存储为文件的记忆。

将这两个工具添加到 TOOLS 列表中:

TOOLS = [
    {
        "name": "run_command",
        "description": "Run a shell command and return the output",
        "parameters": {
            "command": {"type": "string", "description": "The command to run"}
        },
    },
    {
        "name": "save_memory",
        "description": "Save a piece of information to memory",
        "parameters": {
            "key": {"type": "string", "description": "The key to save the information under"},
            "content": {"type": "string", "description": "The information to save"}
        },
    },
    {
        "name": "search_memory",
        "description": "Search for a piece of information in memory",
        "parameters": {
            "key": {"type": "string", "description": "The key to search for"}
        },
    }
]

将它们的案例添加到 execute_tool

import telebot
import json
import subprocess
from flask import Flask, request
import os # 添加此项

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
EXEC_APPROVALS = {}
TOOLS = [...]
MAX_SESSION_TOKENS = 100

app = Flask(__name__)

#(其余所有 Bot 代码)

MEMORY_DIR = "memory" # 用于存储记忆的目录

def execute_tool(tool_name, parameters, user_id=None):
    # 执行一个特定的工具
    if tool_name == "run_command":
        command = parameters["command"]
        if is_command_safe(command) or is_command_approved(user_id, command):
            result = subprocess.run(command, shell=True, capture_output=True, text=True)
            return result.stdout + "\n" + result.stderr
        else:
            return "Permission denied"
    elif tool_name == "save_memory":
        key = parameters["key"]
        content = parameters["content"]
        os.makedirs(MEMORY_DIR, exist_ok=True) # 确保目录存在
        with open(os.path.join(MEMORY_DIR, key + ".txt"), "w") as f:
            f.write(content)
        return "Memory saved"
    elif tool_name == "search_memory":
        key = parameters["key"]
        try:
            with open(os.path.join(MEMORY_DIR, key + ".txt"), "r") as f:
                return f.read()
        except FileNotFoundError:
            return "Memory not found"
    return "Unknown tool"

最后,更新 SOUL,以便代理知道记忆:

I have two special tools: save_memory and search_memory. Use them to remember facts across sessions.

试用一下:

User: save_memory key=my_name content=Bob
Bot: Memory saved
User: <reset the session by deleting session-YOUR_USER_ID.jsonl>
User: search_memory key=my_name
Bot: Bob

记忆会一直存在,因为它们存储在文件中,而不是在会话中。重置会话,重新启动 Bot - 记忆仍然存在。

OpenClaw 的生产记忆使用带有嵌入的向量搜索进行语义匹配(因此“auth bug”匹配“authentication issues”),但是我们的关键字搜索非常适合入门。

目标:命令队列

这是一个微妙但至关重要的问题:当两条消息同时到达时会发生什么?

假设你在 Telegram 上发送“检查我的日历”,并通过 HTTP API 同时发送“天气怎么样”。两者都尝试加载相同的会话,两者都尝试附加到会话,并且你得到损坏的数据。

解决方案很简单:每个会话锁定。每个会话一次只处理一条消息。不同的会话仍然可以并行运行。

现在使用锁包装 handle_message 的主体:

import telebot
import json
import subprocess
from flask import Flask, request
import os
import threading # 导入互斥锁

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
EXEC_APPROVALS = {}
TOOLS = [...]
MAX_SESSION_TOKENS = 100
SESSION_LOCKS = {} # 添加会话锁
MEMORY_DIR = "memory"

app = Flask(__name__)

#(其余所有 Bot 代码)

def handle_message(message):
    # 处理消息
    user_id = message.from_user.id
    with get_session_lock(user_id): # 为了线程安全而获得的锁
        session = load_session(user_id)
        if count_tokens("\n".join([item['content'] for item in session])) > MAX_SESSION_TOKENS:
            session = compact_session(session)
        session.append({"role": "user", "content": message.text})
        response = run_agent_turn(session, TOOLS, user_id)
        session.append({"role": "assistant", "content": response})
        save_session(user_id)
        bot.reply_to(message, response)

def get_session_lock(user_id):
    # 获取锁
    if user_id not in SESSION_LOCKS:
        SESSION_LOCKS[user_id] = threading.Lock()
    return SESSION_LOCKS[user_id]

/chat HTTP 端点执行相同的操作:

@app.route('/chat', methods=['POST'])
def chat_endpoint():
    # HTTP 聊天端点
    user_id = request.json['user_id']
    with get_session_lock(user_id): # 为了线程安全而获得的锁
        message = request.json['message']
        session = load_session(user_id)
        session.append({"role": "user", "content": message})
        response = run_agent_turn(session, TOOLS, user_id)
        session.append({"role": "assistant", "content": response})
        save_session(user_id)
        return response

就是这样 - 五行设置。同一用户的消息排队。不同用户的消息并行运行。没有竞争条件。

OpenClaw 使用基于通道的队列(消息、cron 作业和子代理的单独通道)扩展了这一点,因此心跳永远不会阻止实时对话。

目标:Cron 作业(心跳)

到目前为止,我们的代理只会在你与它交谈时做出响应。但是如果你想让它每天早上检查你的电子邮件怎么办?或者在会议前总结你的日历?

你需要计划执行。让我们添加心跳 - 定期触发代理的重复性任务。

关键的见解:每个心跳都使用自己的会话密钥 (cron:morning-briefing)。这可以防止计划的任务使你的主要对话历史记录变得混乱。心跳调用相同的 run_agent_turn 函数 - 它只是另一条消息,由计时器而不是人类触发。

import telebot
import json
import subprocess
from flask import Flask, request
import os
import threading
import time # 导入时间

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
EXEC_APPROVALS = {}
TOOLS = [...]
MAX_SESSION_TOKENS = 100
SESSION_LOCKS = {}
MEMORY_DIR = "memory"
HEARTBEAT_INTERVAL = 60 * 60 * 24 # 24 小时

app = Flask(__name__)

# (其余所有 Bot 代码)

def run_heartbeat():
    # 后台 Heartbeat 功能
    while True:
        time.sleep(HEARTBEAT_INTERVAL)
        user_id = "cron:morning-briefing" # 单独的心跳用户
        with get_session_lock(user_id):
            session = load_session(user_id)
            session.append({"role": "user", "content": "Brief me on my calendar and email for today"})
            response = run_agent_turn(session, TOOLS, user_id)
            session.append({"role": "assistant", "content": response})
            save_session(user_id)
            print("Heartbeat fired")

if __name__ == '__main__':
    # 同时运行 Flask 应用程序、 Telegram Bot 和 Heartbeat
    import threading
    threading.Thread(target=app.run, kwargs={'port': 5000}).start()
    threading.Thread(target=run_heartbeat).start()
    bot.infinity_polling()

试用一下:为了进行测试,将计划更改为每分钟运行一次:

HEARTBEAT_INTERVAL = 60 # 1 分钟

你会在终端中看到心跳触发,并且代理会做出响应。完成后将其改回每日计划。

OpenClaw 支持完整的 cron 表达式 (30 7 * * *),并通过单独的命令队列通道路由心跳,因此它们永远不会阻止实时消息。

目标:多代理

一个代理很有用。但是随着你添加更多任务,你会发现单一的人格和工具集无法很好地涵盖所有内容。研究助手需要与普通助手不同的说明。

解决方案:多个代理配置与路由。每个代理都有自己的 SOUL、自己的会话,并且你可以根据消息在它们之间切换。

更新 handle_message 以将消息路由到正确的代理:

import telebot
import json
import subprocess
from flask import Flask, request
import os
import threading
import time

TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
bot = telebot.TeleBot(TOKEN)
SESSIONS = {}
EXEC_APPROVALS = {}
TOOLS = [...]
MAX_SESSION_TOKENS = 100
SESSION_LOCKS = {}
MEMORY_DIR = "memory"
HEARTBEAT_INTERVAL = 60 * 60 * 24
AGENT_CONFIGS = { # 多个代理配置
    "jarvis": {
        "soul": "You are Jarvis, a helpful assistant",
        "tools": TOOLS
    },
    "scout": {
        "soul": "You are Scout, a research assistant",
        "tools": TOOLS
    }
}
DEFAULT_AGENT = "jarvis"

app = Flask(__name__)

#(其余所有 Bot 代码)

def run_agent_turn(session, tools, user_id, agent_id=DEFAULT_AGENT):
    # 代理循环:调用 LLM,执行工具,然后重复
    agent_config = AGENT_CONFIGS[agent_id]
    llm_response = call_llm(session, agent_config["tools"], soul=agent_config["soul"])
    session.append({"role": "assistant", "content": llm_response})
    if "run_command" in llm_response: # 假设 LLM 请求一个工具
        tool_name = "run_command" # 在真实情况中,从 LLM 响应中解析出来
        parameters = {"command": "date"} # 再次,从 LLM 响应中解析出来
        tool_result = execute_tool(tool_name, parameters, user_id)
        session.append({"role": "tool", "content": tool_result})
        return call_llm(session, tools, soul=agent_config["soul"]) # 循环
    return llm_response

@bot.message_handler(func=lambda m: True)
def handle_message(message):
    # 处理消息
    user_id = message.from_user.id
    agent_id = DEFAULT_AGENT # 默认代理
    if message.text.startswith("/scout"): # 用于切换的简单路由
        agent_id = "scout"
        message.text = message.text[len("/scout"):].strip() # 删除命令部分
    with get_session_lock(user_id):
        session = load_session(user_id, agent_id) # 每个代理的会话
        if count_tokens("\n".join([item['content'] for item in session])) > MAX_SESSION_TOKENS:
            session = compact_session(session)
        session.append({"role": "user", "content": message.text})
        response = run_agent_turn(session, TOOLS, user_id, agent_id) # 传递 agent_id
        session.append({"role": "assistant", "content": response})
        save_session(user_id, agent_id) # 每个代理的会话
        bot.reply_to(message, response)

def load_session(user_id, agent_id=DEFAULT_AGENT):
    # 加载会话
    if (user_id, agent_id) not in SESSIONS:
        try:
            with open(f"session-{user_id}-{agent_id}.jsonl", "r") as f:
                SESSIONS[(user_id, agent_id)] = [json.loads(line) for line in f]
        except FileNotFoundError:
            SESSIONS[(user_id, agent_id)] = []
    return SESSIONS[(user_id, agent_id)]

def save_session(user_id, agent_id=DEFAULT_AGENT):
  • 原文链接: x.com/dabit3/status/2021...
  • 登链社区 AI 助手,为大家转译优秀英文文章,如有翻译不通的地方,还请包涵~
点赞 0
收藏 0
分享
本文参与登链社区写作激励计划 ,好文好收益,欢迎正在阅读的你也加入。

0 条评论

请先 登录 后评论
dabit3
dabit3
江湖只有他的大名,没有他的介绍。