Tag Archives: serialize

org.apache.spark.SparkException: Task not serializable

preface

This article belongs to the column spark abnormal problems summary, which is original by the author. Please indicate the source of quotation. Please help point out the deficiencies and errors in the comment area. Thank you!

Please refer to spark exception summary for the directory structure and references of this column

Text

If an “org. Apache. Spark. Sparkexception: task not serializable” error occurs in error cause analysis, it is generally because external variables are used in parameters such as map and filter, but this variable cannot be serialized (it does not mean that external variables cannot be referenced, but serialization should be done well).

The most common case is: when a member function or variable of a class (often the current class) is referenced, all members of the class (the whole class) need to support serialization .

In many cases, the current class uses the “extends serializable” declaration to support serialization, but some fields do not support serialization, which will still lead to problems in the serialization of the whole class, and eventually lead to the problem that the task is not serialized.

Practice

one

Requirement description

Because map, filter and other operators in Spark Program internally refer to class member functions or variables, all members of this class need to support serialization, and some member variables of this class do not support serialization, which eventually leads to the problem that task cannot be serialized.

In order to verify the above reasons, we have written an example program, as shown below.

The function of this class is to filter the domain name list of a specific top-level domain (rootdomain, such as. Com, CN, org) from the domain name list (RDD), and the specific top-level domain name needs to be specified when calling the function.

Code 1

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest1 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest1")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest1 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest1("com")
    test.getResult()
    test.stop()
  }
}

Log 1

According to the above analysis, the current class needs to be serialized because it depends on the member variables of the current class.

Some fields of the current class are not serialized, resulting in an error.

The actual situation is consistent with the causes analyzed. The errors during operation are as follows.

By analyzing the following logs, we can see that the error is caused by SC (sparkcontext).

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:50:42 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:50:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:50:43 INFO ResourceUtils: ==============================================================
21/07/25 15:50:43 INFO SparkContext: Submitted application: MyTest1
21/07/25 15:50:43 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:50:43 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:50:43 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:50:43 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:50:43 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:50:43 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:50:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:50:43 INFO Utils: Successfully started service 'sparkDriver' on port 63559.
21/07/25 15:50:43 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:50:43 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:50:43 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:50:43 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-862d818a-ff7d-473b-a321-84ca60962ada
21/07/25 15:50:43 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:50:43 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:50:44 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:50:44 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:50:44 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:50:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63560.
21/07/25 15:50:44 INFO NettyBlockTransferService: Server created on 192.168.0.105:63560
21/07/25 15:50:44 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:50:44 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63560 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63560, None)
21/07/25 15:50:44 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63560, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$filter$1(RDD.scala:439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:438)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.com$shockang$study$bigdata$spark$errors$serializable$MyTest1$$getResult(MyTest1.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1$.main(MyTest1.scala:25)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest1.main(MyTest1.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@23811a09)
	- field (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest1, name: sc, type: class org.apache.spark.SparkContext)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, com.shockang.study.bigdata.spark.errors.serializable.MyTest1@256aa5f2)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest1, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest1.$anonfun$getResult$1$adapted:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest1;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955, com.shockang.study.bigdata.spark.errors.serializable.MyTest1$$Lambda$689/1155399955@66bacdbc)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 15:50:44 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 15:50:44 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:50:44 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:50:45 INFO MemoryStore: MemoryStore cleared
21/07/25 15:50:45 INFO BlockManager: BlockManager stopped
21/07/25 15:50:45 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:50:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:50:45 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:50:45 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:50:45 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-f084a4b0-2f36-4e9d-a1f6-fb2bbbbd99d9

two

Code 2

In order to verify the above conclusion, the member variables that do not need to be serialized are marked with the keyword “ @ transient “, indicating that the two member variables in the current class are not serialized.

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest2 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest2")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest2 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest2("com")
    test.getResult()
    test.stop()
  }
}

Log 2

