View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.mapred;
20  
21  import java.io.IOException;
22  
23  import org.apache.commons.logging.Log;
24  import org.apache.commons.logging.LogFactory;
25  import org.apache.hadoop.hbase.classification.InterfaceAudience;
26  import org.apache.hadoop.hbase.classification.InterfaceStability;
27  import org.apache.hadoop.fs.FileSystem;
28  import org.apache.hadoop.hbase.HBaseConfiguration;
29  import org.apache.hadoop.hbase.TableName;
30  import org.apache.hadoop.hbase.client.Connection;
31  import org.apache.hadoop.hbase.client.ConnectionFactory;
32  import org.apache.hadoop.hbase.client.HTable;
33  import org.apache.hadoop.hbase.client.Put;
34  import org.apache.hadoop.hbase.client.Table;
35  import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
36  import org.apache.hadoop.fs.FileAlreadyExistsException;
37  import org.apache.hadoop.mapred.InvalidJobConfException;
38  import org.apache.hadoop.mapred.JobConf;
39  import org.apache.hadoop.mapred.FileOutputFormat;
40  import org.apache.hadoop.mapred.RecordWriter;
41  import org.apache.hadoop.mapred.Reporter;
42  import org.apache.hadoop.util.Progressable;
43  
44  /**
45   * Convert Map/Reduce output and write it to an HBase table
46   */
47  @InterfaceAudience.Public
48  @InterfaceStability.Stable
49  public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
50  
51    /** JobConf parameter that specifies the output table */
52    public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
53    private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);
54  
55    /**
56     * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
57     * and write to an HBase table.
58     */
59    protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
60      private final Connection conn;
61      private final Table table;
62  
63      /**
64       * Instantiate a TableRecordWriter with the HBase HClient for writing. Assumes control over the
65       * lifecycle of {@code conn}.
66       */
67      public TableRecordWriter(Connection conn, TableName tableName) throws IOException {
68        this.conn = conn;
69        this.table = conn.getTable(tableName);
70        ((HTable) this.table).setAutoFlush(false, true);
71      }
72  
73      public void close(Reporter reporter) throws IOException {
74        table.close();
75        conn.close();
76      }
77  
78      public void write(ImmutableBytesWritable key, Put value) throws IOException {
79        table.put(new Put(value));
80      }
81    }
82  
83    @Override
84    public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(FileSystem ignored, JobConf job,
85        String name, Progressable progress) throws IOException {
86      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
87      Connection conn = null;
88      try {
89        conn = ConnectionFactory.createConnection(HBaseConfiguration.create(job));
90      } catch(IOException e) {
91        LOG.error(e);
92        throw e;
93      }
94      return new TableRecordWriter(conn, tableName);
95    }
96  
97    @Override
98    public void checkOutputSpecs(FileSystem ignored, JobConf job)
99        throws FileAlreadyExistsException, InvalidJobConfException, IOException {
100     String tableName = job.get(OUTPUT_TABLE);
101     if(tableName == null) {
102       throw new IOException("Must specify table name");
103     }
104   }
105 }