快速回答

通过 Rails 后端封装 LangChain Ruby SDK,实现 Agent 的工具注册、记忆管理和任务编排,适合已有 Rails 项目的团队快速接入 AI 能力。

· 211 阅读 · Rails + LangChain

使用 Rails + LangChain 构建 AI Agent 系统

结合 Rails 的稳定后端能力与 LangChain 的 Agent 框架,构建生产级的 AI 应用。

为什么 Rails 是构建 AI Agent 后端的好选择?

当大多数 AI Agent 教程都在用 Python + FastAPI 时,Ruby on Rails 其实提供了几个独特优势——成熟的 ActiveJob、强大的 ActionCable WebSocket 支持、以及 Rails 生态丰富的认证、权限、ORM 基础设施。

结合 Python 的 LangChain(通过 sidecar 进程或 langchainrb gem)或直接调用 Anthropic / OpenAI API,Rails 完全可以承担 Agent 系统的编排层与服务层角色。

  • ActiveJob + Sidekiq:Agent 的多步骤 ReAct 循环天然适合异步 Job,避免 HTTP 超时,支持重试与优先级队列。

  • ActionCable 实时推送:Agent 执行中间步骤可通过 WebSocket 实时流回前端,用户体验接近原生 streaming。

  • ActiveRecord 作为记忆层:对话历史、工具调用记录、Agent 状态机全部持久化到 PostgreSQL,天然可审计。

  • LangChain 工具生态:通过 langchainrb gem 或 Python sidecar 直接复用数百个 LangChain 工具集成。

系统整体架构设计
我们采用 ReAct(Reason + Act) 模式构建 Agent 主循环,Rails 负责请求接入与状态管理,LangChain 负责 LLM 调用与工具路由。

整体数据流如下:

  1. 1. Browser / API Client 发起请求
  2. 2. Rails Controller 接收,通过 ActionCable Channel 建立 WebSocket 连接
  3. 3. 创建 AgentJob 交由 Sidekiq 异步执行
  4. 4. AgentExecutor 主循环运行,读写 Memory Store(PostgreSQL)
  5. 5. 调用 LangChain / Claude API 进行推理
  6. 6. 根据 LLM 决策执行 Tool(Web Search、Code Runner、DB Query 等)

设计原则:Agent 的每一次 Think → Act → Observe 循环都作为一条 AgentStep 记录写入数据库,这使得调试、审计、断点续跑变得极为简单。

环境搭建与依赖配置
安装核心 Gem
# Gemfile

# AI & LangChain
gem 'langchainrb', '~> 0.19'
gem 'anthropic'          # Claude SDK
gem 'ruby-openai'        # OpenAI 备选

# 向量存储
gem 'pgvector'           # PostgreSQL 向量扩展
gem 'neighbor'           # Rails 向量相似搜索

# 后台任务
gem 'sidekiq', '~> 7.0'

# 实时通信
gem 'redis'
gem 'actioncable'
数据库迁移
# db/migrate/create_agent_sessions.rb

class CreateAgentSessions < ActiveRecord::Migration[7.1]
  def change
    create_table :agent_sessions do |t|
      t.references :user, null: false, foreign_key: true
      t.string     :status, default: 'pending'
      t.string     :goal, null: false
      t.jsonb      :context, default: {}
      t.timestamps
    end

    create_table :agent_steps do |t|
      t.references :agent_session, null: false, foreign_key: true
      t.integer    :step_index
      t.string     :thought      # LLM 推理过程
      t.string     :action       # 调用的工具名
      t.jsonb      :action_input
      t.text       :observation  # 工具返回结果
      t.timestamps
    end

    add_column :agent_sessions, :messages, :jsonb, default: []
    enable_extension 'vector'
    add_column :agent_sessions, :embedding, :vector, limit: 1536
  end
end

Agent 核心执行器
Agent 的心脏是一个实现 ReAct 循环的 AgentExecutor 服务对象。它循环调用 LLM → 解析意图 → 执行工具 → 将结果反馈回 LLM,直到 LLM 决定输出最终答案。

# app/services/agent_executor.rb

class AgentExecutor
  MAX_ITERATIONS = 10

  def initialize(session:, tools:, llm: nil)
    @session    = session
    @tools      = tools.index_by(&:name)
    @llm        = llm || default_llm
    @memory     = ConversationMemory.new(session)
  end

  def run(goal)
    @session.update!(status: 'running', goal: goal)
    messages = @memory.load

    MAX_ITERATIONS.times do |i|
      response = @llm.chat(
        messages: messages + [{ role: 'user', content: goal }],
        tools: tool_definitions
      )

      case response.stop_reason
      when 'end_turn'
        return finalize(response.content)

      when 'tool_use'
        tool_use = response.content.find { |b| b.type == 'tool_use' }
        result   = execute_tool(tool_use, step_index: i)

        messages << { role: 'assistant', content: response.content }
        messages << { role: 'user', content: [
          { type: 'tool_result', tool_use_id: tool_use.id, content: result }
        ]}

        broadcast_step(step_index: i, tool: tool_use.name, result: result)
      end
    end

    finalize("Max iterations reached")
  end

  private

  def execute_tool(tool_use, step_index:)
    tool   = @tools[tool_use.name]
    result = tool.call(tool_use.input)

    @session.agent_steps.create!(
      step_index:   step_index,
      action:       tool_use.name,
      action_input: tool_use.input,
      observation:  result
    )
    result
  end

  def broadcast_step(**payload)
    AgentChannel.broadcast_to(@session, payload)
  end

  def finalize(answer)
    @session.update!(status: 'completed')
    @memory.save
    answer
  end

  def default_llm
    Langchain::LLM::Anthropic.new(
      api_key: ENV['ANTHROPIC_API_KEY'],
      default_options: { model: 'claude-opus-4-5' }
    )
  end
