/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MR input format for HFiles.
 * This code was borrowed from the Apache Crunch project and updated to work with
 * recent versions of HBase.
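 * <p>
 * A minimal usage sketch; the input path and the mapper class below are hypothetical
 * examples, not part of this class:
 * <pre>{@code
 * Configuration conf = HBaseConfiguration.create();
 * Job job = Job.getInstance(conf, "read-hfiles");
 * job.setInputFormatClass(HFileInputFormat.class);
 * // e.g. HFiles produced by a bulk-load job, laid out in per-family directories
 * HFileInputFormat.addInputPath(job, new Path("/bulkload/output"));
 * job.setMapperClass(CellCountingMapper.class); // hypothetical Mapper over <NullWritable, Cell>
 * }</pre>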
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files. This might be something worth removing from
   * a more general-purpose utility; it accounts for the presence of metadata files created
   * in the way we're doing exports.
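   * For example, a MapReduce {@code _SUCCESS} marker or a {@code .}-prefixed checksum file
   * would be skipped, while regular HFile names are accepted.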
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    private Reader in;
    protected Configuration conf;
    private HFileScanner scanner;

    /**
     * A private cache of the key value so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    private long count; // number of cells read so far, used for progress reporting
    private boolean seeked = false; // whether the scanner has been positioned at the first cell

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, conf);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.scanner = in.getScanner(false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
      if (!seeked) {
        // First call: position the scanner at the first cell of the HFile.
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        // Subsequent calls: advance the scanner by one cell.
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if cells are not uniformly sized or if we have seeked to the
      // start row, but it is better than nothing anyway.
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters, since HFiles
    // are written to directories where the directory name is the column family name.
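    // For example, given a (hypothetical) input directory /bulkload/output containing
    // /bulkload/output/cf1/hfile1 and /bulkload/output/cf2/hfile2, each family directory is
    // expanded to the HFiles beneath it.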
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // HFiles are not splittable.
    return false;
  }
}