近日,国产编程语言MoonBit补全了关键语言特性的最后一块拼图:异步编程库moonbitlang/async。

本次发布时间距离MoonBit Beta Release相距仅仅半年,足见MoonBit团队对异步编程的重视。

moonbitlang/async吸收了现有语言的经验和教训,语法更加简洁,基于结构化并发理念,帮助用户写出更健壮、安全的异步程序。未来很可能「占领」包括云服务、AI agent 等重度依赖异步编程的领域。

0 1

什么是异步编程?

你开了一家饭店,雇佣了5个店小二来招待顾客,但是这几个店小二的干活儿的模式一模一样:

客人来到饭店,马上有个店小二殷勤迎上去,带着找座位,点菜,给后厨下单。

由于后厨做菜需要很长时间,店小二就在客人的旁边等着。

后厨一摇铃铛,大喊一声:上菜,店小二马上端到客人面前, 然后站在一边等着客人吃完。

客人说:结账,小二收钱,找钱,送客, 迎接下一位。

由于只有5个店小二,你饭店同时只能招待5个顾客。

很快,你的饭店倒闭了。

倒闭的核心原因就是店小二采用的是“同步模式”,即使有耗时的工作(厨师做菜,顾客吃饭),他也会干等着,非常浪费。

你接受了教训,开了一家新饭店,这次只雇佣了一个店小二,他的工作方式和之前大相径庭:

客人来到饭店,唯一的店小二殷勤迎上去,带着找座位,点菜,给后厨下单

由于后厨做菜需要很长时间,店小二闪电般的离开,去干别的活了,可能是迎客,点菜,找座等,总之是那些不用等待,迅速干完的活。

后厨大喊一声:上菜,这个小二马上端到客人面前,然后离开,干其他活。

客人说:结账,小二收钱,找钱,然后还是迅速闪人,干其他活。

这一次,店小二采用的是“异步模式”,即对于耗时的操作,店小二会暂时离开,做其他事儿,等到操作完成以后再回来接着干。

对应到计算机世界,耗时的操作就是访问文件/数据库/网络,网络服务器的线程遇到了这些I/O操作,坚决不能等待,因为服务器收到的请求可不是几十个几百个,而是成千上万个,所以一定要采用异步编程。

但是对程序员来说,异步编程很麻烦,为了支持任务的中断和切换,程序的逻辑会被分散到程序的不同部分,使得开发效率和程序的可维护性极大下降。

所以各种编程语言Go/Rust/Python/JS都在语言层面直接支持异步编程,降低程序员的负担,MoonBit也不例外。

0 2

MoonBit 异步性能优势

MoonBit 的异步运行时在底层基于线程池并结合epoll/kqueue实现,支持 Linux 与 macOS 的 native 后端。其设计思路与 Node.js 类似:采用单线程、多任务模型。

在这一模式下,异步程序中的同步部分始终在同一线程上执行。对开发者而言,这带来显著的简化效果:程序的行为与单线程应用一致,无需额外加锁,也不必担心竞争条件等并发错误。

虽然仍处于早期阶段,这一运行时已经展现出出色的性能表现。

为了检验 MoonBit 异步运行时的性能,我们搭建了一个简单的 TCP 服务器:它会把收到的数据原样返回给客户端。这个测试几乎没有计算成分,因此能够直接反映运行时在高并发场景下的处理能力。

在测试中,我们同时维持多个连接,不断收发数据,并记录吞吐量和响应延迟。结果显示,MoonBit 在并发连接数不断增加的情况下,依然保持了优异的吞吐表现和极低的响应延迟,充分体现了其运行时系统的高效性和稳定性。

对比的对象是 Node.js 和 Go 语言。

性能测试的结果如下:

测试结果显示,MoonBit 在 200 到 1000 个并发连接下始终保持最高吞吐量,在高并发场景中明显优于 Node.js 和 Go。这表明其异步运行时具备出色的扩展性。

在高并发场景下,MoonBit 的平均延迟始终保持在个位数毫秒,即便在 1000 个连接时也只有 4.43ms;相比之下,Node.js 延迟超过 116ms。这意味着 MoonBit 的异步运行时能够在大规模连接下依然保持快速响应。

下面是一个 HTTP 服务器的例子,相比 TCP 服务器,HTTP 例子需要进行 HTTP 协议的解析,有更多的计算成分,不是单纯的 I/O。

这个测试会使用 (github.com/wg/wrk) 工具,通过多个连接不断向 HTTP 服务器发送 GET / HTTP/1.1 的请求,服务器应当返回一个空的回复。测试会记录服务器每秒处理的请求数以及每个请求的平均延迟。测试的结果如下:

可以看到,得益于 MoonBit 语言本身的优秀性能,在这个测试中 MoonBit 依然表现良好。

