欢迎访问 生活随笔!

生活随笔

当前位置: 首页 > 编程资源 > 编程问答 >内容正文

编程问答

Dapr for dotnet | 并发计算模型 - Virtual Actors

发布时间:2023/12/18 编程问答 71 豆豆
生活随笔 收集整理的这篇文章主要介绍了 Dapr for dotnet | 并发计算模型 - Virtual Actors 小编觉得挺不错的,现在分享给大家,帮大家做个参考.

Actor 模型简介

actor 模型起源于1973年。 它是由 Carl Hewitt 作为 并发计算的概念模型 提出的, 并发计算是一种同时执行多个计算的计算形式 。 当时还没有高度并行的计算机,但多核 cpu 和分布式系统的最新进展使 actor 模型流行起来。

在 actor 模型中, actor 是一个独立的计算单元和状态单元。 这些 actor 彼此完全隔离,他们永远不会共享内存。 actor 之间使用消息相互通信。 当 actor 接收到消息时,它可以更改其内部状态,并向其他(可能是新的) actor 发送消息。

actor 模型使编写并发和并行系统变得更容易 的原因是它提供了一个基于回合(或单线程)的处理模型。 多个 actor 可以同时运行,但每一种 actor 在某一个时刻只处理一条消息。 这意味着你可以确定在任何时候在一个 actor 中最多只有一个线程是活动的。 这使得编写正确的并发和并行系统变得更加容易。

Dapr Actors 简介

actor 模式/模型 阐述了 Actors 为 最低或最小级别的 “计算单元”。 换句话说,您将代码写入独立单元 ( 称为 actor) ,该单元接收消息并一次处理消息,而不进行任何类型的并行或线程处理。

当代码处理一条消息时,它可以向其他 Actors 发送一条或多条消息,或者创建新的 Actors。 底层 runtime 将管理每个 actor 的运行方式、时机和位置,并在 Actors 之间传递消息。

大量 Actors 可以同时执行,而 Actors 可以相互独立执行。

actor 模型实现通常被绑定到特定的语言或平台上。 但是,通过 Dapr actor 构建块的实现,您可以从任何语言或平台依据 Actors 模型编写 Dapr Actor,而 Dapr 利用了底层平台提供的可扩展性和可靠性保证。

Dapr 包含专门实现 Orleans – Virtual Actors 模式 的运行时。 通过这个虚拟 actor 模型,你不需要显式的创建 actor。在第一次向actor 发送消息时,actor 被隐式激活并放置在集群中的一个节点上。当一个actor在一段时间内未被使用时或不执行操作时,actor 将以静默方式从内存中(In memory)卸载。如果一个节点发生故障,Dapr会自动将激活的 actor 转移到健康的节点。 除了在 actor 之间发送消息外,Dapr actor 模型还支持使用 Timer(计时器) Reminder(提醒器)来调度未来的工作。

Actors 模式的应用场景

尽管 actor 模型可以提供很大的好处(可以很好适应一些分布式系统问题和场景),但仔细考虑 actor 设计模式也是很重要的。 例如, 许多客户机调用同一个 actor 将导致性能低下,因为 actor 操作是线性连续执行的

