An Intermediate Algebra for
Optimizing RDF Graph
Pattern Matching on MapReduce
Padmashree Ravindra, HyeongSik Kim, Kemafor Anyanwu
COUL – Semantic COmpUting
research Lab
Outline
 Introduction
 Background
 MapReduce, Pig and Join Processing
 RDF Graph Pattern Matching in Pig
 Approach
 TripleGroup data model and Nested TripleGroup
Algebra (NTGA)
 Comparing NTGA based plans and Pig Latin plans for
graph pattern matching queries
 Evaluation
 Related Work
 Conclusion and Future Work
Basics: MapReduce
 Large scale processing of data on a cluster of
commodity grade machines
 Users encode task as map / reduce functions,
which are executed in parallel across the cluster
 Apache Hadoop* – open-source implementation
 Key Terms
 Hadoop Distributed File System (HDFS)
 Slave nodes / Task Tracker – Mappers (Reducers) execute
the map (reduce) function
 Master node / Job Tracker – manages and assigns
tasks to Mappers / Reducers
* http://hadoop.apache.org/
Data Processing on Hadoop
[Figure: one MapReduce (MR) cycle. Input → HDFS reads → map exec → local writes of (k1, v1), (k1, v2), (k1, v3) to disk → remote reads → (k1, {v1, v2, v3}) → reduce exec → (k1, val) → HDFS writes → Output]
Each MR cycle:
 Supports partition parallelism
 Incurs I/O and communication costs
Joins in Map Reduce
Single Join Workload
 Map phase – scan input records
 map func. annotates each record based
on join column e.g. (joinKey, Record)
 Reduce phase – records with same joinKey
collected by same reduce task
 reduce func. joins the tuples
 Output written into HDFS
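The single-join workload above can be sketched in a few lines of plain Python. This is only an in-memory simulation of the map, shuffle, and reduce steps, not Hadoop code; the function names and the relation tags "A" and "B" are hypothetical and chosen for illustration.

```python
from collections import defaultdict

def map_phase(records, join_col, tag):
    """Annotate each record with its join key: emit (joinKey, (tag, record))."""
    return [(rec[join_col], (tag, rec)) for rec in records]

def shuffle(pairs):
    """Collect all values with the same key, as the framework does before reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """For each join key, pair every tuple of relation A with every tuple of B."""
    joined = []
    for key in sorted(groups):
        left = [rec for tag, rec in groups[key] if tag == "A"]
        right = [rec for tag, rec in groups[key] if tag == "B"]
        joined.extend(a + b for a in left for b in right)
    return joined

def reduce_side_join(rel_a, col_a, rel_b, col_b):
    """Simulate one MR cycle that joins rel_a and rel_b on the given columns."""
    pairs = map_phase(rel_a, col_a, "A") + map_phase(rel_b, col_b, "B")
    return reduce_phase(shuffle(pairs))

# Toy relations (illustrative values only).
REL_A = [("url1", "x"), ("url2", "y")]
REL_B = [(11, "url1")]
RESULT = reduce_side_join(REL_A, 0, REL_B, 1)
```

Every record crosses the shuffle once per join, which is why each additional join in its own cycle repeats the I/O and communication costs.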
Data Processing in Pig
 Express data flow using high-level query
primitives
usability, code reuse, automatic optimization
Pig Latin
Data model : atom, tuple, bag (nesting)
Operators : LOAD, STORE, JOIN, GROUP BY, COGROUP,
FOREACH, SPLIT, aggr. functions
• Ex. Equijoin on REL A (column 0) and REL B (column 1)
JOIN A by $0, B by $1;
Extensibility support via UDFs
 Dataflow is compiled into a workflow of
MapReduce jobs
Example Pig Query Plan
Query:
SELECT ?vlabel ?hpage ?price ?prod
WHERE
{ ?v homepage ?hpage .
?v label ?vlabel .
?v country ?vcountry .
?o vendor ?v .
?o price ?price .
?o delDays ?delDays .
?o product ?prod . }
Pig plan (one LOAD of Input.rdf plus a FILTER per triple pattern):
A = LOAD Input.rdf; B = LOAD Input.rdf; FILTER(label), FILTER(homepage)
C = LOAD Input.rdf; FILTER(country)
MR1: T1 = JOIN A ON Sub, B ON Sub;
MR2: T2 = JOIN C ON Sub, T1 ON Sub;
…….
H = LOAD Input.rdf; FILTER(product)
MR6: T3 = JOIN H ON Sub, T7 ON Sub;
STORE
#MR cycles = #Joins = 6
(I/O & communication costs) * 6
Loads have I/Os as well (SPLIT Operator)*
Expensive!!!
Possible Optimizations : m-way Join
[Figure: Input → MR1 (map, reduce: star-join SJ1) → Disk; MR2 (map, reduce: star-join SJ2) → Disk; MR3 (map, reduce: JOIN J1 over SJ1 and SJ2) → HDFS]
SELECT ?vlabel ?hpage ?price ?prod
WHERE
{ ?v homepage ?hpage .
?v label ?vlabel .
?v country ?vcountry .
?o vendor ?v .
?o price ?price .
?o delDays ?delDays .
?o product ?prod . }
J1: join between stars
#MR cycles reduced from 6 to 3
BUT ?
Still expensive! 1 MR cycle per star-join
Many pattern matching queries involve
multiple star join subpatterns
 50% of BSBM* benchmark queries have two or more
star patterns
Our proposal:
Coalesce the computation of ALL star-join
subpatterns into a single MR cycle
How?
Don’t think of them as a set of joins!
Think of it as a GROUP BY operation
GROUPBY Subject
Sub | Prop | Obj
&V1 | type | Vendor
&V1 | label | Vendor1
&V1 | country | US
&V1 | homepage | www.ven...
&Offer1 | type | Offer
&Offer1 | vendor | &V1
&Offer1 | product | &P1
&Offer1 | price | 108
&Offer1 | delDays | 2
&Offer1 | validToDate | 01/01/2011
&Offer1 | validFromDate | 08/01/2011
&Rev1 | type | Review
&Rev1 | reviewFor | &P1
&Rev1 | rating1 | 9
&Rev1 | reviewer | &R1
WHERE
{ ?v homepage ?hpage .
?v label ?vlabel .
?v country ?vcountry .
?o vendor ?v .
?o price ?price .
?o delDays ?delDays .
?o product ?prod . }
1 MapReduce Cycle!!!
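A minimal sketch of the idea, in plain Python rather than Pig or Hadoop: one grouping pass over the triples by Subject yields the matches for every star subpattern at once. The names `STARS`, `group_by_subject`, and `match_stars` are hypothetical, and the triples are the ones from the table above.

```python
from collections import defaultdict

TRIPLES = [
    ("&V1", "type", "Vendor"), ("&V1", "label", "Vendor1"),
    ("&V1", "country", "US"), ("&V1", "homepage", "www.ven..."),
    ("&Offer1", "type", "Offer"), ("&Offer1", "vendor", "&V1"),
    ("&Offer1", "product", "&P1"), ("&Offer1", "price", 108),
    ("&Offer1", "delDays", 2), ("&Offer1", "validToDate", "01/01/2011"),
    ("&Offer1", "validFromDate", "08/01/2011"),
    ("&Rev1", "type", "Review"), ("&Rev1", "reviewFor", "&P1"),
    ("&Rev1", "rating1", 9), ("&Rev1", "reviewer", "&R1"),
]

# Properties required by each star subpattern of the example query
# (the star names are made up for this sketch).
STARS = {
    "vendor_star": {"homepage", "label", "country"},
    "offer_star": {"vendor", "price", "delDays", "product"},
}

def group_by_subject(triples):
    """One pass: collect all triples that share a Subject."""
    groups = defaultdict(list)
    for s, p, o in triples:
        groups[s].append((s, p, o))
    return dict(groups)

def match_stars(triples, stars):
    """Return, per star, the subject groups that contain all required properties."""
    matches = {name: [] for name in stars}
    for group in group_by_subject(triples).values():
        props = {p for _, p, _ in group}
        for name, required in stars.items():
            if required <= props:
                # keep only the triples the star actually asks for
                matches[name].append([t for t in group if t[1] in required])
    return matches

MATCHES = match_stars(TRIPLES, STARS)
```

Both stars are answered from the same grouped data, so no per-star join cycle is needed; the &Rev1 group matches neither star and is dropped.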
What are we proposing?
A new data model (TripleGroup) and
algebra (Nested TripleGroup Algebra NTGA) for more efficient graph pattern
matching on MapReduce platforms
Our Approach : RAPID+
 Goal : Minimize I/O and communication costs
by reducing MR cycles
Reinterpret and refactor operations into a more
suitable (coalesced) set of operators – NTGA
 Foundation:
 Re-interpret multiple star-joins as a
grouping operation
 leads to “groups of Triples” (TripleGroups)
instead of n-tuples
 different structure BUT “content equivalent”
 NTGA- algebra on TripleGroups
NTGA – Data Model
 Data model based on nested TripleGroups
 More naturally capture graphs
 TripleGroup – groups of triples sharing Subject / Object component
{(&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&Offer1, product, &P1),
(&Offer1, delDays, 2)}
 Can be nested at the Object component
{(&Offer1, price, 108),
(&Offer1, vendor, {(&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.vendors….)}),
(&Offer1, product, &P1),
(&Offer1, delDays, 2)}
NTGA Operators…(1)
 TG_Flatten – generate equivalent n-tuple ("content equivalence")
Input:
{(&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven...)}
TG_Flatten (the three triples are labelled t1, t2, t3):
(&V1, label, vendor1, &V1, country, US, &V1, homepage, www.ven...)
 TG_Unnest – unnest a nested TripleGroup
Input:
{(&Offer1, price, 108),
(&Offer1, vendor, {(&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven..)}),
(&Offer1, product, &P1),
(&Offer1, delDays, 2)}
TG_Unnest:
{(&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven..),
(&Offer1, product, &P1),
(&Offer1, delDays, 2)}
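The two operators can be sketched in Python under a simple, assumed representation: a TripleGroup is a list of (subject, property, object) tuples, and a nested TripleGroup is a list placed in the object position. This representation and the function names are illustrative, not the RAPID+ implementation.

```python
# Assumed representation: a TripleGroup is a list of (s, p, o) tuples;
# a nested TripleGroup appears as a list in the object position.
VENDOR_TG = [
    ("&V1", "label", "vendor1"),
    ("&V1", "country", "US"),
    ("&V1", "homepage", "www.ven..."),
]

NESTED_TG = [
    ("&Offer1", "price", 108),
    ("&Offer1", "vendor", VENDOR_TG),
    ("&Offer1", "product", "&P1"),
    ("&Offer1", "delDays", 2),
]

def tg_flatten(tg):
    """Concatenate the triples of a non-nested TripleGroup into one n-tuple."""
    return tuple(component for triple in tg for component in triple)

def tg_unnest(tg):
    """Replace each nested group by its subject and splice its triples in place.

    Assumes every nested group is non-empty and that its triples share one subject.
    """
    flat = []
    for s, p, o in tg:
        if isinstance(o, list):
            flat.append((s, p, o[0][0]))   # the join triple, e.g. (&Offer1, vendor, &V1)
            flat.extend(tg_unnest(o))      # followed by the nested group's own triples
        else:
            flat.append((s, p, o))
    return flat

FLAT_TUPLE = tg_flatten(VENDOR_TG)
UNNESTED = tg_unnest(NESTED_TG)
```

The flattened tuple carries the same content as the TripleGroup, which is the sense of "content equivalence" on the slide.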
NTGA Operators…(2)
 TG_GroupFilter – retain only TripleGroups that satisfy the required query substructure
 Structure-based filtering
Input TG:
{ { (&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven..) },
{ (&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&Offer1, product, &P1),
(&Offer1, delDays, 2) },
{ (&Offer2, vendor, &V2),
(&Offer2, product, &P3),
(&Offer2, delDays, 1) } }
TG_GroupFilter(TG, {price, vendor, delDays, product})
Output TG{price, vendor, delDays, product}:
{ (&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&Offer1, product, &P1),
(&Offer1, delDays, 2) }
Eliminate TripleGroups
with missing triples (edges)
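A rough Python sketch of structure-based filtering, assuming a TripleGroup is a list of (subject, property, object) tuples. The function name mirrors the operator, but the code is an illustration, not the RAPID+ implementation.

```python
def tg_group_filter(triplegroups, required_props):
    """Keep only the TripleGroups that contain every required property."""
    required = set(required_props)
    return [tg for tg in triplegroups
            if required <= {p for _, p, _ in tg}]

# The three TripleGroups from the example above.
TG = [
    [("&V1", "label", "vendor1"), ("&V1", "country", "US"),
     ("&V1", "homepage", "www.ven..")],
    [("&Offer1", "price", 108), ("&Offer1", "vendor", "&V1"),
     ("&Offer1", "product", "&P1"), ("&Offer1", "delDays", 2)],
    [("&Offer2", "vendor", "&V2"), ("&Offer2", "product", "&P3"),
     ("&Offer2", "delDays", 1)],
]

KEPT = tg_group_filter(TG, {"price", "vendor", "delDays", "product"})
```

The &Offer2 group is dropped because it has no price triple, and the &V1 group because it has none of the required properties.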
NTGA Operators…(3)
 TG_Filter – filter out triples that do not satisfy the filter condition (FILTER clause)
 Value-based filtering
Input TG{price, vendor, delDays, product}:
{ { (&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&Offer1, product, &P1),
(&Offer1, delDays, 2) },
{ (&Offer3, vendor, &V2),
(&Offer3, product, &P3),
(&Offer3, price, 306),
(&Offer3, delDays, 1) } }
TG_Filter[price<200](TG)
Output TG{price, vendor, delDays, product}:
{ (&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&Offer1, product, &P1),
(&Offer1, delDays, 2) }
Eliminate TripleGroups with
triples that do not satisfy
filter condition
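Value-based filtering can be sketched the same way in Python. The assumption here, taken from the slide's wording, is that a whole TripleGroup is eliminated when one of its triples for the filtered property fails the condition; the signature is hypothetical.

```python
def tg_filter(triplegroups, prop, predicate):
    """Drop every TripleGroup that has a `prop` triple whose object fails `predicate`."""
    return [tg for tg in triplegroups
            if all(predicate(o) for _, p, o in tg if p == prop)]

OFFERS = [
    [("&Offer1", "price", 108), ("&Offer1", "vendor", "&V1"),
     ("&Offer1", "product", "&P1"), ("&Offer1", "delDays", 2)],
    [("&Offer3", "vendor", "&V2"), ("&Offer3", "product", "&P3"),
     ("&Offer3", "price", 306), ("&Offer3", "delDays", 1)],
]

# Equivalent of the slide's price < 200 condition.
CHEAP = tg_filter(OFFERS, "price", lambda price: price < 200)
```

A group without any `prop` triple passes this sketch unchanged; in the pipeline shown on the slides such groups would already have been removed by TG_GroupFilter.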
NTGA Operators…(4)
 TG_Join – join between different structure TripleGroups based on join triple patterns
Input TG{price, vendor, delDays, product}:
{ (&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&Offer1, product, &P1),
(&Offer1, delDays, 2) }
Input TG{label, country, homepage}:
{ (&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven...) }
TG_Join on join triple patterns ?o vendor ?v and ?v country ?vcountry:
{(&Offer1, price, 108),
(&Offer1, vendor, {(&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven..)}),
(&Offer1, product, &P1),
(&Offer1, delDays, 2)}
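The join in the example is an object-subject join: the object of the `vendor` triple on the left matches the subject of the group containing `country` on the right. A Python sketch of just that case follows; the list-of-tuples representation, the function signature, and the restriction to object-subject joins are assumptions made for illustration.

```python
def tg_join(left_tgs, left_prop, right_tgs, right_prop):
    """Nest a right TripleGroup under the matching `left_prop` triple of a left group.

    A right group is a join candidate only if it contains `right_prop`;
    left groups with no matching right group are dropped.
    """
    by_subject = {
        tg[0][0]: tg
        for tg in right_tgs
        if tg and any(p == right_prop for _, p, _ in tg)
    }
    joined = []
    for tg in left_tgs:
        nested, matched = [], False
        for s, p, o in tg:
            if p == left_prop and o in by_subject:
                nested.append((s, p, by_subject[o]))  # object becomes a nested group
                matched = True
            else:
                nested.append((s, p, o))
        if matched:
            joined.append(nested)
    return joined

OFFER_TGS = [[("&Offer1", "price", 108), ("&Offer1", "vendor", "&V1"),
              ("&Offer1", "product", "&P1"), ("&Offer1", "delDays", 2)]]
VENDOR_TGS = [[("&V1", "label", "vendor1"), ("&V1", "country", "US"),
               ("&V1", "homepage", "www.ven...")]]

JOINED = tg_join(OFFER_TGS, "vendor", VENDOR_TGS, "country")
```

The result keeps the nested structure instead of widening each match into an n-tuple.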
Pattern Matching using NTGA in Pig
Input triples:
Subject | Property | Object
&V1 | type | VENDOR
&V1 | label | Vendor1
&V1 | country | US
&V1 | homepage | www.ven...
&Offer1 | type | OFFER
&Offer1 | vendor | &v1
&Offer1 | product | &p1
&Offer1 | price | 108
&Offer1 | delDays | 2
&Offer1 | validToDate | 01/01/2011
&Offer1 | validFromDate | 08/01/2011
LoadFilter (load + TG_Filter):
Subject | Property | Object
&V1 | label | Vendor1
&V1 | country | US
&V1 | homepage | www.ven..
&Offer1 | vendor | &v1
&Offer1 | product | &p1
&Offer1 | price | 108
&Offer1 | delDays | 2
StarGroupFilter (TG_GroupBy + TG_GroupFilter):
{ { (&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven..) },
{ (&Offer1, price, 108),
(&Offer1, vendor, &V1),
(&Offer1, product, &P1),
(&Offer1, delDays, 2) } }
RDFJoin (TG_Join):
{(&Offer1, price, 108),
(&Offer1, vendor, {(&V1, label, vendor1),
(&V1, country, US),
(&V1, homepage, www.ven..)}),
(&Offer1, product, &P1),
(&Offer1, delDays, 2)}
Mapping to Pig Latin/Relational Algebra
RDFMap: Efficient Data Representation
Compact representation of intermediate results
during TripleGroup based processing
Efficient look-up of triples matching a given
Property type via property-based indexing scheme
Ability to represent structure-label information
for groups of triples.
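The slides do not show the internal layout of RDFMap, so the following Python class is only a guess at what a property-indexed group could look like. It stores the subject once, indexes objects by property for direct look-up, and exposes the set of properties as a structure label. The class and method names are hypothetical.

```python
class PropertyIndexedGroup:
    """Illustrative property-indexed container for one subject's triples."""

    def __init__(self, subject, triples):
        self.subject = subject      # stored once instead of once per triple
        self.by_prop = {}           # property -> list of objects
        for s, p, o in triples:
            if s != subject:
                raise ValueError("all triples must share the group's subject")
            self.by_prop.setdefault(p, []).append(o)

    def objects(self, prop):
        """Look up the objects of all triples with the given property."""
        return self.by_prop.get(prop, [])

    def structure_label(self):
        """The set of properties present, usable for structure-based filtering."""
        return frozenset(self.by_prop)

    def triples(self):
        """Rebuild the plain (subject, property, object) triples."""
        return [(self.subject, p, o)
                for p, objs in self.by_prop.items() for o in objs]

OFFER = PropertyIndexedGroup("&Offer1", [
    ("&Offer1", "price", 108),
    ("&Offer1", "vendor", "&V1"),
    ("&Offer1", "product", "&P1"),
    ("&Offer1", "delDays", 2),
])
```

With such a layout, a check like `{"price", "vendor"} <= OFFER.structure_label()` needs no scan over the triples.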
Evaluation
 Setup: 5-node to 25-node Hadoop clusters
on NCSU’s Virtual Computing Lab*
 Dataset: Synthetic benchmark dataset
generated using BSBM** tool
(max. 40GB data – approx. 175 million triples)
 Evaluation of Pig (Pig_opt) vs. RAPID+
 Task 1 – Scalability with size of RDF graphs
 Task 2 – Scalability with denser star patterns
 Task 3 – Scalability with increasing cluster sizes
*https://vcl.ncsu.edu
**http://www4.wiwiss.fu-berlin.de/bizer/BerlinSPARQLBenchmark/spec/
Experimental Results…(1)
Cost Analysis across Increasing size of RDF graphs (5-node)
Key Observations:
Benefit of TripleGroup based processing seen across data sizes – up
to 60% in some cases (RAPID+ << Pig_opt < Pig)
Pig approaches did not complete for large data size
Experimental Results…(2)
Cost Analysis across Increasing Star Density (5-node / 20GB)
Key Observations:
 RAPID+ maintains a consistent %gain of 50% across the varying density
 Cost savings by eliminating redundant Subject values and join triples
%gain of RAPID+ over Pig (10-node / 32GB)
Query | #Triple Patterns | #Edges in Stars | %gain
Q1 | 3 | 1:2 | 56.8
Q2 | 4 | 2:2 | 46.7
Q3 | 5 | 2:3 | 47.8
Q4 | 6 | 3:3 | 51.6
Q5 | 7 | 3:4 | 57.4
Q6 | 8 | 4:4 | 58.4
Q7 | 9 | 5:4 | 58.6
Q8 | 10 | 6:4 | 57.3
Q9* | 6 | 2:4 | 65.4
Q10* | 10 | 2:4:4 | 61.5
Experimental Results…(3)
Cost Analysis across Increasing Cluster Sizes
Query pattern with three star-joins
and two chain-joins (32GB)
Key Observations:
 RAPID+ has 56% gain for 10-node cluster over Pig approaches
 Pig approaches catch up with increasing cluster size
 Increasing nodes decreases probability of disk spills with the SPLIT approach
 RAPID+ still maintains 45% gain across the experiments
And some Updates…
 Additional evaluation –
 Up to 65% performance gain on another synthetic
benchmark dataset* for three/two star-join queries
 Experiments extended to 1 billion 3-ary triples (43GB) –
31% (10-node) to 41% (30-node) performance gain
RAPID+ now includes a SPARQL interface
In Future: Cost-based optimizations to select
Pig vs. NTGA execution plans
Join us for a demo of [email protected]**
*Pavlo, A.,Paulson, E., Rasin, A., Abadi, D.J., DeWitt, D.J., Madden, S., Stonebraker, M : A Comparison of
Approaches to Large-scale Data Analysis. In Proc. Of the 35th SIGMOD International Conference on
Management of data (2009)
**Kim, H., Ravindra, P., Anyanwu, K : From SPARQL to MapReduce: The Journey
using a Nested TripleGroup Algebra. To appear In: Proc. International Conference on
Very Large Data Bases. (VLDB 2011)
Related Work
MapReduce-based Processing
High-level
Dataflow
Languages Indexing
Pig Latin
[Olston08],
[HiveQL]
[JAQL]
[Husain10]*,
Hadoop++
[Dittrich10],
HadoopDB
[Abadi09]
Partitioning
Schemes
Other
Rule-based Extensions
[Newman08] *
MapOptimizations
[Hunter08]*
[Afrati10]
Reasoning
[Urbani07] *
ReduceMerge
[Yang07]
Conclusion
TripleGroup based processing for
evaluating pattern matching queries on
MapReduce platforms
NTGA Operators re-factored to minimize
#MR cycles → minimize costs
 Reduce costs of repeated data handling via
operator coalescing
Efficient data representation (RDFMap)
References
[Dean04] Dean, J., Ghemawat, S.: Mapreduce: simplified data processing on large clusters. Commun. ACM 51 (2008) 107–113
[Olston08] Olston, C., Reed, B., Srivastava, U., Kumar, R., Tomkins, A.: Pig latin: a not-so-foreign language for data
processing. In: Proc. International Conference on Management of data. (2008)
[Abadi09] Abouzied, A., Bajda-Pawlikowski, K., Huang, J., Abadi, D.J., Silberschatz, A.: Hadoopdb in action: building
real world applications. In: Proc. International Conference on Management of data. (2010)
[Sridhar09] Sridhar, R., Ravindra, P., Anyanwu, K.:RAPID: Enabling scalable ad-hoc analytics on the semantic web.
ISWC 2009
[Yu08] Yu, Y., Isard, M., Fetterly, D., Badiu, M., Erlingsson, U., Gunda, P.K., Currey, J.: DryadLINQ: A system
for general-purpose distributed data-parallel computing using a high-level language. OSDI 2008
[Newman08] Newman, A., Li, Y.F., Hunter, J.: Scalable semantics: The silver lining of cloud computing. In: eScience.
IEEE International Conference on. (2008)
[Hunter08] Newman, A., Hunter, J., Li, Y., Bouton, C., Davis, M.: A scale-out rdf molecule store for distributed processing
of biomedical data. In: Semantic Web for Health Care and Life Sciences Workshop. (2008)
[Urbani07] Urbani, J., Kotoulas, S., Oren, E., Harmelen, F.: Scalable distributed reasoning using mapreduce. In: Proc.
International Semantic Web Conference. (2009)
[Abadi07] Abadi, D.J., Marcus, A., Madden, S.R., Hollenbach, K.: Scalable Semantic Web Data Management Using
Vertical Partitioning. VLDB 2007
[Dittrich10] Dittrich, J., Quiane-Ruiz, J., Jindal, A., Kargin, Y., Setty, V., Schad, J.: Hadoop++: Making a Yellow Elephant
Run Like a Cheetah (Without It Even Noticing). VLDB 2010/PVLDB
[Yang07] Yang, H., Dasdan, A., Hsiao, R., Parker Jr., D.S.: Map-reduce-merge: simplified relational data processing on
large clusters. SIGMOD 2007
[Afrati10] Afrati, F.N., Ullman, J.D.: Optimizing joins in a map-reduce environment. In: Proc. International Conference on
Extending Database Technology. (2010)
[Husain10] Husain, M., Khan, L., Kantarcioglu, M., Thuraisingham, B.: Data intensive query processing for large rdf
graphs using cloud computing tools. In: Cloud Computing (CLOUD), IEEE International Conference on. (2010)
[HiveQL] http://hadoop.apache.org/hive/
[JAQL], http://code.google.com/p/jaql
Thank You!
Environment
Node Specifications
 Single / duo core Intel X86
 2.33 GHz processor speed
 4G memory
 Red Hat Linux
Pig 0.5.0
Hadoop 0.20
 Block size 256MB
Benchmark Data*
Log files of HTTP server traffic
Column-delimited text file
Rankings:
pageRank | PageURL | avgDuration
UserVisits:
sourceIPAddr | destinationURL | visitDate | adRevenue |
UserAgent | cCode | lCode | sKeyword | avgTimeOnSite
*Pavlo, A.,Paulson, E., Rasin, A., Abadi, D.J., DeWitt, D.J., Madden, S.,
Stonebraker, M : A Comparison of Approaches to Large-scale Data Analysis. In
Proc. Of the 35th SIGMOD International Conference on Management of data (2009)
Scripts (Q1)
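-- Load triples (subject, property, object) delimited by a single space
A = load '/data/' using PigStorage(' ');
-- Keep only triples whose property ($1) or type value ($2) the query needs
A1 = filter A by $1 eq 'pageRank' or $1 eq 'pageURL' or $1 eq 'destURL'
or $1 eq 'srcIP' or $1 eq 'adRevenue' or ($1 eq 'type' and ($2 eq
'Ranking' or $2 eq 'UserVisits'));
-- Group the remaining triples by subject ($0)
B = group A1 by $0 PARALLEL 5;
-- ReassembleRDF is a user-defined function (UDF); arguments kept as in the original
C = foreach B generate
flatten(ReassembleRDF($1,'pageURL|destURL','1'));
D = group C by $0 PARALLEL 5;
E = foreach D generate flatten(ReassembleRDF($1,'srcIP|','2')) as
(srcIP:chararray, vals:bytearray);
store E into '/q1_app1';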
Scripts (Q1)
-- Load triples (subject, property, object) delimited by a single space
A1 = load '/data/' using PigStorage(' ');
-- Split the input into one relation per property (or type value)
split A1 into pageRank IF $1 eq 'pageRank',
srcIP IF $1 eq 'srcIP', pageURL IF $1 eq 'pageURL',
destURL IF $1 eq 'destURL', adRevenue IF $1 eq 'adRevenue',
typeRanking IF $1 eq 'type' and $2 eq 'Ranking',
typeUV IF $1 eq 'type' and $2 eq 'UserVisits';
-- Star-joins on subject ($0), one m-way JOIN per star
Ranking = join pageURL by $0, pageRank by $0, typeRanking by $0
PARALLEL 5;
UserVisits = join srcIP by $0, destURL by $0, adRevenue by $0, typeUV
by $0 PARALLEL 5;
-- Join between the two stars
C1 = join Ranking by $2, UserVisits by $5 PARALLEL 5;
D1 = foreach C1 generate $11, $17, $5;
store D1 into '/q1_app2';
Experiment Results
Percentage Performance Gain = ((exec time 1) – (exec time 2)) / (exec time 1)
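As a small worked example of the formula, here is a Python helper. Two assumptions are made that the slide does not state: exec time 1 is taken to be the baseline (for instance the Pig plan) and exec time 2 the compared approach, and the ratio is multiplied by 100 to express it as a percentage. The function name is hypothetical.

```python
def percentage_gain(exec_time_1, exec_time_2):
    """Gain of run 2 relative to run 1, as a percentage of run 1's execution time."""
    if exec_time_1 <= 0:
        raise ValueError("exec_time_1 must be positive")
    return 100.0 * (exec_time_1 - exec_time_2) / exec_time_1

# Made-up timings: a run that takes 40 time units instead of 100.
EXAMPLE_GAIN = percentage_gain(100.0, 40.0)
```

A negative result means the second run was slower than the first.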
Possible Optimizations (2)
 Coalesce join operations into as few MR
cycles as possible
Compute star patterns via m-way JOIN
 Star-join using m-way JOIN = 1 MR cycle
 Reduced #MR cycles → Reduced I/O + communication costs
Structured Data Processing in Pig
UserVisits
srcIP | destURL | visitDate | adRevenue | …
158.112.27.3 | url1 | 1979/12/12 | 339.08142 | …
158.112.27.3 | url5 | 1979/12/15 | 180.334 | …
150.121.18.6 | url1 | 1979/12/28 | 550.7889 | …
… | … | … | … | …
Ranking
pageRank | pageURL | avgDur | …
11 | url1 | 96 | …
23 | url2 | 3 | …
18 | url3 | 87 | …
Query: Retrieve the pageRank and adRevenue of pages visited by particular users between "1979/12/01" and "1979/12/30"
Plan:
LOAD UserVisits
LOAD Ranking
FILTER(visitDate)
JOIN UserVisits ON destURL, Ranking ON pageURL;
STORE
JOIN: Pig Latin → MapReduce
JOIN UserVisits ON destURL, Ranking ON pageURL;
UserVisits
srcIP | destURL | visitDate | adRev | …
158.112.27.3 | url1 | 1979/12/12 | 339.081 | …
158.112.27.3 | url2 | 1979/12/15 | 180.334 | …
150.121.18.6 | url1 | 1979/12/28 | 550.78 | …
Ranking
pageRank | pageURL | avgDur
11 | url1 | 96
23 | url2 | 3
[Figure: map annotates each tuple based on the join key (url1, url2); reduce packages the tuples with the same key. Reducer 1 receives the url1 tuples and Reducer 2 the url2 tuples.]
Join result:
srcIP | destURL | pageURL | … | …
158.112.27.3 | url1 | url1 | 339.081 | …
150.121.18.6 | url1 | url1 | 550.78 | …
158.112.27.3 | url2 | url2 | 180.334 | …
RDF Data Model
(Resource Description Framework)
Statements (triples): (Subject, Prop, Object), e.g. (&UV1, srcIP, 158.112.27.3)
Sub | Prop | Obj
&R1 | type | Ranking
&R1 | pageRank | 11
&R1 | pageURL | Url1
&R1 | avgDuration | 97
&UV1 | type | UserVisits
&UV1 | srcIP | 158.112.27.3
&UV1 | destURL | url1
&UV1 | adRevenue | 339.08142
&UV1 | visitDate | 1979/12/12
&UV1 | userAgent | SCOPE
&UV1 | cCode | VNM
&UV1 | lCode | VNM-KH
&UV1 | sKeyword | comets
&UV1 | avgTime | 3
[Figure: graph representation of the Ranking (&R1) and UserVisits (&UV1) resources]
Example SPARQL Query
Sub | Prop | Obj
&V1 | type | Vendor
&V1 | label | Vendor1
&V1 | country | US
&V1 | homepage | www.ven...
&Offer1 | type | Offer
&Offer1 | vendor | &V1
&Offer1 | product | &P1
&Offer1 | price | 108
&Offer1 | delDays | 2
&Offer1 | validToDate | 01/01/2011
&Offer1 | validFromDate | 08/01/2011
&Rev1 | type | Review
&Rev1 | reviewFor | &P1
&Rev1 | rating1 | 9
&Rev1 | reviewer | &R1
Data: Description of Vendors, their product Offers, and Reviews of products (BSBM* dataset)
Query: Retrieve the details of US-based Vendors
SELECT ?vlabel ?hpage
WHERE { ?v type Vendor .
?v country ?vcountry .
?v label ?vlabel .
?v homepage ?hpage .
FILTER (?vcountry = "US") }
*http://www4.wiwiss.fu-berlin.de/bizer/BerlinSPARQLBenchmark/