MoonBit 在所有并发连接数下的请求处理效率和延迟都稳定高于 Node.js 和单线程的 Go。

0 3

构建简单代码智能体的示例

MoonBit 不只是写服务器更方便,它甚至能直接驱动 AI 智能体。下面,我们就用它构建一个最小可运行的代码智能体(Code Agent)。

这个代码智能体除了可以调用大模型,还支持工具调用(如读取本地文件,执行ls命令等)。

例如,用户的请求是:请读取本地文件 /home/user/data.txt 并告诉我里面的内容。

代码智能体会把这个消息发给大模型,并且告诉大模型,我这里有两个工具可以调用,工具的名称,参数也给你发过去了。

大模型看到看到“请读取本地文件......”,它当然不会直接读取文件,而是看看根据智能体都发来了什么样的工具,然后发挥自己的强项,选择对应的工具,生成调用请求:

{
  "tool_calls": [
    {
      "function": {
        "name": "read_file"
      },
      "arguments": {
        "path": "/home/user/data.txt"
      }
    }
  ]
}

智能体收到大模型发回的工具调用请求,执行真正的工具调用,读取 /home/user/data.txt,假设结果是:MoonBit is the future programming language!

智能体会将结果包装成消息,发送给大模型模型,大模型收到工具返回的内容后,会判断:“我已经得到了文件内容,不需要再调用工具了,我可以生成最终回答”

最终响应可能是这样的:

{
  "role": "assistant",
  "content": "我已经读取了文件 /home/user/data.txt,里面的内容是:\nMoonBit is the future programming language!"
}

在这个代码智能体中,需要处理网络调用,文件读取,命令执行,会使用MoonBit的这些异步操作:

1. @http.post 发送消息到 LLM 接口。

2. @fs.read_file 从文件读取内容。

3. @process.collect_output_merged 来执行外部程序并收集其输出。

值得注意的是,在MoonBit中所有异步函数调用默认会被隐式 await,并且异步调用实现了结构化并发(Structured Concurrency) ,这意味着MoonBit 的异步程序几乎不可能产生僵尸后台任务,并且程序员能够更加容易地理解并分析异步代码的行为。

1、向 LLM 接口发起请求

MoonBit 异步网络库提供了 @http.post 用于发送 HTTP POST 请求。我们可以简单地将其包装一下,用来更方便地发送消息到 LLM:

///|
async fn generate(request : Request) -> Response {
  let (response, body) = @http.post(
    "\{base_url}/chat/completions",
    request.to_json(),
    headers={
      "Authorization": "Bearer \{api_key}",
      "Content-Type": "application/json",
      "Connection": "close",
    },
  )
  guard response.code is (200..=299) else {
    fail("HTTP request failed: \{response.code} \{response.reason}")
  }
  body.json() |> @json.from_json()
}

接下来,我们将展示如何让 LLM 使用工具。

2、定义工具

为了让代码智能体更有用,我们需要通过工具扩展它与外部世界交互的能力。

请求体中的 "tools" 字段描述了我们向 LLM 提供的工具。一个典型的工具描述包含以下字段:

  • name:工具名称,将在工具调用中使用。

  • description:对工具的简短描述。

  • parameters:描述工具参数的 JSON Schema。本示例中为简化处理,我们只使用 type、properties 和 required 字段。

例如,下面的 JSON 描述了一个名为 read_file 的工具:

{
  "name": "read_file",
  "description": "Read a file from local disk",
  "parameters": {
    "type": "object",
    "properties": {
      "path": {
        "type": "string",
        "description": "The path of the file to read"
      }
    },
    "required": ["path"]
  }
}

我们在 MoonBit 中将该工具描述建模为如下结构:

///|
struct Tool {
  name : String
  description : String
  parameters : Json
  /// 执行工具的函数
  execute : async (String) -> String
}

在本演示中,我们将定义两个简单工具:

read_file:从本地磁盘读取文件。

execute_command:执行一个外部程序。

3、read_file 工具

使用 moonbitlang/async 与文件系统交互非常简单。可以直接使用 @fs.read_file/@fs.write_file 来进行对文件的读取/写入。对于更加灵活的需求,moonbitlang/async 也提供了 @fs.open ,用户可以传入自定义选项,并在后续调用 read / write 方法进行 I/O 操作。

我们可以将 read_file 工具实现为:

///|
let read_file_tool : Tool = {
  name: "read_file",
  description: "Read a file from local disk",
  parameters: {
    "type": "object",
    "properties": {
      "path": {
        "type": "string",
        "description": "The path of the file to read",
      },
    },
    "required": ["path"],
  },
  execute: args => {
    guard @json.parse(args) is { "path": String(path), .. } else {
      fail("Invalid arguments for read_file, expected {\"path\": String}")
    }
    @moonbitlang/async/fs.read_file(path).text()
  },
}

