Stream Processing Based on Flink

flink环境搭建

flink版本选择

已有镜像的配置:

  • Ubuntu 16.04
  • Hadoop 3.1.1
  • jdk 1.8.0_181

前往flink 下载页面, 根据release的描述,能配合Hadoop的flink并不支持我们服务器上的Hadoop版本,我们选择了Apache Flink 1.7.2 Only的版本.

1

flink环境配置

flink运行时主要有3个配置文件: slaves,masters, flink-conf.yaml.

由于我们并未使用flink的HA模式如zookeeper, masters这一配置文件在执行start-cluster.sh脚本的时候并不会用到,所以不需要做调整.

2

slaves则需写上集群中TaskManager的主机名.

3

flink-conf.yaml是主要的配置文件,涉及到JobManager和TaskManager的内存大小和并行程度等等.

4

flink集群运行

集群启动/停止

分别调用bin/中的start-cluster.shstop-cluster.sh两个脚本即可启动/停止flink集群.

与Hadoop一样,我们也可以通过前端查看该集群的情况:

5


其实在配置集群让它们顺利启动的过程也非常曲折:Local模式可以跑,但是集群总是启动不了.遍历每一次的ljobmanager和taskmanager的log文件,并逐一google调整配置,可收效甚微.最后万念俱灰的时候依靠万能的重启竟然还真把集群启动了.

集群运行jar包

首先是尝试运行示例程序:

1
flink run example/streaming/WordCount.jar

成功执行:

6

然后是运行自己项目的jar包:

1
flink run taxi.jar

也能成功运行:

7


运行jar包时遇到的主要是相对路径的问题,即使在服务器上已经还原了代码里涉及到的文件结构还是会报错找不到路径.后来将相对路径换成服务器上的绝对路径,问题解决.

Q1:车辆流通统计

第一题题目要求如下:

  1. 设置sliding windows=1小时,每半小时滑动一下。请报告每一个grid上面的出租车总数,每10分钟的平均个数

根据之后的补充,我们这么理解题目:

1.1 统计窗口结束时停留在各grid上的车子数量

1.2 每10分钟对各车子的轨迹进行去重,统计一个窗口内6个10分钟各grid上的流通量平均值

数据结构定义

代码如下:

1
2
case class Location(time:Long, license:String, location:Int)
case class ResultStats(time: Long, locationMap: mutable.HashMap[String, Int], stats: Array[Int])

数据在预处理之后的格式是每一行为(时间戳,车牌号,所在区域),Location就用于存储该结构.

ResultStats则是一个用作aggregate()函数中的accumulator的三元组. 其一是一个Long用于存储一个基准时间,以判断10分钟去重的界限;其二是mutable.Hashmap,将车牌映射到区域,用于存储该车辆最近的位置,用于解第一小问; 其二则是一个Array,用于存储每个区域的流通总量,用于解第二问.

main函数

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val source: DataStream[String] = env.readTextFile("/root/pro1/input")

val stream = source.map(value => {
val columns = value.split(",")
Location(columns(0).toLong * 1000, columns(1), columns(2).toInt)
})

val resultStream = stream.assignAscendingTimestamps( _.time )
.map(t => (t.time ,t.license, t.location))
.windowAll(SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(30)))
.aggregate(new TaxiStats)

resultStream.writeAsText("root/pro1/output", OVERWRITE)
env.execute("flinkSucks")
}

首先是环境设置. 因为最开始在本地运行的时候,程序会输出多个文件,不方便核对,我们用env.setParallelism(1)将并行度设为1;同时我们采用的stream time为event time, 即根据读进来的流数据所带的时间戳进行处理.

之后读取流数据并进行预处理,将原始流map后转化成自定义数据结构Location的流. 接着我们对流进行处理: 先设置时间戳和水印,由于在预处理的时候对数据进行了sorting,我们这里可以选用flink自带的generatorassignAscendingTimestamps(_.getTime).

然后在window的选择上,我们没有选择keyed window因为车牌和区域都是我们需要的元信息, 所以难以通过划分key来实现, 最后选择了windowAll.

车辆流通统计的需求难以用简单的reduce()实现, 所以我们基于一个窗口下nonkey的全局数据, 采用aggregate()函数来实现目的.

aggregate函数

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class TaxiStats extends AggregateFunction[(Long, String, Int), ResultStats, String] {
override def createAccumulator(): ResultStats = {
ResultStats(600000, new mutable.HashMap[String, Int](), new Array[Int](401))
}

override def add(value: (Long, String, Int), accumulator: ResultStats): ResultStats = {
var time = accumulator.time
if(value._1 >= accumulator.time || !accumulator.locationMap.contains(value._2) || value._3 != accumulator.locationMap(value._2) ){
accumulator.locationMap.update(value._2, value._3);
accumulator.stats(value._3) += 1;
if(value._1 >= accumulator.time){
time += 600000
}
}
ResultStats(time, accumulator.locationMap, accumulator.stats)
}

override def getResult(accumulator: ResultStats):String = {
val current = new Array[Int](401)
val result = new mutable.StringBuilder("grid,current,average\n")
for(tuple <- accumulator.locationMap){
current(tuple._2) += 1
}
for (i <- 1 to 400){
result.append("%d,%d,%.2f\n".format(i, current(i), accumulator.stats(i) / 6.toDouble))
}
result.toString()
}

override def merge(a: ResultStats, b: ResultStats): ResultStats ={
val time = Math.max(a.time, b.time)
for(tuple <- b.locationMap){
a.locationMap.update(tuple._1, tuple._2)
a.stats(tuple._2) += 1
}
ResultStats(time, a.locationMap, a.stats)
}
}

aggregate()需要的参数有三个: input, accumulator, output. 中间的accumulator可以随着element的添加进行操作,我们用来存储所需的全局数据.

继承这一类的时候,我们需要载三个方法:add(), merge(),getResult().其中add()用于每次添加一个输入的element时,accumulatort可以与之发生操作;merge()用于两个accumulator合并时,将两个accumulator的数据进行整合;getResult()用于最后生成output.

我们的input输入流是一个包含时间,车牌.区域的三元组, accumulator是自定义的ResultStats类型, output则是包含该窗口统计结果的string.

  • add()方法

这一份代码的输入文件仅进行了排序,并未进行去重. 所以这一版本的代码将去重写进了代码逻辑里进行判断:

首先我们确定一个element能通过去重的条件: 时间戳已经超过10分钟限制, 或该车第一次出现或现在的位置已与上一次所在位置不同. 通过去重后, 我们需要更新车辆最近所在位置, 并且为该grid所在的车辆流通+1.

同时, 若是因为时间戳超过基准时间, 需要更新一下基准时间,为它再加10分钟.

  • merge()方法

这个方法需要整合两个accumulator的信息. 思想很简单: 对某一个accumulator中的map: car -> grid进行遍历,同时对另一个accumulator中的map和array进行更新. 基准时间则取两个accumulator中的最大值.

  • getResult()方法

最后我们需要将accumulator最后存储的信息转化为输出. 主要是统计map中的每个grid上的信息,也是通过遍历实现.

If it helps, you may buy me a cup of coffee plz.
0%