Jiayun's Blog

探索与分享

当你拖拽一个节点、连一条线、点击执行的时候,背后发生了什么?250万行TypeScript、46个内部包、6层依赖架构——这是n8n给出的答案。本文将从架构师的视角,逐层拆解这个开源工作流引擎的内部世界。


一、为什么要解剖 n8n?

n8n 是目前最活跃的开源工作流自动化平台之一,GitHub 上超过 70k stars。它的定位很明确:fair-code 许可的 Zapier/Make 替代品,支持自托管,拥有 400+ 官方集成节点。

但 n8n 真正值得研究的不是它的功能列表,而是它背后的架构决策。一个工作流引擎需要解决的核心问题是:

  • 如何让 400+ 集成保持一致的开发体验?
  • 如何在不停机的情况下升级单个节点?
  • 如何让非程序员通过拖拽构建复杂逻辑,同时让程序员可以写代码?
  • 如何在单机和分布式之间无缝切换?

这些问题的答案,藏在 n8n 的代码里。


二、Monorepo 全景:46个包的层次化治理

n8n 采用 pnpm workspaces + Turborepo 管理整个代码库。这不是简单的"把代码放在一起"——它的包结构体现了一套清晰的分层依赖哲学

2.1 六层架构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
Layer 0 (地基层)     @n8n/di · @n8n/constants · @n8n/errors · @n8n/permissions
Layer 1 (配置层)     @n8n/config · @n8n/decorators · @n8n/utils
Layer 2 (领域模型)   n8n-workflow → @n8n/expression-runtime
Layer 3 (引擎层)     n8n-core · @n8n/backend-common · @n8n/db
Layer 4 (节点层)     n8n-nodes-base(393K LOC) · @n8n/nodes-langchain(119K LOC)
Layer 5 (应用层)     n8n CLI(415K LOC) — 编排一切
Layer 6 (前端层)     n8n-editor-ui(151K LOC) → @n8n/design-system(57K LOC)

几个值得注意的设计决策:

Layer 0 只有 ~730 行代码,却是整个系统的基石。 @n8n/di 提供 IoC 容器(仅 483 行),@n8n/errors 定义错误基类(61行),@n8n/constants 存放全局常量(183 行)。这些包几乎零依赖、极少变更,是整个 Monorepo 最稳定的底座。

n8n-workflow(59K LOC)是真正的核心。 它定义了 WorkflowINodeTypeINodeExecutionData 等所有核心数据模型,不依赖任何运行时(不依赖数据库、不依赖 HTTP 框架)。这意味着你可以在任何环境中——浏览器、Worker、Lambda——实例化一个 Workflow 对象并操作它。这是一种纯领域建模的思路。

nodes-base 和 CLI 是两个"巨石",但边界清晰。 nodes-base(393K LOC)只做一件事:实现集成节点。CLI(415K LOC)只做另一件事:把所有模块编排成一个可运行的服务。两者通过 n8n-workflow 定义的接口解耦。

2.2 Turborepo 的构建编排

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// turbo.json
{
  "tasks": {
    "build": {
      "dependsOn": ["^build"],    // 先构建上游依赖
      "outputs": ["dist/**"]
    },
    "test": {
      "dependsOn": ["^build", "build"]  // 测试前确保全部构建完成
    }
  }
}

^build 语法意味着 Turbo 会自动解析包间依赖关系,按拓扑序构建。对于46个包的项目,这不是可有可无的优化——这是能不能正常开发的前提


三、执行引擎:栈机器与数据流的混血

n8n 的执行引擎是整个项目最精妙的部分。它在 packages/core/src/execution-engine/workflow-execute.ts 中实现,约 2700 行代码,是一个基于栈的图遍历器

3.1 执行入口:WorkflowRunner

一切始于 WorkflowRunner.run()

1
2
3
4
5
6
7
8
9
触发事件(Webhook/Cron/手动)
  → WorkflowRunner.run()
    → ActiveExecutions.add()          // 注册到活跃执行表
    → 决定执行模式
      ├─ Queue模式 → ScalingService.addJob() → BullMQ → Worker 消费
      └─ Local模式 → runMainProcess()
        → new Workflow(nodes, connections, nodeTypes)
        → new WorkflowExecute(additionalData, mode)
        → processRunExecutionData()    // 进入主循环

