期待获得更多专家见解?

立即订阅55数据月度精选Tea O'Clock

点击订阅

大规模实时数据处理:如何确保数据完整性

您是否曾经想创建一个能够流式传输数据的无服务器云架构,却发现架构的输入和输出之间存在数据缺口?如果有,我们将在本文中尝试回答您的问题,介绍我们的使用案例,该架构能够每秒处理 3,000 个请求。

虽然大量数据用例可以通过批处理解决方案来解决,但某些情况下必须使用流式解决方案。有了谷歌云,您就可以通过无服务器服务实现这一目标,从而只需为实际使用的资源付费。在本文中,我们将介绍 Google Cloud 架构的工作原理:它将 HTTPS 请求作为输入进行检索,在请求到达后立即对其进行处理,并将这些请求的输出写入数据仓库。我们将重点关注架构中保证信息数量与输出数量相同的部分。

为了更清楚地说明问题,以下是我们的架构所做的工作:

更多细节和 GCP 工具,请看下面:

一般操作

在我们的架构中,传入的请求由云函数接收,云函数将读取请求,检查格式是否有效,然后通过发送到 Pub/Sub 主题的消息传输信息。我们选择使用云函数,是因为要执行的代码非常简单,而且该服务能够根据传入流量的变化增加实例数量,无需我们操心。

因此,我们在 Pub/Sub 主题和云运行之间创建了一个 "推送 "订阅。我们选择 "推送 "配置是为了保持我们架构的流特性。消息随后由云运行处理,必要时可能会比云功能花费更长的时间。然后,云运行将其处理输出写入 BigQuery 表。

建筑优势

通过将获取与处理分离,我们可以确保用户输入的请求尽快得到处理。由于延迟并不那么重要,因此处理工作的时间可以稍长一些。事实上,如果工作量过大,Pub/Sub 订阅会保留消息并将其发送回去,直到云运行处理完毕。

这样,即使在流量高峰期,我们也能保证用户不会遇到任何延迟。我们还可以确保所有 "有效 "信息都会被我们的架构处理,因为 Pub/Sub 保证至少有一条信息会被接收方(本例中为云运行)发送和接收。

遇到的问题

经过几天的测试,我们发现我们的架构确实在处理所有传入的消息,但有些消息在 BigQuery 表中存在多个副本。这意味着我们的云运行正在多次处理同一消息。这是为什么呢?经过多次调查,我们发现 BigQuery 表中的大部分重复信息都是在流量高峰时到达的。在这些请求激增期间,Cloud Run 实例的数量会迅速增加。因此,Pub/Sub 订阅有可能将相同的消息发送到不同的实例,因为该服务保证消息至少有一次交付。如果您希望限制 Pub/Sub 只发送一条消息,这也是可能的,但只能通过 "Pull "订阅来实现,这意味着您现在面临的是批处理架构。

为了解决数据库中重复信息的问题,我们有两个选择:一是在处理过程中过滤信息,删除已经处理过的信息;二是使用一定频率的 SQL 查询来清理数据库。我们决定采用第一种解决方案。之所以这样选择,是因为我们希望尽可能保持实时性。但是,如果执行 SQL 查询,我们就不得不分批进行工作。此外,对大量数据进行常规 SQL 查询可能会产生大量成本。

解决方案

为了解决这个问题,我们必须使用新的 GCP 服务,您可以在我们的架构图中看到这些服务。

我们在谷歌云上建立了一个名为 Memorystore 的 Redis 实例。这项服务被用作缓存:一旦成功处理了一条信息,我们就会将信息 ID 作为密钥写入 Memorystore。然后,一旦有新消息到达,我们就会查询 Memorystore 实例,看看消息 ID 是否已经存在于数据库中。如果是,我们就不处理该消息,因为这意味着 Cloud Run 已经处理过该消息。如果消息 ID 不在 Memorystore 中,我们就会处理消息并将 ID 写入实例,从而表示消息刚刚被处理过。

在 Memorystore 中写入密钥时,还可以为其指定有效期。在我们的例子中,我们将有效期设置为 15 分钟,因为信息 ID 的保存时间不需要超过 15 分钟。

我们还使用了 VPC 来确保云运行和 Memorystore 实例之间的安全连接。为了加强安全性,我们还激活了 Memorystore 实例的验证要求,这意味着我们需要一个安全密钥才能与其通信。为了安全地存储这个密钥,我们将它放在谷歌云的秘密管理器中,直接从 Cloud Run 代码中调用。

我们为什么使用 Redis 而不是其他数据库?首先,我们需要一个 "key:value "数据库,它能让我们快速检索密钥。然后,由于我们只需要在一定时间间隔内使用这些密钥,因此我们需要一个允许我们输入密钥有效期的数据库。这就是我们选择 Google Cloud 的 Memorystore 服务的原因。警告:在我们的案例中,使用 Memorystore 非常有用,因为我们有大量的数据,但必须说明的是,其使用必须与使用案例相适应,因为在最低配置下,该服务的费用为每月 70 美元。

结果

经过几周的测试,我们的架构平均每秒能够处理 1,500 个请求,最高时达到 3,000 个。我们观察到,我们的重复检查系统和 Memorystore 完全没有增加 Cloud Run 处理请求的延迟。我们还发现,我们的系统平均每天检测到 5,000 到 15,000 条重复消息,高峰时每天检测到 300,000 条,而每天的消息总数约为 7,000 万条。更重要的是,现在所有邮件都只向数据库传送一次。

可能的改进

关于我们的架构,有几点可以修改或改进。 

如果您的第一个收集步骤需要调整大量参数(请求数、每个实例的 CPU 和内存数等),并且您想使用 Docker 作为部署工具,那么用云运行来替换我们的云函数可能更适合您的使用案例。

如果您希望您的收集点检索来自地理位置分散的外部用户的请求,请考虑在用户和您的收集点(Cloud Run 或 Cloud Function)之间设置一个负载平衡器。此外,有了负载平衡器,您就可以轻松集成 Cloud Armor(谷歌云的 WAF),并管理您的子域名。

最后,如果您的数据处理量很小,甚至不存在数据处理量,而且您不想使用 Docker 来简化部署,那么您可以用云功能来代替我们的云运行。

所有文章

相关文章

糟糕!无法找到您想访问的页面

期待获得更多专家见解?
立即订阅55数据月度精选Tea O'Clock!

发现最新资讯、深度文章、网络研讨会视频,以及55数据的各项活动。

名*
姓氏*
公司*
首选语言*
电子邮件*
谢谢!

您的订购要求已被充分考虑。
糟糕!提交表格时出了点问题。