MapReduce/Hadoop solution to secondary sort


This section provides a complete MapReduce implementation of the secondary sort problem using the Hadoop framework.

Input

The input is a set of files in which each record (line) has the following format:

Format: <year><,><month><,><day><,><temperature>

Example:
2012, 01, 01, 35
2011, 12, 23, -4

Expected output

The expected output has the following format:

Format: <year><-><month>: <temperature1><,><temperature2><,> …
where temperature1 <= temperature2 <= …

Example:
2012-01: 5, 10, 35, 45, …
2001-11: 40, 46, 47, 48, …
2005-08: 38, 50, 52, 70, …

map() function

The map() function parses and tokenizes the input and then injects the value (temperature) into the reducer key, as shown in Example 4.

Example 4. map() for secondary sorting [image010, image011]

reduce() function

The reducer's primary function is to concatenate the values (which are already sorted through the Secondary Sort design pattern) and emit them as output. The reduce() function is given in Example 5.

Example 5. reduce() for secondary sorting [image012, image013]

Hadoop implementation classes

The classes shown in Table 1 are used to solve the problem.

Table 1. Classes used in MapReduce/Hadoop solution [image014]

How is the value injected into the key?

The first comparator (the DateTemperaturePair.compareTo() method) controls the sort order of the keys, while the second comparator controls which keys are grouped together into a single call to the reduce() method. Combining these two comparators lets you set up jobs that behave as though an order had been defined for the values.

The SecondarySortDriver is the driver class, which registers the custom plug-in classes (DateTemperaturePartitioner and DateTemperatureGroupingComparator) with the MapReduce/Hadoop framework. This driver class is presented in Example 6.

Example 6. SecondarySortDriver class [image015, image016]

Sample run of Hadoop implementation

Input [image017]

HDFS input [image019]

The script [image020]

Log of sample run [image021, image022]

Inspecting the output [image023]

How to sort in ascending or descending order

You can control the sorting order of the values (ascending or descending) through the DateTemperaturePair.compareTo() method as follows: [image024]

Spark solution to secondary sort

To solve a secondary sorting problem in Spark, we have at least two options:

Option #1
Read and buffer all of the values for a given key in an Array or List data structure and then do an in-reducer sort on the values. This solution works if each reducer key has a small set of values (one that fits in memory).

Option #2
Use the Spark framework for sorting the reducer values (this option does not require in-reducer sorting of values passed to the reducer). This approach involves “creating a composite key by adding a part of, or the entire value to, the natural key to achieve your sorting objectives.” This option always scales (because you are not limited by the memory of a commodity server).

Time series as input

To demonstrate secondary sorting, let's use time series data: [image025, image026]

Expected output

Our expected output is as follows. Note that the reducer values are grouped by name and sorted by time:

name t1 t2 t3 t4 t5 …
x => [3, 9, 6]
y => [7, 5, 1]
z => [4, 8, 7, 0]
p => [9, 6, 7, 0, 3]

Option 1: Secondary sorting in memory

Since Spark has a very powerful and high-level API, I will present the entire solution in a single Java class. The Spark API is built upon the basic abstraction of the RDD (resilient distributed dataset). To fully utilize Spark's API, we have to understand RDDs.
An RDD<T> (i.e., an RDD of type T) object represents an immutable, partitioned collection of elements (of type T) that can be operated on in parallel. The RDD<T> class contains the basic MapReduce operations available on all RDDs, such as map(), filter(), and persist(), while the JavaPairRDD<K,V> class contains MapReduce operations such as mapToPair(), flatMapToPair(), and groupByKey(). In addition, Spark's PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as reduceByKey(), groupByKey(), and join(). Conceptually, JavaRDD<T> is a list of objects of type T, and JavaPairRDD<K,V> is a list of objects of type Tuple2<K,V> (where each tuple represents a key-value pair).

The Spark-based algorithm is listed next. Although there are 10 steps, most of them are trivial and some are provided for debugging purposes only:

  1. We import the required Java/Spark classes. The main Java classes for MapReduce are provided by Spark's Java API package, which includes the following classes and interfaces:
  • JavaRDDLike (interface)
  • JavaDoubleRDD
  • JavaPairRDD
  • JavaRDD
  • JavaSparkContext
  • StorageLevels
  2. We pass input data as arguments and validate them.
  3. We connect to the Spark master by creating a JavaSparkContext object, which is used to create new RDDs.
  4. Using the context object (created in step 3), we create an RDD for the input file; the resulting RDD will be a JavaRDD<String>. Each element of this RDD will be a record of time series data: <name><,><time><,><value>.
  5. Next we create key-value pairs from the JavaRDD<String>, where the key is the name and the value is a pair of (time, value). The resulting RDD will be a JavaPairRDD<String, Tuple2<Integer, Integer>>.
  6. To validate step 5, we collect all values from the JavaPairRDD<> and print them.
  7. We group JavaPairRDD<> elements by the key (name). To accomplish this, we use the groupByKey() method. The result will be the RDD JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>>. Note that the resulting list (Iterable<Tuple2<Integer, Integer>>) is unsorted. In general, Spark's reduceByKey() is preferred over groupByKey() for performance reasons, but here we have no other option than groupByKey() (since reduceByKey() does not allow us to sort the values in place for a given key).
  8. To validate step 7, we collect all values from the JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> and print them.
  9. We sort the reducer's values to get the final output. We accomplish this by passing a custom function to the mapValues() method. We just sort the values (the key remains the same).
  10. To validate the final result, we collect all values from the sorted JavaPairRDD<> and print them.
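
The parsing and grouping steps (4 through 7) can be emulated in plain Java without a cluster, which may help make the data shapes concrete. The sketch below is not the Spark implementation: the class name GroupByNameSketch, the int[]{time, value} pair representation, and the sample records are assumptions made for illustration (the article's actual input is only available as an image).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java emulation of steps 4-7; no Spark classes are involved.
// GroupByNameSketch is a hypothetical name used only for this illustration.
class GroupByNameSketch {

    // Parses "<name>,<time>,<value>" records (step 5) and groups the
    // (time, value) pairs by name (step 7). Each pair is an int[]{time, value}.
    // Values stay in input order, i.e. they are NOT sorted by time.
    static Map<String, List<int[]>> groupByName(List<String> records) {
        Map<String, List<int[]>> groups = new LinkedHashMap<>();
        for (String record : records) {
            String[] tokens = record.split(",");
            String name = tokens[0].trim();
            int time = Integer.parseInt(tokens[1].trim());
            int value = Integer.parseInt(tokens[2].trim());
            groups.computeIfAbsent(name, k -> new ArrayList<>())
                  .add(new int[] {time, value});
        }
        return groups;
    }

    public static void main(String[] args) {
        // Assumed sample records, chosen to match the x and y groups shown later.
        List<String> records = Arrays.asList(
            "x,2,9", "y,2,5", "x,1,3", "y,1,7", "y,3,1", "x,3,6");

        // Steps 6 and 8 equivalent: print each group for inspection.
        for (Map.Entry<String, List<int[]>> group : groupByName(records).entrySet()) {
            StringBuilder line = new StringBuilder(group.getKey());
            for (int[] pair : group.getValue()) {
                line.append(' ').append(pair[0]).append(',').append(pair[1]);
            }
            System.out.println(line);
        }
    }
}
```

With these assumed records, the program prints "x 2,9 1,3 3,6" followed by "y 2,5 1,7 3,1": the values are grouped by name but still in arrival order, which is why a separate sorting step is needed.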

A solution for option #1 is implemented by a single driver class: SecondarySorting (see Example 7). All steps, 1–10, are listed inside the class definition, which will be presented in the following sections. Typically, a Spark application consists of a driver program that runs the user's main() function and executes various parallel operations on a cluster. Parallel operations will be achieved through the extensive use of RDDs.

Example 7. SecondarySort class overall structure [image027]

Step 1: Import required classes

As shown in Example 8, the main Spark package for the Java API includes the JavaRDD, JavaPairRDD, and JavaSparkContext classes. JavaSparkContext is a factory class for creating new RDDs (such as JavaRDD and JavaPairRDD objects).

Example 8. Step 1: Import required classes [image028]

Step 2: Read input parameters

This step, demonstrated in Example 9, reads the path of the HDFS input file (Spark may read data from HDFS and other persistent stores, such as a Linux filesystem), which might look like /dir1/dir2/myfile.txt.

Example 9. Step 2: Read input parameters [image029]

Step 3: Connect to the Spark master

To work with RDDs, you first need to create a JavaSparkContext object (as shown in Example 10), which is a factory for creating JavaRDD and JavaPairRDD objects. It is also possible to create a JavaSparkContext object by injecting a SparkConf object into the JavaSparkContext class constructor. This approach is useful when you read your cluster configurations from an XML file. In a nutshell, the JavaSparkContext object has the following responsibilities:

  • Initializes the application driver.
  • Registers the application driver to the cluster manager. (If you are using the Spark cluster, then this will be the Spark master; if you are using YARN, then it will be YARN’s resource manager.)
  • Obtains a list of executors for executing your application driver.

Example 10. Step 3: Connect to the Spark master [image030]

Step 4: Use the JavaSparkContext to create a JavaRDD

This step, illustrated in Example 11, reads an HDFS file and creates a JavaRDD<String> (which represents a set of records where each record is a String object). By definition, Spark's RDDs are immutable (i.e., they cannot be altered or modified). Note that Spark's RDDs are the basic abstraction for parallel execution. Note also that one may use textFile() to read HDFS or non-HDFS files.

Example 11. Step 4: Create JavaRDD [image031]

Step 5: Create key-value pairs from the JavaRDD

This step, shown in Example 12, implements a mapper. Each record (from the JavaRDD<String> and consisting of <name><,><time><,><value>) is converted to a key-value pair, where the key is a name and the value is a Tuple2(time, value).

Example 12. Step 5: Create key-value pairs from JavaRDD [image032]

Step 6: Validate step 5

To debug and validate the steps in Spark (as shown in Example 13), one may use JavaRDD.collect() and JavaPairRDD.collect(). Note that collect() is used here for debugging and educational purposes (avoid using collect() for debugging in production clusters; doing so will impact performance). One may also use JavaRDD.saveAsTextFile() for debugging as well as for creating the desired outputs.

Example 13. Step 6: Validate step 5 [image033]

Step 7: Group JavaPairRDD elements by the key (name)

We implement the reducer operation using groupByKey(). As Example 14 shows, it is much easier to implement the reducer in Spark than in MapReduce/Hadoop. Note that in Spark, in general, reduceByKey() is more efficient than groupByKey(). Here, however, we cannot use reduceByKey().

Example 14. Step 7: Group JavaPairRDD elements [image034]

Step 8: Validate step 7

This step, shown in Example 15, validates the previous step by using the collect() function, which gets all values from the groups RDD.

Example 15. Step 8: Validate step 7 [image035]

The following shows the output of this step.
As one can see, the reducer values are not sorted:

y 2,5 1,7 3,1
=====
x 2,9 1,3 3,6
=====
z 1,4 2,8 3,7 4,0
=====
p 2,6 4,7 6,0 7,3 1,9
=====

Step 9: Sort the reducer's values in memory

This step, shown in Example 16, uses another powerful Spark method, mapValues(), to sort the values generated by reducers. The mapValues() method enables us to convert (K, V1) into (K, V2), where V2 is a sorted V1. One important note about Spark's RDD is that it is immutable and cannot be altered or updated by any means. For example, in this step, to sort our values, we have to copy them into another list first. Immutability applies to the RDD itself and its elements.

Example 16. Step 9: Sort the reducer's values in memory [image036, image037]

Step 10: Output final result

The collect() method collects all of the RDD's elements into a java.util.List object. Then we iterate through the List to get all the final elements (see Example 17).
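
The copy-then-sort idea from step 9 can be tried out in plain Java. This is a minimal sketch, not the mapValues() code from Example 16: the class name SortValuesSketch and the int[]{time, value} pair representation are assumptions for illustration, and an unmodifiable list stands in for the immutable group of values. The input is group p exactly as printed in the step 8 output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Plain-Java illustration of step 9; SortValuesSketch is a hypothetical name.
class SortValuesSketch {

    // Copies the (time, value) pairs into a new list and sorts the copy by
    // time (ascending). The input is never modified.
    static List<int[]> sortedByTime(Iterable<int[]> pairs) {
        List<int[]> copy = new ArrayList<>();
        for (int[] pair : pairs) {
            copy.add(pair);
        }
        copy.sort(Comparator.comparingInt((int[] pair) -> pair[0]));
        return copy;
    }

    // Returns only the values, in time order, as in the expected output.
    static List<Integer> valuesInTimeOrder(Iterable<int[]> pairs) {
        List<Integer> values = new ArrayList<>();
        for (int[] pair : sortedByTime(pairs)) {
            values.add(pair[1]);
        }
        return values;
    }

    public static void main(String[] args) {
        // Group "p" from the step 8 output: 2,6 4,7 6,0 7,3 1,9
        List<int[]> p = Collections.unmodifiableList(Arrays.asList(
            new int[] {2, 6}, new int[] {4, 7}, new int[] {6, 0},
            new int[] {7, 3}, new int[] {1, 9}));

        // Prints: p => [9, 6, 7, 0, 3]
        System.out.println("p => " + valuesInTimeOrder(p));
    }
}
```

Sorting the original list in place would throw an UnsupportedOperationException here, which mirrors the reason the values have to be copied before sorting. Reversing the comparator (for example with Comparator.reversed()) would give descending time order instead.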