Execute the program again and the program runs normally.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:51:17 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:51:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:51:18 INFO ResourceUtils: ==============================================================
21/07/25 15:51:18 INFO SparkContext: Submitted application: MyTest2
21/07/25 15:51:18 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:51:18 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:51:18 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:51:18 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:51:18 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:51:18 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:51:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:51:18 INFO Utils: Successfully started service 'sparkDriver' on port 63584.
21/07/25 15:51:18 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:51:18 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:51:18 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:51:18 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-292dce43-a0c2-4ba7-aed2-7786b9c34b6d
21/07/25 15:51:18 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:51:18 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:51:19 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:51:19 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:51:19 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:51:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63585.
21/07/25 15:51:19 INFO NettyBlockTransferService: Server created on 192.168.0.105:63585
21/07/25 15:51:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:51:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63585 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63585, None)
21/07/25 15:51:19 INFO SparkContext: Starting job: foreach at MyTest2.scala:14
21/07/25 15:51:19 INFO DAGScheduler: Got job 0 (foreach at MyTest2.scala:14) with 12 output partitions
21/07/25 15:51:19 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest2.scala:14)
21/07/25 15:51:19 INFO DAGScheduler: Parents of final stage: List()
21/07/25 15:51:19 INFO DAGScheduler: Missing parents: List()
21/07/25 15:51:19 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13), which has no missing parents
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.4 KiB, free 2004.6 MiB)
21/07/25 15:51:19 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1922.0 B, free 2004.6 MiB)
21/07/25 15:51:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:63585 (size: 1922.0 B, free: 2004.6 MiB)
21/07/25 15:51:19 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 15:51:19 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at filter at MyTest2.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 15:51:19 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 15:51:19 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:51:19 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 15:51:19 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 15:51:19 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 15:51:19 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 15:51:19 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 15:51:19 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 15:51:19 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 15:51:19 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 15:51:19 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 15:51:19 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
21/07/25 15:51:19 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 15:51:19 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
www.b.com
a.com
a.com.cn
21/07/25 15:51:20 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 15:51:20 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 15:51:20 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 595 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 600 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 601 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 599 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 603 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 602 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 606 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 603 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 607 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 606 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 609 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 15:51:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 637 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 15:51:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/25 15:51:20 INFO DAGScheduler: ResultStage 0 (foreach at MyTest2.scala:14) finished in 0.763 s
21/07/25 15:51:20 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 15:51:20 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 15:51:20 INFO DAGScheduler: Job 0 finished: foreach at MyTest2.scala:14, took 0.807256 s
21/07/25 15:51:20 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:51:20 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:51:20 INFO MemoryStore: MemoryStore cleared
21/07/25 15:51:20 INFO BlockManager: BlockManager stopped
21/07/25 15:51:20 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:51:20 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:51:20 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:51:20 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:51:20 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-b6b9cd75-c3cc-4701-8b63-93e25180bd56

Preliminary conclusion

Therefore, we can draw a conclusion from the above example:

Because map, filter and other operators in Spark Program internally refer to class member functions or variables, all members of this class need to support serialization. In addition, some member variables of this class do not support serialization, which eventually leads to the problem that task cannot be serialized.

On the contrary, after labeling the member variables in the class that do not support serialization, the whole class can be serialized normally, and finally the problem of non serialization of task is eliminated.

three

Instance analysis of reference member function

Member variables and member functions have the same impact on serialization, that is, if a member function of a class is referenced, all members of the class will support serialization.

To test this hypothesis, we use a member function of the current class in the map to add the prefix “www.” to the header of the domain name if the current domain name does not start with “www.”

Note: since rootdomain is defined inside the getResult function, there is no problem of referencing class member variables, and the problems discussed and caused in the previous example are not present and excluded
therefore, this example mainly discusses the impact of member function reference:
in addition, not directly referencing class member variables is also a means to solve this kind of problem. For example, in this example, in order to eliminate the impact of member variables, variables are defined inside functions.

Code 3

The following code will also report an error. Like the above example, the serialization of the current class is problematic because the two member variables SC (sparkcontext) and sparkconf (sparkconf) in the current class are not serialized properly.

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest3 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest3")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => addWWW(item))
    result.foreach(println)
  }

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest3 {
  def main(args: Array[String]): Unit = {
    val test = new MyTest3("com")
    test.getResult()
    test.stop()
  }
}

