/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapred;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.util.StringUtils;

/**
 * Convert HBase tabular data into a format that is consumable by Map/Reduce.
 */
@InterfaceAudience.Public
public class TableInputFormat extends TableInputFormatBase implements
    JobConfigurable {
  private static final Logger LOG = LoggerFactory.getLogger(TableInputFormat.class);

  /**
   * Job configuration key holding a space delimited list of columns to scan.
   */
  public static final String COLUMN_LIST = "hbase.mapred.tablecolumns";

  /**
   * Configures this input format from the job.
   * <p>
   * {@link JobConfigurable#configure(JobConf)} may not throw, so any
   * initialization failure is logged here; {@link #validateInput(JobConf)}
   * will subsequently report the table as unconnected.
   *
   * @param job the map/reduce job configuration
   */
  @Override
  public void configure(JobConf job) {
    try {
      initialize(job);
    } catch (Exception e) {
      // configure() cannot propagate exceptions; surface the failure in the log.
      LOG.error(StringUtils.stringifyException(e));
    }
  }

  /**
   * Initializes the scanned columns and the table connection from the job.
   * The table name is taken from the first configured input path; the column
   * list is read from {@link #COLUMN_LIST}.
   *
   * @param job the map/reduce job configuration
   * @throws IOException if no input path or no column list is configured, or
   *           if the table connection cannot be established
   */
  @Override
  protected void initialize(JobConf job) throws IOException {
    Path[] tableNames = FileInputFormat.getInputPaths(job);
    if (tableNames == null || tableNames.length == 0) {
      // Guard the tableNames[0] access below; an empty path list previously
      // failed with an ArrayIndexOutOfBoundsException.
      throw new IOException("expecting one table name");
    }
    String colArg = job.get(COLUMN_LIST);
    if (colArg == null || colArg.isEmpty()) {
      // An unset column list previously caused an uninformative
      // NullPointerException on colArg.split() instead of the declared
      // IOException.
      throw new IOException("expecting at least one column in " + COLUMN_LIST);
    }
    String[] colNames = colArg.split(" ");
    byte[][] columns = new byte[colNames.length][];
    for (int i = 0; i < columns.length; i++) {
      columns[i] = Bytes.toBytes(colNames[i]);
    }
    setInputColumns(columns);
    Connection connection = ConnectionFactory.createConnection(job);
    try {
      initializeTable(connection, TableName.valueOf(tableNames[0].getName()));
    } catch (IOException | RuntimeException e) {
      // The base class owns the connection only after initializeTable
      // succeeds; close it here so a failed initialization does not leak it.
      connection.close();
      throw e;
    }
  }

  /**
   * Validates that the job supplies exactly one table path, that the table
   * connection has been established, and that at least one column is
   * configured.
   *
   * @param job the map/reduce job configuration to validate
   * @throws IOException if any of the above preconditions fail
   */
  public void validateInput(JobConf job) throws IOException {
    // expecting exactly one path
    Path[] tableNames = FileInputFormat.getInputPaths(job);
    if (tableNames == null || tableNames.length != 1) {
      // Reject the zero-path case too; the old "length > 1" check let it
      // fall through to a misleading "could not connect" error.
      throw new IOException("expecting one table name");
    }

    // connected to table?
    if (getTable() == null) {
      throw new IOException("could not connect to table '" +
        tableNames[0].getName() + "'");
    }

    // expecting at least one column
    String colArg = job.get(COLUMN_LIST);
    if (colArg == null || colArg.length() == 0) {
      throw new IOException("expecting at least one column");
    }
  }
}