
Map/Reduce with Hadoop and Pig

25.10.2012 | 7 minutes of reading time

Big data. One of the buzz words of the software industry in the last decade. We have all heard about it, but I am not sure we can actually comprehend it as we should and as it deserves. It reminds me of the Universe: mankind knows that it is big, huge, vast, but no one can really grasp its size. The same can be said for the amount of data being collected and processed every day somewhere in the clouds of IT. As Google’s then-CEO Eric Schmidt once said: “There were 5 exabytes of information created by the entire world between the dawn of civilization and 2003. Now that same amount is created every two days.”

Mankind is clearly capable of storing and persisting this hardly imaginable bulk of data, that’s for sure. What impresses me more is that we are able to process and analyze it in a reasonable time.

For those who don’t know what Map/Reduce is: it is a programming model, or framework if you prefer, for processing large (seriously large) data sets in a distributed manner, using a large number of computers, i.e. nodes.
The algorithm consists of two steps: map and reduce. During the mapping phase, the master node takes the input, creates smaller sub-problems out of it and distributes them to the computers that actually perform the processing, the worker nodes. After the data has been processed, it is sent back to the master node. That is when the reduce step begins: the master node aggregates all the responses, combines them and creates the answer to the original problem.
Apache Hadoop is a very popular free implementation of this framework. A very, very powerful one. Several tools are built on top of it and thus provide several ways to approach the problem of processing big data. One of those is Apache Pig, a platform for analyzing large data sets. It consists of a high-level programming language (Pig Latin) for expressing data analysis programs, and a compiler that produces Map/Reduce programs ready to be executed on Apache Hadoop.
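
To make the model a bit more concrete before diving into the use case: the canonical illustration is counting words in a set of documents. A minimal sketch against Hadoop’s Java API (the word-count scenario is only an illustration of mine, not part of the use case discussed below) could look like this:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map phase: split each input line into words and emit a (word, 1) pair per occurrence.
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
   private static final IntWritable ONE = new IntWritable(1);

   @Override
   protected void map(LongWritable offset, Text line, Context context) throws IOException, InterruptedException {
      for (String word : line.toString().split("\\s+")) {
         if (!word.isEmpty()) {
            context.write(new Text(word), ONE);
         }
      }
   }
}

// Reduce phase: the framework groups all values by key, so the reducer only has to sum them up.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
   @Override
   protected void reduce(Text word, Iterable<IntWritable> counts, Context context) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : counts) {
         sum += count.get();
      }
      context.write(word, new IntWritable(sum));
   }
}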

I had some experience with Apache Pig and it was good. Pig Latin is not difficult to learn and the whole platform provides a good tool for the job. But I wanted to see how it compares to “native” Map/Reduce jobs written in Java using the Apache Hadoop APIs.
For that reason I imagined a use case hardly familiar to any of you #sarcasm: a social network site, with myself in the role of a member. Some of my friends are members too and we are connected. Being an embarrassingly popular person, I have many, many friends and connections. Naturally, I don’t want to talk to all of them, nor to see what each and every one of them is doing. I just want to see those who are important to me. For that reason, the system will calculate the weight of my relationships and present me with only my heaviest friends.

Interactions between two people can take various forms:
– viewing profile details (the sneak-a-peek feature on mouse hover over a friend’s name, for example)
– viewing the full profile
– commenting on a friend’s status, comment, photo or whatever
– liking a friend’s status, comment, photo or whatever
– sending a message to a friend, etc.

Each of those actions would have a certain weight expressed in the form of a number, giving us the result: a friendship weight, calculated as the sum of all interactions.
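
Just to make this concrete: one way to model the interaction types and their weights is a simple enum. The concrete names and values below are hypothetical assumptions of mine (only VIEW_PROFILE with a weight of 10 appears in the sample record further down); the article does not prescribe them.

// Hypothetical interaction types and weights; the article only states that each
// interaction type carries a numeric weight, not which concrete values are used.
public enum InteractionType {
   VIEW_PROFILE(10),       // the type that appears in the sample record below
   VIEW_FULL_PROFILE(20),
   COMMENT(30),
   LIKE(15),
   MESSAGE(40);

   private final int weight;

   InteractionType(int weight) {
      this.weight = weight;
   }

   public int getWeight() {
      return weight;
   }
}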

For my own purposes, I decided that the raw data used as input would be a CSV file containing only basic information: the timestamp of the interaction between two users, the username of the source user (the one who caused the interaction), the username of the target user, the interaction type and the interaction weight. Thus, a single interaction record looks like this:


1341147920675,jason.bourne,jane.doe,VIEW_PROFILE,10

Having placed my input data in the proper location in the Hadoop file system, the next step is to run the job that will return a sorted list of users (descending by friendship weight) for each user in the input file.
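
Getting the input file into HDFS in the first place is usually a one-liner on the command line, but it can also be done programmatically. A minimal sketch using Hadoop’s FileSystem API, assuming a local file of the same name and the HDFS path '/blog/user_interaction_big.txt' that the Pig script below expects:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InputUploader {

   public static void main(String[] args) throws Exception {
      // Picks up the cluster settings (core-site.xml etc.) from the classpath.
      Configuration configuration = new Configuration();
      FileSystem fileSystem = FileSystem.get(configuration);

      // The local file name is an assumption; the HDFS target path is the one used later in the Pig script.
      fileSystem.copyFromLocalFile(new Path("user_interaction_big.txt"), new Path("/blog/user_interaction_big.txt"));
   }
}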

One illustration of a simple Map/Reduce job that solves this problem is implemented in Java. A small map function could look like this:

@Override
protected void map(LongWritable offset, Text text, Context context) throws IOException, InterruptedException {
   // Input line format: timestamp,sourceUser,targetUser,interactionType,interactionWeight
   String[] tokens = text.toString().split(",");
   String sourceUser = tokens[1];
   String targetUser = tokens[2];
   int points = Integer.parseInt(tokens[4]);
   // Emit one record per interaction, keyed by the source user.
   context.write(new Text(sourceUser), new InteractionWritable(targetUser, points));
}
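
The InteractionWritable value type emitted here is a custom class that is not listed in the article. A rough sketch of what such a Writable might look like (an assumption on my part, derived only from how the class is used in the map and reduce functions):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class InteractionWritable implements Writable {

   private Text targetUser = new Text();
   private IntWritable points = new IntWritable();

   public InteractionWritable() {
      // Empty constructor required by Hadoop for deserialization.
   }

   public InteractionWritable(String targetUser, int points) {
      this.targetUser = new Text(targetUser);
      this.points = new IntWritable(points);
   }

   public Text getTargetUser() {
      return targetUser;
   }

   public IntWritable getPoints() {
      return points;
   }

   @Override
   public void write(DataOutput out) throws IOException {
      targetUser.write(out);
      points.write(out);
   }

   @Override
   public void readFields(DataInput in) throws IOException {
      targetUser.readFields(in);
      points.readFields(in);
   }
}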

The map function tokenizes each input record and extracts from it the users involved in the interaction and the interaction weight. These pieces of information become the output of the map function and the input for the reduce function, which could look something like this:

@Override
protected void reduce(Text token, Iterable<InteractionWritable> counts, Context context) throws IOException, InterruptedException {
   try {
      // Sum up the interaction weights per target user.
      Map<Text, IntWritable> interactionGroup = new HashMap<Text, IntWritable>();
      Iterator<InteractionWritable> i = counts.iterator();
      while (i.hasNext()) {
         InteractionWritable interaction = i.next();
         Text targetUser = new Text(interaction.getTargetUser().toString());
         int weight = interaction.getPoints().get();

         IntWritable weightWritable = interactionGroup.get(targetUser);
         if (weightWritable != null) {
            weight += weightWritable.get();
         }
         interactionGroup.put(targetUser, new IntWritable(weight));
      }

      // Order the aggregated entries by weight and write them out.
      InteractionCollector interactionCollector = new InteractionCollector();
      Iterator<Entry<Text, IntWritable>> iEntry = interactionGroup.entrySet().iterator();
      while (iEntry.hasNext()) {
         Entry<Text, IntWritable> entry = iEntry.next();
         interactionCollector.addEntry(entry);
      }
      List<Entry<Text, IntWritable>> orderedInteractions = interactionCollector.getInteractions();
      for (Entry<Text, IntWritable> entry : orderedInteractions) {
         context.write(token, new Text(entry.getKey().toString() + " " + entry.getValue().get()));
      }
   } catch (Exception e) {
      // Of course, do something more sensible.
      e.printStackTrace();
   }
}
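
The InteractionCollector used for the ordering is another custom helper that is not listed in the article; judging from its usage, it collects the aggregated entries and returns them sorted descending by weight. A possible sketch (again an assumption, not the original implementation):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map.Entry;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

// Collects (targetUser, weight) entries and returns them ordered by weight, heaviest first.
public class InteractionCollector {

   private final List<Entry<Text, IntWritable>> entries = new ArrayList<Entry<Text, IntWritable>>();

   public void addEntry(Entry<Text, IntWritable> entry) {
      entries.add(entry);
   }

   public List<Entry<Text, IntWritable>> getInteractions() {
      Collections.sort(entries, new Comparator<Entry<Text, IntWritable>>() {
         @Override
         public int compare(Entry<Text, IntWritable> first, Entry<Text, IntWritable> second) {
            // Descending by interaction weight.
            return second.getValue().compareTo(first.getValue());
         }
      });
      return entries;
   }
}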

What the reduce function does is sum up the interaction weights for each source and target user pair, take care of the ordering and write out the result. Not too complicated.
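
To actually submit the mapper and reducer to Hadoop, a small driver class is needed as well. It is not shown in the article, but with the types used above it might look roughly like this (the class names and the output path are assumptions of mine):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FriendshipWeightJob {

   public static void main(String[] args) throws Exception {
      Configuration configuration = new Configuration();
      Job job = new Job(configuration, "friendship-weight");
      job.setJarByClass(FriendshipWeightJob.class);

      // Hypothetical class names for the map and reduce implementations shown above.
      job.setMapperClass(InteractionMapper.class);
      job.setReducerClass(InteractionReducer.class);

      // The intermediate (map output) types differ from the final output types.
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(InteractionWritable.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);

      // Input path taken from the Pig script below; the output path is an assumption.
      FileInputFormat.addInputPath(job, new Path("/blog/user_interaction_big.txt"));
      FileOutputFormat.setOutputPath(job, new Path("/blog/friendship_weight_output"));

      System.exit(job.waitForCompletion(true) ? 0 : 1);
   }
}

Packaged into a jar, this is what would be submitted to the cluster.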
On the other hand, the Pig script doing the same job is even simpler:

interactionRecords = LOAD '/blog/user_interaction_big.txt' USING PigStorage(',') AS (
   timestamp: long,
   sourceUser: chararray,
   targetUser: chararray,
   eventType: chararray,
   eventWeight: int
);

interactionData = FOREACH interactionRecords GENERATE
   sourceUser,
   targetUser,
   eventWeight;

groupedByInteraction = GROUP interactionData BY (sourceUser, targetUser);
summarizedInteraction = FOREACH groupedByInteraction GENERATE
   group.sourceUser AS sourceUser,
   group.targetUser AS targetUser,
   SUM(interactionData.eventWeight) AS eventWeight;

result = ORDER summarizedInteraction BY sourceUser, eventWeight DESC;

DUMP result;

It performs the same steps as the Java implementation: it loads the input data, extracts only the needed parts, groups it, sums the interaction weights and prints out the result.
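
As a side note, the script does not have to be run from the Pig command line; Pig also offers an embedded Java API. A small sketch using PigServer, assuming the statements above (minus the final DUMP) are saved in a file called friendship.pig (the file name is an assumption):

import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class EmbeddedPigRunner {

   public static void main(String[] args) throws Exception {
      // Run against the cluster; ExecType.LOCAL would execute the script locally instead.
      PigServer pigServer = new PigServer(ExecType.MAPREDUCE);

      // 'friendship.pig' is assumed to contain the script shown above without the DUMP statement.
      pigServer.registerScript("friendship.pig");

      // Iterate over the 'result' alias instead of DUMPing it to the console.
      Iterator<Tuple> iterator = pigServer.openIterator("result");
      while (iterator.hasNext()) {
         System.out.println(iterator.next());
      }
   }
}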

There are some obvious pros and cons of each approach. The Java implementation is more verbose and demands more coding than the Pig script, as expected. On the other hand, the example given in this article is very, very simple and cannot be used as a proper measurement. If the use case were much more complicated, we could easily get into a situation where we would really need to think about how to design and organize our code. The Pig platform allows calling scripts from other scripts, passing parameters from one script to another and has other useful features that could help in that endeavour, but I don’t think it handles complicated use cases particularly well. After all, Pig Latin is a scripting language and, at the moment, there is no IDE or text editor that supports maintaining and refactoring Pig code as well as might be needed. There are some Eclipse plugins, for instance, but they are far from the refactoring features Eclipse offers for Java code.
Another very interesting thing to point out is performance. Again, I have to say that the results I am presenting here are strictly informational and not to be taken very seriously. I ran the tests on a single-data-node Hadoop cluster installed in a virtual machine, which is not really a production environment. For one thousand records, the Pig script needed more than a minute and a half to do the job, while the Java Map/Reduce class did its part in about ten seconds. When run against a much bigger set of data, five million records, the script finished in roughly two minutes, compared to the native Map/Reduce time of around forty seconds. The difference between the two runs was almost the same for both approaches: around thirty seconds. Obviously, there is a lot of overhead in loading the Pig platform and preparing it to preprocess and run the script.

The intention of this simple example was to make a comparison between these two solutions, mainly out of the author’s plain curiosity. Besides that, this use case shows how much “our” data and our behavior can reveal about us. I know I wouldn’t be able to say who my best friend is or with whom I interact the most.
