Temporal 服务编排实例:转账服务

1、SDK

Temporal GO SDK

2、目标

  • 使用 Temporal Server 和Go SDK完成Temporal工作流应用程序的几次运行交互
  • 练习检查工作流的状态
  • 理解工作流功能的内在可靠性
  • 学习许多Temporal的核心术语和概念

3、示例介绍

Temporal Server 和特定语言的SDK(本例中是Go SDK)为现代应用程序开发带来的复杂性提供了全面的解决方案。

您可以将Temporal看作是一种“万能药”,可以解决作为开发人员在尝试构建可靠应用程序时所经历的痛苦。

Temporal提供了开箱即用的可靠性原语,比如无缝和容错的应用程序状态跟踪、自动重试、超时、由于流程失败而回滚等等。

让我们运行我们的第一个Temporal Workflow 应用程序,并永远地改变您进行应用程序开发的方式。

4、应用程序概述

这个项目示例模拟了一个“转账”应用程序,它有一个单一的Workflow函数,编排Withdraw()和Deposit()函数的执行,表示从一个账户到另一个账户的转账。

Tempory称这些实际的函数为Activity functions.

 要运行这个程序,你需要做到以下几点:

1.给Temporal服务器发送信号开始转账。Temporal服务器将跟踪工作流函数执行的进度。

2.运行一个Worker。Worker是你编译的Workflow和Activity代码的包装器。Worker的唯一工作是执行Activity和Workflow函数,并将结果反馈给Temporal服务器。

5、Workflow 函数

Workflow功能是应用程序的入口点。这是我们的转账的workflow看起来如下:

func TransferMoney(ctx workflow.Context, transferDetails TransferDetails) error {
    // RetryPolicy specifies how to automatically handle retries if an Activity fails.
    retrypolicy := &temporal.RetryPolicy{
        InitialInterval:    time.Second,
        BackoffCoefficient: 2.0,
        MaximumInterval:    time.Minute,
        MaximumAttempts:    500,
    }
    options := workflow.ActivityOptions{
        // Timeout options specify when to automatically timeout Actvitivy functions.
        StartToCloseTimeout: time.Minute,
        // Optionally provide a customized RetryPolicy.
        // Temporal retries failures by default, this is just an example.
        RetryPolicy: retrypolicy,
    }
    ctx = workflow.WithActivityOptions(ctx, options)
    err := workflow.ExecuteActivity(ctx, Withdraw, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }
    err = workflow.ExecuteActivity(ctx, Deposit, transferDetails).Get(ctx, nil)
    if err != nil {
        return err
    }
    return nil
}

当你“启动”一个工作流时,你基本上是在告诉Temporal Server:"track the state of the Workflow with this function signature"。Workers 将依次执行下面的Workflow代码,将执行事件和结果传回服务器。

6、发起转账

在使用Temporal Server的时候,有两种方式来启动工作流:一种是通过SDK,一种是通过CLI,在本教程中,我们使用SDK来启动工作流,这是大多数工作流在活动环境中启动的方式。对Temporal服务器的调用可以是 synchronously or asynchronously。这里我们是异步执行的,因此您将看到程序运行,告诉您事务正在处理,然后退出。

func main() {
    // Create the client object just once per process
    c, err := client.NewClient(client.Options{})
    if err != nil {
        log.Fatalln("unable to create Temporal client", err)
    }
    defer c.Close()
    options := client.StartWorkflowOptions{
        ID:        "transfer-money-workflow",
        TaskQueue: app.TransferMoneyTaskQueue,
    }
    transferDetails := app.TransferDetails{
        Amount:      54.99,
        FromAccount: "001-001",
        ToAccount:   "002-002",
        ReferenceID: uuid.New().String(),
    }
    we, err := c.ExecuteWorkflow(context.Background(), options, app.TransferMoney, transferDetails)
    if err != nil {
        log.Fatalln("error starting TransferMoney workflow", err)
    }
    printResults(transferDetails, we.GetID(), we.GetRunID())
}

7、运行workflow

确保Temporal Server 在终端中运行,然后运行start/main。使用以下命令从项目根目录进入

go run start/main.go

如果这是你第一次运行这个应用程序,开始时可能会下载一些依赖项,但最终你会得到一些类似这样的反馈:

2021/03/12 01:34:36 INFO  No logger configured for temporal client. Created default one.
2021/03/12 01:34:37
Transfer of $54.990002 from account 001-001 to account 002-002 is processing. ReferenceID: 8e37aafe-5fb8-4649-99e3-3699e35e6c32
2021/03/12 01:34:37
WorkflowID: transfer-money-workflow RunID: 0730a9a3-d17b-4cb5-a4a0-279c9759dfa1

