/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * Sample Uploader MapReduce
 * <p>
 * This is EXAMPLE code.  You will need to change it to work for your context.
 * <p>
 * Writes the data into HBase via {@link TableOutputFormat}, which is set up by
 * {@link TableMapReduceUtil#initTableReducerJob}. Change the InputFormat
 * to suit your data.  In this example, we are importing a CSV file.
 * <p>
 * <pre>row,family,qualifier,value</pre>
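 * <p>
 * For example, a small input file might contain lines like the following (the row
 * keys, family, qualifiers, and values here are purely illustrative):
 * <pre>
 * row1,f1,q1,value1
 * row2,f1,q1,value2
 * row3,f1,q2,value3
 * </pre>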
 * <p>
 * The table and column family to insert into must already exist.
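 * One way to create them ahead of time is a short sketch against the 0.21-era client
 * API this class targets (the table and family names below are placeholders; the
 * HBase shell's <code>create</code> command works just as well):
 * <pre>
 * HBaseAdmin admin = new HBaseAdmin(conf);
 * HTableDescriptor desc = new HTableDescriptor("TABLE_NAME");
 * desc.addFamily(new HColumnDescriptor("family"));
 * admin.createTable(desc);
 * </pre>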
 * <p>
 * There is no reducer in this example as it is not necessary and adds
 * significant overhead.  If you need to do any massaging of data before
 * inserting into HBase, you can do this in the map as well.
 * <p>Do the following to start the MR job:
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapreduce.SampleUploader /tmp/input.csv TABLE_NAME
 * </pre>
 * <p>
 * This code was written against HBase 0.21 trunk.
 */
public class SampleUploader {

  private static final String NAME = "SampleUploader";

  static class Uploader
  extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private long checkpoint = 100;
    private long count = 0;

    @Override
    public void map(LongWritable key, Text line, Context context)
    throws IOException, InterruptedException {

      // Input is a CSV file
      // Each map() is a single line, where the key is the byte offset of the line
      // Each line is comma-delimited; row,family,qualifier,value

      // Split CSV line
      String [] values = line.toString().split(",");
      if(values.length != 4) {
        return;
      }

      // Extract each value
      byte [] row = Bytes.toBytes(values[0]);
      byte [] family = Bytes.toBytes(values[1]);
      byte [] qualifier = Bytes.toBytes(values[2]);
      byte [] value = Bytes.toBytes(values[3]);

      // Create Put
      Put put = new Put(row);
      put.add(family, qualifier, value);

      // Uncomment below to disable WAL. This will improve performance but means
      // you will experience data loss in the case of a RegionServer crash.
      // put.setWriteToWAL(false);

      // Emit the Put; let InterruptedException propagate to the framework
      // rather than swallowing it
      context.write(new ImmutableBytesWritable(row), put);

      // Set status every checkpoint lines
      if(++count % checkpoint == 0) {
        context.setStatus("Emitting Put " + count);
      }
    }
  }

  /**
   * Job configuration.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters: input path and table name.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job configureJob(Configuration conf, String [] args)
  throws IOException {
    Path inputPath = new Path(args[0]);
    String tableName = args[1];
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    // The input is a plain-text CSV file, so read it line by line with
    // TextInputFormat (keys are byte offsets, values are lines).
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers.  Just write straight to table.  Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Main entry point.
   *
   * @param args  The command line parameters.
   * @throws Exception When running the job fails.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if(otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      System.exit(-1);
    }
    Job job = configureJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}