/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

/**
 * Convert Map/Reduce output and write it to an HBase table.
 */
@InterfaceAudience.Public
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Writes the reduce output (ImmutableBytesWritable key, Put value) pairs to an HBase table
   * through a {@link BufferedMutator}.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;
    private Connection conn;

    /**
     * Instantiate a TableRecordWriter with an existing {@link BufferedMutator} for writing.
     *
     * @deprecated since 2.0.0 and will be removed in 3.0.0. Please use
     * {@link #TableRecordWriter(JobConf)} instead. This version does not clean up the
     * connection behind the supplied mutator and will leak it.
     * @see <a href="https://issues.apache.org/jira/browse/HBASE-16774">HBASE-16774</a>
     */
    @Deprecated
    public TableRecordWriter(final BufferedMutator mutator) throws IOException {
      this.m_mutator = mutator;
      this.conn = null;
    }

    /**
     * Instantiate a TableRecordWriter, opening a {@link Connection} and a {@link BufferedMutator}
     * for the output table named in the given {@link JobConf}.
     * @throws IOException if the connection or mutator for the output table cannot be created
     */
    public TableRecordWriter(JobConf job) throws IOException {
      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
      try {
        this.conn = ConnectionFactory.createConnection(job);
        this.m_mutator = conn.getBufferedMutator(tableName);
      } finally {
        // If the mutator could not be created, close the connection so it does not leak.
        if (this.m_mutator == null && this.conn != null) {
          conn.close();
          conn = null;
        }
      }
    }

    /**
     * Closes the mutator, flushing any buffered writes, and then closes the connection.
     */
    public void close(Reporter reporter) throws IOException {
      try {
        if (this.m_mutator != null) {
          this.m_mutator.close();
        }
      } finally {
        if (conn != null) {
          this.conn.close();
        }
      }
    }

    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      // Copy the Put so callers may safely reuse or modify their instance after this call.
      m_mutator.mutate(new Put(value));
    }
  }

  /**
   * Creates a new record writer.
   *
   * Be aware that the baseline javadoc gives the impression that there is a single
   * {@link RecordWriter} per job but in HBase, it is more natural if we give you a new
   * RecordWriter per call of this method. You must close the returned RecordWriter when done.
   * Failure to do so will drop writes.
   *
   * @param ignored Ignored filesystem
   * @param job Current JobConf
   * @param name Name of the job
   * @param progress Progressable used to report progress
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   */
  @Override
  public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(FileSystem ignored, JobConf job,
      String name, Progressable progress) throws IOException {
    // Clearing the write buffer on failure is enabled by default, so there is no need to reset it.
    return new TableRecordWriter(job);
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
      throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}
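
// A minimal driver sketch showing one way this output format can be wired into an old-API
// (org.apache.hadoop.mapred) job. The reducer class MyTableReducer and the table name "mytable"
// are hypothetical placeholders used only for illustration, not part of this class:
//
//   JobConf job = new JobConf(HBaseConfiguration.create());
//   job.setOutputFormat(TableOutputFormat.class);
//   job.set(TableOutputFormat.OUTPUT_TABLE, "mytable");
//   job.setOutputKeyClass(ImmutableBytesWritable.class);
//   job.setOutputValueClass(Put.class);
//   job.setReducerClass(MyTableReducer.class);
//   JobClient.runJob(job);
//
// The same wiring can also be done through TableMapReduceUtil#initTableReduceJob in this package.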