Log 3

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:52:26 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:52:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:52:27 INFO ResourceUtils: ==============================================================
21/07/25 15:52:27 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:52:27 INFO ResourceUtils: ==============================================================
21/07/25 15:52:27 INFO SparkContext: Submitted application: MyTest3
21/07/25 15:52:27 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:52:27 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:52:27 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:52:27 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:52:27 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:52:27 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:52:27 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:52:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:52:27 INFO Utils: Successfully started service 'sparkDriver' on port 63658.
21/07/25 15:52:27 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:52:27 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:52:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:52:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:52:27 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:52:27 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-135805bc-d9ab-4d01-a374-f5631e2cb311
21/07/25 15:52:27 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:52:27 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:52:28 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:52:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:52:28 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:52:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63659.
21/07/25 15:52:28 INFO NettyBlockTransferService: Server created on 192.168.0.105:63659
21/07/25 15:52:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:52:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63659 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63659, None)
21/07/25 15:52:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63659, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.map(RDD.scala:421)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3.com$shockang$study$bigdata$spark$errors$serializable$MyTest3$$getResult(MyTest3.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3$.main(MyTest3.scala:29)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest3.main(MyTest3.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
	- object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@3900fa71)
	- field (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest3, name: sc, type: class org.apache.spark.SparkContext)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest3, com.shockang.study.bigdata.spark.errors.serializable.MyTest3@5c82cd4f)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest3, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest3.$anonfun$getResult$2:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest3;Ljava/lang/String;)Ljava/lang/String;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/String;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest3$$Lambda$703/393481646, com.shockang.study.bigdata.spark.errors.serializable.MyTest3$$Lambda$703/393481646@3c6aa04a)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 15:52:28 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 15:52:28 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:52:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:52:28 INFO MemoryStore: MemoryStore cleared
21/07/25 15:52:28 INFO BlockManager: BlockManager stopped
21/07/25 15:52:28 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:52:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:52:28 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:52:28 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:52:28 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-43b61841-b823-4c8d-9589-725962d49c2a

four

Code 4

As before, after labeling SC (sparkcontext) and sparkconf (sparkconf) member variables with “@ transient”, the current class will not serialize these two variables, and the program can run normally.

In addition, slightly different from the member variable, because the member function does not depend on a specific member variable, it can be defined in scala’s object (similar to the static function in Java), which also eliminates the dependence on a specific class.

As shown in the following example, put addwww into the object object and call it directly in the filter operation. After processing, the program can run normally

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest4 private(conf: String) extends Serializable {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest4")
  private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest4.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest4 {

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest4("com")
    test.getResult()
    test.stop()
  }
}

Log 4

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 15:53:24 INFO SparkContext: Running Spark version 3.1.2
21/07/25 15:53:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 15:53:24 INFO ResourceUtils: ==============================================================
21/07/25 15:53:24 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 15:53:24 INFO ResourceUtils: ==============================================================
21/07/25 15:53:24 INFO SparkContext: Submitted application: MyTest4
21/07/25 15:53:24 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 15:53:24 INFO ResourceProfile: Limiting resource is cpu
21/07/25 15:53:24 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 15:53:24 INFO SecurityManager: Changing view acls to: shockang
21/07/25 15:53:24 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 15:53:24 INFO SecurityManager: Changing view acls groups to: 
21/07/25 15:53:24 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 15:53:24 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 15:53:25 INFO Utils: Successfully started service 'sparkDriver' on port 63720.
21/07/25 15:53:25 INFO SparkEnv: Registering MapOutputTracker
21/07/25 15:53:25 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 15:53:25 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 15:53:25 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-44d2c8c8-0c79-4d90-b0c5-94ea0528a8ba
21/07/25 15:53:25 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 15:53:25 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 15:53:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 15:53:25 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 15:53:25 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 15:53:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 63721.
21/07/25 15:53:25 INFO NettyBlockTransferService: Server created on 192.168.0.105:63721
21/07/25 15:53:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 15:53:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:63721 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 63721, None)
21/07/25 15:53:26 INFO SparkContext: Starting job: foreach at MyTest4.scala:14
21/07/25 15:53:26 INFO DAGScheduler: Got job 0 (foreach at MyTest4.scala:14) with 12 output partitions
21/07/25 15:53:26 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest4.scala:14)
21/07/25 15:53:26 INFO DAGScheduler: Parents of final stage: List()
21/07/25 15:53:26 INFO DAGScheduler: Missing parents: List()
21/07/25 15:53:26 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at map at MyTest4.scala:13), which has no missing parents
21/07/25 15:53:26 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.6 KiB, free 2004.6 MiB)
21/07/25 15:53:26 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2003.0 B, free 2004.6 MiB)
21/07/25 15:53:26 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:63721 (size: 2003.0 B, free: 2004.6 MiB)
21/07/25 15:53:26 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 15:53:26 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at map at MyTest4.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 15:53:26 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 15:53:26 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 15:53:26 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 15:53:26 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 15:53:26 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 15:53:26 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 15:53:26 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 15:53:26 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 15:53:26 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
21/07/25 15:53:26 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 15:53:26 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 15:53:26 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 15:53:26 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 15:53:26 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
www.a.com.cn
www.b.com
www.a.com
21/07/25 15:53:27 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 880 bytes result sent to driver
21/07/25 15:53:27 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 880 bytes result sent to driver
21/07/25 15:53:27 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 574 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 539 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 542 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 539 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 546 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 546 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 549 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 546 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 549 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 545 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 551 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 15:53:27 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 544 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 15:53:27 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/25 15:53:27 INFO DAGScheduler: ResultStage 0 (foreach at MyTest4.scala:14) finished in 0.716 s
21/07/25 15:53:27 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 15:53:27 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 15:53:27 INFO DAGScheduler: Job 0 finished: foreach at MyTest4.scala:14, took 0.760258 s
21/07/25 15:53:27 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 15:53:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 15:53:27 INFO MemoryStore: MemoryStore cleared
21/07/25 15:53:27 INFO BlockManager: BlockManager stopped
21/07/25 15:53:27 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 15:53:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 15:53:27 INFO SparkContext: Successfully stopped SparkContext
21/07/25 15:53:27 INFO ShutdownHookManager: Shutdown hook called
21/07/25 15:53:27 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-8a24c6de-c463-40e9-817a-8ad40561f024

