/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * Convert Map/Reduce output and write it to an HBase table.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
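
  // A minimal driver-side sketch of how this output format is typically configured
  // (illustrative only; the table name "mytable" is an assumption, not something
  // defined in this file):
  //
  //   JobConf job = new JobConf(HBaseConfiguration.create());
  //   job.setOutputFormat(TableOutputFormat.class);
  //   job.set(TableOutputFormat.OUTPUT_TABLE, "mytable");
  //   job.setOutputKeyClass(ImmutableBytesWritable.class);
  //   job.setOutputValueClass(Put.class);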

  /**
   * Writes the Reduce output (key, value) pairs, where the value is a {@link Put},
   * to an HBase table.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;
    private Connection conn;

    /**
     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
     */
    public TableRecordWriter(JobConf job) throws IOException {
      // The output table is named by the OUTPUT_TABLE job property; no filesystem path is involved.
      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
      try {
        this.conn = ConnectionFactory.createConnection(job);
        this.m_mutator = conn.getBufferedMutator(tableName);
      } finally {
        // If the mutator could not be created, release the connection so it does not leak.
        if (this.m_mutator == null && this.conn != null) {
          this.conn.close();
          this.conn = null;
        }
      }
    }

    /**
     * Flushes any buffered mutations and releases the underlying connection.
     */
    public void close(Reporter reporter) throws IOException {
      if (this.m_mutator != null) {
        this.m_mutator.close();
      }
      if (this.conn != null) {
        this.conn.close();
      }
    }

    /**
     * Buffers the given Put for writing; a copy is taken so the caller may reuse
     * the passed-in instance.
     */
    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      m_mutator.mutate(new Put(value));
    }
  }

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job, but in HBase it is more natural to hand out a new
   * RecordWriter per call of this method. You must close the returned RecordWriter
   * when done. Failure to do so will drop writes.
   *
   * @param ignored Ignored filesystem
   * @param job Current JobConf
   * @param name Name for this part of the output (unused)
   * @param progress Progressable used to report progress (unused)
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   */
  @Override
  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
      Progressable progress)
  throws IOException {
    // Clear write buffer on fail is true by default so no need to reset it.
    return new TableRecordWriter(job);
  }
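
  // Illustrative only: the MapReduce framework normally closes the writer itself;
  // if a writer is obtained directly, it must be closed or buffered mutations are
  // dropped. "job", "rowKey" and "put" are assumed to exist in the caller's scope.
  //
  //   RecordWriter writer = new TableOutputFormat()
  //       .getRecordWriter(null, job, "part-00000", Reporter.NULL);
  //   try {
  //     writer.write(new ImmutableBytesWritable(rowKey), put);
  //   } finally {
  //     writer.close(Reporter.NULL);
  //   }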

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
  throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}