/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles.
 * This code was borrowed from the Apache Crunch project and updated to the
 * current HBase API.
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files. This might be something worth moving into
   * a more general-purpose utility; it accounts for the metadata files created by the way
   * we do exports.
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    /** Number of cells read so far; used to report progress. */
    private long count;
    /** Whether the initial seekTo() has been performed. */
    private boolean seeked = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, conf);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.scanner = in.getScanner(false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
      if (!seeked) {
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if KVs are not uniformly sized or we have performed a seek
      // to the start row, but better than nothing anyway.
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters, since HFiles
    // are written to directories where the directory name is the column family name.
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // This file isn't splittable.
    return false;
  }
}
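
/*
 * Usage sketch (illustrative, not part of the original file): a minimal map-only driver
 * that streams every Cell from a directory of exported HFiles through the default
 * identity Mapper and discards the output. The example class name, job name, and the
 * NullOutputFormat choice are assumptions for illustration; only HFileInputFormat itself
 * comes from the class above.
 */
class HFileInputFormatExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    org.apache.hadoop.mapreduce.Job job =
        org.apache.hadoop.mapreduce.Job.getInstance(conf, "hfile-read-example");
    job.setJarByClass(HFileInputFormatExample.class);
    // Keys are NullWritable and values are Cells, as produced by HFileRecordReader.
    job.setInputFormatClass(HFileInputFormat.class);
    // Point the format at a directory of exported HFiles; listStatus() above descends
    // one level into the per-column-family subdirectories.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // Map-only job; the default identity Mapper passes cells straight through, and
    // NullOutputFormat discards them, so this simply exercises the read path.
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.NullOutputFormat.class);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}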