/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
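 * <p>
 * A minimal reducer-side job setup might look like the following sketch, where the
 * job name, table name, and reducer class are illustrative placeholders;
 * {@link TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job)}
 * performs the same wiring for you:
 * <pre>
 * Job job = Job.getInstance(HBaseConfiguration.create(), "write-to-hbase");
 * job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "my_table");
 * job.setOutputFormatClass(TableOutputFormat.class);
 * job.setReducerClass(MyTableReducer.class); // emits Put or Delete values
 * </pre>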
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TableOutputFormat<KEY> extends OutputFormat<KEY, Mutation>
    implements Configurable {

  private static final Log LOG = LogFactory.getLog(TableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Prefix for configuration property overrides to apply in {@link #setConf(Configuration)}.
   * For keys matching this prefix, the prefix is stripped, and the value is set in the
   * configuration with the resulting key, i.e. the entry "hbase.mapred.output.key1 = value1"
   * would be set in the configuration as "key1 = value1". Use this to set properties
   * which should only be applied to the {@code TableOutputFormat} configuration and not the
   * input configuration.
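   * <p>
   * For example, to point only the output side at a different ZooKeeper quorum
   * (a sketch; the quorum value is a placeholder):
   * <pre>
   * conf.set("hbase.mapred.output.hbase.zookeeper.quorum", "peer-zk1,peer-zk2");
   * // after setConf(), the output configuration contains
   * // hbase.zookeeper.quorum = peer-zk1,peer-zk2
   * </pre>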
   */
  public static final String OUTPUT_CONF_PREFIX = "hbase.mapred.output.";

  /**
   * Optional job parameter to specify a peer cluster.
   * Used to specify a remote cluster when copying between HBase clusters (the
   * source is picked up from <code>hbase-site.xml</code>).
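   * <p>
   * The value is a cluster key of the form
   * <code>zk-quorum:zk-client-port:parent-znode</code>, for example (a sketch
   * with placeholder hosts):
   * <pre>
   * conf.set(TableOutputFormat.QUORUM_ADDRESS, "zk1,zk2,zk3:2181:/hbase");
   * </pre>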
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
   */
  public static final String QUORUM_ADDRESS = OUTPUT_CONF_PREFIX + "quorum";

  /** Optional job parameter to specify the peer cluster's ZooKeeper client port. */
  public static final String QUORUM_PORT = OUTPUT_CONF_PREFIX + "quorum.port";

  /** Optional specification of the region server class name of the peer cluster. */
  public static final String
      REGION_SERVER_CLASS = OUTPUT_CONF_PREFIX + "rs.class";
  /** Optional specification of the region server implementation name of the peer cluster. */
  public static final String
      REGION_SERVER_IMPL = OUTPUT_CONF_PREFIX + "rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  /**
   * Writes the reducer output to an HBase table.
   */
  protected class TableRecordWriter
      extends RecordWriter<KEY, Mutation> {

    private Connection connection;
    private BufferedMutator mutator;

    /**
     * Sets up a {@link Connection} and a {@link BufferedMutator} for the
     * configured output table.
     *
     * @throws IOException When setting up the connection or the mutator fails.
     */
    public TableRecordWriter() throws IOException {
      String tableName = conf.get(OUTPUT_TABLE);
      this.connection = ConnectionFactory.createConnection(conf);
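      // The BufferedMutator batches mutations client-side and sends them to
      // the region servers in the background as its buffer fills.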
      this.mutator = connection.getBufferedMutator(TableName.valueOf(tableName));
      LOG.info("Created table instance for " + tableName);
    }

    /**
     * Closes the writer, in this case flushing any buffered mutations.
     *
     * @param context The context.
     * @throws IOException When closing the writer fails.
     * @see RecordWriter#close(TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
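      // Closing the mutator flushes whatever is still sitting in the
      // client-side buffer before the connection is released.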
      mutator.close();
      connection.close();
    }

    /**
     * Writes a key/value pair into the table.
     *
     * @param key The key.
     * @param value The value.
     * @throws IOException When writing fails.
     * @see RecordWriter#write(Object, Object)
     */
    @Override
    public void write(KEY key, Mutation value) throws IOException {
      if (!(value instanceof Put) && !(value instanceof Delete)) {
        throw new IOException("Pass a Delete or a Put");
      }
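      // The mutation is only buffered here; it may be sent when the buffer
      // fills, and any remainder is flushed when the writer is closed.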
      mutator.mutate(value);
    }
  }

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job, but this implementation returns a new RecordWriter
   * for each call of this method. You must close the returned RecordWriter when done;
   * failure to do so will drop writes.
   *
   * @param context The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   */
  @Override
  public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableRecordWriter();
  }

  /**
   * Checks if the output table exists and is enabled.
   *
   * @param context The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#checkOutputSpecs(JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException {
    // Open the connection in the try-with-resources as well, so it is closed
    // along with the Admin rather than leaked.
    try (Connection connection = ConnectionFactory.createConnection(getConf());
        Admin admin = connection.getAdmin()) {
      TableName tableName = TableName.valueOf(this.conf.get(OUTPUT_TABLE));
      if (!admin.tableExists(tableName)) {
        throw new TableNotFoundException("Can't write, table does not exist: " +
            tableName.getNameAsString());
      }

      if (!admin.isTableEnabled(tableName)) {
        throw new TableNotEnabledException("Can't write, table is not enabled: " +
            tableName.getNameAsString());
      }
    }
  }

  /**
   * Returns the output committer.
   *
   * @param context The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see OutputFormat#getOutputCommitter(TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    String tableName = otherConf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.isEmpty()) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = otherConf.get(QUORUM_ADDRESS);
    int zkClientPort = otherConf.getInt(QUORUM_PORT, 0);
    String serverClass = otherConf.get(REGION_SERVER_CLASS);
    String serverImpl = otherConf.get(REGION_SERVER_IMPL);

    try {
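      // createClusterConf() copies otherConf, points the copy at the peer
      // cluster named by the cluster key (if one was given), and applies every
      // "hbase.mapred.output."-prefixed property with the prefix stripped.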
      this.conf = HBaseConfiguration.createClusterConf(otherConf, address, OUTPUT_CONF_PREFIX);

      // Also require serverImpl to be non-null: Configuration.set() rejects a
      // null value, so only apply the override when both are specified.
      if (serverClass != null && serverImpl != null) {
        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
      }
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
    } catch (IOException e) {
      LOG.error(e);
      throw new RuntimeException(e);
    }
  }
}