nicolasyang's blog
Flink 里的 enableObjectReuse 配置到底是什么?
打开后能提升多少性能?会造成问题吗?

Flink 的 ExecuteConfig 中有个 enableObjectReuse 配置,文档说得很简单:

By default, objects are not reused in Flink. Enabling the object reuse mode will instruct the runtime to reuse user objects for better performance. Keep in mind that this can lead to bugs when the user-code function of an operation is not aware of this behavior.

除此之外,就只有在批处理 DataSet API 的文档里有提到这个,在流处理 DataStream API 完全没有提到这个。实际上,这个配置对流处理应用也有很大的性能影响。

下面主要讲一下,enableObjectReuse 在流处理中的效果,以及什么情况下可以安全地启用这个配置。至于批处理的情况,文档说得很清楚,就不再说了。

Operator Chaining

当相邻的 operators 直接不需要做数据 shuffle 时,flink 框架会把它们合并到一起,形成一个 task. Task 的每个并发都是一个 subtask. 一个 subtask 的计算任务,都是在一个线程内完成的。

因此,在一个 subtask 内,不同 operator 之间传递对象时,是可以不做序列化的。

enableObjectReuse

但默认配置下,并不是如此。即使在一个 subtask 内,一个 operator 执行完,输出的对象就会被序列化,下一个 operator 执行时,再把对象反序列化回来。只有显式设置 enableObjectReuse(), 才能消除这里的序列化操作。

序列化是开销很大的操作,Flink 为什么要这样设计呢?这里主要是为了兼容一些奇怪写法的代码。这些代码涉及了复用可变对象的情况。如果不通过序列化/反序列化把对象拷贝一份,就会可能会造成意想不到的 bug. 下面说明一下这些情况。

Object Reuse 造成逻辑错误的例子

  • 上游 Operator: 输出成员变量

    class A extends MapFunction<String, Foo> {
        private Foo foo = new Foo();
    
        /* ... some other logic... */
    
        public Foo map(String value) {
            return foo;
        }
    }
    
  • 下游 Operator: 改变输入对象

    class B extends MapFunction<Foo, String> {
        public Foo map(Foo value) {
            String tmp = value.bar
            value.bar = "";
            return tmp;
        }
    }
    

这种情况下,由于下游 operater 修改的其实是上游 operator 的成员变量,上游 operator 看起来即使自己的成员变量莫名其妙地变了。

类似地,StateBackend 里面的数据效果也和成员变量类似。

此外,如果有多个 operator 处理同一个输入,例如:

DataStream<Foo> fooStream = ...;
fooStream.map(new B());
fooStream.map(new AnotherMapFunction());

那么 AnotherMapFunction 读取输入的时候,就会读到被 B 改过的输入,造成逻辑错误。

如何保证 Object Reuse 安全

使用 immutable data structure. scala 大法好, case class 赛高

如果程序使用的数据结构很复杂(比如 Map 嵌套 Map 再嵌套 List 之类的),序列化开销会很大,打开 enableObjectReuse 可以有效地提高性能。

不复杂也能提高性能,只要没有上面说的奇怪写法,都建议打开。


最后修改于 2021-09-20

Comments powered by Disqus