The results and ramblings of research


MR and Avro – Passing Avro through Pairs


The mapper class is as follows:



import java.io.IOException;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class ReadData extends Mapper<AvroKey<XYZ>, NullWritable, Text, AvroValue<Pair>> {
      private final Text keyOut = new Text();
      private final AvroValue<Pair> valOut = new AvroValue<>();

      @Override
      protected void map(AvroKey<XYZ> key, NullWritable value, Context context) throws IOException, InterruptedException {
           XYZ data = key.datum();                 // unwrap the Avro record
           keyOut.set(data.getKey().toString());
           // wrap the record in an Avro Pair so it can travel through the shuffle as an AvroValue
           valOut.datum(new Pair<CharSequence, XYZ>(data.getOtherKey(), data));
           context.write(keyOut, valOut);
      }
}
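For context, XYZ above is assumed to be an Avro-generated specific record class. A minimal sketch of generating such a class from a schema file, assuming Avro 1.7.x and a hypothetical schema file xyz.avsc, is:

java -jar avro-tools-1.7.7.jar compile schema xyz.avsc src/main/java/

The generated class can then be used directly as the datum type inside AvroKey in the mapper above.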


Written by anujjaiswal

April 20, 2015 at 5:03 pm

Posted in Uncategorized

AWS EMR High Performance Bootstrap Actions


In this post, I describe some EMR bootstrap actions that are especially helpful in keeping Hadoop clusters running well. As a general rule, I use c3.xlarge compute nodes or m3.xlarge spot nodes and have been consistently deploying medium-sized clusters (>30 nodes).

  1. Aggregated logging – To set up log aggregation, use the configure-hadoop bootstrap action with the following arguments (see the example CLI invocation after this list):
-y, yarn.log-aggregation-enable=true, 
-y, yarn.log-aggregation.retain-seconds=-1, 
-y, yarn.log-aggregation.retain-check-interval-seconds=3000, 
-y, yarn.nodemanager.remote-app-log-dir=s3://s3_log_location/cluster/aggregated_logs/
  2. Ensuring correct disk utilization – Since some of the inputs I use are very disk heavy, the first issue I faced was that the cluster would run out of disk space. It became clear that this was due to incorrect disk utilization: all the output was written to the boot disk rather than the ephemeral storage disks (which had sufficient space, 2x40GB per node). It seems the mount points were set up incorrectly, which I override using the configure-hadoop bootstrap action and the following (note: these settings are for EC2 instances with two ephemeral disks per instance; adjust them if you have a different number of ephemeral disks):
-m, mapred.local.dir=/mnt/var/lib/hadoop/mapred,/mnt1/var/lib/hadoop/mapred,
-h, dfs.data.dir=/mnt/var/lib/hadoop/dfs,/mnt1/var/lib/hadoop/dfs,
-h, dfs.name.dir=/mnt/var/lib/hadoop/dfs-name,/mnt1/var/lib/hadoop/dfs-name, 
-c, hadoop.tmp.dir=/mnt/var/lib/hadoop/tmp,/mnt1/var/lib/hadoop/tmp, 
-c, fs.s3.buffer.dir=/mnt/var/lib/hadoop/s3,/mnt1/var/lib/hadoop/s3, 
-y, yarn.nodemanager.local-dirs=/mnt/var/lib/hadoop/tmp/nm-local-dir,/mnt1/var/lib/hadoop/tmp/nm-local-dir
  3. Snappy compression – You can enable cluster-wide Snappy compression of intermediate map outputs and job outputs using the configure-hadoop bootstrap action and the following:
-m, mapreduce.map.output.compress=true, 
-m, mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec, 
-m, mapreduce.output.fileoutputformat.compress=true, 
-m, mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
  4. Multipart uploads to S3 – Enabling these settings ensures that S3 uploads don’t time out and that extremely large output files can be written to S3:
-c, fs.s3n.multipart.uploads.enabled=true,
-c, fs.s3n.multipart.uploads.split.size=524288000
  5. Out of Memory and GC errors – A common issue is Out of Memory or GC overhead limit exceeded errors. A pretty easy fix is to modify the settings below. You can either bootstrap the cluster (using configure-hadoop) or set them per job via Configuration.set(). In our case, we allow the mapper JVM up to 4 GB of memory.
-m "mapreduce.map.java.opts=-Xmx4096m -XX:-UseGCOverheadLimit"
-m mapreduce.map.memory.mb=4096
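As a concrete illustration, here is a sketch of how a few of these arguments might be passed when launching a cluster with the AWS CLI. The cluster name, AMI version and instance count are placeholders, the configure-hadoop path assumes the classic 2.x/3.x EMR AMIs, and required options such as the key pair and IAM roles are omitted for brevity:

# Hypothetical example; adjust names, counts and settings to your cluster
aws emr create-cluster --name "analytics-cluster" --ami-version 3.3.2 \
    --instance-type m3.xlarge --instance-count 31 \
    --bootstrap-actions Path=s3://elasticmapreduce/bootstrap-actions/configure-hadoop,Name="Configure Hadoop",Args=["-y","yarn.log-aggregation-enable=true","-m","mapreduce.map.output.compress=true","-m","mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec"]

On the newer 4.x+ release labels, the same settings would instead be supplied through the --configurations JSON mechanism rather than a configure-hadoop bootstrap action.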

Written by anujjaiswal

February 10, 2015 at 7:35 am

Posted in AWS, Hadoop, MapRed


Moving all indices in a Database to a new Tablespace


As the title suggests, I needed to move all the indices in a database to a new tablespace. It turns out it isn’t too difficult. First, we can list all indices in a database using the following command:

select * from pg_catalog.pg_indexes where schemaname = 'public' order by tablename

Thus, a simple way to get just the index names in the database is:

select indexname from pg_catalog.pg_indexes where schemaname = 'public' order by tablename

A set of ALTER statements can then be generated by executing:

select 'ALTER INDEX '||indexname||' SET TABLESPACE t1 ;' from pg_catalog.pg_indexes where schemaname = 'public' order by tablename;

After that, copy-paste the generated statements and execute them in the DB shell.
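If you prefer to skip the copy-paste step, the generated statements can be piped straight back into psql. This is a minimal sketch assuming a database named mydb and an existing tablespace t1 (both placeholders):

psql -d mydb -At -c "select 'ALTER INDEX '||indexname||' SET TABLESPACE t1;' from pg_catalog.pg_indexes where schemaname = 'public'" | psql -d mydb

Keep in mind that ALTER INDEX ... SET TABLESPACE locks each index while it is being moved, so this is best done during a maintenance window.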

Written by anujjaiswal

September 10, 2013 at 3:53 pm

Posted in Database, PostGRES

General Scripting Commands (To be continued)


As a small startup, we utilize AWS extensively. Most of our files are stored in S3, and sometimes it is painful to download files one by one. However, Linux is da bomb. I am listing some of the commands that are especially useful (a combined download script follows the list). Please note that s3cmd must be installed and configured correctly.

  1. Creating a list of files to download  
     s3cmd ls --recursive s3://bucket/ | grep -i condition > files.txt
  2. Downloading a list of files stored in a file
     while read line; do s3cmd get $line; done < files.txt
  3. However, spaces in filenames always kill you. So the following works like a charm. Notice the quotes 🙂
     while read line; do s3cmd -v get "${line}"; done < files.txt
  4. Count the files in a bucket folder
     s3cmd ls s3://bucket/ | wc -l

    or

     s3cmd ls --recursive s3://bucket/ | wc -l
  5. Get a list of S3 files (URLs) and save them to a file. Especially useful if you are passing a file that contains all the S3 files to be processed, e.g., to Hadoop with NLineInputFormat
     s3cmd ls --recursive s3://bucket/ | awk '{print $4}' > output_file.txt 
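Putting the pieces together, here is a sketch of a small script that lists a bucket, filters by a pattern and downloads the matches while handling spaces in file names. The bucket name and the grep pattern are placeholders:

#!/bin/bash
# Keep everything from "s3://" to the end of the line, so keys containing spaces survive intact
s3cmd ls --recursive s3://bucket/ | grep -i condition | grep -o 's3://.*' > files.txt
# Quoting "${line}" preserves spaces when passing the key to s3cmd
while read -r line; do
    s3cmd -v get "${line}"
done < files.txt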

Written by anujjaiswal

June 26, 2013 at 11:51 am

Posted in AWS, Bash

Stress Testing our AWS Hadoop Deployment – Some more results


To continue our previous post, we have been working to deploy a stable, robust Hadoop cluster on AWS. We had experienced numerous issues with our previous deployments; however, as outlined in that post, a few parameters magically gave us a pretty stable cluster. The obvious next step was to stress test the cluster. To do so, we ran our tests on a small 10+1 node Hadoop cluster (10 datanode & tasktracker nodes + 1 namenode & jobtracker node). We ran the following tests:

Test 1 – DFSIO Tests

1) Write Tests – We first ran the DFSIO write test using

$ hadoop jar hadoop-mapreduce-client-jobclient-2.0.0-cdh4.1.2-tests.jar TestDFSIO -write -nrFiles 100 -fileSize 1000

The above command runs a write test that generates 100 output files, each 1 GB in size, for a total of 100 GB.

2) Read Tests – Next we ran the DFSIO read test using

$ hadoop jar hadoop-mapreduce-client-jobclient-2.0.0-cdh4.1.2-tests.jar TestDFSIO -read -nrFiles 100 -fileSize 1000

Note, the default directory for the outputs is /benchmarks/TestDFSIO.

Lastly, clean the output folders (delete test files) using

$ hadoop jar hadoop-mapreduce-client-jobclient-2.0.0-cdh4.1.2-tests.jar TestDFSIO -clean

Please download the test jar (hadoop-mapreduce-client-jobclient-*-tests.jar) to run these tests.
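In addition to the console output shown below, TestDFSIO appends a summary to a local results file and keeps its data in HDFS; assuming the default result file name, both can be inspected with:

$ cat TestDFSIO_results.log
$ hadoop fs -ls /benchmarks/TestDFSIO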

Outputs:

Write Test
13/05/06 16:16:50 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
13/05/06 16:16:50 INFO fs.TestDFSIO: Date & time: Mon May 06 16:16:50 PDT 2013
13/05/06 16:16:50 INFO fs.TestDFSIO: Number of files: 100
13/05/06 16:16:50 INFO fs.TestDFSIO: Total MBytes processed: 100000.0
13/05/06 16:16:50 INFO fs.TestDFSIO: Throughput mb/sec: 6.0895988251458775
13/05/06 16:16:50 INFO fs.TestDFSIO: Average IO rate mb/sec: 6.641181468963623
13/05/06 16:16:50 INFO fs.TestDFSIO: IO rate std deviation: 1.9043254369666331
13/05/06 16:16:50 INFO fs.TestDFSIO: Test exec time sec: 390.825
13/05/06 16:16:50 INFO fs.TestDFSIO:

Read Test
13/05/06 16:23:01 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
13/05/06 16:23:02 INFO fs.TestDFSIO: Date & time: Mon May 06 16:23:01 PDT 2013
13/05/06 16:23:02 INFO fs.TestDFSIO: Number of files: 100
13/05/06 16:23:02 INFO fs.TestDFSIO: Total MBytes processed: 100000.0
13/05/06 16:23:02 INFO fs.TestDFSIO: Throughput mb/sec: 18.524110055442662
13/05/06 16:23:02 INFO fs.TestDFSIO: Average IO rate mb/sec: 20.380735397338867
13/05/06 16:23:02 INFO fs.TestDFSIO: IO rate std deviation: 6.731484273400149
13/05/06 16:23:02 INFO fs.TestDFSIO: Test exec time sec: 171.871

Test 2 – 50 Chained Jobs

The next part of our test was designed to exercise the overall infrastructure under load. For this test our cluster consisted of 15 compute nodes (TT+DN) and 1 JT+NN node. Furthermore, some of the jobs that were run included an extremely large number of mappers (10K-15K and upwards) and a large number of reducers. Overall, we feel that such a test replicates real-world load and gives us a fair idea of how our compute nodes perform. Note that we set mapred.reduce.slowstart.completed.maps to 0.95, which ensures the reducers start only after most of the mappers have completed, so the reducers don’t time out due to inactivity. Our jobs were executed over a 1-day period and the graphs below illustrate the cluster utilization.
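For reference, mapred.reduce.slowstart.completed.maps can also be set per job on the command line instead of cluster-wide. A sketch, with the jar and driver class names as placeholders (this assumes the driver goes through ToolRunner/GenericOptionsParser):

$ hadoop jar my-job.jar com.example.MyDriver -Dmapred.reduce.slowstart.completed.maps=0.95 input_path output_path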

Cluster load per processor over 1 day

Cluster memory utilization over 1 day

Cluster network IO over 1 day

Cluster CPU utilization over 1 day

Written by anujjaiswal

May 17, 2013 at 10:44 am

Posted in AWS, Hadoop, HDFS


Towards building a robust Hadoop cluster


We have been working on using AWS to build our Hadoop infrastructure. AWS is a pretty interesting cloud environment, and I do feel it is a great tool for any startup. However, we were overwhelmed with numerous issues in getting our cluster running in a robust manner. A colleague (Swatz, check out her blog) and I have identified some simple parameters that magically help produce an uber cluster:

1) Check your ulimits: Hadoop opens a very large number of files. The default Linux limit of 1024 open files is too low; we increased it to 65536 (the nofile parameter in /etc/security/limits.conf; see the snippet after this list). Check the number of open files per user using:

sudo lsof | awk '{if(NR>1) print $3}' | sort | uniq -c | sort -nr
   1858 mapred
    946 root
    241 hdfs
     37 syslog
     33 ganglia
     28 messagebus
     16 daemon

2) Use ephemeral disks for HDFS, Hadoop temporary files and general storage. They are pretty fast and don’t have the network latency of EBS.
3) Swap files are always good. We generally add 2-4 swap files of 1 GB each (see the snippet after this list).
4) Don’t be scared if a couple of EC2 machines go down once in a while. Hardware failures are pretty common and can occur on Amazon AWS as well. If it happens consistently, you need to check your configuration.
5) Use an instance-store AMI. EBS is slow and has a network overhead that is, in my opinion, not worth it. The link works if you follow the instructions :).
6) Logs are written for a reason. Check the Hadoop tasktracker, datanode, namenode and jobtracker logs when stuff blows up. Check your instance log files for configuration issues like incorrect partition mounting.
7) Use the right instance type on Amazon. Remember that Hadoop needs high IO (network and hard drive) bandwidth, plenty of RAM and decent CPU. Rule of thumb: “First IOPS, then FLOPS!”.
8) Don’t put too many mappers and reducers on a compute node. Try finding the sweet spot for your setup. We found that 6 mappers and 4 reducers per node, with each mapper having 1200M of RAM and each reducer 1600M, works best for our needs.
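As a concrete sketch of points 1 and 3, the lines below raise the open file limit for the Hadoop users and add a 1 GB swap file on the ephemeral disk. The user names and paths are assumptions; adjust them to your installation:

# /etc/security/limits.conf: raise the open file limit for the (assumed) hdfs and mapred users
hdfs    -   nofile   65536
mapred  -   nofile   65536

# Create and enable a 1 GB swap file on the ephemeral disk (repeat for additional swap files)
sudo dd if=/dev/zero of=/mnt/swap1 bs=1M count=1024
sudo chmod 600 /mnt/swap1
sudo mkswap /mnt/swap1
sudo swapon /mnt/swap1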

A good rule of thumb is roughly 4:3 (mappers to reducers) per core.

Using more reducers than necessary is not recommended. In the general case, we pick the number of reducers so that:

- it is at most the number of reduce slots available in the cluster, or slightly less (to leave room for failed nodes);
- it creates the fewest output files possible;
- each task runs for between 5 and 15 minutes.

Overloading the number of reducers will:

- hurt performance because of the extra network transfers with various InputFormats;
- cause shuffle errors;
- affect the workflow system if you have dependent jobs queued next (e.g., Oozie);
- effectively DDoS (distributed denial of service) the cluster in the shuffle phase, and create a problem for the next read in the pipeline.

We ran the TeraSort benchmark to stress test our AWS cluster. First, generate the input data for TeraSort (1 terabyte):

$ hadoop jar hadoop-*examples*.jar teragen -Dmapred.map.tasks=10000 10000000000 /user/user_name/tera-in/

Note that the argument is 10 billion, not 1 trillion, since teragen takes the number of records as input (each record is 100 bytes, so the output data is 1 trillion bytes, i.e., roughly 1 TB).
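To make the arithmetic explicit:

$ echo $((10000000000 * 100))   # 1000000000000 bytes = 10^12 bytes, i.e., 1 TB of teragen output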
Next, run TeraSort:

$ hadoop jar hadoop-*examples*.jar terasort -Dmapred.reduce.tasks=5000 /user/user_name/tera-in/ /user/user_name/tera-out/

Lastly, validate the output using:

$ hadoop jar hadoop-*examples*.jar teravalidate /user/user_name/tera-out/ /user/user_name/tera-validate/
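To check the validation result, the report that teravalidate writes can be read back from HDFS (a sketch; as I understand it, a report with no error records indicates a correctly sorted output):

$ hadoop fs -cat /user/user_name/tera-validate/part-*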

We did find that the changes above resulted in a very robust cluster. Some graphs of our cluster:

CPU usage of the cluster over 1 hour

Note that the first half of the CPU graph corresponds to the map phase and the second half to the reduce phase. The load per processor is shown below.

Load on processors for TeraSort

The memory usage of the cluster is shown below:

Memory usage of the cluster for TeraSort

Lastly, the network IO graph:

Network IO during TeraSort

Overall, our changes have helped create a much more robust cluster. 🙂

Written by anujjaiswal

May 2, 2013 at 1:02 pm

Posted in AWS, EC2, Hadoop, Hardware, HDFS, Linux, MapRed

Measuring Disk IO on Linux using dd


Ever since we began developing our Twitter analytics system SensePlace2, we have encountered numerous IO issues when reading from and writing to the PostgreSQL database backend. We started out with the database on a RAID 10 array of 4x 7.2K 1TB Seagate Constellation drives, which was essentially not the best option, but we were constrained by our budget. This year we upgraded to a mix of 8x 600GB 15K Seagate Cheetahs (on which we initialized 2 RAID 10 arrays for data), 4x 146GB 15K Seagate Cheetahs as a single RAID 10 array for indexes, and 6x 146GB 15K 3Gbps Fujitsu drives as a single RAID 10 array for the PostgreSQL WAL and our Lucene indexes (all drives are 6Gbps SAS unless specified otherwise). Obviously, we noticed significant performance improvements (saying an order of 100 would be an understatement), so I decided to do a quick IO performance test. These tests were run on a dual hexa-core Xeon X5650 @ 2.67GHz in a Dell R810 chassis with a Dell PERC H700 RAID card; in addition, a Dell PowerVault MD1200 is connected to the chassis through a PERC H800 RAID adapter. I use the Linux command dd for all the tests below.

To measure sequential write performance, the following command works fine.

dd if=/dev/zero of=/r6/outfile count=512 bs=1024k

This command will measure sequential read rates.

dd if=/r6/outfile of=/dev/null bs=4096k
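One caveat: with the commands above, the Linux page cache can inflate the numbers, especially for the read test. A variant I would normally use forces data to disk on the write and drops the cache before the read (a sketch; the output path is the same placeholder as above):

# Write test: conv=fdatasync forces a flush to disk before dd reports its rate
dd if=/dev/zero of=/r6/outfile bs=1024k count=512 conv=fdatasync
# Drop the page cache so the read test actually hits the disks
sync && echo 3 | sudo tee /proc/sys/vm/drop_caches
# Read test
dd if=/r6/outfile of=/dev/null bs=4096k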

IO Read Write performance

The figure above illustrates the IO read/write performance on all disks. The 4x 7.2K RAID 10 array has the lowest performance; its write rates are extremely poor (about 400MB/s). The 15K Cheetahs performed exceptionally in all cases (>1GB/s write and 5GB/s read). The only outlier was the 6x 15K Fujitsu RAID 10 array; however, these drives were scavenged from one of our older servers and have a 3Gbps SAS interface, due to which performance was slightly lower than the other 15K drives. These tests are sequential reads/writes only, and I agree that a random read/write test would make sense; however, based on these numbers the 15K drives should blow away the 7.2K drives in random IO tests as well (the random read latency of the 15K drives is about half that of the 7.2K drives). Buying hard drives is therefore a difficult tradeoff between storage size, required IO performance, and cost (of both the hard drives and the RAID hardware).

Written by anujjaiswal

March 15, 2012 at 2:20 pm