这里有一个关键的架构决策:执行模式的切换对上层完全透明。无论是本地执行还是队列模式,WorkflowRunner 都生成相同的 IWorkflowExecutionDataProcess 数据结构。队列模式只是把这个结构序列化后扔给 BullMQ,Worker 端反序列化后执行完全相同的 processRunExecutionData()

这是一种"执行数据自包含"的设计——一个执行所需的所有信息都被序列化在一个 JSON 对象里,不依赖任何进程内状态。

3.2 主循环:栈式图遍历

processRunExecutionData() 的核心是一个 while 循环:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
while (nodeExecutionStack.length !== 0) {
    // 1. 从栈中弹出下一个节点
    const executionData = nodeExecutionStack.shift();

    // 2. 检查超时和取消
    if (this.abortController.signal.aborted) break;

    // 3. 检查 Pin Data(调试模式的固定数据)
    if (pinData[nodeName]) { usePinnedData(); continue; }

    // 4. 执行节点
    const nodeOutput = await this.runNode(executionData, ...);

    // 5. 存储结果
    runData[nodeName] = nodeOutput;

    // 6. 将下游节点加入执行栈
    this.addNodeToBeExecuted(outputConnections, nodeOutput);
}

这看起来简单,但魔鬼在细节里。

执行顺序的版本化。 n8n 有两种执行顺序:v1 使用 unshift(LIFO,深度优先),按节点在画布上的位置从上到下、从左到右排序;legacy 使用 push(FIFO,广度优先)。这个配置存在 workflow.settings.executionOrder 中,意味着同一套引擎可以模拟不同的执行语义

多输入汇聚的同步等待。 当一个节点有多个输入(比如 Merge 节点),addNodeToBeExecuted() 不会立即将它加入执行栈,而是放入 waitingExecution 映射表:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// 简化逻辑
if (node.inputCount > 1) {
    waitingExecution[nodeName] ??= {};
    waitingExecution[nodeName][inputIndex] = data;

    // 只有当所有输入都就绪时,才加入执行栈
    if (allInputsReady(waitingExecution[nodeName])) {
        nodeExecutionStack.push({
            node,
            data: waitingExecution[nodeName]
        });
        delete waitingExecution[nodeName];
    }
} else {
    nodeExecutionStack.push({ node, data });
}

这是一种数据流同步原语的实现。在传统数据流编程中,这叫做 “join”——等待所有上游分支完成后再继续。n8n 用一个简单的哈希表就实现了这个语义,不需要引入复杂的同步框架。

3.3 节点执行:六种范式的统一调度

runNode() 是一个类型分发器,根据节点实现的方法选择执行路径:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
async runNode(executionData, workflow, runExecutionData) {
    const nodeType = workflow.nodeTypes.getByNameAndVersion(node.type, node.typeVersion);

    if (nodeType.execute)       return this.executeNode(nodeType, ...);
    if (nodeType.poll)          return this.executePollNode(nodeType, ...);
    if (nodeType.trigger)       return this.executeTriggerNode(nodeType, ...);
    if (nodeType.webhook)       return passThrough(executionData);  // 已在 handler 中执行
    if (nodeType.description.requestDefaults)
        return this.executeDeclarativeNode(nodeType, ...);          // 声明式节点
}

这里体现了接口隔离原则的灵活运用INodeType 不要求实现所有方法——一个节点可以只实现 execute(),也可以只提供 description.requestDefaults(声明式)。引擎通过检查方法是否存在来决定调度策略。

3.4 取消与超时:协作式中断

n8n 使用 AbortController + PCancelable 实现执行取消:

1
2
3
4
5
const abortController = new AbortController();
const executionPromise = PCancelable.fn(async (signal) => {
    // 主循环在每次迭代时检查 signal
    if (abortController.signal.aborted) break;
});

超时则在 WorkflowRunner 层实现:

1
2
3
4
const maxTimeout = workflow.settings.executionTimeout ?? globalTimeout;
setTimeout(() => {
    activeExecutions.stopExecution(executionId);
}, maxTimeout * 1000);

这是协作式取消,不是强制终止。每个节点需要"配合"检查取消信号。这比 process.kill() 更安全——不会留下未清理的资源或半写入的数据。

3.5 错误处理:三种策略

n8n 提供了三种节点级错误处理策略:

策略行为场景
停止执行记录错误,终止整个工作流默认行为
继续(正常输出)将输入数据原样传递到下游continueOnFail=true
继续(错误输出)将错误路由到专门的错误输出分支onError='continueErrorOutput'

