/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles. This code was borrowed from the Apache Crunch
 * project and updated to a recent version of HBase.
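 * <p>
 * A minimal driver sketch (the mapper class and input path are illustrative, not part of
 * this class):
 * <pre>{@code
 * Job job = Job.getInstance(conf, "scan-hfiles");
 * job.setInputFormatClass(HFileInputFormat.class);
 * job.setMapperClass(MyCellMapper.class); // extends Mapper<NullWritable, Cell, ...>
 * FileInputFormat.addInputPath(job, new Path("/export/mytable"));
 * }</pre>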
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files. This might be something worth removing
   * from a more general-purpose utility; it accounts for the metadata files created by
   * the way we do exports.
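   * For example, MapReduce's {@code _SUCCESS} marker files and hidden {@code .crc}
   * checksum files both match this filter and are skipped.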
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the current cell so it doesn't need to be fetched twice from
     * the scanner.
     */
    private Cell value = null;
    private long count;
    private boolean seeked = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
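      // Each split covers an entire HFile, because isSplitable() below always returns false.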
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, conf);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.in.loadFileInfo();
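      // cacheBlocks=false, pread=false: a single streaming pass over the file that does
      // not fill the block cache.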
      this.scanner = in.getScanner(false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
      if (!seeked) {
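        // First call: seekTo() positions the scanner at the first cell in the file.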
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if cells are not uniformly sized or if we have seeked
      // to the start row, but it is better than nothing anyway.
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

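  /**
   * Lists the input files, descending one level into directories so that HFiles written
   * into per-column-family subdirectories are picked up directly.
   */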
  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters, since
    // HFiles are written into directories named after their column family.
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // HFiles are not splittable.
    return false;
  }
}