
aws-blog-spark-parquet-conversion: java.lang.ClassCastException when transforming JSON into parquet #89

fabioptoi opened this issue Sep 5, 2017 · 4 comments



Hello,

I was trying to use the same script to transform GZIP JSON data into parquet, but I encountered the following error:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

I'm using the EMR configurations (instance types, number, version, ...) as shown in this example. I created my Hive table using ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'. Examples of the DDL scripts that create and alter my table are:

createtable1.hql

DROP TABLE my_events;
CREATE EXTERNAL TABLE IF NOT EXISTS my_events (text string, number int)
PARTITIONED BY (year int, month int, day int, hour int)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION 's3://bucket/json/';

addpartitions1.py

ALTER TABLE my_events ADD PARTITION (year=2017, month=8, day=1, hour=0) location 's3://bucket/json/2017/08/01/00';

In order for Spark to be able to read my table, I had to make the following modifications to the configuration file on my master node (adding the SerDe JARs to both my driver's and my executors' classpaths); a sketch of the resulting entries follows the list:

  1. Edit /etc/spark/conf/spark-defaults.conf on the EMR master node
  2. Add /usr/lib/hive-hcatalog/share/hcatalog/* to the spark.driver.extraClassPath and spark.executor.extraClassPath properties
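
A minimal sketch of what those entries might look like, assuming the stock EMR defaults are already present (abbreviated here as <existing value>; keep them and append the HCatalog path with a colon):

# /etc/spark/conf/spark-defaults.conf (sketch)
spark.driver.extraClassPath      <existing value>:/usr/lib/hive-hcatalog/share/hcatalog/*
spark.executor.extraClassPath    <existing value>:/usr/lib/hive-hcatalog/share/hcatalog/*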

The Python script is practically the same. I only added a few type casts to some columns in my DataFrame before calling the write2parquet function, like:

from pyspark.sql.types import DoubleType
rdf = rdf.withColumn('columnName', rdf['columnName'].cast(DoubleType()))

Nevertheless, I tried without those casts and still got the same error. The following is the script I used, without the type casting:

convert2parquet1.py

from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import SparkSession

def write2parquet(start):
    df = rdf.filter((rdf.day >= start) & (rdf.day <= start + 10))
    df.repartition(*partitionby).write.partitionBy(partitionby).mode("append").parquet(output, compression=codec)

partitionby = ['year', 'month', 'day', 'hour']
output = '/user/hadoop/myevents_pq'
codec = 'snappy'
hivetablename = 'default.my_events'

spark = SparkSession.builder.appName("Convert2Parquet").enableHiveSupport().getOrCreate()
rdf = spark.table(hivetablename)

futures = []
pool = ThreadPoolExecutor(1)

for i in [1]:
    futures.append(pool.submit(write2parquet, i))
for x in as_completed(futures):
    pass

spark.stop()

Is there something I'm missing here?

fabioptoi changed the title to "aws-blog-spark-parquet-conversion: java.lang.ClassCastException when transforming JSON into parquet" on Sep 5, 2017

alexwbai commented Mar 4, 2018

Hey fabioptoi, did you ever get anywhere with this?

@fabioptoi (Author)

Hey @alexwbai! No, I actually ended up not running the ETL on my files, since it was taking too much time to resolve problems with EMR/Spark (like the one in this issue), and I ended up prioritizing other things. Is there anything you can help with on this issue?

@alexwbai

Hey @fabioptoi! No, I've basically run into the same issue as you. I successfully converted a day's worth of data from JSON to Parquet, but when I go for a whole month, the Spark job stalls on stage two. I had opened a case with AWS and they are pointing me to the error you described above (which I had been ignoring until this point, since I'd had success before). Still trying to figure it out... If I find anything I'll update here.


alexwbai commented Mar 26, 2018

Hey @fabioptoi, OK, I've gotten this to work. It's been a journey of pain (but a great learning experience!). My goal was to convert a month's worth of GZIP JSON data to Snappy-compressed Parquet (similar to you, I think). I had about 190 GB of JSON data broken up into about 400k small files, partitioned by day.

I'll break this up into the errors I ran into and how I got around them:

  1. Errors about the JsonSerDe not being found when running in a cluster. As an alternative to your solution above, I ran my spark-submit referencing the JAR it was looking for: --jars /usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core-2.3.2-amzn-1.jar. Both solutions worked for me.

  2. For some reason I still haven't figured out, Spark was looking for an "ArrayList" column, although I never declared that anywhere. It threw a ClassCastException similar to the one you describe above, but still completed the Parquet transformation successfully on a test of a day's worth of data. I opened a ticket with AWS and they confirmed I could ignore it, as it wasn't affecting my job. I ran a row count on the Parquet versus the JSON and it matched up, and I found no issues after spot-checking the Parquet (see the count-check sketch after this list).

  3. The job finally started running, but then crashed midway through with a Java heap out-of-memory error. I did some digging and found out the driver was running out of memory. I added this flag to my spark-submit, which fixed that error: --driver-memory 8G

  4. Errors like running out of "overhead memory" or 'lost node' errors on the second stage of my job. For me this all came down to needing to configure my Spark job correctly, and I went through a long trial-and-error phase. Unless you are doing the exact same job this GitHub project outlines, I believe you'll need to fine-tune your job away from its default settings of 85 executors with 5 GB of RAM each. I found a brilliant site with a great Excel cheat sheet to help you create a job that maximizes parallelization; if I had found it earlier, I'm quite confident I would have saved 10 hours of my life:

http://c2fo.io/c2fo/spark/aws/emr/2016/07/06/apache-spark-config-cheatsheet/
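
For anyone wanting to run the same row-count sanity check mentioned in item 2, here is a minimal PySpark sketch; the table name and Parquet path are taken from the question above and stand in for your own layout:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CountCheck").enableHiveSupport().getOrCreate()

# Row count of the source Hive table backed by the GZIP JSON files.
json_count = spark.table("default.my_events").count()

# Row count of the converted Parquet output.
parquet_count = spark.read.parquet("/user/hadoop/myevents_pq").count()

print("json rows:   ", json_count)
print("parquet rows:", parquet_count)

spark.stop()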

Below are the final settings I used on my job. I used 3 x r3.8xlarge instances with an m4.2xlarge master (overkill..?) and put the job process in the background in case my session got disconnected:

Deployment config parameters in the EMR console:
classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled=false]

Spark-submit settings:
spark-submit \
  --jars /usr/lib/hive-hcatalog/share/hcatalog/hive-hcatalog-core-2.3.2-amzn-1.jar \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 120 \
  --executor-memory 5G \
  --conf spark.yarn.driver.memoryOverhead=4096 \
  --driver-memory 27G \
  --executor-cores 1 \
  --driver-cores 5 \
  --conf spark.default.parallelism=240 \
  mypythonscript.py &

This completed on my data in about 1.3 hours. I've since used s3-dist-cp to put it back in S3 and have loaded it into Redshift Spectrum and Athena successfully (a rough example of the copy step is below).
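
A minimal sketch of that copy step, assuming the HDFS output path from the script above; the destination bucket and prefix are placeholders:

s3-dist-cp --src hdfs:///user/hadoop/myevents_pq --dest s3://your-bucket/parquet/myevents/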

Good luck!
AlexW