第三种策略特别巧妙——它允许在图层面处理错误,而不是在代码层面。用户可以连一条"错误线"到通知节点,实现可视化的 try-catch。


四、节点抽象:从 400 到 ∞ 的扩展之道

n8n 拥有 400+ 官方集成节点,这不是通过堆人头实现的,而是通过一套描述符驱动的抽象层

4.1 INodeType:唯一的节点契约

所有节点实现同一个接口:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
export interface INodeType {
    description: INodeTypeDescription;    // 元数据 + UI配置(声明式)
    execute?();            // 编程式执行
    supplyData?();         // AI/LangChain 数据供给
    poll?();               // 轮询触发器
    trigger?();            // 事件触发器
    webhook?();            // Webhook 触发器
    methods?: {
        loadOptions?;      // 动态下拉选项
        listSearch?;       // 搜索式选择
        credentialTest?;   // 凭证验证
        resourceMapping?;  // 字段映射
    };
}

注意这个接口的设计哲学:所有方法都是可选的。一个节点可以只提供 description(声明式),也可以实现 execute()(编程式),还可以同时提供两者。这是渐进式复杂度的典范。

4.2 声明式节点:零代码集成

n8n 最精妙的设计之一是声明式节点——通过纯 JSON 描述实现 API 集成,不需要写一行执行代码。

以 Okta 节点为例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
export class Okta implements INodeType {
    description: INodeTypeDescription = {
        displayName: 'Okta',
        requestDefaults: {
            baseURL: '={{$credentials.baseUrl}}/api/v1',
            headers: { 'Content-Type': 'application/json' }
        },
        properties: [
            {
                displayName: 'Resource',
                name: 'resource',
                type: 'options',
                options: [
                    { name: 'User', value: 'user' },
                    { name: 'Group', value: 'group' },
                ],
            },
            {
                displayName: 'Operation',
                name: 'operation',
                type: 'options',
                options: [
                    {
                        name: 'Get',
                        value: 'get',
                        routing: {
                            request: {
                                method: 'GET',
                                url: '=/users/{{$parameter.userId}}'
                            }
                        }
                    },
                ],
            },
        ],
    };
    // 没有 execute() 方法!
}

关键洞察:routing 字段将 UI 参数直接映射到 HTTP 请求。 引擎看到 requestDefaults 时,自动调用 executeDeclarativeNode(),根据用户选择的 resource/operation 组合,从 routing 配置生成 HTTP 请求。

这意味着添加一个新的 REST API 集成,开发者只需要编写一个 JSON 描述符。这是 n8n 能快速扩展到 400+ 集成的核心原因。

4.3 编程式节点:完全控制

当 API 交互太复杂时(需要分页、需要多步调用、需要转换数据),节点可以实现 execute()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
async execute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
    const items = this.getInputData();
    const results: INodeExecutionData[] = [];

    for (const item of items) {
        const response = await this.helpers.request({
            method: 'POST',
            url: 'https://api.slack.com/api/chat.postMessage',
            body: {
                channel: this.getNodeParameter('channel', item),
                text: this.getNodeParameter('text', item),
            },
        });
        results.push({ json: response });
    }

    return [results];  // 二维数组:[输出索引][数据项]
}

IExecuteFunctions(通过 this 注入)提供了丰富的上下文:

  • getInputData() — 获取上游数据
  • getNodeParameter() — 获取用户配置的参数(表达式会被自动解析)
  • helpers.request() — 预配置的 HTTP 客户端(自动注入凭证)
  • getCredentials() — 获取解密后的凭证

4.4 节点版本化:不破坏现有工作流

当节点需要 breaking change 时,n8n 使用版本化节点模式:

1
2
3
4
5
6
7
8
9
export class Slack extends VersionedNodeType {
    constructor() {
        const nodeVersions: IVersionedNodeType = {
            1: new SlackV1(),
            2: new SlackV2(),        // 新版本
        };
        super(nodeVersions, { defaultVersion: 2.4 });
    }
}

现有工作流保持绑定到创建时的版本,新工作流使用 defaultVersion。这样就能在不中断任何用户的情况下演进 API。

这比简单的"不破坏兼容性"更优雅——它承认有时你必须破坏兼容性,但提供了平滑迁移的路径

4.5 凭证系统:继承复用