与任何其他技术决策一样,您应该根据您尝试解决的问题来决定是否使用 Actors。 一般来说,在下列情况的问题或场景中,可以考虑使用 actor 模式来处理:

  • 【需要处理的问题】
  • 涉及并发性,如果没有actor,就必须在代码中引入显式锁定机制(隐藏死锁问题风险)。
  • 可划分的状态单元和逻辑单元,需要处理的问题空间涉及大量(数千或更多) 可以划分为小的、独立的、孤立的状态单元和逻辑单元。
  • 线性串行单线程对象处理:这些对象不需要外部组件的大量交互,例如在一组 Actors 之间查询状态。
  • 非阻塞化的I/O操作调用: actor 实例不会通过发出 I/O 操作来阻塞调用方。
    • 【不需要处理的问题】
  • 不需要低延迟的读取 actor 状态,由于actor 操作是串行执行的,因此不能保证低延迟读取。
  • 不需要跨一组 actor 查询状态,跨 actor 的查询效率很低,因为每个actor 的状态都需要单独读取,并且可能会带来不可预知的延迟。
  • Dapr 中的 Actors(并发计算模型 & 分布式单例模型)

    每个 actor 都定义为 actor 类型的实例,与对象是类的实例的方式相同。 例如,可能存在实现计算器功能的 actor 类型,并且该类型的许多 Actors 分布在集群的各个节点上。 每个这样的 actor 都是由一个 actor ID (全局唯一)确定的。

    Dapr Actor 生命周期

    Dapr Actors 是虚拟的,意思是他们的 生命周期与他们在内存中(in - memory)的表现不相关。 因此,它们不需要显式创建或销毁。 Dapr Actors 运行时在第一次接收到该 actor ID(全局唯一)的请求时自动激活 actor。 如果 actor 在一段时间内未被使用,那么 Dapr Actors 运行时将回收内存对象。 如果以后需要重新启动,它还将保持对 actor 的一切原有数据(数据持久化)。

    计时器(timer)和提醒器(reminders)

    调用 actor 方法和 reminders(提醒器)将重置空闲 timer(计时器) ,例如,reminders 触发将使 actor 保持活动状态。 不论 actor 是否处于活动状态或不活动状态 actor reminders 都会触发,对不活动 actor ,那么会首先激活 actor。 actor timers 不会重置空闲 timer,因此 timer(计时器)触发不会使 actor 保持活动状态。 timer 仅在 actor 活跃时被触发。

    actor 可以使用 timer 和 reminders 来安排对自己的调用。 这两个概念 都支持适当时间的配置。 区别在于回调注册的生命周期

    • 只有当 actor 被激活时,计时器(timer)才会保持活动状态。 计时器不会重置空闲计时器,因此它们不能让参与者自己处于活动状态。
    • 提醒器比 actor 活得长。 如果 actor 被禁用,一个提醒器(reminders)可以重新激活该 actor。 提醒器(reminders)将重置空闲计时器(timer)。

    Actor 之间的消息传递

    空闲超时和扫描时间间隔 Dapr 运行时(daprd)用于查看是否可以对 actor 进行垃圾收集。 当 daprd 调用 actor 服务以获取受支持的 actor 类型时,此时可以用于信息传递

    Virtual actors 生命周期抽象会将一些警告作为 virtual actors 模型的结果,而事实上, Dapr Actors 实施有时会偏离此模型。

    Dapr actor 状态持久化

    在第一次将消息发送到其 actor 标识时,将自动激活 actor ( 导致构造 actor 对象) 。 在一段时间后,actor 对象将被垃圾回收。 以后,再次使用 actor ID 访问,将构造新的 actor。 actor 的状态比对象的生命周期更久,因为 actor 的状态存储在 Dapr 运行时的 State Management building block 中(也就是说 actor 即使不在活跃状态,仍然可以读取它的状态)。

    Dapr Actors 的工作方式

    Dapr 的 Sidecar 提供了 HTTP/gRPC 的 API 来调用 actor。

    基于 HTTP 的 Sidecar API 调用格式

    http://localhost:<daprPort>/v1.0/actors/<actorType>/<actorId>/
    • < daprPort >:Dapr Sidecar 所监听的 HTTP 端口。
    • < actorType >:actor 的执行组件类型。
    • < actorId >:所指定的要调用的某个 actor 的 Id。

    Sidecar 管理每个 actor 运行的方式、时间和地点,并且还会在 actor 之间路由消息。 当某个 actor 处于未使用状态一段时间后,运行时会停用该 actor 并从内存中删除它。 actor 管理的任何状态都会持久保存,并在 actor 重新激活时可用。 Dapr 使用空闲计时器/Timer 来确定何时可停用参与者。 当对 actor 调用操作时(通过方法调用或 提醒器/Reminder 触发),将重置空闲计时器,并且 actor 实例保持在激活状态。

    Service 与 Sidecar 之间的各种 API 调用

    Sidecar API 只是其中的一部分。 服务本身也需要实现 API 规范,因为你为 actor 编写的实际代码将在服务本身内部运行。 下图显示了 Service 与 Sidecar 之间的各种 API 调用:

    Dapr actor 提供可扩展性和可靠性

    分发和故障转移

    为了提供可扩展性和可靠性,Actors 实例分布在整个集群中, Dapr 会根据需要自动将对象从失败的节点迁移到健康的节点。

    Actors 分布在 actor 服务的实例中,并且这些实例分布在集群中的节点之间。 每个服务实例(actorId)都包含给定 Actors 类型(actorType)的一组 Actors。

    Actor 安置服务(Actor placement service)

    在 self-hosted 模式下,dapr 初始化时会启动一个 placement service 的 docker 容器服务,我们下面所说的 “安置服务”、“placement-service” 等都指代的是这个容器。

    Dapr Sidecar 为了提供可伸缩性和可靠性,在 actor 服务的所有实例中对 actor 进行了分区。 Dapr 安置服务(Actor placement service) 负责跟踪分区信息。 当启动 actor 服务的新实例时,Sidecar 会向安置服务注册受支持的 actor 类型。安置服务计算给定 actor 类型的更新分区信息,并将其广播到所有实例。 下图显示了当服务横向扩展到第二个副本时发生的情况:

  • 启动时,Sidecar 会调用 actor 服务,获取注册的 actor 类型和 actor 配置设置信息。
  • Sidecar 会将注册的 actor 类型列表发送到 Dapr 安置服务 (Actor placement service)。
  • 安置服务(Actor placement service)将更新后的分区信息广播到所有 actor 服务实例。 每个实例将保留分区信息的缓存副本,并使用它来调用 actor 。
  • 【重要】actor 跨服务实例随机分布,因此可以预料 actor 操作总是需要调用网络中的其他节点。

    下图显示了在 Pod 1 中运行的排序服务实例调用 ID 为 3 的 OrderActor 实例的 ship 方法。 由于 ID 为 3 的参与者被放置在不同的实例中,这会导致调用群集中的不同节点:

  • 该服务会对 Sidecar 调用 actor API。 请求正文中的 JSON 有效负载包含要发送给 actor 的数据。
  • Sidecar 使用安置服务(Actor placement service)中本地缓存的分区信息来确定要负责托管 ID 为 3 的 actor 的该服务实例(分区)。 在本示例中,它是 Pod 2 中的服务实例。 调用将转发到相应的 Sidecar。
  • Pod 2 中的 Sidecar 实例会调用服务实例,进而调用 actor。 服务实例会激活 actor(如果尚未激活),并执行 actor 方法。
  • 【注意】pod1 和 pod2 中所运行的都是 Ordering Service 服务的实例。

    Actor 通信

    您可以通过 HTTP/gRPC 来与 Dapr 交互以调用 actor 方法。

    # POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>

    您可以在请求主体中为 actor 方法提供任何数据,并且请求的响应在响应主体中,这是来自 actor 方法调用的数据。

    并发(Concurrency)

    Dapr Actors 运行时提供了一个简单的基于回合的访问模型,用于访问 Actors 方法。 这意味着任何时候都只有一个线程在一个 actor 对象的代码内活动。 基于回合的访问大大简化了并发系统,因为不需要同步数据访问机制。 这也意味着系统的设计必须考虑到每个 actor 实例的单线程访问性质。

    • 单个 actor 实例一次无法处理多个请求。 如果 actor 实例预期要处理并发请求,可能会导致吞吐量瓶颈。
    • 如果两个 Actors 之间存在循环请求,而外部请求同时向其中一个 Actors 发出外部请求,那么 Actors 可以相互死锁。 Dapr actor 运行时会自动分出 actor 调用,并向调用方引发异常以中断可能死锁的情况。

    基于回合的访问

    一个回合包括执行 actor 方法以响应来自其他 Actors 或客户端的请求,或执行 timer/reminders 回调。 即使这些方法和回调是异步的,但 Dapr Actors 运行时并没有将它们交错(Interleave ,即并发调用它们)。 在允许新回合之前,必须完全结束之前的回合。 换句话说,在允许对方法或回调进行新调用之前,必须完全完成当前正在执行的 actor 方法或 timer/reminders 回调。 如果执行从方法或回调返回结果,并且方法或回调返回的任务已完成,则方法或回调将被视为已完成。 值得强调的是,即使在不同方法、timer和回调中,基于回合的并发也一样起作用。

    Dapr Actors 运行时通过在回合开始时获取每个 Actors 的锁定并在该回合结束时释放锁定来实施基于回合的并行。 因此, 基于回合的并发性是按每个 actor 执行的,而不是跨 Actors 执行的。 Actor 方法和 timer/reminders 回调可以代表不同的 Actors 同时执行。

    下面的示例演示了上述概念。 现在有一个实现了两个异步方法(例如,方法 1 和方法 2)、timer 和 reminders 的 actor。 下图显示了执行这些方法的时间线的示例,并代表属于此 Actors 类型的两个 Actors ( ActorId1 和 ActorId2) 的回调。

    使用 Dapr for dotnet SDK 项目演示

    • Actor.StateManager
    • Timer(计时器)
    • Reminder(提醒器)

    新建 lib 类库 Common 项目

    此处 Demo 演示依然使用 FrontEnd 和 BackEnd 项目进行,新建 lib 类库 Common,添加如下 Nuget 依赖包:

    • Dapr.Actors v1.7.0
    • Dapr.Actors.AspNetCore v1.7.0

    新建接口 IWorkflowActor 并继承 IActor,添加如下代码:

    using Dapr.Actors;namespace Common;public interface IWorkflowActor : IActor {Task<bool> ApproveAsync();Task<int> IncrementAsync();#region TimerTask RegisterTimerAsync();Task UnregisterTimerAsync();#endregion#region ReminderTask RegisterReminderAsync();Task UnregisterReminderAsync();#endregion }

    BackEnd 项目改造

  • 项目添加上面新建的 lib 类库 Common 依赖项;
  • 新建文件夹 Actors,并添加 WorkflowActor.cs 文件,让该文件继承 Actor 并实现 IWorkflowActor, IRemindable 接口;
  • using Common; using Dapr.Actors.Runtime; using System.Text.Json;namespace BackEnd.Actors;public class WorkflowActor : Actor, IWorkflowActor, IRemindable {private readonly ILogger<WorkflowActor> _logger;public WorkflowActor(ActorHost host, ILogger<WorkflowActor> logger) : base(host){_logger = logger ?? throw new ArgumentNullException(nameof(host));//_logger = logger;}public async Task<bool> ApproveAsync(){await StateManager.AddOrUpdateStateAsync(Id.ToString(), "approve", (key, currentStatus) => "approve");return true;}private static readonly string stateName = "counter";public async Task<int> IncrementAsync(){var counterValue = await StateManager.TryGetStateAsync<int>(stateName);var currentValue = counterValue.HasValue ? counterValue.Value : 0;var newValue = currentValue + 1;await StateManager.SetStateAsync(stateName, newValue);return newValue;}#region Timerprivate static readonly string timeName = "TestTimer";public Task RegisterTimerAsync(){var serializedTimerParams = JsonSerializer.SerializeToUtf8Bytes($"now is {DateTime.Now}");return RegisterTimerAsync(timeName, nameof(this.TimerCallbackAsync), serializedTimerParams, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));}private async Task TimerCallbackAsync(byte[] data){var stateKey = "timer-nowtime";var content = JsonSerializer.Deserialize<string>(data);_logger.LogInformation($"[{DateTime.Now}] timer content=> {content}");await StateManager.SetStateAsync<string>(stateKey, content);}public Task UnregisterTimerAsync(){return UnregisterTimerAsync(timeName);}#endregion#region Reminderprivate static readonly string reminderName = "TestReminder";public async Task RegisterReminderAsync(){var state = JsonSerializer.SerializeToUtf8Bytes($"now is {DateTime.Now}");await RegisterReminderAsync(reminderName, state, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));}// 异步接收提醒 Reminder public async Task ReceiveReminderAsync(string reminderName, byte[] state, TimeSpan dueTime, TimeSpan period){var stateKey = "reminder-nowtime";var content = JsonSerializer.Deserialize<string>(state);_logger.LogInformation($"[{DateTime.Now}] reminder content ==> {content}");await StateManager.SetStateAsync<string>(stateKey, content);}public Task UnregisterReminderAsync(){return base.UnregisterReminderAsync(reminderName);}#endregion }

    Program.cs 注册 actor,映射Actor路由

    在 Console 项目中直接调用 ActorProxy.Create 的静态方法来创建一个 actor 的代理实例。而如果客户端是一个 asp.net core 应用, 你可以使用 IActorProxyFactory (构造函数 DI)接口来创建 actor 的代理。 他带来的最主要的好处是你可以在同一个地方进行配置。AddActors 扩展方法可以接收一个委托来允许你指定 actor 运行时的一些选项(options),比如 dapr sidecar 的 http 端点。

    下面的例子使用自定义的 JsonSerializerOptions 来配置 actor 状态持久化和消息反序列化:

    // 注册 actor services.AddActors(options => {var jsonSerializerOptions = new JsonSerializerOptions(){PropertyNamingPolicy = JsonNamingPolicy.CamelCase,PropertyNameCaseInsensitive = true};options.JsonSerializerOptions = jsonSerializerOptions;options.Actors.RegisterActor<WorkflowActor>(); });//映射Actor路由 app.MapActorsHandlers();

    Timer 和 Reminder 相关方法说明:

    • Timer 通过父类 Actor 的 RegisterTimerAsync(异步注册) 和 UnregisterTimerAsync(异步取消注册);
    • Timer 父类 Actor 的 RegisterTimerAsync(异步注册)通过 callback 实现 StateManager 相关操作;
    • Reminder 通过实现 Actor 的 RegisterReminderAsync(注册)和 UnregisterReminderAsync(异步取消注册);
    • Reminder 通过实现 IRemindable 接口的 ReceiveReminderAsync(异步接收提醒);

    IActorStateManager 接口信息(StateManager)

    IRemindable 接口信息,ReceiveReminderAsync(异步接收提醒)

    其中 Timer 和 Reminder 方法中的参数 TimeSpan dueTime, TimeSpan period 说明:

    • dueTime 首次延迟启动时间;
    • period 激活后的间隔执行周期;

    FrontEnd 项目改造

  • 项目依赖项添加 Common 类库引用;
  • 新建 DaprActorsClientController 继承 ControllerBase,添加如下代码:
  • using Common; using Dapr.Actors; using Dapr.Actors.Client; using Microsoft.AspNetCore.Mvc;namespace FrontEnd.Controllers;[Route("api/[controller]")] [ApiController] public class DaprActorsClientController : ControllerBase {#region DI 注册private readonly ILogger<DaprActorsClientController> _logger;private readonly IActorProxyFactory _actorProxyFactory;public DaprActorsClientController(ILogger<DaprActorsClientController> logger, IActorProxyFactory actorProxyFactory){_logger = logger;_actorProxyFactory = actorProxyFactory;} #endregion[HttpGet("Approve/{orderId}")]public async Task<ActionResult> ApproveAsync(string orderId){_logger.LogInformation($"[{DateTime.Now}] 进入 DaprActorsClient.ApproveAsync 方法");var proxy = CreateProxyById(orderId);return Ok(await proxy.ApproveAsync());}[HttpGet("Increment/{orderId}")]public async Task<ActionResult> IncrementAsync(string orderId){_logger.LogInformation($"[{DateTime.Now}] 进入 DaprActorsClient.IncrementAsync 方法");var proxy = CreateProxyById(orderId);return Ok(await proxy.IncrementAsync());}#region Timer(计时器)[HttpGet("RegisterTimer/{orderId}")]public async Task<ActionResult> RegisterTimerAsync(string orderId){_logger.LogInformation($"[{DateTime.Now}] 进入 DaprActorsClient.RegisterTimerAsync 方法");var proxy = CreateProxyById(orderId);await proxy.RegisterTimerAsync();return Ok("done");}[HttpGet("UnregistTimer/{orderId}")]public async Task<ActionResult> UnregistTimerAsync(string orderId){_logger.LogInformation($"[{DateTime.Now}] 进入 DaprActorsClient.UnregistTimerAsync 方法");var proxy = CreateProxyById(orderId);await proxy.UnregisterTimerAsync();return Ok("done");}#endregion#region Reminder(提醒器)[HttpGet("RegisterReminder/{orderId}")]public async Task<ActionResult> RegisterReminderAsync(string orderId){_logger.LogInformation($"[{DateTime.Now}] 进入 DaprActorsClient.RegisterReminderAsync 方法");var proxy = CreateProxyById(orderId);await proxy.RegisterReminderAsync();return Ok("done");}[HttpGet("UnregisterReminder/{orderId}")]public async Task<ActionResult> UnregistReminderAsync(string orderId){_logger.LogInformation($"[{DateTime.Now}] 进入 DaprActorsClient.UnregistReminderAsync 方法");var proxy = CreateProxyById(orderId);await proxy.UnregisterReminderAsync();return Ok("done");}#endregion/// <summary>/// 创建 Actor 代理/// </summary>/// <param name="orderId">订单Id</param>/// <returns></returns>private IWorkflowActor CreateProxyById(string orderId) {var actorId = new ActorId($"actorPrifix-{orderId}");var proxy = _actorProxyFactory.CreateActorProxy<IWorkflowActor>(actorId, "WorkflowActor");//var proxy = ActorProxy.Create<IWorkflowActor>(actorId, "WorkflowActor");return proxy;} }

    Dapr run 启动测试

    vs 先编译生成下文件,然后执行如下命令:

    • 启动 BackEnd 服务
    dapr run --dapr-grpc-port 50100 --dapr-http-port 3510 --app-port 5000 --app-id backend dotnet run
    • 启动 FrontEnd 服务
    dapr run --dapr-grpc-port 50200 --dapr-http-port 3501 --app-port 5001 --app-id frontend dotnet run

    【注意】dapr run 启动服务时,保持两个服务协议模式对等(ssl),切记一个服务器开启 ssl,另一个服务未开启 ssl,这样两个项目通信时会出现异常(目标通信拒绝);
    开启 ssl 后面添加 --app-ssl 即可;

    • 浏览器访问 Swagger 页面:
  • FrontEnd 服务,http://localhost:5001/swagger/index.html
  • BackEnd 服务,http://localhost:5000/swagger/index.html
  • 页面显示如下:

    curl 命令访问:(从http://ASP.NET Core客户端调用Actor )

    curl -X 'GET' \'http://localhost:5001/api/DaprActorsClient/Approve/111' \-H 'accept: */*'curl -X 'GET' \'http://localhost:5001/api/DaprActorsClient/Increment/222' \-H 'accept: */*'
    • Timer 注册 / 取消注册
    curl -X 'GET' \'http://localhost:5001/api/DaprActorsClient/RegisterTimer/aaa' \-H 'accept: */*'curl -X 'GET' \'http://localhost:5001/api/DaprActorsClient/UnregistTimer/aaa' \-H 'accept: */*'
    • Reminder 注册 / 取消注册
    curl -X 'GET' \'http://localhost:5001/api/DaprActorsClient/RegisterReminder/bbb' \-H 'accept: */*'curl -X 'GET' \'http://localhost:5001/api/DaprActorsClient/UnregisterReminder/bbb' \-H 'accept: */*'

    Demo 中的方法均可成功调用,此处就不再截图说明,感兴趣的小伙伴可自行参照测试;

    总结

    • Dapr actor 是一个独立的计算单元和状态单元。 这些 actor 彼此完全隔离,他们永远不会共享内存。 actor 之间使用消息相互通信;
    • Dapr actor 是虚拟的,生命周期与他们在内存中(in - memory)的表现不相关;
    • Dapr actor 是基于回合的访问模型,任何候都只有一个线程在一个 actor 对象的代码内活动;
    • Dapr actor 中的 timer 不能持久化,再次启动服务 计时器 会失效;
    • Dapr actor 中的 reminder 会持久化,再次启动服务 提醒器 会继续执行;
    • Reminder 触发将使 actor 保持活动状态;
    • Timer(计时器)触发不会使 actor 保持活动状态。 timer 仅在 actor 活跃时被触发;

    参考文章

    • Dapr Actors 概述,https://docs.dapr.io/zh-hans/developing-applications/building-blocks/actors/actors-overview/
    • Dapr Actor 构建块,https://docs.microsoft.com/zh-cn/dotnet/architecture/dapr-for-net-developers/actors

    总结

    以上是生活随笔为你收集整理的Dapr for dotnet | 并发计算模型 - Virtual Actors的全部内容,希望文章能够帮你解决所遇到的问题。

    如果觉得生活随笔网站内容还不错,欢迎将生活随笔推荐给好友。