将高频事件保存到连接限制受限的数据库

我们遇到的情况是,我必须处理大量涌入我们服务器的事件,平均每秒约1000次事件(峰值可能是~2000)。

问题

我们的系统托管在 Heroku 上,使用相对昂贵的 Heroku Postgres DB ,允许最多500个DB连接。我们使用连接池从服务器连接到数据库。

事件的速度比数据库连接池可以处理的速度快

问题 we have is that events come faster than the connection pool can handle. By the time one connection has finished the network roundtrip from the server to the DB, so it can get released back to the pool, more than n additional events come in.

最终事件堆叠起来,等待保存,并且由于池中没有可用的连接,它们超时并且整个系统变得不可操作。

我们通过从客户端以较慢的速度发出有问题的高频事件来解决紧急情况,但我们仍然想知道在需要处理高频事件时如何处理这种情况。

约束

其他客户端可能希望同时读取事件

其他客户端不断请求使用特定密钥读取所有事件,即使它们尚未保存在DB中。

客户端可以查询 GET api/v1/events?clientId = 1 并获取客户端1发送的所有事件,即使这些事件尚未在DB中保存。

有关于如何处理这个问题的“课堂”示例吗?

可能的解决方案

将事件排入我们的服务器

我们可以在服务器上排队事件(队列的最大并发数为400,因此连接池不会用完)。

这是坏主意,因为:

  • 会占用可用的服务器内存。堆积的排队事件将消耗大量的RAM。
  • 我们的服务器每24小时重启一次。这是Heroku强加的硬限制。服务器可以在事件排队时重新启动,导致我们丢失排队的事件。
  • 它在服务器上引入了状态,从而损害了可伸缩性。如果我们有多服务器设置并且客户端想要读取所有已排队+已保存的事件,我们将无法知道排队事件所在的服务器。

使用单独的消息队列

我假设我们可以使用消息队列(比如 RabbitMQ ?),我们在其中输入消息另一端有另一台服务器,只处理在DB上保存事件。

我不确定消息队列是否允许查询排队事件(尚未保存),所以如果另一个客户端想要读取另一个客户端的消息,我可以从数据库中获取保存的消息,并从队列中获取待处理的消息并将它们连接在一起,这样我就可以将它们发送回读请求客户端。

使用多个数据库,每个数据库都使用中央DB协调器服务器保存一部分消息来管理它们

我们的另一个解决方案是使用多个数据库,使用中央“数据库协调器/负载均衡器”。收到活动后 此协调器将选择其中一个数据库来写入消息。这应该允许我们使用多个Heroku数据库,从而将连接限制提高到500 x数量的数据库。

在读取查询时,此协调器可以向每个数据库发出 SELECT 查询,合并所有结果并将它们发送回请求读取的客户端。

这是坏主意,因为:

  • 这个想法听起来像......过度工程?管理也是一场噩梦(备份等......)。它的构建和维护很复杂,除非绝对必要,否则它听起来像 KISS 违规。
  • 它牺牲了一致性。如果我们采用这种想法,在多个数据库中进行交易是不可取的。
