/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored while the output
 * value <u>must</u> be either a {@link Put} or a {@link Delete} instance.
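 * <p>
 * For reference, a minimal job wiring sketch; the configuration handle, the table name
 * {@code "mytable"}, and the key class are illustrative assumptions, not requirements of this
 * class (in practice {@code TableMapReduceUtil#initTableReducerJob} performs this wiring):
 *
 * <pre>{@code
 * Job job = Job.getInstance(conf, "write-to-hbase");
 * job.setOutputFormatClass(TableOutputFormat.class);
 * job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "mytable");
 * job.setOutputKeyClass(ImmutableBytesWritable.class);
 * job.setOutputValueClass(Mutation.class);
 * }</pre>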
 */
@InterfaceAudience.Public
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> implements Configurable {

  private static final Logger LOG = LoggerFactory.getLogger(TableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Optional job parameter to specify a peer cluster. Used to specify the remote cluster when
   * copying between HBase clusters; the source cluster is picked up from
   * <code>hbase-site.xml</code>.
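   * <p>
   * A sketch of pointing the output at a peer cluster (the hosts and the query-string override
   * are illustrative; the {@code hbase+zk://} scheme assumes a ZooKeeper based connection
   * registry, and query parameters on the URI become configuration overrides):
   *
   * <pre>{@code
   * conf.set(TableOutputFormat.OUTPUT_CLUSTER,
   *   "hbase+zk://peer-zk1:2181,peer-zk2:2181/hbase?hbase.client.operation.timeout=30000");
   * }</pre>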
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job,
   *      Class, java.net.URI)
   */
  public static final String OUTPUT_CLUSTER = "hbase.mapred.outputcluster";

  /**
   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. For
   * keys matching this prefix, the prefix is stripped, and the value is set in the configuration
   * with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1" would be set in the
   * configuration as "key1 = value1". Use this to set properties which should only be applied to
   * the {@code TableOutputFormat} configuration and not the input configuration.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. This is no longer needed; any configuration
   *             can be passed as query parameters on the connection URI given via
   *             {@link #OUTPUT_CLUSTER}.
   */
  @Deprecated
  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";

  /**
   * Optional job parameter to specify a peer cluster. Used to specify the remote cluster when
   * copying between HBase clusters; the source cluster is picked up from
   * <code>hbase-site.xml</code>.
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job,
   *      Class, String)
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #OUTPUT_CLUSTER} to specify the
   *             peer cluster instead.
   */
  @Deprecated
  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";

  /**
   * Optional job parameter to specify the peer cluster's ZK client port.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. This is no longer needed; any configuration
   *             can be passed as query parameters on the connection URI given via
   *             {@link #OUTPUT_CLUSTER}.
   */
  @Deprecated
  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";

  /**
   * Optional specification of the rs class name of the peer cluster.
   * @deprecated Since 2.5.9, 2.6.1 and 2.7.0, will be removed in 4.0.0. Has had no effect for a
   *             long time; see HBASE-6044.
   */
  @Deprecated
  public static final String REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";

  /**
   * Optional specification of the rs impl name of the peer cluster.
   * @deprecated Since 2.5.9, 2.6.1 and 2.7.0, will be removed in 4.0.0. Has had no effect for a
   *             long time; see HBASE-6044.
   */
  @Deprecated
  public static final String REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  private static Connection createConnection(Configuration conf) throws IOException {
    String outputCluster = conf.get(OUTPUT_CLUSTER);
    if (!StringUtils.isBlank(outputCluster)) {
      URI uri;
      try {
        uri = new URI(outputCluster);
      } catch (URISyntaxException e) {
        throw new IOException(
          "malformed connection uri: " + outputCluster + ", please check config " + OUTPUT_CLUSTER,
          e);
      }
      return ConnectionFactory.createConnection(uri, conf);
    } else {
      return ConnectionFactory.createConnection(conf);
    }
  }

  /**
   * Writes the reducer output to an HBase table.
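   * <p>
   * Reducers do not use this class directly; they emit mutations through the task context and the
   * framework routes them here. A sketch of what a reducer might emit (the row key, column family
   * {@code cf}, qualifier and value are illustrative):
   *
   * <pre>{@code
   * Put put = new Put(Bytes.toBytes(rowKey));
   * put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes(value));
   * context.write(key, put);
   * }</pre>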
   */
  protected class TableRecordWriter extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;

    public TableRecordWriter() throws IOException {
      this.connection = createConnection(conf);
      String tableName = conf.get(OUTPUT_TABLE);
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created table instance for {}", tableName);
    }

    /**
     * Closes the writer, in this case flushing any buffered mutations to the table.
     * @param context The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      try {
        if (mutator != null) {
          mutator.close();
        }
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

    /**
     * Writes a key/value pair into the table.
     * @param key The key.
     * @param value The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value) throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
      mutator.mutate(value);
    }
  }

  /**
   * Creates a new record writer. Be aware that the baseline javadoc gives the impression that
   * there is a single {@link RecordWriter} per job, but in HBase it is more natural to hand out a
   * new RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   * @param context The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   */
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    return new TableRecordWriter();
  }

  /**
   * Checks if the output table exists and is enabled.
   * @param context The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#checkOutputSpecs(JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    Configuration hConf = getConf();
    if (hConf == null) {
      hConf = context.getConfiguration();
    }

    try (Connection connection = createConnection(hConf); Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
      if (!admin.tableExists(tableName)) {
        throw new TableNotFoundException(
          "Can't write, table does not exist: " + tableName.getNameAsString());
      }

      if (!admin.isTableEnabled(tableName)) {
        throw new TableNotEnabledException(
          "Can't write, table is not enabled: " + tableName.getNameAsString());
      }
    }
  }

  /**
   * Returns the output committer.
   * @param context The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    String tableName = otherConf.get(OUTPUT_TABLE);
    if (StringUtils.isEmpty(tableName)) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = otherConf.get(QUORUM_ADDRESS);
    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);

    try {
      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
    } catch (IOException e) {
      LOG.error(e.toString(), e);
      throw new RuntimeException(e);
    }
  }
}