/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}.
 * Sets up and runs a mapreduce job that writes hfile output.
 * Creates a few inner classes to implement splits and an inputformat that
 * emits keys and values like those of {@link PerformanceEvaluation}.
 */
@Category({VerySlowMapReduceTests.class, LargeTests.class})
//TODO : Remove this in 3.0
public class TestHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES = {
    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                 ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte [] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
              false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[]{TABLE_NAMES[0]};
      }
    }

    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable,
               ImmutableBytesWritable, Cell>.Context context)
        throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                 ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
              false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[]{TABLE_NAMES[0]};
      }
    }

    @Override
    protected void map(
            NullWritable n1, NullWritable n2,
            Mapper<NullWritable, NullWritable,
                    ImmutableBytesWritable, Put>.Context context)
            throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
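    // Pick the mapper by map output value type: Put for the put-sort-reducer path,
    // KeyValue otherwise.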
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte [] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by the call to write. Check that everything in the kv is the same except the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now pass a kv that has an explicit ts. It should not be
      // changed by the call to write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
      job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
   * metadata used by time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
      util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Writing TIMERANGE test output to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte [] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() +
          "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Lower this value or we OOME in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files by keeping the region max file size small.
    long hregionMaxFilesize = 10 * 1024;
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
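    // HFileOutputFormat2's writer rolls to a new HFile once the current file passes
    // hbase.hregion.max.filesize, so this 10 KB limit should yield several small files per family.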

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte)0);
    Arrays.fill(endKey, (byte)0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
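    // KeyValueSortReducer sorts the KeyValues seen for each reduce key, since the HFile writer
    // underneath HFileOutputFormat2 expects cells in sorted order.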
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus [] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);

    // Check output file count and size.
    for (byte[] family : FAMILIES) {
      long kvCount = 0;
      RemoteIterator<LocatedFileStatus> iterator =
              fs.listFiles(testDir.suffix("/" + Bytes.toString(family)), true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
                HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(false, false, false);

        kvCount += reader.getEntries();
        scanner.seekTo();
        long perKVSize = scanner.getCell().getSerializedSize();
        assertTrue("Data size of each file should not be too large.",
                perKVSize * reader.getEntries() <= hregionMaxFilesize);
      }
      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as TTL into
   * the HFile.
   */
  @Test
  public void test_WritingTagData()
      throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
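    // Cell tags are only persisted in HFiles from format version 3 (MIN_FORMAT_VERSION_WITH_TAGS)
    // onwards, hence the explicit version above.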
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir =
        util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte [] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
        .toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
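    // configureIncrementalLoad sizes the reduce phase from the region count implied by the four
    // mocked start keys above, which is what the assertion below expects.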
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte [][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
   * This test can only check the correctness of the original logic when
   * LOCALITY_SENSITIVE_CONF_KEY is set to true. Because MiniHBaseCluster always runs with a
   * single hostname (and different ports), it is not possible to check region locality by
   * comparing region locations with DN hostnames. Once MiniHBaseCluster supports an explicit
   * hostnames parameter (just like MiniDFSCluster does), we could test region locality features
   * more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  //@Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
      boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
        Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
        Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
      boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
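    // When multiple tables are written, MultiTableHFileOutputFormat lays the HFiles out in one
    // sub-directory per table under testDir; verify that layout, then check the per-family dirs.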
    int numTableDirs = 0;
    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
      Path tablePath = testDir;

      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
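        // 14 split keys produce 15 regions, which is what the wait loop below checks for.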
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (util.getConnection().getRegionLocator(chosenTable.getName())
                .getAllRegionLocations().size() != 15 ||
                !admin.isTableAvailable(table.getName())) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
                .getRegionLocator());

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be returned, since the Put-generating mapper writes every cell
          // with a 1 ms TTL
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
                  util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
                  util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions",
                tableDigestBefore, util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf,
      List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        KeyValueSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
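      // configureIncrementalLoad creates one reduce task per region across all of the tables;
      // the loop below recomputes that total and checks it against the job.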
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
              regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

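    // The output directory must not exist yet; FileOutputFormat creates it and would fail the
    // job if it were already present.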
    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
   * Tests that the family compression map is correctly serialized into
   * and deserialized from configuration
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
          getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
              Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
          .createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setCompressionType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for
   *         testing column family compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
   * Tests that the family bloom type map is correctly serialized into
   * and deserialized from configuration
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType =
          getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
              Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom type settings from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
          HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBloomFilterType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom filter configuration. Column family names have special
   *         characters
   */
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD",
          BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
   * Tests that the family block size map is correctly serialized into
   * and deserialized from configuration
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize =
          getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
              Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
          HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBlocksize(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size configuration. Column family names have special
   *         characters
   */
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
   * Tests that the family data block encoding map is correctly serialized into
   * and deserialized from configuration
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
          getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(
              HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
          HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals("DataBlockEncoding configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding configuration. Column family names have
   *         special characters
   */
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
   * bloom filter settings from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
          " (reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + " (reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using
   * {@link #FAMILIES} as column family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
      TaskAttemptContext context, Set<byte[]> families, int numRows)
      throws IOException, InterruptedException {
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte [] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {

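      // Use the row index as the key so each row is unique and rows are written in sorted order.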
      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test covers the scenario reported in HBASE-6901: all files are bulk loaded and
   * excluded from minor compaction. Without the fix for HBASE-6901, an
   * ArrayIndexOutOfBoundsException would be thrown.
   */
  @Ignore("Flakey: See HBASE-9051") @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection();
        Admin admin = conn.getAdmin();
        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath = new Path(
        CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
          true);

      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
                .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
        // Perform the actual load
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
1275      try {
1276        quickPoll(new Callable<Boolean>() {
1277          @Override
1278          public Boolean call() throws Exception {
1279            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1280            for (HRegion region : regions) {
1281              for (HStore store : region.getStores()) {
1282                store.closeAndArchiveCompactedFiles();
1283              }
1284            }
1285            return fs.listStatus(storePath).length == 1;
1286          }
1287        }, 5000);
1288        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1289      } catch (AssertionError ae) {
1290        // this is expected behavior
1291      }
1292
1293      // a major compaction should work though
1294      admin.majorCompact(TABLE_NAMES[0]);
1295      quickPoll(new Callable<Boolean>() {
1296        @Override
1297        public Boolean call() throws Exception {
1298          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1299          for (HRegion region : regions) {
1300            for (HStore store : region.getStores()) {
1301              store.closeAndArchiveCompactedFiles();
1302            }
1303          }
1304          return fs.listStatus(storePath).length == 1;
1305        }
1306      }, 5000);
1307
1308    } finally {
1309      util.shutdownMiniCluster();
1310    }
1311  }
1312
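  /**
   * Verify that a bulk loaded store file written with
   * "hbase.mapreduce.hfileoutputformat.compaction.exclude" set to true survives minor
   * compactions but is still cleaned up by a major compaction.
   */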
1313  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1314  public void testExcludeMinorCompaction() throws Exception {
1315    Configuration conf = util.getConfiguration();
1316    conf.setInt("hbase.hstore.compaction.min", 2);
1317    generateRandomStartKeys(5);
1318
1319    util.startMiniCluster();
1320    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
1322      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1323      final FileSystem fs = util.getDFSCluster().getFileSystem();
1324      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1325      assertEquals("Should start with empty table", 0, util.countRows(table));
1326
1327      // deep inspection: get the StoreFile dir
1328      final Path storePath = new Path(
1329        CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1330          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1331            Bytes.toString(FAMILIES[0])));
1332      assertEquals(0, fs.listStatus(storePath).length);
1333
1334      // put some data in it and flush to create a storefile
1335      Put p = new Put(Bytes.toBytes("test"));
1336      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1337      table.put(p);
1338      admin.flush(TABLE_NAMES[0]);
1339      assertEquals(1, util.countRows(table));
1340      quickPoll(new Callable<Boolean>() {
1341        @Override
1342        public Boolean call() throws Exception {
1343          return fs.listStatus(storePath).length == 1;
1344        }
1345      }, 5000);
1346
1347      // Generate a bulk load file with more rows
1348      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1349          true);
1350
1351      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1352      runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1353                      .getTableDescriptor(), regionLocator)), testDir, false);
1354
1355      // Perform the actual load
1356      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1357
1358      // Ensure data shows up
1359      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1360      assertEquals("LoadIncrementalHFiles should put expected data in table",
1361          expectedRows + 1, util.countRows(table));
1362
1363      // should have a second StoreFile now
1364      assertEquals(2, fs.listStatus(storePath).length);
1365
1366      // minor compactions shouldn't get rid of the file
1367      admin.compact(TABLE_NAMES[0]);
1368      try {
1369        quickPoll(new Callable<Boolean>() {
1370          @Override
1371          public Boolean call() throws Exception {
1372            return fs.listStatus(storePath).length == 1;
1373          }
1374        }, 5000);
1375        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1376      } catch (AssertionError ae) {
1377        // this is expected behavior
1378      }
1379
1380      // a major compaction should work though
1381      admin.majorCompact(TABLE_NAMES[0]);
1382      quickPoll(new Callable<Boolean>() {
1383        @Override
1384        public Boolean call() throws Exception {
1385          return fs.listStatus(storePath).length == 1;
1386        }
1387      }, 5000);
1388
1389    } finally {
1390      util.shutdownMiniCluster();
1391    }
1392  }
1393
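  /**
   * Polls the given condition roughly every 10 ms until it returns true or {@code waitMs}
   * milliseconds have elapsed, failing the test on timeout.
   */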
1394  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1395    int sleepMs = 10;
1396    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1397    while (retries-- > 0) {
1398      if (c.call().booleanValue()) {
1399        return;
1400      }
1401      Thread.sleep(sleepMs);
1402    }
1403    fail();
1404  }
1405
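  /**
   * Command-line entry point that delegates to {@link #manualTest(String[])}.
   */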
  public static void main(String[] args) throws Exception {
1407    new TestHFileOutputFormat2().manualTest(args);
1408  }
1409
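  /**
   * Manual driver: {@code newtable <tableName>} creates a pre-split table using
   * {@link #FAMILIES}, while {@code incremental <tableName>} runs an incremental load into
   * the "incremental-out" directory for an existing table.
   */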
  public void manualTest(String[] args) throws Exception {
1411    Configuration conf = HBaseConfiguration.create();
1412    util = new HBaseTestingUtility(conf);
1413    if ("newtable".equals(args[0])) {
1414      TableName tname = TableName.valueOf(args[1]);
1415      byte[][] splitKeys = generateRandomSplitKeys(4);
      util.createTable(tname, FAMILIES, splitKeys);
1417    } else if ("incremental".equals(args[0])) {
1418      TableName tname = TableName.valueOf(args[1]);
1419      try(Connection c = ConnectionFactory.createConnection(conf);
1420          Admin admin = c.getAdmin();
1421          RegionLocator regionLocator = c.getRegionLocator(tname)) {
1422        Path outDir = new Path("incremental-out");
1423        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
1424                .getTableDescriptor(tname), regionLocator)), outDir, false);
1425      }
1426    } else {
1427      throw new RuntimeException(
1428          "usage: TestHFileOutputFormat2 newtable | incremental");
1429    }
1430  }
1431
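  /**
   * Verify that {@code HFileOutputFormat2#configureStoragePolicy} applies both the table-wide
   * storage policy (ALL_SSD here) and the per-column-family override (ONE_SSD for the first
   * family) to the column family output directories.
   */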
1432  @Test
1433  public void testBlockStoragePolicy() throws Exception {
1434    util = new HBaseTestingUtility();
1435    Configuration conf = util.getConfiguration();
1436    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1437
1438    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
1439            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
1440                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
1441    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1442    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1443    util.startMiniDFSCluster(3);
1444    FileSystem fs = util.getDFSCluster().getFileSystem();
1445    try {
1446      fs.mkdirs(cf1Dir);
1447      fs.mkdirs(cf2Dir);
1448
      // the default block storage policy is HOT
1450      String spA = getStoragePolicyName(fs, cf1Dir);
1451      String spB = getStoragePolicyName(fs, cf2Dir);
1452      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1453      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1454      assertEquals("HOT", spA);
1455      assertEquals("HOT", spB);
1456
      // apply the storage policies configured above to the column family directories
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]),
        cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]),
        cf2Dir);
1462      spA = getStoragePolicyName(fs, cf1Dir);
1463      spB = getStoragePolicyName(fs, cf2Dir);
1464      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1465      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1466      assertNotNull(spA);
1467      assertEquals("ONE_SSD", spA);
1468      assertNotNull(spB);
1469      assertEquals("ALL_SSD", spB);
1470    } finally {
1471      fs.delete(cf1Dir, true);
1472      fs.delete(cf2Dir, true);
1473      util.shutdownMiniDFSCluster();
1474    }
1475  }
1476
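  /**
   * Returns the storage policy name for {@code path}, first via reflection on
   * {@code FileSystem#getStoragePolicy} and, if that fails (for example on older HDFS
   * versions), via {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)},
   * defaulting to "HOT".
   */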
1477  private String getStoragePolicyName(FileSystem fs, Path path) {
1478    try {
1479      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1480      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1481    } catch (Exception e) {
      // May fail when running against an older HDFS version; fall back to the old way
1483      if (LOG.isTraceEnabled()) {
1484        LOG.trace("Failed to get policy directly", e);
1485      }
1486      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
1488    }
1489  }
1490
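  /**
   * Fallback lookup for older HDFS versions: resolves the storage policy id reported for the
   * file against the policies returned by {@code DistributedFileSystem#getStoragePolicies()},
   * or returns null if the policy cannot be determined.
   */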
1491  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1492    try {
1493      if (fs instanceof DistributedFileSystem) {
1494        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1495        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1496        if (null != status) {
1497          byte storagePolicyId = status.getStoragePolicy();
1498          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1499          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1500            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1501            for (BlockStoragePolicy policy : policies) {
1502              if (policy.getId() == storagePolicyId) {
1503                return policy.getName();
1504              }
1505            }
1506          }
1507        }
1508      }
1509    } catch (Throwable e) {
      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
1511    }
1512
1513    return null;
1514  }
1515
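  /**
   * Verify that a compression algorithm set via
   * {@code HFileOutputFormat2#COMPRESSION_OVERRIDE_CONF_KEY} ("gz" here) is applied to the
   * HFiles written by the record writer.
   */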
1516  @Test
  public void testConfigureCompression() throws Exception {
1518    Configuration conf = new Configuration(this.util.getConfiguration());
1519    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1520    TaskAttemptContext context = null;
1521    Path dir = util.getDataTestDir("TestConfigureCompression");
1522    String hfileoutputformatCompression = "gz";
1523
1524    try {
1525      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1526      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1527
1528      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1529
1530      Job job = Job.getInstance(conf);
1531      FileOutputFormat.setOutputPath(job, dir);
1532      context = createTestTaskAttemptContext(job);
1533      HFileOutputFormat2 hof = new HFileOutputFormat2();
1534      writer = hof.getRecordWriter(context);
1535      final byte[] b = Bytes.toBytes("b");
1536
1537      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1538      writer.write(new ImmutableBytesWritable(), kv);
1539      writer.close(context);
1540      writer = null;
1541      FileSystem fs = dir.getFileSystem(conf);
1542      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1543      while (iterator.hasNext()) {
1544        LocatedFileStatus keyFileStatus = iterator.next();
1545        HFile.Reader reader =
1546            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        assertEquals(hfileoutputformatCompression,
            reader.getTrailer().getCompressionCodec().getName());
1549      }
1550    } finally {
1551      if (writer != null && context != null) {
1552        writer.close(context);
1553      }
1554      dir.getFileSystem(conf).delete(dir, true);
    }
  }
1558}
1559