
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * Convert Map/Reduce output and write it to an HBase table.
 */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableOutputFormat
    extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
  private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);

  /**
   * Writes the Reduce output (ImmutableBytesWritable key, Put value)
   * to an HBase table.
   */
  protected static class TableRecordWriter
    implements RecordWriter<ImmutableBytesWritable, Put> {
    private HTable m_table;

    /**
     * Instantiate a TableRecordWriter with an open HTable client for writing.
     *
     * @param table the HTable to write puts to
     */
    public TableRecordWriter(HTable table) {
      m_table = table;
    }

    public void close(Reporter reporter) throws IOException {
      // Closing the table flushes any buffered puts before releasing its resources.
      m_table.close();
    }

    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      // Copy the Put before buffering it, so later changes to the caller's instance
      // do not affect the pending write.
      m_table.put(new Put(value));
    }
  }

  @Override
  @SuppressWarnings("unchecked")
  public RecordWriter getRecordWriter(FileSystem ignored,
      JobConf job, String name, Progressable progress) throws IOException {

    // The destination table comes from the job configuration; no output path is used.
    String tableName = job.get(OUTPUT_TABLE);
    HTable table = null;
    try {
      table = new HTable(HBaseConfiguration.create(job), tableName);
    } catch (IOException e) {
      LOG.error(e);
      throw e;
    }
    // Buffer puts client-side (auto-flush off) and clear the write buffer if a flush fails.
    table.setAutoFlush(false, true);
    return new TableRecordWriter(table);
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws FileAlreadyExistsException, InvalidJobConfException, IOException {

    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}
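
A minimal driver sketch showing how this output format is typically wired into an old-API (org.apache.hadoop.mapred) job. The driver class name, job name, and the table name "mytable" are illustrative placeholders; only TableOutputFormat.OUTPUT_TABLE and the standard Hadoop/HBase calls are taken from the code above, and the destination table is assumed to already exist. The companion TableMapReduceUtil class in the same package offers similar wiring for reduce jobs.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

/** Hypothetical driver: writes (ImmutableBytesWritable, Put) pairs into an existing HBase table. */
public class ExampleTableWriteDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(HBaseConfiguration.create(), ExampleTableWriteDriver.class);
    job.setJobName("write-to-hbase");

    // Name of the existing destination table, read back in getRecordWriter() above.
    job.set(TableOutputFormat.OUTPUT_TABLE, "mytable");

    // Emit (ImmutableBytesWritable, Put) pairs; TableOutputFormat buffers them as table puts.
    job.setOutputFormat(TableOutputFormat.class);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);

    // A complete job would also configure an input format and mapper/reducer classes here.
    JobClient.runJob(job);
  }
}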