/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Sample Uploader MapReduce
 * <p>
 * This is EXAMPLE code.  You will need to change it to work for your context.
 * <p>
 * Writes the data directly into HBase via {@link TableOutputFormat}, which is
 * configured by {@link TableMapReduceUtil}. Change the InputFormat to suit
 * your data.  In this example, we are importing a CSV file with one record per
 * line, in the form:
 * <p>
 * <pre>row,family,qualifier,value</pre>
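 * <p>
 * For example, an input line might look like the following (names invented
 * purely for illustration):
 * <pre>row1,family1,qualifier1,value1</pre>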
 * <p>
 * The table and column family we are inserting into must already exist.
 * <p>
 * There is no reducer in this example as it is not necessary and would add
 * significant overhead.  If you need to do any massaging of data before
 * inserting into HBase, you can do this in the map as well.
 * <p>Do the following to start the MR job:
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapreduce.SampleUploader /tmp/input.csv TABLE_NAME
 * </pre>
 * <p>
 * This code was written against HBase 0.21 trunk.
 */
public class SampleUploader extends Configured implements Tool {

  private static final String NAME = "SampleUploader";

  static class Uploader
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

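    // Report task status back to the framework every `checkpoint` input lines.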
    private long checkpoint = 100;
    private long count = 0;

    @Override
    public void map(LongWritable key, Text line, Context context)
    throws IOException {

      // Input is a CSV file
      // Each map() gets a single line, where the key is the byte offset of the line
      // Each line is comma-delimited; row,family,qualifier,value

      // Split CSV line
      String [] values = line.toString().split(",");
      if(values.length != 4) {
        return;
      }

      // Extract each value
      byte [] row = Bytes.toBytes(values[0]);
      byte [] family = Bytes.toBytes(values[1]);
      byte [] qualifier = Bytes.toBytes(values[2]);
      byte [] value = Bytes.toBytes(values[3]);

      // Create Put
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);

      // Uncomment below to disable writing to the WAL. This will improve performance
      // but means you will experience data loss in the case of a RegionServer crash.
      // put.setDurability(Durability.SKIP_WAL);  // requires org.apache.hadoop.hbase.client.Durability

      try {
        context.write(new ImmutableBytesWritable(row), put);
      } catch (InterruptedException e) {
        // Restore the interrupt flag rather than swallowing the interruption silently.
        Thread.currentThread().interrupt();
        e.printStackTrace();
      }

      // Set status every checkpoint lines
      if(++count % checkpoint == 0) {
        context.setStatus("Emitting Put " + count);
      }
    }
  }

  /**
   * Sets up the job configuration.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters: input path and table name.
   * @return The newly configured job.
   * @throws IOException When setting up the job fails.
   */
  public static Job configureJob(Configuration conf, String [] args)
  throws IOException {
    Path inputPath = new Path(args[0]);
    String tableName = args[1];
    Job job = Job.getInstance(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers.  Just write straight to table.  Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Runs the uploader job.
   *
   * @param otherArgs  The command line parameters after ToolRunner has handled the standard options.
   * @return 0 on success, 1 if the job fails, -1 on incorrect usage.
   * @throws Exception When running the job fails.
   */
  @Override
  public int run(String[] otherArgs) throws Exception {
    if(otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      return -1;
    }
    Job job = configureJob(getConf(), otherArgs);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

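  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */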
  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(HBaseConfiguration.create(), new SampleUploader(), args);
    System.exit(status);
  }
}