/**
 * Copyright 2009 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * <p>
 * Hadoop output format that writes to one or more HBase tables. The key is
 * taken to be the table name while the output value <em>must</em> be either a
 * {@link Put} or a {@link Delete} instance. All tables must already exist, and
 * all Puts and Deletes must reference only valid column families.
 * </p>
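 *
 * <p>
 * A minimal mapper-side sketch; the table name, row key, column family, and
 * values below are illustrative only:
 * </p>
 *
 * <pre>
 * // Emit a Put destined for table "tableA"; any existing table works.
 * ImmutableBytesWritable tableA =
 *     new ImmutableBytesWritable(Bytes.toBytes("tableA"));
 * Put put = new Put(Bytes.toBytes("rowKey"));
 * put.add(Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
 *     Bytes.toBytes("value"));
 * context.write(tableA, put);
 * </pre>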
 *
 * <p>
 * Write-ahead logging (HLog) for Puts can be disabled by setting
 * {@link #WAL_PROPERTY} to {@link #WAL_OFF}. Default value is {@link #WAL_ON}.
 * Note that disabling write-ahead logging is only appropriate for jobs where
 * loss of data due to region server failure can be tolerated (for example,
 * because it is easy to rerun a bulk import).
 * </p>
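 *
 * <p>
 * A minimal job-setup sketch, assuming {@code job} is an already-created
 * {@code org.apache.hadoop.mapreduce.Job} instance:
 * </p>
 *
 * <pre>
 * job.setOutputFormatClass(MultiTableOutputFormat.class);
 * job.setOutputKeyClass(ImmutableBytesWritable.class);
 * job.setOutputValueClass(Writable.class);
 * // Optionally trade durability for speed on re-runnable bulk imports.
 * job.getConfiguration().setBoolean(MultiTableOutputFormat.WAL_PROPERTY,
 *     MultiTableOutputFormat.WAL_OFF);
 * </pre>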
 */
public class MultiTableOutputFormat extends OutputFormat<ImmutableBytesWritable, Writable> {
  /** Set this to {@link #WAL_OFF} to turn off write-ahead logging (HLog) */
  public static final String WAL_PROPERTY = "hbase.mapreduce.multitableoutputformat.wal";
  /** Property value to use write-ahead logging */
  public static final boolean WAL_ON = true;
  /** Property value to disable write-ahead logging */
  public static final boolean WAL_OFF = false;
  /**
   * Record writer for outputting to multiple HTables.
   */
  protected static class MultiTableRecordWriter extends
      RecordWriter<ImmutableBytesWritable, Writable> {
    private static final Log LOG = LogFactory.getLog(MultiTableRecordWriter.class);
    Map<ImmutableBytesWritable, HTable> tables;
    Configuration conf;
    boolean useWriteAheadLogging;

    /**
     * @param conf
     *          the HBaseConfiguration to use
     * @param useWriteAheadLogging
     *          whether to use write-ahead logging. This can be turned off
     *          (<tt>false</tt>) to improve performance when bulk loading data.
     */
    public MultiTableRecordWriter(Configuration conf,
        boolean useWriteAheadLogging) {
      LOG.debug("Created new MultiTableRecordWriter with WAL "
          + (useWriteAheadLogging ? "on" : "off"));
      this.tables = new HashMap<ImmutableBytesWritable, HTable>();
      this.conf = conf;
      this.useWriteAheadLogging = useWriteAheadLogging;
    }

    /**
     * @param tableName
     *          the name of the table, as bytes
     * @return the named table
     * @throws IOException
     *           if there is a problem opening a table
     */
    HTable getTable(ImmutableBytesWritable tableName) throws IOException {
      if (!tables.containsKey(tableName)) {
        LOG.debug("Opening HTable \"" + Bytes.toString(tableName.get()) + "\" for writing");
        HTable table = new HTable(conf, tableName.get());
        // Buffer mutations client-side; they are flushed in close().
        table.setAutoFlush(false);
        tables.put(tableName, table);
      }
      return tables.get(tableName);
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
      // Flush any buffered mutations for every table opened by this writer.
      for (HTable table : tables.values()) {
        table.flushCommits();
      }
    }

    /**
     * Writes an action (Put or Delete) to the specified table.
     *
     * @param tableName
     *          the table being updated.
     * @param action
     *          the update, either a Put or a Delete.
     * @throws IOException
     *          if the action cannot be written to the table.
     * @throws IllegalArgumentException
     *          if the action is not a Put or a Delete.
     */
    @Override
    public void write(ImmutableBytesWritable tableName, Writable action) throws IOException {
      HTable table = getTable(tableName);
      // The actions are not immutable, so we defensively copy them
      if (action instanceof Put) {
        Put put = new Put((Put) action);
        put.setWriteToWAL(useWriteAheadLogging);
        table.put(put);
      } else if (action instanceof Delete) {
        Delete delete = new Delete((Delete) action);
        table.delete(delete);
      } else {
        throw new IllegalArgumentException(
            "action must be either Delete or Put");
      }
    }
  }

  @Override
  public void checkOutputSpecs(JobContext context) throws IOException,
      InterruptedException {
    // Table names are only known at write time, so we cannot verify here
    // that they exist; a nonexistent table surfaces as an IOException from
    // the record writer instead.
  }

  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public RecordWriter<ImmutableBytesWritable, Writable> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    return new MultiTableRecordWriter(HBaseConfiguration.create(conf),
        conf.getBoolean(WAL_PROPERTY, WAL_ON));
  }

}