001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import java.io.IOException;
021import java.util.HashMap;
022import java.util.Map;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HBaseConfiguration;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.BufferedMutator;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.client.Delete;
030import org.apache.hadoop.hbase.client.Durability;
031import org.apache.hadoop.hbase.client.Mutation;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.mapreduce.JobContext;
036import org.apache.hadoop.mapreduce.OutputCommitter;
037import org.apache.hadoop.mapreduce.OutputFormat;
038import org.apache.hadoop.mapreduce.RecordWriter;
039import org.apache.hadoop.mapreduce.TaskAttemptContext;
040import org.apache.yetus.audience.InterfaceAudience;
041import org.slf4j.Logger;
042import org.slf4j.LoggerFactory;
043
044/**
045 * <p>
046 * Hadoop output format that writes to one or more HBase tables. The key is taken to be the table
047 * name while the output value <em>must</em> be either a {@link Put} or a {@link Delete} instance.
048 * All tables must already exist, and all Puts and Deletes must reference only valid column
049 * families.
050 * </p>
051 * <p>
052 * Write-ahead logging (WAL) for Puts can be disabled by setting {@link #WAL_PROPERTY} to
053 * {@link #WAL_OFF}. Default value is {@link #WAL_ON}. Note that disabling write-ahead logging is
054 * only appropriate for jobs where loss of data due to region server failure can be tolerated (for
055 * example, because it is easy to rerun a bulk import).
056 * </p>
057 */
058@InterfaceAudience.Public
059public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Mutation> {
060  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (WAL) */
061  public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
062  /** Property value to use write-ahead logging */
063  public static final boolean WAL_ON = true;
064  /** Property value to disable write-ahead logging */
065  public static final boolean WAL_OFF = false;
066
067  /**
068   * Record writer for outputting to multiple HTables.
069   */
070  protected static class MultiTableRecordWriter
071    extends RecordWriter<ImmutableBytesWritable, Mutation> {
072    private static final Logger LOG = LoggerFactory.getLogger(MultiTableRecordWriter.class);
073    Connection connection;
074    Map<ImmutableBytesWritable, BufferedMutator> mutatorMap = new HashMap<>();
075    Configuration conf;
076    boolean useWriteAheadLogging;
077
078    /**
079     * n * HBaseConfiguration to used n * whether to use write ahead logging. This can be turned off
080     * ( <tt>false</tt>) to improve performance when bulk loading data.
081     */
082    public MultiTableRecordWriter(Configuration conf, boolean useWriteAheadLogging)
083      throws IOException {
084      LOG.debug(
085        "Created new MultiTableRecordReader with WAL " + (useWriteAheadLogging ? "on" : "off"));
086      this.conf = conf;
087      this.useWriteAheadLogging = useWriteAheadLogging;
088    }
089
090    /**
091     * n * the name of the table, as a string
092     * @return the named mutator n * if there is a problem opening a table
093     */
094    BufferedMutator getBufferedMutator(ImmutableBytesWritable tableName) throws IOException {
095      if (this.connection == null) {
096        this.connection = ConnectionFactory.createConnection(conf);
097      }
098      if (!mutatorMap.containsKey(tableName)) {
099        LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
100
101        BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf(tableName.get()));
102        mutatorMap.put(tableName, mutator);
103      }
104      return mutatorMap.get(tableName);
105    }
106
107    @Override
108    public void close(TaskAttemptContext context) throws IOException {
109      for (BufferedMutator mutator : mutatorMap.values()) {
110        mutator.close();
111      }
112      if (connection != null) {
113        connection.close();
114      }
115    }
116
117    /**
118     * Writes an action (Put or Delete) to the specified table. n * the table being updated. n * the
119     * update, either a put or a delete. n * if the action is not a put or a delete.
120     */
121    @Override
122    public void write(ImmutableBytesWritable tableName, Mutation action) throws IOException {
123      BufferedMutator mutator = getBufferedMutator(tableName);
124      // The actions are not immutable, so we defensively copy them
125      if (action instanceof Put) {
126        Put put = new Put((Put) action);
127        put.setDurability(useWriteAheadLogging ? Durability.SYNC_WAL : Durability.SKIP_WAL);
128        mutator.mutate(put);
129      } else if (action instanceof Delete) {
130        Delete delete = new Delete((Delete) action);
131        mutator.mutate(delete);
132      } else throw new IllegalArgumentException("action must be either Delete or Put");
133    }
134  }
135
136  @Override
137  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
138    // we can't know ahead of time if it's going to blow up when the user
139    // passes a table name that doesn't exist, so nothing useful here.
140  }
141
142  @Override
143  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
144    throws IOException, InterruptedException {
145    return new TableOutputCommitter();
146  }
147
148  @Override
149  public RecordWriter<ImmutableBytesWritable, Mutation> getRecordWriter(TaskAttemptContext context)
150    throws IOException, InterruptedException {
151    Configuration conf = context.getConfiguration();
152    return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
153      conf.getBoolean(WAL_PROPERTY, WAL_ON));
154  }
155
156}