end
将 Agent 放入后台 Job
# app/jobs/agent_job.rb

class AgentJob < ApplicationJob
  queue_as :agents
  sidekiq_options retry: 2, dead: false

  def perform(session_id)
    session  = AgentSession.find(session_id)
    tools    = [WebSearchTool.new, CodeRunnerTool.new, DatabaseTool.new]
    executor = AgentExecutor.new(session: session, tools: tools)
    executor.run(session.goal)
  rescue => e
    session&.update!(status: 'failed', context: { error: e.message })
    raise
  end
end
构建可扩展的工具系统
LangChain 的核心之一是工具(Tool)。每个工具都需要声明 名称描述(供 LLM 理解何时使用它)、以及 输入 Schema。我们用 Ruby 的统一接口封装。
# app/tools/base_tool.rb

module Tools
  class BaseTool
    attr_reader :name, :description

    def call(input)
      raise NotImplementedError
    end

    def to_claude_definition
      {
        name:         name,
        description:  description,
        input_schema: input_schema
      }
    end
  end
end# app/tools/web_search_tool.rb

class WebSearchTool < Tools::BaseTool
  def name        = 'web_search'
  def description = '使用 Tavily 搜索实时网页信息。输入搜索关键词,返回相关摘要。'

  def input_schema
    {
      type: 'object',
      properties: {
        query: { type: 'string', description: '搜索查询词' }
      },
      required: ['query']
    }
  end

  def call(input)
    client  = Tavily::Client.new(api_key: ENV['TAVILY_API_KEY'])
    results = client.search(query: input['query'], max_results: 5)
    results.map { |r| "**#{r[:title]}**: #{r[:content]}" }.join("\n\n")
  rescue => e
    "Search failed: #{e.message}"
  end
end

提示:工具的 description 是 Agent 智能行为的关键。写清楚工具的能力边界、输入格式预期、以及典型使用场景,LLM 的工具选择准确率可以提升 30–50%。
Agent 记忆与会话状态管理
Agent 的"记忆"分三个层次:

  1. 短期记忆(in-flight messages):当前 ReAct 循环内的对话历史,存在内存中,随 Job 结束消失。
  2. 会话记忆(AgentSession.messages):序列化到 PostgreSQL JSONB 字段,跨 Job 恢复,支持断点续跑。
  3. 语义记忆(pgvector):将历史对话 embedding 存入向量列,允许 Agent 检索"我之前做过类似的任务"。

# app/services/conversation_memory.rb

class ConversationMemory
  def initialize(session)
    @session = session
  end

  def load
    messages = @session.messages || []
    return messages if within_token_limit?(messages)
    summarize_and_trim(messages)
  end

  def save
    @session.update!(messages: @session.reload.messages)
    embed_and_store  # 异步生成 embedding
  end

  private

  def within_token_limit?(messages)
    messages.sum { |m| m[:content].to_s.length } < 80_000
  end

  def summarize_and_trim(messages)
    recent   = messages.last(10)
    old_text = messages[0..-11].map { |m| m[:content] }.join("\n")
    summary  = LLM.summarize(old_text)
    [{ role: 'system', content: "历史摘要: #{summary}" }] + recent
  end
end
ActionCable:实时推送 Agent 执行步骤
# app/channels/agent_channel.rb

class AgentChannel < ApplicationCable::Channel
  def subscribed
    session = AgentSession.find_by(id: params[:session_id])
    if session&.user == current_user
      stream_for session
    else
      reject
    end
  end
end

# 在 AgentExecutor 中广播:
# AgentChannel.broadcast_to(session, {
#   step: 3, tool: 'web_search', status: 'running', result: '...'
# })
前端使用 @rails/actioncable 或 StimulusJS 订阅 channel,实时渲染每个 Agent 步骤,用户可以看到 Agent 在"思考"和"行动"。
生产环境配置要点
# config/sidekiq.yml

:concurrency: 10
:queues:
  - [agents, 3]    # Agent 任务,优先级高
  - [default, 2]
  - [mailers, 1]

:limits:
  agents: 5        # 最多 5 个并发 Agent,防止 API 限速

生产部署还需关注以下几点:

  • Rate Limiting:在 Sidekiq 层限制 Agent Job 并发数,配合 Sidekiq::Limiter 防止 Claude/OpenAI API 429 错误。

  • 幂等性:AgentJob 必须幂等,通过 session.status 检查防止重复执行,Sidekiq 重试时安全。

  • 可观测性:接入 Datadog / Sentry,对每个 Agent 步骤打 span,可追踪 LLM 延迟和工具耗时。

  • 成本控制:在 AgentSession 上记录 tokens_used,超出预算时主动终止 Job,防止无限循环。

总结
Rails + LangChain 提供了一个成熟、可扩展的 AI Agent 基础设施。从 ActiveJob 的异步编排到 ActionCable 的实时推送,Rails 生态的每个组件都天然契合 Agent 系统的需求。
核心设计原则回顾:每一步都持久化(AgentStep)、工具接口统一(BaseTool)、记忆分层管理(短期 / 会话 / 语义),这三点是系统可维护、可扩展的根基。
S

Shin Zhang