股票市场数据流(股票价格数据流处理)_炒股学院_景合财经知识网_景合财经景合财经

景合财经
景合财经知识网站

股票市场数据流(股票价格数据流处理)

之前的文章《万字长文深度解析WordCount,入门Flink,看这一篇就够了》使用WordCount展示了Flink程序的基本结构,本文将以股票价格案例来演示如何使用Flink的DataStream API。

股票市场数据流

通过本文,你可以学到:

  1. 定义相关数据结构。
  2. Flink流处理程序的骨架。
  3. Flink的执行环境概念。
  4. 自定义Source、设置时间戳和Watermark。

数据结构

Flink能处理任何可被序列化的数据结构:

  • 基础数据类型,包括 String、Integer、Boolean、Array
  • 复杂数据结构,包括 Scala case class和 Java POJO

此外,Flink也支持Kryo序列化工具。

本例使用Scala case class来定义一个股票类,该对象包括三个字段:股票代号、时间戳和价格。真实的股票交易数据比这个更为复杂,这里只是一个简化的模型。

case class StockPrice(symbol: String, timestamp: Long, price: Double)

当然,如果使用Java,也可以定义一个POJO(Plain Old Java Object),该类中各个字段或者具有public属性,或者有一个对应的getter和setter方法,且该类有一个无参数的构造函数。

public class StockPrice { public String symbol; public Long timestamp; public Double price; public StockPrice() {}; public StockPrice(String symbol, Long timestamp, Double price){ ... };}

相比而言,Scala的类定义更为简洁,因为Scala的编译器在编译阶段帮忙生成了不少代码,Java的代码风格有些臃肿。

Flink对数据类型有以上要求,主要因为在分布式计算过程中,需要将内存中的对象序列化成可多节点传输的数据,并且能够在对应节点被反序列化成对象。

Flink流处理程序的骨架结构

基于上面的数据结构,我们开始开发程序。下面的代码清单使用Flink对股票数据流分析程序,该程序能够统计数据源中每支股票5秒时间窗口内的最大值。

object StockPriceDemo { def main(args: Array[String]) { // 设置执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 每5秒生成一个Watermark env.getConfig.setAutoWatermarkInterval(5000L) // 股票价格数据流 val stockPriceRawStream: DataStream[StockPrice] = env // 该数据流由StockPriceSource类随机生成 .addSource(new StockPriceSource) // 设置 Timestamp 和 Watermark .assignTimestampsAndWatermarks(new StockPriceTimeAssigner) val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream .keyBy(_.symbol) // 设置5秒的时间窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 取5秒内某一只股票的最大值 .max("price") // 打印结果 stockPriceStream.print() // 执行程序 env.execute("Compute max stock price") }}

Java或Scala的程序入口一般是一个静态(static)的main函数。而在Scala中,object下的变量和方法都是静态的。在main函数中,还需要定义下面几个步骤:

