top of page
BlogPageTop

Understanding SparkContext textFile & parallelize method


In this blog you will learn,

  • How Spark reads text file or any other external dataset.

  • Referencing a dataset (SparkContext's textfile), SparkContext parallelize method and spark dataset textFile method.

As we read in previous post, Apache Spark has mainly three types of objects or you can say data structures (also called Spark APIs) - RDDs, dataframe and datasets. RDD was the primary API when Apache Spark was founded.


 

RDD - Resilient Distributed Dataset

  • Consider you have collection of 100 words and you distribute them across 10 partitions so that each partition has 10 words (more or less). Each partition has a backup so that it can be recovered in case of failure (resilient).

  • Now, this seems very generic. In practical environment data will be distributed in a cluster with thousand of nodes (with backup nodes), and if you want to access the data you need to apply Spark actions which you will learn soon. This type of immutable distributed collection of elements is called RDD.


Dataframes

  • This has also similar distribution of elements like RDD but in this case, data is organized into a structure, like a table of relational database. Consider you have distributed collection of [row] type object, like a record distributed across thousand of nodes. You will get more clear picture when we will create dataframe, so don't worry.


Datasets

  • Dataset was introduced in late 2016. Do you remember case class which you created in "Just enough Scala for Spark"? Dataset is like the collection of strongly typed such objects, like the following case class Order which has 2 attributes orderNum (Int) and orderItem (String).



It was the introduction, so even if you don't understand, thats's fine. You will get more clear picture with practical examples.



Question is.. Which data structure you should implement?


It totally depends on the business use case which data structure you should implement.

  • For instance, Datasets and RDDs are basically used for unstructured data like streams of media texts, when schema and columnar format of data is not mandatory requirement (like accessing data by column name and any other tabular attributes).

  • Also, RRDs are often used when you want full control over physical distribution of data over thousands of nodes in a cluster.

  • Similarly, Dataframes are often used with Spark SQL when you have structured data and you need schema and columnar format of data maintained throughout the process.

  • Datasets are also used in such scenario where you have unstructured or semi-structured data and you want to run Spark SQL.


That being said, we have mainly following methods to load data in Spark.

  1. SparkContext's textfile method which results into RDD.

  2. SparkContext's parallelize collection, which also results into RDD.

  3. Spark read textFile method which results into Dataset.

  4. SQLContext read json which results into Dataframe.

  5. Spark session read json which results into Dataframe.

  6. You can also create these with parquet files, read parquet method. Similarly there are other methods, it's difficult to list all of them but these examples will give you a picture how you can create them.



1. SparkContext textfile [spark.rdd family]


Text file RDDs can be created using SparkContext's textfile method. Define SparkConf and SparkContext like we did in earlier post and use SparkContext to read the textfile. I have created a sample text file with text data regarding - Where is Mount Everest? Got the answer from Wikipedia.

scala> val dataFile = sc.textFile("/Users/Rajput/Documents/testdata/MountEverest.txt")

dataFile: org.apache.spark.rdd.RDD[String] = /Users/Rajput/Documents/testdata/MountEverest.txt MapPartitionsRDD[1] at textFile at <console>:27



  • File has 9 lines and you can see the first line in above screenshot.

  • Further, you can count the number of words in the file by splitting the text (with space character) and applying count() action. You will learn about transformations like flatMap and action count soon, so don't worry.

scala> dataFile.flatMap(line => line.split(" ")).count()

res4: Long = 544

  • Right now the motive is to tell - how you read text file with textFile member of SparkContext family. The resultant is an RDD.


Important notes:

  • We can use wildcards characters to read multiple files together ("/file/path/*.txt).

  • It can read compressed files (*.gz), files from HDFS, Amazon S3, Hbase etc.



2. SparkContext parallelize collection [spark.rdd family]


This method is used to distribute the collection of same type of elements (in an array, list etc). This distributed dataset can be operated in parallel.


// Parallelizing list of strings

scala> val distData = sc.parallelize(List("apple","orange","banana","grapes"))

distData: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:27


// 4 total elements

scala> distData.count()

res5: Long = 4


or like these,

scala> sc.parallelize(Array("Hello Dataneb! How are you?"))

res3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:25


scala> sc.parallelize(Array("Hello","Spark","Dataneb","Apache"))

res4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:25

scala> sc.parallelize(List(1 to 10))

res6: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[2] at parallelize at <console>:25

scala> sc.parallelize(1 to 10)

res7: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:25


scala> sc.parallelize(1 to 10 by 2)

res8: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:25


You can also see the size of partitions,


scala> res8.partitions.size

res13: Int = 4



3. Read text file to create Dataset [spark.sql family]


You can create dataset from a text file or any other file system like HDFS. Here, you can use default spark session which gets created when you start spark-shell.


// creating dataset

scala> val distDataset = spark.read.textFile("/Users/Rajput/Documents/testdata/MountEverest.txt")

distDataset: org.apache.spark.sql.Dataset[String] = [value: string]


// 9 lines

scala> distDataset.count()

res0: Long = 9


// 544 total word count

scala> distDataset.flatMap(line => line.split(" ")).count()

res2: Long = 544


// 5 Lines with Everest

scala> distDataset.filter(line => line.contains("Everest")).count()

res3: Long = 5


Here is the shell screenshot;




4. SQLContext read json to create Dataframe [spark.sql family]


You can create dataframes with SQLContext. SQLContext is a type of class in Spark which is like entry point for Spark SQL.


// you need to import sql library to create SQLContext

scala> import org.apache.spark.sql._

import org.apache.spark.sql._


// telling Spark to use same configuration as Spark context

scala> val sqlContext = new SQLContext(sc)

sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@40eb85e9


My json file looks like this,

[ { "color": "red", "value": "#f00" }, { "color": "green", "value": "#0f0" }, { "color": "blue", "value": "#00f" }, { "color": "cyan", "value": "#0ff" }, { "color": "magenta", "value": "#f0f" }, { "color": "yellow", "value": "#ff0" }, { "color": "black", "value": "#000" } ]


// creating dataframe

scala> val df = sqlContext.read.json("/Volumes/MYLAB/testdata/multilinecolors.json")

df: org.apache.spark.sql.DataFrame = [color: string, value: string]


// printing schema of dataframe, like a table

scala> df.printSchema()

root

|-- color: string (nullable = true)

|-- value: string (nullable = true)


// storing this dataframe into temp table

scala> df.registerTempTable("tmpTable")


// retrieving data

scala> sqlContext.sql("select * from tmpTable").show()

+-------+-----+

| color|value|

+-------+-----+

| red| #f00|

| green| #0f0|

| blue| #00f|

| cyan| #0ff|

|magenta| #f0f|

| yellow| #ff0|

| black| #000|

+-------+-----+



5. Spark Session to create dataframe [spark.sql family]


You can also create dataframe from default spark session which is created when you start the spark-shell. Refer spark-shell blog.


scala> spark

res14: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@6c9fe061


scala> spark.read.json("/Volumes/MYLAB/testdata/multilinecolors.json")

res16: org.apache.spark.sql.DataFrame = [color: string, value: string]


scala> res16.show()

+-------+-----+

| color|value|

+-------+-----+

| red| #f00|

| green| #0f0|

| blue| #00f|

| cyan| #0ff|

|magenta| #f0f|

| yellow| #ff0|

| black| #000|

+-------+-----+


scala> res16.printSchema()

root

|-- color: string (nullable = true)

|-- value: string (nullable = true)


scala> res16.select("color").show()

+-------+

| color|

+-------+

| red|

| green|

| blue|

| cyan|

|magenta|

| yellow|

| black|

+-------+


scala> res16.filter($"color"==="blue").show()

+-----+-----+

|color|value|

+-----+-----+

| blue| #00f|

+-----+-----+



You can also convert dataframe back to JSON like this,

scala> res16.toJSON.show(false)

+----------------------------------+

|value |

+----------------------------------+

|{"color":"red","value":"#f00"} |

|{"color":"green","value":"#0f0"} |

|{"color":"blue","value":"#00f"} |

|{"color":"cyan","value":"#0ff"} |

|{"color":"magenta","value":"#f0f"}|

|{"color":"yellow","value":"#ff0"} |

|{"color":"black","value":"#000"} |

+----------------------------------+


You can also create dataframes from parquet, text files etc. You will learn this soon.


That's all guys! If you have any question or suggestion please write in comments section below. Thank you folks.



Navigation menu

1. Apache Spark and Scala Installation

2. Getting Familiar with Scala IDE

3. Spark data structure basics

4. Spark Shell

5. Reading data files in Spark

6. Writing data files in Spark

7. Spark streaming

תגובות


Want to share your thoughts about this blog?

Disclaimer: Please note that the information provided on this website is for general informational purposes only and should not be taken as legal advice. Dataneb is a platform for individuals to share their personal experiences with visa and immigration processes, and their views and opinions may not necessarily reflect those of the website owners or administrators. 

 

While we strive to keep the information up-to-date and accurate, we make no representations or warranties of any kind, express or implied, about the completeness, accuracy, reliability, suitability, or availability with respect to the website or the information, products, services, or related graphics contained on the website for any purpose. Any reliance you place on such information is therefore strictly at your own risk. 

 

We strongly advise that you consult with a qualified immigration attorney or official government agencies for any specific questions or concerns related to your individual situation. We are not responsible for any losses, damages, or legal disputes arising from the use of information provided on this website. 

 

By using this website, you acknowledge and agree to the above disclaimer and Google's Terms of Use and Privacy Policy.

bottom of page