做网站网页尺寸是多少无锡网站制作那些

bicheng/2025/10/14 5:00:00/文章来源:
做网站网页尺寸是多少,无锡网站制作那些,wordpress自建搜索,网站建设是什么意思原文地址#xff1a; https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/ 欢迎关注留言#xff0c;我是收集整理小能手#xff0c;工具翻译#xff0c;仅供参考#xff0c;笔芯笔芯. 使用发件箱模式进行可靠的微服务数…原文地址 https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/ 欢迎关注留言我是收集整理小能手工具翻译仅供参考笔芯笔芯. 使用发件箱模式进行可靠的微服务数据交换 二月 19, 2019 作者 Gunnar Morling 讨论 微服务apache-kafka示例 作为业务逻辑的一部分微服务通常不仅需要更新自己的本地数据存储还需要通知其他服务发生的数据更改。发件箱模式描述了一种让服务以安全一致的方式执行这两项任务的方法它为源服务提供即时“读取您自己的写入”语义同时提供跨服务边界的可靠、最终一致的数据交换。 更新2019 年 9 月 13 日为了简化发件箱模式的使用Debezium 现在提供了一个即用型SMT 来路由发件箱事件。不再需要本博文中讨论的自定义 SMT。 如果您构建了几个微服务您可能会同意它们最难的部分是数据微服务并不是孤立存在的它们通常需要在彼此之间传播数据和数据更改。 例如考虑一个管理采购订单的微服务当下一个新订单时有关该订单的信息可能必须转发到发货服务以便它可以组装一个或多个订单的发货和客户服务以便它可以根据新订单更新客户的总信用余额等。 有不同的方法可以让订单服务了解有关新采购订单的其他两种方法例如它可以调用这些服务提供的一些REST、grpc或其他同步API。不过这可能会产生一些不需要的耦合发送服务必须知道要调用哪些其他服务以及在哪里可以找到它们。它还必须为这些服务暂时不可用做好准备。Istio等服务网格通过提供请求路由、重试、断路器等功能可以在这方面派上用场。 任何同步方法的普遍问题是一个服务如果没有它调用的其他服务就无法真正运行。虽然缓冲和重试可能有助于其他服务仅需要通知某些事件的情况但如果服务实际上需要查询其他服务以获取信息则情况并非如此。例如当下达采购订单时订单服务可能需要从库存服务获取所采购商品的库存次数信息。 这种同步方法的另一个缺点是它缺乏可重玩性即新消费者在事件发送后到达并且仍然能够从头开始消费整个事件流的可能性。 这两个问题都可以通过使用异步数据交换方法来解决即让订单、库存和其他服务通过持久消息日志例如Apache Kafka传播事件。通过订阅这些事件流每个服务都会收到有关其他服务的数据更改的通知。它可以对这些事件做出反应并且如果需要可以使用根据自己的需求定制的表示在自己的数据存储中创建该数据的本地表示。例如此类视图可能会被非规范化以有效支持特定的访问模式或者它可能仅包含与消费服务相关的原始数据的子集。 持久日志还支持可重玩性即可以根据需要添加新的使用者从而启用您最初可能没有想到的用例并且无需触及源服务。例如考虑一个数据仓库它应该保存有关所有已下订单的信息或者基于Elasticsearch的采购订单的一些全文搜索功能。一旦采购订单事件位于 Kafka 主题中Kafka 主题的保留策略设置可用于确保事件在给定用例和业务需求所需的时间内保留在主题中新消费者就可以订阅、处理该主题从一开始就具体化微服务数据库、搜索索引、数据仓库等中所有数据的视图。 应对主题增长 根据数据量记录的数量和大小、更改频率将事件长期甚至无限期地保留在主题中可能可行也可能不可行。很多时候从业务角度来看与给定数据项例如特定采购订单相关的一些甚至所有事件可能适合在给定时间点之后删除。请参阅下面的“从 Kafka 主题中删除事件”框了解有关从 Kafka 主题中删除事件以将其大小保持在范围内的更多想法。 双写问题 为了提供其功能微服务通常会有自己的本地数据存储。例如订单服务可以使用关系数据库来保存有关采购订单的信息。当下新订单时这可能会导致对服务数据库中的INSERT表进行操作。PurchaseOrder同时服务可能希望向 Apache Kafka 发送有关新订单的事件以便将该信息传播到其他感兴趣的服务。 不过简单地发出这两个请求可能会导致潜在的不一致。原因是我们不能拥有一个跨服务数据库和 Apache Kafka 的共享事务因为后者不支持加入分布式 (XA) 事务。因此在不幸的情况下我们可能最终将新的采购订单保留在本地数据库中但没有将相应的消息发送到 Kafka例如由于某些网络问题。或者相反我们可能已将消息发送到 Kafka但未能将采购订单保留在本地数据库中。这两种情况都是不可取的这可能会导致看似成功下达的订单无法创建发货。或者创建了一个发货 那么如何避免这种情况呢答案是仅修改两个资源数据库或Apache Kafka之一并以此为基础以最终一致的方式驱动第二个资源的更新。我们首先考虑只写入 Apache Kafka 的情况。 当收到新的采购订单时订单服务不会INSERT同步写入数据库相反它只会将描述新订单的事件发送到 Kafka 主题。因此一次只会修改一个资源如果出现问题我们会立即发现并向订单服务的调用者报告请求失败。 同时服务本身将订阅该 Kafka 主题。这样当新消息到达主题时它将收到通知并且可以将新的采购订单保留在其数据库中。不过这里存在一个微妙的挑战那就是缺乏“读你自己写的”语义。例如假设订单服务还有一个 API用于搜索给定客户的所有采购订单。在下达新订单后立即调用该 API 时由于处理来自 Kafka 主题的消息的异步特性可能会发生采购订单尚未保存在服务数据库中的情况因此该查询不会返回该订单。这可能会导致非常混乱的用户体验因为例如用户可能会错过其购物历史记录中新下的订单。有多种方法可以处理这种情况例如服务可以将新下达的采购订单保留在内存中并据此回答后续查询。不过当实现更复杂的查询或考虑到订单服务还可能包含集群设置中的多个节点时这很快就会变得不平凡这将需要在集群内传播该数据。 现在当仅同步写入数据库并基于此驱动将消息导出到 Apache Kafka 时情况会是什么样子这就是发件箱模式的用武之地。 发件箱模式 这种方法的想法是在服务的数据库中有一个“发件箱”表。当接收到下采购订单的请求时不仅会执行INSERT到表中的PurchaseOrder操作而且作为同一事务的一部分表示要发送的事件的记录也会插入到发件箱表中。 该记录描述了服务中发生的事件例如它可以是表示已下达新采购订单这一事实的 JSON 结构包括订单本身、其订单行以及上下文信息例如使用情况的数据案例标识符。通过通过发件箱表中的记录显式发出事件可以确保事件以适合外部使用者的方式构建。这也有助于确保事件使用者在更改内部域模型或表时不会中断PurchaseOrder。 异步进程监视该表中的新条目。如果有的话它将事件作为消息传播到 Apache Kafka。这为我们提供了非常好的特性平衡通过同步写入表PurchaseOrder源服务受益于“读取您自己的写入”语义。一旦提交了第一笔交易后续的采购订单查询将返回新保留的订单。同时我们通过 Apache Kafka 将可靠、异步、最终一致的数据传播到其他服务。 现在发件箱模式实际上并不是一个新想法。它已经使用了相当长一段时间了。事实上即使使用实际上可以参与分布式事务的 JMS 风格的消息代理它也可能是避免远程资源例如消息代理停机造成的任何耦合和潜在影响的更好选择。您还可以在 Chris Richardson 的优秀microservices.io网站上找到该模式的描述。 然而该模式受到的关注远没有应有的那么高而且它在微服务环境中特别有用。正如我们将看到的发件箱模式可以使用变更数据捕获和 Debezium 以非常优雅且高效的方式实现。下面我们就来探讨一下如何做。 基于变更数据捕获的实现 基于日志的变更数据捕获(CDC) 非常适合捕获发件箱表中的新条目并将其流式传输到 Apache Kafka。与任何基于轮询的方法相反事件捕获以非常低的开销进行几乎是实时的。Debezium 附带了适用于 MySQL、Postgres 和 SQL Server 等多种数据库的CDC 连接器。以下示例将使用Postgres 的 Debezium 连接器。 您可以在 GitHub 上找到该示例的完整源代码。有关构建和运行示例代码的详细信息请参阅README.md 。该示例以两个微服务为中心order-service和shipment-service。两者都是用 Java 实现的使用CDI作为组件模型使用 JPA/Hibernate 来访问各自的数据库。订单服务在WildFly上运行并公开一个简单的 REST API用于下达采购订单和取消特定订单行。它使用 Postgres 数据库作为其本地数据存储。发货服务基于Thorntail; 通过 Apache Kafka它接收订单服务导出的事件并在自己的 MySQL 数据库中创建相应的发货条目。Debezium 跟踪订单服务的 Postgres 数据库的事务日志“预写日志”WAL以便捕获发件箱表中的任何新事件并将它们传播到 Apache Kafka。 解决方案的整体架构如下图所示 图片来自于官网原文 发件箱模式概述 请注意该模式与这些特定的实现选择没有任何关系。它同样可以使用替代技术来实现例如 Spring Boot例如利用 Spring Data对领域事件的支持、普通 JDBC 或除 Java 之外的其他编程语言。 现在让我们仔细看看该解决方案的一些相关组件。 发件箱表 该outbox表驻留在订单服务的数据库中具有以下结构 Column | Type | Modifiers --------------±-----------------------±---------- id | uuid | not null aggregatetype | character varying(255) | not null aggregateid | character varying(255) | not null type | character varying(255) | not null payload | jsonb | not null 它的列是 id每条消息的唯一ID消费者可以使用它来检测任何重复事件例如在失败后重新启动以读取消息时。创建新事件时生成。 aggregatetype与给定事件相关的聚合根的类型这个想法是依靠域驱动设计的相同概念导出的事件应该引用一个聚合“可以被视为单个单元的域对象集群”其中聚合根提供唯一的入口点用于访问聚合内的任何实体。例如这可以是“采购订单”或“客户”。 该值将用于将事件路由到 Kafka 中的相应主题因此与采购订单相关的所有事件都有一个主题所有与客户相关的事件都有一个主题等。请注意还有与其中包含的子实体相关的事件这样的聚合应该使用相同的类型。因此例如代表取消单个订单行属于采购订单聚合的一部分的事件也应该使用其聚合根的类型“order”确保该事件也将进入“order”Kafka 主题。 aggregateid受给定事件影响的聚合根的 id例如这可以是采购订单的 ID 或客户 ID与聚合类型类似与聚合中包含的子实体相关的事件应使用包含聚合根的 ID例如订单行取消事件的采购订单 ID。该 id 稍后将用作 Kafka 消息的密钥。这样与一个聚合根或其包含的任何子实体相关的所有事件都将进入该 Kafka 主题的同一分区这确保该主题的使用者将消费与该聚合根中的一个且相同的聚合相关的所有事件。生产时的确切顺序。 type事件类型例如“订单已创建”或“订单行已取消”。允许消费者触发合适的事件处理程序。 payload具有实际事件内容的 JSON 结构例如包含采购订单、有关购买者的信息、包含的订单行、价格等。 将事件发送到发件箱 为了将事件“发送”到发件箱订单服务中的代码通常可以只INSERT对发件箱表执行操作。然而采用稍微抽象一点的 API 是个好主意这样可以在需要时更轻松地调整发件箱的实现细节。CDI 活动对此非常方便。它们可以在应用程序代码中引发并由发件箱事件发送者同步处理这将INSERT在发件箱表中执行所需的操作。 所有发件箱事件类型都应实现以下约定类似于之前显示的发件箱表的结构 public interface ExportedEvent { String getAggregateId(); String getAggregateType(); JsonNode getPayload(); String getType();} 为了产生此类事件应用程序代码使用注入的Event实例例如在类中OrderService ApplicationScoped public class OrderService { PersistenceContext private EntityManager entityManager;Inject private EventExportedEvent event;Transactional public PurchaseOrder addOrder(PurchaseOrder order) {order entityManager.merge(order);event.fire(OrderCreatedEvent.of(order));event.fire(InvoiceCreatedEvent.of(order));return order; }Transactional public PurchaseOrder updateOrderLine(long orderId, long orderLineId,OrderLineStatus newStatus) {// ... }} 在该addOrder()方法中JPA 实体管理器用于将传入的订单保存在数据库中并event使用注入来触发相应的OrderCreatedEvent和InvoiceCreatedEvent. 再次请记住尽管有“事件”的概念但这两件事发生在同一个事务中。即在此事务中三条记录将被插入到数据库中一条记录​​在采购订单表中两条记录在发件箱表中。 实际的事件实现是直接的作为一个例子下面是这个OrderCreatedEvent类 public class OrderCreatedEvent implements ExportedEvent { private static ObjectMapper mapper new ObjectMapper();private final long id; private final JsonNode order;private OrderCreatedEvent(long id, JsonNode order) {this.id id;this.order order; }public static OrderCreatedEvent of(PurchaseOrder order) {ObjectNode asJson mapper.createObjectNode().put(id, order.getId()).put(customerId, order.getCustomerId()).put(orderDate, order.getOrderDate().toString());ArrayNode items asJson.putArray(lineItems);for (OrderLine orderLine : order.getLineItems()) {items.add(mapper.createObjectNode().put(id, orderLine.getId()).put(item, orderLine.getItem()).put(quantity, orderLine.getQuantity()).put(totalPrice, orderLine.getTotalPrice()).put(status, orderLine.getStatus().name()));}return new OrderCreatedEvent(order.getId(), asJson); }Override public String getAggregateId() {return String.valueOf(id); }Override public String getAggregateType() {return Order; }Override public String getType() {return OrderCreated; }Override public JsonNode getPayload() {return order; }} 请注意Jackson 如何ObjectMapper用于创建事件有效负载的 JSON 表示形式。 现在让我们看一下消耗任何已触发ExportedEvent并对发件箱表进行相应写入的代码 ApplicationScoped public class EventSender { PersistenceContext private EntityManager entityManager;public void onExportedEvent(Observes ExportedEvent event) {OutboxEvent outboxEvent new OutboxEvent(event.getAggregateType(),event.getAggregateId(),event.getType(),event.getPayload());entityManager.persist(outboxEvent);entityManager.remove(outboxEvent); }} 这相当简单对于每个事件CDI 运行时都会调用该onExportedEvent()方法。该OutboxEvent实体的实例会持久保存在数据库中并立即删除 乍一看这可能会令人惊讶。但是当记住基于日志的 CDC 的工作原理时这是有意义的它不检查数据库中表的实际内容而是跟踪仅附加事务日志。一旦事务提交对persist()和 的调用将在日志中remove()创建一个INSERT和 一个条目。DELETE之后Debezium 将处理这些事件对于任何INSERT带有事件负载的消息将被发送到 Apache Kafka。DELETE另一方面事件可以被忽略因为从发件箱表中删除只是一个技术问题不需要任何传播到消息代理。因此我们能够通过CDC捕获添加到发件箱表中的事件但是当查看表本身的内容时它始终为空。这意味着该表不需要额外的磁盘空间除了在某些时候会自动丢弃的日志文件元素之外并且也不需要单独的内务处理来阻止其无限增长。 注册 Debezium 连接器 发件箱实现就位后就可以注册 Debezium Postgres 连接器了这样它就可以捕获发件箱表中的任何新事件并将它们中继到 Apache Kafka。这可以通过将以下 JSON 请求 POST 到 Kafka Connect 的 REST API 来完成 { “name”: “outbox-connector”, “config”: { “connector.class” : “io.debezium.connector.postgresql.PostgresConnector”, “tasks.max” : “1”, “database.hostname” : “order-db”, “database.port” : “5432”, “database.user” : “postgresuser”, “database.password” : “postgrespw”, “database.dbname” : “orderdb”, “database.server.name” : “dbserver1”, “schema.whitelist” : “inventory”, “table.whitelist” : “inventory.outboxevent”, “tombstones.on.delete” : “false”, “transforms” : “router”, “transforms.router.type” : “io.debezium.examples.outbox.routingsmt.EventRouter” } } 这将设置 的实例io.debezium.connector.postgresql.PostgresConnector捕获指定 Postgres 实例的更改。请注意通过表白名单仅outboxevent捕获表中的更改。它还应用名为 的单个消息转换 (SMT) EventRouter。 从 Kafka 主题中删除事件 通过设置tombstones.on.deleteto false当事件记录从发件箱表中删除时连接器将不会发出删除标记“逻辑删除”。这是有道理的因为从发件箱表中删除不应影响相应 Kafka 主题中事件的保留。相反可以在 Kafka 中配置事件主题的特定保留时间例如将所有采购订单事件保留 30 天。 或者可以使用紧凑的主题。这需要对发件箱表中的事件设计进行一些更改 他们必须描述整个总体因此例如代表取消单个订单行的事件也应描述包含采购订单的完整当前状态这样在日志压缩运行后消费者仅在看到与给定订单相关的最后一个事件时就能够获取采购订单的完整状态。 它们必须还有一个boolean属性来指示特定事件是否表示删除该事件的聚合根。OrderDeleted然后下一节中描述的事件路由 SMT 可以使用此类事件例如 类型来为该聚合根生成删除标记。OrderDeleted当事件已写入主题时日志压缩将删除与给定采购订单相关的所有事件。 当然删除事件时事件流将无法再从头开始重新播放。根据特定的业务需求仅保留给定采购订单、客户等的最终状态可能就足够了。这可以使用紧凑的主题和主题设置的足够值来实现delete.retention.ms。另一种选择可能是将历史事件移动到某种冷存储例如 Amazon S3 存储桶如果需要可以从那里检索它们然后从 Kafka 主题中读取最新事件。采用哪种方法取决于具体要求、预期数据量以及开发和运营解决方案的团队的专业知识。 主题路由 默认情况下Debezium 连接器会将源自给定表的所有更改事件发送到同一主题即我们最终会得到一个名为的 Kafka 主题dbserver1.inventory.outboxevent该主题将包含所有事件无论是订单事件、客户事件等。 OrderEvents不过为了简化只对特定事件类型感兴趣的消费者的实现拥有多个主题例如CustomerEvents等等更有意义。例如运输服务可能对任何客户事件不感兴趣。通过仅订阅该OrderEvents主题它将确保永远不会收到任何客户事件。 为了将从发件箱表捕获的更改事件路由到不同的主题EventRouter使用了自定义 SMT。以下是其方法的代码apply()Kafka Connect 将针对 Debezium 连接器发出的每条记录调用该方法 Override public R apply(R record) { // Ignoring tombstones just in case if (record.value() null) { return record; } Struct struct (Struct) record.value(); String op struct.getString(op);// ignoring deletions in the outbox table if (op.equals(d)) {return null; } else if (op.equals(c)) {Long timestamp struct.getInt64(ts_ms);Struct after struct.getStruct(after);String key after.getString(aggregateid);String topic after.getString(aggregatetype) Events;String eventId after.getString(id);String eventType after.getString(type);String payload after.getString(payload);Schema valueSchema SchemaBuilder.struct().field(eventType, after.schema().field(type).schema()).field(ts_ms, struct.schema().field(ts_ms).schema()).field(payload, after.schema().field(payload).schema()).build();Struct value new Struct(valueSchema).put(eventType, eventType).put(ts_ms, timestamp).put(payload, payload);Headers headers record.headers();headers.addString(eventId, eventId);return record.newRecord(topic, null, Schema.STRING_SCHEMA, key, valueSchema, value,record.timestamp(), headers); } // not expecting update events, as the outbox table is append only, // i.e. event records will never be updated else {throw new IllegalArgumentException(Record of unexpected op type: record); }} 当接收到删除事件op d时它将丢弃该事件因为从发件箱表中删除事件记录与下游消费者无关。当接收到创建事件 ( op c) 时事情会变得更有趣。此类记录将传播到 Apache Kafka。 Debezium 的更改事件具有复杂的结构其中包含所表示行的旧 ( before) 和新 ( ) 状态。after要传播的事件结构是从after状态获得的。捕获的事件记录中的值aggregatetype用于构建要将事件发送到的主题的名称。例如aggregatetype设置为的事件Order将被发送到OrderEvents主题。aggregateid用作消息键确保该聚合的所有消息都将进入该主题的同一分区。消息值是一个结构其中包含原始事件负载编码为 JSON、指示事件生成时间的时间戳以及事件类型。最后事件 UUID 作为 Kafka 标头字段进行传播。这允许消费者进行有效的重复检测而无需检查实际的消息内容。 Apache Kafka 中的事件 现在让我们来看看OrderEvents和CustomerEvents主题。 如果您已经检查了示例源代码并通过 Docker Compose 启动了所有组件有关更多详细信息请参阅示例项目中的README.md文件您可以通过订单服务的 REST API 下采购订单如下所示 cat resources/data/create-order-request.json | http POST http://localhost:8080/order-service/rest/orders 同样可以取消特定订单行 cat resources/data/cancel-order-line-request.json | http PUT http://localhost:8080/order-service/rest/orders/1/lines/2 当使用诸如非常实用的kafkacat实用程序之类的工具时您现在应该在OrderEvents主题中看到类似以下的消息 kafkacat -b kafka:9092 -C -o beginning -f ‘Headers: %h\nKey: %k\nValue: %s\n’ -q -t OrderEvents Headers: eventIdd03dfb18-8af8-464d-890b-09eb8b2dbbdd Key: “4” Value: {“eventType”:“OrderCreated”,“ts_ms”:1550307598558,“payload”:{“id”: 4, “lineItems”: [{“id”: 7, “item”: “Debezium in Action”, “status”: “ENTERED”, “quantity”: 2, “totalPrice”: 39.98}, {“id”: 8, “item”: “Debezium for Dummies”, “status”: “ENTERED”, “quantity”: 1, “totalPrice”: 29.99}], “orderDate”: “2019-01-31T12:13:01”, “customerId”: 123}} Headers: eventId49f89ea0-b344-421f-b66f-c635d212f72c Key: “4” Value: {“eventType”:“OrderLineUpdated”,“ts_ms”:1550308226963,“payload”:{“orderId”: 4, “newStatus”: “CANCELLED”, “oldStatus”: “ENTERED”, “orderLineId”: 7}} 包含消息值的字段payload是原始事件的字符串化 JSON 表示形式。Debezium Postgres 连接器将JSONB列作为字符串发出使用io.debezium.data.Json逻辑类型名称这就是引号被转义的原因。jq实用程序更具体地说它的fromjson运算符可以方便地以更易读的方式显示事件负载 kafkacat -b kafka:9092 -C -o beginning -t Order | jq ‘.payload | fromjson’ { “id”: 4, “lineItems”: [ { “id”: 7, “item”: “Debezium in Action”, “status”: “ENTERED”, “quantity”: 2, “totalPrice”: 39.98 }, { “id”: 8, “item”: “Debezium for Dummies”, “status”: “ENTERED”, “quantity”: 1, “totalPrice”: 29.99 } ], “orderDate”: “2019-01-31T12:13:01”, “customerId”: 123 } { “orderId”: 4, “newStatus”: “CANCELLED”, “oldStatus”: “ENTERED”, “orderLineId”: 7 } 您还可以查看该CustomerEvents主题来检查表示添加采购订单时创建发票的事件。 消费服务中的重复检测 至此我们的发件箱模式实现已功能齐全当订单服务收到下订单或取消订单行的请求时它将在其数据库的purchaseorder和表中保留相应的状态。orderline同时在同一事务内相应的事件条目将被添加到同一数据库的发件箱表中。Debezium Postgres 连接器捕获对该表的任何插入并将事件路由到与给定事件表示的聚合类型相对应的 Kafka 主题。 最后让我们探讨一下另一个微服务例如发货服务如何使用这些消息。该服务的入口点是常规的 Kafka 消费者实现这并不是太令人兴奋因此为了简洁起见此处省略。您可以在示例存储库中找到其源代码。对于该主题的每条传入消息Order消费者调用OrderEventHandler ApplicationScoped public class OrderEventHandler { private static final Logger LOGGER LoggerFactory.getLogger(OrderEventHandler.class);Inject private MessageLog log;Inject private ShipmentService shipmentService;Transactional public void onOrderEvent(UUID eventId, String key, String event) {if (log.alreadyProcessed(eventId)) {LOGGER.info(Event with UUID {} was already retrieved, ignoring it, eventId);return;}JsonObject json Json.createReader(new StringReader(event)).readObject();JsonObject payload json.containsKey(schema) ? json.getJsonObject(payload) :json;String eventType payload.getString(eventType);Long ts payload.getJsonNumber(ts_ms).longValue();String eventPayload payload.getString(payload);JsonReader payloadReader Json.createReader(new StringReader(eventPayload));JsonObject payloadObject payloadReader.readObject();if (eventType.equals(OrderCreated)) {shipmentService.orderCreated(payloadObject);}else if (eventType.equals(OrderLineUpdated)) {shipmentService.orderLineUpdated(payloadObject);}else {LOGGER.warn(Unkown event type);}log.processed(eventId); }} 首先要做的onOrderEvent()是检查具有给定 UUID 的事件之前是否已被处理过。如果是这样对同一事件的任何进一步调用都将被忽略。这是为了防止由该数据管道的“至少一次”语义引起的任何事件的重复处理。例如在分别使用源数据库或消息代理确认特定事件的检索之前Debezium 连接器或消费服务可能会失败。在这种情况下重新启动 Debezium 或消费服务后可能会再次处理一些事件。将事件 UUID 作为 Kafka 消息头进行传播可以有效检测和排除消费者中的重复项。 如果是第一次接收到消息则解析消息值并ShippingService使用事件负载调用与特定事件类型对应的方法的业务方法。最后消息在消息日志中标记为已处理。 这MessageLog只是跟踪服务本地数据库中表中的所有消费事件 ApplicationScoped public class MessageLog { PersistenceContext private EntityManager entityManager;Transactional(valueTxType.MANDATORY) public void processed(UUID eventId) {entityManager.persist(new ConsumedMessage(eventId, Instant.now())); }Transactional(valueTxType.MANDATORY) public boolean alreadyProcessed(UUID eventId) {return entityManager.find(ConsumedMessage.class, eventId) ! null; }} 这样如果事务因某种原因回滚原始消息也不会被标记为已处理并且异常将冒泡到 Kafka 事件消费者循环。这允许稍后重新尝试处理该消息。 请注意更完整的实现应该只在将任何无法处理的消息重新路由到死信队列或类似队列之前仅重试给定的消息一定次数。消息日志表上还应该有一些管理工作定期地所有早于消费者向代理提交的当前偏移量的事件都可能被删除因为它确保此类消息不会再次传播给消费者。 概括 发件箱模式是在不同微服务之间传播数据的好方法。 通过仅修改单个资源源服务自己的数据库可以避免同时更改不共享一个公共事务上下文数据库和 Apache Kafka的多个资源时出现的任何潜在不一致情况。通过首先写入数据库源服务具有即时“读取您自己的写入”语义这对于一致的用户体验非常重要允许在写入后调用查询方法以立即反映任何数据更改。 同时该模式支持将异步事件传播到其他微服务。Apache Kafka 充当服务之间消息传递的高度可扩展且可靠的骨干。如果有正确的主题保留设置新的消费者可能会在事件最初生成后很长时间内出现并根据事件历史记录建立自己的本地状态。 将Apache Kafka置于整体架构的中心也保证了所涉及服务的解耦。例如如果解决方案的单个组件发生故障或在一段时间内不可用例如在更新期间事件将在稍后处理重新启动后Debezium 连接器将继续从其离开的位置跟踪发件箱表之前关掉。同样任何消费者都将继续处理其先前偏移量中的主题。通过跟踪已成功处理的消息可以检测到重复消息并将其排除在重复处理之外。 当然不同服务之间的这种事件管道最终是一致的即诸如运输服务之类的消费者可能会稍微落后于诸如订单服务之类的生产者。不过通常情况下这很好并且可以根据应用程序的业务逻辑进行处理。例如通常不需要在下订单的同一秒内创建发货。此外由于基于日志的变更数据捕获允许近乎实时地发出事件整个解决方案的端到端延迟通常很低秒甚至亚秒范围。 最后要记住的一件事是通过发件箱公开的事件的结构应被视为发出服务的 API 的一部分。即当需要时应仔细调整它们的结构并考虑兼容性。这是为了确保在升级生产服务时不会意外破坏任何消费者。同时消费者在处理消息时应该宽容例如在接收到的事件中遇到未知属性时不要失败。 非常感谢 Hans-Peter Grahsl、Jiri Pechanec、Justin Holmes 和 René Kerner 在撰写本文时提供的反馈

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/90415.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

