Monday, February 17, 2014

Coding for Anagram in Map Reduce, Hive and Pig

Anagram:
Given a list of strings, the task is to find groups of two or more strings that contain the same characters in a different order.
For example, ACT is an anagram of CAT.
Note: if the list contains the same word more than once, the repeats are also treated as anagrams. For example, if ACT occurs twice in the list, the first ACT is an anagram of the second ACT.
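
All three implementations below use the same idea: sort the characters of each word and use the sorted string as a grouping key, since anagrams produce identical sorted strings. A minimal standalone sketch of that idea (the class name AnagramKey is purely illustrative):

import java.util.Arrays;

public class AnagramKey {
    // Returns the canonical form of a word: its characters in sorted order.
    static String canonical(String word) {
        char[] chars = word.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
    }

    public static void main(String[] args) {
        System.out.println(canonical("ACT"));   // prints ACT
        System.out.println(canonical("CAT"));   // prints ACT
        System.out.println(canonical("ACT").equals(canonical("CAT")));   // true
    }
}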

Map Reduce:
Assuming a text input file:
Map input     -  key is LongWritable, value is Text.
Map output    -  key is Text, value is Text.
Reduce output -  key is Text, value is Text.
Map task:
The map task takes each word from the list and sorts its characters in ascending order. It emits the sorted word as the key and the original word as the value.

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Anagrammap extends Mapper<LongWritable, Text, Text, Text> {
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Sort the characters of the word; the sorted form is the grouping key.
        String word = value.toString();
        char[] chars = word.toCharArray();
        Arrays.sort(chars);
        String sortedWord = new String(chars);
        context.write(new Text(sortedWord), value);
    }
}

Reduce task:
The reduce task receives the sorted characters of a word as the key and all words with the same sorted characters as the list of values. It then concatenates the values with commas in between and produces the final output.
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Anagramreduce extends Reducer<Text, Text, Text, Text> {
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        StringBuilder joined = new StringBuilder();
        // Join all the words that share this sorted-character key with commas.
        for (Text value : values) {
            if (count++ > 0) {
                joined.append(",");
            }
            joined.append(value.toString());
        }
        // A group of size 1 has no anagrams, so only emit groups with 2+ words.
        if (count > 1) {
            context.write(key, new Text(joined.toString()));
        }
    }
}
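
Only the mapper and reducer are shown above; to actually run them you also need a driver that wires them into a job. A minimal sketch, assuming the Hadoop 1.x mapreduce API used above (the class name AnagramDriver and the use of command-line arguments for the paths are illustrative, not part of the original code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AnagramDriver {
    public static void main(String[] args) throws Exception {
        // args[0] = input path, args[1] = output path (illustrative).
        Job job = new Job(new Configuration(), "anagram");
        job.setJarByClass(AnagramDriver.class);
        job.setMapperClass(Anagrammap.class);
        job.setReducerClass(Anagramreduce.class);
        // Both the map and reduce phases emit Text keys and Text values.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}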

You can modify the reducer so that it does not return the key; instead you can write a NullWritable object as the key.
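
A sketch of that variant (the class name AnagramReduceNoKey is hypothetical; the job's output key class in the driver would also have to change to NullWritable):

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AnagramReduceNoKey extends Reducer<Text, Text, NullWritable, Text> {
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        int count = 0;
        for (Text value : values) {
            if (count++ > 0) {
                joined.append(",");
            }
            joined.append(value.toString());
        }
        if (count > 1) {
            // Drop the sorted-character key and emit only the joined words.
            context.write(NullWritable.get(), new Text(joined.toString()));
        }
    }
}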


Anagram in Hive:
The input is a list of words, one per line. First create a table called angword with a single column - word (String).
Create table if not exists angword (word String);
hive> select wordsjnd from
    > (select srt as srt1, WordsJoin(word) as wordsjnd from
    > (select wordsort(word) as srt, word from angword ) wrd
    > group by srt
    > having count(srt) > 1) wrd2;

In this query, the innermost part -- select wordsort(word) as srt, word from angword -- first queries the table angword and selects each word with its characters sorted (aliased as srt) along with the actual word itself. A Hive UDF, wordsort, is used for this.
Next, the middle query - select srt as srt1, WordsJoin(word) as wordsjnd from XXXXXXX group by srt having count(srt) > 1 - groups the output of the inner query on the sorted word srt. An aggregation function, WordsJoin, is then applied to the actual words that fall in each group; it joins the words with a comma in between. Only groups with a count greater than 1 are selected, since a count of 1 implies there are no anagrams for that word.
The outer query - select wordsjnd from XXXXXX - selects only the joined words, leaving out the sorted characters of those words.

UDF - wordsort:
This function takes a String as input, collects the individual characters of that String, puts them in sorted order, and returns the sorted characters as a String.
import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.UDF;

public class WordSort extends UDF {
    // Returns the characters of the input word in sorted order.
    public String evaluate(String word) {
        if (word == null) {
            return null;
        }
        char[] chars = word.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
    }
}

UDAF - WordsJoin:
This takes a group of Strings as input, joins them with commas in between, and returns the result as a single String.
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public final class WordsJoin extends UDAF {

    // Intermediate aggregation buffer: the words joined so far.
    public static class Word2join {
        private String word;
    }

    public static class WordsJoinEvaluator implements UDAFEvaluator {

        Word2join w2j;

        public WordsJoinEvaluator() {
            super();
            w2j = new Word2join();
            init();
        }

        // Reset the buffer before a new aggregation.
        public void init() {
            w2j.word = "";
        }

        // Called once per row in the group: append the word to the buffer.
        public boolean iterate(String w) {
            if (w != null) {
                if (w2j.word.isEmpty())
                    w2j.word = w;
                else
                    w2j.word = w2j.word + ", " + w;
            }
            return true;
        }

        // Return the partial aggregation buffer.
        public Word2join terminatePartial() {
            return w2j;
        }

        // Combine a partial aggregation from another task into this buffer.
        public boolean merge(Word2join o) {
            if (o != null && o.word != null && !o.word.isEmpty()) {
                if (w2j.word.isEmpty())
                    w2j.word = o.word;
                else
                    w2j.word = w2j.word + ", " + o.word;
            }
            return true;
        }

        // Return the final joined string for the group.
        public String terminate() {
            return w2j.word;
        }
    }

}
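
Hive drives the evaluator itself, but it may help to see the order in which the methods are called. A small sketch that exercises WordsJoinEvaluator directly outside Hive (the class name WordsJoinDemo is purely illustrative; it assumes the WordsJoin class above and the Hive jars are on the classpath):

public class WordsJoinDemo {
    public static void main(String[] args) {
        // Hive calls init() once per group, then iterate() once per row,
        // and finally terminate(). terminatePartial() and merge() are used
        // when partial aggregates from different tasks are combined.
        WordsJoin.WordsJoinEvaluator eval = new WordsJoin.WordsJoinEvaluator();
        eval.init();
        eval.iterate("ACT");
        eval.iterate("CAT");
        eval.iterate("TAC");
        System.out.println(eval.terminate());   // prints: ACT, CAT, TAC
    }
}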



Anagram in Pig:
The script below can be used to generate anagrams from an input file that contains a list of strings, one per line (it assumes the jar containing the WordSort and WordsJoin UDFs has already been registered).

inpfile = load '/file/location/file' as (word:chararray);
inpsrted = foreach inpfile generate WordSort(word) as srt, word;
grped = group inpsrted by srt;
aftrgrp = foreach grped generate group, COUNT(inpsrted) as count1, WordsJoin(inpsrted) as jndword;
angrms = filter aftrgrp by count1 > 1;
anagrams = foreach angrms generate jndword;

Explanation
inpfile = load '/file/location/file' as (word:chararray);
This loads the file into inpfile.

inpsrted = foreach inpfile generate WordSort(word) as srt, word;
For each input word, its character-sorted form is generated and stored as srt.
WordSort is a Pig user-defined function:
import java.io.IOException;
import java.util.Arrays;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;

public class WordSort extends EvalFunc<String> {

    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        // Sort the characters of the first field and return them as a String.
        String word = (String) input.get(0);
        char[] chars = word.toCharArray();
        Arrays.sort(chars);
        return new String(chars);
    }
}
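
To try the UDF outside of a Pig script, one option is to build an input tuple with Pig's TupleFactory and call exec() directly. A small illustrative sketch (the class name WordSortDemo is hypothetical; it assumes the Pig jars are on the classpath):

import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class WordSortDemo {
    public static void main(String[] args) throws Exception {
        // Build a one-field tuple holding the word, as Pig would pass it in.
        Tuple input = TupleFactory.getInstance().newTuple(1);
        input.set(0, "CAT");
        System.out.println(new WordSort().exec(input));   // prints ACT
    }
}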


grped = group inpsrted by srt;
Grouped on sorted word.

aftrgrp = foreach grped generate group, COUNT(inpsrted) as count1,   WordsJoin(inpsrted) as jndword;

For each group the count is calculated and stored as count1. Along with this, all the words of that group are joined using the function WordsJoin. This is an aggregate function:
import java.io.IOException;
import java.util.Iterator;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;

public class WordsJoin extends EvalFunc<String> {

    @Override
    public String exec(Tuple input) throws IOException {
        // The grouped relation arrives as a bag of (srt, word) tuples.
        DataBag bag = (DataBag) input.get(0);
        Iterator<Tuple> it = bag.iterator();
        String wordcomb = "";

        while (it.hasNext()) {
            Tuple tup = it.next();
            // Field 1 holds the original word (field 0 is the sorted key).
            String word = (String) tup.get(1);
            if (wordcomb.isEmpty()) {
                wordcomb = word;
            } else {
                wordcomb = wordcomb + ", " + word;
            }
        }

        return wordcomb;
    }
}


angrms = filter aftrgrp by count1 > 1;

The grouped output is then filtered for counts greater than 1; words that do not have anagrams are dropped.

anagrams = foreach angrms generate jndword;
This outputs only the joined, comma-separated anagram words, leaving out the sorted key.


Saturday, February 8, 2014

Hadoop installation steps

Step-wise installation guide for a Hadoop single-node cluster:
1. Install Linux (Ubuntu)
2. Open a terminal and perform the following steps/commands:
    a. sudo su
    b. Type your password
    c. apt-get update                  ------ updates the package lists to the latest versions
    d. apt-get install openjdk-6-jdk   ------ installs Java
    e. apt-get install openssh-server  ------ installs the SSH server
    f. apt-get install eclipse

3. Add hadoop user using the following commands:
    $ sudo addgroup hadoop
    $ sudo adduser --ingroup hadoop xxuser

4. Configuring SSH:
   We need to configure SSH access to localhost for the user created in the above step.
   a. Generate SSH key:
    $ su - xxuser
        $ ssh-keygen -t rsa -P ""
      The -P "" option creates the key with an empty passphrase. You can enter a passphrase here instead if you want one.

      When you receive the prompt to enter the file in which to save the key, enter the location where you want to save it:
       e.g. /home/xxuser/.ssh/key
     
   b. Use the following command to enable SSH access to your local machine with the newly created key.
      $ cat $HOME/.ssh/key.pub >> $HOME/.ssh/authorized_keys

   c. Issue the command:
      $ ssh localhost
      When you receive the prompt: "Are you sure you want to continue connecting (yes/no)?" enter yes.



5. Download a stable version of Hadoop - hadoop-**.tar.gz - from the Apache mirror sites;
   refer to http://www.apache.org/dyn/closer.cgi/hadoop/common/

6. Copy it to a folder --- I copied it into a folder created in Home, /work/ -- pwd inside work gives /home/xxx/work --
    and then untar it; this creates a folder like hadoop-*.*.*

   Running ls inside the work folder should now show the hadoop-*.*.* folder.

7. In the .bashrc file add the following settings:
    export HADOOP_HOME="/home/xxx/work/hadoop-*.*.*"   ---- path of the Hadoop installation
    export JAVA_HOME="/usr/lib/jvm/java-6-openjdk"     ---- Java path
    export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin  ---- lets you run the hadoop and java commands from anywhere

8. Inside the hadoop folder you should see another folder called conf; navigate into that folder.
   pwd here should give - /home/xxx/work/hadoop-*.*.*/conf

   Inside this folder there are many files that define configuration settings for Hadoop.
   You need to edit: core-site.xml, hdfs-site.xml, mapred-site.xml, masters and slaves.
   localhost will be used in these settings. It can be obtained by typing 'hostname' in the terminal.

 a. In core-site.xml add the following code:
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
      </property>
      <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/xxx/</value>
      </property>
    </configuration>

     The first property here is the hostname followed by a colon (:) and then the port 9000.
     It specifies the URL for the NameNode.

     The second property refers to the base for temporary directories.

 b.In hdfs-site.xml add the following code
    <configuration>
     <property>
      <name>dfs.replication</name>
      <value>1</value>
     </property>
     <property>
      <name>dfs.name.dir</name>
      <value>/home/xxx/dfs/name</value>
     </property>
     <property>
      <name>dfs.data.dir</name>
      <value>/home/xxx/dfs/data</value>
     </property>
     <property>
      <name>dfs.datanode.max.xcievers</name>
      <value>4096</value>
     </property>
    </configuration>

    property 1 - dfs.replication - This is the replication factor. I kept it as 1; specify it as per the replication factor needed.
    property 2 - dfs.name.dir    - Path on the local filesystem where the NameNode stores the namespace and transaction logs persistently.
    property 3 - dfs.data.dir    - Comma-separated list of paths on the local filesystem of a DataNode where it should store its blocks.
    property 4 - dfs.datanode.max.xcievers - An upper bound on the number of files a DataNode can serve at any one time.

 c.In mapred-site.xml add the following code
    <configuration>
      <property>
       <name>mapred.job.tracker</name>
       <value>localhost:9001</value>
      </property>
      <property>
       <name>mapred.local.dir</name>
       <value>/home/xxx/mapred/local</value>
      </property>
      <property>
       <name>mapred.system.dir</name>
       <value>/hadoop/mapred/system</value>
      </property>
      <property>
            <name>hadoop.job.history.user.location</name>
       <value>/history</value>
      </property>
    </configuration>
    property 1 - mapred.job.tracker - Host or IP and port of the JobTracker.
    property 2 - mapred.local.dir   - Comma-separated list of paths on the local filesystem where temporary MapReduce data is written.
    property 3 - mapred.system.dir  - Path on HDFS where the MapReduce framework stores system files.

 d. In the masters file - write the hostname
 e. In the slaves file  - write the hostname
 f. In hadoop-env.sh, you need to specify the Java implementation: put "export JAVA_HOME=/usr/lib/jvm/java-6-openjdk" just under the comment about the java implementation to use.

9. Before starting the HDFS daemons for the first time, format the NameNode:
   .../hadoop/bin/hadoop namenode -format

10. The start-all.sh command will start all the Hadoop daemons (both HDFS and MapReduce). You will be prompted to enter the password. Once this is done, type jps on the command line. You should be able to see the following:
nnnn JobTracker
nnnn TaskTracker
nnnn DataNode
nnnn NameNode
nnnn SecondaryNameNode
nnnn Jps

If any of these is missing, then there was a problem starting that particular daemon.
The stop-all.sh command will stop all the daemons.

11. start-dfs.sh -- this will start only the HDFS daemons. The MapReduce-related daemons will still be inactive.
jps after this command will show:
nnnn SecondaryNameNode
nnnn DataNode
nnnn NameNode
nnnn Jps

stop-dfs.sh is the command to stop.

12. start-mapred.sh  -- this will start the JobTracker and TaskTracker daemons. Always use this command after starting the HDFS daemons.
    stop-mapred.sh   -- stops the JobTracker and TaskTracker daemons.