SparkStreaming error in simple example: java.lang.IllegalArgumentException












I am trying to implement a simple word count example for Spark Streaming by listening on localhost:9999 and sending streams of text with nc -lk 9999.



I haven't implemented the word count logic yet, since I can't even execute this simple code. I think something might be wrong with the way I try to acquire the stream of data.



Below is my code, along with the exception that is generated:



Code:



import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


public class NetworkWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}


Log:



    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/19 11:47:26 INFO SparkContext: Running Spark version 2.2.1
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/root/.m2/repository/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
19/01/19 11:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/19 11:47:27 WARN Utils: Your hostname, imad-Alienware-15-R3 resolves to a loopback address: 127.0.1.1; using 192.168.43.120 instead (on interface wlp61s0)
19/01/19 11:47:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/19 11:47:27 INFO SparkContext: Submitted application: NetworkWordCount
19/01/19 11:47:27 INFO SecurityManager: Changing view acls to: root
19/01/19 11:47:27 INFO SecurityManager: Changing modify acls to: root
19/01/19 11:47:27 INFO SecurityManager: Changing view acls groups to:
19/01/19 11:47:27 INFO SecurityManager: Changing modify acls groups to:
19/01/19 11:47:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
19/01/19 11:47:27 INFO Utils: Successfully started service 'sparkDriver' on port 46093.
19/01/19 11:47:27 INFO SparkEnv: Registering MapOutputTracker
19/01/19 11:47:27 INFO SparkEnv: Registering BlockManagerMaster
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/01/19 11:47:27 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-cc0cd249-2827-43ee-8497-14cba267d755
19/01/19 11:47:27 INFO MemoryStore: MemoryStore started with capacity 997.2 MB
19/01/19 11:47:27 INFO SparkEnv: Registering OutputCommitCoordinator
19/01/19 11:47:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/01/19 11:47:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.43.120:4040
19/01/19 11:47:27 INFO Executor: Starting executor ID driver on host localhost
19/01/19 11:47:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37585.
19/01/19 11:47:27 INFO NettyBlockTransferService: Server created on 192.168.43.120:37585
19/01/19 11:47:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/01/19 11:47:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.43.120:37585 with 997.2 MB RAM, BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
19/01/19 11:47:28 INFO ReceiverTracker: Starting 1 receivers
19/01/19 11:47:28 INFO ReceiverTracker: ReceiverTracker started
19/01/19 11:47:28 INFO SocketInputDStream: Slide time = 1000 ms
19/01/19 11:47:28 INFO SocketInputDStream: Storage level = Serialized 1x Replicated
19/01/19 11:47:28 INFO SocketInputDStream: Checkpoint interval = null
19/01/19 11:47:28 INFO SocketInputDStream: Remember interval = 1000 ms
19/01/19 11:47:28 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5e72eed9
19/01/19 11:47:28 INFO ForEachDStream: Slide time = 1000 ms
19/01/19 11:47:28 INFO ForEachDStream: Storage level = Serialized 1x Replicated
19/01/19 11:47:28 INFO ForEachDStream: Checkpoint interval = null
19/01/19 11:47:28 INFO ForEachDStream: Remember interval = 1000 ms
19/01/19 11:47:28 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7779b595
19/01/19 11:47:28 INFO RecurringTimer: Started timer for JobGenerator at time 1547894849000
19/01/19 11:47:28 INFO ReceiverTracker: Receiver 0 started
19/01/19 11:47:28 INFO JobGenerator: Started JobGenerator at 1547894849000 ms
19/01/19 11:47:28 INFO JobScheduler: Started JobScheduler
19/01/19 11:47:28 INFO DAGScheduler: Got job 0 (start at NetworkWordCount.java:28) with 1 output partitions
19/01/19 11:47:28 INFO DAGScheduler: Final stage: ResultStage 0 (start at NetworkWordCount.java:28)
19/01/19 11:47:28 INFO DAGScheduler: Parents of final stage: List()
19/01/19 11:47:28 INFO DAGScheduler: Missing parents: List()
19/01/19 11:47:28 INFO StreamingContext: StreamingContext started
19/01/19 11:47:28 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620), which has no missing parents
19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 46.5 KB, free 997.2 MB)
19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 997.1 MB)
19/01/19 11:47:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.43.120:37585 (size: 15.9 KB, free: 997.2 MB)
19/01/19 11:47:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
19/01/19 11:47:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620) (first 15 tasks are for partitions Vector(0))
19/01/19 11:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
19/01/19 11:47:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5414 bytes)
19/01/19 11:47:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/19 11:47:28 INFO RecurringTimer: Started timer for BlockGenerator at time 1547894848600
19/01/19 11:47:28 INFO BlockGenerator: Started BlockGenerator
19/01/19 11:47:28 INFO BlockGenerator: Started block pushing thread
19/01/19 11:47:28 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.43.120:46093
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Starting receiver 0
19/01/19 11:47:28 INFO SocketReceiver: Connecting to localhost:9999
19/01/19 11:47:28 INFO SocketReceiver: Connected to localhost:9999
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Called receiver 0 onStart
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
19/01/19 11:47:28 INFO MemoryStore: Block input-0-1547894848400 stored as bytes in memory (estimated size 55.0 B, free 997.1 MB)
19/01/19 11:47:28 INFO BlockManagerInfo: Added input-0-1547894848400 in memory on 192.168.43.120:37585 (size: 55.0 B, free: 997.2 MB)
19/01/19 11:47:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/01/19 11:47:28 WARN BlockManager: Block input-0-1547894848400 replicated to only 0 peer(s) instead of 1 peers
19/01/19 11:47:28 INFO BlockGenerator: Pushed block input-0-1547894848400
19/01/19 11:47:29 INFO JobScheduler: Added jobs for time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Starting job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Finished job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 ERROR JobScheduler: Error running job streaming job 1547894849000 ms.0
java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
19/01/19 11:47:29 INFO ReceiverTracker: Sent stop signal to all 1 receivers
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Received stop signal
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
19/01/19 11:47:29 INFO SocketReceiver: Closed socket to localhost:9999
19/01/19 11:47:29 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Called receiver onStop
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Deregistering receiver 0
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error receiving data
java.net.SocketException: Socket closed
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver 0
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error receiving data: java.net.SocketException: Socket closed
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Receiver has been stopped
19/01/19 11:47:29 INFO BlockGenerator: Stopping BlockGenerator
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for BlockGenerator after time 1547894849400
19/01/19 11:47:29 INFO BlockGenerator: Waiting for block pushing thread to terminate
19/01/19 11:47:29 INFO BlockGenerator: Pushing out the last 0 blocks
19/01/19 11:47:29 INFO BlockGenerator: Stopped block pushing thread
19/01/19 11:47:29 INFO BlockGenerator: Stopped BlockGenerator
Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:196)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver without error
19/01/19 11:47:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 794 bytes result sent to driver
19/01/19 11:47:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1063 ms on localhost (executor driver) (1/1)
19/01/19 11:47:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/01/19 11:47:29 INFO DAGScheduler: ResultStage 0 (start at NetworkWordCount.java:28) finished in 1.085 s
19/01/19 11:47:29 INFO ReceiverTracker: All of the receivers have deregistered successfully
19/01/19 11:47:29 INFO ReceiverTracker: ReceiverTracker stopped
19/01/19 11:47:29 INFO JobGenerator: Stopping JobGenerator immediately
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for JobGenerator after time 1547894849000
19/01/19 11:47:29 INFO JobGenerator: Stopped JobGenerator
19/01/19 11:47:29 INFO JobScheduler: Stopped JobScheduler
19/01/19 11:47:29 INFO StreamingContext: StreamingContext stopped successfully
19/01/19 11:47:29 INFO SparkContext: Invoking stop() from shutdown hook
19/01/19 11:47:29 INFO SparkUI: Stopped Spark web UI at http://192.168.43.120:4040
19/01/19 11:47:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/01/19 11:47:29 INFO MemoryStore: MemoryStore cleared
19/01/19 11:47:29 INFO BlockManager: BlockManager stopped
19/01/19 11:47:29 INFO BlockManagerMaster: BlockManagerMaster stopped
19/01/19 11:47:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/01/19 11:47:29 INFO SparkContext: Successfully stopped SparkContext
19/01/19 11:47:29 INFO ShutdownHookManager: Shutdown hook called
19/01/19 11:47:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-74f53f9e-d91c-4870-855c-f4272031804b

Process finished with exit code 1
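
One thing I notice in the log above: StreamingContext warns that spark.master should be set as local[n] with n > 1 when a receiver is used. Below is a minimal sketch of that change (my assumption: nothing else differs; the working version further down still uses plain "local" and runs fine, so this warning is unrelated to the exception itself, I only note it for completeness):

// Sketch: identical to the code above except the master string,
// so the receiver and the batch-processing jobs each get their own thread.
SparkConf conf = new SparkConf()
        .setAppName("NetworkWordCount")
        .setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));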


EDIT and FIX: The problem lies in calling print() on a JavaDStream or a JavaPairDStream.



I have completed the word count script as follows:



import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;


public class NetworkWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("localhost", 9999);

        JavaDStream<String> words =
                lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs =
                words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts =
                pairs.reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.foreachRDD(rdd -> rdd.foreach(data -> System.out.println(data._1 + "-----" + data._2)));
        jssc.start();
        jssc.awaitTermination();
    }
}



Once I launch nc -lk 9999 and enter the following lines:

Hello my man
How you doing
Im not sure this is working
but i hope it doesnt mess up

I get the following results:



First entry:

Hello-----1
my-----1
man-----1

Second entry:

you-----1
How-----1
doing-----1

Third entry:

Im-----1
this-----1
not-----1
is-----1
sure-----1
working-----1

Fourth entry:

up-----1
it-----1
hope-----1
mess-----1
i-----1
doesnt-----1
but-----1

I can't explain why this solution works while the version using JavaDStream.print() fails. An explanation would be very helpful.
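
For clarity, the only thing that changes between the two versions is the output action. A minimal sketch of the failing variants, assuming everything else stays identical to the class above:

// Drop-in replacements for the wordCounts.foreachRDD(...) line above.
// Either of these reproduces the java.lang.IllegalArgumentException shown in the log:
lines.print();       // print() on the JavaReceiverInputDStream, as in the first snippet
wordCounts.print();  // print() on the JavaPairDStream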



Thank you very much










      19/01/19 11:47:28 INFO SocketInputDStream: Remember interval = 1000 ms
      19/01/19 11:47:28 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5e72eed9
      19/01/19 11:47:28 INFO ForEachDStream: Slide time = 1000 ms
      19/01/19 11:47:28 INFO ForEachDStream: Storage level = Serialized 1x Replicated
      19/01/19 11:47:28 INFO ForEachDStream: Checkpoint interval = null
      19/01/19 11:47:28 INFO ForEachDStream: Remember interval = 1000 ms
      19/01/19 11:47:28 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7779b595
      19/01/19 11:47:28 INFO RecurringTimer: Started timer for JobGenerator at time 1547894849000
      19/01/19 11:47:28 INFO ReceiverTracker: Receiver 0 started
      19/01/19 11:47:28 INFO JobGenerator: Started JobGenerator at 1547894849000 ms
      19/01/19 11:47:28 INFO JobScheduler: Started JobScheduler
      19/01/19 11:47:28 INFO DAGScheduler: Got job 0 (start at NetworkWordCount.java:28) with 1 output partitions
      19/01/19 11:47:28 INFO DAGScheduler: Final stage: ResultStage 0 (start at NetworkWordCount.java:28)
      19/01/19 11:47:28 INFO DAGScheduler: Parents of final stage: List()
      19/01/19 11:47:28 INFO DAGScheduler: Missing parents: List()
      19/01/19 11:47:28 INFO StreamingContext: StreamingContext started
      19/01/19 11:47:28 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620), which has no missing parents
      19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 46.5 KB, free 997.2 MB)
      19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 997.1 MB)
      19/01/19 11:47:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.43.120:37585 (size: 15.9 KB, free: 997.2 MB)
      19/01/19 11:47:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
      19/01/19 11:47:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620) (first 15 tasks are for partitions Vector(0))
      19/01/19 11:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
      19/01/19 11:47:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5414 bytes)
      19/01/19 11:47:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
      19/01/19 11:47:28 INFO RecurringTimer: Started timer for BlockGenerator at time 1547894848600
      19/01/19 11:47:28 INFO BlockGenerator: Started BlockGenerator
      19/01/19 11:47:28 INFO BlockGenerator: Started block pushing thread
      19/01/19 11:47:28 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.43.120:46093
      19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Starting receiver 0
      19/01/19 11:47:28 INFO SocketReceiver: Connecting to localhost:9999
      19/01/19 11:47:28 INFO SocketReceiver: Connected to localhost:9999
      19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Called receiver 0 onStart
      19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
      19/01/19 11:47:28 INFO MemoryStore: Block input-0-1547894848400 stored as bytes in memory (estimated size 55.0 B, free 997.1 MB)
      19/01/19 11:47:28 INFO BlockManagerInfo: Added input-0-1547894848400 in memory on 192.168.43.120:37585 (size: 55.0 B, free: 997.2 MB)
      19/01/19 11:47:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/01/19 11:47:28 WARN BlockManager: Block input-0-1547894848400 replicated to only 0 peer(s) instead of 1 peers
      19/01/19 11:47:28 INFO BlockGenerator: Pushed block input-0-1547894848400
      19/01/19 11:47:29 INFO JobScheduler: Added jobs for time 1547894849000 ms
      19/01/19 11:47:29 INFO JobScheduler: Starting job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
      19/01/19 11:47:29 INFO JobScheduler: Finished job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
      19/01/19 11:47:29 ERROR JobScheduler: Error running job streaming job 1547894849000 ms.0
      java.lang.IllegalArgumentException
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
      at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
      at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
      at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
      at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:844)
      Exception in thread "main" java.lang.IllegalArgumentException
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
      at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
      at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
      at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
      at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:844)
      19/01/19 11:47:29 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
      19/01/19 11:47:29 INFO ReceiverTracker: Sent stop signal to all 1 receivers
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Received stop signal
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
      19/01/19 11:47:29 INFO SocketReceiver: Closed socket to localhost:9999
      19/01/19 11:47:29 WARN SocketReceiver: Error receiving data
      java.net.SocketException: Socket closed
      at java.base/java.net.SocketInputStream.socketRead0(Native Method)
      at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
      at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
      at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
      at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
      at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
      at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
      at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Called receiver onStop
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Deregistering receiver 0
      19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error receiving data
      java.net.SocketException: Socket closed
      at java.base/java.net.SocketInputStream.socketRead0(Native Method)
      at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
      at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
      at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
      at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
      at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
      at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
      at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
      19/01/19 11:47:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver 0
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error receiving data: java.net.SocketException: Socket closed
      19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Receiver has been stopped
      19/01/19 11:47:29 INFO BlockGenerator: Stopping BlockGenerator
      19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for BlockGenerator after time 1547894849400
      19/01/19 11:47:29 INFO BlockGenerator: Waiting for block pushing thread to terminate
      19/01/19 11:47:29 INFO BlockGenerator: Pushing out the last 0 blocks
      19/01/19 11:47:29 INFO BlockGenerator: Stopped block pushing thread
      19/01/19 11:47:29 INFO BlockGenerator: Stopped BlockGenerator
      Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
      at java.base/java.lang.Thread.sleep(Native Method)
      at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:196)
      at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
      at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
      at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:844)
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver without error
      19/01/19 11:47:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 794 bytes result sent to driver
      19/01/19 11:47:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1063 ms on localhost (executor driver) (1/1)
      19/01/19 11:47:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
      19/01/19 11:47:29 INFO DAGScheduler: ResultStage 0 (start at NetworkWordCount.java:28) finished in 1.085 s
      19/01/19 11:47:29 INFO ReceiverTracker: All of the receivers have deregistered successfully
      19/01/19 11:47:29 INFO ReceiverTracker: ReceiverTracker stopped
      19/01/19 11:47:29 INFO JobGenerator: Stopping JobGenerator immediately
      19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for JobGenerator after time 1547894849000
      19/01/19 11:47:29 INFO JobGenerator: Stopped JobGenerator
      19/01/19 11:47:29 INFO JobScheduler: Stopped JobScheduler
      19/01/19 11:47:29 INFO StreamingContext: StreamingContext stopped successfully
      19/01/19 11:47:29 INFO SparkContext: Invoking stop() from shutdown hook
      19/01/19 11:47:29 INFO SparkUI: Stopped Spark web UI at http://192.168.43.120:4040
      19/01/19 11:47:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
      19/01/19 11:47:29 INFO MemoryStore: MemoryStore cleared
      19/01/19 11:47:29 INFO BlockManager: BlockManager stopped
      19/01/19 11:47:29 INFO BlockManagerMaster: BlockManagerMaster stopped
      19/01/19 11:47:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
      19/01/19 11:47:29 INFO SparkContext: Successfully stopped SparkContext
      19/01/19 11:47:29 INFO ShutdownHookManager: Shutdown hook called
      19/01/19 11:47:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-74f53f9e-d91c-4870-855c-f4272031804b

      Process finished with exit code 1


      EDIT and FIX : The problem lies with the calling of print() with a JavaDStream or a JavaPairDStream.



      I have completed the script for the word count as such:



      import org.apache.spark.SparkConf;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.api.java.JavaDStream;
      import org.apache.spark.streaming.api.java.JavaPairDStream;
      import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import scala.Tuple2;

      import java.util.Arrays;


      public class NetworkWordCount {
      public static void main(String args) throws InterruptedException {
      SparkConf conf = new SparkConf()
      .setAppName("NetworkWordCount")
      .setMaster("local");
      JavaStreamingContext jssc =
      new JavaStreamingContext(conf, Durations.seconds(1));

      JavaReceiverInputDStream<String> lines =
      jssc.socketTextStream("localhost", 9999);

      JavaDStream<String> words =
      lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
      JavaPairDStream<String, Integer> pairs =
      words.mapToPair(s -> new Tuple2<>(s, 1));
      JavaPairDStream<String, Integer> wordCounts =
      pairs.reduceByKey((i1, i2) -> i1 + i2);

      wordCounts.foreachRDD(rdd -> rdd.foreach(data -> System.out.println(data._1+"-----"+data._2)));
      jssc.start();
      jssc.awaitTermination();
      }
      }

      wordCounts.foreachRDD(rdd -> rdd.foreach(data -> System.out.println(data._1+"-----"+data._2)));


      Once I launch the nc -lk 9999 and enter the following lines:




      Hello my man



      How you doing



      Im not sure this is working



      but i hope it doesnt mess up




      I get the following results:



      First entry:




      Hello-----1



      my-----1



      man-----1




      Second entry:




      you-----1



      How-----1



      doing-----1




      Third entry:




      Im-----1



      this-----1



      not-----1



      is-----1



      sure-----1



      working-----1




      Fourth entry:




      up-----1



      it-----1



      hope-----1



      mess-----1



      i-----1



      doesnt-----1



      but-----1




      I can't explain why this solution works but fails when using JavaDStream.print(). An explanation would be very helpful.



      Thank you very much










      share|improve this question
















      I am trying to implement a simple wordcount example for sparkStreaming by listening in localhost:9999 and sending streams of text using nc -lk 9999.



      I haven't implemented the logic of the wordcount yet, seeing as I can't execute this simple code. I think something might be wrong with the way I try to acquire the streams of information.



      Bellow is my code and the exception that is generated:



      Code:



      import org.apache.spark.SparkConf;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;


      public class NetworkWordCount {
      public static void main(String args) throws InterruptedException{
      SparkConf conf = new SparkConf()
      .setAppName("NetworkWordCount")//;
      .setMaster("local");
      JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

      JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
      lines.print();

      jssc.start();
      jssc.awaitTermination();
      }
      }


      Log:



          Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
      19/01/19 11:47:26 INFO SparkContext: Running Spark version 2.2.1
      WARNING: An illegal reflective access operation has occurred
      WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/root/.m2/repository/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar) to method sun.security.krb5.Config.getInstance()
      WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
      WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
      WARNING: All illegal access operations will be denied in a future release
      19/01/19 11:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      19/01/19 11:47:27 WARN Utils: Your hostname, imad-Alienware-15-R3 resolves to a loopback address: 127.0.1.1; using 192.168.43.120 instead (on interface wlp61s0)
      19/01/19 11:47:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
      19/01/19 11:47:27 INFO SparkContext: Submitted application: NetworkWordCount
      19/01/19 11:47:27 INFO SecurityManager: Changing view acls to: root
      19/01/19 11:47:27 INFO SecurityManager: Changing modify acls to: root
      19/01/19 11:47:27 INFO SecurityManager: Changing view acls groups to:
      19/01/19 11:47:27 INFO SecurityManager: Changing modify acls groups to:
      19/01/19 11:47:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
      19/01/19 11:47:27 INFO Utils: Successfully started service 'sparkDriver' on port 46093.
      19/01/19 11:47:27 INFO SparkEnv: Registering MapOutputTracker
      19/01/19 11:47:27 INFO SparkEnv: Registering BlockManagerMaster
      19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
      19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
      19/01/19 11:47:27 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-cc0cd249-2827-43ee-8497-14cba267d755
      19/01/19 11:47:27 INFO MemoryStore: MemoryStore started with capacity 997.2 MB
      19/01/19 11:47:27 INFO SparkEnv: Registering OutputCommitCoordinator
      19/01/19 11:47:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
      19/01/19 11:47:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.43.120:4040
      19/01/19 11:47:27 INFO Executor: Starting executor ID driver on host localhost
      19/01/19 11:47:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37585.
      19/01/19 11:47:27 INFO NettyBlockTransferService: Server created on 192.168.43.120:37585
      19/01/19 11:47:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
      19/01/19 11:47:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
      19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.43.120:37585 with 997.2 MB RAM, BlockManagerId(driver, 192.168.43.120, 37585, None)
      19/01/19 11:47:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
      19/01/19 11:47:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.43.120, 37585, None)
      19/01/19 11:47:27 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
      19/01/19 11:47:28 INFO ReceiverTracker: Starting 1 receivers
      19/01/19 11:47:28 INFO ReceiverTracker: ReceiverTracker started
      19/01/19 11:47:28 INFO SocketInputDStream: Slide time = 1000 ms
      19/01/19 11:47:28 INFO SocketInputDStream: Storage level = Serialized 1x Replicated
      19/01/19 11:47:28 INFO SocketInputDStream: Checkpoint interval = null
      19/01/19 11:47:28 INFO SocketInputDStream: Remember interval = 1000 ms
      19/01/19 11:47:28 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5e72eed9
      19/01/19 11:47:28 INFO ForEachDStream: Slide time = 1000 ms
      19/01/19 11:47:28 INFO ForEachDStream: Storage level = Serialized 1x Replicated
      19/01/19 11:47:28 INFO ForEachDStream: Checkpoint interval = null
      19/01/19 11:47:28 INFO ForEachDStream: Remember interval = 1000 ms
      19/01/19 11:47:28 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7779b595
      19/01/19 11:47:28 INFO RecurringTimer: Started timer for JobGenerator at time 1547894849000
      19/01/19 11:47:28 INFO ReceiverTracker: Receiver 0 started
      19/01/19 11:47:28 INFO JobGenerator: Started JobGenerator at 1547894849000 ms
      19/01/19 11:47:28 INFO JobScheduler: Started JobScheduler
      19/01/19 11:47:28 INFO DAGScheduler: Got job 0 (start at NetworkWordCount.java:28) with 1 output partitions
      19/01/19 11:47:28 INFO DAGScheduler: Final stage: ResultStage 0 (start at NetworkWordCount.java:28)
      19/01/19 11:47:28 INFO DAGScheduler: Parents of final stage: List()
      19/01/19 11:47:28 INFO DAGScheduler: Missing parents: List()
      19/01/19 11:47:28 INFO StreamingContext: StreamingContext started
      19/01/19 11:47:28 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620), which has no missing parents
      19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 46.5 KB, free 997.2 MB)
      19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 997.1 MB)
      19/01/19 11:47:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.43.120:37585 (size: 15.9 KB, free: 997.2 MB)
      19/01/19 11:47:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
      19/01/19 11:47:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620) (first 15 tasks are for partitions Vector(0))
      19/01/19 11:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
      19/01/19 11:47:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5414 bytes)
      19/01/19 11:47:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
      19/01/19 11:47:28 INFO RecurringTimer: Started timer for BlockGenerator at time 1547894848600
      19/01/19 11:47:28 INFO BlockGenerator: Started BlockGenerator
      19/01/19 11:47:28 INFO BlockGenerator: Started block pushing thread
      19/01/19 11:47:28 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.43.120:46093
      19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Starting receiver 0
      19/01/19 11:47:28 INFO SocketReceiver: Connecting to localhost:9999
      19/01/19 11:47:28 INFO SocketReceiver: Connected to localhost:9999
      19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Called receiver 0 onStart
      19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
      19/01/19 11:47:28 INFO MemoryStore: Block input-0-1547894848400 stored as bytes in memory (estimated size 55.0 B, free 997.1 MB)
      19/01/19 11:47:28 INFO BlockManagerInfo: Added input-0-1547894848400 in memory on 192.168.43.120:37585 (size: 55.0 B, free: 997.2 MB)
      19/01/19 11:47:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
      19/01/19 11:47:28 WARN BlockManager: Block input-0-1547894848400 replicated to only 0 peer(s) instead of 1 peers
      19/01/19 11:47:28 INFO BlockGenerator: Pushed block input-0-1547894848400
      19/01/19 11:47:29 INFO JobScheduler: Added jobs for time 1547894849000 ms
      19/01/19 11:47:29 INFO JobScheduler: Starting job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
      19/01/19 11:47:29 INFO JobScheduler: Finished job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
      19/01/19 11:47:29 ERROR JobScheduler: Error running job streaming job 1547894849000 ms.0
      java.lang.IllegalArgumentException
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
      at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
      at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
      at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
      at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:844)
      Exception in thread "main" java.lang.IllegalArgumentException
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
      at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
      at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
      at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
      at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
      at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
      at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
      at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
      at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
      at scala.collection.immutable.List.foreach(List.scala:381)
      at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
      at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
      at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
      at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
      at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
      at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
      at scala.util.Try$.apply(Try.scala:192)
      at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
      at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
      at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:844)
      19/01/19 11:47:29 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
      19/01/19 11:47:29 INFO ReceiverTracker: Sent stop signal to all 1 receivers
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Received stop signal
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
      19/01/19 11:47:29 INFO SocketReceiver: Closed socket to localhost:9999
      19/01/19 11:47:29 WARN SocketReceiver: Error receiving data
      java.net.SocketException: Socket closed
      at java.base/java.net.SocketInputStream.socketRead0(Native Method)
      at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
      at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
      at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
      at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
      at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
      at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
      at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Called receiver onStop
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Deregistering receiver 0
      19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error receiving data
      java.net.SocketException: Socket closed
      at java.base/java.net.SocketInputStream.socketRead0(Native Method)
      at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
      at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
      at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
      at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
      at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
      at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
      at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
      at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
      at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
      at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
      at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
      19/01/19 11:47:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver 0
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error receiving data: java.net.SocketException: Socket closed
      19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Receiver has been stopped
      19/01/19 11:47:29 INFO BlockGenerator: Stopping BlockGenerator
      19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for BlockGenerator after time 1547894849400
      19/01/19 11:47:29 INFO BlockGenerator: Waiting for block pushing thread to terminate
      19/01/19 11:47:29 INFO BlockGenerator: Pushing out the last 0 blocks
      19/01/19 11:47:29 INFO BlockGenerator: Stopped block pushing thread
      19/01/19 11:47:29 INFO BlockGenerator: Stopped BlockGenerator
      Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
      at java.base/java.lang.Thread.sleep(Native Method)
      at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:196)
      at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
      at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
      at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
      at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
      at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
      at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
      at java.base/java.lang.Thread.run(Thread.java:844)
      19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver without error
      19/01/19 11:47:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 794 bytes result sent to driver
      19/01/19 11:47:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1063 ms on localhost (executor driver) (1/1)
      19/01/19 11:47:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
      19/01/19 11:47:29 INFO DAGScheduler: ResultStage 0 (start at NetworkWordCount.java:28) finished in 1.085 s
      19/01/19 11:47:29 INFO ReceiverTracker: All of the receivers have deregistered successfully
      19/01/19 11:47:29 INFO ReceiverTracker: ReceiverTracker stopped
      19/01/19 11:47:29 INFO JobGenerator: Stopping JobGenerator immediately
      19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for JobGenerator after time 1547894849000
      19/01/19 11:47:29 INFO JobGenerator: Stopped JobGenerator
      19/01/19 11:47:29 INFO JobScheduler: Stopped JobScheduler
      19/01/19 11:47:29 INFO StreamingContext: StreamingContext stopped successfully
      19/01/19 11:47:29 INFO SparkContext: Invoking stop() from shutdown hook
      19/01/19 11:47:29 INFO SparkUI: Stopped Spark web UI at http://192.168.43.120:4040
      19/01/19 11:47:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
      19/01/19 11:47:29 INFO MemoryStore: MemoryStore cleared
      19/01/19 11:47:29 INFO BlockManager: BlockManager stopped
      19/01/19 11:47:29 INFO BlockManagerMaster: BlockManagerMaster stopped
      19/01/19 11:47:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
      19/01/19 11:47:29 INFO SparkContext: Successfully stopped SparkContext
      19/01/19 11:47:29 INFO ShutdownHookManager: Shutdown hook called
      19/01/19 11:47:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-74f53f9e-d91c-4870-855c-f4272031804b

      Process finished with exit code 1


      EDIT and FIX : The problem lies with the calling of print() with a JavaDStream or a JavaPairDStream.



      I have completed the script for the word count as such:



      import org.apache.spark.SparkConf;
      import org.apache.spark.streaming.Durations;
      import org.apache.spark.streaming.api.java.JavaDStream;
      import org.apache.spark.streaming.api.java.JavaPairDStream;
      import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
      import org.apache.spark.streaming.api.java.JavaStreamingContext;
      import scala.Tuple2;

      import java.util.Arrays;


      public class NetworkWordCount {
      public static void main(String args) throws InterruptedException {
      SparkConf conf = new SparkConf()
      .setAppName("NetworkWordCount")
      .setMaster("local");
      JavaStreamingContext jssc =
      new JavaStreamingContext(conf, Durations.seconds(1));

      JavaReceiverInputDStream<String> lines =
      jssc.socketTextStream("localhost", 9999);

      JavaDStream<String> words =
      lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
      JavaPairDStream<String, Integer> pairs =
      words.mapToPair(s -> new Tuple2<>(s, 1));
      JavaPairDStream<String, Integer> wordCounts =
      pairs.reduceByKey((i1, i2) -> i1 + i2);

      wordCounts.foreachRDD(rdd -> rdd.foreach(data -> System.out.println(data._1+"-----"+data._2)));
      jssc.start();
      jssc.awaitTermination();
      }
      }

      wordCounts.foreachRDD(rdd -> rdd.foreach(data -> System.out.println(data._1+"-----"+data._2)));


      Once I launch the nc -lk 9999 and enter the following lines:




      Hello my man



      How you doing



      Im not sure this is working



      but i hope it doesnt mess up




      I get the following results:



      First entry:




      Hello-----1



      my-----1



      man-----1




      Second entry:




      you-----1



      How-----1



      doing-----1




      Third entry:




      Im-----1



      this-----1



      not-----1



      is-----1



      sure-----1



      working-----1




      Fourth entry:




      up-----1



      it-----1



      hope-----1



      mess-----1



      i-----1



      doesnt-----1



      but-----1




      I can't explain why this solution works but fails when using JavaDStream.print(). An explanation would be very helpful.



      Thank you very much







      java apache-spark spark-streaming






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Jan 19 at 16:46







      Aetos

















      asked Jan 19 at 11:50









      AetosAetos

      176214




      176214
























          0






          active

          oldest

          votes











          Your Answer






          StackExchange.ifUsing("editor", function () {
          StackExchange.using("externalEditor", function () {
          StackExchange.using("snippets", function () {
          StackExchange.snippets.init();
          });
          });
          }, "code-snippets");

          StackExchange.ready(function() {
          var channelOptions = {
          tags: "".split(" "),
          id: "1"
          };
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function() {
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled) {
          StackExchange.using("snippets", function() {
          createEditor();
          });
          }
          else {
          createEditor();
          }
          });

          function createEditor() {
          StackExchange.prepareEditor({
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader: {
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          },
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          });


          }
          });














          draft saved

          draft discarded


















          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54266765%2fsparkstreaming-error-in-simple-example-java-lang-illegalargumentexception%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown

























          0






          active

          oldest

          votes








          0






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes
















          draft saved

          draft discarded




















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid



          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.


          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function () {
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f54266765%2fsparkstreaming-error-in-simple-example-java-lang-illegalargumentexception%23new-answer', 'question_page');
          }
          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          Popular posts from this blog

          Liquibase includeAll doesn't find base path

          How to use setInterval in EJS file?

          Petrus Granier-Deferre