Analysis 4

As mentioned above, if a class member function is referenced, the class and all members need to support serialization.

Therefore, when a certain type of member variable or function is used, the class needs to be serialized first, and some member variables that do not need to be serialized need to be marked to avoid affecting serialization.

five

Remove extends serializable

For the above two examples, because the member variables or functions of the class are referenced, the class and all members support serialization. In order to eliminate the impact of some member variables on serialization, use “@ transient” for annotation.

In order to further verify the assumption that the whole class needs serialization, delete the code related to class serialization (remove extends serializable)

In this way, the program execution will report an error that the class is not serialized, as shown below.

Caused by: java.io.NotSerializableException: com.shockang.study.bigdata.spark.errors.serializable.MyTest5

Therefore, this example illustrates the above hypothesis.

Code 5

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest5 private(conf: String) {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest5")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)
  private val rootDomain = conf

  private def getResult(): Unit = {
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest5.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest5 {

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest5("com")
    test.getResult()
    test.stop()
  }
}

Log 5

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 16:01:25 INFO SparkContext: Running Spark version 3.1.2
21/07/25 16:01:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 16:01:25 INFO ResourceUtils: ==============================================================
21/07/25 16:01:25 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 16:01:25 INFO ResourceUtils: ==============================================================
21/07/25 16:01:25 INFO SparkContext: Submitted application: MyTest5
21/07/25 16:01:25 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 16:01:25 INFO ResourceProfile: Limiting resource is cpu
21/07/25 16:01:25 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 16:01:25 INFO SecurityManager: Changing view acls to: shockang
21/07/25 16:01:25 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 16:01:25 INFO SecurityManager: Changing view acls groups to: 
21/07/25 16:01:25 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 16:01:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 16:01:26 INFO Utils: Successfully started service 'sparkDriver' on port 64099.
21/07/25 16:01:26 INFO SparkEnv: Registering MapOutputTracker
21/07/25 16:01:26 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 16:01:26 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 16:01:26 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 16:01:26 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 16:01:26 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-2276d227-1ad9-4742-96c1-aa4507fb17f5
21/07/25 16:01:26 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 16:01:26 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 16:01:26 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 16:01:26 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 16:01:26 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 16:01:26 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64100.
21/07/25 16:01:26 INFO NettyBlockTransferService: Server created on 192.168.0.105:64100
21/07/25 16:01:26 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 16:01:26 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 64100, None)
21/07/25 16:01:26 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:64100 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 64100, None)
21/07/25 16:01:26 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 64100, None)
21/07/25 16:01:26 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 64100, None)
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:2459)
	at org.apache.spark.rdd.RDD.$anonfun$filter$1(RDD.scala:439)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.filter(RDD.scala:438)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest5.com$shockang$study$bigdata$spark$errors$serializable$MyTest5$$getResult(MyTest5.scala:13)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest5$.main(MyTest5.scala:30)
	at com.shockang.study.bigdata.spark.errors.serializable.MyTest5.main(MyTest5.scala)
