/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Convert Map/Reduce output and write it to an HBase table.
 */
@InterfaceAudience.Public
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
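
  /*
   * A minimal usage sketch, for illustration only: a driver might wire this output format into an
   * old-API (org.apache.hadoop.mapred) job roughly as follows. The table name "mytable", the
   * driver class MyDriver and the reducer class MyTableReducer are placeholders, not part of this
   * class.
   *
   *   JobConf job = new JobConf(HBaseConfiguration.create(), MyDriver.class);
   *   job.set(TableOutputFormat.OUTPUT_TABLE, "mytable");
   *   job.setOutputFormat(TableOutputFormat.class);
   *   job.setOutputKeyClass(ImmutableBytesWritable.class);
   *   job.setOutputValueClass(Put.class);
   *   job.setReducerClass(MyTableReducer.class);
   */
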
  /**
   * Writes the reduce output (row key, {@link Put}) to an HBase table through a
   * {@link BufferedMutator}.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;
    private Connection conn;

    /**
     * Instantiate a TableRecordWriter with an externally managed {@link BufferedMutator}.
     *
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Please use
     *   {@code #TableRecordWriter(JobConf)} instead. This version does not own the underlying
     *   {@link Connection}, cannot clean it up, and therefore leaks connections.
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-16774">HBASE-16774</a>
     */
    @Deprecated
    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
      this.m_mutator = mutator;
      this.conn = null;
    }

    /**
     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
     */
    public TableRecordWriter(JobConf job) throws IOException {
      // The output table name comes from the job configuration; the connection created here is
      // owned by this writer and released in close().
      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
      try {
        this.conn = ConnectionFactory.createConnection(job);
        this.m_mutator = conn.getBufferedMutator(tableName);
      } finally {
        // If the mutator could not be created, release the connection (if any) so it does not leak.
        if (this.m_mutator == null && this.conn != null) {
          conn.close();
          conn = null;
        }
      }
    }

    public void close(Reporter reporter) throws IOException {
      try {
        // Closing the mutator flushes any buffered mutations to the table.
        if (this.m_mutator != null) {
          this.m_mutator.close();
        }
      } finally {
        if (conn != null) {
          this.conn.close();
        }
      }
    }

    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      // Copy the Put before buffering it; the framework may reuse the value instance between
      // calls, so the mutator must not keep a reference to the caller's object.
      m_mutator.mutate(new Put(value));
    }
  }
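
  /*
   * Illustrative sketch, not part of this class: a reducer feeding this output format emits
   * (ImmutableBytesWritable, Put) pairs, for example one Put per key. The input types, the column
   * family "cf" and the qualifier "count" are placeholders.
   *
   *   public class MyTableReducer extends MapReduceBase
   *       implements Reducer<Text, IntWritable, ImmutableBytesWritable, Put> {
   *     public void reduce(Text key, Iterator<IntWritable> values,
   *         OutputCollector<ImmutableBytesWritable, Put> output, Reporter reporter)
   *         throws IOException {
   *       int sum = 0;
   *       while (values.hasNext()) {
   *         sum += values.next().get();
   *       }
   *       byte[] row = Bytes.toBytes(key.toString());
   *       Put put = new Put(row);
   *       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
   *       output.collect(new ImmutableBytesWritable(row), put);
   *     }
   *   }
   */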

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   *
   * @param ignored Ignored filesystem
   * @param job Current JobConf
   * @param name Name of the output (unused by this format)
   * @param progress Progressable for reporting progress
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   */
  @Override
  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
      Progressable progress)
  throws IOException {
    // Clear write buffer on fail is true by default so no need to reset it.
    return new TableRecordWriter(job);
  }
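
  /*
   * Illustrative sketch: when the writer is driven by hand rather than by the MapReduce framework,
   * it must be closed so that buffered mutations are flushed; otherwise writes are dropped. The
   * variables job, rowBytes and put are placeholders.
   *
   *   RecordWriter writer =
   *       new TableOutputFormat().getRecordWriter(null, job, "part-00000", Reporter.NULL);
   *   writer.write(new ImmutableBytesWritable(rowBytes), put);
   *   writer.close(Reporter.NULL);
   */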

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    // Only verifies that an output table name has been configured; the table itself is not
    // validated here.
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}