快速回答
通过 Rails 后端封装 LangChain Ruby SDK,实现 Agent 的工具注册、记忆管理和任务编排,适合已有 Rails 项目的团队快速接入 AI 能力。
·
211 阅读
·
Rails + LangChain
使用 Rails + LangChain 构建 AI Agent 系统
结合 Rails 的稳定后端能力与 LangChain 的 Agent 框架,构建生产级的 AI 应用。
为什么 Rails 是构建 AI Agent 后端的好选择?
当大多数 AI Agent 教程都在用 Python + FastAPI 时,Ruby on Rails 其实提供了几个独特优势——成熟的 ActiveJob、强大的 ActionCable WebSocket 支持、以及 Rails 生态丰富的认证、权限、ORM 基础设施。
结合 Python 的 LangChain(通过 sidecar 进程或 langchainrb gem)或直接调用 Anthropic / OpenAI API,Rails 完全可以承担 Agent 系统的编排层与服务层角色。
- ActiveJob + Sidekiq:Agent 的多步骤 ReAct 循环天然适合异步 Job,避免 HTTP 超时,支持重试与优先级队列。
- ActionCable 实时推送:Agent 执行中间步骤可通过 WebSocket 实时流回前端,用户体验接近原生 streaming。
- ActiveRecord 作为记忆层:对话历史、工具调用记录、Agent 状态机全部持久化到 PostgreSQL,天然可审计。
- LangChain 工具生态:通过 langchainrb gem 或 Python sidecar 直接复用数百个 LangChain 工具集成。
系统整体架构设计
我们采用 ReAct(Reason + Act) 模式构建 Agent 主循环,Rails 负责请求接入与状态管理,LangChain 负责 LLM 调用与工具路由。
整体数据流如下:
- 1. Browser / API Client 发起请求
- 2. Rails Controller 接收,通过 ActionCable Channel 建立 WebSocket 连接
- 3. 创建 AgentJob 交由 Sidekiq 异步执行
- 4. AgentExecutor 主循环运行,读写 Memory Store(PostgreSQL)
- 5. 调用 LangChain / Claude API 进行推理
- 6. 根据 LLM 决策执行 Tool(Web Search、Code Runner、DB Query 等)
设计原则:Agent 的每一次 Think → Act → Observe 循环都作为一条 AgentStep 记录写入数据库,这使得调试、审计、断点续跑变得极为简单。
环境搭建与依赖配置
安装核心 Gem
# Gemfile # AI & LangChain gem 'langchainrb', '~> 0.19' gem 'anthropic' # Claude SDK gem 'ruby-openai' # OpenAI 备选 # 向量存储 gem 'pgvector' # PostgreSQL 向量扩展 gem 'neighbor' # Rails 向量相似搜索 # 后台任务 gem 'sidekiq', '~> 7.0' # 实时通信 gem 'redis' gem 'actioncable'
数据库迁移
# db/migrate/create_agent_sessions.rb
class CreateAgentSessions < ActiveRecord::Migration[7.1]
def change
create_table :agent_sessions do |t|
t.references :user, null: false, foreign_key: true
t.string :status, default: 'pending'
t.string :goal, null: false
t.jsonb :context, default: {}
t.timestamps
end
create_table :agent_steps do |t|
t.references :agent_session, null: false, foreign_key: true
t.integer :step_index
t.string :thought # LLM 推理过程
t.string :action # 调用的工具名
t.jsonb :action_input
t.text :observation # 工具返回结果
t.timestamps
end
add_column :agent_sessions, :messages, :jsonb, default: []
enable_extension 'vector'
add_column :agent_sessions, :embedding, :vector, limit: 1536
end
endAgent 核心执行器
Agent 的心脏是一个实现 ReAct 循环的 AgentExecutor 服务对象。它循环调用 LLM → 解析意图 → 执行工具 → 将结果反馈回 LLM,直到 LLM 决定输出最终答案。
# app/services/agent_executor.rb
class AgentExecutor
MAX_ITERATIONS = 10
def initialize(session:, tools:, llm: nil)
@session = session
@tools = tools.index_by(&:name)
@llm = llm || default_llm
@memory = ConversationMemory.new(session)
end
def run(goal)
@session.update!(status: 'running', goal: goal)
messages = @memory.load
MAX_ITERATIONS.times do |i|
response = @llm.chat(
messages: messages + [{ role: 'user', content: goal }],
tools: tool_definitions
)
case response.stop_reason
when 'end_turn'
return finalize(response.content)
when 'tool_use'
tool_use = response.content.find { |b| b.type == 'tool_use' }
result = execute_tool(tool_use, step_index: i)
messages << { role: 'assistant', content: response.content }
messages << { role: 'user', content: [
{ type: 'tool_result', tool_use_id: tool_use.id, content: result }
]}
broadcast_step(step_index: i, tool: tool_use.name, result: result)
end
end
finalize("Max iterations reached")
end
private
def execute_tool(tool_use, step_index:)
tool = @tools[tool_use.name]
result = tool.call(tool_use.input)
@session.agent_steps.create!(
step_index: step_index,
action: tool_use.name,
action_input: tool_use.input,
observation: result
)
result
end
def broadcast_step(**payload)
AgentChannel.broadcast_to(@session, payload)
end
def finalize(answer)
@session.update!(status: 'completed')
@memory.save
answer
end
def default_llm
Langchain::LLM::Anthropic.new(
api_key: ENV['ANTHROPIC_API_KEY'],
default_options: { model: 'claude-opus-4-5' }
)
end
end将 Agent 放入后台 Job
# app/jobs/agent_job.rb
class AgentJob < ApplicationJob
queue_as :agents
sidekiq_options retry: 2, dead: false
def perform(session_id)
session = AgentSession.find(session_id)
tools = [WebSearchTool.new, CodeRunnerTool.new, DatabaseTool.new]
executor = AgentExecutor.new(session: session, tools: tools)
executor.run(session.goal)
rescue => e
session&.update!(status: 'failed', context: { error: e.message })
raise
end
end构建可扩展的工具系统
LangChain 的核心之一是工具(Tool)。每个工具都需要声明 名称、描述(供 LLM 理解何时使用它)、以及 输入 Schema。我们用 Ruby 的统一接口封装。
# app/tools/base_tool.rb
module Tools
class BaseTool
attr_reader :name, :description
def call(input)
raise NotImplementedError
end
def to_claude_definition
{
name: name,
description: description,
input_schema: input_schema
}
end
end
end# app/tools/web_search_tool.rb
class WebSearchTool < Tools::BaseTool
def name = 'web_search'
def description = '使用 Tavily 搜索实时网页信息。输入搜索关键词,返回相关摘要。'
def input_schema
{
type: 'object',
properties: {
query: { type: 'string', description: '搜索查询词' }
},
required: ['query']
}
end
def call(input)
client = Tavily::Client.new(api_key: ENV['TAVILY_API_KEY'])
results = client.search(query: input['query'], max_results: 5)
results.map { |r| "**#{r[:title]}**: #{r[:content]}" }.join("\n\n")
rescue => e
"Search failed: #{e.message}"
end
end
提示:工具的 description 是 Agent 智能行为的关键。写清楚工具的能力边界、输入格式预期、以及典型使用场景,LLM 的工具选择准确率可以提升 30–50%。
Agent 记忆与会话状态管理
Agent 的"记忆"分三个层次:
- 短期记忆(in-flight messages):当前 ReAct 循环内的对话历史,存在内存中,随 Job 结束消失。
- 会话记忆(AgentSession.messages):序列化到 PostgreSQL JSONB 字段,跨 Job 恢复,支持断点续跑。
- 语义记忆(pgvector):将历史对话 embedding 存入向量列,允许 Agent 检索"我之前做过类似的任务"。
# app/services/conversation_memory.rb
class ConversationMemory
def initialize(session)
@session = session
end
def load
messages = @session.messages || []
return messages if within_token_limit?(messages)
summarize_and_trim(messages)
end
def save
@session.update!(messages: @session.reload.messages)
embed_and_store # 异步生成 embedding
end
private
def within_token_limit?(messages)
messages.sum { |m| m[:content].to_s.length } < 80_000
end
def summarize_and_trim(messages)
recent = messages.last(10)
old_text = messages[0..-11].map { |m| m[:content] }.join("\n")
summary = LLM.summarize(old_text)
[{ role: 'system', content: "历史摘要: #{summary}" }] + recent
end
endActionCable:实时推送 Agent 执行步骤
# app/channels/agent_channel.rb
class AgentChannel < ApplicationCable::Channel
def subscribed
session = AgentSession.find_by(id: params[:session_id])
if session&.user == current_user
stream_for session
else
reject
end
end
end
# 在 AgentExecutor 中广播:
# AgentChannel.broadcast_to(session, {
# step: 3, tool: 'web_search', status: 'running', result: '...'
# })前端使用 @rails/actioncable 或 StimulusJS 订阅 channel,实时渲染每个 Agent 步骤,用户可以看到 Agent 在"思考"和"行动"。
生产环境配置要点
# config/sidekiq.yml :concurrency: 10 :queues: - [agents, 3] # Agent 任务,优先级高 - [default, 2] - [mailers, 1] :limits: agents: 5 # 最多 5 个并发 Agent,防止 API 限速
生产部署还需关注以下几点:
- Rate Limiting:在 Sidekiq 层限制 Agent Job 并发数,配合 Sidekiq::Limiter 防止 Claude/OpenAI API 429 错误。
- 幂等性:AgentJob 必须幂等,通过 session.status 检查防止重复执行,Sidekiq 重试时安全。
- 可观测性:接入 Datadog / Sentry,对每个 Agent 步骤打 span,可追踪 LLM 延迟和工具耗时。
- 成本控制:在 AgentSession 上记录 tokens_used,超出预算时主动终止 Job,防止无限循环。
总结
Rails + LangChain 提供了一个成熟、可扩展的 AI Agent 基础设施。从 ActiveJob 的异步编排到 ActionCable 的实时推送,Rails 生态的每个组件都天然契合 Agent 系统的需求。
核心设计原则回顾:每一步都持久化(AgentStep)、工具接口统一(BaseTool)、记忆分层管理(短期 / 会话 / 语义),这三点是系统可维护、可扩展的根基。