12
一些答案考虑到了这一点,但我宁愿问:是否绝对有必要将100%的事件正确插入数据库中,如果是这样,当服务器重启时,您目前如何处理问题?
额外 作者 Walfrat,
所以你想要100%的可用性但不是同步的。然后我的赌注是首先在本地持久保存事件(例如:文件)并定期导出文件(这可能是tmp文件滚动,以避免每30秒锁定一次)。这种系统的基础是你可以同时拥有所有东西(没有损失,即时过程,保持性能)。你需要知道你可以丢弃什么(例如:同步或真正的0%损失)来获得你需要的东西。然而,这取决于您的系统的要求,您可能不是修复它们的那个。
额外 作者 Walfrat,
你应该澄清这个比率是高峰还是平均。如果它达到顶峰,那么每天的活动数量是多少?
额外 作者 JimmyJames,
“我们通过从客户那里以较慢的速度发出有问题的高频事件来解决紧急情况,但我们仍然想知道在我们需要处理高频事件的情况下如何处理这种情况。”我不确定这是如何解决这个问题的。如果你获得的平均收益超过平均水平,那么客户放慢速度是否意味着他们不断积累需要处理的更积极的事件?
额外 作者 JimmyJames,
你的瓶颈在哪里?您提到了连接池,但这只会影响并行性,而不会影响每次插入的速度。如果您有500个连接,例如2000QPS,如果每个查询在250ms内完成,这是一个很长的时间,这应该可以正常工作。为什么超过15ms?另请注意,通过使用PaaS,您可以放弃重要的优化机会,例如扩展数据库硬件或使用读取副本来减少主数据库的负载。 Heroku不值得,除非部署是你最大的问题。
额外 作者 amon,
@NicholasKyriakides正确的硬件不是微优化。它是扩展数据库的主要方式。这里,一个数据中心内的网络延迟可忽略不计,<1ms。写入企业级SSD也<1ms。对于1000个交易,您至少需要1k IOPS,例如硬盘无法提供,但RAID-0可以提供帮助。一个称职的系统管理员应该能够正确配置所有这些。但你看到了问题。要么你在软件组件中遇到了巨大的性能问题(你已经为数据库排除了这个问题),要么你的PaaS真的非常糟糕。云太糟糕了。
额外 作者 amon,
在通过网络发送它们之前,在单个请求中打包几个事件不是一个选项吗?我已经解决了类似的问题,让每个客户在一个请求中“打包”在给定时间范围内发生的所有事件,并且每隔10~15秒左右发送一次。如果这是一个选项,给我一个ping,我会在完整的答案上扩展它。
额外 作者 T. Sar,
您究竟如何验证连接池是否存在问题? @amon在他的计算中是正确的。尝试在500个连接上发出 select null 。我打赌你会发现连接池不是那里的问题。
额外 作者 user26009,
如果选择null是有问题的,那么你可能是对的。虽然花费所有时间都会很有趣。没有网络那么慢。
额外 作者 user26009,
@amon瓶颈确实是连接池。我在查询本身上运行 ANALYZE ,它们不是问题。我还构建了一个原型来测试连接池假设,并验证这确实是问题所在。数据库和服务器本身存在于不同的机器上,因此存在延迟。此外,除非绝对必要,否则我们不想放弃Heroku,不担心部署对我们来说是一个巨大的加号。
额外 作者 Nicholas Kyriakides,
......这种情况让我们认为,虽然我们可以通过油门“解决这个问题”,但这一次我们不会这么做。
额外 作者 Nicholas Kyriakides,
@JimmyJames 不会减慢客户端意味着他们不断构建需要处理的更深层积压事件吗?。不是在这种情况下。我们限制了客户,因此他们以较低的速度发送那个事件。对于那个事件,我们不需要以这种速度发送的数据,但它会很高兴。有些事件我们总是需要它们。现在我们没有那么多的用户,所以所需的事件会导致同样的问题,但我们很快就会看到它的样子。我并没有完全解决我目前的问题......
额外 作者 Nicholas Kyriakides,
@Walfrat我们没有处理它。我们只是放慢了作为临时解决方案发布事件的速度。另外:绝对有必要将100%的事件正确插入数据库。是的,不是;如果客户端向服务器发送事件,我想保证在2,3年后立即可供其他客户阅读。它不必立即插入数据库中,但任何提出的解决方案最好是容错的。
额外 作者 Nicholas Kyriakides,
@JimmyJames编辑了这个问题,这是平均水平。
额外 作者 Nicholas Kyriakides,
@usr我的测试工具是在50个连接上运行的,而不是500个。我运行 SELECT NULL ,但仍然存在问题。此外,我在查询上运行 ANALYZE ,它们的时间似乎很好。虽然我的问题的概念仍然存在,但我会用更准确的数据更新它。我也忘了添加通过网络发送的查询的大小,这是非常大的(平均约5KB)
额外 作者 Nicholas Kyriakides,
话虽如此,我知道我可以做的微优化可以帮助我解决当前的问题。我想知道我的问题是否有可扩展的架构解决方案。
额外 作者 Nicholas Kyriakides,
作为一般准则,我会说:当你达到你正在使用的技术的极限时,你需要开始转向其他技术。
额外 作者 Dominique,

