001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util.bulkdatagenerator;
019
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
037
038public class BulkDataGeneratorMapper
039  extends Mapper<Text, NullWritable, ImmutableBytesWritable, KeyValue> {
040
041  /** Counter enumeration to count number of rows generated. */
042  public static enum Counters {
043    ROWS_GENERATED
044  }
045
046  public static final String SPLIT_COUNT_KEY =
047    BulkDataGeneratorMapper.class.getName() + "split.count";
048
049  private static final String ORG_ID = "00D000000000062";
050  private static final int MAX_EVENT_ID = Integer.MAX_VALUE;
051  private static final int MAX_VEHICLE_ID = 100;
052  private static final int MAX_SPEED_KPH = 140;
053  private static final int NUM_LOCATIONS = 10;
054  private static int splitCount = 1;
055  private static final Random random = new Random(System.currentTimeMillis());
056  private static final Map<String, Pair<BigDecimal, BigDecimal>> LOCATIONS =
057    Maps.newHashMapWithExpectedSize(NUM_LOCATIONS);
058  private static final List<String> LOCATION_KEYS = Lists.newArrayListWithCapacity(NUM_LOCATIONS);
059  static {
060    LOCATIONS.put("Belém", new Pair<>(BigDecimal.valueOf(-01.45), BigDecimal.valueOf(-48.48)));
061    LOCATIONS.put("Brasília", new Pair<>(BigDecimal.valueOf(-15.78), BigDecimal.valueOf(-47.92)));
062    LOCATIONS.put("Campinas", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-47.05)));
063    LOCATIONS.put("Cuiaba", new Pair<>(BigDecimal.valueOf(-07.25), BigDecimal.valueOf(-58.42)));
064    LOCATIONS.put("Manaus", new Pair<>(BigDecimal.valueOf(-03.10), BigDecimal.valueOf(-60.00)));
065    LOCATIONS.put("Porto Velho",
066      new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90)));
067    LOCATIONS.put("Recife", new Pair<>(BigDecimal.valueOf(-08.10), BigDecimal.valueOf(-34.88)));
068    LOCATIONS.put("Rio de Janeiro",
069      new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23)));
070    LOCATIONS.put("Santarém", new Pair<>(BigDecimal.valueOf(-02.43), BigDecimal.valueOf(-54.68)));
071    LOCATIONS.put("São Paulo", new Pair<>(BigDecimal.valueOf(-23.53), BigDecimal.valueOf(-46.62)));
072    LOCATION_KEYS.addAll(LOCATIONS.keySet());
073  }
074
075  final static byte[] COLUMN_FAMILY_BYTES = Utility.COLUMN_FAMILY.getBytes();
076
077  /** {@inheritDoc} */
078  @Override
079  protected void setup(Context context) throws IOException, InterruptedException {
080    Configuration c = context.getConfiguration();
081    splitCount = c.getInt(SPLIT_COUNT_KEY, 1);
082  }
083
084  /**
085   * Generates a single record based on value set to the key by
086   * {@link BulkDataGeneratorRecordReader#getCurrentKey()}.
087   * {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first
088   * {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures
089   * that records are equally distributed across all regions of the table since region boundaries
090   * are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)}
091   * method for region split info.
092   * @param key     - The key having index of next record to be generated
093   * @param value   - Value associated with the key (not used)
094   * @param context - Context of the mapper container
095   */
096  @Override
097  protected void map(Text key, NullWritable value, Context context)
098    throws IOException, InterruptedException {
099
100    int recordIndex = Integer.parseInt(key.toString());
101
102    // <6-characters-region-boundary-prefix>_<15-random-chars>_<record-index-for-this-mapper-task>
103    final String toolEventId =
104      String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex % (splitCount + 1)) + "_"
105        + EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_"
106        + recordIndex;
107    final String eventId = String.valueOf(Math.abs(random.nextInt(MAX_EVENT_ID)));
108    final String vechileId = String.valueOf(Math.abs(random.nextInt(MAX_VEHICLE_ID)));
109    final String speed = String.valueOf(Math.abs(random.nextInt(MAX_SPEED_KPH)));
110    final String location = LOCATION_KEYS.get(random.nextInt(NUM_LOCATIONS));
111    final Pair<BigDecimal, BigDecimal> coordinates = LOCATIONS.get(location);
112    final BigDecimal latitude = coordinates.getFirst();
113    final BigDecimal longitude = coordinates.getSecond();
114
115    final ImmutableBytesWritable hKey =
116      new ImmutableBytesWritable(String.format("%s:%s", toolEventId, ORG_ID).getBytes());
117    addKeyValue(context, hKey, Utility.TableColumnNames.ORG_ID, ORG_ID);
118    addKeyValue(context, hKey, Utility.TableColumnNames.TOOL_EVENT_ID, toolEventId);
119    addKeyValue(context, hKey, Utility.TableColumnNames.EVENT_ID, eventId);
120    addKeyValue(context, hKey, Utility.TableColumnNames.VEHICLE_ID, vechileId);
121    addKeyValue(context, hKey, Utility.TableColumnNames.SPEED, speed);
122    addKeyValue(context, hKey, Utility.TableColumnNames.LATITUDE, latitude.toString());
123    addKeyValue(context, hKey, Utility.TableColumnNames.LONGITUDE, longitude.toString());
124    addKeyValue(context, hKey, Utility.TableColumnNames.LOCATION, location);
125    addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP,
126      String.valueOf(EnvironmentEdgeManager.currentTime()));
127
128    context.getCounter(Counters.ROWS_GENERATED).increment(1);
129  }
130
131  private void addKeyValue(final Context context, ImmutableBytesWritable key,
132    final Utility.TableColumnNames columnName, final String value)
133    throws IOException, InterruptedException {
134    KeyValue kv =
135      new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(), value.getBytes());
136    context.write(key, kv);
137  }
138}