How to Solve the Flink Error: "is not serializable"

Flink reports an error related to serialization.

Problem-solving: run the code, error content, solution, custom serializable implementation, re-execute the code

Problem depth: why the error is thrown, why serialization is needed, where the error is thrown, and why closures need to be cleaned up

Problem-solving

Run code

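package Examples;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.Iterator;

public class JavaSourceEx {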

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The list must exist for both variants below
        java.util.ArrayList<String> list = new java.util.ArrayList<>();
        list.add("hello");list.add("word");list.add("cctv");

        // 1) Use fromCollection(Collection) to read data
        //DataStreamSource<String> stream01 = env.fromCollection(list);

        // 2) Use fromCollection(Iterator, TypeInformation) to read data
        //    (this call throws the error below: the ArrayList iterator is not serializable)
        Iterator<String> it = list.iterator();
        DataStreamSource<String> stream02 = env.fromCollection(it, TypeInformation.of(String.class));

        stream02.print().setParallelism(1);
        env.execute();
    }
}

Error content

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: java.util.ArrayList$Itr@a1cdc6d is not serializable. The implementation accesses fields of its enclosing class, which is a common reason for non-serializability. A common solution is to make the function a proper (non-inner) class, or a static inner class.
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2053)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1737)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:1147)
	at Examples.JavaSourceEx.main(JavaSourceEx.java:30)
Caused by: java.io.NotSerializableException: java.util.ArrayList$Itr
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143)
	... 6 more

Process finished with exit code 1

Solution

If you are reading data from an in-memory collection:
1) Flink also provides fromCollection(Collection), which you can use in place of the iterator-based method.
2) The error occurs because the iterator does not implement the java.io.Serializable interface. The container (ArrayList) is serializable, but its iterator (java.util.ArrayList$Itr) is not. To keep using the iterator-based method, you would have to write your own iterator that implements Serializable. That is extra work, so the first approach is recommended.
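
The difference between the container and its iterator can be checked without Flink. The following is a minimal sketch using plain Java serialization; the class name SerializableCheck and the helper isJavaSerializable are made up for this illustration and are not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class SerializableCheck {

    // Hypothetical helper: returns true if Java serialization accepts the
    // object, false if it fails with NotSerializableException.
    public static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("hello");
        list.add("word");
        list.add("cctv");

        // The ArrayList itself implements Serializable
        System.out.println("list:     " + isJavaSerializable(list));            // true
        // Its iterator (java.util.ArrayList$Itr) does not
        System.out.println("iterator: " + isJavaSerializable(list.iterator())); // false
    }
}
```

This is why passing the list itself to fromCollection(Collection) avoids the error, while passing list.iterator() does not.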

Custom serializable implementation

package Examples.Utils;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

public class MyListItr<T> implements Serializable{

    // Per-instance capacity: a static field would be shared and overwritten by every instance
    private int default_capacity = 10;
    private int size = 0;
    private Object[] elements;

    public MyListItr(){
        this.elements = new Object[default_capacity];
    }
    public MyListItr(int capa){
        this.default_capacity = capa;
        this.elements = new Object[default_capacity];
    }

    public int size(){
        return this.size;
    }

    public T get(int index) throws MyException {
        if(index<0){
            throw new MyException("Index given cannot be less than 0");
        }
        if(index>=size){
            throw new MyException("Index given cannot be larger than or equal to the collection size");
        }
        return (T)elements[index];
    }

    public T add(T ele){
        // Double the backing array when it is full
        if(size == default_capacity){
            elements = Arrays.copyOf(elements,default_capacity*2);
            default_capacity *=2;
        }
        elements[size++] = ele;
        return ele;
    }

    public Iterator<T> iterator(){
        return new Itr();
    }

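    // Non-static inner class: it keeps a reference to the enclosing MyListItr,
    // so it can be serialized only because MyListItr is itself Serializable.
    private class Itr implements Iterator<T>, Serializable {

        int cursor;
        Itr(){}
        @Override
        public boolean hasNext() {
            return cursor!=size();
        }

        @Override
        public T next() {
            // Honor the Iterator contract instead of returning null past the end
            if(!hasNext()){
                throw new java.util.NoSuchElementException();
            }
            return (T)elements[cursor++];
        }
    }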

    public static void main(String[] args) throws MyException {
        MyListItr<Integer> obj = new MyListItr<>();
        obj.add(1);
        obj.add(2);
        System.out.println(obj.get(0));
        obj.add(3);
        Iterator<Integer> it = obj.iterator();
        while(it.hasNext()){
            System.out.println(it.next());
        }

    }

}


class MyException extends Exception implements Serializable{
    public MyException(String message) {
        super(message);
    }
}

Re-execute the code

	MyListItr<Integer> myList = new MyListItr<>();
	myList.add(1);myList.add(2);myList.add(3);
	Iterator<Integer> it02 = myList.iterator();
	DataStreamSource<Integer> stream02 = env.fromCollection(it02, TypeInformation.of(Integer.class));
	stream02.print().setParallelism(1);
	env.execute();

The program now runs without the serialization error.

Problem depth

Why is this error thrown

Java programs run on the JVM as bytecode. Because Flink is a distributed computing framework, the functions used by sources, map, and other operators are not executed where the program is written: they are shipped to the network nodes that carry out the computation. To be shipped, a function and every object it holds (here, the iterator passed to fromCollection) must be serializable. Flink checks this when the source is added, which is why the stack trace passes through addSource and ClosureCleaner.clean. If an object cannot be serialized, the error is thrown.

Why serialization

In distributed computing frameworks such as Spark, MapReduce, and Flink, serializable objects are a prerequisite for computation. Serialization converts an object into a byte stream that can be sent between network nodes and rebuilt on the receiving side. A compact serialized form also reduces the delay and resource consumption caused by data transmission and exchange. Objects that cannot be serialized cannot be distributed to the network nodes.
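
As a rough illustration of that round trip, the sketch below serializes a list to bytes and rebuilds it, using only plain Java serialization. The byte array stands in for the network, and the class and method names (RoundTripDemo, toBytes, fromBytes) are assumptions made for this example. Flink has its own type serializers for records, which this sketch does not attempt to reproduce.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundTripDemo {

    // "Sender" side: turn the object into bytes that could be written to a socket.
    public static byte[] toBytes(Object obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // "Receiver" side: rebuild an equivalent object from the bytes.
    public static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> original = new ArrayList<>(Arrays.asList("hello", "word", "cctv"));

        byte[] wire = toBytes(original);   // what would travel between nodes
        Object copy = fromBytes(wire);     // what the receiving node works with

        System.out.println(copy.equals(original)); // true: same contents
        System.out.println(copy != original);      // true: a separate object
    }
}
```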

Where the error is thrown

This error is raised while Flink executes its closure-cleaning logic. The logic lives in the class org.apache.flink.api.java.ClosureCleaner, and the stack trace above shows InstantiationUtil.serializeObject being called from ClosureCleaner.clean.

Why clean up closures

Anonymous classes and nested inner classes are often used for convenience. When an instance of such a class needs to be serialized for transmission, the instance of its enclosing class is serialized along with it, because an inner class holds a reference to the enclosing instance. The enclosing class may in turn reference classes or variables that the function never needs, so Flink cleans up the closure to reduce the cost of serialization.
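
The hidden reference can be seen in plain Java. In the sketch below, an anonymous class that reads a field of its non-serializable enclosing class fails to serialize, while a static nested class doing the same job succeeds. The names ClosureDemo, Task, and StaticTask are invented for this example, and the code only illustrates the Java behavior behind the error message. It is not Flink's ClosureCleaner logic.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// The enclosing class is deliberately NOT Serializable.
public class ClosureDemo {

    // Hypothetical stand-in for a user function that a framework ships to workers.
    public interface Task extends Serializable {
        String run();
    }

    private final String greeting = "hello";

    // Anonymous class: it reads a field of the enclosing instance, so it
    // carries a hidden reference to the whole ClosureDemo object.
    public Task anonymousTask() {
        return new Task() {
            @Override
            public String run() {
                return greeting;
            }
        };
    }

    // Static nested class: it has no reference to an enclosing instance
    // and copies only the value it needs.
    public static class StaticTask implements Task {
        private final String greeting;

        public StaticTask(String greeting) {
            this.greeting = greeting;
        }

        @Override
        public String run() {
            return greeting;
        }
    }

    // True if Java serialization accepts the object.
    public static boolean serializes(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous: " + serializes(new ClosureDemo().anonymousTask()));   // false
        System.out.println("static:    " + serializes(new ClosureDemo.StaticTask("hello"))); // true
    }
}
```

This matches the advice in the error message: make the function a proper (non-inner) class or a static inner class.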
