/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
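 * <p>
 * A minimal driver sketch; the job name, table name, and key class shown are illustrative
 * assumptions rather than requirements of this class (in practice {@link TableMapReduceUtil}
 * wires this up via its {@code initTableReducerJob} methods):
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "example-write-to-hbase");
 * job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "example_table");
 * job.setOutputFormatClass(TableOutputFormat.class);
 * // The emitted key is ignored; the emitted value must be a Put or a Delete.
 * job.setOutputKeyClass(ImmutableBytesWritable.class);
 * job.setOutputValueClass(Mutation.class);
 * }</pre>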
 */
@InterfaceAudience.Public
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
implements Configurable {

  private static final Logger LOG = LoggerFactory.getLogger(TableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
   * For keys matching this prefix, the prefix is stripped, and the value is set in the
   * configuration with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1"
   * would be set in the configuration as "key1 = value1".  Use this to set properties
   * that should only be applied to the {@code TableOutputFormat} configuration and not the
   * input configuration.
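   * <p>
   * For example (the value shown here is illustrative only), setting
   * <pre>{@code
   * conf.set("hbase.mapred.output.hbase.zookeeper.quorum", "peer-zk-host");
   * }</pre>
   * applies {@code hbase.zookeeper.quorum=peer-zk-host} to the configuration used by this
   * output format while leaving the job's input configuration untouched.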
   */
  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";

  /**
   * Optional job parameter to specify a peer cluster.
   * Used to specify the remote cluster when copying between HBase clusters (the
   * source cluster is picked up from <code>hbase-site.xml</code>).
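   * <p>
   * A sketch of pointing output at a peer cluster; the hosts and znode parent shown are
   * illustrative assumptions, and the value follows the usual cluster key form
   * {@code <quorum hosts>:<client port>:<znode parent>}:
   * <pre>{@code
   * conf.set(TableOutputFormat.QUORUM_ADDRESS, "zk1,zk2,zk3:2181:/hbase");
   * }</pre>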
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
   */
  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";

  /** Optional job parameter to specify peer cluster's ZK client port */
  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";

  /** Optional specification of the rs class name of the peer cluster */
  public static final String
      REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
  /** Optional specification of the rs impl name of the peer cluster */
  public static final String
      REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Writes the reducer output to an HBase table.
   */
  protected class TableRecordWriter
  extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;

    /**
     * Opens a connection to the output table and a {@link BufferedMutator} used to buffer
     * the writes.
     * @throws IOException When the connection or the mutator cannot be created.
     */
    public TableRecordWriter() throws IOException {
      String tableName = conf.get(OUTPUT_TABLE);
      this.connection = ConnectionFactory.createConnection(conf);
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created table instance for " + tableName);
    }

    /**
     * Closes the writer, flushing any buffered mutations to the table.
     *
     * @param context  The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      try {
        if (mutator != null) {
          mutator.close();
        }
      } finally {
        if (connection != null) {
          connection.close();
        }
      }
    }

    /**
     * Writes a key/value pair into the table.
     *
     * @param key  The key.
     * @param value  The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value)
    throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
      mutator.mutate(value);
    }
  }

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job, but this implementation returns a new RecordWriter per
   * call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes; a usage sketch follows.
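   * <p>
   * A sketch of direct use outside the MapReduce framework; {@code outputFormat},
   * {@code context}, and the row key are assumptions supplied by the caller:
   * <pre>{@code
   * RecordWriter<ImmutableBytesWritable, Mutation> writer = outputFormat.getRecordWriter(context);
   * try {
   *   writer.write(null, new Put(Bytes.toBytes("example-row")));
   * } finally {
   *   writer.close(context);
   * }
   * }</pre>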
   *
   * @param context  The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   */
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    return new TableRecordWriter();
  }

  /**
   * Checks if the output table exists and is enabled.
   *
   * @param context  The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#checkOutputSpecs(JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException,
      InterruptedException {
    Configuration hConf = getConf();
    if (hConf == null) {
      hConf = context.getConfiguration();
    }

    try (Connection connection = ConnectionFactory.createConnection(hConf);
        Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
      if (!admin.tableExists(tableName)) {
        throw new TableNotFoundException("Can't write, table does not exist: " +
            tableName.getNameAsString());
      }

      if (!admin.isTableEnabled(tableName)) {
        throw new TableNotEnabledException("Can't write, table is not enabled: " +
            tableName.getNameAsString());
      }
    }
  }

  /**
   * Returns the output committer.
   *
   * @param context  The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    String tableName = otherConf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = otherConf.get(QUORUM_ADDRESS);
    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
    String serverImpl = otherConf.get(REGION_SERVER_IMPL);

    try {
      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);

      // Apply the region server implementation override for the peer cluster, if specified.
      if (serverImpl != null) {
        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
      }
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
    } catch (IOException e) {
      LOG.error(e.toString(), e);
      throw new RuntimeException(e);
    }
  }
}