Caused by: java.io.NotSerializableException: com.shockang.study.bigdata.spark.errors.serializable.MyTest5
Serialization stack:
	- object not serializable (class: com.shockang.study.bigdata.spark.errors.serializable.MyTest5, value: com.shockang.study.bigdata.spark.errors.serializable.MyTest5@256aa5f2)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class com.shockang.study.bigdata.spark.errors.serializable.MyTest5, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic com/shockang/study/bigdata/spark/errors/serializable/MyTest5.$anonfun$getResult$1$adapted:(Lcom/shockang/study/bigdata/spark/errors/serializable/MyTest5;Ljava/lang/String;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/String;)Ljava/lang/Object;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class com.shockang.study.bigdata.spark.errors.serializable.MyTest5$$Lambda$689/1155399955, com.shockang.study.bigdata.spark.errors.serializable.MyTest5$$Lambda$689/1155399955@66bacdbc)
	at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
	... 11 more
21/07/25 16:01:27 INFO SparkContext: Invoking stop() from shutdown hook
21/07/25 16:01:27 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 16:01:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 16:01:27 INFO MemoryStore: MemoryStore cleared
21/07/25 16:01:27 INFO BlockManager: BlockManager stopped
21/07/25 16:01:27 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 16:01:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 16:01:27 INFO SparkContext: Successfully stopped SparkContext
21/07/25 16:01:27 INFO ShutdownHookManager: Shutdown hook called
21/07/25 16:01:27 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-a750f6ba-13a5-4c0a-a79d-8fa86fa19f0b

Analysis 5

The above example shows that external variables and member variables of a class can be referenced inside operators such as map, but the serialization of this class should be done well.

First, this class needs to inherit the serializable class. In addition, handle some member variables in the class that will cause serialization errors, which is also the main reason for the non serialization of tasks.

In case of such problems, first check which member variable cannot be serialized. For member variables that do not need to be serialized, use “@ transient” annotation.

In addition, it is not that the class where the map operation is located must be serialized (inheriting the serializable class). For cases where it is not necessary to reference a member variable or function of a class, the corresponding class will not be required to implement serialization, as shown in the following example.

The filter operation does not reference any class member variables or functions, so the current class does not need serialization, and the program can execute normally.

six

Code 6

package com.shockang.study.bigdata.spark.errors.serializable

import org.apache.spark.{SparkConf, SparkContext}

class MyTest6 private(conf: String) {
  private val list = List("a.com", "www.b.com", "a.cn", "a.com.cn", "a.org")
  @transient private val sparkConf = new SparkConf().setMaster("local[*]").setAppName("MyTest6")
  @transient private val sc = new SparkContext(sparkConf)
  private val rdd = sc.parallelize(list)

  private def getResult(): Unit = {
    val rootDomain = conf
    val result = rdd.filter(item => item.contains(rootDomain)).map(item => MyTest6.addWWW(item))
    result.foreach(println)
  }

  private def stop(): Unit = {
    sc.stop()
  }
}

object MyTest6 {

  private def addWWW(str: String): String = {
    if (str.startsWith("www")) str else "www." + str
  }

  def main(args: Array[String]): Unit = {
    val test = new MyTest6("com")
    test.getResult()
    test.stop()
  }
}

