/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

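/**
 * Base class for {@link HFileOutputFormat2} tests. Provides the shared {@link HBaseTestingUtil},
 * mappers that generate random KeyValue or Put output, and helpers for creating split keys and
 * running incremental-load jobs.
 */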
public abstract class HFileOutputFormat2TestBase {

  protected static final int ROWSPERSPLIT = 1024;
  protected static final int DEFAULT_VALUE_LENGTH = 1000;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  protected static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  protected static final TableName[] TABLE_NAMES = Stream
    .of("TestTable", "TestTable2", "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);

  protected static HBaseTestingUtil UTIL = new HBaseTestingUtil();

  /**
   * Simple mapper that makes KeyValue output.
   */
  protected static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    protected static final int KEYLEN_DEFAULT = 10;
    protected static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

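    /** Writes ROWSPERSPLIT rows of random data for each configured table, one cell per family. */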
    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  protected static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    protected static final int KEYLEN_DEFAULT = 10;
    protected static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    protected static final int VALLEN_DEFAULT = 10;
    protected static final String VALLEN_CONF = "randomkv.val.length";
    protected static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

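    /** Writes ROWSPERSPLIT rows of random Puts for each configured table, one Put per family. */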
    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // Set the TTL very low so that a scan of the table returns no values
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

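  /**
   * Configures {@code job} to use {@link NMapInputFormat} and one of the random data generating
   * mappers above: {@link RandomPutGeneratingMapper} when {@code putSortReducer} is true,
   * otherwise {@link RandomKVGeneratingMapper}.
   */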
  protected static void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

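  /**
   * Returns {@code numKeys} region start keys: the first is always the empty byte array, the
   * remainder are random values of {@code DEFAULT_VALUE_LENGTH} bytes.
   */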
  protected static byte[][] generateRandomStartKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  /**
   * Generates a random value to insert into a table cell. This method takes some time and is
   * called inline while uploading data; in the mapfile test, for example, generating the key and
   * value consumes about 30% of the CPU time.
   * @return the generated random value
   */
  protected static byte[] generateData(final Random r, int length) {
    byte[] b = new byte[length];
    int i;

    // Fill eight bytes at a time, each chunk with a single random uppercase letter.
    for (i = 0; i < (length - 8); i += 8) {
      b[i] = (byte) (65 + r.nextInt(26));
      b[i + 1] = b[i];
      b[i + 2] = b[i];
      b[i + 3] = b[i];
      b[i + 4] = b[i];
      b[i + 5] = b[i];
      b[i + 6] = b[i];
      b[i + 7] = b[i];
    }

    // Fill the remaining tail bytes with a single random uppercase letter.
    byte a = (byte) (65 + r.nextInt(26));
    for (; i < length; i++) {
      b[i] = a;
    }
    return b;
  }

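  /**
   * Returns {@code numKeys} random split keys of {@code DEFAULT_VALUE_LENGTH} bytes each (unlike
   * {@link #generateRandomStartKeys(int)}, no empty first-region key is included).
   */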
  protected static byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] = generateData(random, DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

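  /**
   * Runs a local MR job that generates random data via {@link #setupRandomGeneratorMapper} and
   * writes HFiles for the given tables into {@code outDir}, using
   * {@link MultiTableHFileOutputFormat} when more than one table is supplied and
   * {@link HFileOutputFormat2} otherwise.
   */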
  protected static void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = Job.getInstance(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      // Incremental load should configure one reducer per region across all tables.
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
        regionLocator);
      // Incremental load should configure one reducer per region of the single table.
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    // The output directory must not exist before the job runs.
    assertFalse(UTIL.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }
}