001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.mapreduce; 020 021import java.io.IOException; 022import java.util.HashMap; 023import java.util.Map; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.HBaseConfiguration; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.BufferedMutator; 032import org.apache.hadoop.hbase.client.Connection; 033import org.apache.hadoop.hbase.client.ConnectionFactory; 034import org.apache.hadoop.hbase.client.Delete; 035import org.apache.hadoop.hbase.client.Mutation; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Durability; 038import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.mapreduce.JobContext; 041import org.apache.hadoop.mapreduce.OutputCommitter; 042import org.apache.hadoop.mapreduce.OutputFormat; 043import org.apache.hadoop.mapreduce.RecordWriter; 044import org.apache.hadoop.mapreduce.TaskAttemptContext; 045 046/** 047 * <p> 048 * Hadoop output format that writes to one or more HBase tables. The key is 049 * taken to be the table name while the output value <em>must</em> be either a 050 * {@link Put} or a {@link Delete} instance. All tables must already exist, and 051 * all Puts and Deletes must reference only valid column families. 052 * </p> 053 * 054 * <p> 055 * Write-ahead logging (WAL) for Puts can be disabled by setting 056 * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}. 057 * Note that disabling write-ahead logging is only appropriate for jobs where 058 * loss of data due to region server failure can be tolerated (for example, 059 * because it is easy to rerun a bulk import). 060 * </p> 061 */ 062@InterfaceAudience.Public 063public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> { 064 /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */ 065 public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal"; 066 /** Property value to use write-ahead logging */ 067 public static final boolean WAL_ON = true; 068 /** Property value to disable write-ahead logging */ 069 public static final boolean WAL_OFF = false; 070 /** 071 * Record writer for outputting to multiple HTables. 072 */ 073 protected static class MultiTableRecordWriter extends 074 RecordWriter<ImmutableBytesWritable, Mutation> { 075 private static final Logger LOG = LoggerFactory.getLogger(MultiTableRecordWriter.class); 076 Connection connection; 077 Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>(); 078 Configuration conf; 079 boolean useWriteAheadLogging; 080 081 /** 082 * @param conf 083 * HBaseConfiguration to used 084 * @param useWriteAheadLogging 085 * whether to use write ahead logging. This can be turned off ( 086 * <tt>false</tt>) to improve performance when bulk loading data. 087 */ 088 public MultiTableRecordWriter(Configuration conf, 089 boolean useWriteAheadLogging) throws IOException { 090 LOG.debug("Created new MultiTableRecordReader with WAL " 091 + (useWriteAheadLogging ? "on" : "off")); 092 this.conf = conf; 093 this.useWriteAheadLogging = useWriteAheadLogging; 094 } 095 096 /** 097 * @param tableName 098 * the name of the table, as a string 099 * @return the named mutator 100 * @throws IOException 101 * if there is a problem opening a table 102 */ 103 BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException { 104 if(this.connection == null){ 105 this.connection = ConnectionFactory.createConnection(conf); 106 } 107 if (!mutatorMap.containsKey(tableName)) { 108 LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get())+ "\" for writing"); 109 110 BufferedMutator mutator = 111 connection.getBufferedMutator(TableName.valueOf(tableName.get())); 112 mutatorMap.put(tableName, mutator); 113 } 114 return mutatorMap.get(tableName); 115 } 116 117 @Override 118 public void close(TaskAttemptContext context) throws IOException { 119 for (BufferedMutator mutator : mutatorMap.values()) { 120 mutator.close(); 121 } 122 if (connection != null) { 123 connection.close(); 124 } 125 } 126 127 /** 128 * Writes an action (Put or Delete) to the specified table. 129 * 130 * @param tableName 131 * the table being updated. 132 * @param action 133 * the update, either a put or a delete. 134 * @throws IllegalArgumentException 135 * if the action is not a put or a delete. 136 */ 137 @Override 138 public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException { 139 BufferedMutator mutator = getBufferedMutator(tableName); 140 // The actions are not immutable, so we defensively copy them 141 if (action instanceof Put) { 142 Put put = new Put((Put) action); 143 put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL 144 : Durability.SKIP_WAL); 145 mutator.mutate(put); 146 } else if (action instanceof Delete) { 147 Delete delete = new Delete((Delete) action); 148 mutator.mutate(delete); 149 } else 150 throw new IllegalArgumentException( 151 "action must be either Delete or Put"); 152 } 153 } 154 155 @Override 156 public void checkOutputSpecs(JobContext context) throws IOException, 157 InterruptedException { 158 // we can't know ahead of time if it's going to blow up when the user 159 // passes a table name that doesn't exist, so nothing useful here. 160 } 161 162 @Override 163 public OutputCommitter getOutputCommitter(TaskAttemptContext context) 164 throws IOException, InterruptedException { 165 return new TableOutputCommitter(); 166 } 167 168 @Override 169 public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context) 170 throws IOException, InterruptedException { 171 Configuration conf = context.getConfiguration(); 172 return new MultiTableRecordWriter(HBaseConfiguration.create(conf), 173 conf.getBoolean(WAL_PROPERTY, WAL_ON)); 174 } 175 176}