/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}.
 * Sets up and runs a MapReduce job that writes HFile output.
 * Creates a few inner classes to implement splits and an InputFormat that
 * emits keys and values like those of {@link PerformanceEvaluation}.
 */
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestCellBasedHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES = {
    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class);
  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                 ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
              false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[]{TABLE_NAMES[0]};
      }
    }

    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable,
               ImmutableBytesWritable, Cell>.Context context)
        throws IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }
207
208  /**
209   * Simple mapper that makes Put output.
210   */
211  static class RandomPutGeneratingMapper
212      extends Mapper<NullWritable, NullWritable,
213                 ImmutableBytesWritable, Put> {
214
215    private int keyLength;
216    private static final int KEYLEN_DEFAULT = 10;
217    private static final String KEYLEN_CONF = "randomkv.key.length";
218
219    private int valLength;
220    private static final int VALLEN_DEFAULT = 10;
221    private static final String VALLEN_CONF = "randomkv.val.length";
222    private static final byte[] QUALIFIER = Bytes.toBytes("data");
223    private boolean multiTableMapper = false;
224    private TableName[] tables = null;
225
226    @Override
227    protected void setup(Context context) throws IOException,
228            InterruptedException {
229      super.setup(context);
230
231      Configuration conf = context.getConfiguration();
232      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
233      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
234      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
235              false);
236      if (multiTableMapper) {
237        tables = TABLE_NAMES;
238      } else {
239        tables = new TableName[]{TABLE_NAMES[0]};
240      }
241    }
242
243    @Override
244    protected void map(
245            NullWritable n1, NullWritable n2,
246            Mapper<NullWritable, NullWritable,
247                    ImmutableBytesWritable, Put>.Context context)
248            throws java.io.IOException, InterruptedException {
249
250      byte keyBytes[] = new byte[keyLength];
251      byte valBytes[] = new byte[valLength];
252
253      int taskId = context.getTaskAttemptID().getTaskID().getId();
254      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
255
256      Random random = new Random();
257      byte[] key;
258      for (int j = 0; j < tables.length; ++j) {
259        for (int i = 0; i < ROWSPERSPLIT; i++) {
260          random.nextBytes(keyBytes);
261          // Ensure that unique tasks generate unique keys
262          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
263          random.nextBytes(valBytes);
264          key = keyBytes;
265          if (multiTableMapper) {
266            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
267          }
268
269          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
270            Put p = new Put(keyBytes);
271            p.addColumn(family, QUALIFIER, valBytes);
272            // set TTL to very low so that the scan does not return any value
273            p.setTTL(1l);
274            context.write(new ImmutableBytesWritable(key), p);
275          }
276        }
277      }
278    }
279  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_LATEST_TIMESTAMP_isReplaced()
  throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte [] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by the call to write. Check that everything in the kv stays the same except the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotEquals(original.getTimestamp(), kv.getTimestamp());
      assertNotEquals(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has an explicit ts. It should not be
      // changed by the call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

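  /**
   * Creates a {@link TaskAttemptContext} for the given job through the {@link HadoopShims}
   * compatibility layer, using a fixed task attempt id.
   */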
  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
  throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
      job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
   * metadata used by time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
      util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte [] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.loadFileInfo();
      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() +
          "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Set down this value or we OOME in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte)0);
    Arrays.fill(endKey, (byte)0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as TTL into
   * the HFile.
   */
  @Test
  public void test_WritingTagData()
      throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte [] b = Bytes.toBytes("b");

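      // Build a cell carrying a TTL tag and verify below that the tag survives the
      // HFile write/read round trip.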
      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
            cell.getTagsLength());
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
        .toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
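    // The mock region locator reports four start keys (four regions), so
    // configureIncrementalLoad should have set four reduce tasks, one per region.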
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
   * This test can only check the correctness of the original logic when
   * LOCALITY_SENSITIVE_CONF_KEY is set to true. Because MiniHBaseCluster always runs with a
   * single hostname (and different ports), it is not possible to check region locality by
   * comparing region locations with DataNode hostnames. Once MiniHBaseCluster supports an
   * explicit hostnames parameter (just like MiniDFSCluster does), we could test the region
   * locality feature more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
                                     boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
            Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
            Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
      boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    util.startMiniCluster(1, hostCount, hostnames);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
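      // Each table is created with regionNum regions, which requires regionNum - 1 split keys.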
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
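    // HFileOutputFormat2 writes one subdirectory per column family, nested under a per-table
    // directory when multiple tables are written; verify that layout on disk.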
    int numTableDirs = 0;
    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
      Path tablePath = testDir;

      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

665
666    Admin admin = util.getConnection().getAdmin();
667    try {
668      // handle the split case
669      if (shouldChangeRegions) {
670        Table chosenTable = allTables.values().iterator().next();
671        // Choose a semi-random table if multiple tables are available
672        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
673        admin.disableTable(chosenTable.getName());
674        util.waitUntilNoRegionsInTransition();
675
676        util.deleteTable(chosenTable.getName());
677        byte[][] newSplitKeys = generateRandomSplitKeys(14);
678        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
679
680        while (util.getConnection().getRegionLocator(chosenTable.getName())
681                .getAllRegionLocations().size() != 15 ||
682                !admin.isTableAvailable(table.getName())) {
683          Thread.sleep(200);
684          LOG.info("Waiting for new region assignment to happen");
685        }
686      }
687
688      // Perform the actual load
689      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
690        Path tableDir = testDir;
691        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
692        LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr);
693        if (writeMultipleTables) {
694          tableDir = new Path(testDir, tableNameStr);
695        }
696        Table currentTable = allTables.get(tableNameStr);
697        TableName currentTableName = currentTable.getName();
698        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
699                .getRegionLocator());
700
701        // Ensure data shows up
702        int expectedRows = 0;
703        if (putSortReducer) {
704          // no rows should be extracted
705          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
706                  util.countRows(currentTable));
707        } else {
708          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
709          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
710                  util.countRows(currentTable));
711          Scan scan = new Scan();
712          ResultScanner results = currentTable.getScanner(scan);
713          for (Result res : results) {
714            assertEquals(FAMILIES.length, res.rawCells().length);
715            Cell first = res.rawCells()[0];
716            for (Cell kv : res.rawCells()) {
717              assertTrue(CellUtil.matchingRows(first, kv));
718              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
719            }
720          }
721          results.close();
722        }
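        // Snapshot the table contents so we can verify that nothing is lost when the
        // regions are disabled and re-enabled below.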
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions",
                tableDigestBefore, util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf,
      List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
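    // configureIncrementalLoad sets one reduce task per region, so the reducer count should
    // match the total number of regions across the tables being written.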
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
              regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
   * Tests that the compression map is correctly serialized into
   * and deserialized from the configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
          getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
              Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
          .createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setCompressionType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for
   *         testing column family compression. Column family names have special characters.
   */
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
   * Tests that the bloom type map is correctly serialized into
   * and deserialized from the configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType =
          getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table,
          familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
              Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom type settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
          HFileOutputFormat2
              .createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBloomFilterType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom type configuration. Column family names have special
   *         characters.
   */
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD",
          BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
   * Tests that the block size map is correctly serialized into
   * and deserialized from the configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize =
          getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table,
          familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
              Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
          HFileOutputFormat2
              .createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBlocksize(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size configuration. Column family names have special
   *         characters.
   */
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
   * Tests that the data block encoding map is correctly serialized into
   * and deserialized from the configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
          getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table,
          familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(
              HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
          HFileOutputFormat2
          .createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals("DataBlockEncoding configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding configuration. Column family names have
   *         special characters.
   */
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
   * bloom filter settings from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
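    // generateColumnDescriptors() produces families with varying compression, encoding and
    // bloom filter settings, so each family exercises a different configuration.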
    for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
          " (reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
          " (reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using
   * {@link #FAMILIES} as column family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
      TaskAttemptContext context, Set<byte[]> families, int numRows)
      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte [] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {

      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }
1204
1205  /**
1206   * This test is to test the scenario happened in HBASE-6901.
1207   * All files are bulk loaded and excluded from minor compaction.
1208   * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
1209   * will be thrown.
1210   */
1211  @Ignore ("Flakey: See HBASE-9051") @Test
1212  public void testExcludeAllFromMinorCompaction() throws Exception {
1213    Configuration conf = util.getConfiguration();
1214    conf.setInt("hbase.hstore.compaction.min", 2);
1215    generateRandomStartKeys(5);
1216
1217    util.startMiniCluster();
1218    try (Connection conn = ConnectionFactory.createConnection();
1219        Admin admin = conn.getAdmin();
1220        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1221        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1222      final FileSystem fs = util.getDFSCluster().getFileSystem();
1223      assertEquals("Should start with empty table", 0, util.countRows(table));
1224
1225      // deep inspection: get the StoreFile dir
1226      final Path storePath = new Path(
1227        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1228          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1229            Bytes.toString(FAMILIES[0])));
1230      assertEquals(0, fs.listStatus(storePath).length);
1231
1232      // Generate two bulk load files
1233      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1234          true);
1235
1236      for (int i = 0; i < 2; i++) {
1237        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1238        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1239                .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
1240        // Perform the actual load
1241        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
1242      }
1243
1244      // Ensure data shows up
1245      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1246      assertEquals("LoadIncrementalHFiles should put expected data in table",
1247          expectedRows, util.countRows(table));
1248
1249      // should have a second StoreFile now
1250      assertEquals(2, fs.listStatus(storePath).length);
1251
1252      // minor compactions shouldn't get rid of the file
1253      admin.compact(TABLE_NAMES[0]);
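      // The poll below is expected to time out (throwing an AssertionError from fail()),
      // because the bulk-loaded files are excluded from minor compaction and the store
      // file count should stay at 2; if it drops to 1, the IOException fails the test.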
1254      try {
1255        quickPoll(new Callable<Boolean>() {
1256          @Override
1257          public Boolean call() throws Exception {
1258            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1259            for (HRegion region : regions) {
1260              for (HStore store : region.getStores()) {
1261                store.closeAndArchiveCompactedFiles();
1262              }
1263            }
1264            return fs.listStatus(storePath).length == 1;
1265          }
1266        }, 5000);
1267        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1268      } catch (AssertionError ae) {
1269        // this is expected behavior
1270      }
1271
1272      // a major compaction should work though
1273      admin.majorCompact(TABLE_NAMES[0]);
1274      quickPoll(new Callable<Boolean>() {
1275        @Override
1276        public Boolean call() throws Exception {
1277          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1278          for (HRegion region : regions) {
1279            for (HStore store : region.getStores()) {
1280              store.closeAndArchiveCompactedFiles();
1281            }
1282          }
1283          return fs.listStatus(storePath).length == 1;
1284        }
1285      }, 5000);
1286
1287    } finally {
1288      util.shutdownMiniCluster();
1289    }
1290  }
1291
1292  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1293  public void testExcludeMinorCompaction() throws Exception {
1294    Configuration conf = util.getConfiguration();
1295    conf.setInt("hbase.hstore.compaction.min", 2);
1296    generateRandomStartKeys(5);
1297
1298    util.startMiniCluster();
1299    try (Connection conn = ConnectionFactory.createConnection(conf);
1300        Admin admin = conn.getAdmin()) {
1301      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1302      final FileSystem fs = util.getDFSCluster().getFileSystem();
1303      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1304      assertEquals("Should start with empty table", 0, util.countRows(table));
1305
1306      // deep inspection: get the StoreFile dir
1307      final Path storePath = new Path(
1308        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1309          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1310            Bytes.toString(FAMILIES[0])));
1311      assertEquals(0, fs.listStatus(storePath).length);
1312
1313      // put some data in it and flush to create a storefile
1314      Put p = new Put(Bytes.toBytes("test"));
1315      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1316      table.put(p);
1317      admin.flush(TABLE_NAMES[0]);
1318      assertEquals(1, util.countRows(table));
1319      quickPoll(new Callable<Boolean>() {
1320        @Override
1321        public Boolean call() throws Exception {
1322          return fs.listStatus(storePath).length == 1;
1323        }
1324      }, 5000);
1325
1326      // Generate a bulk load file with more rows
1327      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1328          true);
1329
1330      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1331      runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1332                      .getTableDescriptor(), regionLocator)), testDir, false);
1333
1334      // Perform the actual load
1335      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1336
1337      // Ensure data shows up
1338      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1339      assertEquals("LoadIncrementalHFiles should put expected data in table",
1340          expectedRows + 1, util.countRows(table));
1341
1342      // should have a second StoreFile now
1343      assertEquals(2, fs.listStatus(storePath).length);
1344
1345      // minor compactions shouldn't get rid of the file
1346      admin.compact(TABLE_NAMES[0]);
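      // As in testExcludeAllFromMinorCompaction, the poll is expected to time out because
      // the bulk-loaded file is excluded from minor compaction; a successful poll means the
      // store files were compacted, and the IOException below fails the test.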
1347      try {
1348        quickPoll(new Callable<Boolean>() {
1349          @Override
1350          public Boolean call() throws Exception {
1351            return fs.listStatus(storePath).length == 1;
1352          }
1353        }, 5000);
1354        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1355      } catch (AssertionError ae) {
1356        // this is expected behavior
1357      }
1358
1359      // a major compaction should work though
1360      admin.majorCompact(TABLE_NAMES[0]);
1361      quickPoll(new Callable<Boolean>() {
1362        @Override
1363        public Boolean call() throws Exception {
1364          return fs.listStatus(storePath).length == 1;
1365        }
1366      }, 5000);
1367
1368    } finally {
1369      util.shutdownMiniCluster();
1370    }
1371  }
1372
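  /**
   * Evaluates the given condition every 10 ms until it returns true or roughly
   * {@code waitMs} milliseconds have elapsed, failing the test on timeout.
   */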
1373  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1374    int sleepMs = 10;
1375    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1376    while (retries-- > 0) {
1377      if (c.call().booleanValue()) {
1378        return;
1379      }
1380      Thread.sleep(sleepMs);
1381    }
1382    fail();
1383  }
1384
1385  public static void main(String[] args) throws Exception {
1386    new TestCellBasedHFileOutputFormat2().manualTest(args);
1387  }
1388
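  /**
   * Entry point for manual experimentation. The "newtable" mode creates the named table
   * with random split keys; the "incremental" mode runs an incremental load against the
   * named table, writing HFiles under the "incremental-out" directory.
   */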
1389  public void manualTest(String[] args) throws Exception {
1390    Configuration conf = HBaseConfiguration.create();
1391    util = new HBaseTestingUtility(conf);
1392    if ("newtable".equals(args[0])) {
1393      TableName tname = TableName.valueOf(args[1]);
1394      byte[][] splitKeys = generateRandomSplitKeys(4);
1395      util.createTable(tname, FAMILIES, splitKeys);
1396    } else if ("incremental".equals(args[0])) {
1397      TableName tname = TableName.valueOf(args[1]);
1398      try(Connection c = ConnectionFactory.createConnection(conf);
1399          Admin admin = c.getAdmin();
1400          RegionLocator regionLocator = c.getRegionLocator(tname)) {
1401        Path outDir = new Path("incremental-out");
1402        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
1403                .getTableDescriptor(tname), regionLocator)), outDir, false);
1404      }
1405    } else {
1406      throw new RuntimeException(
1407          "usage: TestCellBasedHFileOutputFormat2 newtable <tablename> | incremental <tablename>");
1408    }
1409  }
1410
1411  @Test
1412  public void testBlockStoragePolicy() throws Exception {
1413    util = new HBaseTestingUtility();
1414    Configuration conf = util.getConfiguration();
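    // Configure ALL_SSD as the default storage policy for every column family, then
    // override it with ONE_SSD for the first family of TABLE_NAMES[0] using the
    // per-family property key (CF prefix + combined table name/family suffix).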
1415    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1416
1417    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
1418            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
1419                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
1420    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1421    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1422    util.startMiniDFSCluster(3);
1423    FileSystem fs = util.getDFSCluster().getFileSystem();
1424    try {
1425      fs.mkdirs(cf1Dir);
1426      fs.mkdirs(cf2Dir);
1427
1428      // the default block storage policy of a newly created directory is HOT
1429      String spA = getStoragePolicyName(fs, cf1Dir);
1430      String spB = getStoragePolicyName(fs, cf2Dir);
1431      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1432      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1433      assertEquals("HOT", spA);
1434      assertEquals("HOT", spB);
1435
1436      // apply the configured storage policies to the per-family output directories
1437      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1438              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1439      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1440              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1441      spA = getStoragePolicyName(fs, cf1Dir);
1442      spB = getStoragePolicyName(fs, cf2Dir);
1443      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1444      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1445      assertNotNull(spA);
1446      assertEquals("ONE_SSD", spA);
1447      assertNotNull(spB);
1448      assertEquals("ALL_SSD", spB);
1449    } finally {
1450      fs.delete(cf1Dir, true);
1451      fs.delete(cf2Dir, true);
1452      util.shutdownMiniDFSCluster();
1453    }
1454  }
1455
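  /**
   * Returns the storage policy name of {@code path}, first via a reflective
   * getStoragePolicy call on the file system, falling back to
   * {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)} and
   * defaulting to "HOT" when the policy cannot be determined.
   */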
1456  private String getStoragePolicyName(FileSystem fs, Path path) {
1457    try {
1458      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1459      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1460    } catch (Exception e) {
1461      // This may fail on an older HDFS version that lacks getStoragePolicy; try the old way
1462      if (LOG.isTraceEnabled()) {
1463        LOG.trace("Failed to get policy directly", e);
1464      }
1465      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1466      return policy == null ? "HOT" : policy; // HOT by default
1467    }
1468  }
1469
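  /**
   * Fallback for HDFS versions whose FileSystem has no getStoragePolicy method: reads the
   * storage policy id from the file status via the DFS client and resolves it against the
   * cluster's storage policies, returning null if it cannot be determined.
   */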
1470  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1471    try {
1472      if (fs instanceof DistributedFileSystem) {
1473        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1474        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1475        if (null != status) {
1476          byte storagePolicyId = status.getStoragePolicy();
1477          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1478          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1479            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1480            for (BlockStoragePolicy policy : policies) {
1481              if (policy.getId() == storagePolicyId) {
1482                return policy.getName();
1483              }
1484            }
1485          }
1486        }
1487      }
1488    } catch (Throwable e) {
1489      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
1490    }
1491
1492    return null;
1493  }
1494}
1495