001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.List;
026
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.io.NullWritable;
029import org.apache.hadoop.io.Writable;
030import org.apache.hadoop.mapreduce.InputFormat;
031import org.apache.hadoop.mapreduce.InputSplit;
032import org.apache.hadoop.mapreduce.JobContext;
033import org.apache.hadoop.mapreduce.RecordReader;
034import org.apache.hadoop.mapreduce.TaskAttemptContext;
035
036/**
037 * Input format that creates a configurable number of map tasks
038 * each provided with a single row of NullWritables. This can be
039 * useful when trying to write mappers which don't have any real
040 * input (eg when the mapper is simply producing random data as output)
041 */
042public class NMapInputFormat extends InputFormat<NullWritable, NullWritable> {
043  private static final String NMAPS_KEY = "nmapinputformat.num.maps";
044
045  @Override
046  public RecordReader<NullWritable, NullWritable> createRecordReader(
047      InputSplit split,
048      TaskAttemptContext tac) throws IOException, InterruptedException {
049    return new SingleRecordReader<>(NullWritable.get(), NullWritable.get());
050  }
051
052  @Override
053  public List<InputSplit> getSplits(JobContext context) throws IOException,
054      InterruptedException {
055    int count = getNumMapTasks(context.getConfiguration());
056    List<InputSplit> splits = new ArrayList<>(count);
057    for (int i = 0; i < count; i++) {
058      splits.add(new NullInputSplit());
059    }
060    return splits;
061  }
062
063  public static void setNumMapTasks(Configuration conf, int numTasks) {
064    conf.setInt(NMAPS_KEY, numTasks);
065  }
066
067  public static int getNumMapTasks(Configuration conf) {
068    return conf.getInt(NMAPS_KEY, 1);
069  }
070
071  private static class NullInputSplit extends InputSplit implements Writable {
072    @Override
073    public long getLength() throws IOException, InterruptedException {
074      return 0;
075    }
076
077    @Override
078    public String[] getLocations() throws IOException, InterruptedException {
079      return new String[] {};
080    }
081
082    @Override
083    public void readFields(DataInput in) throws IOException {
084    }
085
086    @Override
087    public void write(DataOutput out) throws IOException {
088    }
089  }
090
091  private static class SingleRecordReader<K, V>
092    extends RecordReader<K, V> {
093
094    private final K key;
095    private final V value;
096    boolean providedKey = false;
097
098    SingleRecordReader(K key, V value) {
099      this.key = key;
100      this.value = value;
101    }
102
103    @Override
104    public void close() {
105    }
106
107    @Override
108    public K getCurrentKey() {
109      return key;
110    }
111
112    @Override
113    public V getCurrentValue(){
114      return value;
115    }
116
117    @Override
118    public float getProgress() {
119      return 0;
120    }
121
122    @Override
123    public void initialize(InputSplit split, TaskAttemptContext tac) {
124    }
125
126    @Override
127    public boolean nextKeyValue() {
128      if (providedKey) return false;
129      providedKey = true;
130      return true;
131    }
132
133  }
134}