/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored while the output
 * value <u>must</u> be either a {@link Put} or a {@link Delete} instance.
 */
@InterfaceAudience.Public
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> implements Configurable {

  private static final Logger LOG = LoggerFactory.getLogger(TableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /** Property value to use write-ahead logging. */
  public static final boolean WAL_ON = true;

  /** Property value to disable write-ahead logging. */
  public static final boolean WAL_OFF = false;

  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL). */
  public static final String WAL_PROPERTY = "hbase.mapreduce.tableoutputformat.write.wal";
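  /*
   * Illustrative usage sketch, not part of this class: wiring the format into a Job and turning
   * the WAL off for higher write throughput at the cost of durability on region server failure.
   * The job handle and table name ("my_table") are hypothetical.
   *
   *   Job job = Job.getInstance(HBaseConfiguration.create());
   *   job.setOutputFormatClass(TableOutputFormat.class);
   *   job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "my_table");
   *   job.getConfiguration().setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);
   */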
  /**
   * Optional job parameter to specify a peer cluster. Used to specify a remote cluster when
   * copying between HBase clusters (the source cluster is picked up from
   * <code>hbase-site.xml</code>).
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job,
   *      Class, java.net.URI)
   */
  public static final String OUTPUT_CLUSTER = "hbase.mapred.outputcluster";

  /**
   * The configuration key for specifying a custom
   * {@link org.apache.hadoop.mapreduce.OutputCommitter} implementation to be used by
   * {@link TableOutputFormat}. The value for this property should be the fully qualified class name
   * of the custom committer. If this property is not set, {@link TableOutputCommitter} will be used
   * by default.
   */
  public static final String OUTPUT_COMMITTER_CLASS =
    "hbase.mapreduce.tableoutputformat.output.committer.class";

  /**
   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. For
   * keys matching this prefix, the prefix is stripped, and the value is set in the configuration
   * with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1" would be set in the
   * configuration as "key1 = value1". Use this to set properties which should only be applied to
   * the {@code TableOutputFormat} configuration and not the input configuration.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. You no longer need to specify
   *             configurations this way; any configuration can be passed as a query parameter of
   *             the connection URI given in {@link #OUTPUT_CLUSTER}.
   */
  @Deprecated
  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";

  /**
   * Optional job parameter to specify a peer cluster. Used to specify a remote cluster when
   * copying between HBase clusters (the source cluster is picked up from
   * <code>hbase-site.xml</code>).
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job,
   *      Class, String)
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #OUTPUT_CLUSTER} to specify the
   *             peer cluster instead.
   */
  @Deprecated
  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";

  /**
   * Optional job parameter to specify the peer cluster's ZK client port.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. You no longer need to specify
   *             configurations this way; any configuration can be passed as a query parameter of
   *             the connection URI given in {@link #OUTPUT_CLUSTER}.
   */
  @Deprecated
  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";

  /**
   * Optional specification of the rs class name of the peer cluster.
   * @deprecated Since 2.5.9, 2.6.1 and 2.7.0, will be removed in 4.0.0. Has not taken effect for a
   *             long time, see HBASE-6044.
   */
  @Deprecated
  public static final String REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";

  /**
   * Optional specification of the rs impl name of the peer cluster.
   * @deprecated Since 2.5.9, 2.6.1 and 2.7.0, will be removed in 4.0.0. Has not taken effect for a
   *             long time, see HBASE-6044.
   */
  @Deprecated
  public static final String REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";
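  /*
   * Illustrative sketch, with a hypothetical quorum host and committer class: pointing the output
   * at a peer cluster through OUTPUT_CLUSTER. Extra configuration can ride along as URI query
   * parameters, which supersedes the deprecated OUTPUT_CONF_PREFIX overrides above.
   *
   *   Configuration conf = job.getConfiguration();
   *   conf.set(TableOutputFormat.OUTPUT_CLUSTER,
   *     "hbase+zk://peer-zk.example.com:2181/hbase?zookeeper.session.timeout=60000");
   *   // Optionally plug in a custom committer (MyCommitter is hypothetical):
   *   conf.setClass(TableOutputFormat.OUTPUT_COMMITTER_CLASS, MyCommitter.class,
   *     OutputCommitter.class);
   */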
  /** The configuration. */
  private Configuration conf = null;

  private static Connection createConnection(Configuration conf) throws IOException {
    String outputCluster = conf.get(OUTPUT_CLUSTER);
    if (!StringUtils.isBlank(outputCluster)) {
      URI uri;
      try {
        uri = new URI(outputCluster);
      } catch (URISyntaxException e) {
        throw new IOException(
          "malformed connection uri: " + outputCluster + ", please check config " + OUTPUT_CLUSTER,
          e);
      }
      return ConnectionFactory.createConnection(uri, conf);
    } else {
      return ConnectionFactory.createConnection(conf);
    }
  }

  /**
   * Writes the reducer output to an HBase table.
   */
  protected class TableRecordWriter extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;
    boolean useWriteAheadLogging;

    public TableRecordWriter() throws IOException {
      this.connection = createConnection(conf);
      String tableName = conf.get(OUTPUT_TABLE);
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created table instance for " + tableName);
      this.useWriteAheadLogging = conf.getBoolean(WAL_PROPERTY, WAL_ON);
    }

    /**
     * Closes the writer, flushing any pending mutations to the table.
     * @param context The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      try {
        if (mutator != null) {
          mutator.close();
        }
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

    /**
     * Writes a key/value pair into the table.
     * @param key   The key.
     * @param value The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value) throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
      if (!useWriteAheadLogging) {
        value.setDurability(Durability.SKIP_WAL);
      }
      mutator.mutate(value);
    }
  }

  /**
   * Creates a new record writer. Be aware that the baseline javadoc gives the impression that there
   * is a single {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   * @param context The current task context.
   * @return The newly created writer instance.
   * @throws IOException          When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   */
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    return new TableRecordWriter();
  }
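  /*
   * Minimal reducer sketch, with hypothetical input types and column names, showing what the
   * record writer expects: the emitted key is ignored and the value must be a Put or a Delete.
   *
   *   public static class CountReducer
   *       extends Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation> {
   *     @Override
   *     protected void reduce(Text key, Iterable<IntWritable> values, Context context)
   *         throws IOException, InterruptedException {
   *       long sum = 0;
   *       for (IntWritable v : values) {
   *         sum += v.get();
   *       }
   *       Put put = new Put(Bytes.toBytes(key.toString()));
   *       put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("count"), Bytes.toBytes(sum));
   *       context.write(null, put); // the key is ignored by TableOutputFormat
   *     }
   *   }
   */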
  /**
   * Checks if the output table exists and is enabled.
   * @param context The current context.
   * @throws IOException          When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#checkOutputSpecs(JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    Configuration hConf = getConf();
    if (hConf == null) {
      hConf = context.getConfiguration();
    }

    try (Connection connection = createConnection(hConf); Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
      if (!admin.tableExists(tableName)) {
        throw new TableNotFoundException(
          "Can't write, table does not exist: " + tableName.getNameAsString());
      }

      if (!admin.isTableEnabled(tableName)) {
        throw new TableNotEnabledException(
          "Can't write, table is not enabled: " + tableName.getNameAsString());
      }
    }
  }

  /**
   * Returns the output committer.
   * @param context The current context.
   * @return The committer.
   * @throws IOException          When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    Configuration hConf = getConf();
    if (hConf == null) {
      hConf = context.getConfiguration();
    }

    try {
      Class<? extends OutputCommitter> outputCommitter =
        hConf.getClass(OUTPUT_COMMITTER_CLASS, TableOutputCommitter.class, OutputCommitter.class);
      return ReflectionUtils.newInstance(outputCommitter);
    } catch (Exception e) {
      throw new IOException("Could not create the configured OutputCommitter", e);
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    String tableName = otherConf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = otherConf.get(QUORUM_ADDRESS);
    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);

    try {
      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
    } catch (IOException e) {
      LOG.error(e.toString(), e);
      throw new RuntimeException(e);
    }
  }
}
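/*
 * Sketch of the (deprecated) prefix-override path taken in setConf(Configuration), with
 * hypothetical values: HBaseConfiguration.createClusterConf strips OUTPUT_CONF_PREFIX from
 * matching keys so the resulting entries apply only to the output-side configuration.
 *
 *   jobConf.set("hbase.mapred.output.hbase.zookeeper.quorum", "peer-zk.example.com");
 *   // After setConf(jobConf), the effective output configuration contains:
 *   //   hbase.zookeeper.quorum = peer-zk.example.com
 */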