001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mapreduce; 019 020import java.io.IOException; 021import java.util.HashMap; 022import java.util.Map; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.HBaseConfiguration; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.BufferedMutator; 027import org.apache.hadoop.hbase.client.Connection; 028import org.apache.hadoop.hbase.client.ConnectionFactory; 029import org.apache.hadoop.hbase.client.Delete; 030import org.apache.hadoop.hbase.client.Durability; 031import org.apache.hadoop.hbase.client.Mutation; 032import org.apache.hadoop.hbase.client.Put; 033import org.apache.hadoop.hbase.io.ImmutableBytesWritable; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.mapreduce.JobContext; 036import org.apache.hadoop.mapreduce.OutputCommitter; 037import org.apache.hadoop.mapreduce.OutputFormat; 038import org.apache.hadoop.mapreduce.RecordWriter; 039import org.apache.hadoop.mapreduce.TaskAttemptContext; 040import org.apache.yetus.audience.InterfaceAudience; 041import org.slf4j.Logger; 042import org.slf4j.LoggerFactory; 043 044/** 045 * <p> 046 * Hadoop output format that writes to one or more HBase tables. The key is taken to be the table 047 * name while the output value <em>must</em> be either a {@link Put} or a {@link Delete} instance. 048 * All tables must already exist, and all Puts and Deletes must reference only valid column 049 * families. 050 * </p> 051 * <p> 052 * Write-ahead logging (WAL) for Puts can be disabled by setting {@link #WAL_PROPERTY} to 053 * {@link #WAL_OFF}. Default value is {@link #WAL_ON}. Note that disabling write-ahead logging is 054 * only appropriate for jobs where loss of data due to region server failure can be tolerated (for 055 * example, because it is easy to rerun a bulk import). 056 * </p> 057 */ 058@InterfaceAudience.Public 059public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> { 060 /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */ 061 public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal"; 062 /** Property value to use write-ahead logging */ 063 public static final boolean WAL_ON = true; 064 /** Property value to disable write-ahead logging */ 065 public static final boolean WAL_OFF = false; 066 067 /** 068 * Record writer for outputting to multiple HTables. 069 */ 070 protected static class MultiTableRecordWriter 071 extends RecordWriter<ImmutableBytesWritable, Mutation> { 072 private static final Logger LOG = LoggerFactory.getLogger(MultiTableRecordWriter.class); 073 Connection connection; 074 Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>(); 075 Configuration conf; 076 boolean useWriteAheadLogging; 077 078 /** 079 * HBaseConfiguration to used whether to use write ahead logging. This can be turned off ( 080 * <tt>false</tt>) to improve performance when bulk loading data. 081 */ 082 public MultiTableRecordWriter(Configuration conf, boolean useWriteAheadLogging) 083 throws IOException { 084 LOG.debug( 085 "Created new MultiTableRecordReader with WAL " + (useWriteAheadLogging ? "on" : "off")); 086 this.conf = conf; 087 this.useWriteAheadLogging = useWriteAheadLogging; 088 } 089 090 /** 091 * the name of the table, as a string 092 * @return the named mutator if there is a problem opening a table 093 */ 094 BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException { 095 if (this.connection == null) { 096 this.connection = ConnectionFactory.createConnection(conf); 097 } 098 if (!mutatorMap.containsKey(tableName)) { 099 LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing"); 100 101 BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName.get())); 102 mutatorMap.put(tableName, mutator); 103 } 104 return mutatorMap.get(tableName); 105 } 106 107 @Override 108 public void close(TaskAttemptContext context) throws IOException { 109 for (BufferedMutator mutator : mutatorMap.values()) { 110 mutator.close(); 111 } 112 if (connection != null) { 113 connection.close(); 114 } 115 } 116 117 /** 118 * Writes an action (Put or Delete) to the specified table. the table being updated. the update, 119 * either a put or a delete. if the action is not a put or a delete. 120 */ 121 @Override 122 public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException { 123 BufferedMutator mutator = getBufferedMutator(tableName); 124 // The actions are not immutable, so we defensively copy them 125 if (action instanceof Put) { 126 Put put = new Put((Put) action); 127 put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL : Durability.SKIP_WAL); 128 mutator.mutate(put); 129 } else if (action instanceof Delete) { 130 Delete delete = new Delete((Delete) action); 131 mutator.mutate(delete); 132 } else throw new IllegalArgumentException("action must be either Delete or Put"); 133 } 134 } 135 136 @Override 137 public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException { 138 // we can't know ahead of time if it's going to blow up when the user 139 // passes a table name that doesn't exist, so nothing useful here. 140 } 141 142 @Override 143 public OutputCommitter getOutputCommitter(TaskAttemptContext context) 144 throws IOException, InterruptedException { 145 return new TableOutputCommitter(); 146 } 147 148 @Override 149 public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context) 150 throws IOException, InterruptedException { 151 Configuration conf = context.getConfiguration(); 152 return new MultiTableRecordWriter(HBaseConfiguration.create(conf), 153 conf.getBoolean(WAL_PROPERTY, WAL_ON)); 154 } 155 156}