Flink实践
How to debug
Flink是支持本地local运行的,但需要注意的是,如果pom.xml
里面Flink相关的依赖是provided
的scope的话,直接运行会报错NoClassDefFoundError
。这是因为Flink框架本身就包含了Flink相关的依赖,打包的时候可以不用打包进去,但本地执行的话是不能缺少的。(如果是用IntelliJ来执行的话,有时候可能需要重启/删除debug配置再执行)
不修改pom.xml
的话也可以通过配置debug设置来包含provided的依赖:
另外,flink和slf4j-log4j12
不兼容,如果dependency有这个包会导致本地启动失败。
A simple test
Socket Stream
一个比较简单的测试是通过socket来发数据,Flink接收并打印出来。
在本地起一个socket的进程:
nc -lk 7777
Flink代码:
import org.apache.flink.streaming.api.scala._
object SimpleTest {
def main(args: Array[String]): Unit = {
// set up streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// Receive from socket
val stream = env.socketTextStream("localhost", 7777)
// Sink to stdout
stream.print()
// Execute flink job
env.execute("Test Job")
}
}
本地启动Flink进程后,在socket的进程输入多行字符串就能看到Flink的输出了。
From elements
一个简单的词频统计Job:
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.log4j._
object TestJob {
val logger: Logger = Logger.getLogger(getClass.getName)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataStream = env.fromElements("aaa bbb ccc", "qqq ccc fff")
val keyedStream = dataStream.flatMap { x => x.split(" ")}.map(x => (x, 1)).keyBy(0)
keyedStream.print("keyed")
val reduceStream = keyedStream.reduce(new ReduceFunction[(String, Int)] {
override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
(t1._1, t1._2 + t2._2)
}
})
reduceStream.print("reduced")
env.execute("Test job")
}
}
logging
默认本地local debug,会有有一些log4j的warning,可以这样配置log4j:
log4j.rootLogger=DEBUG, consoleAppender, fileAppender
log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
log4j.appender.fileAppender.File=demoApplication.log
watermark
水位线的意思是,当水位(数据)到达标记的线后就可以触发一些操作。通常来说,watermark是和window一起来作用的,watermark用于window触发。
所以如果需要window操作,那么通常需要先添加watermark。
并不是每条数据都会生成水位线。水位线也是一条数据,是流数据的一部分,watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加。
对于Kafka的数据源,我们通常这样来生成watermark:
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyDefinedEvent](Time.seconds(10)) {
override def extractTimestamp(t: MyDefinedEvent): Long = t.eventts
})
这里即使用了t.eventts
字段作为生成watermark的时间,并且每条数据都会生成一个watermark。
如果并行度不为1,那么在计算窗口时,是按照各自的并行度单独计算的。只有当所有并行度中都触发了同一个窗口,那么这个窗口才会触发。
Back pressure
在Flink UI页面点击job中具体某个环节可以看到它的back pressure,ratio越大表明压力越大。注意这里的压力并不是指当前环节的压力,而是下面一个环节的。即下一个环节处理速度跟不上当前的环节。
Data skew
数据倾斜问题很容易会导致Back pressure。比如使用一个keyBy操作,由于keyBy的值有很多都是同一个值,那么就会导致大量的数据流入同一个task中,而其他task都是空闲状态:要解决数据倾斜,一种方法是在进行keyBy操作之前,先通过map把大量相同数值的那个字段重新赋予打乱的值(或者新建一个字段),然后再keyBy。另一种方法是keyBy操作添加一个不相关的字段来达到打乱的效果(当然这些字段不能也数据倾斜,至少不能倾斜的分布类似)。
需要注意的是keyBy和shuffle以及rebalance类似,都是partitioning的操作,keyBy可以理解为按照key来把数据重新分发,因此keyBy之后再接上一个shuffle或者rebalance操作上不允许的(也没有意义)。
shuffle和rebalance的区别是shuffle是随机打乱,而rebalance是round-robin的方式来打断数据分发。
Kafka offset out of range
碰到过一个问题,在某段时间Flink job sink的结果突然骤降,然后过了几个小时又恢复了正常。观察consumer lag也是类似的情况:
一开始以为是处理的数据由于lag太大超出了我们设置的时间窗口被丢掉了,但如果是这种情况,也应该丢一些再处理一些,无法解释consumer lag骤降的情况。
最后在log里面发现了Kafka offset out of range的log(居然只是INFO,而不是WARNING),表示要消费的offset在Kafka的partition里面已经不存在了(被删除了),这种情况Flink会根据Kafka相关的配置来处理,默认配置来会改从latest的offset来消费数据。
Comments