/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestCellBasedHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by the call to write. Check that everything in the kv is the same except the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has an explicit ts. It should not be
      // changed by the call to write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Lower this value or we OOME in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell =
          TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY is set to true, because
   * MiniHBaseCluster always runs with a single hostname (and different ports), so it is not
   * possible to check region locality by comparing region locations and DN hostnames. Once
   * MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does),
   * region locality features can be tested more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    util.startMiniCluster(1, hostCount, hostnames);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
      Path tablePath = testDir;

      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        Table chosenTable = allTables.values().iterator().next();
        // Choose a semi-random table if multiple tables are available
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable,
          singleTableInfo.getRegionLocator());

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be extracted
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * compression map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the bloom type
   * map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom type settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the block size
   * map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that the
   * data block encoding map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding settings. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {

      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * Tests the scenario reported in HBASE-6901: all files are bulk loaded and excluded from minor
   * compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException would be thrown.
   */
  @Ignore("Flakey: See HBASE-9051")
  @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf,
          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
            conn.getRegionLocator(TABLE_NAMES[0]))),
          testDir, false);
        // Perform the actual load
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
        util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
            for (HRegion region : regions) {
              for (HStore store : region.getStores()) {
                store.closeAndArchiveCompactedFiles();
              }
            }
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
          for (HRegion region : regions) {
            for (HStore store : region.getStores()) {
              store.closeAndArchiveCompactedFiles();
            }
          }
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }
1231
1232  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1233  @Test
1234  public void testExcludeMinorCompaction() throws Exception {
1235    Configuration conf = util.getConfiguration();
1236    conf.setInt("hbase.hstore.compaction.min", 2);
1237    generateRandomStartKeys(5);
1238
1239    util.startMiniCluster();
1240    try (Connection conn = ConnectionFactory.createConnection(conf);
1241      Admin admin = conn.getAdmin()) {
1242      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1243      final FileSystem fs = util.getDFSCluster().getFileSystem();
1244      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1245      assertEquals("Should start with empty table", 0, util.countRows(table));
1246
1247      // deep inspection: get the StoreFile dir
1248      final Path storePath =
1249        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1250          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1251            Bytes.toString(FAMILIES[0])));
1252      assertEquals(0, fs.listStatus(storePath).length);
1253
1254      // put some data in it and flush to create a storefile
1255      Put p = new Put(Bytes.toBytes("test"));
1256      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1257      table.put(p);
1258      admin.flush(TABLE_NAMES[0]);
1259      assertEquals(1, util.countRows(table));
1260      quickPoll(new Callable<Boolean>() {
1261        @Override
1262        public Boolean call() throws Exception {
1263          return fs.listStatus(storePath).length == 1;
1264        }
1265      }, 5000);
1266
1267      // Generate a bulk load file with more rows
1268      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1269
1270      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1271      runIncrementalPELoad(conf,
1272        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)),
1273        testDir, false);
1274
1275      // Perform the actual load
1276      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1277
1278      // Ensure data shows up
1279      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1280      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1,
1281        util.countRows(table));
1282
1283      // should have a second StoreFile now
1284      assertEquals(2, fs.listStatus(storePath).length);
1285
1286      // minor compactions shouldn't get rid of the file
1287      admin.compact(TABLE_NAMES[0]);
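      // As in testExcludeAllFromMinorCompaction, the AssertionError from quickPoll is the
      // expected outcome: the bulk-loaded file is excluded from minor compaction, so the store
      // file count should not drop to one.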
1288      try {
1289        quickPoll(new Callable<Boolean>() {
1290          @Override
1291          public Boolean call() throws Exception {
1292            return fs.listStatus(storePath).length == 1;
1293          }
1294        }, 5000);
1295        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1296      } catch (AssertionError ae) {
1297        // this is expected behavior
1298      }
1299
1300      // a major compaction should work though
1301      admin.majorCompact(TABLE_NAMES[0]);
1302      quickPoll(new Callable<Boolean>() {
1303        @Override
1304        public Boolean call() throws Exception {
1305          return fs.listStatus(storePath).length == 1;
1306        }
1307      }, 5000);
1308
1309    } finally {
1310      util.shutdownMiniCluster();
1311    }
1312  }
1313
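  /**
   * Polls the given condition roughly every 10 ms for up to waitMs milliseconds and fails the
   * test (with an AssertionError) if the condition never becomes true.
   */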
1314  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1315    int sleepMs = 10;
1316    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1317    while (retries-- > 0) {
1318      if (c.call().booleanValue()) {
1319        return;
1320      }
1321      Thread.sleep(sleepMs);
1322    }
1323    fail();
1324  }
1325
1326  public static void main(String[] args) throws Exception {
1327    new TestCellBasedHFileOutputFormat2().manualTest(args);
1328  }
1329
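  /**
   * Manual driver: "newtable" creates a pre-split table named by the second argument, while
   * "incremental" writes HFiles for that table into the "incremental-out" directory.
   */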
1330  public void manualTest(String[] args) throws Exception {
1331    Configuration conf = HBaseConfiguration.create();
1332    util = new HBaseTestingUtility(conf);
1333    if ("newtable".equals(args[0])) {
1334      TableName tname = TableName.valueOf(args[1]);
1335      byte[][] splitKeys = generateRandomSplitKeys(4);
1336      util.createTable(tname, FAMILIES, splitKeys);
1337    } else if ("incremental".equals(args[0])) {
1338      TableName tname = TableName.valueOf(args[1]);
1339      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1340        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1341        Path outDir = new Path("incremental-out");
1342        runIncrementalPELoad(conf,
1343          Arrays.asList(
1344            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
1345          outDir, false);
1346      }
1347    } else {
1348      throw new RuntimeException("usage: TestCellBasedHFileOutputFormat2 newtable | incremental");
1349    }
1350  }
1351
1352  @Test
1353  public void testBlockStoragePolicy() throws Exception {
1354    util = new HBaseTestingUtility();
1355    Configuration conf = util.getConfiguration();
1356    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1357
1358    conf.set(
1359      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1360        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1361      "ONE_SSD");
1362    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1363    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1364    util.startMiniDFSCluster(3);
1365    FileSystem fs = util.getDFSCluster().getFileSystem();
1366    try {
1367      fs.mkdirs(cf1Dir);
1368      fs.mkdirs(cf2Dir);
1369
1370      // the default block storage policy is HOT
1371      String spA = getStoragePolicyName(fs, cf1Dir);
1372      String spB = getStoragePolicyName(fs, cf2Dir);
1373      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1374      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1375      assertEquals("HOT", spA);
1376      assertEquals("HOT", spB);
1377
1378      // alter table cf schema to change storage policies
1379      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1380        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1381      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1382        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1383      spA = getStoragePolicyName(fs, cf1Dir);
1384      spB = getStoragePolicyName(fs, cf2Dir);
1385      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1386      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1387      assertNotNull(spA);
1388      assertEquals("ONE_SSD", spA);
1389      assertNotNull(spB);
1390      assertEquals("ALL_SSD", spB);
1391    } finally {
1392      fs.delete(cf1Dir, true);
1393      fs.delete(cf2Dir, true);
1394      util.shutdownMiniDFSCluster();
1395    }
1396  }
1397
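  /**
   * Resolves the storage policy name of a path by reflectively calling getStoragePolicy on the
   * FileSystem; if that is unavailable (older HDFS), falls back to
   * {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)} and defaults to "HOT".
   */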
1398  private String getStoragePolicyName(FileSystem fs, Path path) {
1399    try {
1400      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1401      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1402    } catch (Exception e) {
1403      // This may fail on an old HDFS version; fall back to the old way
1404      if (LOG.isTraceEnabled()) {
1405        LOG.trace("Failed to get policy directly", e);
1406      }
1407      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1408      return policy == null ? "HOT" : policy; // HOT by default
1409    }
1410  }
1411
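  /**
   * Fallback for HDFS versions without FileSystem#getStoragePolicy: looks up the storage policy
   * id through the DFS client and maps it to a policy name, returning null if it cannot be
   * determined.
   */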
1412  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1413    try {
1414      if (fs instanceof DistributedFileSystem) {
1415        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1416        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1417        if (null != status) {
1418          byte storagePolicyId = status.getStoragePolicy();
1419          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1420          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1421            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1422            for (BlockStoragePolicy policy : policies) {
1423              if (policy.getId() == storagePolicyId) {
1424                return policy.getName();
1425              }
1426            }
1427          }
1428        }
1429      }
1430    } catch (Throwable e) {
1431      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1432    }
1433
1434    return null;
1435  }
1436}