6 答案

我的猜测是你需要更仔细地探索一个你拒绝的方法

  • 将事件排入我们的服务器

我的建议是开始阅读有关 LMAX架构的各种文章。他们设法为他们的用例进行大批量的批处理工作,并且可能使你的权衡看起来更像他们的。

此外,您可能希望了解是否可以将读取放在一边 - 理想情况下,您希望能够独立于写入来扩展它们。这可能意味着要研究CQRS(命令查询责任隔离)。

服务器可以在事件排队时重新启动,导致我们丢失排队的事件。

在分布式系统中,我认为您可以非常确信消息会丢失。您可以通过明智地了解序列障碍来减轻其中的一些影响(例如 - 确保在系统外共享事件之前发生对持久存储的写入)。

  • 使用多个数据库,每个数据库都使用中央数据库协调服务器保存部分消息来管理它们

也许 - 我更有可能查看您的业务边界,看看是否有自然的地方来分隔数据。

有些情况下,丢失数据是可以接受的权衡吗?

好吧,我想可能会有,但那不是我要去的地方。关键在于,设计应该具有在消息丢失方面取得进展所需的稳健性。

通常看起来像是带有通知的基于拉的模型。 Provider将消息写入有序的持久存储区。消费者从商店中提取消息,追踪自己的高水位线。推送通知用作延迟减少设备 - 但是如果通知丢失,则仍然会(最终)提取消息,因为消费者正在提前定时(不同之处在于,如果收到通知,则拉动更快发生)。

See Reliable Messaging Without Distributed Transactions, by Udi Dahan (already referenced by Andy) and Polyglot Data by Greg Young.

11
额外
在分布式系统中,我认为您可以非常确信消息会丢失。真?有些情况下,丢失数据是可以接受的权衡吗?我的印象是失去数据=失败。
额外 作者 Nicholas Kyriakides,
@NicholasKyriakides,这通常是不可接受的,因此OP建议在发布事件之前写入耐用商店的可能性。查看这篇文章此视频由Udi Dahan撰写,他更详细地解决了这个问题。
额外 作者 Andy,

输入流

目前尚不清楚你的1000个事件/秒是代表峰值还是连续负载:

  • 如果它是一个峰值,您可以使用消息队列作为缓冲区,以便在较长时间内将负载分散到DB服务器上;
  • 如果它是常量加载,单独的消息队列是不够的,因为DB服务器永远无法赶上。然后你需要考虑一个分布式数据库。

提出的解决方案

直观地说,在这两种情况下,我都会去 Kafka 为基础的活动 - 流:

  • All events are systematically published on a kafka topic
  • A consumer would subscribe to the events and store them to the database.
  • A query processor will handle the requests from the clients and query the DB.

这在各个层面都具有高度可扩展性:

  • 如果数据库服务器是瓶颈,只需添加几个消费者。每个人都可以订阅该主题,并写入不同的DB服务器。但是,如果分布在DB服务器上随机发生,则查询处理器将无法预测要采用的数据库服务器,并且必须查询多个数据库服务器。这可能会导致查询方面出现新的瓶颈。
  • 因此,可以通过将事件流组织成若干主题(例如,使用键组或属性组,根据可预测的逻辑对DB进行分区)来预期DB分发方案。
  • 如果一个消息服务器不足以处理越来越多的输入事件,您可以添加 kafka分区 以跨多个物理服务器分发kafka主题。

提供尚未在DB中写入客户端的事件

您希望您的客户端能够访问仍在管道中但尚未写入数据库的信息。这有点微妙。

选项1:使用缓存来补充数据库查询

我没有深入分析,但我想到的第一个想法是让查询处理器成为kafka主题的消费者,但是在另一个 kafka消费者群体 。然后,请求处理器将接收DB编写器将接收的所有消息,但是它们是独立的。然后它可以将它们保存在本地缓存中。然后,查询将在DB +缓存上运行(+删除重复项)。

然后设计看起来像:

enter image description here

可以通过添加更多查询处理器(每个查询处理器在其自己的使用者组中)来实现此查询层的可伸缩性。

选项2:设计双API

