/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple MapReduce input format for HFiles. This code was borrowed from the Apache Crunch project
 * and updated for a recent version of HBase.
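 * <p>
 * A minimal usage sketch (the job name, input path, and mapper class below are illustrative, not
 * part of this class):
 *
 * <pre>{@code
 * Job job = Job.getInstance(conf, "read-hfiles");
 * job.setInputFormatClass(HFileInputFormat.class);
 * FileInputFormat.addInputPath(job, new Path("/export/hfiles"));
 * // The mapper receives NullWritable keys and Cell values.
 * job.setMapperClass(MyCellMapper.class);
 * }</pre>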
 */
@InterfaceAudience.Private
public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {

  private static final Logger LOG = LoggerFactory.getLogger(HFileInputFormat.class);

  /**
   * File filter that removes all "hidden" files, i.e. those whose names start with "_" or ".".
   * This might be worth moving into a more general-purpose utility; it accounts for the metadata
   * files created in the way we're doing exports.
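   * For example, "_SUCCESS" job markers and ".crc" checksum files are rejected, while ordinary
   * HFiles are accepted.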
   */
  static final PathFilter HIDDEN_FILE_FILTER = new PathFilter() {
    @Override
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };

  /**
   * Record reader for HFiles.
   */
  private static class HFileRecordReader extends RecordReader<NullWritable, Cell> {

    /** Underlying HFile reader, opened in {@link #initialize} and released in {@link #close}. */
    private Reader in;
    protected Configuration conf;
    /** Scanner over the cells of the open HFile. */
    private HFileScanner scanner;

    /**
     * A private cache of the current cell so it doesn't need to be loaded twice from the scanner.
     */
    private Cell value = null;
    /** Number of cells read so far, used to estimate progress. */
    private long count;
    /** Whether the scanner has been positioned at the first cell yet. */
    private boolean seeked = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
      FileSplit fileSplit = (FileSplit) split;
      conf = context.getConfiguration();
      Path path = fileSplit.getPath();
      FileSystem fs = path.getFileSystem(conf);
      LOG.info("Initialize HFileRecordReader for {}", path);
      this.in = HFile.createReader(fs, path, conf);

      // The file info must be loaded before the scanner can be used.
      // This seems like a bug in HBase, but it's easily worked around.
      this.scanner = in.getScanner(conf, false, false);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
      boolean hasNext;
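      // On the first call, seekTo() positions the scanner at the first cell of the file;
      // subsequent calls simply advance to the next cell.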
      if (!seeked) {
        LOG.info("Seeking to start");
        hasNext = scanner.seekTo();
        seeked = true;
      } else {
        hasNext = scanner.next();
      }
      if (!hasNext) {
        return false;
      }
      value = scanner.getCell();
      count++;
      return true;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
      return NullWritable.get();
    }

    @Override
    public Cell getCurrentValue() throws IOException, InterruptedException {
      return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
      // This would be inaccurate if KVs are not uniformly-sized or we have performed a seek to
      // the start row, but better than nothing anyway.
      return 1.0f * count / in.getEntries();
    }

    @Override
    public void close() throws IOException {
      if (in != null) {
        in.close();
        in = null;
      }
    }
  }

  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<>();

    // Explode out directories that match the original FileInputFormat filters, since HFiles are
    // written into directories whose names are the column family names.
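    // For example, an export under /out may contain /out/cf1/hfile1 and /out/cf2/hfile2; the
    // loop below returns the HFiles themselves rather than the cf1 and cf2 directories.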
    for (FileStatus status : super.listStatus(job)) {
      if (status.isDirectory()) {
        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
        for (FileStatus match : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
          result.add(match);
        }
      } else {
        result.add(status);
      }
    }
    return result;
  }

  @Override
  public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
    TaskAttemptContext context) throws IOException, InterruptedException {
    return new HFileRecordReader();
  }

  @Override
  protected boolean isSplitable(JobContext context, Path filename) {
    // HFiles are not splittable.
    return false;
  }
}