图思维构建任务流水线:JGraphlet技术详解

JGraphlet是一个轻量级、零依赖的Java库,采用有向无环图模型构建任务流水线,支持同步/异步任务混合执行,提供缓存机制和上下文共享,简化复杂工作流编排。

图思维构建任务流水线:JGraphlet技术详解

JGraphlet是一个轻量级、零依赖的Java库,用于构建任务流水线。其核心优势不在于冗长的功能列表,而在于一组协同工作的核心设计原则。

JGraphlet的核心是 simplicity,基于图模型构建。向流水线添加任务并连接它们以创建图。每个任务都有输入和输出。TaskPipeline构建并执行流水线,同时管理每个任务的I/O。

例如,用于扇入的Map、用于自定义数据模型的Record等。任务流水线还提供PipelineContext在任务间共享数据,此外任务还可以被缓存,避免重复计算。您可以选择任务流水线的执行方式,选择同步任务或异步任务。默认情况下,所有任务都是异步的。

八大核心原则

1. 图优先执行模型

JGraphlet将工作流视为有向无环图(DAG)。您将任务定义为节点,并明确绘制它们之间的连接(边)。这使得扇出(一个任务馈送多个任务)和扇入(多个任务馈送一个任务)等复杂模式变得自然。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import dev.shaaf.jgraphlet.*;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

try (TaskPipeline pipeline = new TaskPipeline()) {
    Task<String, String> fetchInfo = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Info for " + id);
    Task<String, String> fetchFeed = (id, ctx) -> CompletableFuture.supplyAsync(() -> "Feed for " + id);
    Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() ->
        inputs.get("infoNode") + " | " + inputs.get("feedNode")
    );

    pipeline.addTask("infoNode", fetchInfo)
            .addTask("feedNode", fetchFeed)
            .addTask("summaryNode", combine);

    pipeline.connect("infoNode", "summaryNode")
            .connect("feedNode", "summaryNode");

    String result = (String) pipeline.run("user123").join();
    System.out.println(result); // "Info for user123 | Feed for user123"
}

2. 两种任务风格:Task<I,O>和SyncTask<I,O>

JGraphlet提供两种可混合搭配的任务类型:

  • Task<I, O>(异步):返回CompletableFuture。适用于I/O操作或繁重计算。
  • SyncTask<I, O>(同步):直接返回O。适用于快速的CPU密集型操作。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
try (TaskPipeline pipeline = new TaskPipeline()) {
    Task<String, String> fetchName = (userId, ctx) ->
        CompletableFuture.supplyAsync(() -> "John Doe");

    SyncTask<String, String> toUpper = (name, ctx) -> name.toUpperCase();

    pipeline.add("fetch", fetchName)
            .then("transform", toUpper);

    String result = (String) pipeline.run("user-42").join();
    System.out.println(result); // "JOHN DOE"
}

3. 简单明确的API

JGraphlet避免复杂的构建器或魔法配置。API简洁明确:

  • 创建流水线:new TaskPipeline()
  • 注册节点:addTask("uniqueId", task)
  • 连接它们:connect("fromId", "toId")

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
try (TaskPipeline pipeline = new TaskPipeline()) {
    SyncTask<String, Integer> lengthTask = (s, c) -> s.length();
    SyncTask<Integer, String> formatTask = (i, c) -> "Length is " + i;

    pipeline.addTask("calculateLength", lengthTask);
    pipeline.addTask("formatOutput", formatTask);

    pipeline.connect("calculateLength", "formatOutput");

    String result = (String) pipeline.run("Hello").join();
    System.out.println(result); // "Length is 5"
}

4. 清晰的扇入输入形状

扇入任务接收Map<String, Object>,其中键是父任务ID,值是它们的结果。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
try (TaskPipeline pipeline = new TaskPipeline()) {
    SyncTask<String, String> fetchUser = (id, ctx) -> "User: " + id;
    SyncTask<String, String> fetchPerms = (id, ctx) -> "Role: admin";

    Task<Map<String, Object>, String> combine = (inputs, ctx) -> CompletableFuture.supplyAsync(() -> {
        String userData = (String) inputs.get("userNode");
        String permsData = (String) inputs.get("permsNode");
        return userData + " (" + permsData + ")";
    });

    pipeline.addTask("userNode", fetchUser)
            .addTask("permsNode", fetchPerms)
            .addTask("combiner", combine);

    pipeline.connect("userNode", "combiner").connect("permsNode", "combiner");

    String result = (String) pipeline.run("user-1").join();
    System.out.println(result); // "User: user-1 (Role: admin)"
}

5. 清晰的运行契约

执行流水线很简单:pipeline.run(input)返回最终结果的CompletableFuture。您可以使用.join()阻塞或使用异步链式调用。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
String input = "my-data";

// 阻塞方式
try {
    String result = (String) pipeline.run(input).join();
    System.out.println("Result (blocking): " + result);
} catch (Exception e) {
    System.err.println("Pipeline failed: " + e.getMessage());
}

// 非阻塞方式
pipeline.run(input)
        .thenAccept(result -> System.out.println("Result (non-blocking): " + result))
        .exceptionally(ex -> {
            System.err.println("Async pipeline failed: " + ex.getMessage());
            return null;
        });

6. 内置资源生命周期

JGraphlet实现AutoCloseable。使用try-with-resources保证内部资源的安全关闭。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
try (TaskPipeline pipeline = new TaskPipeline()) {
    pipeline.add("taskA", new SyncTask<String, String>() {
        @Override
        public String executeSync(String input, PipelineContext context) {
            if (input == null) {
                throw new IllegalArgumentException("Input cannot be null");
            }
            return "Processed: " + input;
        }
    });

    pipeline.run("data").join();

} // pipeline.shutdown() 自动调用
System.out.println("Pipeline resources have been released.");

7. 上下文

PipelineContext是线程安全的、每次运行的工作空间,用于存储元数据。

示例:

1
2
3
4
5
6
7
8
SyncTask<String, String> taskA = (input, ctx) -> {
    ctx.put("requestID", "xyz-123");
    return input;
};
SyncTask<String, String> taskB = (input, ctx) -> {
    String reqId = ctx.get("requestID", String.class).orElse("unknown");
    return "Processed input " + input + " for request: " + reqId;
};

8. 可选缓存

任务可以选择加入缓存以避免重复计算。

示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
Task<String, String> expensiveApiCall = new Task<>() {
    @Override
    public CompletableFuture<String> execute(String input, PipelineContext context) {
        System.out.println("Performing expensive call for: " + input);
        return CompletableFuture.completedFuture("Data for " + input);
    }
    @Override
    public boolean isCacheable() { return true; }
};

try (TaskPipeline pipeline = new TaskPipeline()) {
    pipeline.add("expensive", expensiveApiCall);

    System.out.println("First call...");
    pipeline.run("same-key").join();

    System.out.println("Second call...");
    pipeline.run("same-key").join(); // 结果来自缓存
}

最终结果是构建同步或异步任务编排的简洁、可测试方式,用于组合复杂流程,如并行检索、合并、判断和保护机制,而无需引入重量级工作流引擎。

相关链接:

  • Maven中央仓库
  • GitHub仓库
comments powered by Disqus
使用 Hugo 构建
主题 StackJimmy 设计