/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.util.bulkdatagenerator;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;

/**
 * Mapper that turns each incoming record index into one synthetic row, emitted as one
 * {@link KeyValue} per column of {@link Utility.TableColumnNames} used in
 * {@link #map(Text, NullWritable, Context)}.
 * <p>
 * All strings written as row keys or cell values are encoded as UTF-8 so that the generated
 * bytes do not depend on the platform default charset (several location names are non-ASCII).
 */
public class BulkDataGeneratorMapper
  extends Mapper<Text, NullWritable, ImmutableBytesWritable, KeyValue> {

  /** Counter enumeration to count number of rows generated. */
  public enum Counters {
    ROWS_GENERATED
  }

  /**
   * Configuration key from which {@link #setup(Context)} reads the split count.
   * <p>
   * NOTE(review): there is no '.' between the class name and "split.count"; the value is kept
   * as-is because code outside this file may depend on the exact key string.
   */
  public static final String SPLIT_COUNT_KEY =
    BulkDataGeneratorMapper.class.getName() + "split.count";

  private static final String ORG_ID = "00D000000000062";
  private static final int MAX_EVENT_ID = Integer.MAX_VALUE;
  private static final int MAX_VEHICLE_ID = 100;
  private static final int MAX_SPEED_KPH = 140;
  private static final int NUM_LOCATIONS = 10;
  private static final Random random = new Random(System.currentTimeMillis());

  /** Location name mapped to its (latitude, longitude) pair. */
  private static final Map<String, Pair<BigDecimal, BigDecimal>> LOCATIONS =
    Maps.newHashMapWithExpectedSize(NUM_LOCATIONS);

  /** Keys of {@link #LOCATIONS} in list form, so a location can be picked by random index. */
  private static final List<String> LOCATION_KEYS = Lists.newArrayListWithCapacity(NUM_LOCATIONS);

  static {
    LOCATIONS.put("Belém", new Pair<>(BigDecimal.valueOf(-01.45), BigDecimal.valueOf(-48.48)));
    LOCATIONS.put("Brasília", new Pair<>(BigDecimal.valueOf(-15.78), BigDecimal.valueOf(-47.92)));
    LOCATIONS.put("Campinas", new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-47.05)));
    LOCATIONS.put("Cuiaba", new Pair<>(BigDecimal.valueOf(-07.25), BigDecimal.valueOf(-58.42)));
    LOCATIONS.put("Manaus", new Pair<>(BigDecimal.valueOf(-03.10), BigDecimal.valueOf(-60.00)));
    LOCATIONS.put("Porto Velho",
      new Pair<>(BigDecimal.valueOf(-08.75), BigDecimal.valueOf(-63.90)));
    LOCATIONS.put("Recife", new Pair<>(BigDecimal.valueOf(-08.10), BigDecimal.valueOf(-34.88)));
    LOCATIONS.put("Rio de Janeiro",
      new Pair<>(BigDecimal.valueOf(-22.90), BigDecimal.valueOf(-43.23)));
    LOCATIONS.put("Santarém", new Pair<>(BigDecimal.valueOf(-02.43), BigDecimal.valueOf(-54.68)));
    LOCATIONS.put("São Paulo", new Pair<>(BigDecimal.valueOf(-23.53), BigDecimal.valueOf(-46.62)));
    LOCATION_KEYS.addAll(LOCATIONS.keySet());
  }

  // NOTE(review): if Utility.COLUMN_FAMILY is a String this uses the platform default charset;
  // left unchanged because its declaration is not visible from this file.
  static final byte[] COLUMN_FAMILY_BYTES = Utility.COLUMN_FAMILY.getBytes();

  /**
   * Split count read from the job configuration in {@link #setup(Context)}; defaults to 1. Held
   * per mapper instance rather than in a static field, since it is assigned from an instance
   * method.
   */
  private int splitCount = 1;

  /**
   * Reads the split count from the job configuration under {@link #SPLIT_COUNT_KEY}, falling back
   * to 1 when the key is not set.
   * @param context - Context of the mapper container
   */
  @Override
  protected void setup(Context context) throws IOException, InterruptedException {
    Configuration c = context.getConfiguration();
    splitCount = c.getInt(SPLIT_COUNT_KEY, 1);
  }

  /**
   * Generates a single record based on value set to the key by
   * {@link BulkDataGeneratorRecordReader#getCurrentKey()}.
   * {@link Utility.TableColumnNames#TOOL_EVENT_ID} is first part of row key. Keeping first
   * {@link Utility#SPLIT_PREFIX_LENGTH} characters as index of the record to be generated ensures
   * that records are equally distributed across all regions of the table since region boundaries
   * are generated in similar fashion. Check {@link Utility#createTable(Admin, String, int, Map)}
   * method for region split info.
   * <p>
   * The row key is {@code <toolEventId>:<ORG_ID>} encoded as UTF-8. One {@link KeyValue} is written
   * per column, and the {@link Counters#ROWS_GENERATED} counter is incremented once per call.
   * @param key     - The key having index of next record to be generated
   * @param value   - Value associated with the key (not used)
   * @param context - Context of the mapper container
   * @throws NumberFormatException if {@code key} does not hold a parsable integer
   */
  @Override
  protected void map(Text key, NullWritable value, Context context)
    throws IOException, InterruptedException {

    int recordIndex = Integer.parseInt(key.toString());

    // <zero-padded-region-boundary-prefix>_<current-time><random-double>_<record-index>
    // NOTE(review): the random part is a double in [1e14, 1.9e14) appended through string
    // concatenation, so it appears in Double.toString form (scientific notation) rather than as
    // 15 plain characters - confirm this is the intended key layout before changing it.
    final String toolEventId =
      String.format("%0" + Utility.SPLIT_PREFIX_LENGTH + "d", recordIndex % (splitCount + 1)) + "_"
        + EnvironmentEdgeManager.currentTime() + (1e14 + (random.nextFloat() * 9e13)) + "_"
        + recordIndex;

    // Random.nextInt(bound) already returns a value in [0, bound), so no Math.abs is needed.
    final String eventId = String.valueOf(random.nextInt(MAX_EVENT_ID));
    final String vehicleId = String.valueOf(random.nextInt(MAX_VEHICLE_ID));
    final String speed = String.valueOf(random.nextInt(MAX_SPEED_KPH));

    // Bound the index by the list itself so it can never run past the populated locations.
    final String location = LOCATION_KEYS.get(random.nextInt(LOCATION_KEYS.size()));
    final Pair<BigDecimal, BigDecimal> coordinates = LOCATIONS.get(location);
    final BigDecimal latitude = coordinates.getFirst();
    final BigDecimal longitude = coordinates.getSecond();

    final String rowKey = toolEventId + ":" + ORG_ID;
    final ImmutableBytesWritable hKey =
      new ImmutableBytesWritable(rowKey.getBytes(StandardCharsets.UTF_8));

    addKeyValue(context, hKey, Utility.TableColumnNames.ORG_ID, ORG_ID);
    addKeyValue(context, hKey, Utility.TableColumnNames.TOOL_EVENT_ID, toolEventId);
    addKeyValue(context, hKey, Utility.TableColumnNames.EVENT_ID, eventId);
    addKeyValue(context, hKey, Utility.TableColumnNames.VEHICLE_ID, vehicleId);
    addKeyValue(context, hKey, Utility.TableColumnNames.SPEED, speed);
    addKeyValue(context, hKey, Utility.TableColumnNames.LATITUDE, latitude.toString());
    addKeyValue(context, hKey, Utility.TableColumnNames.LONGITUDE, longitude.toString());
    addKeyValue(context, hKey, Utility.TableColumnNames.LOCATION, location);
    addKeyValue(context, hKey, Utility.TableColumnNames.TIMESTAMP,
      String.valueOf(EnvironmentEdgeManager.currentTime()));

    context.getCounter(Counters.ROWS_GENERATED).increment(1);
  }

  /**
   * Builds a {@link KeyValue} for the given row and column in the {@link #COLUMN_FAMILY_BYTES}
   * family and writes it to the mapper output.
   * @param context    - Context of the mapper container
   * @param key        - Row key; also used as the output key of the emitted pair
   * @param columnName - Column whose qualifier bytes are used for the cell
   * @param value      - Cell value, stored as its UTF-8 bytes
   */
  private void addKeyValue(final Context context, ImmutableBytesWritable key,
    final Utility.TableColumnNames columnName, final String value)
    throws IOException, InterruptedException {
    KeyValue kv = new KeyValue(key.get(), COLUMN_FAMILY_BYTES, columnName.getColumnName(),
      value.getBytes(StandardCharsets.UTF_8));
    context.write(key, kv);
  }
}