Log 6

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/07/25 16:16:37 INFO SparkContext: Running Spark version 3.1.2
21/07/25 16:16:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/07/25 16:16:37 INFO ResourceUtils: ==============================================================
21/07/25 16:16:37 INFO ResourceUtils: No custom resources configured for spark.driver.
21/07/25 16:16:37 INFO ResourceUtils: ==============================================================
21/07/25 16:16:37 INFO SparkContext: Submitted application: MyTest6
21/07/25 16:16:37 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
21/07/25 16:16:37 INFO ResourceProfile: Limiting resource is cpu
21/07/25 16:16:37 INFO ResourceProfileManager: Added ResourceProfile id: 0
21/07/25 16:16:37 INFO SecurityManager: Changing view acls to: shockang
21/07/25 16:16:37 INFO SecurityManager: Changing modify acls to: shockang
21/07/25 16:16:37 INFO SecurityManager: Changing view acls groups to: 
21/07/25 16:16:37 INFO SecurityManager: Changing modify acls groups to: 
21/07/25 16:16:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(shockang); groups with view permissions: Set(); users  with modify permissions: Set(shockang); groups with modify permissions: Set()
21/07/25 16:16:37 INFO Utils: Successfully started service 'sparkDriver' on port 64622.
21/07/25 16:16:37 INFO SparkEnv: Registering MapOutputTracker
21/07/25 16:16:37 INFO SparkEnv: Registering BlockManagerMaster
21/07/25 16:16:37 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
21/07/25 16:16:37 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
21/07/25 16:16:37 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
21/07/25 16:16:37 INFO DiskBlockManager: Created local directory at /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/blockmgr-24ef1968-ab12-416b-ba8a-6b0c79994d55
21/07/25 16:16:37 INFO MemoryStore: MemoryStore started with capacity 2004.6 MiB
21/07/25 16:16:37 INFO SparkEnv: Registering OutputCommitCoordinator
21/07/25 16:16:38 INFO Utils: Successfully started service 'SparkUI' on port 4040.
21/07/25 16:16:38 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.105:4040
21/07/25 16:16:38 INFO Executor: Starting executor ID driver on host 192.168.0.105
21/07/25 16:16:38 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64623.
21/07/25 16:16:38 INFO NettyBlockTransferService: Server created on 192.168.0.105:64623
21/07/25 16:16:38 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/07/25 16:16:38 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.105:64623 with 2004.6 MiB RAM, BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.0.105, 64623, None)
21/07/25 16:16:38 INFO SparkContext: Starting job: foreach at MyTest6.scala:14
21/07/25 16:16:38 INFO DAGScheduler: Got job 0 (foreach at MyTest6.scala:14) with 12 output partitions
21/07/25 16:16:38 INFO DAGScheduler: Final stage: ResultStage 0 (foreach at MyTest6.scala:14)
21/07/25 16:16:38 INFO DAGScheduler: Parents of final stage: List()
21/07/25 16:16:38 INFO DAGScheduler: Missing parents: List()
21/07/25 16:16:38 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at map at MyTest6.scala:13), which has no missing parents
21/07/25 16:16:39 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.4 KiB, free 2004.6 MiB)
21/07/25 16:16:39 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1919.0 B, free 2004.6 MiB)
21/07/25 16:16:39 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.0.105:64623 (size: 1919.0 B, free: 2004.6 MiB)
21/07/25 16:16:39 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1388
21/07/25 16:16:39 INFO DAGScheduler: Submitting 12 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at map at MyTest6.scala:13) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11))
21/07/25 16:16:39 INFO TaskSchedulerImpl: Adding task set 0.0 with 12 tasks resource profile 0
21/07/25 16:16:39 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.0.105, executor driver, partition 0, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (192.168.0.105, executor driver, partition 1, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2) (192.168.0.105, executor driver, partition 2, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3) (192.168.0.105, executor driver, partition 3, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4) (192.168.0.105, executor driver, partition 4, PROCESS_LOCAL, 4461 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5) (192.168.0.105, executor driver, partition 5, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6) (192.168.0.105, executor driver, partition 6, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7) (192.168.0.105, executor driver, partition 7, PROCESS_LOCAL, 4456 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8) (192.168.0.105, executor driver, partition 8, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9) (192.168.0.105, executor driver, partition 9, PROCESS_LOCAL, 4460 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10) (192.168.0.105, executor driver, partition 10, PROCESS_LOCAL, 4449 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11) (192.168.0.105, executor driver, partition 11, PROCESS_LOCAL, 4457 bytes) taskResourceAssignments Map()
21/07/25 16:16:39 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/07/25 16:16:39 INFO Executor: Running task 11.0 in stage 0.0 (TID 11)
21/07/25 16:16:39 INFO Executor: Running task 7.0 in stage 0.0 (TID 7)
21/07/25 16:16:39 INFO Executor: Running task 10.0 in stage 0.0 (TID 10)
21/07/25 16:16:39 INFO Executor: Running task 6.0 in stage 0.0 (TID 6)
21/07/25 16:16:39 INFO Executor: Running task 4.0 in stage 0.0 (TID 4)
21/07/25 16:16:39 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
21/07/25 16:16:39 INFO Executor: Running task 3.0 in stage 0.0 (TID 3)
21/07/25 16:16:39 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
21/07/25 16:16:39 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
21/07/25 16:16:39 INFO Executor: Running task 5.0 in stage 0.0 (TID 5)
21/07/25 16:16:39 INFO Executor: Running task 9.0 in stage 0.0 (TID 9)
www.b.com
www.a.com
www.a.com.cn
21/07/25 16:16:39 INFO Executor: Finished task 4.0 in stage 0.0 (TID 4). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 6.0 in stage 0.0 (TID 6). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 3.0 in stage 0.0 (TID 3). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 9.0 in stage 0.0 (TID 9). 923 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 8.0 in stage 0.0 (TID 8). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 7.0 in stage 0.0 (TID 7). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 5.0 in stage 0.0 (TID 5). 880 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 923 bytes result sent to driver
21/07/25 16:16:39 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 880 bytes result sent to driver
21/07/25 16:16:39 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 706 ms on 192.168.0.105 (executor driver) (1/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 665 ms on 192.168.0.105 (executor driver) (2/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 663 ms on 192.168.0.105 (executor driver) (3/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 664 ms on 192.168.0.105 (executor driver) (4/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 667 ms on 192.168.0.105 (executor driver) (5/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 666 ms on 192.168.0.105 (executor driver) (6/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 672 ms on 192.168.0.105 (executor driver) (7/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 667 ms on 192.168.0.105 (executor driver) (8/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 671 ms on 192.168.0.105 (executor driver) (9/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 670 ms on 192.168.0.105 (executor driver) (10/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 672 ms on 192.168.0.105 (executor driver) (11/12)
21/07/25 16:16:39 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 667 ms on 192.168.0.105 (executor driver) (12/12)
21/07/25 16:16:39 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/07/25 16:16:39 INFO DAGScheduler: ResultStage 0 (foreach at MyTest6.scala:14) finished in 0.863 s
21/07/25 16:16:39 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job
21/07/25 16:16:39 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished
21/07/25 16:16:39 INFO DAGScheduler: Job 0 finished: foreach at MyTest6.scala:14, took 0.910534 s
21/07/25 16:16:39 INFO SparkUI: Stopped Spark web UI at http://192.168.0.105:4040
21/07/25 16:16:39 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
21/07/25 16:16:39 INFO MemoryStore: MemoryStore cleared
21/07/25 16:16:39 INFO BlockManager: BlockManager stopped
21/07/25 16:16:39 INFO BlockManagerMaster: BlockManagerMaster stopped
21/07/25 16:16:39 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
21/07/25 16:16:39 INFO SparkContext: Successfully stopped SparkContext
21/07/25 16:16:39 INFO ShutdownHookManager: Shutdown hook called
21/07/25 16:16:39 INFO ShutdownHookManager: Deleting directory /private/var/folders/hr/js774vr11dndfxny1324ch980000gn/T/spark-6e765980-e5b6-410a-b0e7-5635b476cb11

Summary

As mentioned above, such problems are mainly caused by referencing the member variables or functions of a class and the corresponding class is not serialized.

There are two ways to solve this problem:

1. Do not directly refer to a certain type of member function or member variable inside (or not directly) closures such as map

For cases that depend on a certain type of member variable:

    if the value that the program depends on is relatively fixed, it can be fixed, or defined inside operations such as map and filter, or in scala object objects (similar to static variables in Java). If the dependency value needs to be specified dynamically during program call (in the form of function parameters), during map, filter and other operations, you can not directly refer to the member variable, but redefine a local variable according to the value of the member variable in the getResult function similar to the above example. In this way, map and other operators do not need to refer to the member variable of the class

    For the case of relying on a certain type of member function:

    If the function is functionally independent, it can be defined in the scala object object (similar to the static method in Java), so there is no need for a specific class.

    2. If a member function or variable of a class is referenced, the corresponding class needs to be serialized.

    In this case, the class needs to be serialized. First, the class inherits the serialized class, and then use the “@ transient” annotation for the member variables that cannot be serialized to tell the compiler that serialization is not required.

    In addition, if possible, the dependent variables can be independently put into a small class to make the class support serialization, which can reduce the amount of network transmission and improve efficiency.