001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util.bulkdatagenerator;
019
020import java.io.IOException;
021import org.apache.hadoop.io.NullWritable;
022import org.apache.hadoop.io.Text;
023import org.apache.hadoop.mapreduce.InputSplit;
024import org.apache.hadoop.mapreduce.RecordReader;
025import org.apache.hadoop.mapreduce.TaskAttemptContext;
026
027import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
028
029public class BulkDataGeneratorRecordReader extends RecordReader<Text, NullWritable> {
030
031  private int numRecordsToCreate = 0;
032  private int createdRecords = 0;
033  private Text key = new Text();
034  private NullWritable value = NullWritable.get();
035
036  public static final String RECORDS_PER_MAPPER_TASK_KEY =
037    BulkDataGeneratorInputFormat.class.getName() + "records.per.mapper.task";
038
039  @Override
040  public void initialize(InputSplit split, TaskAttemptContext context)
041    throws IOException, InterruptedException {
042    // Get the number of records to create from the configuration
043    this.numRecordsToCreate = context.getConfiguration().getInt(RECORDS_PER_MAPPER_TASK_KEY, -1);
044    Preconditions.checkArgument(numRecordsToCreate > 0,
045      "Number of records to be created by per mapper should be greater than 0.");
046  }
047
048  @Override
049  public boolean nextKeyValue() {
050    createdRecords++;
051    return createdRecords <= numRecordsToCreate;
052  }
053
054  @Override
055  public Text getCurrentKey() {
056    // Set the index of record to be created
057    key.set(String.valueOf(createdRecords));
058    return key;
059  }
060
061  @Override
062  public NullWritable getCurrentValue() {
063    return value;
064  }
065
066  @Override
067  public float getProgress() throws IOException, InterruptedException {
068    return (float) createdRecords / (float) numRecordsToCreate;
069  }
070
071  @Override
072  public void close() throws IOException {
073
074  }
075}