/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Sample Uploader MapReduce
 * <p>
 * This is EXAMPLE code.  You will need to change it to work for your context.
 * <p>
 * Writes the data into HBase through {@link TableOutputFormat}, which
 * {@link TableMapReduceUtil#initTableReducerJob(String, Class, Job)} sets up.
 * Change the InputFormat to suit your data.  In this example, we are importing
 * a CSV file with one record per line:
 * <p>
 * <pre>row,family,qualifier,value</pre>
 * <p>
 * The table and column family we are inserting into must already exist.
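 * For example, a matching table could be created ahead of time from the HBase
 * shell (the table and column family names here are just placeholders for
 * this sample):
 * <pre>
 * create 'TABLE_NAME', 'family'
 * </pre>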
 * <p>
 * There is no reducer in this example as it is not necessary and adds
 * significant overhead.  If you need to do any massaging of data before
 * inserting into HBase, you can do this in the map as well.
 * <p>Do the following to start the MR job:
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapreduce.SampleUploader /tmp/input.csv TABLE_NAME
 * </pre>
 * <p>
 * This code was written against HBase 0.21 trunk.
 */
public class SampleUploader extends Configured implements Tool {

  private static final String NAME = "SampleUploader";

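  /**
   * Mapper that parses one CSV line per {@code map()} call and emits a single
   * {@link Put} for it, keyed by the row. Lines that do not have exactly four
   * comma-separated fields are skipped.
   */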
  static class Uploader
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private long checkpoint = 100;
    private long count = 0;

    @Override
    public void map(LongWritable key, Text line, Context context)
    throws IOException {

      // Input is a CSV file
      // Each map() gets a single line, where the key is the byte offset of
      // the line within the file
      // Each line is comma-delimited; row,family,qualifier,value

      // Split CSV line
      String [] values = line.toString().split(",");
      if (values.length != 4) {
        return;
      }

      // Extract each value
      byte [] row = Bytes.toBytes(values[0]);
      byte [] family = Bytes.toBytes(values[1]);
      byte [] qualifier = Bytes.toBytes(values[2]);
      byte [] value = Bytes.toBytes(values[3]);

      // Create Put
      Put put = new Put(row);
      put.add(family, qualifier, value);
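      // Note: this sample targets the 0.21-era client API. On HBase 1.0 and
      // later the equivalent call is put.addColumn(family, qualifier, value).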

      // Uncomment below to disable WAL. This will improve performance but means
      // you will experience data loss in the case of a RegionServer crash.
      // put.setWriteToWAL(false);

      try {
        context.write(new ImmutableBytesWritable(row), put);
      } catch (InterruptedException e) {
        // Restore the interrupt flag rather than silently swallowing it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }

      // Set status every checkpoint lines
      if (++count % checkpoint == 0) {
        context.setStatus("Emitting Put " + count);
      }
    }
  }

  /**
   * Job configuration.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters: input path and table name.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job configureJob(Configuration conf, String [] args)
  throws IOException {
    Path inputPath = new Path(args[0]);
    String tableName = args[1];
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    // The input is a plain-text CSV file, so read it with TextInputFormat.
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers.  Just write straight to table.  Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Main entry point.
   *
   * @param otherArgs  The command line parameters after ToolRunner has handled
   *   the standard generic options.
   * @return 0 on success, 1 if the job fails, -1 on bad usage.
   * @throws Exception When running the job fails.
   */
  @Override
  public int run(String[] otherArgs) throws Exception {
    if (otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      return -1;
    }
    Job job = configureJob(getConf(), otherArgs);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

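  /**
   * Runs the tool with a fresh HBase configuration; generic Hadoop options are
   * parsed by {@link ToolRunner} before {@link #run(String[])} is called.
   */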
  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(HBaseConfiguration.create(), new SampleUploader(), args);
    System.exit(status);
  }
}