一个更好的方法IMHO将提供双API(使用单独的消费者组的机制):

  • 用于访问数据库中的事件和/或进行分析的查询API
  • 直接从主题转发邮件的流式传输API

优点是,让客户决定什么是有趣的。当客户端只对新的传入事件感兴趣时,这可以避免系统地将DB数据与新兑换的数据合并。如果确实需要新鲜事件和存档事件之间的微妙合并,那么客户端必须组织它。

变种

我提议使用kafka,因为它专为非常高容量而设计/ a>使用持久性消息,以便您可以根据需要重新启动服务器。

您可以使用RabbitMQ构建类似的架构。但是,如果您需要持久性队列,请

它可能会降低性能。另外,据我所知,使用RabbitMQ实现多个读者(例如编写器+缓存)并行消耗相同消息的唯一方法是克隆队列。因此,更高的可扩展性可能会以更高的价格出现。

8
额外
@NicholasKyriakides我解释说“其他客户持续要求阅读使用特定密钥的所有事件,即使他们尚未保存在数据库中“作为需要进行数据库查询(”全部“)并将其与传入事件合并(此处使用直接从输入馈送的”缓存“处理),从而消除了双精度。如果用“all”表示“全新”,我们可以简化:没有缓存,没有合并,要么从DB读取,要么转发新事件
额外 作者 Christophe,
是。我的第一个想法是不去随机分发,因为它可能会增加查询的处理负荷(即大多数时候都是多个DB的查询)。您还可以考虑分布式数据库引擎(例如,Ignite?)。但是要做出任何明智的选择,需要很好地理解数据库使用模式(数据库中还有什么,查询的频率,查询类型,是否存在超出单个事件的事务约束等等)。
额外 作者 Christophe,
@NicholasKyriakides谢谢! 1)我只是考虑几个独立的数据库服务器,但有一个明确的分区方案(密钥,地理等),可用于有效地分派命令。 2)直观地,也许是因为Kafka的设计非常具有持久性消息的高吞吐量需要重新启动服务器吗?)。我不确定RabbitMQ对于分布式方案是否灵活,而且持久性队列降低性能</一>
额外 作者 Christophe,
恒星;你是什​​么意思一个分布式数据库(例如使用一组密钥服务器的专业化)?还有为什么Kafka而不是RabbitMQ?有没有特别的理由选择一个而不是另一个?
额外 作者 Nicholas Kyriakides,
对于1)所以这非常类似于我的 Use multiple databases idea,但你说我不应该随机(或循环)将消息分发给每个任何数据库。对?
额外 作者 Nicholas Kyriakides,
我想知道,为什么需要本地缓存?使用多个数据库/编写器的整个想法是即时保存事件,几乎从不存在积压。为什么不直接从DB读取?
额外 作者 Nicholas Kyriakides,
即使它们尚未保存在DB中。。我在这里的意思是,如果选择一个接受的解决方案,总会有积压的事件尚未写入,那么读取客户端也希望得到积压事件。多数据库的想法几乎意味着没有积压(理论上)=永远未保存的数据库事件=不需要缓存。
额外 作者 Nicholas Kyriakides,
只是想说,尽管kafka可以提供非常高的吞吐量,但它可能超出了大多数人的需求。我发现处理kafka及其API对我们来说是一个很大的错误。 RabbitMQ并不懈怠,它具有您期望从MQ获得的接口
额外 作者 Ankit,

如果我理解正确,目前的流程是:

  1. 接收和事件(我通过HTTP假设?)
  2. 从池中请求连接。
  3. 将事件插入数据库
  4. 释放与池的连接。

如果是这样,我认为对设计的第一个改变是停止让你的偶数处理代码在每个事件上返回到池的连接。而是创建一个插入线程/进程池,它与数据库连接数为1对1。这些将包含专用的DB连接。

使用某种并发队列,然后让这些线程从并发队列中提取消息并插入它们。理论上,他们永远不需要将连接返回到池或请求新的连接,但是如果连接变坏,您可能需要构建处理。杀死线程/进程并开始一个新线程可能是最容易的。

