Spark Streaming error in simple example: java.lang.IllegalArgumentException
I am trying to implement a simple word count example for Spark Streaming by listening on localhost:9999
and sending streams of text using nc -lk 9999
.
I haven't implemented the word count logic yet, since I can't get even this simple code to execute. I think something might be wrong with the way I try to acquire the stream.
Below is my code and the exception that is generated:
Code:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class NetworkWordCount {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");

        // One-second batches; connect to the nc listener on localhost:9999.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        lines.print(); // this is the call that leads to the exception below

        jssc.start();
        jssc.awaitTermination();
    }
}
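(As an aside: the log below warns that spark.master should be set as local[n], n > 1 in local mode when receivers are used, since the receiver itself occupies a thread. A minimal sketch of that adjustment, for reference:)

SparkConf conf = new SparkConf()
        .setAppName("NetworkWordCount")
        // At least two local threads: one for the socket receiver,
        // one for processing the received batches.
        .setMaster("local[2]");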
Log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/19 11:47:26 INFO SparkContext: Running Spark version 2.2.1
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.hadoop.security.authentication.util.KerberosUtil (file:/root/.m2/repository/org/apache/hadoop/hadoop-auth/2.6.5/hadoop-auth-2.6.5.jar) to method sun.security.krb5.Config.getInstance()
WARNING: Please consider reporting this to the maintainers of org.apache.hadoop.security.authentication.util.KerberosUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
19/01/19 11:47:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/19 11:47:27 WARN Utils: Your hostname, imad-Alienware-15-R3 resolves to a loopback address: 127.0.1.1; using 192.168.43.120 instead (on interface wlp61s0)
19/01/19 11:47:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/01/19 11:47:27 INFO SparkContext: Submitted application: NetworkWordCount
19/01/19 11:47:27 INFO SecurityManager: Changing view acls to: root
19/01/19 11:47:27 INFO SecurityManager: Changing modify acls to: root
19/01/19 11:47:27 INFO SecurityManager: Changing view acls groups to:
19/01/19 11:47:27 INFO SecurityManager: Changing modify acls groups to:
19/01/19 11:47:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
19/01/19 11:47:27 INFO Utils: Successfully started service 'sparkDriver' on port 46093.
19/01/19 11:47:27 INFO SparkEnv: Registering MapOutputTracker
19/01/19 11:47:27 INFO SparkEnv: Registering BlockManagerMaster
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/01/19 11:47:27 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-cc0cd249-2827-43ee-8497-14cba267d755
19/01/19 11:47:27 INFO MemoryStore: MemoryStore started with capacity 997.2 MB
19/01/19 11:47:27 INFO SparkEnv: Registering OutputCommitCoordinator
19/01/19 11:47:27 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/01/19 11:47:27 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.43.120:4040
19/01/19 11:47:27 INFO Executor: Starting executor ID driver on host localhost
19/01/19 11:47:27 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37585.
19/01/19 11:47:27 INFO NettyBlockTransferService: Server created on 192.168.43.120:37585
19/01/19 11:47:27 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/01/19 11:47:27 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.43.120:37585 with 997.2 MB RAM, BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.43.120, 37585, None)
19/01/19 11:47:27 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
19/01/19 11:47:28 INFO ReceiverTracker: Starting 1 receivers
19/01/19 11:47:28 INFO ReceiverTracker: ReceiverTracker started
19/01/19 11:47:28 INFO SocketInputDStream: Slide time = 1000 ms
19/01/19 11:47:28 INFO SocketInputDStream: Storage level = Serialized 1x Replicated
19/01/19 11:47:28 INFO SocketInputDStream: Checkpoint interval = null
19/01/19 11:47:28 INFO SocketInputDStream: Remember interval = 1000 ms
19/01/19 11:47:28 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5e72eed9
19/01/19 11:47:28 INFO ForEachDStream: Slide time = 1000 ms
19/01/19 11:47:28 INFO ForEachDStream: Storage level = Serialized 1x Replicated
19/01/19 11:47:28 INFO ForEachDStream: Checkpoint interval = null
19/01/19 11:47:28 INFO ForEachDStream: Remember interval = 1000 ms
19/01/19 11:47:28 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@7779b595
19/01/19 11:47:28 INFO RecurringTimer: Started timer for JobGenerator at time 1547894849000
19/01/19 11:47:28 INFO ReceiverTracker: Receiver 0 started
19/01/19 11:47:28 INFO JobGenerator: Started JobGenerator at 1547894849000 ms
19/01/19 11:47:28 INFO JobScheduler: Started JobScheduler
19/01/19 11:47:28 INFO DAGScheduler: Got job 0 (start at NetworkWordCount.java:28) with 1 output partitions
19/01/19 11:47:28 INFO DAGScheduler: Final stage: ResultStage 0 (start at NetworkWordCount.java:28)
19/01/19 11:47:28 INFO DAGScheduler: Parents of final stage: List()
19/01/19 11:47:28 INFO DAGScheduler: Missing parents: List()
19/01/19 11:47:28 INFO StreamingContext: StreamingContext started
19/01/19 11:47:28 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620), which has no missing parents
19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 46.5 KB, free 997.2 MB)
19/01/19 11:47:28 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 15.9 KB, free 997.1 MB)
19/01/19 11:47:28 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.43.120:37585 (size: 15.9 KB, free: 997.2 MB)
19/01/19 11:47:28 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
19/01/19 11:47:28 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:620) (first 15 tasks are for partitions Vector(0))
19/01/19 11:47:28 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
19/01/19 11:47:28 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 5414 bytes)
19/01/19 11:47:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/19 11:47:28 INFO RecurringTimer: Started timer for BlockGenerator at time 1547894848600
19/01/19 11:47:28 INFO BlockGenerator: Started BlockGenerator
19/01/19 11:47:28 INFO BlockGenerator: Started block pushing thread
19/01/19 11:47:28 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.43.120:46093
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Starting receiver 0
19/01/19 11:47:28 INFO SocketReceiver: Connecting to localhost:9999
19/01/19 11:47:28 INFO SocketReceiver: Connected to localhost:9999
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Called receiver 0 onStart
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
19/01/19 11:47:28 INFO MemoryStore: Block input-0-1547894848400 stored as bytes in memory (estimated size 55.0 B, free 997.1 MB)
19/01/19 11:47:28 INFO BlockManagerInfo: Added input-0-1547894848400 in memory on 192.168.43.120:37585 (size: 55.0 B, free: 997.2 MB)
19/01/19 11:47:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/01/19 11:47:28 WARN BlockManager: Block input-0-1547894848400 replicated to only 0 peer(s) instead of 1 peers
19/01/19 11:47:28 INFO BlockGenerator: Pushed block input-0-1547894848400
19/01/19 11:47:29 INFO JobScheduler: Added jobs for time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Starting job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Finished job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 ERROR JobScheduler: Error running job streaming job 1547894849000 ms.0
java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
19/01/19 11:47:29 INFO ReceiverTracker: Sent stop signal to all 1 receivers
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Received stop signal
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
19/01/19 11:47:29 INFO SocketReceiver: Closed socket to localhost:9999
19/01/19 11:47:29 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Called receiver onStop
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Deregistering receiver 0
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error receiving data
java.net.SocketException: Socket closed
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver 0
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error receiving data: java.net.SocketException: Socket closed
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Receiver has been stopped
19/01/19 11:47:29 INFO BlockGenerator: Stopping BlockGenerator
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for BlockGenerator after time 1547894849400
19/01/19 11:47:29 INFO BlockGenerator: Waiting for block pushing thread to terminate
19/01/19 11:47:29 INFO BlockGenerator: Pushing out the last 0 blocks
19/01/19 11:47:29 INFO BlockGenerator: Stopped block pushing thread
19/01/19 11:47:29 INFO BlockGenerator: Stopped BlockGenerator
Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:196)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver without error
19/01/19 11:47:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 794 bytes result sent to driver
19/01/19 11:47:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1063 ms on localhost (executor driver) (1/1)
19/01/19 11:47:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/01/19 11:47:29 INFO DAGScheduler: ResultStage 0 (start at NetworkWordCount.java:28) finished in 1.085 s
19/01/19 11:47:29 INFO ReceiverTracker: All of the receivers have deregistered successfully
19/01/19 11:47:29 INFO ReceiverTracker: ReceiverTracker stopped
19/01/19 11:47:29 INFO JobGenerator: Stopping JobGenerator immediately
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for JobGenerator after time 1547894849000
19/01/19 11:47:29 INFO JobGenerator: Stopped JobGenerator
19/01/19 11:47:29 INFO JobScheduler: Stopped JobScheduler
19/01/19 11:47:29 INFO StreamingContext: StreamingContext stopped successfully
19/01/19 11:47:29 INFO SparkContext: Invoking stop() from shutdown hook
19/01/19 11:47:29 INFO SparkUI: Stopped Spark web UI at http://192.168.43.120:4040
19/01/19 11:47:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/01/19 11:47:29 INFO MemoryStore: MemoryStore cleared
19/01/19 11:47:29 INFO BlockManager: BlockManager stopped
19/01/19 11:47:29 INFO BlockManagerMaster: BlockManagerMaster stopped
19/01/19 11:47:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/01/19 11:47:29 INFO SparkContext: Successfully stopped SparkContext
19/01/19 11:47:29 INFO ShutdownHookManager: Shutdown hook called
19/01/19 11:47:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-74f53f9e-d91c-4870-855c-f4272031804b
Process finished with exit code 1
EDIT and FIX: The problem lies in calling print()
on a JavaDStream
or a JavaPairDStream
.
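For reference, the smallest substitution that avoids print() would be something along these lines (same lines stream as above; just a minimal variant of the full fix below):

lines.foreachRDD(rdd -> rdd.foreach(line -> System.out.println(line)));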
I have completed the word count script as follows:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

public class NetworkWordCount {

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");

        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("localhost", 9999);

        // Split each line into words.
        JavaDStream<String> words =
                lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

        // Pair each word with a count of 1, then sum the counts per word.
        JavaPairDStream<String, Integer> pairs =
                words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts =
                pairs.reduceByKey((i1, i2) -> i1 + i2);

        // Printing via foreachRDD works, while wordCounts.print() does not.
        wordCounts.foreachRDD(rdd ->
                rdd.foreach(data -> System.out.println(data._1 + "-----" + data._2)));

        jssc.start();
        jssc.awaitTermination();
    }
}
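One note on this pattern, based on my understanding: System.out.println inside rdd.foreach(...) runs on the executors, and it only shows up in this console because local mode runs the executor in the driver JVM. On a real cluster you would collect the results to the driver first, for example:

wordCounts.foreachRDD(rdd ->
        rdd.collect().forEach(data ->
                System.out.println(data._1 + "-----" + data._2)));

(collect() pulls every element of the batch to the driver, so this is only safe for small per-batch results.)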
Once I launch nc -lk 9999
and enter the following lines:
Hello my man
How you doing
Im not sure this is working
but i hope it doesnt mess up
I get the following results:
First entry:
Hello-----1
my-----1
man-----1
Second entry:
you-----1
How-----1
doing-----1
Third entry:
Im-----1
this-----1
not-----1
is-----1
sure-----1
working-----1
Fourth entry:
up-----1
it-----1
hope-----1
mess-----1
i-----1
doesnt-----1
but-----1
I can't explain why this solution works while JavaDStream.print()
fails. An explanation would be very helpful.
Thank you very much.
java apache-spark spark-streaming
19/01/19 11:47:28 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/01/19 11:47:28 INFO RecurringTimer: Started timer for BlockGenerator at time 1547894848600
19/01/19 11:47:28 INFO BlockGenerator: Started BlockGenerator
19/01/19 11:47:28 INFO BlockGenerator: Started block pushing thread
19/01/19 11:47:28 INFO ReceiverTracker: Registered receiver for stream 0 from 192.168.43.120:46093
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Starting receiver 0
19/01/19 11:47:28 INFO SocketReceiver: Connecting to localhost:9999
19/01/19 11:47:28 INFO SocketReceiver: Connected to localhost:9999
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Called receiver 0 onStart
19/01/19 11:47:28 INFO ReceiverSupervisorImpl: Waiting for receiver to be stopped
19/01/19 11:47:28 INFO MemoryStore: Block input-0-1547894848400 stored as bytes in memory (estimated size 55.0 B, free 997.1 MB)
19/01/19 11:47:28 INFO BlockManagerInfo: Added input-0-1547894848400 in memory on 192.168.43.120:37585 (size: 55.0 B, free: 997.2 MB)
19/01/19 11:47:28 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/01/19 11:47:28 WARN BlockManager: Block input-0-1547894848400 replicated to only 0 peer(s) instead of 1 peers
19/01/19 11:47:28 INFO BlockGenerator: Pushed block input-0-1547894848400
19/01/19 11:47:29 INFO JobScheduler: Added jobs for time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Starting job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 INFO JobScheduler: Finished job streaming job 1547894849000 ms.0 from job set of time 1547894849000 ms
19/01/19 11:47:29 ERROR JobScheduler: Error running job streaming job 1547894849000 ms.0
java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
Exception in thread "main" java.lang.IllegalArgumentException
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:443)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:426)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:257)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:256)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:256)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1354)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.take(RDD.scala:1327)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:735)
at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
19/01/19 11:47:29 INFO ReceiverTracker: Sent stop signal to all 1 receivers
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Received stop signal
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Stopped by driver:
19/01/19 11:47:29 INFO SocketReceiver: Closed socket to localhost:9999
19/01/19 11:47:29 WARN SocketReceiver: Error receiving data
java.net.SocketException: Socket closed
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Called receiver onStop
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Deregistering receiver 0
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error receiving data
java.net.SocketException: Socket closed
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:141)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:121)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$1.getNext(SocketInputDStream.scala:119)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:91)
at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:72)
19/01/19 11:47:29 ERROR ReceiverTracker: Deregistered receiver for stream 0: Stopped by driver
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver 0
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error receiving data: java.net.SocketException: Socket closed
19/01/19 11:47:29 WARN ReceiverSupervisorImpl: Receiver has been stopped
19/01/19 11:47:29 INFO BlockGenerator: Stopping BlockGenerator
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for BlockGenerator after time 1547894849400
19/01/19 11:47:29 INFO BlockGenerator: Waiting for block pushing thread to terminate
19/01/19 11:47:29 INFO BlockGenerator: Pushing out the last 0 blocks
19/01/19 11:47:29 INFO BlockGenerator: Stopped block pushing thread
19/01/19 11:47:29 INFO BlockGenerator: Stopped BlockGenerator
Exception in thread "receiver-supervisor-future-0" java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply$mcV$sp(ReceiverSupervisor.scala:196)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at org.apache.spark.streaming.receiver.ReceiverSupervisor$$anonfun$restartReceiver$1.apply(ReceiverSupervisor.scala:189)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
19/01/19 11:47:29 INFO ReceiverSupervisorImpl: Stopped receiver without error
19/01/19 11:47:29 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 794 bytes result sent to driver
19/01/19 11:47:29 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1063 ms on localhost (executor driver) (1/1)
19/01/19 11:47:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/01/19 11:47:29 INFO DAGScheduler: ResultStage 0 (start at NetworkWordCount.java:28) finished in 1.085 s
19/01/19 11:47:29 INFO ReceiverTracker: All of the receivers have deregistered successfully
19/01/19 11:47:29 INFO ReceiverTracker: ReceiverTracker stopped
19/01/19 11:47:29 INFO JobGenerator: Stopping JobGenerator immediately
19/01/19 11:47:29 INFO RecurringTimer: Stopped timer for JobGenerator after time 1547894849000
19/01/19 11:47:29 INFO JobGenerator: Stopped JobGenerator
19/01/19 11:47:29 INFO JobScheduler: Stopped JobScheduler
19/01/19 11:47:29 INFO StreamingContext: StreamingContext stopped successfully
19/01/19 11:47:29 INFO SparkContext: Invoking stop() from shutdown hook
19/01/19 11:47:29 INFO SparkUI: Stopped Spark web UI at http://192.168.43.120:4040
19/01/19 11:47:29 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/01/19 11:47:29 INFO MemoryStore: MemoryStore cleared
19/01/19 11:47:29 INFO BlockManager: BlockManager stopped
19/01/19 11:47:29 INFO BlockManagerMaster: BlockManagerMaster stopped
19/01/19 11:47:29 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/01/19 11:47:29 INFO SparkContext: Successfully stopped SparkContext
19/01/19 11:47:29 INFO ShutdownHookManager: Shutdown hook called
19/01/19 11:47:29 INFO ShutdownHookManager: Deleting directory /tmp/spark-74f53f9e-d91c-4870-855c-f4272031804b
Process finished with exit code 1
EDIT and FIX: The problem lies in calling print() on a JavaDStream or a JavaPairDStream. As the stack trace shows, print() fails inside RDD.take() when Spark's ClosureCleaner tries to read class bytecode (the IllegalArgumentException is thrown from ClassReader.&lt;init&gt;).
I have since completed the word-count script as follows:
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

public class NetworkWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("NetworkWordCount")
                .setMaster("local");
        JavaStreamingContext jssc =
                new JavaStreamingContext(conf, Durations.seconds(1));

        JavaReceiverInputDStream<String> lines =
                jssc.socketTextStream("localhost", 9999);

        // Split each line into words, pair each word with 1, then sum per key.
        JavaDStream<String> words =
                lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs =
                words.mapToPair(s -> new Tuple2<>(s, 1));
        JavaPairDStream<String, Integer> wordCounts =
                pairs.reduceByKey((i1, i2) -> i1 + i2);

        wordCounts.foreachRDD(rdd ->
                rdd.foreach(data -> System.out.println(data._1 + "-----" + data._2)));

        jssc.start();
        jssc.awaitTermination();
    }
}
The key difference from the failing version is this output line, which replaces print():
wordCounts.foreachRDD(rdd -> rdd.foreach(data -> System.out.println(data._1 + "-----" + data._2)));
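Note that rdd.foreach runs on the executors, so the println output only shows up in the console here because everything runs in a single local JVM. A driver-side variant — a minimal sketch, assuming each batch's result is small enough to collect — would be:

// Sketch: bring each (small) per-batch result to the driver and print it there.
// Only safe when the per-batch result fits comfortably in driver memory.
wordCounts.foreachRDD(rdd ->
        rdd.collect().forEach(data ->
                System.out.println(data._1 + "-----" + data._2)));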
Once I launch nc -lk 9999 and enter the following lines:
Hello my man
How you doing
Im not sure this is working
but i hope it doesnt mess up
I get the following results:
First entry:
Hello-----1
my-----1
man-----1
Second entry:
you-----1
How-----1
doing-----1
Third entry:
Im-----1
this-----1
not-----1
is-----1
sure-----1
working-----1
Fourth entry:
up-----1
it-----1
hope-----1
mess-----1
i-----1
doesnt-----1
but-----1
I can't explain why this solution works while the version using JavaDStream.print() fails. An explanation would be very helpful.
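A hedged guess from the trace itself: every JDK frame is prefixed with java.base/, which means this ran on Java 9 or newer, while Spark 2.2.x officially supports only Java 8. Its bundled ASM 5 ClassReader cannot parse Java 9+ class files, and a bare IllegalArgumentException from ClassReader.&lt;init&gt; during closure cleaning is the classic symptom, so the JDK version may matter as much as which operator is used. Checking the runtime version is a one-liner:

// Spark 2.2.x targets Java 8; the java.base/ frames in the trace indicate Java 9+.
System.out.println(System.getProperty("java.version"));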
Thank you very much
java apache-spark spark-streaming
asked Jan 19 at 11:50 by Aetos; edited Jan 19 at 16:46