/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles. This code was borrowed from the Apache Crunch project and
 * updated to a recent version of HBase.
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that skips all "hidden" files, i.e. those whose names start with "_" or ".". It
   * accounts for the metadata files created in the way we're doing exports; a more
   * general-purpose utility might not want this behavior.
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };
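
  // Illustrative layout (an assumption about a typical export, not taken from this file): the
  // input directory usually holds one subdirectory per column family plus job metadata files,
  // e.g.
  //
  //   /export/mytable/_SUCCESS        <- metadata marker, skipped (leading "_")
  //   /export/mytable/cf1/c4ca4238a0  <- HFile, kept
  //   /export/mytable/cf2/c81e728d9d  <- HFile, kept
  //
  // HIDDEN_FILE_FILTER drops such marker and dotfile entries when listStatus() below explodes
  // the per-family directories.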

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    /** Number of cells read so far; used to report progress. */
    private long count;
    /** Whether the scanner has been seeked to the first cell yet. */
    private boolean seeked = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, conf);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.scanner = in.getScanner(conf, false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
      if (!seeked) {
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if KVs are not uniformly sized or we have performed a seek to
      // the start row, but it is better than nothing anyway.
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters, since HFiles are
    // written into directories named after their column family.
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // HFiles are not splittable: each split is a whole file.
    return false;
  }
}
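
// ------------------------------------------------------------------------------------------------
// A minimal usage sketch (an illustration, not part of the original class): wiring
// HFileInputFormat into a MapReduce job that counts cells per row. The input/output paths, the
// job name, and this helper class itself are hypothetical; fully-qualified names are used so the
// sketch compiles without touching the imports above.
// ------------------------------------------------------------------------------------------------
final class HFileInputFormatUsageSketch {

  /** Emits one count per Cell, keyed by the cell's row. */
  static final class CellPerRowMapper extends org.apache.hadoop.mapreduce.Mapper<NullWritable,
    Cell, org.apache.hadoop.io.Text, org.apache.hadoop.io.LongWritable> {

    private static final org.apache.hadoop.io.LongWritable ONE =
      new org.apache.hadoop.io.LongWritable(1L);

    @Override
    protected void map(NullWritable key, Cell cell, Context context)
      throws IOException, InterruptedException {
      // cloneRow copies the row bytes out of the Cell, which may be backed by a reused buffer.
      context.write(
        new org.apache.hadoop.io.Text(org.apache.hadoop.hbase.CellUtil.cloneRow(cell)), ONE);
    }
  }

  static org.apache.hadoop.mapreduce.Job createJob(Configuration conf) throws IOException {
    org.apache.hadoop.mapreduce.Job job =
      org.apache.hadoop.mapreduce.Job.getInstance(conf, "hfile-cell-count");
    job.setJarByClass(HFileInputFormatUsageSketch.class);
    job.setInputFormatClass(HFileInputFormat.class);
    // Directory of exported HFiles (hypothetical path); the per-column-family subdirectories
    // are exploded by HFileInputFormat.listStatus above.
    HFileInputFormat.addInputPath(job, new Path("/export/mytable"));
    job.setMapperClass(CellPerRowMapper.class);
    job.setReducerClass(org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer.class);
    job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
    job.setOutputValueClass(org.apache.hadoop.io.LongWritable.class);
    org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
      new Path("/export/mytable-counts"));
    return job;
  }
}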