这应该有效地消除连接池开销。当然,您需要能够在每个连接上每秒至少推送1000个/连接事件。您可能想要尝试不同数量的连接,因为在同一个表上有500个连接可能会在DB上产生争用,但这是完全不同的问题。另一件需要考虑的事情是使用批量插入,即每个线程拉出一些消息并立即将它们全部推送。此外,避免多个连接尝试更新相同的行。

6
额外

假设

我将假设您描述的负载是不变的,因为这是更难以解决的方案。

我还假设您可以通过某种方式在Web应用程序流程之外运行触发的,长时间运行的工作负载。

Assuming that you have correctly identified your bottleneck - latency between your process and the Postgres database - that is the primary problem to solve for. The 解 needs to account for your consistency 约束 with other clients wanting to read the events as soon as practicable after they are received.

要解决延迟问题,您需要以最小化每个事件要存储的延迟量的方式工作。 如果您不愿意或无法更改硬件,这是您需要实现的关键。鉴于您使用PaaS服务并且无法控制硬件或网络,减少每个事件延迟的唯一方法是使用某种批量写入事件。

您需要在本地存储一个事件队列,这些事件将被刷新并定期写入您的数据库,一旦达到给定大小,或者经过一段时间后。进程需要监视此队列以触发对存储的刷新。关于如何管理以您选择的语言定期刷新的并发队列应该有很多示例 - 以下是C#中的一个示例,来自流行的Serilog日志库的定期批处理接收器。

This SO answer describes the fastest way to flush data in Postgres - although it would require your batching store the queue on disk, and there is likely a problem to be solved there when your disk disappears upon reboot in Heroku.

约束

Another answer has already mentioned CQRS, and that is the correct approach to solve for the 约束. You want to hydrate read models as each event is processed - a Mediator pattern can help encapsulate an event and distribute it to multiple handlers in-process. So one handler may add the event to your read model that is in-memory that clients can query, and another handler can be responsible for queuing the event for its eventual batched write.

CQRS的主要优点是您可以将概念性读写模型分离 - 这是一种说您写入一个模型的奇特方式,而您从另一个完全不同的模型中读取。为了获得CQRS的可扩展性优势,您通常希望确保每个模型以最适合其使用模式的方式单独存储。在这种情况下,我们可以使用聚合读取模型 - 例如,Redis缓存,或简单地在内存中 - 以确保我们的读取快速且一致,同时我们仍然使用我们的事务数据库来编写数据。

5
额外

事件的速度比数据库连接池可以处理的速度快

如果每个进程都需要一个数据库连接,则会出现问题。应该设计系统,以便拥有一个工作池,每个工作者只需要一个数据库连接,每个工作人员可以处理多个事件。

消息队列可以与该设计一起使用,您需要将事件推送到消息队列的消息生成器,并且worker(消费者)处理来自队列的消息。

其他客户可能希望同时读取事件

只有存储在数据库中的事件没有任何处理(原始事件)时,才能使用此约束。如果事件在存储到数据库之前被处理,那么获取事件的唯一方法是从数据库。

如果客户只想查询原始事件,那么我建议使用像Elastic Search这样的搜索引擎。您甚至可以免费获得查询/搜索API。

鉴于在将数据保存到数据库之前查询事件对您来说很重要,像Elastic Search这样的简单解决方案应该可行。您基本上只是将所有事件存储在其中,并且不会通过将它们复制到数据库中来复制相同的数据。

扩展弹性搜索很容易,但即使使用基本配置,它也具有相当高的性能。

当您需要处理时,您的流程可以从ES获取事件,处理并将它们存储在数据库中。我不知道您在此处理中需要的性能级别,但它与从ES查询事件完全不同。您不应该有连接问题,因为您可以拥有固定数量的worker并且每个都有一个数据库连接。

3
额外

我会把heroku放在一起,也就是说,我会放弃一个集中的方法:多次写入,最大池连接达到峰值是db cluster发明的主要原因之一,主要是因为你没有加载写入db(s)具有可以由集群中的其他db执行的读取请求,我尝试使用主从拓扑,而且 - 正如其他人已经提到的那样,拥有自己的数据库安装可以调整整个系统,以确保正确处理查询传播时间。

祝好运

1
额外