可信的移动网站建设wordpress积分商城插件

新能源场站和区域电网作为复杂且具有动态特性的大规模电力系统,需要实时仿真测试来验证其性能、稳定性和响应能力。在这种背景下,多核并行仿真运算显得尤为重要。多核并行仿真能够同时处理电力系统的复杂模型,加速仿真过程,实现接…

网站建设与设计饰品中小学生做的网站

在IDEA中,如果控制台输出的中文字符显示为乱码,可能是因为控制台的编码设置不正确。你可以尝试以下方法来解决此问题: 方法1:更改IDEA控制台编码 打开IDEA的设置:File -> Settings(Windows/Linux&…

昆山做网站多少钱网站开发vs2013

拷贝构造函数是一种特殊的构造函数,它在创建对象时,是使用同一类中之前创建的对象来初始化新创建的对象。拷贝构造函数通常用于:通过使用另一个同类型的对象来初始化新创建的对象。复制对象把它作为参数传递给函数。复制对象,并从…

免费建网站讨论aws配置wordpress

华为二面!!!被问常用API,这也太偏门了吧,我秀了一波hhhh~常用API一、API概述二、Scanner类代码三、Random类代码四、* ArrayList类**存储基本数据类型**代码五、匿名对象昨天我去了华为面试,问我常用API,我以为我被搞到…

珠江新城网站建设店铺设计软件

winreg模块 进入系统注册表的方法多种多样,最常见的就是运行窗口输入命令“regedit”,即可进入注册表,而Python的winreg模块可以对注册表进行一系列操作 "winreg"中的各个常量 注册表地址(HKEY_ )常量 winreg.HKEY_CLASSES_ROOT #存储应用和shell的信息 winreg…

卡盟网站建设公司中企动力科技股份有限公司是国企吗

目录: Java中的关键字 static关键字final关键字Java中的权限修饰符代码块 构造代码块静态代码块接口 接口的介绍接口的定义和特点接口的成员特点接口的案例接口中成员方法的特点枚举随堂小记 继承方法重写抽象类模板设计模式staticfinal权限修饰符接口回顾上午内容…

Godaddy优惠码网站怎么做的婚庆企业网站建设

实现思路: 在按钮上绑定一个点击事件,默认是true;在export default { }中注册变量给卡片标签用v-if判断是否要显示卡片,ture则显示;在卡片里面写好你想要展示的数据;给卡片添加一个取消按钮,绑…

营销型网站建设设计服务公司文化墙创意设计

Geekbench 6 是一款跨平台的系统性能测试软件,可以对处理器和内存等硬件进行评测,并提供了单核和多核两种测试模式。该软件适用于 Windows、macOS、Linux 和 iOS 等多种操作系统平台。 Geekbench 6 测试可以帮助用户快速准确地了解自己设备的性能表现&am…

制作公司网站视频四川省建设厅官方网站三内人员

通过两种方法实例讲解ajax定时刷新局部页面,当然方法有很多种,也可以不使用ajax来刷新页面,可以使用jquery中的append来给指定内容加东西,但是都不太实用,最实用的方法还是ajax加载数据了。 方法一: 局部刷…

导航网站优化茂名网站建设培训

⭐️dijkstra 介绍(想看的可以看) Dijkstra算法( /ˈdaɪkstrəz/ DYKE-str z)是一种用于找到加权图中的节点之间的最短路径的算法,该加权图可以表示例如道路网络。它是由计算机科学家Edsger W. Dijkstra于1956年出版…

wordpress vps建站沈阳微信网站建设

文章目录 前言1.本地Tomcat网页搭建1.1 Tomcat安装1.2 配置环境变量1.3 环境配置1.4 Tomcat运行测试1.5 Cpolar安装和注册 2.本地网页发布2.1.Cpolar云端设置2.2 Cpolar本地设置 3.公网访问测试4.结语 正文开始前给大家推荐个网站,前些天发现了一个巨牛的人工智能学…

怎么做带后台的网站学畅留学招聘网站开发主管

xxxx 不在 sudoers 文件中。此事将被报告。 在Ubuntu中,可以通过将用户添加到sudo组来为其提供sudo(超级用户)权限。 要添加sudo权限,按照以下步骤操作: 打开终端(CtrlAltT)。 输入以下命令并…

东莞企业网站费用加强网站建设考察交流

1 平台介绍 Davinci 是一个 DVaaS(Data Visualization as a Service)平台解决方案,面向业务人员/数据工程师/数据分析师/数据科学家,致力于提供一站式数据可视化解决方案。既可作为公有云/私有云独立部署使用,也可作为…

建设简单网站cms网站模板 数据采集

朋友们、伙计们,我们又见面了,本专栏是关于各种算法的解析,如果看完之后对你有一定的启发,那么请留下你的三连,祝大家心想事成! C 语 言 专 栏:C语言:从入门到精通 数据结构专栏&…

中博建设集团有限公司网站wordpress支持

前言 由于兼容性问题,使得我们若想用较新版本的 PyTorch,通过 GPU 方式训练模型,也得更换较新版本得 CUDA 工具包。然而 CUDA 的版本又与电脑显卡的驱动程序版本关联,如果是低版本的显卡驱动程序安装 CUDA11 及以上肯定会失败。 比…

西安市城乡建设管理局网站城市门户网站

ssmvue医院住院管理系统源码和论文PPT012 开发工具:idea 数据库mysql5.7(mysql5.7最佳) 数据库链接工具:navcat,小海豚等 开发技术:java ssm tomcat8.5 摘 要 随着时代的发展,医疗设备愈来愈完善,医院也变成人们生…

网站建设公司小程序广州网页设计html

为什么要异步? CPU的工艺越来越小,Cannon Lake架构的Intel CPU已经达到10nm技术,因此在面积不变的情况下,核心数可以明显提升。单纯的提升主频将造成发热量大、需要的电压大、功耗大的问题。而传统的算法与数据结构是针对单核心单…

网站规划管理系统99元一月做网站

for 可以用来遍历数组、字符串、类数组、DOM节点,可以更改原数组,可以使用break、continue 跳出循环 return 只能在函数内部使用 for(声明循环变量;判断循环条件;更新循环变量){循环体 }forEach 参数(当前元素&#x…

郑州中企业网站建设什么查网站是否降权

竞争环境不是匀速变化,而是加速变化。企业的衰退与进化、兴衰更迭在不断发生,这成为一种不可避免的现实。事实上,在产业链竞争中增长困境不分企业大小,而是一种普遍存在的问题,许多收入在1亿至10亿美元间的制造企业也同…