/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles. This code was borrowed from the Apache Crunch project and
 * updated to a recent version of HBase.
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files. This might be something worth removing from a
   * more general purpose utility; it accounts for the presence of metadata files created in the
   * way we're doing exports.
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    private long count;
    private boolean seeked = false;
    private OptionalLong bulkloadSeqId;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, conf);
      this.bulkloadSeqId = StoreFileInfo.getBulkloadSeqId(path);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.scanner = in.getScanner(conf, false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
      if (!seeked) {
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      if (value != null && bulkloadSeqId.isPresent()) {
        PrivateCellUtil.setSequenceId(value, bulkloadSeqId.getAsLong());
      }
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
      // the start row, but better than nothing anyway.
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();

    // Explode out directories that match the original FileInputFormat filters.
    // Typically these are <regionname>-level dirs, only requiring 1 level of recursion to
    // get the <columnname>-level dirs where the HFiles are written, but in some cases
    // <tablename>-level dirs are provided requiring 2 levels of recursion.
    for (FileStatus status : super.listStatus(job)) {
      addFilesRecursively(job, status, result);
    }
    return result;
  }

  private static void addFilesRecursively(JobContext job, FileStatus status,
    List<FileStatus> result) throws IOException {
    if (status.isDirectory()) {
      FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
      for (FileStatus fileStatus : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
        addFilesRecursively(job, fileStatus, result);
      }
    } else {
      result.add(status);
    }
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // This file isn't splittable.
    return false;
  }
}
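
/*
 * Usage sketch (an assumption, not part of the original class): wiring this input format into a
 * MapReduce job that reads exported HFiles directly. Job, Configuration, HBaseConfiguration and
 * FileInputFormat are standard Hadoop/HBase APIs; the mapper class and input path below are
 * hypothetical placeholders.
 *
 *   Configuration conf = HBaseConfiguration.create();
 *   Job job = Job.getInstance(conf, "hfile-scan");
 *   job.setJarByClass(HFileInputFormat.class);
 *   job.setInputFormatClass(HFileInputFormat.class);
 *   // Hypothetical export directory containing region/family dirs with HFiles.
 *   FileInputFormat.addInputPath(job, new Path("/export/my-table"));
 *   // Hypothetical Mapper<NullWritable, Cell, ?, ?> that consumes each Cell.
 *   job.setMapperClass(MyCellMapper.class);
 *   job.setNumReduceTasks(0);
 *   job.waitForCompletion(true);
 */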