/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Convert Map/Reduce output and write it to an HBase table.
 */
@InterfaceAudience.Public
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

  /** JobConf parameter that specifies the output table */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Writes reduce output (key, {@link Put}) pairs to the configured HBase table through a
   * {@link BufferedMutator}.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;
    private Connection conn;

    /**
     * Instantiate a TableRecordWriter with a BufferedMutator for batch writing.
     */
    public TableRecordWriter(JobConf job) throws IOException {
      TableName tableName = TableName.valueOf(job.get(OUTPUT_TABLE));
      try {
        this.conn = ConnectionFactory.createConnection(job);
        this.m_mutator = conn.getBufferedMutator(tableName);
      } finally {
        // If the mutator could not be created, do not leak the connection.
        if (this.m_mutator == null && this.conn != null) {
          this.conn.close();
          this.conn = null;
        }
      }
    }

    public void close(Reporter reporter) throws IOException {
      try {
        if (this.m_mutator != null) {
          // Closing the mutator flushes any buffered mutations to the table.
          this.m_mutator.close();
        }
      } finally {
        if (this.conn != null) {
          this.conn.close();
        }
      }
    }

    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      // Copy the Put so later modifications by the caller do not affect the buffered mutation.
      m_mutator.mutate(new Put(value));
    }
  }
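  // Illustrative sketch only (not part of this class): a reducer of the kind that feeds this
  // output format. The class name "ExampleTableReduce" is an assumption made for the example;
  // the point is that reduce code emits (ImmutableBytesWritable, Put) pairs, which the
  // TableRecordWriter above buffers through its BufferedMutator.
  //
  //   public static class ExampleTableReduce
  //     implements org.apache.hadoop.mapred.Reducer<ImmutableBytesWritable, Put,
  //       ImmutableBytesWritable, Put> {
  //     public void configure(JobConf job) {
  //     }
  //
  //     public void close() {
  //     }
  //
  //     public void reduce(ImmutableBytesWritable key, java.util.Iterator<Put> values,
  //       org.apache.hadoop.mapred.OutputCollector<ImmutableBytesWritable, Put> output,
  //       Reporter reporter) throws IOException {
  //       while (values.hasNext()) {
  //         // Each collected Put reaches TableRecordWriter.write and is buffered for the table.
  //         output.collect(key, values.next());
  //       }
  //     }
  //   }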
  /**
   * Creates a new record writer. Be aware that the baseline javadoc gives the impression that
   * there is a single {@link RecordWriter} per job but in HBase, it is more natural if we give
   * you a new RecordWriter per call of this method. You must close the returned RecordWriter
   * when done. Failure to do so will drop writes.
   * @param ignored  Ignored filesystem
   * @param job      Current JobConf
   * @param name     Name for this output (ignored; the table comes from the JobConf)
   * @param progress Progress reporter (unused)
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   */
  @Override
  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name,
    Progressable progress) throws IOException {
    // Clear write buffer on fail is true by default so no need to reset it.
    return new TableRecordWriter(job);
  }

  @Override
  public void checkOutputSpecs(FileSystem ignored, JobConf job)
    throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get(OUTPUT_TABLE);
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }
}
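
// Usage sketch (illustrative, not part of this class): minimal JobConf wiring for a driver that
// writes to HBase through TableOutputFormat. The job name and table name below are assumptions
// made for the example; see also TableMapReduceUtil in this package, which performs similar
// setup for table reduce jobs.
//
//   JobConf job = new JobConf(org.apache.hadoop.hbase.HBaseConfiguration.create());
//   job.setJobName("example-table-load");
//   job.setOutputFormat(TableOutputFormat.class);
//   job.set(TableOutputFormat.OUTPUT_TABLE, "example_table");
//   job.setOutputKeyClass(ImmutableBytesWritable.class);
//   job.setOutputValueClass(Put.class);
//   // configure mapper/reducer classes as needed, then submit:
//   org.apache.hadoop.mapred.JobClient.runJob(job);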