001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import java.io.DataInput;
022import java.io.DataOutput;
023import java.util.ArrayList;
024import java.util.List;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.io.NullWritable;
028import org.apache.hadoop.io.Writable;
029import org.apache.hadoop.mapreduce.InputFormat;
030import org.apache.hadoop.mapreduce.InputSplit;
031import org.apache.hadoop.mapreduce.JobContext;
032import org.apache.hadoop.mapreduce.RecordReader;
033import org.apache.hadoop.mapreduce.TaskAttemptContext;
034
035/**
036 * Input format that creates a configurable number of map tasks
037 * each provided with a single row of NullWritables. This can be
038 * useful when trying to write mappers which don't have any real
039 * input (eg when the mapper is simply producing random data as output)
040 */
041public class NMapInputFormat extends InputFormat<NullWritable, NullWritable> {
042  private static final String NMAPS_KEY = "nmapinputformat.num.maps";
043
044  @Override
045  public RecordReader<NullWritable, NullWritable> createRecordReader(
046      InputSplit split, TaskAttemptContext tac) {
047    return new SingleRecordReader<>(NullWritable.get(), NullWritable.get());
048  }
049
050  @Override
051  public List<InputSplit> getSplits(JobContext context) {
052    int count = getNumMapTasks(context.getConfiguration());
053    List<InputSplit> splits = new ArrayList<>(count);
054    for (int i = 0; i < count; i++) {
055      splits.add(new NullInputSplit());
056    }
057    return splits;
058  }
059
060  public static void setNumMapTasks(Configuration conf, int numTasks) {
061    conf.setInt(NMAPS_KEY, numTasks);
062  }
063
064  public static int getNumMapTasks(Configuration conf) {
065    return conf.getInt(NMAPS_KEY, 1);
066  }
067
068  private static class NullInputSplit extends InputSplit implements Writable {
069    @Override
070    public long getLength() {
071      return 0;
072    }
073
074    @Override
075    public String[] getLocations() {
076      return new String[] {};
077    }
078
079    @Override
080    public void readFields(DataInput in) {
081    }
082
083    @Override
084    public void write(DataOutput out) {
085    }
086  }
087
088  private static class SingleRecordReader<K, V>
089    extends RecordReader<K, V> {
090
091    private final K key;
092    private final V value;
093    boolean providedKey = false;
094
095    SingleRecordReader(K key, V value) {
096      this.key = key;
097      this.value = value;
098    }
099
100    @Override
101    public void close() {
102    }
103
104    @Override
105    public K getCurrentKey() {
106      return key;
107    }
108
109    @Override
110    public V getCurrentValue(){
111      return value;
112    }
113
114    @Override
115    public float getProgress() {
116      return 0;
117    }
118
119    @Override
120    public void initialize(InputSplit split, TaskAttemptContext tac) {
121    }
122
123    @Override
124    public boolean nextKeyValue() {
125      if (providedKey) {
126        return false;
127      }
128
129      providedKey = true;
130      return true;
131    }
132  }
133}