/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
 */
@InterfaceAudience.Public
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
    implements Configurable {

  private static final Logger LOG = LoggerFactory.getLogger(TableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
   * For keys matching this prefix, the prefix is stripped, and the value is set in the
   * configuration with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1"
   * would be set in the configuration as "key1 = value1". Use this to set properties
   * which should only be applied to the {@code TableOutputFormat} configuration and not the
   * input configuration.
   */
  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";
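  // A minimal sketch of the prefix-stripping behavior described above, assuming the
  // job's Configuration is later handed to setConf(Configuration). The key
  // "hbase.client.write.buffer" is a standard HBase client property; the value is
  // illustrative only.
  //
  //   Configuration jobConf = job.getConfiguration();
  //   jobConf.set(TableOutputFormat.OUTPUT_CONF_PREFIX + "hbase.client.write.buffer",
  //       "4194304");
  //   // After setConf(jobConf), the output-side configuration contains
  //   // "hbase.client.write.buffer" = "4194304"; the input-side configuration
  //   // is left untouched.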
  /**
   * Optional job parameter to specify a peer cluster.
   * Used to specify a remote cluster when copying between HBase clusters (the
   * source cluster configuration is picked up from <code>hbase-site.xml</code>).
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
   */
  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";

  /** Optional job parameter to specify the peer cluster's ZK client port. */
  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";

  /** Optional specification of the rs class name of the peer cluster. */
  public static final String REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";

  /** Optional specification of the rs impl name of the peer cluster. */
  public static final String REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Writes the reducer output to an HBase table.
   */
  protected class TableRecordWriter extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;

    /**
     * Sets up the connection and the {@link BufferedMutator} for the output table.
     *
     * @throws IOException When creating the connection or the mutator fails.
     */
    public TableRecordWriter() throws IOException {
      String tableName = conf.get(OUTPUT_TABLE);
      this.connection = ConnectionFactory.createConnection(conf);
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created table instance for {}", tableName);
    }

    /**
     * Closes the writer, flushing any pending mutations to the table.
     *
     * @param context The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      try {
        if (mutator != null) {
          mutator.close();
        }
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

    /**
     * Writes a key/value pair into the table.
     *
     * @param key The key.
     * @param value The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value) throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
      mutator.mutate(value);
    }
  }

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   *
   * @param context The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   */
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableRecordWriter();
  }
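  // Illustrative sketch of the must-close contract above: the MapReduce framework
  // normally obtains and closes the writer itself, but any code that calls
  // getRecordWriter directly must close the returned writer, otherwise mutations
  // still buffered in the BufferedMutator are dropped. The names "outputFormat",
  // "context", "key" and "put" are assumed to be in scope.
  //
  //   RecordWriter<KEY, Mutation> writer = outputFormat.getRecordWriter(context);
  //   try {
  //     writer.write(key, put);
  //   } finally {
  //     writer.close(context); // flushes the BufferedMutator
  //   }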
  /**
   * Checks if the output table exists and is enabled.
   *
   * @param context The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#checkOutputSpecs(JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException {
    Configuration hConf = getConf();
    if (hConf == null) {
      hConf = context.getConfiguration();
    }

    // Close the Connection as well as the Admin; closing only the Admin would
    // leak the underlying connection.
    try (Connection connection = ConnectionFactory.createConnection(hConf);
        Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
      if (!admin.tableExists(tableName)) {
        throw new TableNotFoundException("Can't write, table does not exist: " +
            tableName.getNameAsString());
      }

      if (!admin.isTableEnabled(tableName)) {
        throw new TableNotEnabledException("Can't write, table is not enabled: " +
            tableName.getNameAsString());
      }
    }
  }

  /**
   * Returns the output committer.
   *
   * @param context The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    String tableName = otherConf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = otherConf.get(QUORUM_ADDRESS);
    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
    String serverClass = otherConf.get(REGION_SERVER_CLASS);
    String serverImpl = otherConf.get(REGION_SERVER_IMPL);

    try {
      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);

      // Guard on both keys so a null impl value is never passed to Configuration.set.
      if (serverClass != null && serverImpl != null) {
        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
      }
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
    } catch (IOException e) {
      LOG.error(e.toString(), e);
      throw new RuntimeException(e);
    }
  }
}
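/*
 * Usage sketch (illustrative only, not part of the class): writing job output to a
 * remote peer cluster via QUORUM_ADDRESS, which setConf(Configuration) folds into
 * the cluster configuration through HBaseConfiguration.createClusterConf. The quorum
 * string follows "hbase.zookeeper.quorum:client-port:znode-parent"; the host and
 * table names below are hypothetical.
 *
 *   Job job = Job.getInstance(HBaseConfiguration.create(), "copy-to-peer");
 *   Configuration conf = job.getConfiguration();
 *   conf.set(TableOutputFormat.OUTPUT_TABLE, "backup-table");
 *   conf.set(TableOutputFormat.QUORUM_ADDRESS,
 *       "zk1.example.com,zk2.example.com:2181:/hbase");
 *   job.setOutputFormatClass(TableOutputFormat.class);
 */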