001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.lang.reflect.Field;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.Random;
037import java.util.Set;
038import java.util.concurrent.Callable;
039import java.util.stream.Collectors;
040import java.util.stream.Stream;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.LocatedFileStatus;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.RemoteIterator;
047import org.apache.hadoop.hbase.ArrayBackedTag;
048import org.apache.hadoop.hbase.Cell;
049import org.apache.hadoop.hbase.CellUtil;
050import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
051import org.apache.hadoop.hbase.HBaseClassTestRule;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HBaseTestingUtility;
054import org.apache.hadoop.hbase.HColumnDescriptor;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HDFSBlocksDistribution;
057import org.apache.hadoop.hbase.HTableDescriptor;
058import org.apache.hadoop.hbase.HadoopShims;
059import org.apache.hadoop.hbase.KeyValue;
060import org.apache.hadoop.hbase.PerformanceEvaluation;
061import org.apache.hadoop.hbase.PrivateCellUtil;
062import org.apache.hadoop.hbase.TableName;
063import org.apache.hadoop.hbase.Tag;
064import org.apache.hadoop.hbase.TagType;
065import org.apache.hadoop.hbase.client.Admin;
066import org.apache.hadoop.hbase.client.Connection;
067import org.apache.hadoop.hbase.client.ConnectionFactory;
068import org.apache.hadoop.hbase.client.Put;
069import org.apache.hadoop.hbase.client.RegionLocator;
070import org.apache.hadoop.hbase.client.Result;
071import org.apache.hadoop.hbase.client.ResultScanner;
072import org.apache.hadoop.hbase.client.Scan;
073import org.apache.hadoop.hbase.client.Table;
074import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
075import org.apache.hadoop.hbase.io.compress.Compression;
076import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
077import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
078import org.apache.hadoop.hbase.io.hfile.CacheConfig;
079import org.apache.hadoop.hbase.io.hfile.HFile;
080import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
081import org.apache.hadoop.hbase.io.hfile.HFileScanner;
082import org.apache.hadoop.hbase.regionserver.BloomType;
083import org.apache.hadoop.hbase.regionserver.HRegion;
084import org.apache.hadoop.hbase.regionserver.HStore;
085import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
086import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
087import org.apache.hadoop.hbase.testclassification.LargeTests;
088import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
089import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
090import org.apache.hadoop.hbase.util.Bytes;
091import org.apache.hadoop.hbase.util.FSUtils;
092import org.apache.hadoop.hbase.util.ReflectionUtils;
093import org.apache.hadoop.hdfs.DistributedFileSystem;
094import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
095import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
096import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
097import org.apache.hadoop.io.NullWritable;
098import org.apache.hadoop.mapreduce.Job;
099import org.apache.hadoop.mapreduce.Mapper;
100import org.apache.hadoop.mapreduce.RecordWriter;
101import org.apache.hadoop.mapreduce.TaskAttemptContext;
102import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
103import org.junit.ClassRule;
104import org.junit.Ignore;
105import org.junit.Test;
106import org.junit.experimental.categories.Category;
107import org.mockito.Mockito;
108import org.slf4j.Logger;
109import org.slf4j.LoggerFactory;
110
111/**
112 * Simple test for {@link HFileOutputFormat2}.
113 * Sets up and runs a mapreduce job that writes hfile output.
114 * Creates a few inner classes to implement splits and an inputformat that
115 * emits keys and values like those of {@link PerformanceEvaluation}.
116 */
117@Category({VerySlowMapReduceTests.class, LargeTests.class})
// TODO: Remove this in 3.0
public class TestHFileOutputFormat2 {
120
121  @ClassRule
122  public static final HBaseClassTestRule CLASS_RULE =
123      HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);
124
125  private final static int ROWSPERSPLIT = 1024;
126
127  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
128  private static final byte[][] FAMILIES = {
129    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
130  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
131          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
132
133  private HBaseTestingUtility util = new HBaseTestingUtility();
134
135  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
136
137  /**
138   * Simple mapper that makes KeyValue output.
139   */
140  static class RandomKVGeneratingMapper
141      extends Mapper<NullWritable, NullWritable,
142                 ImmutableBytesWritable, Cell> {
143
144    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
151    private static final byte [] QUALIFIER = Bytes.toBytes("data");
152    private boolean multiTableMapper = false;
153    private TableName[] tables = null;
154
155
156    @Override
157    protected void setup(Context context) throws IOException,
158        InterruptedException {
159      super.setup(context);
160
161      Configuration conf = context.getConfiguration();
162      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
163      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
164      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
165              false);
166      if (multiTableMapper) {
167        tables = TABLE_NAMES;
168      } else {
169        tables = new TableName[]{TABLE_NAMES[0]};
170      }
171    }
172
173    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
        throws IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];
183
184      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
186      Random random = new Random();
187      byte[] key;
188      for (int j = 0; j < tables.length; ++j) {
189        for (int i = 0; i < ROWSPERSPLIT; i++) {
190          random.nextBytes(keyBytes);
191          // Ensure that unique tasks generate unique keys
192          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
193          random.nextBytes(valBytes);
194          key = keyBytes;
195          if (multiTableMapper) {
196            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
197          }
198
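          // The emitted key may carry the table-name prefix (the composite key) so that
          // MultiTableHFileOutputFormat can place the cell under the right per-table output
          // directory; the KeyValue itself keeps the plain, unprefixed row.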
199          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
200            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
201            context.write(new ImmutableBytesWritable(key), kv);
202          }
203        }
204      }
205    }
206  }
207
208  /**
209   * Simple mapper that makes Put output.
210   */
211  static class RandomPutGeneratingMapper
212      extends Mapper<NullWritable, NullWritable,
213                 ImmutableBytesWritable, Put> {
214
215    private int keyLength;
216    private static final int KEYLEN_DEFAULT = 10;
217    private static final String KEYLEN_CONF = "randomkv.key.length";
218
219    private int valLength;
220    private static final int VALLEN_DEFAULT = 10;
221    private static final String VALLEN_CONF = "randomkv.val.length";
222    private static final byte[] QUALIFIER = Bytes.toBytes("data");
223    private boolean multiTableMapper = false;
224    private TableName[] tables = null;
225
226    @Override
227    protected void setup(Context context) throws IOException,
228            InterruptedException {
229      super.setup(context);
230
231      Configuration conf = context.getConfiguration();
232      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
233      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
234      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
235              false);
236      if (multiTableMapper) {
237        tables = TABLE_NAMES;
238      } else {
239        tables = new TableName[]{TABLE_NAMES[0]};
240      }
241    }
242
243    @Override
244    protected void map(
245            NullWritable n1, NullWritable n2,
246            Mapper<NullWritable, NullWritable,
247                    ImmutableBytesWritable, Put>.Context context)
248            throws java.io.IOException, InterruptedException {
249
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];
252
253      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
255
256      Random random = new Random();
257      byte[] key;
258      for (int j = 0; j < tables.length; ++j) {
259        for (int i = 0; i < ROWSPERSPLIT; i++) {
260          random.nextBytes(keyBytes);
261          // Ensure that unique tasks generate unique keys
262          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
263          random.nextBytes(valBytes);
264          key = keyBytes;
265          if (multiTableMapper) {
266            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
267          }
268
269          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
270            Put p = new Put(keyBytes);
271            p.addColumn(family, QUALIFIER, valBytes);
            // Set a TTL of 1 ms so these cells are already expired by the time the verification
            // scan runs and no rows come back.
            p.setTTL(1L);
274            context.write(new ImmutableBytesWritable(key), p);
275          }
276        }
277      }
278    }
279  }
280
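  // The map output value class set here matters downstream: HFileOutputFormat2's
  // configureIncrementalLoad() selects the sort reducer from it (PutSortReducer for Put,
  // KeyValueSortReducer for KeyValue), which is what the putSortReducer flag exercises.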
281  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
282    if (putSortReducer) {
283      job.setInputFormatClass(NMapInputFormat.class);
284      job.setMapperClass(RandomPutGeneratingMapper.class);
285      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
286      job.setMapOutputValueClass(Put.class);
287    } else {
288      job.setInputFormatClass(NMapInputFormat.class);
289      job.setMapperClass(RandomKVGeneratingMapper.class);
290      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
291      job.setMapOutputValueClass(KeyValue.class);
292    }
293  }
294
295  /**
296   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
297   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
298   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
299   */
300  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
301  public void test_LATEST_TIMESTAMP_isReplaced()
302  throws Exception {
303    Configuration conf = new Configuration(this.util.getConfiguration());
304    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
305    TaskAttemptContext context = null;
306    Path dir =
307      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
308    try {
309      Job job = new Job(conf);
310      FileOutputFormat.setOutputPath(job, dir);
311      context = createTestTaskAttemptContext(job);
312      HFileOutputFormat2 hof = new HFileOutputFormat2();
313      writer = hof.getRecordWriter(context);
314      final byte [] b = Bytes.toBytes("b");
315
      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. The write call should change it.
      // Check that everything in the kv is unchanged except the timestamp.
318      KeyValue kv = new KeyValue(b, b, b);
319      KeyValue original = kv.clone();
320      writer.write(new ImmutableBytesWritable(), kv);
321      assertFalse(original.equals(kv));
322      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
323      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
324      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotEquals(original.getTimestamp(), kv.getTimestamp());
      assertNotEquals(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
327
328      // Test 2. Now test passing a kv that has explicit ts.  It should not be
329      // changed by call to record write.
330      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
331      original = kv.clone();
332      writer.write(new ImmutableBytesWritable(), kv);
333      assertTrue(original.equals(kv));
334    } finally {
335      if (writer != null && context != null) writer.close(context);
336      dir.getFileSystem(conf).delete(dir, true);
337    }
338  }
339
340  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
341  throws Exception {
342    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
343    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
344      job, "attempt_201402131733_0001_m_000000_0");
345    return context;
346  }
347
  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
   * metadata used by time-restricted scans.
   */
352  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
353  public void test_TIMERANGE() throws Exception {
354    Configuration conf = new Configuration(this.util.getConfiguration());
355    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
356    TaskAttemptContext context = null;
357    Path dir =
358      util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Writing HFile with TIMERANGE metadata to dir: " + dir);
360    try {
361      // build a record writer using HFileOutputFormat2
362      Job job = new Job(conf);
363      FileOutputFormat.setOutputPath(job, dir);
364      context = createTestTaskAttemptContext(job);
365      HFileOutputFormat2 hof = new HFileOutputFormat2();
366      writer = hof.getRecordWriter(context);
367
      // Pass two key values with explicit timestamps.
369      final byte [] b = Bytes.toBytes("b");
370
371      // value 1 with timestamp 2000
372      KeyValue kv = new KeyValue(b, b, b, 2000, b);
373      KeyValue original = kv.clone();
374      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);
376
377      // value 2 with timestamp 1000
378      kv = new KeyValue(b, b, b, 1000, b);
379      original = kv.clone();
380      writer.write(new ImmutableBytesWritable(), kv);
381      assertEquals(original, kv);
382
383      // verify that the file has the proper FileInfo.
384      writer.close(context);
385
386      // the generated file lives 1 directory down from the attempt directory
387      // and is the only file, e.g.
388      // _attempt__0000_r_000000_0/b/1979617994050536795
389      FileSystem fs = FileSystem.get(conf);
390      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
391      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
392      FileStatus[] file = fs.listStatus(sub1[0].getPath());
393
394      // open as HFile Reader and pull out TIMERANGE FileInfo.
395      HFile.Reader rd =
396          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
397      Map<byte[],byte[]> finfo = rd.loadFileInfo();
398      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
399      assertNotNull(range);
400
401      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
403      LOG.info(timeRangeTracker.getMin() +
404          "...." + timeRangeTracker.getMax());
405      assertEquals(1000, timeRangeTracker.getMin());
406      assertEquals(2000, timeRangeTracker.getMax());
407      rd.close();
408    } finally {
409      if (writer != null && context != null) writer.close(context);
410      dir.getFileSystem(conf).delete(dir, true);
411    }
412  }
413
414  /**
415   * Run small MR job.
416   */
417  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
418  public void testWritingPEData() throws Exception {
419    Configuration conf = util.getConfiguration();
420    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
421    FileSystem fs = testDir.getFileSystem(conf);
422
    // Turn this value down or we OOME in Eclipse.
424    conf.setInt("mapreduce.task.io.sort.mb", 20);
425    // Write a few files.
426    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
427
428    Job job = new Job(conf, "testWritingPEData");
429    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
432    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
433    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
434
435    Arrays.fill(startKey, (byte)0);
436    Arrays.fill(endKey, (byte)0xff);
437
438    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
439    // Set start and end rows for partitioner.
440    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
441    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
442    job.setReducerClass(KeyValueSortReducer.class);
443    job.setOutputFormatClass(HFileOutputFormat2.class);
444    job.setNumReduceTasks(4);
445    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
446        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
447        KeyValueSerialization.class.getName());
448
449    FileOutputFormat.setOutputPath(job, testDir);
450    assertTrue(job.waitForCompletion(false));
451    FileStatus [] files = fs.listStatus(testDir);
452    assertTrue(files.length > 0);
453  }
454
455  /**
   * Test that the {@link HFileOutputFormat2} RecordWriter writes tags, such as the TTL tag,
   * into the HFile.
458   */
459  @Test
460  public void test_WritingTagData()
461      throws Exception {
462    Configuration conf = new Configuration(this.util.getConfiguration());
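    // Tags are only persisted to HFiles from format version 3 onwards, so pin the writer to a
    // tags-capable format version for this test.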
463    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
464    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
465    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
466    TaskAttemptContext context = null;
467    Path dir =
468        util.getDataTestDir("WritingTagData");
469    try {
470      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
471      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
472      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
473      Job job = new Job(conf);
474      FileOutputFormat.setOutputPath(job, dir);
475      context = createTestTaskAttemptContext(job);
476      HFileOutputFormat2 hof = new HFileOutputFormat2();
477      writer = hof.getRecordWriter(context);
478      final byte [] b = Bytes.toBytes("b");
479
      List<Tag> tags = new ArrayList<>();
481      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
482      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
483      writer.write(new ImmutableBytesWritable(), kv);
484      writer.close(context);
485      writer = null;
486      FileSystem fs = dir.getFileSystem(conf);
487      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
488      while(iterator.hasNext()) {
489        LocatedFileStatus keyFileStatus = iterator.next();
490        HFile.Reader reader =
491            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
492        HFileScanner scanner = reader.getScanner(false, false, false);
493        scanner.seekTo();
494        Cell cell = scanner.getCell();
495        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
496        assertTrue(tagsFromCell.size() > 0);
497        for (Tag tag : tagsFromCell) {
498          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
499        }
500      }
501    } finally {
502      if (writer != null && context != null) writer.close(context);
503      dir.getFileSystem(conf).delete(dir, true);
504    }
505  }
506
507  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
508  public void testJobConfiguration() throws Exception {
509    Configuration conf = new Configuration(this.util.getConfiguration());
510    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
511        .toString());
512    Job job = new Job(conf);
513    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
514    Table table = Mockito.mock(Table.class);
515    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
516    setupMockStartKeys(regionLocator);
517    setupMockTableName(regionLocator);
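    // The mocked locator returns four region start keys (see setupMockStartKeys), so
    // configureIncrementalLoad() is expected to configure one reducer per region.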
518    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
520  }
521
522  private byte [][] generateRandomStartKeys(int numKeys) {
523    Random random = new Random();
524    byte[][] ret = new byte[numKeys][];
525    // first region start key is always empty
526    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
527    for (int i = 1; i < numKeys; i++) {
528      ret[i] =
529        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
530    }
531    return ret;
532  }
533
534  private byte[][] generateRandomSplitKeys(int numKeys) {
535    Random random = new Random();
536    byte[][] ret = new byte[numKeys][];
537    for (int i = 0; i < numKeys; i++) {
538      ret[i] =
539          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
540    }
541    return ret;
542  }
543
544  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
545  public void testMRIncrementalLoad() throws Exception {
546    LOG.info("\nStarting test testMRIncrementalLoad\n");
547    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
548  }
549
550  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
551  public void testMRIncrementalLoadWithSplit() throws Exception {
552    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
553    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
554  }
555
556  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
   * This test can only check that the original logic stays correct when LOCALITY_SENSITIVE_CONF_KEY
   * is set to true. Because MiniHBaseCluster always runs with a single hostname (and different ports),
   * it is not possible to verify region locality by comparing region locations with DN hostnames.
   * Once MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does),
   * we will be able to test the region locality feature more easily.
563   */
564  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
565  public void testMRIncrementalLoadWithLocality() throws Exception {
566    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
567    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
568    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
569  }
570
571  //@Ignore("Wahtevs")
572  @Test
573  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
574    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
575    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
576  }
577
578  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
579                                     boolean putSortReducer, String tableStr) throws Exception {
580      doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
581              Arrays.asList(tableStr));
582  }
583
584  @Test
585  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
586    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
587    doIncrementalLoadTest(false, false, true,
588            Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList
589                    ()));
590  }
591
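  /**
   * End-to-end incremental load: starts a mini cluster, runs the random-data MR job through
   * {@link HFileOutputFormat2}, bulk loads the generated HFiles with LoadIncrementalHFiles and
   * verifies row counts, block locality and data survival across a region reopen (optionally
   * changing the region boundaries of one table first).
   */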
592  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
593      boolean putSortReducer, List<String> tableStr) throws Exception {
594    util = new HBaseTestingUtility();
595    Configuration conf = util.getConfiguration();
596    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
597    int hostCount = 1;
598    int regionNum = 5;
599    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
602      hostCount = 3;
603      regionNum = 20;
604    }
605
606    String[] hostnames = new String[hostCount];
607    for (int i = 0; i < hostCount; ++i) {
608      hostnames[i] = "datanode_" + i;
609    }
610    util.startMiniCluster(1, hostCount, hostnames);
611
612    Map<String, Table> allTables = new HashMap<>(tableStr.size());
613    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
614    boolean writeMultipleTables = tableStr.size() > 1;
615    for (String tableStrSingle : tableStr) {
616      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
617      TableName tableName = TableName.valueOf(tableStrSingle);
618      Table table = util.createTable(tableName, FAMILIES, splitKeys);
619
620      RegionLocator r = util.getConnection().getRegionLocator(tableName);
621      assertEquals("Should start with empty table", 0, util.countRows(table));
622      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);
624
625      allTables.put(tableStrSingle, table);
626      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
627    }
628    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
629    // Generate the bulk load files
630    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
631
632    for (Table tableSingle : allTables.values()) {
633      // This doesn't write into the table, just makes files
634      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
635    }
636    int numTableDirs = 0;
637    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
638      Path tablePath = testDir;
639
640      if (writeMultipleTables) {
641        if (allTables.containsKey(tf.getPath().getName())) {
642          ++numTableDirs;
643          tablePath = tf.getPath();
644        }
645        else {
646          continue;
647        }
648      }
649
650      // Make sure that a directory was created for every CF
651      int dir = 0;
652      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
653        for (byte[] family : FAMILIES) {
654          if (Bytes.toString(family).equals(f.getPath().getName())) {
655            ++dir;
656          }
657        }
658      }
659      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
660    }
661    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
663    }
664
665    Admin admin = util.getConnection().getAdmin();
666    try {
667      // handle the split case
668      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
672        admin.disableTable(chosenTable.getName());
673        util.waitUntilNoRegionsInTransition();
674
675        util.deleteTable(chosenTable.getName());
676        byte[][] newSplitKeys = generateRandomSplitKeys(14);
677        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
678
679        while (util.getConnection().getRegionLocator(chosenTable.getName())
680                .getAllRegionLocations().size() != 15 ||
681                !admin.isTableAvailable(table.getName())) {
682          Thread.sleep(200);
683          LOG.info("Waiting for new region assignment to happen");
684        }
685      }
686
687      // Perform the actual load
688      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
689        Path tableDir = testDir;
690        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
692        if (writeMultipleTables) {
693          tableDir = new Path(testDir, tableNameStr);
694        }
695        Table currentTable = allTables.get(tableNameStr);
696        TableName currentTableName = currentTable.getName();
697        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
698                .getRegionLocator());
699
700        // Ensure data shows up
701        int expectedRows = 0;
        if (putSortReducer) {
          // With PutSortReducer the cells were written with a 1 ms TTL, so they are all expired
          // and no rows should be visible.
704          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
705                  util.countRows(currentTable));
706        } else {
707          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
708          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
709                  util.countRows(currentTable));
710          Scan scan = new Scan();
711          ResultScanner results = currentTable.getScanner(scan);
712          for (Result res : results) {
713            assertEquals(FAMILIES.length, res.rawCells().length);
714            Cell first = res.rawCells()[0];
715            for (Cell kv : res.rawCells()) {
716              assertTrue(CellUtil.matchingRows(first, kv));
717              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
718            }
719          }
720          results.close();
721        }
722        String tableDigestBefore = util.checksumRows(currentTable);
723        // Check region locality
724        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
725        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
726          hbd.add(region.getHDFSBlocksDistribution());
727        }
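        // With hostCount no higher than the default HDFS replication factor, every block is
        // replicated to every datanode, so each host should report full (1.0) locality.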
728        for (String hostname : hostnames) {
729          float locality = hbd.getBlockLocalityIndex(hostname);
730          LOG.info("locality of [" + hostname + "]: " + locality);
731          assertEquals(100, (int) (locality * 100));
732        }
733
734        // Cause regions to reopen
735        admin.disableTable(currentTableName);
736        while (!admin.isTableDisabled(currentTableName)) {
737          Thread.sleep(200);
738          LOG.info("Waiting for table to disable");
739        }
740        admin.enableTable(currentTableName);
741        util.waitTableAvailable(currentTableName);
742        assertEquals("Data should remain after reopening of regions",
743                tableDigestBefore, util.checksumRows(currentTable));
744      }
745    } finally {
746      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
747          tableInfoSingle.getRegionLocator().close();
748      }
749      for (Entry<String, Table> singleTable : allTables.entrySet() ) {
750        singleTable.getValue().close();
751        util.deleteTable(singleTable.getValue().getName());
752      }
753      testDir.getFileSystem(conf).delete(testDir, true);
754      util.shutdownMiniCluster();
755    }
756  }
757
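  /**
   * Runs the random-data MR job against the given table(s). configureIncrementalLoad() sets one
   * reduce task per region (total-order partitioned on the region boundaries), which is what the
   * assertions on getNumReduceTasks() verify.
   */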
  private void runIncrementalPELoad(Configuration conf,
      List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
      throws IOException, InterruptedException, ClassNotFoundException {
761    Job job = new Job(conf, "testLocalMRIncrementalLoad");
762    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
763    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
764        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
765        KeyValueSerialization.class.getName());
766    setupRandomGeneratorMapper(job, putSortReducer);
767    if (tableInfo.size() > 1) {
768      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
769      int sum = 0;
770      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
771        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
772      }
773      assertEquals(sum, job.getNumReduceTasks());
    } else {
776      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
777      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
778              regionLocator);
779      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
780    }
781
782    FileOutputFormat.setOutputPath(job, outDir);
783
    assertFalse(util.getTestFileSystem().exists(outDir));
785
786    assertTrue(job.waitForCompletion(true));
787  }
788
789  /**
790   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
791   * Tests that the family compression map is correctly serialized into
792   * and deserialized from configuration
793   *
794   * @throws IOException
795   */
796  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
797  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
798    for (int numCfs = 0; numCfs <= 3; numCfs++) {
799      Configuration conf = new Configuration(this.util.getConfiguration());
800      Map<String, Compression.Algorithm> familyToCompression =
801          getMockColumnFamiliesForCompression(numCfs);
802      Table table = Mockito.mock(Table.class);
803      setupMockColumnFamiliesForCompression(table, familyToCompression);
804      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
805              HFileOutputFormat2.serializeColumnFamilyAttribute
806                      (HFileOutputFormat2.compressionDetails,
807                              Arrays.asList(table.getTableDescriptor())));
808
809      // read back family specific compression setting from the configuration
810      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
811          .createFamilyCompressionMap(conf);
812
813      // test that we have a value for all column families that matches with the
814      // used mock values
815      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
816        assertEquals("Compression configuration incorrect for column family:"
817            + entry.getKey(), entry.getValue(),
818            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
819      }
820    }
821  }
822
823  private void setupMockColumnFamiliesForCompression(Table table,
824      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
825    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
826    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
827      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
828          .setMaxVersions(1)
829          .setCompressionType(entry.getValue())
830          .setBlockCacheEnabled(false)
831          .setTimeToLive(0));
832    }
833    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
834  }
835
836  /**
837   * @return a map from column family names to compression algorithms for
838   *         testing column family compression. Column family names have special characters
839   */
840  private Map<String, Compression.Algorithm>
841      getMockColumnFamiliesForCompression (int numCfs) {
842    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
843    // use column family names having special characters
844    if (numCfs-- > 0) {
845      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
846    }
847    if (numCfs-- > 0) {
848      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
849    }
850    if (numCfs-- > 0) {
851      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
852    }
853    if (numCfs-- > 0) {
854      familyToCompression.put("Family3", Compression.Algorithm.NONE);
855    }
856    return familyToCompression;
857  }
858
859
860  /**
861   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
862   * Tests that the family bloom type map is correctly serialized into
863   * and deserialized from configuration
864   *
865   * @throws IOException
866   */
867  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
868  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
869    for (int numCfs = 0; numCfs <= 2; numCfs++) {
870      Configuration conf = new Configuration(this.util.getConfiguration());
871      Map<String, BloomType> familyToBloomType =
872          getMockColumnFamiliesForBloomType(numCfs);
873      Table table = Mockito.mock(Table.class);
874      setupMockColumnFamiliesForBloomType(table,
875          familyToBloomType);
876      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
877              HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
878              Arrays.asList(table.getTableDescriptor())));
879
880      // read back family specific data block encoding settings from the
881      // configuration
882      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
883          HFileOutputFormat2
884              .createFamilyBloomTypeMap(conf);
885
886      // test that we have a value for all column families that matches with the
887      // used mock values
888      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
889        assertEquals("BloomType configuration incorrect for column family:"
890            + entry.getKey(), entry.getValue(),
891            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
892      }
893    }
894  }
895
896  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
900      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
901          .setMaxVersions(1)
902          .setBloomFilterType(entry.getValue())
903          .setBlockCacheEnabled(false)
904          .setTimeToLive(0));
905    }
906    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
907  }
908
909  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom filter settings. Column family names have special characters
   */
  private Map<String, BloomType>
  getMockColumnFamiliesForBloomType(int numCfs) {
915    Map<String, BloomType> familyToBloomType = new HashMap<>();
916    // use column family names having special characters
917    if (numCfs-- > 0) {
918      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
919    }
920    if (numCfs-- > 0) {
921      familyToBloomType.put("Family2=asdads&!AASD",
922          BloomType.ROWCOL);
923    }
924    if (numCfs-- > 0) {
925      familyToBloomType.put("Family3", BloomType.NONE);
926    }
927    return familyToBloomType;
928  }
929
930  /**
931   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
932   * Tests that the family block size map is correctly serialized into
933   * and deserialized from configuration
934   *
935   * @throws IOException
936   */
937  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
938  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
939    for (int numCfs = 0; numCfs <= 3; numCfs++) {
940      Configuration conf = new Configuration(this.util.getConfiguration());
941      Map<String, Integer> familyToBlockSize =
942          getMockColumnFamiliesForBlockSize(numCfs);
943      Table table = Mockito.mock(Table.class);
944      setupMockColumnFamiliesForBlockSize(table,
945          familyToBlockSize);
946      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
947              HFileOutputFormat2.serializeColumnFamilyAttribute
948                      (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table
949                              .getTableDescriptor())));
950
951      // read back family specific data block encoding settings from the
952      // configuration
953      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
954          HFileOutputFormat2
955              .createFamilyBlockSizeMap(conf);
956
957      // test that we have a value for all column families that matches with the
958      // used mock values
959      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()
960          ) {
961        assertEquals("BlockSize configuration incorrect for column family:"
962            + entry.getKey(), entry.getValue(),
963            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
964      }
965    }
966  }
967
968  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
972      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
973          .setMaxVersions(1)
974          .setBlocksize(entry.getValue())
975          .setBlockCacheEnabled(false)
976          .setTimeToLive(0));
977    }
978    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
979  }
980
981  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size settings. Column family names have special characters
   */
  private Map<String, Integer>
  getMockColumnFamiliesForBlockSize(int numCfs) {
987    Map<String, Integer> familyToBlockSize = new HashMap<>();
988    // use column family names having special characters
989    if (numCfs-- > 0) {
990      familyToBlockSize.put("Family1!@#!@#&", 1234);
991    }
992    if (numCfs-- > 0) {
993      familyToBlockSize.put("Family2=asdads&!AASD",
994          Integer.MAX_VALUE);
995    }
996    if (numCfs-- > 0) {
997      familyToBlockSize.put("Family2=asdads&!AASD",
998          Integer.MAX_VALUE);
999    }
1000    if (numCfs-- > 0) {
1001      familyToBlockSize.put("Family3", 0);
1002    }
1003    return familyToBlockSize;
1004  }
1005
1006  /**
1007   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
1008   * Tests that the family data block encoding map is correctly serialized into
1009   * and deserialized from configuration
1010   *
1011   * @throws IOException
1012   */
1013  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1014  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
1015    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1016      Configuration conf = new Configuration(this.util.getConfiguration());
1017      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
1018          getMockColumnFamiliesForDataBlockEncoding(numCfs);
1019      Table table = Mockito.mock(Table.class);
1020      setupMockColumnFamiliesForDataBlockEncoding(table,
1021          familyToDataBlockEncoding);
1022      HTableDescriptor tableDescriptor = table.getTableDescriptor();
1023      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
1024              HFileOutputFormat2.serializeColumnFamilyAttribute
1025                      (HFileOutputFormat2.dataBlockEncodingDetails, Arrays
1026                      .asList(tableDescriptor)));
1027
1028      // read back family specific data block encoding settings from the
1029      // configuration
1030      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
1031          HFileOutputFormat2
1032          .createFamilyDataBlockEncodingMap(conf);
1033
1034      // test that we have a value for all column families that matches with the
1035      // used mock values
1036      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1037        assertEquals("DataBlockEncoding configuration incorrect for column family:"
1038            + entry.getKey(), entry.getValue(),
1039            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
1040      }
1041    }
1042  }
1043
1044  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
1045      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
1046    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
1047    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1048      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
1049          .setMaxVersions(1)
1050          .setDataBlockEncoding(entry.getValue())
1051          .setBlockCacheEnabled(false)
1052          .setTimeToLive(0));
1053    }
1054    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
1055  }
1056
1057  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding settings. Column family names have special characters
   */
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
1063    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
1064    // use column family names having special characters
1065    if (numCfs-- > 0) {
1066      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
1067    }
1068    if (numCfs-- > 0) {
1069      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1070          DataBlockEncoding.FAST_DIFF);
1071    }
1072    if (numCfs-- > 0) {
1073      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1074          DataBlockEncoding.PREFIX);
1075    }
1076    if (numCfs-- > 0) {
1077      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
1078    }
1079    return familyToDataBlockEncoding;
1080  }
1081
1082  private void setupMockStartKeys(RegionLocator table) throws IOException {
1083    byte[][] mockKeys = new byte[][] {
1084        HConstants.EMPTY_BYTE_ARRAY,
1085        Bytes.toBytes("aaa"),
1086        Bytes.toBytes("ggg"),
1087        Bytes.toBytes("zzz")
1088    };
1089    Mockito.doReturn(mockKeys).when(table).getStartKeys();
1090  }
1091
1092  private void setupMockTableName(RegionLocator table) throws IOException {
1093    TableName mockTableName = TableName.valueOf("mock_table");
1094    Mockito.doReturn(mockTableName).when(table).getName();
1095  }
1096
1097  /**
1098   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
1099   * bloom filter settings from the column family descriptor
1100   */
1101  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1102  public void testColumnFamilySettings() throws Exception {
1103    Configuration conf = new Configuration(this.util.getConfiguration());
1104    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1105    TaskAttemptContext context = null;
1106    Path dir = util.getDataTestDir("testColumnFamilySettings");
1107
1108    // Setup table descriptor
1109    Table table = Mockito.mock(Table.class);
1110    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
1111    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
1112    Mockito.doReturn(htd).when(table).getTableDescriptor();
1113    for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
1114      htd.addFamily(hcd);
1115    }
1116
1117    // set up the table to return some mock keys
1118    setupMockStartKeys(regionLocator);
1119
1120    try {
1121      // partial map red setup to get an operational writer for testing
1122      // We turn off the sequence file compression, because DefaultCodec
1123      // pollutes the GZip codec pool with an incompatible compressor.
1124      conf.set("io.seqfile.compression.type", "NONE");
1125      conf.set("hbase.fs.tmp.dir", dir.toString());
1126      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
1127      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1128
1129      Job job = new Job(conf, "testLocalMRIncrementalLoad");
1130      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
1131      setupRandomGeneratorMapper(job, false);
1132      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
1133      FileOutputFormat.setOutputPath(job, dir);
1134      context = createTestTaskAttemptContext(job);
1135      HFileOutputFormat2 hof = new HFileOutputFormat2();
1136      writer = hof.getRecordWriter(context);
1137
1138      // write out random rows
1139      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
1140      writer.close(context);
1141
1142      // Make sure that a directory was created for every CF
1143      FileSystem fs = dir.getFileSystem(conf);
1144
1145      // commit so that the filesystem has one directory per column family
1146      hof.getOutputCommitter(context).commitTask(context);
1147      hof.getOutputCommitter(context).commitJob(context);
1148      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
1149      assertEquals(htd.getFamilies().size(), families.length);
1150      for (FileStatus f : families) {
1151        String familyStr = f.getPath().getName();
1152        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
1153        // verify that the compression on this file matches the configured
1154        // compression
1155        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
1156        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
1157        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
1158
1159        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
1160        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
          " (reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
          " (reader: " + reader + ")", hcd.getCompressionType(),
          reader.getFileContext().getCompression());
1166      }
1167    } finally {
1168      dir.getFileSystem(conf).delete(dir, true);
1169    }
1170  }
1171
1172  /**
   * Write random values to the writer, one cell per row for each of the given
   * column families.
1175   */
1176  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1177      TaskAttemptContext context, Set<byte[]> families, int numRows)
1178      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];
1182
1183    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
1185    final byte [] qualifier = Bytes.toBytes("data");
1186    Random random = new Random();
1187    for (int i = 0; i < numRows; i++) {
1188
1189      Bytes.putInt(keyBytes, 0, i);
1190      random.nextBytes(valBytes);
1191      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1192
1193      for (byte[] family : families) {
1194        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1195        writer.write(key, kv);
1196      }
1197    }
1198  }
1199
1200  /**
   * This test exercises the scenario reported in HBASE-6901.
   * All files are bulk loaded and excluded from minor compaction.
   * Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException
   * would be thrown.
1205   */
1206  @Ignore ("Flakey: See HBASE-9051") @Test
1207  public void testExcludeAllFromMinorCompaction() throws Exception {
1208    Configuration conf = util.getConfiguration();
1209    conf.setInt("hbase.hstore.compaction.min", 2);
1210    generateRandomStartKeys(5);
1211
1212    util.startMiniCluster();
1213    try (Connection conn = ConnectionFactory.createConnection();
1214        Admin admin = conn.getAdmin();
1215        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1216        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1217      final FileSystem fs = util.getDFSCluster().getFileSystem();
1218      assertEquals("Should start with empty table", 0, util.countRows(table));
1219
1220      // deep inspection: get the StoreFile dir
1221      final Path storePath = new Path(
1222        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1223          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1224            Bytes.toString(FAMILIES[0])));
1225      assertEquals(0, fs.listStatus(storePath).length);
1226
1227      // Generate two bulk load files
1228      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1229          true);
1230
1231      for (int i = 0; i < 2; i++) {
1232        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1233        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1234                .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
1235        // Perform the actual load
1236        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
1237      }
1238
1239      // Ensure data shows up
1240      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1241      assertEquals("LoadIncrementalHFiles should put expected data in table",
1242          expectedRows, util.countRows(table));
1243
1244      // should have a second StoreFile now
1245      assertEquals(2, fs.listStatus(storePath).length);
1246
1247      // minor compactions shouldn't get rid of the file
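      // The poll below is expected to time out (throwing an AssertionError) because the
      // bulk-loaded files are excluded from minor compaction; if the store ever collapses
      // to a single file, the IOException fails the test with the unexpected StoreFile count.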
1248      admin.compact(TABLE_NAMES[0]);
1249      try {
1250        quickPoll(new Callable<Boolean>() {
1251          @Override
1252          public Boolean call() throws Exception {
1253            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1254            for (HRegion region : regions) {
1255              for (HStore store : region.getStores()) {
1256                store.closeAndArchiveCompactedFiles();
1257              }
1258            }
1259            return fs.listStatus(storePath).length == 1;
1260          }
1261        }, 5000);
1262        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1263      } catch (AssertionError ae) {
1264        // this is expected behavior
1265      }
1266
1267      // a major compaction should work though
1268      admin.majorCompact(TABLE_NAMES[0]);
1269      quickPoll(new Callable<Boolean>() {
1270        @Override
1271        public Boolean call() throws Exception {
1272          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1273          for (HRegion region : regions) {
1274            for (HStore store : region.getStores()) {
1275              store.closeAndArchiveCompactedFiles();
1276            }
1277          }
1278          return fs.listStatus(storePath).length == 1;
1279        }
1280      }, 5000);
1281
1282    } finally {
1283      util.shutdownMiniCluster();
1284    }
1285  }
1286
1287  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1288  public void testExcludeMinorCompaction() throws Exception {
1289    Configuration conf = util.getConfiguration();
1290    conf.setInt("hbase.hstore.compaction.min", 2);
1291    generateRandomStartKeys(5);
1292
1293    util.startMiniCluster();
1294    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
1296      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1297      final FileSystem fs = util.getDFSCluster().getFileSystem();
1298      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1299      assertEquals("Should start with empty table", 0, util.countRows(table));
1300
1301      // deep inspection: get the StoreFile dir
1302      final Path storePath = new Path(
1303        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1304          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1305            Bytes.toString(FAMILIES[0])));
1306      assertEquals(0, fs.listStatus(storePath).length);
1307
1308      // put some data in it and flush to create a storefile
1309      Put p = new Put(Bytes.toBytes("test"));
1310      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1311      table.put(p);
1312      admin.flush(TABLE_NAMES[0]);
1313      assertEquals(1, util.countRows(table));
1314      quickPoll(new Callable<Boolean>() {
1315        @Override
1316        public Boolean call() throws Exception {
1317          return fs.listStatus(storePath).length == 1;
1318        }
1319      }, 5000);
1320
1321      // Generate a bulk load file with more rows
1322      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1323          true);
1324
1325      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1326      runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1327                      .getTableDescriptor(), regionLocator)), testDir, false);
1328
1329      // Perform the actual load
1330      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1331
1332      // Ensure data shows up
1333      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1334      assertEquals("LoadIncrementalHFiles should put expected data in table",
1335          expectedRows + 1, util.countRows(table));
1336
1337      // should have a second StoreFile now
1338      assertEquals(2, fs.listStatus(storePath).length);
1339
1340      // minor compactions shouldn't get rid of the file
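      // As above, the poll should time out because the bulk-loaded file is excluded from
      // minor compaction; reaching the IOException means the files were unexpectedly compacted.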
1341      admin.compact(TABLE_NAMES[0]);
1342      try {
1343        quickPoll(new Callable<Boolean>() {
1344          @Override
1345          public Boolean call() throws Exception {
1346            return fs.listStatus(storePath).length == 1;
1347          }
1348        }, 5000);
1349        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1350      } catch (AssertionError ae) {
1351        // this is expected behavior
1352      }
1353
1354      // a major compaction should work though
1355      admin.majorCompact(TABLE_NAMES[0]);
1356      quickPoll(new Callable<Boolean>() {
1357        @Override
1358        public Boolean call() throws Exception {
1359          return fs.listStatus(storePath).length == 1;
1360        }
1361      }, 5000);
1362
1363    } finally {
1364      util.shutdownMiniCluster();
1365    }
1366  }
1367
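  /**
   * Polls the given condition roughly every 10 ms until it returns true or
   * {@code waitMs} milliseconds have elapsed, failing the test on timeout.
   */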
1368  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1369    int sleepMs = 10;
1370    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1371    while (retries-- > 0) {
1372      if (c.call().booleanValue()) {
1373        return;
1374      }
1375      Thread.sleep(sleepMs);
1376    }
1377    fail();
1378  }
1379
1380  public static void main(String[] args) throws Exception {
1381    new TestHFileOutputFormat2().manualTest(args);
1382  }
1383
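  /**
   * Manual driver: {@code newtable <tableName>} creates a pre-split test table, while
   * {@code incremental <tableName>} runs an incremental PE load into the
   * {@code incremental-out} directory.
   */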
1384  public void manualTest(String[] args) throws Exception {
1385    Configuration conf = HBaseConfiguration.create();
1386    util = new HBaseTestingUtility(conf);
1387    if ("newtable".equals(args[0])) {
1388      TableName tname = TableName.valueOf(args[1]);
1389      byte[][] splitKeys = generateRandomSplitKeys(4);
1390      util.createTable(tname, FAMILIES, splitKeys);
1391    } else if ("incremental".equals(args[0])) {
1392      TableName tname = TableName.valueOf(args[1]);
1393      try(Connection c = ConnectionFactory.createConnection(conf);
1394          Admin admin = c.getAdmin();
1395          RegionLocator regionLocator = c.getRegionLocator(tname)) {
1396        Path outDir = new Path("incremental-out");
1397        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
1398                .getTableDescriptor(tname), regionLocator)), outDir, false);
1399      }
1400    } else {
1401      throw new RuntimeException(
1402          "usage: TestHFileOutputFormat2 newtable | incremental");
1403    }
1404  }
1405
1406  @Test
1407  public void testBlockStoragePolicy() throws Exception {
1408    util = new HBaseTestingUtility();
1409    Configuration conf = util.getConfiguration();
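    // ALL_SSD is the table-wide default policy; FAMILIES[0] is overridden to ONE_SSD
    // through the per-column-family property below.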
1410    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1411
1412    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
1413            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
1414                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
1415    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1416    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1417    util.startMiniDFSCluster(3);
1418    FileSystem fs = util.getDFSCluster().getFileSystem();
1419    try {
1420      fs.mkdirs(cf1Dir);
1421      fs.mkdirs(cf2Dir);
1422
1423      // the original block storage policy would be HOT
1424      String spA = getStoragePolicyName(fs, cf1Dir);
1425      String spB = getStoragePolicyName(fs, cf2Dir);
1426      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1427      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1428      assertEquals("HOT", spA);
1429      assertEquals("HOT", spB);
1430
1431      // configure new storage policies for the column family directories
1432      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1433              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1434      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1435              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1436      spA = getStoragePolicyName(fs, cf1Dir);
1437      spB = getStoragePolicyName(fs, cf2Dir);
1438      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1439      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1440      assertNotNull(spA);
1441      assertEquals("ONE_SSD", spA);
1442      assertNotNull(spB);
1443      assertEquals("ALL_SSD", spB);
1444    } finally {
1445      fs.delete(cf1Dir, true);
1446      fs.delete(cf2Dir, true);
1447      util.shutdownMiniDFSCluster();
1448    }
1449  }
1450
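  /**
   * Looks up the storage policy of a path by reflectively calling
   * {@code FileSystem#getStoragePolicy}; on older HDFS versions that lack the method,
   * falls back to {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)}.
   */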
1451  private String getStoragePolicyName(FileSystem fs, Path path) {
1452    try {
1453      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1454      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1455    } catch (Exception e) {
1456      // May fail when running against an older HDFS version; fall back to the old way
1457      if (LOG.isTraceEnabled()) {
1458        LOG.trace("Failed to get policy directly", e);
1459      }
1460      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1461      return policy == null ? "HOT" : policy; // HOT by default
1462    }
1463  }
1464
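  /**
   * Fallback for HDFS versions without {@code FileSystem#getStoragePolicy}: reads the
   * policy id from the file status via the DFS client and matches it against the
   * available storage policies. Returns null if the policy cannot be determined.
   */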
1465  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1466    try {
1467      if (fs instanceof DistributedFileSystem) {
1468        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1469        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1470        if (null != status) {
1471          byte storagePolicyId = status.getStoragePolicy();
1472          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1473          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1474            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1475            for (BlockStoragePolicy policy : policies) {
1476              if (policy.getId() == storagePolicyId) {
1477                return policy.getName();
1478              }
1479            }
1480          }
1481        }
1482      }
1483    } catch (Throwable e) {
1484      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
1485    }
1486
1487    return null;
1488  }
1489}
1490