凭证类型支持继承:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
export class SalesforceOAuth2Api implements ICredentialType {
    name = 'salesforceOAuth2Api';
    extends = ['oAuth2Api'];          // 继承 OAuth2 基类
    properties = [
        { name: 'environment', type: 'options',
          options: [{ name: 'Production', value: 'production' }, ...] },
        { name: 'authorizationUrl', type: 'hidden',
          default: '={{$self["environment"] === "production" ? "https://login.salesforce.com" : "https://test.salesforce.com"}}/services/oauth2/authorize' },
    ];
}

extends 避免了每个 OAuth2 服务都重新实现授权流程。 基类 oAuth2Api 处理了 token 刷新、PKCE、回调 URL 等通用逻辑,子类只需配置端点 URL。

更精妙的是 authenticate 字段:

1
2
3
4
5
6
authenticate: IAuthenticateGeneric = {
    type: 'generic',
    properties: {
        headers: { Authorization: '=Bearer {{$credentials.apiToken}}' }
    }
};

它用声明式的方式描述"如何将凭证注入请求",引擎在发送 HTTP 请求前自动应用。这消除了每个节点手动处理认证的样板代码。


五、表达式引擎:每个字段都是可编程的

n8n 的表达式系统让每个节点参数都可以引用上游数据。当用户在某个字段写入 ={{ $json.email }} 时,背后发生了什么?

5.1 WorkflowDataProxy:$上下文的构造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class WorkflowDataProxy {
    // 构造 $ 上下文对象
    createProxy(): IWorkflowDataProxyData {
        return {
            $json:        // 当前项的 JSON 数据
            $binary:      // 当前项的二进制数据
            $node:        // 按名称访问任意节点的输出
            $items():     // 获取指定节点的所有输出项
            $input:       // 当前节点的输入数据
            $env:         // 环境变量
            $workflow:    // 工作流元数据(名称、ID)
            $execution:   // 执行元数据(ID、模式)
            $prevNode:    // 上一个节点信息
            $parameter:   // 当前节点的参数
        };
    }
}

5.2 Paired Items:数据血缘追踪

n8n 的每个数据项(INodeExecutionData)都携带一个 pairedItem 字段:

1
2
3
4
5
interface INodeExecutionData {
    json: IDataObject;
    binary?: IBinaryKeyData;
    pairedItem?: IPairedItemData;  // { item: number, input?: number }
}

pairedItem 记录了"这个数据项来自上游哪个节点的哪个输出项"。这使得表达式引擎可以精确地追溯数据血缘

当你在节点 C 写 ={{ $node["A"].json.name }} 时,引擎不是简单地取节点 A 的第一个输出——它通过 pairedItem 链,找到与当前处理项对应的那个上游项。这在处理批量数据时至关重要。

5.3 表达式解析的惰性求值

表达式不是在工作流加载时解析的,而是在节点执行时按需求值

1
2
3
4
5
6
// WorkflowExpression.resolveSimpleParameterValue()
if (typeof value === 'string' && value.startsWith('=')) {
    // 创建代理对象,只在表达式求值时才访问上游数据
    const proxy = new WorkflowDataProxy(workflow, runData, itemIndex, ...);
    return expression.evaluate(value.substring(1), proxy.createProxy());
}

惰性求值意味着只有实际执行到某个节点时,它的参数表达式才会被求值。这既节省了计算资源(不执行的分支不求值),也保证了数据的时序正确性(在表达式求值时,上游节点一定已经执行完成)。


六、前后端架构:类型安全的全栈通信

6.1 共享类型包

n8n 用 @n8n/api-types 包定义了前后端共享的 DTO:

1
2
3
4
5
@n8n/api-types
├── CreateWorkflowDto
├── UpdateWorkflowDto
├── CredentialDto
└── ...

前端和后端都依赖这个包。这意味着API 的请求/响应格式在编译时就被类型检查——不可能出现前端发送的字段名和后端期望的不一致的问题。

6.2 装饰器驱动的控制器

后端使用自定义装饰器定义 API:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
@RestController('/workflows')
export class WorkflowsController {
    @Get('/')
    @ProjectScope('workflow:list')    // RBAC 权限检查
    async getAll(req: AuthenticatedRequest): Promise<WorkflowDto[]> {
        return this.workflowService.getAll(req.user);
    }

    @Post('/')
    @ProjectScope('workflow:create')
    async create(req: AuthenticatedRequest<CreateWorkflowDto>): Promise<WorkflowDto> {
        return this.workflowService.create(req.body, req.user);
    }
}

