001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNotSame;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027
028import java.io.IOException;
029import java.lang.reflect.Field;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.Random;
037import java.util.Set;
038import java.util.concurrent.Callable;
039import java.util.stream.Collectors;
040import java.util.stream.Stream;
041import org.apache.hadoop.conf.Configuration;
042import org.apache.hadoop.fs.FileStatus;
043import org.apache.hadoop.fs.FileSystem;
044import org.apache.hadoop.fs.LocatedFileStatus;
045import org.apache.hadoop.fs.Path;
046import org.apache.hadoop.fs.RemoteIterator;
047import org.apache.hadoop.hbase.ArrayBackedTag;
048import org.apache.hadoop.hbase.Cell;
049import org.apache.hadoop.hbase.CellUtil;
050import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
051import org.apache.hadoop.hbase.HBaseClassTestRule;
052import org.apache.hadoop.hbase.HBaseConfiguration;
053import org.apache.hadoop.hbase.HBaseTestingUtility;
054import org.apache.hadoop.hbase.HColumnDescriptor;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HDFSBlocksDistribution;
057import org.apache.hadoop.hbase.HTableDescriptor;
058import org.apache.hadoop.hbase.HadoopShims;
059import org.apache.hadoop.hbase.KeyValue;
060import org.apache.hadoop.hbase.PerformanceEvaluation;
061import org.apache.hadoop.hbase.PrivateCellUtil;
062import org.apache.hadoop.hbase.StartMiniClusterOption;
063import org.apache.hadoop.hbase.TableName;
064import org.apache.hadoop.hbase.Tag;
065import org.apache.hadoop.hbase.TagType;
066import org.apache.hadoop.hbase.client.Admin;
067import org.apache.hadoop.hbase.client.Connection;
068import org.apache.hadoop.hbase.client.ConnectionFactory;
069import org.apache.hadoop.hbase.client.Put;
070import org.apache.hadoop.hbase.client.RegionLocator;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.Scan;
074import org.apache.hadoop.hbase.client.Table;
075import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
076import org.apache.hadoop.hbase.io.compress.Compression;
077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
079import org.apache.hadoop.hbase.io.hfile.CacheConfig;
080import org.apache.hadoop.hbase.io.hfile.HFile;
081import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
082import org.apache.hadoop.hbase.io.hfile.HFileScanner;
083import org.apache.hadoop.hbase.regionserver.BloomType;
084import org.apache.hadoop.hbase.regionserver.HRegion;
085import org.apache.hadoop.hbase.regionserver.HStore;
086import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
087import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
088import org.apache.hadoop.hbase.testclassification.LargeTests;
089import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
090import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
091import org.apache.hadoop.hbase.util.Bytes;
092import org.apache.hadoop.hbase.util.FSUtils;
093import org.apache.hadoop.hbase.util.ReflectionUtils;
094import org.apache.hadoop.hdfs.DistributedFileSystem;
095import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
096import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
097import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
098import org.apache.hadoop.io.NullWritable;
099import org.apache.hadoop.mapreduce.Job;
100import org.apache.hadoop.mapreduce.Mapper;
101import org.apache.hadoop.mapreduce.RecordWriter;
102import org.apache.hadoop.mapreduce.TaskAttemptContext;
103import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
104import org.junit.ClassRule;
105import org.junit.Ignore;
106import org.junit.Test;
107import org.junit.experimental.categories.Category;
108import org.mockito.Mockito;
109import org.slf4j.Logger;
110import org.slf4j.LoggerFactory;
111
112/**
113 * Simple test for {@link HFileOutputFormat2}.
114 * Sets up and runs a mapreduce job that writes hfile output.
115 * Creates a few inner classes to implement splits and an inputformat that
116 * emits keys and values like those of {@link PerformanceEvaluation}.
117 */
118@Category({VerySlowMapReduceTests.class, LargeTests.class})
// TODO: Remove this in 3.0
120public class TestHFileOutputFormat2  {
121
122  @ClassRule
123  public static final HBaseClassTestRule CLASS_RULE =
124      HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);
125
126  private final static int ROWSPERSPLIT = 1024;
127
128  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
129  private static final byte[][] FAMILIES = {
130    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
131  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
132          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
133
134  private HBaseTestingUtility util = new HBaseTestingUtility();
135
136  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
137
138  /**
139   * Simple mapper that makes KeyValue output.
140   */
141  static class RandomKVGeneratingMapper
142      extends Mapper<NullWritable, NullWritable,
143                 ImmutableBytesWritable, Cell> {
144
145    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
152    private static final byte [] QUALIFIER = Bytes.toBytes("data");
153    private boolean multiTableMapper = false;
154    private TableName[] tables = null;
155
156
157    @Override
158    protected void setup(Context context) throws IOException,
159        InterruptedException {
160      super.setup(context);
161
162      Configuration conf = context.getConfiguration();
163      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
164      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
165      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
166              false);
167      if (multiTableMapper) {
168        tables = TABLE_NAMES;
169      } else {
170        tables = new TableName[]{TABLE_NAMES[0]};
171      }
172    }
173
174    @Override
175    protected void map(
176        NullWritable n1, NullWritable n2,
177        Mapper<NullWritable, NullWritable,
178               ImmutableBytesWritable,Cell>.Context context)
        throws java.io.IOException, InterruptedException {
181
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];
184
185      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
187      Random random = new Random();
188      byte[] key;
189      for (int j = 0; j < tables.length; ++j) {
190        for (int i = 0; i < ROWSPERSPLIT; i++) {
191          random.nextBytes(keyBytes);
192          // Ensure that unique tasks generate unique keys
193          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
194          random.nextBytes(valBytes);
195          key = keyBytes;
196          if (multiTableMapper) {
197            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
198          }
199
200          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
201            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
202            context.write(new ImmutableBytesWritable(key), kv);
203          }
204        }
205      }
206    }
207  }
208
209  /**
210   * Simple mapper that makes Put output.
211   */
212  static class RandomPutGeneratingMapper
213      extends Mapper<NullWritable, NullWritable,
214                 ImmutableBytesWritable, Put> {
215
216    private int keyLength;
217    private static final int KEYLEN_DEFAULT = 10;
218    private static final String KEYLEN_CONF = "randomkv.key.length";
219
220    private int valLength;
221    private static final int VALLEN_DEFAULT = 10;
222    private static final String VALLEN_CONF = "randomkv.val.length";
223    private static final byte[] QUALIFIER = Bytes.toBytes("data");
224    private boolean multiTableMapper = false;
225    private TableName[] tables = null;
226
227    @Override
228    protected void setup(Context context) throws IOException,
229            InterruptedException {
230      super.setup(context);
231
232      Configuration conf = context.getConfiguration();
233      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
234      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
235      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
236              false);
237      if (multiTableMapper) {
238        tables = TABLE_NAMES;
239      } else {
240        tables = new TableName[]{TABLE_NAMES[0]};
241      }
242    }
243
244    @Override
245    protected void map(
246            NullWritable n1, NullWritable n2,
247            Mapper<NullWritable, NullWritable,
248                    ImmutableBytesWritable, Put>.Context context)
249            throws java.io.IOException, InterruptedException {
250
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];
253
254      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
256
257      Random random = new Random();
258      byte[] key;
259      for (int j = 0; j < tables.length; ++j) {
260        for (int i = 0; i < ROWSPERSPLIT; i++) {
261          random.nextBytes(keyBytes);
262          // Ensure that unique tasks generate unique keys
263          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
264          random.nextBytes(valBytes);
265          key = keyBytes;
266          if (multiTableMapper) {
267            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
268          }
269
270          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
271            Put p = new Put(keyBytes);
272            p.addColumn(family, QUALIFIER, valBytes);
            // set the TTL very low so that a scan will not return any rows
            p.setTTL(1L);
275            context.write(new ImmutableBytesWritable(key), p);
276          }
277        }
278      }
279    }
280  }
281
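  /**
   * Wires the job to one of the random data generating mappers above: Put output when a
   * PutSortReducer is wanted, KeyValue output otherwise.
   */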
282  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
283    if (putSortReducer) {
284      job.setInputFormatClass(NMapInputFormat.class);
285      job.setMapperClass(RandomPutGeneratingMapper.class);
286      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
287      job.setMapOutputValueClass(Put.class);
288    } else {
289      job.setInputFormatClass(NMapInputFormat.class);
290      job.setMapperClass(RandomKVGeneratingMapper.class);
291      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
292      job.setMapOutputValueClass(KeyValue.class);
293    }
294  }
295
296  /**
297   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
298   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
299   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
300   */
301  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
302  public void test_LATEST_TIMESTAMP_isReplaced()
303  throws Exception {
304    Configuration conf = new Configuration(this.util.getConfiguration());
305    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
306    TaskAttemptContext context = null;
307    Path dir =
308      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
309    try {
310      Job job = new Job(conf);
311      FileOutputFormat.setOutputPath(job, dir);
312      context = createTestTaskAttemptContext(job);
313      HFileOutputFormat2 hof = new HFileOutputFormat2();
314      writer = hof.getRecordWriter(context);
315      final byte [] b = Bytes.toBytes("b");
316
      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by the call to write. Check that everything in the kv is the same except the ts.
319      KeyValue kv = new KeyValue(b, b, b);
320      KeyValue original = kv.clone();
321      writer.write(new ImmutableBytesWritable(), kv);
322      assertFalse(original.equals(kv));
323      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
324      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
325      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
326      assertNotSame(original.getTimestamp(), kv.getTimestamp());
327      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
328
      // Test 2. Now pass a kv that has an explicit ts. It should not be
      // changed by the call to write.
331      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
332      original = kv.clone();
333      writer.write(new ImmutableBytesWritable(), kv);
334      assertTrue(original.equals(kv));
335    } finally {
336      if (writer != null && context != null) writer.close(context);
337      dir.getFileSystem(conf).delete(dir, true);
338    }
339  }
340
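  /**
   * Builds a TaskAttemptContext with a fixed attempt id so a record writer can be exercised
   * without running a full MR job.
   */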
341  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
342  throws Exception {
343    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
344    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
345      job, "attempt_201402131733_0001_m_000000_0");
346    return context;
347  }
348
  /**
350   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
351   * metadata used by time-restricted scans.
352   */
353  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
354  public void test_TIMERANGE() throws Exception {
355    Configuration conf = new Configuration(this.util.getConfiguration());
356    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
357    TaskAttemptContext context = null;
358    Path dir =
359      util.getDataTestDir("test_TIMERANGE_present");
360    LOG.info("Timerange dir writing to dir: "+ dir);
361    try {
362      // build a record writer using HFileOutputFormat2
363      Job job = new Job(conf);
364      FileOutputFormat.setOutputPath(job, dir);
365      context = createTestTaskAttemptContext(job);
366      HFileOutputFormat2 hof = new HFileOutputFormat2();
367      writer = hof.getRecordWriter(context);
368
      // Pass two key values with explicit timestamps
370      final byte [] b = Bytes.toBytes("b");
371
372      // value 1 with timestamp 2000
373      KeyValue kv = new KeyValue(b, b, b, 2000, b);
374      KeyValue original = kv.clone();
375      writer.write(new ImmutableBytesWritable(), kv);
376      assertEquals(original,kv);
377
378      // value 2 with timestamp 1000
379      kv = new KeyValue(b, b, b, 1000, b);
380      original = kv.clone();
381      writer.write(new ImmutableBytesWritable(), kv);
382      assertEquals(original, kv);
383
384      // verify that the file has the proper FileInfo.
385      writer.close(context);
386
387      // the generated file lives 1 directory down from the attempt directory
388      // and is the only file, e.g.
389      // _attempt__0000_r_000000_0/b/1979617994050536795
390      FileSystem fs = FileSystem.get(conf);
391      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
392      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
393      FileStatus[] file = fs.listStatus(sub1[0].getPath());
394
395      // open as HFile Reader and pull out TIMERANGE FileInfo.
396      HFile.Reader rd =
397          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
398      Map<byte[],byte[]> finfo = rd.loadFileInfo();
399      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
400      assertNotNull(range);
401
      // unmarshal and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
404      LOG.info(timeRangeTracker.getMin() +
405          "...." + timeRangeTracker.getMax());
406      assertEquals(1000, timeRangeTracker.getMin());
407      assertEquals(2000, timeRangeTracker.getMax());
408      rd.close();
409    } finally {
410      if (writer != null && context != null) writer.close(context);
411      dir.getFileSystem(conf).delete(dir, true);
412    }
413  }
414
415  /**
416   * Run small MR job.
417   */
418  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
419  public void testWritingPEData() throws Exception {
420    Configuration conf = util.getConfiguration();
421    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
422    FileSystem fs = testDir.getFileSystem(conf);
423
    // Lower this value or we OOME in Eclipse.
425    conf.setInt("mapreduce.task.io.sort.mb", 20);
426    // Write a few files.
427    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
428
429    Job job = new Job(conf, "testWritingPEData");
430    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
433    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
434    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
435
436    Arrays.fill(startKey, (byte)0);
437    Arrays.fill(endKey, (byte)0xff);
438
439    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
440    // Set start and end rows for partitioner.
441    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
442    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
443    job.setReducerClass(KeyValueSortReducer.class);
444    job.setOutputFormatClass(HFileOutputFormat2.class);
445    job.setNumReduceTasks(4);
446    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
447        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
448        KeyValueSerialization.class.getName());
449
450    FileOutputFormat.setOutputPath(job, testDir);
451    assertTrue(job.waitForCompletion(false));
452    FileStatus [] files = fs.listStatus(testDir);
453    assertTrue(files.length > 0);
454  }
455
456  /**
   * Test that the {@link HFileOutputFormat2} RecordWriter writes tags such as TTL into
   * the hfile.
459   */
460  @Test
461  public void test_WritingTagData()
462      throws Exception {
463    Configuration conf = new Configuration(this.util.getConfiguration());
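    // Tags are only persisted from HFile.MIN_FORMAT_VERSION_WITH_TAGS onwards, so force that
    // version here.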
464    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
465    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
466    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
467    TaskAttemptContext context = null;
468    Path dir =
469        util.getDataTestDir("WritingTagData");
470    try {
471      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
472      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
473      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
474      Job job = new Job(conf);
475      FileOutputFormat.setOutputPath(job, dir);
476      context = createTestTaskAttemptContext(job);
477      HFileOutputFormat2 hof = new HFileOutputFormat2();
478      writer = hof.getRecordWriter(context);
479      final byte [] b = Bytes.toBytes("b");
480
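      // Write a single cell that carries a TTL tag, then verify the tag survives in the hfile.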
      List<Tag> tags = new ArrayList<>();
482      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
483      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
484      writer.write(new ImmutableBytesWritable(), kv);
485      writer.close(context);
486      writer = null;
487      FileSystem fs = dir.getFileSystem(conf);
488      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
489      while(iterator.hasNext()) {
490        LocatedFileStatus keyFileStatus = iterator.next();
491        HFile.Reader reader =
492            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
493        HFileScanner scanner = reader.getScanner(false, false, false);
494        scanner.seekTo();
495        Cell cell = scanner.getCell();
496        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
497        assertTrue(tagsFromCell.size() > 0);
498        for (Tag tag : tagsFromCell) {
499          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
500        }
501      }
502    } finally {
503      if (writer != null && context != null) writer.close(context);
504      dir.getFileSystem(conf).delete(dir, true);
505    }
506  }
507
508  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
509  public void testJobConfiguration() throws Exception {
510    Configuration conf = new Configuration(this.util.getConfiguration());
511    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
512        .toString());
513    Job job = new Job(conf);
514    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
515    Table table = Mockito.mock(Table.class);
516    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
517    setupMockStartKeys(regionLocator);
518    setupMockTableName(regionLocator);
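    // configureIncrementalLoad should create one reduce task per region (four mocked start keys).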
519    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
521  }
522
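  // The two helpers below reuse PerformanceEvaluation's random data generator for key material.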
523  private byte [][] generateRandomStartKeys(int numKeys) {
524    Random random = new Random();
525    byte[][] ret = new byte[numKeys][];
526    // first region start key is always empty
527    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
528    for (int i = 1; i < numKeys; i++) {
529      ret[i] =
530        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
531    }
532    return ret;
533  }
534
535  private byte[][] generateRandomSplitKeys(int numKeys) {
536    Random random = new Random();
537    byte[][] ret = new byte[numKeys][];
538    for (int i = 0; i < numKeys; i++) {
539      ret[i] =
540          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
541    }
542    return ret;
543  }
544
545  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
546  public void testMRIncrementalLoad() throws Exception {
547    LOG.info("\nStarting test testMRIncrementalLoad\n");
548    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
549  }
550
551  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
552  public void testMRIncrementalLoadWithSplit() throws Exception {
553    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
554    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
555  }
556
557  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
   * This test can only check the correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY
   * is set to true. Because MiniHBaseCluster always runs with a single hostname (and different ports),
   * it is not possible to check region locality by comparing region locations with DN hostnames.
   * Once MiniHBaseCluster supports an explicit hostnames parameter (just like MiniDFSCluster does),
   * we could test the region locality feature more easily.
564   */
565  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
566  public void testMRIncrementalLoadWithLocality() throws Exception {
567    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
568    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
569    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
570  }
571
573  @Test
574  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
575    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
576    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
577  }
578
579  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
580                                     boolean putSortReducer, String tableStr) throws Exception {
581      doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
582              Arrays.asList(tableStr));
583  }
584
585  @Test
586  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
587    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
            Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
591  }
592
593  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
594      boolean putSortReducer, List<String> tableStr) throws Exception {
595    util = new HBaseTestingUtility();
596    Configuration conf = util.getConfiguration();
597    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
598    int hostCount = 1;
599    int regionNum = 5;
600    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster supports
      // an explicit hostnames parameter, just like MiniDFSCluster does.
603      hostCount = 3;
604      regionNum = 20;
605    }
606
607    String[] hostnames = new String[hostCount];
608    for (int i = 0; i < hostCount; ++i) {
609      hostnames[i] = "datanode_" + i;
610    }
611    StartMiniClusterOption option = StartMiniClusterOption.builder()
612        .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
613    util.startMiniCluster(option);
614
615    Map<String, Table> allTables = new HashMap<>(tableStr.size());
616    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
617    boolean writeMultipleTables = tableStr.size() > 1;
618    for (String tableStrSingle : tableStr) {
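      // regionNum - 1 split keys yield regionNum regions per table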
619      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
620      TableName tableName = TableName.valueOf(tableStrSingle);
621      Table table = util.createTable(tableName, FAMILIES, splitKeys);
622
623      RegionLocator r = util.getConnection().getRegionLocator(tableName);
624      assertEquals("Should start with empty table", 0, util.countRows(table));
625      int numRegions = r.getStartKeys().length;
626      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
627
628      allTables.put(tableStrSingle, table);
629      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
630    }
631    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
632    // Generate the bulk load files
633    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
634
635    for (Table tableSingle : allTables.values()) {
636      // This doesn't write into the table, just makes files
637      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
638    }
639    int numTableDirs = 0;
640    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
641      Path tablePath = testDir;
642
643      if (writeMultipleTables) {
644        if (allTables.containsKey(tf.getPath().getName())) {
645          ++numTableDirs;
646          tablePath = tf.getPath();
        } else {
649          continue;
650        }
651      }
652
653      // Make sure that a directory was created for every CF
654      int dir = 0;
655      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
656        for (byte[] family : FAMILIES) {
657          if (Bytes.toString(family).equals(f.getPath().getName())) {
658            ++dir;
659          }
660        }
661      }
662      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
663    }
664    if (writeMultipleTables) {
665      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
666    }
667
668    Admin admin = util.getConnection().getAdmin();
669    try {
670      // handle the split case
671      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
675        admin.disableTable(chosenTable.getName());
676        util.waitUntilNoRegionsInTransition();
677
678        util.deleteTable(chosenTable.getName());
679        byte[][] newSplitKeys = generateRandomSplitKeys(14);
680        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
681
682        while (util.getConnection().getRegionLocator(chosenTable.getName())
683                .getAllRegionLocations().size() != 15 ||
684                !admin.isTableAvailable(table.getName())) {
685          Thread.sleep(200);
686          LOG.info("Waiting for new region assignment to happen");
687        }
688      }
689
690      // Perform the actual load
691      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
692        Path tableDir = testDir;
693        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
694        LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr);
695        if (writeMultipleTables) {
696          tableDir = new Path(testDir, tableNameStr);
697        }
698        Table currentTable = allTables.get(tableNameStr);
699        TableName currentTableName = currentTable.getName();
700        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
701                .getRegionLocator());
702
703        // Ensure data shows up
704        int expectedRows = 0;
705        if (putSortReducer) {
          // the puts were written with a 1ms TTL, so no rows should be returned
707          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
708                  util.countRows(currentTable));
709        } else {
710          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
711          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
712                  util.countRows(currentTable));
713          Scan scan = new Scan();
714          ResultScanner results = currentTable.getScanner(scan);
715          for (Result res : results) {
716            assertEquals(FAMILIES.length, res.rawCells().length);
717            Cell first = res.rawCells()[0];
718            for (Cell kv : res.rawCells()) {
719              assertTrue(CellUtil.matchingRows(first, kv));
720              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
721            }
722          }
723          results.close();
724        }
725        String tableDigestBefore = util.checksumRows(currentTable);
726        // Check region locality
727        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
728        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
729          hbd.add(region.getHDFSBlocksDistribution());
730        }
731        for (String hostname : hostnames) {
732          float locality = hbd.getBlockLocalityIndex(hostname);
733          LOG.info("locality of [" + hostname + "]: " + locality);
734          assertEquals(100, (int) (locality * 100));
735        }
736
737        // Cause regions to reopen
738        admin.disableTable(currentTableName);
739        while (!admin.isTableDisabled(currentTableName)) {
740          Thread.sleep(200);
741          LOG.info("Waiting for table to disable");
742        }
743        admin.enableTable(currentTableName);
744        util.waitTableAvailable(currentTableName);
745        assertEquals("Data should remain after reopening of regions",
746                tableDigestBefore, util.checksumRows(currentTable));
747      }
748    } finally {
749      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
750          tableInfoSingle.getRegionLocator().close();
751      }
752      for (Entry<String, Table> singleTable : allTables.entrySet() ) {
753        singleTable.getValue().close();
754        util.deleteTable(singleTable.getValue().getName());
755      }
756      testDir.getFileSystem(conf).delete(testDir, true);
757      util.shutdownMiniCluster();
758    }
759  }
760
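  /**
   * Runs a mapreduce job that writes hfiles for the given tables into outDir, using
   * MultiTableHFileOutputFormat when more than one table is passed and HFileOutputFormat2
   * otherwise.
   */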
  private void runIncrementalPELoad(Configuration conf,
      List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
      throws IOException, InterruptedException, ClassNotFoundException {
764    Job job = new Job(conf, "testLocalMRIncrementalLoad");
765    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
766    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
767        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
768        KeyValueSerialization.class.getName());
769    setupRandomGeneratorMapper(job, putSortReducer);
770    if (tableInfo.size() > 1) {
771      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
772      int sum = 0;
773      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
774        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
775      }
776      assertEquals(sum, job.getNumReduceTasks());
    } else {
779      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
780      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
781              regionLocator);
782      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
783    }
784
785    FileOutputFormat.setOutputPath(job, outDir);
786
    assertFalse(util.getTestFileSystem().exists(outDir));
788
789    assertTrue(job.waitForCompletion(true));
790  }
791
792  /**
793   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
794   * Tests that the family compression map is correctly serialized into
795   * and deserialized from configuration
796   *
797   * @throws IOException
798   */
799  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
800  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
801    for (int numCfs = 0; numCfs <= 3; numCfs++) {
802      Configuration conf = new Configuration(this.util.getConfiguration());
803      Map<String, Compression.Algorithm> familyToCompression =
804          getMockColumnFamiliesForCompression(numCfs);
805      Table table = Mockito.mock(Table.class);
806      setupMockColumnFamiliesForCompression(table, familyToCompression);
807      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
808              HFileOutputFormat2.serializeColumnFamilyAttribute
809                      (HFileOutputFormat2.compressionDetails,
810                              Arrays.asList(table.getTableDescriptor())));
811
812      // read back family specific compression setting from the configuration
813      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
814          .createFamilyCompressionMap(conf);
815
816      // test that we have a value for all column families that matches with the
817      // used mock values
818      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
819        assertEquals("Compression configuration incorrect for column family:"
820            + entry.getKey(), entry.getValue(),
821            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
822      }
823    }
824  }
825
826  private void setupMockColumnFamiliesForCompression(Table table,
827      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
828    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
829    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
830      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
831          .setMaxVersions(1)
832          .setCompressionType(entry.getValue())
833          .setBlockCacheEnabled(false)
834          .setTimeToLive(0));
835    }
836    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
837  }
838
839  /**
840   * @return a map from column family names to compression algorithms for
841   *         testing column family compression. Column family names have special characters
842   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
845    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
846    // use column family names having special characters
847    if (numCfs-- > 0) {
848      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
849    }
850    if (numCfs-- > 0) {
851      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
852    }
853    if (numCfs-- > 0) {
854      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
855    }
856    if (numCfs-- > 0) {
857      familyToCompression.put("Family3", Compression.Algorithm.NONE);
858    }
859    return familyToCompression;
860  }
861
862
863  /**
864   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
865   * Tests that the family bloom type map is correctly serialized into
866   * and deserialized from configuration
867   *
868   * @throws IOException
869   */
870  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
871  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
872    for (int numCfs = 0; numCfs <= 2; numCfs++) {
873      Configuration conf = new Configuration(this.util.getConfiguration());
874      Map<String, BloomType> familyToBloomType =
875          getMockColumnFamiliesForBloomType(numCfs);
876      Table table = Mockito.mock(Table.class);
877      setupMockColumnFamiliesForBloomType(table,
878          familyToBloomType);
879      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
880              HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
881              Arrays.asList(table.getTableDescriptor())));
882
      // read back family specific bloom type settings from the configuration
885      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
886          HFileOutputFormat2
887              .createFamilyBloomTypeMap(conf);
888
889      // test that we have a value for all column families that matches with the
890      // used mock values
891      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
892        assertEquals("BloomType configuration incorrect for column family:"
893            + entry.getKey(), entry.getValue(),
894            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
895      }
896    }
897  }
898
  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
903      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
904          .setMaxVersions(1)
905          .setBloomFilterType(entry.getValue())
906          .setBlockCacheEnabled(false)
907          .setTimeToLive(0));
908    }
909    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
910  }
911
912  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
918    Map<String, BloomType> familyToBloomType = new HashMap<>();
919    // use column family names having special characters
920    if (numCfs-- > 0) {
921      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
922    }
923    if (numCfs-- > 0) {
924      familyToBloomType.put("Family2=asdads&!AASD",
925          BloomType.ROWCOL);
926    }
927    if (numCfs-- > 0) {
928      familyToBloomType.put("Family3", BloomType.NONE);
929    }
930    return familyToBloomType;
931  }
932
933  /**
934   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
935   * Tests that the family block size map is correctly serialized into
936   * and deserialized from configuration
937   *
938   * @throws IOException
939   */
940  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
941  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
942    for (int numCfs = 0; numCfs <= 3; numCfs++) {
943      Configuration conf = new Configuration(this.util.getConfiguration());
944      Map<String, Integer> familyToBlockSize =
945          getMockColumnFamiliesForBlockSize(numCfs);
946      Table table = Mockito.mock(Table.class);
947      setupMockColumnFamiliesForBlockSize(table,
948          familyToBlockSize);
949      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
950              HFileOutputFormat2.serializeColumnFamilyAttribute
951                      (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table
952                              .getTableDescriptor())));
953
      // read back family specific block size settings from the configuration
956      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
957          HFileOutputFormat2
958              .createFamilyBlockSizeMap(conf);
959
960      // test that we have a value for all column families that matches with the
961      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
964        assertEquals("BlockSize configuration incorrect for column family:"
965            + entry.getKey(), entry.getValue(),
966            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
967      }
968    }
969  }
970
  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
975      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
976          .setMaxVersions(1)
977          .setBlocksize(entry.getValue())
978          .setBlockCacheEnabled(false)
979          .setTimeToLive(0));
980    }
981    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
982  }
983
984  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
990    Map<String, Integer> familyToBlockSize = new HashMap<>();
991    // use column family names having special characters
992    if (numCfs-- > 0) {
993      familyToBlockSize.put("Family1!@#!@#&", 1234);
994    }
995    if (numCfs-- > 0) {
996      familyToBlockSize.put("Family2=asdads&!AASD",
997          Integer.MAX_VALUE);
998    }
999    if (numCfs-- > 0) {
1000      familyToBlockSize.put("Family2=asdads&!AASD",
1001          Integer.MAX_VALUE);
1002    }
1003    if (numCfs-- > 0) {
1004      familyToBlockSize.put("Family3", 0);
1005    }
1006    return familyToBlockSize;
1007  }
1008
1009  /**
1010   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
1011   * Tests that the family data block encoding map is correctly serialized into
1012   * and deserialized from configuration
1013   *
1014   * @throws IOException
1015   */
1016  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1017  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
1018    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1019      Configuration conf = new Configuration(this.util.getConfiguration());
1020      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
1021          getMockColumnFamiliesForDataBlockEncoding(numCfs);
1022      Table table = Mockito.mock(Table.class);
1023      setupMockColumnFamiliesForDataBlockEncoding(table,
1024          familyToDataBlockEncoding);
1025      HTableDescriptor tableDescriptor = table.getTableDescriptor();
1026      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
1027              HFileOutputFormat2.serializeColumnFamilyAttribute
1028                      (HFileOutputFormat2.dataBlockEncodingDetails, Arrays
1029                      .asList(tableDescriptor)));
1030
1031      // read back family specific data block encoding settings from the
1032      // configuration
1033      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
1034          HFileOutputFormat2
1035          .createFamilyDataBlockEncodingMap(conf);
1036
1037      // test that we have a value for all column families that matches with the
1038      // used mock values
1039      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1040        assertEquals("DataBlockEncoding configuration incorrect for column family:"
1041            + entry.getKey(), entry.getValue(),
1042            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
1043      }
1044    }
1045  }
1046
1047  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
1048      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
1049    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
1050    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1051      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
1052          .setMaxVersions(1)
1053          .setDataBlockEncoding(entry.getValue())
1054          .setBlockCacheEnabled(false)
1055          .setTimeToLive(0));
1056    }
1057    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
1058  }
1059
1060  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding settings. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
1066    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
1067    // use column family names having special characters
1068    if (numCfs-- > 0) {
1069      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
1070    }
1071    if (numCfs-- > 0) {
1072      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1073          DataBlockEncoding.FAST_DIFF);
1074    }
1075    if (numCfs-- > 0) {
1076      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1077          DataBlockEncoding.PREFIX);
1078    }
1079    if (numCfs-- > 0) {
1080      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
1081    }
1082    return familyToDataBlockEncoding;
1083  }
1084
1085  private void setupMockStartKeys(RegionLocator table) throws IOException {
1086    byte[][] mockKeys = new byte[][] {
1087        HConstants.EMPTY_BYTE_ARRAY,
1088        Bytes.toBytes("aaa"),
1089        Bytes.toBytes("ggg"),
1090        Bytes.toBytes("zzz")
1091    };
1092    Mockito.doReturn(mockKeys).when(table).getStartKeys();
1093  }
1094
1095  private void setupMockTableName(RegionLocator table) throws IOException {
1096    TableName mockTableName = TableName.valueOf("mock_table");
1097    Mockito.doReturn(mockTableName).when(table).getName();
1098  }
1099
1100  /**
1101   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
1102   * bloom filter settings from the column family descriptor
1103   */
1104  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1105  public void testColumnFamilySettings() throws Exception {
1106    Configuration conf = new Configuration(this.util.getConfiguration());
1107    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1108    TaskAttemptContext context = null;
1109    Path dir = util.getDataTestDir("testColumnFamilySettings");
1110
1111    // Setup table descriptor
1112    Table table = Mockito.mock(Table.class);
1113    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
1114    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
1115    Mockito.doReturn(htd).when(table).getTableDescriptor();
1116    for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
1117      htd.addFamily(hcd);
1118    }
1119
1120    // set up the table to return some mock keys
1121    setupMockStartKeys(regionLocator);
1122
1123    try {
1124      // partial map red setup to get an operational writer for testing
1125      // We turn off the sequence file compression, because DefaultCodec
1126      // pollutes the GZip codec pool with an incompatible compressor.
1127      conf.set("io.seqfile.compression.type", "NONE");
1128      conf.set("hbase.fs.tmp.dir", dir.toString());
1129      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
1130      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1131
1132      Job job = new Job(conf, "testLocalMRIncrementalLoad");
1133      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
1134      setupRandomGeneratorMapper(job, false);
1135      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
1136      FileOutputFormat.setOutputPath(job, dir);
1137      context = createTestTaskAttemptContext(job);
1138      HFileOutputFormat2 hof = new HFileOutputFormat2();
1139      writer = hof.getRecordWriter(context);
1140
1141      // write out random rows
1142      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
1143      writer.close(context);
1144
1145      // Make sure that a directory was created for every CF
1146      FileSystem fs = dir.getFileSystem(conf);
1147
1148      // commit so that the filesystem has one directory per column family
1149      hof.getOutputCommitter(context).commitTask(context);
1150      hof.getOutputCommitter(context).commitJob(context);
1151      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
1152      assertEquals(htd.getFamilies().size(), families.length);
1153      for (FileStatus f : families) {
1154        String familyStr = f.getPath().getName();
1155        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
1156        // verify that the compression on this file matches the configured
1157        // compression
1158        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
1159        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
1160        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
1161
1162        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
1163        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
1164        assertEquals("Incorrect bloom filter used for column family " + familyStr +
1165          "(reader: " + reader + ")",
1166          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
1167        assertEquals("Incorrect compression used for column family " + familyStr +
1168          "(reader: " + reader + ")", hcd.getCompressionType(), reader.getFileContext().getCompression());
1169      }
1170    } finally {
1171      dir.getFileSystem(conf).delete(dir, true);
1172    }
1173  }
1174
1175  /**
   * Write random values to the writer, assuming a table created with the
   * {@link #FAMILIES} column families.
1178   */
1179  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1180      TaskAttemptContext context, Set<byte[]> families, int numRows)
1181      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];
1185
1186    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
1188    final byte [] qualifier = Bytes.toBytes("data");
1189    Random random = new Random();
1190    for (int i = 0; i < numRows; i++) {
1191
1192      Bytes.putInt(keyBytes, 0, i);
1193      random.nextBytes(valBytes);
1194      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1195
1196      for (byte[] family : families) {
1197        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1198        writer.write(key, kv);
1199      }
1200    }
1201  }
1202
1203  /**
   * This test exercises the scenario reported in HBASE-6901:
   * all files are bulk loaded and excluded from minor compaction.
   * Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException
   * would be thrown.
1208   */
1209  @Ignore ("Flakey: See HBASE-9051") @Test
1210  public void testExcludeAllFromMinorCompaction() throws Exception {
1211    Configuration conf = util.getConfiguration();
1212    conf.setInt("hbase.hstore.compaction.min", 2);
1213    generateRandomStartKeys(5);
1214
1215    util.startMiniCluster();
1216    try (Connection conn = ConnectionFactory.createConnection();
1217        Admin admin = conn.getAdmin();
1218        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1219        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1220      final FileSystem fs = util.getDFSCluster().getFileSystem();
1221      assertEquals("Should start with empty table", 0, util.countRows(table));
1222
1223      // deep inspection: get the StoreFile dir
1224      final Path storePath = new Path(
1225        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1226          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1227            Bytes.toString(FAMILIES[0])));
1228      assertEquals(0, fs.listStatus(storePath).length);
1229
1230      // Generate two bulk load files
1231      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1232          true);
1233
      for (int i = 0; i < 2; i++) {
        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf,
            Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
                conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
        // Perform the actual load
        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows, util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
            for (HRegion region : regions) {
              for (HStore store : region.getStores()) {
                store.closeAndArchiveCompactedFiles();
              }
            }
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
          for (HRegion region : regions) {
            for (HStore store : region.getStores()) {
              store.closeAndArchiveCompactedFiles();
            }
          }
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

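  /**
   * Variant of the scenario above: one store file comes from a normal put/flush and a second one
   * from a bulk load marked with the compaction-exclude flag. A minor compaction must leave both
   * files in place, while a major compaction is expected to rewrite them into a single file.
   */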
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = util.getDFSCluster().getFileSystem();
      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
      assertEquals("Should start with empty table", 0, util.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath = new Path(
        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in it and flush to create a storefile
      Put p = new Put(Bytes.toBytes("test"));
      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAMES[0]);
      assertEquals(1, util.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with more rows
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
      runIncrementalPELoad(conf,
          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
              regionLocator)), testDir, false);

      // Perform the actual load
      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("LoadIncrementalHFiles should put expected data in table",
          expectedRows + 1, util.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      util.shutdownMiniCluster();
    }
  }

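  /**
   * Polls the given condition roughly every 10 ms until it returns true, failing the test if it
   * has not become true after approximately {@code waitMs} milliseconds.
   */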
  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

  public static void main(String[] args) throws Exception {
    new TestHFileOutputFormat2().manualTest(args);
  }

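  /**
   * Entry point for manual runs via {@link #main(String[])} against the cluster described by the
   * local HBase configuration, rather than a mini cluster. Example invocations (the table name
   * "mytable" is only illustrative):
   *
   * <pre>
   *   TestHFileOutputFormat2 newtable mytable      (create a pre-split test table)
   *   TestHFileOutputFormat2 incremental mytable   (run an incremental load into incremental-out)
   * </pre>
   */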
  public void manualTest(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    util = new HBaseTestingUtility(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      util.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf);
          Admin admin = c.getAdmin();
          RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
            Arrays.asList(new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname),
                regionLocator)), outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

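  /**
   * Verifies that {@code HFileOutputFormat2#configureStoragePolicy} applies the configured HDFS
   * storage policies to the column family directories: the family-specific override ("ONE_SSD")
   * wins for the first family, while the table-wide default ("ALL_SSD") is used for the second.
   */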
  @Test
  public void testBlockStoragePolicy() throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    util.startMiniDFSCluster(3);
    FileSystem fs = util.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // newly created directories start out with the default block storage policy, HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // apply the configured storage policies to the two column family directories
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
          HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
          HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      util.shutdownMiniDFSCluster();
    }
  }

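  /**
   * Looks up the storage policy of a path via the FileSystem getStoragePolicy API, invoked
   * reflectively so the test tolerates Hadoop versions where it is unavailable, and falls back to
   * the old client API when that fails.
   */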
  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // This may fail on an old HDFS version; fall back to the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
    }
  }

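  /**
   * Fallback for older HDFS versions: resolves the storage policy id from the file status against
   * the policies reported by the DistributedFileSystem, returning null if it cannot be determined.
   */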
  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
    try {
      if (fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (status != null) {
          byte storagePolicyId = status.getStoragePolicy();
          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("Failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

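  /**
   * Verifies that the compression algorithm configured through
   * {@code HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY} ("gz" here) is the one actually used
   * for the written HFiles.
   */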
  @Test
  public void testConfigureCompression() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("TestConfigureCompression");
    String hfileoutputformatCompression = "gz";

    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);

      Job job = Job.getInstance(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        assertEquals(hfileoutputformatCompression, reader.getCompressionAlgorithm().getName());
        reader.close();
      }
    } finally {
      if (writer != null && context != null) {
        writer.close(context);
      }
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
}