MapReduce & Hadoop
June 3, 2015
HS Oh, HR Lee, JY Choi
YS Lee, SH Choi
Outline
 Part 1
  - Introduction to Hadoop
  - MapReduce Tutorial with Simple Example
  - Hadoop v2.0: YARN
 Part 2
  - MapReduce
  - Hive
  - Stream Data Processing: Storm
  - Spark
  - Up-to-date Trends
MapReduce
 Overview
 Task flow
 Shuffle configurables
 Combiner
 Partitioner
  - Custom Partitioner Example
 Number of Maps and Reduces
 How to write MapReduce functions
MapReduce Overview
[Figure: MapReduce data flow: input records with keys A and B are mapped, shuffled so that identical keys are grouped together, and reduced]
http://www.micronautomata.com/big_data
MapReduce Task flow
http://grepalex.com/2012/09/10/sorting-text-files-with-mapreduce/
MapReduce Shuffle Configurables
http://grepalex.com/2012/11/26/hadoop-shuffle-configurables/
Combiner
 A "mini reducer"
 Functionally the same as the reducer
 Runs on each map task (locally), which reduces communication cost
 Use a combiner only when the reduce function is both commutative and associative
http://www.kalyanhadooptraining.com/2014_07_01_archive.html
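As a sketch of how a combiner is wired in (old mapred API, reusing the word-count classes shown later in this tutorial; the Map, Reduce, and WordCount class names are assumed from that example):

JobConf conf = new JobConf(WordCount.class);
conf.setMapperClass(Map.class);
// The reducer doubles as the combiner because summing partial counts is
// both commutative and associative.
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);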
Partitioner
 Divides the map output (key, value) pairs among the reducers according to a rule
 The default strategy is hashing
  - HashPartitioner

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
  public void configure(JobConf job) {}

  public int getPartition(K2 key, V2 value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
Custom Partitioner Example
 Input records contain name, age, sex, and score
 Map outputs are partitioned by age range

public static class AgePartitioner extends Partitioner<Text, Text> {
  @Override
  public int getPartition(Text key, Text value, int numReduceTasks) {
    String[] nameAgeScore = value.toString().split("\t");
    String age = nameAgeScore[1];
    int ageInt = Integer.parseInt(age);

    // this is done to avoid performing mod with 0
    if (numReduceTasks == 0)
      return 0;

    // if the age is <= 20, assign partition 0
    if (ageInt <= 20) {
      return 0;
    }
    // else if the age is between 21 and 50, assign partition 1
    if (ageInt > 20 && ageInt <= 50) {
      return 1 % numReduceTasks;
    }
    // otherwise assign partition 2
    else
      return 2 % numReduceTasks;
  }
}

http://hadooptutorial.wikispaces.com/Custom+partitioner
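A minimal sketch of how the partitioner above could be plugged into a job (new mapreduce API, which the Partitioner base class above belongs to; the driver class name is illustrative):

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "partition by age range");
job.setJarByClass(AgePartitionerDriver.class);   // illustrative driver class
job.setPartitionerClass(AgePartitioner.class);   // route records by age range
job.setNumReduceTasks(3);                        // one reducer per age bucket (0, 1, 2)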
Number of Maps and Reduces
 The number of maps = the number of DFS blocks
  - Adjust the DFS block size to change the number of maps
  - Right level of parallelism for maps: about 10 to 100 maps per node
  - The mapred.map.tasks parameter is just a hint
 The number of reduces
  - Set with conf.setNumReduceTasks(int num)
  - Suggested values
    - Set the number of reduce tasks slightly less than the total number of reduce slots
    - Aim for a task time between 5 and 15 minutes
    - Create the fewest files possible
http://wiki.apache.org/hadoop/HowManyMapsAndReduces
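For illustration, these settings correspond to calls like the following on the old mapred API (the numbers are placeholders, not tuning advice, and MyJob is a stand-in driver class):

JobConf conf = new JobConf(MyJob.class);   // MyJob is a placeholder driver class
conf.setNumMapTasks(100);                  // only a hint; the real count follows the input splits
conf.setNumReduceTasks(14);                // e.g., slightly fewer than the cluster's reduce slots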
How to write MapReduce functions [1/2]
 Java Word Count Example

Mapper (input: line offset and line text; output: (word, 1) pairs):

private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
                Reporter reporter) throws IOException {
  String line = value.toString();
  StringTokenizer tokenizer = new StringTokenizer(line);
  while (tokenizer.hasMoreTokens()) {
    word.set(tokenizer.nextToken());
    output.collect(word, one);   // emit (word, 1)
  }
}

Reducer (input: a word and its list of counts; output: (word, total count)):

public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
                   Reporter reporter) throws IOException {
  int sum = 0;
  while (values.hasNext()) {
    sum += values.next().get();
  }
  output.collect(key, new IntWritable(sum));   // emit (word, sum)
}

http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
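The map and reduce methods above live inside Map and Reduce classes and still need a driver. A minimal sketch following the r1.2.1 tutorial the example is taken from (input and output paths come from the command line):

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCount {
  public static void main(String[] args) throws IOException {
    JobConf conf = new JobConf(WordCount.class);
    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);
    conf.setCombinerClass(Reduce.class);   // combiner as discussed earlier
    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);
  }
}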
How to write MapReduce functions [2/2]
 Python Word Count Example (Hadoop Streaming)

Mapper.py

#!/usr/bin/python
import sys

# Emit (word, 1) for every word read from stdin.
for line in sys.stdin:
    for word in line.strip().split():
        print "%s\t%d" % (word, 1)

Reducer.py

#!/usr/bin/python
import sys

# Input arrives sorted by word, so counts for the same word are adjacent.
current_word = None
current_count = 1
for line in sys.stdin:
    word, count = line.strip().split('\t')
    if current_word:
        if word == current_word:
            current_count += int(count)
        else:
            print "%s\t%d" % (current_word, current_count)
            current_count = 1
    current_word = word
# emit the final word
if current_word:
    print "%s\t%d" % (current_word, current_count)

How to execute

bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar \
    -files /home/hduser/Mapper.py,/home/hduser/Reducer.py \
    -mapper /home/hduser/Mapper.py \
    -reducer /home/hduser/Reducer.py \
    -input /input/count_of_monte_cristo.txt \
    -output /output

http://dogdogfish.com/2014/05/19/hadoop-wordcount-in-python/
Hadoop Ecosystem
Hive & Stream Data Processing: Storm
The World of Big Data Tools
[Figure: big data tools arranged by programming model (MapReduce, DAG, Graph, BSP/Collective) and by purpose (iterations/learning, query, streaming). Tools shown include Hadoop, MPI, HaLoop, Twister, Spark, Giraph, GraphLab, GraphX, Hama, Harp, Flink, Dryad/DryadLINQ, REEF, Pig/PigLatin, Hive, Drill, Tez, SparkSQL (Shark), MRQL, S4, Samza, Storm, and Spark Streaming.]
From Bingjing Zhang
Hive
 Data warehousing on top of Hadoop
 Designed to enable
  - easy data summarization
  - ad-hoc querying
  - analysis of large volumes of data
 HiveQL statements are automatically translated into MapReduce jobs
Advantages
 Higher-level query language
  - Simplifies working with large amounts of data
 Lower learning curve than Pig or MapReduce
  - HiveQL is much closer to SQL than Pig
  - Less trial and error than Pig
Disadvantages
 Updating data is complicated
  - Mainly because of using HDFS
  - Can add records
  - Can overwrite partitions
 No real-time access to data
  - Use other means like HBase or Impala
 High latency
Hive Architecture
Metastore
Compiler
 Parser
 Semantic Analyzer
 Logical Plan Generator
 Query Plan Generator
HiveQL
 While based on SQL, HiveQL does not strictly follow the full SQL-92 standard.
 HiveQL offers extensions not in SQL, including multitable inserts and create table as select, but offers only basic support for indexes.
 HiveQL lacks support for transactions and materialized views, and has only limited subquery support.
 Support for insert, update, and delete with full ACID functionality was made available with release 0.14.
Datatypes in Hive
 Primitive datatypes
  - TINYINT
  - SMALLINT
  - INT
  - BIGINT
  - BOOLEAN
  - FLOAT
  - DOUBLE
  - STRING
HiveQL – Group By

pv_users (pageid, age):
(1, 25), (2, 25), (1, 32), (2, 25), (3, 27), (1, 21), ..., (18570, 30), (18570, 26)

pageid_age_sum (pageid, age, count):
(1, 25, 1), (1, 32, 1), (1, 21, 1), (2, 25, 2), (3, 27, 1), ..., (18570, 30, 1), (18570, 26, 1)

HiveQL: INSERT INTO TABLE pageid_age_sum
        SELECT pageid, age, count(1) FROM pv_users GROUP BY pageid, age;
HiveQL – Group By in MapReduce

[Figure: execution of the GROUP BY query as a MapReduce job]
- Map: each mapper reads a split of pv_users and emits key = <pageid, age>, value = 1 (e.g., row (1, 25) → key <1,25>, value 1; row (2, 25) → key <2,25>, value 1)
- Shuffle: pairs are partitioned and sorted by key, so all values for the same <pageid, age> key reach the same reducer (e.g., both <2,25> pairs end up together)
- Reduce: each reducer sums the values per key and writes a (pageid, age, count) row (e.g., <2,25> → (2, 25, 2); <1,32> → (1, 32, 1))
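To make that mapping concrete, here is a hand-written sketch of an equivalent job in the new mapreduce API. This is not the code Hive actually generates, and it assumes pv_users rows arrive as tab-separated pageid and age:

import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class GroupByPageidAge {
  // Map: emit <pageid, age> as the key and 1 as the value.
  public static class GroupByMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] cols = value.toString().split("\t");   // pageid \t age
      outKey.set(cols[0] + "," + cols[1]);
      context.write(outKey, ONE);
    }
  }

  // Reduce: sum the 1s for each <pageid, age> key, i.e., count(1).
  public static class GroupByReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable v : values) sum += v.get();
      context.write(key, new IntWritable(sum));
    }
  }
}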
Stream Data Processing
Distributed Stream Processing Engine
 Stream data
  - An unbounded sequence of event tuples
  - E.g., sensor data, stock trading data, web traffic data, ...
 Since a large volume of data flows in from many sources, centralized systems can no longer process it in real time.
Distributed Stream Processing Engine
 General stream processing model
  - Stream processing processes data before storing it (cf. batch systems like Hadoop, which process data after storing it).
  - Processing Element (PE): a processing unit in a stream engine
  - A stream processing engine generally creates a logical network of processing elements (PEs) connected in a directed acyclic graph (DAG).
DSPE Systems
 Apache Storm (current release: 0.10)
  - Developed by Twitter
  - Donated to the Apache Software Foundation in 2013
  - Pull-based messaging
  - http://storm.apache.org/
 Apache S4 (current release: 0.6)
  - Developed by Yahoo
  - Donated to the Apache Software Foundation in 2011
  - S4 stands for Simple Scalable Streaming System
  - Push-based messaging
  - http://incubator.apache.org/s4/
 Apache Samza (current release: 0.9)
  - Developed by LinkedIn
  - Donated to the Apache Software Foundation in 2013
  - Messaging using a message broker (Kafka)
  - http://samza.apache.org/
Apache Storm
 System Architecture
Apache Storm
 Topology
  - A DAG of PEs on Storm
  - Spout: the starting point of a data stream; it can listen on an HTTP port or pull from a queue
  - Bolt: processes incoming stream tuples
    - A bolt pulls messages from its upstream PEs, so it does not take on an excessive number of messages.
 Stream grouping
  - Shuffle grouping, fields grouping, partial key grouping, all grouping, global grouping, ...
 Message processing guarantee
  - Each PE keeps an output message until the downstream PE has processed it and sent an acknowledgement.
Apache Storm: Spouts
Source of streams
Apache Storm: Bolts
Processes input streams and produces new streams
Apache Storm: Topology
Network of spouts and bolts
Apache Storm: Task
Spouts and bolts execute in parallel as many tasks across the cluster
Apache Storm: Stream grouping
 Shuffle grouping: pick a random task
 Fields grouping: consistent hashing on a subset of tuple fields
 All grouping: send to all tasks
 Global grouping: pick the task with the lowest id
Apache Storm
 Supported languages
  - Python, Java, Clojure
 Tutorial (see the sketch after the figure below)
  - Bolt 'exclaim1' appends the string "!!" to its input.
  - Bolt 'exclaim2' appends the string "**" to its input.
Apache Storm
[Figure: tutorial topology: spout 'word' → bolt 'exclaim1' → bolt 'exclaim2']
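A minimal sketch of this tutorial topology in Java, modeled on the storm-starter ExclamationTopology (the packages are the 0.10-era backtype.storm API; TestWordSpout, the parallelism hints, and the local-cluster run are illustrative assumptions):

import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ExclamationTopology {
  // Bolt that appends a suffix ("!!" or "**") to each incoming word.
  public static class ExclamationBolt extends BaseRichBolt {
    private final String suffix;
    private OutputCollector collector;

    public ExclamationBolt(String suffix) { this.suffix = suffix; }

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      // Anchor the new tuple to the input and ack it, so Storm can track processing.
      collector.emit(tuple, new Values(tuple.getString(0) + suffix));
      collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt("!!"), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt("**"), 2).shuffleGrouping("exclaim1");

    // Run locally for the tutorial; a real deployment would use StormSubmitter instead.
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("exclamation", new Config(), builder.createTopology());
  }
}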
References
1. Apache Hive, https://hive.apache.org/
2. Design - Apache Hive, https://cwiki.apache.org/confluence/display/Hive/Design
3. Apache Storm, https://storm.apache.org/
Spark
Fast, Interactive, Language-Integrated Cluster Computing
Motivation
 Most current cluster programming models are based on acyclic data flow from stable storage to stable storage.
 Benefits of data flow: the runtime can decide where to run tasks and can automatically recover from failures.
[Figure: acyclic data flow: Input → Maps → Reduces → Output]
Motivation
 Acyclic data flow is inefficient for applications that repeatedly reuse a working set of data:
  - Iterative algorithms (machine learning, graphs)
  - Interactive data mining tools (R, Excel, Python)
 With such frameworks, apps reload data from stable storage on each query.
Solution: Resilient Distributed Datasets (RDDs)
 Allow apps to keep working sets in memory for efficient reuse
 Retain the attractive properties of MapReduce
  - Fault tolerance, data locality, scalability
 Support a wide range of applications
  - Batch, query processing, stream processing, graph processing, machine learning
RDD Operations
 Transformations (define a new RDD): map, filter, sample, groupByKey, reduceByKey, sortByKey, flatMap, union, join, cogroup, cross, mapValues
 Actions (return a result to the driver program): collect, reduce, count, save, lookupKey
Example: Log Mining
 Load error messages from a log into memory, then interactively search for various patterns

lines = sc.textFile("hdfs://...")                // base RDD
errors = lines.filter(_.startsWith("ERROR"))     // transformed RDD
messages = errors.map(_.split('\t')(2))
cachedMsgs = messages.cache()

cachedMsgs.filter(_.contains("foo")).count       // action
cachedMsgs.filter(_.contains("bar")).count
. . .

[Figure: the driver ships tasks to workers; each worker reads its HDFS block and caches the filtered messages]
Result: full-text search of Wikipedia in <1 sec (vs 20 sec for on-disk data)
Result: scaled to 1 TB of data in 5-7 sec (vs 170 sec for on-disk data)
RDD Fault Tolerance
 RDDs maintain lineage information that can be used to reconstruct lost partitions

messages = textFile(...).filter(_.startsWith("ERROR"))
                        .map(_.split('\t')(2))

[Figure: lineage: HDFS File → filter(func = _.contains(...)) → Filtered RDD → map(func = _.split(...)) → Mapped RDD]
Performance
 Logistic Regression
https://databricks.com/blog/2014/03/20/apache-spark-a-delight-for-developers.html
Fault Recovery
 Run k-means on a 75-node cluster
 Each iteration consists of 400 tasks working on 100 GB of data
 An RDD is reconstructed by using its lineage
  - Recovery overhead: 24 s (≈ 30%)
  - Lineage graph: ≤ 10 KB
Matei et al., Resilient Distributed Datasets, NSDI '12
Generality
 Various types of applications can be built atop RDDs
 They can be combined in a single application and run on the Spark runtime
http://spark.apache.org
Interactive Analytics
 An interactive shell is provided
  - Programs return results directly
  - Run ad-hoc queries
Demo
 WordCount in the Scala API
 Show the result on the shell
  - counts.saveAsTextFile() → counts.collect()
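The demo itself uses the Scala shell; for reference, a rough equivalent of the word count as a standalone program in the Spark 1.x Java API might look like this (the file paths and the local master setting are illustrative):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import scala.Tuple2;

public class JavaWordCount {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> lines = sc.textFile("hdfs://.../input.txt");
    // Split lines into words, map each word to (word, 1), and sum the counts per word.
    JavaPairRDD<String, Integer> counts = lines
        .flatMap(line -> Arrays.asList(line.split(" ")))
        .mapToPair(w -> new Tuple2<>(w, 1))
        .reduceByKey((a, b) -> a + b);

    // collect() brings the result back to the driver (shown on the shell in the demo);
    // counts.saveAsTextFile("hdfs://.../output") would write it to storage instead.
    for (Tuple2<String, Integer> t : counts.collect()) {
      System.out.println(t._1() + "\t" + t._2());
    }
    sc.stop();
  }
}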
Conclusion
 Performance
  - Fast due to caching data in memory
 Fault-tolerance
  - Fast recovery by using lineage history
 Programmability
  - Multiple languages supported
  - Simple & integrated programming model
Up-to-date Trends
 Batch + Real-time Analytics
 Big-Data-as-a-Service
Trend 1: Batch + Real-time Analytics
 Lambda Architecture
  1. Data: dispatched to both the batch layer and the speed layer
  2. Batch layer: manages the master dataset (an immutable, append-only set of raw data) and pre-computes the batch views
Trend 1: Batch + Real-time Analytics
 Lambda Architecture (continued)
  3. Serving layer: indexes the batch views so they can be queried in a low-latency, ad-hoc way
  4. Speed layer: deals with recent data only (the serving layer's update cost is high)
  5. Queries are answered by merging results from the batch views and the real-time views
Trend 2: Big-Data-as-a-Service
 Big-Data-as-a-Service
  - Big data analytics systems are provided as a cloud service
    - Programming API & monitoring interface
    - Infrastructure can also be provided as a service
  - No need to worry about distributing data, resource optimization, resource provisioning, etc.
    - Users can focus on the data itself
Trend 2: Big-Data-as-a-Service
 Google Cloud Dataflow
[Figure: Google Cloud Dataflow programming API and monitoring UI]
References
1. Matei Zaharia, Mosharaf Chowdhury, Tathagata Das, Ankur Dave, Justin Ma, Murphy McCauley, Michael J. Franklin, Scott Shenker, Ion Stoica, Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing, NSDI '12
2. Apache Spark, http://spark.apache.org
3. Databricks, http://www.databricks.com
4. Lambda Architecture, http://lambda-architecture.net
5. Google Cloud Dataflow, http://cloud.google.com/dataflow
Questions?
Thank you