  1. 设置运行环境。
  2. 读取一到多个数据源。
  3. 根据业务逻辑对数据流进行Transformation操作。
  4. 将结果输出。
  5. 调用作业执行函数 execute。

接下来我们对这五个步骤拆解分析。

设置执行环境

val env = StreamExecutionEnvironment.getExecutionEnvironment

这行代码可以获取一个Flink流处理执行环境。Flink一般运行在一个集群上,执行环境是Flink程序运行的上下文,它提供了一系列作业与集群交互的方法,比如作业如何与外部世界交互。当调用getExecutionEnvironment方法时,假如我们是在一个集群上提交作业,则返回集群的上下文,假如我们是在本地执行,则返回本地的上下文。本例中我们是进行流处理,在批处理场景则要获取DataSet API中批处理执行环境。

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)这行代码告知执行环境使用Event-time时间语义来进行后续时间上的计算。Event-time语义下需要依赖Watermark机制,即收到一个Watermark后,开始对这个窗口进行计算,比Watermark更晚到达的事件都被视为延迟数据。env.getConfig.setAutoWatermarkInterval(5000L)设置每5秒生成一个Watermark,默认情况下每200毫秒生成一个Watermark。

此外,我们还可以设置作业的并行度、配置Checkpoint等操作。可见,执行环境是我们与Flink交互的入口。

读取数据源

接着我们需要使用执行环境提供的方法读取数据源,读取数据源的部分统称为Source。数据源一般是消息队列或文件,我们也可以根据业务需求重写数据源,比如定时爬取网络中某处的数据。在本例中,我们使用val stockPriceRawStream: DataStream[StockPrice] = env.addSource(new StockPriceSource)来读取数据源,其中StockPriceSource随机生成了一些股票价格数据。最终生成的stockPriceRawStream是一个由StockPrice组成的DataStream数据流。

下面的代码清单展示了StockPriceSource类继承RichSourceFunction,对run方法重写,不断随机生成股票价格,生成的数据最终写入SourceContext中。

class StockPriceSource extends RichSourceFunction[StockPrice]{ var isRunning: Boolean = true val rand = new Random() // 初始化股票价格 var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d) var stockId = 0 var curPrice = 0.0d override def run(srcCtx: SourceContext[StockPrice]): Unit = { while (isRunning) { // 每次从列表中随机选择一只股票 stockId = rand.nextInt(priceList.size) val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05 priceList = priceList.updated(stockId, curPrice) val curTime = Calendar.getInstance.getTimeInMillis // 将数据源收集写入SourceContext srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice)) Thread.sleep(rand.nextInt(10)) } } override def cancel(): Unit = { isRunning = false }}

尽管StockPrice的数据结构中有时间戳的字段,但是Flink并不知道哪个字段是时间戳,所以还需要手动设置。assignTimestampsAndWatermarks(new StockPriceTimeAssigner)方法允许我们设置时间戳和Watermark,这样Flink就可以知道本程序的时间戳。Flink Watermark相关的内容将在后续文章中介绍。

下面的代码清单抽取数据源中StockPrice的timestamp字段作为该事件的时间戳。

class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) { override def extractTimestamp(t: StockPrice): Long = t.timestamp}

Transformation

此时,我们已经获取了一个股票价格数据流,接下来我们就可以在数据流上进行有状态的计算了。我们一般使用Flink提供的各类算子,使用链式调用的方式,对一个数据流进行操作。经过各Transformation算子的处理,DataStream可能被转换为KeyedStream、JoinedStream等不同的数据流结构。相比Spark RDD的数据结构,Flink的数据流结构确实更加复杂。

本例中,我们按照股票代号对数据进行分组,并开启一个5秒的时间窗口,统计该窗口下某支股票的5秒内的最大值。

val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream .keyBy(_.symbol) // 设置5秒的时间窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 取5秒内某一只股票的最大值 .max("price")

keyBy算子将数据流按照股票的symbol分组,相同symbol的股票数据会被归结到一起;window算子开启了一个5秒的滚动窗口;max算子统计这个5秒窗口内的最大值。最终我们能够得到每支股票5秒内的最大值。

输出结果

然后我们需要将前面的计算结果输出到外部系统,可能是一个消息队列、文件系统或数据库,也可以自定义输出方式,输出结果的部分统称为Sink。

本例中,5秒窗口内每支股票的最大值是计算结果,是一个DataStream[StockPrice]结构的数据流。我们调用print函数将这个数据流打印到标准输出(standard output)。

执行

当定义好程序的Source、Transformation和Sink的业务逻辑后,程序并不会立即执行这些算子对应的任何计算,还需要调用执行环境execute()方法来执行前面的业务逻辑。Flink是延迟执行(lazy evaluation)的,即当程序明确调用execute()方法时,Flink会将数据流图转化为一个JobGraph,提交给JobManager,JobManager根据当前的执行环境来执行这个作业。

总结

一个Flink程序的核心业务逻辑主要包括:Source、Transformation和Sink三部分。程序的开始前要设置执行环境,最后要调用execute()方法。

股票市场数据流

Flink核心业务逻辑

整个程序的完整代码如下所示,完整程序和更多案例参见我的GitHub:https://github.com/luweizheng/flink-tutorials

package com.flink.tutorials.demos.stockimport java.util.Calendarimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.functions.source.RichSourceFunctionimport org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContextimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}import scala.util.Randomobject StockPriceDemo { /** * Case Class StockPrice * symbol 股票代号 * timestamp 时间戳 * price 价格 */ case class StockPrice(symbol: String, timestamp: Long, price: Double) def main(args: Array[String]) { // 设置执行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 每5秒生成一个Watermark env.getConfig.setAutoWatermarkInterval(5000L) // 股票价格数据流 val stockPriceRawStream: DataStream[StockPrice] = env // 该数据流由StockPriceSource类随机生成 .addSource(new StockPriceSource) // 设置 Timestamp 和 Watermark .assignTimestampsAndWatermarks(new StockPriceTimeAssigner) val stockPriceStream: DataStream[StockPrice] = stockPriceRawStream .keyBy(_.symbol) // 设置5秒的时间窗口 .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 取5秒内某一只股票的最大值 .max("price") // 打印结果 stockPriceStream.print() // 执行程序 env.execute("Compute max stock price") } class StockPriceSource extends RichSourceFunction[StockPrice]{ var isRunning: Boolean = true val rand = new Random() // 初始化股票价格 var priceList: List[Double] = List(100.0d, 200.0d, 300.0d, 400.0d, 500.0d) var stockId = 0 var curPrice = 0.0d override def run(srcCtx: SourceContext[StockPrice]): Unit = { while (isRunning) { // 每次从列表中随机选择一只股票 stockId = rand.nextInt(priceList.size) val curPrice = priceList(stockId) + rand.nextGaussian() * 0.05 priceList = priceList.updated(stockId, curPrice) val curTime = Calendar.getInstance.getTimeInMillis // 将数据源收集写入SourceContext srcCtx.collect(StockPrice("symbol_" + stockId.toString, curTime, curPrice)) Thread.sleep(rand.nextInt(10)) } } override def cancel(): Unit = { isRunning = false } } class StockPriceTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[StockPrice](Time.seconds(5)) { override def extractTimestamp(t: StockPrice): Long = t.timestamp }}

家电维修,空调维修,智能锁维修全国报修号码分享:可以直接拔打400-968-1665 全国各大城市均设网点。
赞(0) 打赏
欢迎转载分享:景合财经 » 股票市场数据流(股票价格数据流处理)
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

非常感谢你的打赏,我们将继续给力更多优质内容,让我们一起创建更加美好的网络世界!

支付宝扫一扫打赏

微信扫一扫打赏

-景合财经

在线报修网点查询