/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
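 * <p>
 * A minimal usage sketch follows; the reducer class, table name, and column names are
 * illustrative, not part of this API. It assumes {@code org.apache.hadoop.hbase.util.Bytes}
 * and the standard MapReduce imports.
 * <pre>{@code
 * public static class SumReducer
 *     extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
 *   protected void reduce(Text key, Iterable<IntWritable> values, Context context)
 *       throws IOException, InterruptedException {
 *     int sum = 0;
 *     for (IntWritable v : values) {
 *       sum += v.get();
 *     }
 *     Put put = new Put(Bytes.toBytes(key.toString()));
 *     put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("count"), Bytes.toBytes(sum));
 *     context.write(null, put); // the key is ignored by TableOutputFormat
 *   }
 * }
 *
 * // Job setup; TableMapReduceUtil#initTableReducerJob does this and more for you:
 * Job job = Job.getInstance(conf, "write-to-hbase");
 * job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "my_table");
 * job.setOutputFormatClass(TableOutputFormat.class);
 * }</pre>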
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
implements Configurable {

  private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
   * For keys matching this prefix, the prefix is stripped, and the value is set in the
   * configuration with the resulting key, e.g. the entry "hbase.mapred.output.key1 = value1"
   * would be set in the configuration as "key1 = value1". Use this to set properties
   * which should only be applied to the {@code TableOutputFormat} configuration and not the
   * input configuration.
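   * <p>
   * For example, a sketch (the write-buffer key below is a standard HBase client property,
   * shown here only as an illustration):
   * <pre>{@code
   * // Raises the write buffer only for the output connection built in setConf():
   * conf.set("hbase.mapred.output.hbase.client.write.buffer", "4194304");
   * }</pre>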
   */
  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";

  /**
   * Optional job parameter to specify a peer cluster.
   * Used to specify a remote cluster when copying between HBase clusters (the
   * source cluster is picked up from <code>hbase-site.xml</code>).
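   * <p>
   * The value is a ZooKeeper cluster key of the form
   * {@code <hbase.zookeeper.quorum>:<hbase.zookeeper.property.clientPort>:<zookeeper.znode.parent>}.
   * A sketch, with illustrative hosts and reusing the {@code SumReducer} from the class
   * javadoc above:
   * <pre>{@code
   * TableMapReduceUtil.initTableReducerJob("peerTable", SumReducer.class, job, null,
   *     "zk1,zk2,zk3:2181:/hbase", null, null);
   * }</pre>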
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
   */
  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";

  /** Optional job parameter to specify the peer cluster's ZK client port. */
  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";

  /** Optional specification of the rs class name of the peer cluster. */
  public static final String
      REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
  /** Optional specification of the rs impl name of the peer cluster. */
  public static final String
      REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Writes the reducer output to an HBase table.
   */
  protected class TableRecordWriter
  extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;

    /**
     * Opens a connection and a {@link BufferedMutator} for the configured output table.
     *
     * @throws IOException When creating the connection or the mutator fails.
     */
    public TableRecordWriter() throws IOException {
      String tableName = conf.get(OUTPUT_TABLE);
      this.connection = ConnectionFactory.createConnection(conf);
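      // The BufferedMutator buffers mutations client-side and sends them to the
      // region servers in batches; anything still buffered is flushed on close().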
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created table instance for " + tableName);
    }

    /**
     * Closes the writer, in this case flushing any buffered mutations to the table.
     *
     * @param context  The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context)
    throws IOException {
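      // Close the mutator first so any buffered mutations are flushed before the
      // underlying connection is torn down.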
      mutator.close();
      connection.close();
    }

    /**
     * Writes a key/value pair into the table.
     *
     * @param key  The key.
     * @param value  The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value)
    throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
      mutator.mutate(value);
    }
  }

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job, but in HBase a new RecordWriter is returned per call
   * of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   *
   * @param context  The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   */
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    return new TableRecordWriter();
  }

  /**
   * Checks if the output target exists.
   *
   * @param context  The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#checkOutputSpecs(JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException,
      InterruptedException {
    // TODO Check if the table exists?
  }

  /**
   * Returns the output committer.
   *
   * @param context  The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
  throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    String tableName = otherConf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = otherConf.get(QUORUM_ADDRESS);
    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
    String serverClass = otherConf.get(REGION_SERVER_CLASS);
    String serverImpl = otherConf.get(REGION_SERVER_IMPL);

    try {
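      // createClusterConf copies otherConf, applies the peer cluster key (if any), and
      // then applies "hbase.mapred.output."-prefixed overrides with the prefix stripped.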
      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);

      if (serverClass != null && serverImpl != null) {
        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
      }
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
    } catch (IOException e) {
      LOG.error(e);
      throw new RuntimeException(e);
    }
  }
}