@ProjectScope 装饰器自动进行权限检查,不需要在每个方法里手动调用。这是横切关注点的优雅处理

6.3 依赖注入:483行的IoC容器

@n8n/di 只有 483 行代码,却支撑了整个后端的依赖管理。这是一个有意识的选择——不用 NestJS 这样的重型框架,而是只取自己需要的部分


七、AI 架构:后来者的野心

n8n 在 AI 方面的投入惊人——5个专门的AI包,合计 295K 行代码:

代码量职责
@n8n/nodes-langchain119KLangChain 集成节点
@n8n/ai-workflow-builder105KAI 驱动的工作流构建器
@n8n/instance-ai46K实例级 AI 功能
@n8n/agents26KAI Agent 框架
@n8n/ai-utilities15KAI 工具函数

@n8n/nodes-langchain 值得关注。 它不是简单地包装 LangChain API,而是将 LangChain 的概念(Chain、Agent、Tool、Memory)映射到 n8n 的节点模型中。每个 LangChain 组件变成一个可拖拽的节点,用户可以可视化地组装 AI Pipeline

这里有一个巧妙的扩展:supplyData() 方法。传统节点用 execute() 处理数据流,但 AI 节点需要提供能力(而不是处理数据)。supplyData() 允许节点向下游 Agent 节点提供 Tool、Memory 或 Retriever,建立了一种不同于数据流的"能力流"


八、数据库层:务实的持久化

n8n 使用 TypeORM 的 fork(@n8n/typeorm),支持 SQLite 和 PostgreSQL:

  • SQLite 用于开发和小规模部署(零配置启动)
  • PostgreSQL 用于生产环境

实体设计遵循 WithTimestampsAndStringId 基类模式——所有实体都有字符串 ID(应用层生成 UUID,不依赖数据库自增)、createdAtupdatedAt

应用层生成 UUID 而非数据库自增,这是为分布式做的准备——Queue 模式下多个 Worker 可能同时写入,UUID 避免了 ID 冲突。


九、设计哲学总结

通读 n8n 的 250 万行代码,我提炼出五条核心设计哲学:

1. 描述符驱动(Descriptor-Driven)

不要写代码,写配置。 声明式节点、声明式凭证认证、声明式 UI 生成——n8n 用一个 JSON 描述符同时定义了"这个节点长什么样"和"这个节点做什么"。这使得 400+ 集成成为可能,否则维护成本将是灾难性的。

2. 渐进式复杂度(Progressive Complexity)

简单的事情简单做,复杂的事情可以做。 声明式节点只需要 JSON;编程式节点可以写任意 TypeScript;版本化节点可以管理 breaking change。三种模式覆盖了从"5分钟写个集成"到"处理复杂API逻辑"的完整光谱。

3. 执行数据自包含(Self-Contained Execution)

一个 JSON 就是一次完整执行。 IRunExecutionData 包含了执行所需的所有信息——节点图、参数、凭证引用、执行栈、中间结果。这使得执行可以被序列化、传输、恢复、重放。Queue 模式就是这个设计的自然延伸。

4. 图是一等公民(Graph-First)

错误处理是连线,不是代码。 通过"错误输出"分支,n8n 把异常处理从代码层面提升到了图层面。用户可以看到错误的流向,可以为不同节点配置不同的错误策略。这是工作流引擎相比传统编程的独特优势。

5. 克制的技术选型(Minimal Dependencies)

483行的IoC容器,61行的错误基类。 n8n 没有选择 NestJS、没有选择 Prisma、没有选择市面上任何"重型"框架。它只取自己需要的部分,保持了对底层的完全控制。对于一个需要长期维护的基础设施项目,这种克制比"用最新技术"重要得多。


十、写在最后:工程与产品的平衡

n8n 的代码库不是学术论文里的完美架构——415K LOC 的 CLI 包说明它也有"巨石化"的倾向,46个内部包的管理成本不可忽视,TypeORM 的 fork 意味着额外的维护负担。

但它做对了最关键的一件事:在正确的层次做出正确的抽象

INodeType 接口是稳定的契约,十年后可能还在用。WorkflowExecute 是灵活的引擎,能适应从单机到分布式的演进。声明式节点是高效的杠杆,一个人可以维护一百个集成。

这就是优秀开源项目给我们的启示:架构不是画在白板上的方框和箭头,而是代码中每一个接口定义、每一次方法分发、每一个可选参数的选择。 这些微小的决策累积起来,决定了一个项目能走多远。


参考来源