/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored while the output
 * value <u>must</u> be either a {@link Put} or a {@link Delete} instance.
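 * <p>
 * A minimal job-setup sketch (the table name {@code "my_table"} is illustrative only):
 *
 * <pre>
 * Job job = Job.getInstance(conf);
 * job.setOutputFormatClass(TableOutputFormat.class);
 * job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "my_table");
 * </pre>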
 */
@InterfaceAudience.Public
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation> implements Configurable {

  private static final Logger LOG = LoggerFactory.getLogger(TableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /** Property value to use write-ahead logging */
  public static final boolean WAL_ON = true;

  /** Property value to disable write-ahead logging */
  public static final boolean WAL_OFF = false;

  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
  public static final String WAL_PROPERTY = "hbase.mapreduce.tableoutputformat.write.wal";
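
  // Usage sketch (illustrative): a job that prefers throughput over durability can disable
  // the WAL for its writes before submission:
  //   conf.setBoolean(TableOutputFormat.WAL_PROPERTY, TableOutputFormat.WAL_OFF);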

  /**
   * Optional job parameter to specify a peer cluster. Use it to name the remote cluster when
   * copying between HBase clusters (the source is picked up from <code>hbase-site.xml</code>).
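   * <p>
   * A minimal sketch; the quorum hosts are hypothetical and the value is assumed to follow the
   * HBase connection URI syntax:
   *
   * <pre>
   * conf.set(TableOutputFormat.OUTPUT_CLUSTER, "hbase+zk://zk1:2181,zk2:2181/hbase");
   * </pre>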
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job,
   *      Class, java.net.URI)
   */
  public static final String OUTPUT_CLUSTER = "hbase.mapred.outputcluster";

  /**
   * The configuration key for specifying a custom
   * {@link org.apache.hadoop.mapreduce.OutputCommitter} implementation to be used by
   * {@link TableOutputFormat}. The value for this property should be the fully qualified class name
   * of the custom committer. If this property is not set, {@link TableOutputCommitter} will be used
   * by default.
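   * <p>
   * A minimal sketch; {@code MyCommitter} is a hypothetical subclass of
   * {@link org.apache.hadoop.mapreduce.OutputCommitter}:
   *
   * <pre>
   * conf.set(TableOutputFormat.OUTPUT_COMMITTER_CLASS, MyCommitter.class.getName());
   * </pre>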
   */
  public static final String OUTPUT_COMMITTER_CLASS =
    "hbase.mapreduce.tableoutputformat.output.committer.class";

  /**
   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}. For
   * keys matching this prefix, the prefix is stripped and the value is set in the configuration
   * with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1" would be set in the
   * configuration as "key1 = value1". Use this to set properties which should only be applied to
   * the {@code TableOutputFormat} configuration and not the input configuration.
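   * <p>
   * A minimal sketch ({@code hbase.client.retries.number} stands in for any client property):
   *
   * <pre>
   * // applied to the output configuration as "hbase.client.retries.number = 5"
   * conf.set(TableOutputFormat.OUTPUT_CONF_PREFIX + "hbase.client.retries.number", "5");
   * </pre>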
   * @deprecated Since 3.0.0, will be removed in 4.0.0. You no longer need to specify
   *             configurations this way; any configuration can be passed via the query string of
   *             the connection URI given in the {@link #OUTPUT_CLUSTER} parameter.
   */
  @Deprecated
  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";

  /**
   * Optional job parameter to specify a peer cluster. Use it to name the remote cluster when
   * copying between HBase clusters (the source is picked up from <code>hbase-site.xml</code>).
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job,
   *      Class, String)
   * @deprecated Since 3.0.0, will be removed in 4.0.0. Use {@link #OUTPUT_CLUSTER} to specify the
   *             peer cluster instead.
   */
  @Deprecated
  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";

  /**
   * Optional job parameter to specify the peer cluster's ZooKeeper client port.
   * @deprecated Since 3.0.0, will be removed in 4.0.0. You no longer need to specify
   *             configurations this way; any configuration can be passed via the query string of
   *             the connection URI given in the {@link #OUTPUT_CLUSTER} parameter.
   */
  @Deprecated
  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";

  /**
   * Optional specification of the rs class name of the peer cluster.
   * @deprecated Since 2.5.9, 2.6.1 and 2.7.0, will be removed in 4.0.0. Has had no effect for a
   *             long time; see HBASE-6044.
   */
  @Deprecated
  public static final String REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";

  /**
   * Optional specification of the rs impl name of the peer cluster.
   * @deprecated Since 2.5.9, 2.6.1 and 2.7.0, will be removed in 4.0.0. Has had no effect for a
   *             long time; see HBASE-6044.
   */
  @Deprecated
  public static final String REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  private static Connection createConnection(Configuration conf) throws IOException {
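    // Prefer an explicitly configured peer cluster URI; otherwise connect using the job's own
    // configuration (typically derived from hbase-site.xml).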
    String outputCluster = conf.get(OUTPUT_CLUSTER);
    if (!StringUtils.isBlank(outputCluster)) {
      URI uri;
      try {
        uri = new URI(outputCluster);
      } catch (URISyntaxException e) {
        throw new IOException(
          "malformed connection URI: " + outputCluster + ", please check config " + OUTPUT_CLUSTER,
          e);
      }
      return ConnectionFactory.createConnection(uri, conf);
    } else {
      return ConnectionFactory.createConnection(conf);
    }
  }

  /**
   * Writes the reducer output to an HBase table.
   */
  protected class TableRecordWriter extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;
    boolean useWriteAheadLogging;

    public TableRecordWriter() throws IOException {
      this.connection = createConnection(conf);
      String tableName = conf.get(OUTPUT_TABLE);
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created BufferedMutator for table {}", tableName);
      this.useWriteAheadLogging = conf.getBoolean(WAL_PROPERTY, WAL_ON);
    }

    /**
     * Closes the writer, flushing any buffered mutations to the table.
     * @param context The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      try {
        if (mutator != null) {
          mutator.close();
        }
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

    /**
     * Writes a key/value pair into the table.
     * @param key   The key.
     * @param value The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value) throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Expected a Put or a Delete, got "
          + (value == null ? "null" : value.getClass().getName()));
      }
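      // Honor the job-level WAL setting: writes skip the write-ahead log when WAL_PROPERTY was
      // set to WAL_OFF.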
      if (!useWriteAheadLogging) {
        value.setDurability(Durability.SKIP_WAL);
      }
      mutator.mutate(value);
    }
  }

  /**
   * Creates a new record writer. Note that the base class javadoc gives the impression that there
   * is a single {@link RecordWriter} per job; in HBase it is more natural to return a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   * @param context The current task context.
   * @return The newly created writer instance.
   * @throws IOException          When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   */
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    return new TableRecordWriter();
  }

  /**
   * Checks if the output table exists and is enabled.
   * @param context The current context.
   * @throws IOException          When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#checkOutputSpecs(JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    Configuration hConf = getConf();
    if (hConf == null) {
      hConf = context.getConfiguration();
    }

    try (Connection connection = createConnection(hConf); Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
      if (!admin.tableExists(tableName)) {
        throw new TableNotFoundException(
          "Can't write, table does not exist: " + tableName.getNameAsString());
      }

      if (!admin.isTableEnabled(tableName)) {
        throw new TableNotEnabledException(
          "Can't write, table is not enabled: " + tableName.getNameAsString());
      }
    }
  }

  /**
   * Returns the output committer.
   * @param context The current context.
   * @return The committer.
   * @throws IOException          When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException, InterruptedException {
    Configuration hConf = getConf();
    if (hConf == null) {
      hConf = context.getConfiguration();
    }

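    // Instantiate the configured committer reflectively, defaulting to the no-op
    // TableOutputCommitter when OUTPUT_COMMITTER_CLASS is not set.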
    try {
      Class<? extends OutputCommitter> outputCommitter =
        hConf.getClass(OUTPUT_COMMITTER_CLASS, TableOutputCommitter.class, OutputCommitter.class);
      return ReflectionUtils.newInstance(outputCommitter);
    } catch (Exception e) {
      throw new IOException("Could not create the configured OutputCommitter", e);
    }
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    String tableName = otherConf.get(OUTPUT_TABLE);
    if (StringUtils.isEmpty(tableName)) {
      throw new IllegalArgumentException("Must specify output table name via " + OUTPUT_TABLE);
    }

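    // Legacy peer-cluster settings: the deprecated QUORUM_ADDRESS/QUORUM_PORT values and any
    // OUTPUT_CONF_PREFIX overrides are still honored here; prefer OUTPUT_CLUSTER for new jobs.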
    String address = otherConf.get(QUORUM_ADDRESS);
    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);

    try {
      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
    } catch (IOException e) {
      LOG.error(e.toString(), e);
      throw new RuntimeException(e);
    }
  }
}