8、状态可视化

现在是时候看看Temporal提供的一个非常酷的功能:应用程序状态可见性。访问时态Web UI,您将看到您的工作流列表。

接下来,单击工作流的“Run Id”。现在,我们可以看到关于我们告诉Temporal 服务器要跟踪的Workflow代码执行的所有我们想知道的信息,比如给出了什么参数值、超时配置、计划重试、尝试次数、堆栈可跟踪错误等等。

 

 

看起来我们的工作流正在“运行”,但是为什么工作流和活动代码还没有执行呢?通过单击任务队列名称进行调查,查看为处理这些任务而注册的有效的“轮询器”。该列表将为空。没有worker轮询任务队列!

9、Worker

该启动Worker了。Worker负责执行工作流和活动代码。

  • 它只能执行已注册到它的代码。
  • 它知道从任务队列获取的任务中执行哪段代码。
  • 它只侦听它注册到的任务队列。

在The Worker执行代码之后,它将结果返回给Temporal Server。注意,Worker监听Workflow和Activity任务发送到的同一个任务队列。这被称为“任务路由”,是一种用于负载平衡的内置机制。

func main() {
    // Create the client object just once per process
    c, err := client.NewClient(client.Options{})
    if err != nil {
        log.Fatalln("unable to create Temporal client", err)
    }
    defer c.Close()
    // This worker hosts both Workflow and Activity functions
    w := worker.New(c, app.TransferMoneyTaskQueue, worker.Options{})
    w.RegisterWorkflow(app.TransferMoney)
    w.RegisterActivity(app.Withdraw)
    w.RegisterActivity(app.Deposit)
    // Start listening to the Task Queue
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("unable to start Worker", err)
    }
}

任务队列由一个简单的字符串名称定义:

const TransferMoneyTaskQueue = "TRANSFER_MONEY_TASK_QUEUE"

10、运行 Worker

go run worker/main.go

当您启动Worker时,它将开始轮询任务队列(如果您再次检查Temporal Web,您将看到一个新的轮询器在以前没有注册的地方注册)。

终端输出看起来是这样的:

2021/03/12 01:43:49 INFO  No logger configured for temporal client. Created default one.
2021/03/12 01:43:49 INFO  Started Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41326@swyxs-Mac-mini@
2021/03/12 01:43:50 DEBUG ExecuteActivity Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41326@swyxs-Mac-mini@ WorkflowType TransferMoney WorkflowID transfer-money-workflow RunID 0730a9a3-d17b-4cb5-a4a0-279c9759dfa1 ActivityID 5 ActivityType Withdraw

Withdrawing $54.990002 from account 001-001. ReferenceId: 8e37aafe-5fb8-4649-99e3-3699e35e6c32

内部机制:

  • Worker找到的第一个任务是告诉它要执行Workflow函数的任务。
  • Worker将事件通信回服务器。
  • 这导致服务器将Activity Tasks发送到Task Queue。
  • Worker从任务队列中以它们各自的顺序获取每个Activity任务,并执行每个相应的Activity。

这些都是可以在Temporal Web中审计的History Events(在Summary旁边的历史选项卡下)。一旦工作流完成并关闭,在被删除之前,完整的历史记录将保留一段设置的保留期(通常是7-30天)。您可以设置archive特性,以便将它们发送到长期存储,以满足合规/审计需求。

祝贺您,您刚刚运行了一个Temporal 工作流应用程序!

11、失败模拟

您刚刚领略了Temporal的重要功能之一:工作流的可见性和执行代码的Workers的状态。让我们探讨另一个重要的功能特性:即使在失败面前,也要保持工作流的状态。为了演示这一点,我们将为我们的工作流模拟一些失败。确保在继续之前停止Worker。

Server crash

与许多需要复杂的领导者选举过程和外部数据库来处理故障的现代应用程序不同,Temporal会自动保存工作流的状态,即使服务器宕机。你可以很容易地通过以下步骤进行测试(同样,确保你的Worker已经停止,这样你的Workflow就不会完成):

  1. 再次启动工作流。
  2. 验证工作流正在UI中运行。
  3. 使用'Ctrl c'或通过Docker仪表板关闭Temporay Server。
  4. 在Temporay Server 停止后,重新启动它并访问UI。

您的工作流仍然存在!

Activity error

接下来让我们在Deposit() Activity函数中模拟一个错误。让您的工作流继续运行。打开activity.go文件并切换return语句上的注释,这样Deposit()函数就会返回一个错误:

