/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles. This code was borrowed from the Apache Crunch project and
 * updated to a recent version of HBase.
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files. This might be something worth removing from a
   * more general-purpose utility; it accounts for the presence of metadata files created in the
   * way we're doing exports.
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    private long count;
    private boolean seeked = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, conf);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.in.loadFileInfo();
      this.scanner = in.getScanner(false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
      if (!seeked) {
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
      // the start row, but better than nothing anyway.
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters, since HFiles
    // are written to directories where the directory name is the column name.
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // This file isn't splittable.
    return false;
  }
}
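
// A minimal usage sketch, kept in comment form so this file stays valid Java: it shows how a
// MapReduce job could be wired to read exported HFiles with this input format. The driver and
// mapper names (MyHFileDriver, MyCellMapper) and the input path argument are hypothetical
// assumptions, not part of this class; the Job and FileInputFormat calls are the standard
// Hadoop MapReduce API.
//
//   Configuration conf = HBaseConfiguration.create();
//   Job job = Job.getInstance(conf, "read-hfiles");
//   job.setJarByClass(MyHFileDriver.class);            // hypothetical driver class
//   job.setInputFormatClass(HFileInputFormat.class);
//   FileInputFormat.addInputPath(job, new Path(args[0]));
//   job.setMapperClass(MyCellMapper.class);            // a Mapper<NullWritable, Cell, ?, ?>
//   job.setNumReduceTasks(0);                          // map-only: each HFile is one split
//   System.exit(job.waitForCompletion(true) ? 0 : 1);
//
// Because isSplitable() returns false, each HFile is processed whole by a single mapper, and
// the keys are always NullWritable: all of the data arrives through the Cell values.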