Solving Data Problems with StreamSets Transformer: Part 1

Rishi Jain
4 min read · Sep 4, 2020


Recently I took a Spark Scala course on one of the online education platforms. During that course, we had to solve data challenges by writing small Scala programs. I thought I would extend that exercise and decided to solve the same problem in different ways, including with StreamSets Transformer.

Challenge 1: You are given a fake friends dataset from a social networking platform in CSV format, as shown below, and you have to perform various tasks.

Note: You can find all the resources in the Git repo below.

git clone https://github.com/rishi871/SparkScala.git

id, Name, Age, Number of Friends
0,Will,33,385
1,Jean-Luc,26,2
2,Hugh,55,221
3,Deanna,40,465
4,Quark,68,21
5,Weyoun,59,318
6,Gowron,37,220
7,Will,54,307
  • Find the average number of friends for each age and sort them in ascending order.
  • Find the users between ages 13 and 19, and other fun tasks.

The traditional RDD way:

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

/** Compute the average number of friends by age in a social network. */
object FriendsByAge {

  /** A function that splits a line of input into (age, numFriends) tuples. */
  def parseLine(line: String) = {
    // Split by commas
    val fields = line.split(",")
    // Extract the age and numFriends fields, and convert to integers
    val age = fields(2).toInt
    val numFriends = fields(3).toInt
    // Create a tuple that is our result.
    (age, numFriends)
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Create a SparkContext using every core of the local machine
    val sc = new SparkContext("local[*]", "FriendsByAge")

    // Load each line of the source data into an RDD
    val lines = sc.textFile("/Users/rishijain/Documents/SparkScala/fakefriends.csv")

    // Use our parseLine function to convert to (age, numFriends) tuples
    val rdd = lines.map(parseLine)

    // Lots going on here...
    // We are starting with an RDD of form (age, numFriends) where age is the KEY and numFriends is the VALUE
    // We use mapValues to convert each numFriends value to a tuple of (numFriends, 1)
    // Then we use reduceByKey to sum up the total numFriends and total instances for each age, by
    // adding together all the numFriends values and 1's respectively.
    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

    // So now we have tuples of (age, (totalFriends, totalInstances))
    // To compute the average we divide totalFriends / totalInstances for each age.
    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

    // Collect the results from the RDD (This kicks off computing the DAG and actually executes the job)
    val results = averagesByAge.collect()

    // Sort and print the final results.
    results.sorted.foreach(println)
  }
}
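One small caveat with the RDD version: because both elements of the value tuple are Ints, x._1 / x._2 performs integer division, so the averages are truncated. A minimal tweak (my own, not from the course code) keeps the fractional part, matching the decimal averages shown in the Transformer output later in this post:

// Convert to Double before dividing so the average keeps its fractional part.
val averagesByAge = totalsByAge.mapValues(x => x._1.toDouble / x._2)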

Spark DataFrame/DataSet Method

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.log4j._
object DataFrames {

  case class Person(ID: Int, name: String, age: Int, numFriends: Int)

  def mapper(line: String): Person = {
    val fields = line.split(',')
    val person: Person = Person(fields(0).toInt, fields(1), fields(2).toInt, fields(3).toInt)
    return person
  }

  /** Our main function where the action happens */
  def main(args: Array[String]) {

    // Set the log level to only print errors
    Logger.getLogger("org").setLevel(Level.ERROR)

    // Use the new SparkSession interface in Spark 2.0
    val spark = SparkSession
      .builder
      .appName("SparkSQL")
      .master("local[*]")
      .getOrCreate()

    // Convert our csv file to a DataSet, using our Person case
    // class to infer the schema.
    import spark.implicits._
    val lines = spark.sparkContext.textFile("../fakefriends.csv")
    val people = lines.map(mapper).toDS().cache()

    println("Here is our inferred schema:")
    people.printSchema()

    println("Let's select the name column:")
    people.select("name").show()

    println("Filter out anyone over 21:")
    people.filter(people("age") < 21).show()

    println("Group by age:")
    people.groupBy("age").count().show()

    println("Make everyone 10 years older:")
    people.select(people("name"), people("age") + 10).show()

    spark.stop()
  }
}
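The listing above explores the data but does not yet compute the answer to the challenge. Below is a minimal sketch of how the average number of friends per age, sorted in ascending order, could be computed with the same people Dataset; the avgByAge variable and the avgFriendsByAge column name are my own, not part of the course code.

import org.apache.spark.sql.functions.avg

// Assuming `people` is the Dataset[Person] built above:
// average the number of friends for each age and sort ascending.
val avgByAge = people
  .groupBy("age")
  .agg(avg("numFriends").as("avgFriendsByAge"))
  .orderBy("age")

avgByAge.show()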

The StreamSets Transformer Way (Let's code through the UI):

Check out my previous article, where I explained what StreamSets Transformer is. You can download it from here.

Stage 1: File Origin:

Configure a File origin to read data from the local file system, and define the schema manually since it is not available in the data source file.
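For comparison, here is a rough sketch of what "defining the schema manually" looks like in plain Spark when reading the same CSV; the friendsSchema and friendsDF names and the file path are illustrative, not taken from the pipeline.

import org.apache.spark.sql.types._

// Column names mirror the header shown earlier: id, Name, Age, Number of Friends
val friendsSchema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("Name", StringType, nullable = true),
  StructField("Age", IntegerType, nullable = true),
  StructField("Number of Friends", IntegerType, nullable = true)
))

val friendsDF = spark.read
  .schema(friendsSchema)
  .csv("/path/to/fakefriends.csv")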

Stage 2: We will use the Aggregate processor, which provides various aggregate functions; for this challenge we will use the Average function.

Then sort the result with the Sort processor.

Next, I used the Repartition processor. As we know, Spark splits the data into partitions and performs operations on those partitions in parallel. Since I wanted to combine the results and write them to a single file, I set the number of partitions to 1.

Note: The Repartition processor causes Spark to shuffle the data, redistributing it so that it's grouped differently across the partitions. That can be an expensive operation, but for this challenge we are fine :)

Finally, write the result back to a JSON file on the local file system.
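In plain Spark terms, the Repartition processor plus the file destination roughly correspond to the snippet below, where result stands in for the aggregated, sorted DataFrame and the output path is only illustrative.

// Collapse to a single partition so Spark writes one output file (this is the shuffle noted above),
// then write the records out as JSON.
result
  .repartition(1)
  .write
  .mode("overwrite")
  .json("/tmp/out")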

$ cat /tmp/out/part-00000-7f15255d-cc63-4687-b4aa-ba7bd4ff97bf-c000.json | less
{"Age":18,"AvgFriendsByAge":343.375}
{"Age":19,"AvgFriendsByAge":213.27272727272728}
{"Age":20,"AvgFriendsByAge":165.0}
{"Age":21,"AvgFriendsByAge":350.875}
{"Age":22,"AvgFriendsByAge":206.42857142857142}
{"Age":23,"AvgFriendsByAge":246.3}
{"Age":24,"AvgFriendsByAge":233.8}
{"Age":25,"AvgFriendsByAge":197.45454545454547}
{"Age":26,"AvgFriendsByAge":242.05882352941177}
{"Age":27,"AvgFriendsByAge":228.125}

Conclusion:

We just reviewed different ways to solve the same problem. Please let me know if you have another approach as well. In the next blog, we will try to solve another problem.