func Deposit(ctx context.Context, transferDetails TransferDetails) error {
    fmt.Printf(
        "\nDepositing $%f into account %s. ReferenceId: %s\n",
        transferDetails.Amount,
        transferDetails.ToAccount,
        transferDetails.ReferenceID,
    )
    // Switch out comments on the return statements to simulate an error
    //return fmt.Errorf("deposit did not occur due to an issue")
    return nil
}

保存更改并运行Worker。您将看到Worker完成Withdraw() Activity函数,但在尝试Deposit() Activity 函数时出错。这里需要注意的重要一点是,Worker一直在重试Deposit()函数:

021/03/12 02:03:23 INFO  No logger configured for temporal client. Created default one.
2021/03/12 02:03:23 INFO  Started Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@
2021/03/12 02:04:25 DEBUG ExecuteActivity Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowType TransferMoney WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityID 5 ActivityType Withdraw

Withdrawing $54.990002 from account 001-001. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:04:25 DEBUG ExecuteActivity Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowType TransferMoney WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityID 11 ActivityType Deposit

Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:04:25 ERROR Activity error. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityType Deposit Error deposit did not occur due to an issue

Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:04:26 ERROR Activity error. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityType Deposit Error deposit did not occur due to an issue

# it keeps retrying... with the RetryPolicy specified in workflow.go

你可以查看更多关于UI中发生的事情的信息。单击工作流的RunId。您将看到挂起的Activity列在那里,其中包含详细信息,如它的状态、它被尝试的次数和下一次计划的尝试

传统上,应用程序开发人员被迫在服务代码本身中实现超时和重试逻辑。这是重复和容易出错的。对于Temporal,一个关键的价值主张是超时配置和重试策略在Workflow代码中作为Activity选项指定。在workflow.go,您可以看到我们已经为活动指定了一个StartToCloseTimeout,并设置了一个重试策略,告诉服务器重试它们最多500次。你可以在我们的文档中阅读更多关于活动和工作流重试的内容。

因此,您的工作流正在运行,但只有Withdraw() Activity 函数成功。在任何其他应用程序中,整个流程可能都必须放弃并回滚。因此,这是本教程的最后一个价值主张:使用Temporal,我们可以在工作流运行时调试问题!假设你找到了解决这个问题的潜在方法;在activity.go切换回Deposit()函数的return语句上的注释。进入文件并保存更改。

我们怎么可能更新一个已经完成了一半的工作流?对于Temporal,它实际上非常简单:只要重新启动Worker!

# continuing logs from previous retries...

Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1
2021/03/12 02:09:28 ERROR Activity error. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ WorkflowID transfer-money-workflow RunID 90059628-4506-4afb-bb41-90abc6a6ade9 ActivityType Deposit Error deposit did not occur due to an issue

# cancel and restart...

^C2021/03/12 02:10:17 INFO  Worker has been stopped. Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@ Signal interrupt
2021/03/12 02:10:17 INFO  Stopped Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41532@swyxs-Mac-mini@

$ go run worker/main.go
2021/03/12 02:10:23 INFO  No logger configured for temporal client. Created default one.
2021/03/12 02:10:23 INFO  Started Worker Namespace default TaskQueue TRANSFER_MONEY_TASK_QUEUE WorkerID 41708@swyxs-Mac-mini@

Depositing $54.990002 into account 002-002. ReferenceId: 1a203da3-f5ea-4c17-b5cf-5ee4dcb061a1

在下一次计划的尝试中,Worker将从工作流失败的地方开始,并成功执行新编译的Deposit()Activity函数,完成工作流。基本上,您只是在不丢失工作流状态的情况下“动态地”修复了一个bug

12、总结

现在您已经了解了如何运行Temporal Workflow,并了解了Temporal提供的一些关键值。让我们快速复习一下,确保你记住了一些更重要的内容:

  • 我们在本教程中提到的四个Temporal的价值主张是什么?
  1. Temporal为您提供了工作流和代码执行状态的完整可见性。
  2. Temporal 维护您的工作流的状态,即使在服务器停机和错误。
  3. Temporal使得使用存在于业务逻辑之外的选项很容易超时和重试活动代码。
  4. Temporal使您能够在工作流运行时对业务逻辑执行“live debugging”。
  • 如何将Workflow初始化与执行它的Worker配对

        利用相同的 Task Queue

  • 如果我们为正在运行的Workflow更改Activity代码,我们必须做什么?

        重启Worker


版权声明:本文为hero_java原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
THE END
< <上一篇
下一篇>>