返回

用 DolphinDB 实时计算分钟资金流的宝典

后端

流数据处理近年来受到了很多开发者的关注,因此,我们提供了使用DolphinDB 流数据框架,进行分钟资金流计算的低延时解决方案。在本教程中,你将领略DolphinDB 流数据处理框架的魅力。

我们先简要介绍一下DolphinDB。
它是基于 MPP 架构的列式数据库,其性能优势明显。它还内置了流数据处理框架,该框架操作简单,并且计算延时较低。

现在让我们通过一个详细的用例,来说明如何使用DolphinDB 流数据处理框架进行分钟资金流的计算。

背景

假设某交易所为我们提供了一份包含股票交易记录的数据源,该数据源每笔数据代表一笔股票交易,其中包含股票代码、股票名称、成交时间、成交价格、成交数量等字段。

目标

我们的目标是基于 DolphinDB 流数据处理框架,从股票交易数据源中实时提取数据,并计算每分钟的资金流,并将结果持久化到表中。

解决方案

DolphinDB 提供了两种计算流数据的方法,一是实时查询,二是函数查询。

实时查询采用异步计算模式,可以实时处理数据。函数查询是另一种计算流数据的方法,其计算模式为同步模式,但其时延较高。

对于实时计算分钟资金流,我们推荐采用DolphinDB的实时查询方法。

首先,我们需要创建一个数据源,用来接收股票交易数据。

source = source('stream_source', '股票交易数据源地址');

然后,我们需要创建一个实时查询任务,并将其绑定到数据源。该实时查询任务负责实时读取数据源中的数据,并计算每分钟的资金流。

task = task( source, '实时计算分钟资金流任务', 
             where = '成交时间 >= ' || now() - (24 * 60 * 60 * 1000),
             expr = 'select 
                       股票代码,
                       股票名称,
                       sum(成交金额) as 资金流
                     from
                       股票交易数据
                     where 
                       成交时间 < ' || now() - (24 * 60 * 60 * 1000) + (60 * 1000)
                     group by 
                       股票代码, 股票名称',
             interval = 60 * 1000);

最后,我们需要将实时查询任务的结果持久化到表中。

sink = sink('db', '分钟资金流表');

task = foreach(task, sink);

现在,我们的 DolphinDB 流数据处理框架已经搭建完成,并且可以实时计算每分钟的资金流了。

如果你还想知道更多详细的内容,可以去查阅DolphinDB 官网上的相关文档。