How to debug

Flink是支持本地local运行的,但需要注意的是,如果pom.xml里面Flink相关的依赖是provided的scope的话,直接运行会报错NoClassDefFoundError。这是因为Flink框架本身就包含了Flink相关的依赖,打包的时候可以不用打包进去,但本地执行的话是不能缺少的。(如果是用IntelliJ来执行的话,有时候可能需要重启/删除debug配置再执行)
不修改pom.xml的话也可以通过配置debug设置来包含provided的依赖:

IntelliJ IDEA配置

另外,flink和slf4j-log4j12不兼容,如果dependency有这个包会导致本地启动失败。

A simple test

Socket Stream

一个比较简单的测试是通过socket来发数据,Flink接收并打印出来。
在本地起一个socket的进程:

nc -lk 7777

Flink代码:

import org.apache.flink.streaming.api.scala._

object SimpleTest {
  def main(args: Array[String]): Unit = {
    // set up streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
      // Receive from socket
      val stream = env.socketTextStream("localhost", 7777)
      // Sink to stdout
      stream.print()
      // Execute flink job
      env.execute("Test Job")
  }
}

本地启动Flink进程后,在socket的进程输入多行字符串就能看到Flink的输出了。

From elements

一个简单的词频统计Job:

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.log4j._

object TestJob {

  val logger: Logger = Logger.getLogger(getClass.getName)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val dataStream = env.fromElements("aaa bbb ccc", "qqq ccc fff")
    val keyedStream = dataStream.flatMap { x => x.split(" ")}.map(x => (x, 1)).keyBy(0)
    keyedStream.print("keyed")
    val reduceStream = keyedStream.reduce(new ReduceFunction[(String, Int)] {
      override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
        (t1._1, t1._2 + t2._2)
      }
    })
    reduceStream.print("reduced")
    env.execute("Test job")
  }
}

logging

默认本地local debug,会有有一些log4j的warning,可以这样配置log4j:

log4j.rootLogger=DEBUG, consoleAppender, fileAppender
log4j.appender.consoleAppender=org.apache.log4j.ConsoleAppender
log4j.appender.consoleAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.consoleAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
log4j.appender.fileAppender=org.apache.log4j.RollingFileAppender
log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.fileAppender.layout.ConversionPattern=[%t] %-5p %c %x - %m%n
log4j.appender.fileAppender.File=demoApplication.log

watermark

水位线的意思是,当水位(数据)到达标记的线后就可以触发一些操作。通常来说,watermark是和window一起来作用的,watermark用于window触发。
所以如果需要window操作,那么通常需要先添加watermark。

并不是每条数据都会生成水位线。水位线也是一条数据,是流数据的一部分,watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加。

对于Kafka的数据源,我们通常这样来生成watermark:

stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyDefinedEvent](Time.seconds(10)) {
  override def extractTimestamp(t: MyDefinedEvent): Long = t.eventts
})

这里即使用了t.eventts字段作为生成watermark的时间,并且每条数据都会生成一个watermark。

如果并行度不为1,那么在计算窗口时,是按照各自的并行度单独计算的。只有当所有并行度中都触发了同一个窗口,那么这个窗口才会触发。

Back pressure

在Flink UI页面点击job中具体某个环节可以看到它的back pressure,ratio越大表明压力越大。注意这里的压力并不是指当前环节的压力,而是下面一个环节的。即下一个环节处理速度跟不上当前的环节。

Data skew

数据倾斜问题很容易会导致Back pressure。比如使用一个keyBy操作,由于keyBy的值有很多都是同一个值,那么就会导致大量的数据流入同一个task中,而其他task都是空闲状态:要解决数据倾斜,一种方法是在进行keyBy操作之前,先通过map把大量相同数值的那个字段重新赋予打乱的值(或者新建一个字段),然后再keyBy。另一种方法是keyBy操作添加一个不相关的字段来达到打乱的效果(当然这些字段不能也数据倾斜,至少不能倾斜的分布类似)。
需要注意的是keyBy和shuffle以及rebalance类似,都是partitioning的操作,keyBy可以理解为按照key来把数据重新分发,因此keyBy之后再接上一个shuffle或者rebalance操作上不允许的(也没有意义)。
shuffle和rebalance的区别是shuffle是随机打乱,而rebalance是round-robin的方式来打断数据分发。

Kafka offset out of range

观测到Flink job聚合的数据有两次骤降

碰到过一个问题,在某段时间Flink job sink的结果突然骤降,然后过了几个小时又恢复了正常。观察consumer lag也是类似的情况:

consumer lag

一开始以为是处理的数据由于lag太大超出了我们设置的时间窗口被丢掉了,但如果是这种情况,也应该丢一些再处理一些,无法解释consumer lag骤降的情况。

最后在log里面发现了Kafka offset out of range的log(居然只是INFO,而不是WARNING),表示要消费的offset在Kafka的partition里面已经不存在了(被删除了),这种情况Flink会根据Kafka相关的配置来处理,默认配置来会改从latest的offset来消费数据。