scala - Spark - Can't read files from Google Cloud Storage when configuring the GCS connector manually


I have a Spark cluster deployed with bdutil on Google Cloud. I installed a GUI on the driver instance so I can run IntelliJ on it and try out Spark processes in interactive mode.

The first issue I faced is that spark-env.sh and core-site.xml are not used at all when running from IntelliJ. I managed to set the configuration manually in Scala by copying the values from the configuration files. Is there a way to avoid this?
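(For what it's worth, one possible way to avoid copying each value by hand might be to point the Hadoop configuration at the cluster's own XML files. This is only a minimal, untested sketch: the /home/hadoop/hadoop-install/conf path is an assumption based on a typical bdutil layout, and addResource is the standard Hadoop Configuration mechanism for layering in extra config files.)

    // Hypothetical alternative to hand-copying values: load the cluster's own config files.
    // The paths are assumptions; adjust them to wherever bdutil placed the Hadoop conf dir.
    import org.apache.hadoop.fs.Path

    sc.hadoopConfiguration.addResource(new Path("file:///home/hadoop/hadoop-install/conf/core-site.xml"))
    sc.hadoopConfiguration.addResource(new Path("file:///home/hadoop/hadoop-install/conf/hdfs-site.xml"))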

The last thing that is not working: while the GCS connector seems to "see" the folder I set as the source, each time it tries to read the actual files in that folder I get a java.io.EOFException.

Here is the code of my test:

object SparkBasicTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application")
    conf.setMaster("spark://research-m:7077")
    conf.set("spark.akka.frameSize", "512")
    conf.set("spark.driver.maxResultSize", "1631m")
    conf.set("spark.yarn.executor.memoryOverhead", "384")
    conf.set("spark.driver.memory", "3263m")
    conf.set("spark.executor.memory", "10444m")
    conf.set("spark.driver.extraClassPath", ":/home/hadoop/hadoop-install/lib/gcs-connector-1.4.0-hadoop1.jar")

    val path = "stage/out/scored"
    val sc = new SparkContext(conf)
    sc.hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    sc.hadoopConfiguration.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
    sc.hadoopConfiguration.set("fs.gs.project.id", "xxxxx")
    sc.hadoopConfiguration.set("fs.gs.system.bucket", "yyyyy")
    sc.hadoopConfiguration.set("fs.gs.metadata.cache.directory", "/hadoop_gcs_connector_metadata_cache")
    sc.hadoopConfiguration.set("fs.gs.metadata.cache.enable", "true")
    sc.hadoopConfiguration.set("fs.gs.metadata.cache.type", "FILESYSTEM_BACKED")
    sc.hadoopConfiguration.set("fs.gs.working.dir", "/")
    sc.hadoopConfiguration.set("fs.default.name", "gs://yyyyyy/")
    sc.hadoopConfiguration.set("fs.defaultFS", "gs://yyyyyy/")
    sc.hadoopConfiguration.set("hadoop.tmp.dir", "/hadoop/tmp")
    sc.hadoopConfiguration.set("dfs.datanode.data.dir.perm", "755")

    val lines = sc.textFile(path)
    val result = lines.count()
  }
}
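(As an aside, the same read can also be expressed with a fully qualified URI, so it does not depend on fs.default.name resolution. A minimal sketch, reusing the placeholder bucket name from the configuration above:)

    // Hypothetical variant: address the input by its full gs:// URI ("yyyyyy" is the
    // placeholder bucket name used above) instead of a path relative to fs.default.name.
    val qualifiedPath = "gs://yyyyyy/stage/out/scored"
    println(sc.textFile(qualifiedPath).count())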

And here is the output after running the test above:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/07/27 12:00:47 INFO SparkContext: Running Spark version 1.4.0
15/07/27 12:00:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/07/27 12:00:48 INFO SecurityManager: Changing view acls to: antvoice
15/07/27 12:00:48 INFO SecurityManager: Changing modify acls to: antvoice
15/07/27 12:00:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(antvoice); users with modify permissions: Set(antvoice)
15/07/27 12:00:49 INFO Slf4jLogger: Slf4jLogger started
15/07/27 12:00:49 INFO Remoting: Starting remoting
15/07/27 12:00:50 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.240.63.109:45952]
15/07/27 12:00:50 INFO Utils: Successfully started service 'sparkDriver' on port 45952.
15/07/27 12:00:50 INFO SparkEnv: Registering MapOutputTracker
15/07/27 12:00:50 INFO SparkEnv: Registering BlockManagerMaster
15/07/27 12:00:50 INFO DiskBlockManager: Created local directory at /mnt/pd1/hadoop/spark/tmp/spark-dbaf72cb-599b-40c9-a9f8-ad9ede2b0654/blockmgr-24fd090a-b9df-4754-8022-ccaf8800ca2a
15/07/27 12:00:50 INFO MemoryStore: MemoryStore started with capacity 1566.8 MB
15/07/27 12:00:50 INFO HttpFileServer: HTTP File server directory is /mnt/pd1/hadoop/spark/tmp/spark-dbaf72cb-599b-40c9-a9f8-ad9ede2b0654/httpd-27e69b24-ad3d-4019-9bf7-37649c2ebc8e
15/07/27 12:00:50 INFO HttpServer: Starting HTTP Server
15/07/27 12:00:50 INFO Utils: Successfully started service 'HTTP file server' on port 57505.
15/07/27 12:00:50 INFO SparkEnv: Registering OutputCommitCoordinator
15/07/27 12:00:56 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/07/27 12:00:56 INFO SparkUI: Started SparkUI at http://10.240.63.109:4040
15/07/27 12:00:56 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@research-m:7077/user/Master...
15/07/27 12:00:57 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150727120057-0000
15/07/27 12:00:57 INFO AppClient$ClientActor: Executor added: app-20150727120057-0000/0 on worker-20150727114108-10.240.205.199-50284 (10.240.205.199:50284) with 2 cores
15/07/27 12:00:57 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150727120057-0000/0 on hostPort 10.240.205.199:50284 with 2 cores, 10.2 GB RAM
15/07/27 12:00:57 INFO AppClient$ClientActor: Executor updated: app-20150727120057-0000/0 is now RUNNING
15/07/27 12:00:57 INFO AppClient$ClientActor: Executor updated: app-20150727120057-0000/0 is now LOADING
15/07/27 12:00:57 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 38947.
15/07/27 12:00:57 INFO NettyBlockTransferService: Server created on 38947
15/07/27 12:00:57 INFO BlockManagerMaster: Trying to register BlockManager
15/07/27 12:00:57 INFO BlockManagerMasterEndpoint: Registering block manager 10.240.63.109:38947 with 1566.8 MB RAM, BlockManagerId(driver, 10.240.63.109, 38947)
15/07/27 12:00:57 INFO BlockManagerMaster: Registered BlockManager
15/07/27 12:00:57 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
15/07/27 12:00:57 INFO deprecation: fs.default.name is deprecated. Instead, use fs.defaultFS
15/07/27 12:00:58 INFO MemoryStore: ensureFreeSpace(112832) called with curMem=0, maxMem=1642919362
15/07/27 12:00:58 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 110.2 KB, free 1566.7 MB)
15/07/27 12:00:58 INFO MemoryStore: ensureFreeSpace(10627) called with curMem=112832, maxMem=1642919362
15/07/27 12:00:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 10.4 KB, free 1566.7 MB)
15/07/27 12:00:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.240.63.109:38947 (size: 10.4 KB, free: 1566.8 MB)
15/07/27 12:00:58 INFO SparkContext: Created broadcast 0 from textFile at SparkBasicTest.scala:36
15/07/27 12:00:58 INFO GoogleHadoopFileSystemBase: GHFS version: 1.4.0-hadoop1
15/07/27 12:01:00 INFO SparkDeploySchedulerBackend: Registered executor: AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@10.240.205.199:54716/user/Executor#396919943]) with ID 0
15/07/27 12:01:00 INFO BlockManagerMasterEndpoint: Registering block manager 10.240.205.199:36835 with 5.3 GB RAM, BlockManagerId(0, 10.240.205.199, 36835)
15/07/27 12:01:02 INFO FileInputFormat: Total input paths to process : 47
15/07/27 12:01:02 INFO SparkContext: Starting job: count at SparkBasicTest.scala:37
15/07/27 12:01:02 INFO DAGScheduler: Got job 0 (count at SparkBasicTest.scala:37) with 47 output partitions (allowLocal=false)
15/07/27 12:01:02 INFO DAGScheduler: Final stage: ResultStage 0(count at SparkBasicTest.scala:37)
15/07/27 12:01:02 INFO DAGScheduler: Parents of final stage: List()
15/07/27 12:01:02 INFO DAGScheduler: Missing parents: List()
15/07/27 12:01:02 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at textFile at SparkBasicTest.scala:36), which has no missing parents
15/07/27 12:01:02 INFO MemoryStore: ensureFreeSpace(2968) called with curMem=123459, maxMem=1642919362
15/07/27 12:01:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.9 KB, free 1566.7 MB)
15/07/27 12:01:02 INFO MemoryStore: ensureFreeSpace(1752) called with curMem=126427, maxMem=1642919362
15/07/27 12:01:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1752.0 B, free 1566.7 MB)
15/07/27 12:01:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 10.240.63.109:38947 (size: 1752.0 B, free: 1566.8 MB)
15/07/27 12:01:02 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:874
15/07/27 12:01:02 INFO DAGScheduler: Submitting 47 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at textFile at SparkBasicTest.scala:36)
15/07/27 12:01:02 INFO TaskSchedulerImpl: Adding task set 0.0 with 47 tasks
15/07/27 12:01:03 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.240.205.199): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
    at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:45)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

15/07/27 12:01:03 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 3, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 4, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 1]
15/07/27 12:01:03 INFO TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 2]
15/07/27 12:01:03 INFO TaskSetManager: Starting task 2.1 in stage 0.0 (TID 5, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 6, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Lost task 3.0 in stage 0.0 (TID 4) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 3]
15/07/27 12:01:03 INFO TaskSetManager: Lost task 1.1 in stage 0.0 (TID 3) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 4]
15/07/27 12:01:03 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID 7, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Lost task 2.1 in stage 0.0 (TID 5) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 5]
15/07/27 12:01:03 INFO TaskSetManager: Starting task 2.2 in stage 0.0 (TID 8, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 6) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 6]
15/07/27 12:01:03 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 9, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Lost task 1.2 in stage 0.0 (TID 7) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 7]
15/07/27 12:01:03 INFO TaskSetManager: Starting task 1.3 in stage 0.0 (TID 10, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Lost task 2.2 in stage 0.0 (TID 8) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 8]
15/07/27 12:01:03 INFO TaskSetManager: Starting task 2.3 in stage 0.0 (TID 11, 10.240.205.199, PROCESS_LOCAL, 1416 bytes)
15/07/27 12:01:03 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 9) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 9]
15/07/27 12:01:03 INFO TaskSetManager: Lost task 1.3 in stage 0.0 (TID 10) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 10]
15/07/27 12:01:03 ERROR TaskSetManager: Task 1 in stage 0.0 failed 4 times; aborting job
15/07/27 12:01:03 INFO TaskSetManager: Lost task 2.3 in stage 0.0 (TID 11) on executor 10.240.205.199: java.io.EOFException (null) [duplicate 11]
15/07/27 12:01:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
15/07/27 12:01:03 INFO TaskSchedulerImpl: Cancelling stage 0
15/07/27 12:01:03 INFO DAGScheduler: ResultStage 0 (count at SparkBasicTest.scala:37) failed in 0.319 s
15/07/27 12:01:03 INFO DAGScheduler: Job 0 failed: count at SparkBasicTest.scala:37, took 0.437413 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 10, 10.240.205.199): java.io.EOFException
    at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
    at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
    at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
    at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
    at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
    at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
    at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
    at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:45)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1450)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/07/27 12:01:03 INFO SparkContext: Invoking stop() from shutdown hook
15/07/27 12:01:03 INFO SparkUI: Stopped Spark web UI at http://10.240.63.109:4040
15/07/27 12:01:03 INFO DAGScheduler: Stopping DAGScheduler
15/07/27 12:01:03 INFO SparkDeploySchedulerBackend: Shutting down all executors
15/07/27 12:01:03 INFO SparkDeploySchedulerBackend: Asking each executor to shut down
15/07/27 12:01:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/27 12:01:03 INFO Utils: path = /mnt/pd1/hadoop/spark/tmp/spark-dbaf72cb-599b-40c9-a9f8-ad9ede2b0654/blockmgr-24fd090a-b9df-4754-8022-ccaf8800ca2a, already present as root for deletion.
15/07/27 12:01:03 INFO MemoryStore: MemoryStore cleared
15/07/27 12:01:03 INFO BlockManager: BlockManager stopped
15/07/27 12:01:03 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/27 12:01:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/07/27 12:01:03 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/27 12:01:03 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/27 12:01:03 INFO SparkContext: Successfully stopped SparkContext
15/07/27 12:01:03 INFO Utils: Shutdown hook called
15/07/27 12:01:03 INFO Utils: Deleting directory /mnt/pd1/hadoop/spark/tmp/spark-dbaf72cb-599b-40c9-a9f8-ad9ede2b0654

Process finished with exit code 1

What am I missing? Thanks in advance!

One possibility is that you somehow have a mismatch of Hadoop versions on your classpaths. In particular, if you use Spark's prebuilt tarball built for Hadoop 2 but run it on a cluster that has Hadoop 1 installed, you may hit the error you encountered. Note that the stack trace indicates the errors occur when trying to "readObject", which means it is trying to deserialize a class; this can happen if the class definitions differ between classloaders.
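(A quick way to check for that kind of mismatch, sketched here as an addition rather than taken from the original answer, is to print which Hadoop build and jar each side actually loads, once on the driver and once inside a task:)

    // Compare the Hadoop version and the jar that provides FileSplit (the class failing to
    // deserialize above) on the driver versus on an executor. Differing outputs would point
    // to a classpath mismatch between the two.
    import org.apache.hadoop.mapred.FileSplit
    import org.apache.hadoop.util.VersionInfo

    val driverInfo = s"hadoop ${VersionInfo.getVersion} from " +
      classOf[FileSplit].getProtectionDomain.getCodeSource.getLocation
    val executorInfo = sc.parallelize(Seq(1), 1).map { _ =>
      s"hadoop ${org.apache.hadoop.util.VersionInfo.getVersion} from " +
        classOf[org.apache.hadoop.mapred.FileSplit].getProtectionDomain.getCodeSource.getLocation
    }.first()
    println(s"driver:   $driverInfo")
    println(s"executor: $executorInfo")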

I set up a few different IntelliJ installations on different bdutil-deployed Spark clusters, and I encountered the same stack trace you saw when I tried running against a cluster that used spark-1.4.0-bin-hadoop2.6.tgz as the IntelliJ library and driver while the submitting node was using spark-1.4.0-bin-hadoop1.tgz. Here's a related Stack Overflow question about running on EC2, and here's a manifestation that wasn't a version mismatch, but rather the requirement to add the hadoop-client library to the classpath.
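(If the problem does turn out to be on the dependency side, the usual fix is to pin the project's build to the same Hadoop line as the cluster. A hedged build.sbt sketch; the exact versions are assumptions and should be aligned with what bdutil actually installed:)

    // build.sbt sketch -- versions are assumptions, match them to the cluster.
    libraryDependencies ++= Seq(
      "org.apache.spark"  %% "spark-core"    % "1.4.0",
      // Hadoop 1 client to match a bdutil Hadoop 1 deployment (1.2.1 is an assumed version):
      "org.apache.hadoop" %  "hadoop-client" % "1.2.1",
      // GCS connector built against the Hadoop 1 APIs, as referenced in the question:
      "com.google.cloud.bigdataoss" % "gcs-connector" % "1.4.0-hadoop1"
    )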