4、execute_command 工具

在 moonbitlang/async 中实现 execute_command 工具也非常简单。我们可以使用 @process.collect_output_merged 来执行一个外部程序,并收集其 stdout 和 stderr 输出。

对于更高级的需求,我们可以使用 @process.run 来启动一个进程,并通过管道(pipe)与其交互。

execute_command 工具实现如下:

///|
let execute_command_tool : Tool = {
  name: "execute_command",
  description: "Execute an external program",
  parameters: {
    "type": "object",
    "properties": {
      "command": { "type": "string", "description": "The command to execute" },
      "arguments": {
        "type": "array",
        "items": { "type": "string" },
        "description": "The arguments to pass to the command",
      },
    },
    "required": ["command", "arguments"],
  },
  execute: arguments => {
    guard @json.parse(arguments)
      is { "command": String(command), "arguments": arguments, .. } else {
      fail(
        "Invalid arguments for execute_command, expected {\"command\": String, \"args\": Array[String]}",
      )
    }
    let arguments : Array[String] = @json.from_json(arguments)
    let (status, output) = @process.collect_output_merged(
      command,
      arguments.map(argument => argument),
    )
    let output = output.text()
    (
      $|Exit status: \{status}
      $|Output:
      $|\{output}
    )
  },
}

5、处理工具调用与智能体主循环

得到大模型发回的工具调用请求以后,代码智能体需要进行处理,使用的是这个异步函数:

///|
async fn handle_tool_call(
  tools : Map[String, Tool],
  tool_call : ToolCall,
) -> Json {
  guard tools.get(tool_call.function.name) is Some(tool) else {
    return {
      "role": "tool",
      "content": "Tool not found: \{tool_call.function.name}",
      "tool_call_id": tool_call.id,
    }
  }
  return {
    "role": "tool",
    "content": (tool.execute)(tool_call.function.arguments),
    "tool_call_id": tool_call.id,
  } catch {
    error =>
      {
        "role": "user",
        "content": "Error executing tool \{tool_call.function.name}: \{error}",
      }
  }
}

有了处理工具调用的能力后,我们就可以实现智能体的主循环了。我们定义了一个 Agent 结构来保存智能体状态,包括工具集合、对话历史和消息队列:

///|
struct Agent {
  tools : Map[String, Tool]
  conversation : Array[Json]
  mut message_queue : Array[Json]
}

然后我们为 Agent 实现 run 方法,持续处理消息队列中的消息,直到队列为空:

///|
async fn Agent::run(self : Agent) -> Unit {
  while !self.message_queue.is_empty() {
    // Take all messages from the message queue
    let messages = self.message_queue
    self.message_queue = []
    // Send the messages to LLM endpoint
    let response = generate({
      model,
      messages: [..self.conversation, ..messages],
      tools: self.tools.values().collect(),
    })
    let response = response.choices[0].message
    // Save the response to the conversation history
    self.conversation.push(response)
    if response is { "content": String(content), .. } {
      // Print the assistant's response
      println("Assistant: \{content}")
    }
    let tool_calls : Array[ToolCall] = if response
      is { "tool_calls": tool_calls, .. } {
      @json.from_json(tool_calls)
    } else {
      []
    }
    // Handle tool calls
    for tool_call in tool_calls {
      let message = handle_tool_call(self.tools, tool_call)
      self.message_queue.push(message)
      println("Tool: \{tool_call.function.name}")
      println("Response: \{message.stringify(indent=2)}")
    }
  }
}

大功告成,接下来测试一下。

让这个智能体获取当前时间,并把结果告诉我们:

///|
async test "agent/current-time" {
  let agent = Agent::{
    tools: {
      "read_file": read_file_tool,
      "execute_command": execute_command_tool,
    },
    conversation: [],
    message_queue: [],
  }
  agent.message_queue.push({
    "role": "user",
    "content": "Can you please tell me what time is it now?",
  })
  agent.run()
}

0 4

结论

在这篇文章中,我们展示了如何使用 moonbitlang/async 构建一个简单的代码智能体。该智能体可以通过调用工具从本地磁盘读取文件并执行外部程序。当然,这只是一个基础示例,市面上的智能体通常会更加复杂,例如会添加更多工具、更优雅地处理错误、实现更复杂的对话流程等。

如果你想了解 moonbitlang/async 的更多信息,请参阅其文档。你也可以查看 maria 项目源码,了解我们是如何基于 moonbitlang/async 构建代码智能体的。

(1) MoonBit 再添异步能力,实现 AI Agent 高效与稳定开发:

https://mp.weixin.qq.com/s/t5k9bUmuE-rs3qaGB0yLVw

(2) AI Agent 案例完整代码:

https://gist.github.com/tonyfettes/2953d5bef1610fce12cca05ea20655e2