/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Sample Uploader MapReduce
 * <p>
 * This is EXAMPLE code. You will need to change it to work for your context.
 * <p>
 * Writes each input record directly to HBase as a {@link Put} through {@link TableOutputFormat}.
 * Change the InputFormat to suit your data. In this example, we are importing a CSV file with
 * one record per line:
 * <p>
 * <pre>row,family,qualifier,value</pre>
 * <p>
 * The table and column family we are inserting into must already exist.
 * <p>
 * There is no reducer in this example, as one is not necessary and would add significant
 * overhead. If you need to do any massaging of the data before inserting it into HBase,
 * you can do that in the map as well.
 * <p>
 * Do the following to start the MR job:
 * <pre>
 * ./bin/hadoop org.apache.hadoop.hbase.mapreduce.SampleUploader /tmp/input.csv TABLE_NAME
 * </pre>
 * <p>
 * This code was written against HBase 0.21 trunk.
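 * <p>
 * For example, the CSV input described above might contain lines such as the following
 * (illustrative values only):
 * <pre>
 * row1,cf,q1,value1
 * row2,cf,q2,value2
 * </pre>
 * Each such line becomes a single {@link Put} that stores <code>value</code> in column
 * <code>family:qualifier</code> of <code>row</code>.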
 */
@InterfaceAudience.Private
public class SampleUploader extends Configured implements Tool {

  private static final String NAME = "SampleUploader";

  static class Uploader extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    /** Report status to the framework every this many processed lines. */
    private long checkpoint = 100;
    private long count = 0;

    @Override
    public void map(LongWritable key, Text line, Context context)
        throws IOException, InterruptedException {

      // Input is a CSV file.
      // Each map() call receives a single line; the key is the byte offset of that line.
      // Each line is comma-delimited: row,family,qualifier,value

      // Split the CSV line, skipping malformed records.
      String[] values = line.toString().split(",");
      if (values.length != 4) {
        return;
      }

      // Extract each value
      byte[] row = Bytes.toBytes(values[0]);
      byte[] family = Bytes.toBytes(values[1]);
      byte[] qualifier = Bytes.toBytes(values[2]);
      byte[] value = Bytes.toBytes(values[3]);

      // Create Put
      Put put = new Put(row);
      put.addColumn(family, qualifier, value);

      // Uncomment below (and import org.apache.hadoop.hbase.client.Durability) to disable the
      // WAL. This will improve performance but means you will experience data loss in the case
      // of a RegionServer crash.
      // put.setDurability(Durability.SKIP_WAL);

      // Emit the Put keyed by row; TableOutputFormat writes it to the table.
      context.write(new ImmutableBytesWritable(row), put);

      // Set status every checkpoint lines
      if (++count % checkpoint == 0) {
        context.setStatus("Emitting Put " + count);
      }
    }
  }

  /**
   * Job configuration.
   */
  public static Job configureJob(Configuration conf, String[] args) throws IOException {
    Path inputPath = new Path(args[0]);
    String tableName = args[1];
    Job job = Job.getInstance(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    // The input is a plain-text CSV file, so use TextInputFormat.
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers. Just write straight to the table. Call initTableReducerJob
    // because it sets up the TableOutputFormat.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

  /**
   * Main entry point.
   * @param otherArgs The command-line arguments remaining after ToolRunner has handled the
   *          standard Hadoop options.
   * @throws Exception When running the job fails.
   */
  @Override
  public int run(String[] otherArgs) throws Exception {
    if (otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      return -1;
    }
    Job job = configureJob(getConf(), otherArgs);
    return (job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception {
    int status = ToolRunner.run(HBaseConfiguration.create(), new SampleUploader(), args);
    System.exit(status);
  }
}