001/**
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.mapreduce;
020
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNotSame;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028
029import java.io.IOException;
030import java.lang.reflect.Field;
031import java.util.ArrayList;
032import java.util.Arrays;
033import java.util.HashMap;
034import java.util.List;
035import java.util.Map;
036import java.util.Map.Entry;
037import java.util.Random;
038import java.util.Set;
039import java.util.concurrent.Callable;
040import java.util.stream.Collectors;
041import java.util.stream.Stream;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.LocatedFileStatus;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.fs.RemoteIterator;
048import org.apache.hadoop.hbase.ArrayBackedTag;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellUtil;
051import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
052import org.apache.hadoop.hbase.HBaseClassTestRule;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HBaseTestingUtility;
055import org.apache.hadoop.hbase.HColumnDescriptor;
056import org.apache.hadoop.hbase.HConstants;
057import org.apache.hadoop.hbase.HDFSBlocksDistribution;
058import org.apache.hadoop.hbase.HTableDescriptor;
059import org.apache.hadoop.hbase.HadoopShims;
060import org.apache.hadoop.hbase.KeyValue;
061import org.apache.hadoop.hbase.PerformanceEvaluation;
062import org.apache.hadoop.hbase.TableName;
063import org.apache.hadoop.hbase.Tag;
064import org.apache.hadoop.hbase.TagType;
065import org.apache.hadoop.hbase.TagUtil;
066import org.apache.hadoop.hbase.client.Admin;
067import org.apache.hadoop.hbase.client.Connection;
068import org.apache.hadoop.hbase.client.ConnectionFactory;
069import org.apache.hadoop.hbase.client.Put;
070import org.apache.hadoop.hbase.client.RegionLocator;
071import org.apache.hadoop.hbase.client.Result;
072import org.apache.hadoop.hbase.client.ResultScanner;
073import org.apache.hadoop.hbase.client.Scan;
074import org.apache.hadoop.hbase.client.Table;
075import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
076import org.apache.hadoop.hbase.io.compress.Compression;
077import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
078import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
079import org.apache.hadoop.hbase.io.hfile.CacheConfig;
080import org.apache.hadoop.hbase.io.hfile.HFile;
081import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
082import org.apache.hadoop.hbase.io.hfile.HFileScanner;
083import org.apache.hadoop.hbase.regionserver.BloomType;
084import org.apache.hadoop.hbase.regionserver.HRegion;
085import org.apache.hadoop.hbase.regionserver.HStore;
086import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
087import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
088import org.apache.hadoop.hbase.testclassification.LargeTests;
089import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
090import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
091import org.apache.hadoop.hbase.util.Bytes;
092import org.apache.hadoop.hbase.util.CommonFSUtils;
093import org.apache.hadoop.hbase.util.FSUtils;
094import org.apache.hadoop.hbase.util.ReflectionUtils;
095import org.apache.hadoop.hdfs.DistributedFileSystem;
096import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
097import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
098import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
099import org.apache.hadoop.io.NullWritable;
100import org.apache.hadoop.mapreduce.Job;
101import org.apache.hadoop.mapreduce.Mapper;
102import org.apache.hadoop.mapreduce.RecordWriter;
103import org.apache.hadoop.mapreduce.TaskAttemptContext;
104import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
105import org.junit.ClassRule;
106import org.junit.Ignore;
107import org.junit.Test;
108import org.junit.experimental.categories.Category;
109import org.mockito.Mockito;
110import org.slf4j.Logger;
111import org.slf4j.LoggerFactory;
112
113/**
114 * Simple test for {@link HFileOutputFormat2}.
115 * Sets up and runs a mapreduce job that writes hfile output.
116 * Creates a few inner classes to implement splits and an inputformat that
117 * emits keys and values like those of {@link PerformanceEvaluation}.
118 */
119@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestCellBasedHFileOutputFormat2 {
121
122  @ClassRule
123  public static final HBaseClassTestRule CLASS_RULE =
124      HBaseClassTestRule.forClass(TestCellBasedHFileOutputFormat2.class);
125
126  private final static int ROWSPERSPLIT = 1024;
127
128  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
129  private static final byte[][] FAMILIES = {
130    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
131  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
132          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
133
134  private HBaseTestingUtility util = new HBaseTestingUtility();
135
136  private static final Logger LOG = LoggerFactory.getLogger(TestCellBasedHFileOutputFormat2.class);
137
138  /**
139   * Simple mapper that makes KeyValue output.
140   */
141  static class RandomKVGeneratingMapper
142      extends Mapper<NullWritable, NullWritable,
143                 ImmutableBytesWritable, Cell> {
144
    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
153    private boolean multiTableMapper = false;
154    private TableName[] tables = null;
155
156
157    @Override
158    protected void setup(Context context) throws IOException,
159        InterruptedException {
160      super.setup(context);
161
162      Configuration conf = context.getConfiguration();
163      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
164      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
165      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
166              false);
167      if (multiTableMapper) {
168        tables = TABLE_NAMES;
169      } else {
170        tables = new TableName[]{TABLE_NAMES[0]};
171      }
172    }
173
174    @Override
175    protected void map(
176        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
        throws IOException, InterruptedException {
181
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];
184
185      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
187      Random random = new Random();
188      byte[] key;
189      for (int j = 0; j < tables.length; ++j) {
190        for (int i = 0; i < ROWSPERSPLIT; i++) {
191          random.nextBytes(keyBytes);
192          // Ensure that unique tasks generate unique keys
193          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
194          random.nextBytes(valBytes);
195          key = keyBytes;
196          if (multiTableMapper) {
197            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
198          }
199
200          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
201            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
202            context.write(new ImmutableBytesWritable(key), kv);
203          }
204        }
205      }
206    }
207  }
208
209  /**
210   * Simple mapper that makes Put output.
211   */
212  static class RandomPutGeneratingMapper
213      extends Mapper<NullWritable, NullWritable,
214                 ImmutableBytesWritable, Put> {
215
216    private int keyLength;
217    private static final int KEYLEN_DEFAULT = 10;
218    private static final String KEYLEN_CONF = "randomkv.key.length";
219
220    private int valLength;
221    private static final int VALLEN_DEFAULT = 10;
222    private static final String VALLEN_CONF = "randomkv.val.length";
223    private static final byte[] QUALIFIER = Bytes.toBytes("data");
224    private boolean multiTableMapper = false;
225    private TableName[] tables = null;
226
227    @Override
228    protected void setup(Context context) throws IOException,
229            InterruptedException {
230      super.setup(context);
231
232      Configuration conf = context.getConfiguration();
233      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
234      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
235      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
236              false);
237      if (multiTableMapper) {
238        tables = TABLE_NAMES;
239      } else {
240        tables = new TableName[]{TABLE_NAMES[0]};
241      }
242    }
243
244    @Override
245    protected void map(
246            NullWritable n1, NullWritable n2,
247            Mapper<NullWritable, NullWritable,
248                    ImmutableBytesWritable, Put>.Context context)
249            throws java.io.IOException, InterruptedException {
250
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];
253
254      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
256
257      Random random = new Random();
258      byte[] key;
259      for (int j = 0; j < tables.length; ++j) {
260        for (int i = 0; i < ROWSPERSPLIT; i++) {
261          random.nextBytes(keyBytes);
262          // Ensure that unique tasks generate unique keys
263          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
264          random.nextBytes(valBytes);
265          key = keyBytes;
266          if (multiTableMapper) {
267            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
268          }
269
270          for (byte[] family : TestCellBasedHFileOutputFormat2.FAMILIES) {
271            Put p = new Put(keyBytes);
272            p.addColumn(family, QUALIFIER, valBytes);
            // set a very low TTL (1 ms) so the cells expire and later scans return no rows
            p.setTTL(1L);
275            context.write(new ImmutableBytesWritable(key), p);
276          }
277        }
278      }
279    }
280  }
281
282  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
283    if (putSortReducer) {
284      job.setInputFormatClass(NMapInputFormat.class);
285      job.setMapperClass(RandomPutGeneratingMapper.class);
286      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
287      job.setMapOutputValueClass(Put.class);
288    } else {
289      job.setInputFormatClass(NMapInputFormat.class);
290      job.setMapperClass(RandomKVGeneratingMapper.class);
291      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
292      job.setMapOutputValueClass(KeyValue.class);
293    }
294  }
295
296  /**
297   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
298   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
299   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
300   */
301  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
302  public void test_LATEST_TIMESTAMP_isReplaced()
303  throws Exception {
304    Configuration conf = new Configuration(this.util.getConfiguration());
305    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
306    TaskAttemptContext context = null;
307    Path dir =
308      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
309    try {
310      Job job = new Job(conf);
311      FileOutputFormat.setOutputPath(job, dir);
312      context = createTestTaskAttemptContext(job);
313      HFileOutputFormat2 hof = new HFileOutputFormat2();
314      writer = hof.getRecordWriter(context);
315      final byte [] b = Bytes.toBytes("b");
316
317      // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  It should be
318      // changed by call to write.  Check all in kv is same but ts.
319      KeyValue kv = new KeyValue(b, b, b);
320      KeyValue original = kv.clone();
321      writer.write(new ImmutableBytesWritable(), kv);
322      assertFalse(original.equals(kv));
323      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
324      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
325      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
326      assertNotSame(original.getTimestamp(), kv.getTimestamp());
327      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
328
329      // Test 2. Now test passing a kv that has explicit ts.  It should not be
330      // changed by call to record write.
331      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
332      original = kv.clone();
333      writer.write(new ImmutableBytesWritable(), kv);
334      assertTrue(original.equals(kv));
335    } finally {
336      if (writer != null && context != null) writer.close(context);
337      dir.getFileSystem(conf).delete(dir, true);
338    }
339  }
340
341  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
342  throws Exception {
343    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
344    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
345      job, "attempt_201402131733_0001_m_000000_0");
346    return context;
347  }
348
  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
   * metadata used by time-restricted scans.
   */
353  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
354  public void test_TIMERANGE() throws Exception {
355    Configuration conf = new Configuration(this.util.getConfiguration());
356    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
357    TaskAttemptContext context = null;
358    Path dir =
359      util.getDataTestDir("test_TIMERANGE_present");
360    LOG.info("Timerange dir writing to dir: "+ dir);
361    try {
362      // build a record writer using HFileOutputFormat2
363      Job job = new Job(conf);
364      FileOutputFormat.setOutputPath(job, dir);
365      context = createTestTaskAttemptContext(job);
366      HFileOutputFormat2 hof = new HFileOutputFormat2();
367      writer = hof.getRecordWriter(context);
368
      // Pass two key values with explicit timestamps
370      final byte [] b = Bytes.toBytes("b");
371
372      // value 1 with timestamp 2000
373      KeyValue kv = new KeyValue(b, b, b, 2000, b);
374      KeyValue original = kv.clone();
375      writer.write(new ImmutableBytesWritable(), kv);
376      assertEquals(original,kv);
377
378      // value 2 with timestamp 1000
379      kv = new KeyValue(b, b, b, 1000, b);
380      original = kv.clone();
381      writer.write(new ImmutableBytesWritable(), kv);
382      assertEquals(original, kv);
383
384      // verify that the file has the proper FileInfo.
385      writer.close(context);
386
387      // the generated file lives 1 directory down from the attempt directory
388      // and is the only file, e.g.
389      // _attempt__0000_r_000000_0/b/1979617994050536795
390      FileSystem fs = FileSystem.get(conf);
391      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
392      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
393      FileStatus[] file = fs.listStatus(sub1[0].getPath());
394
395      // open as HFile Reader and pull out TIMERANGE FileInfo.
396      HFile.Reader rd =
397          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
398      Map<byte[],byte[]> finfo = rd.getHFileInfo();
399      byte[] range = finfo.get("TIMERANGE".getBytes("UTF-8"));
400      assertNotNull(range);
401
402      // unmarshall and check values.
403      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
404      LOG.info(timeRangeTracker.getMin() +
405          "...." + timeRangeTracker.getMax());
406      assertEquals(1000, timeRangeTracker.getMin());
407      assertEquals(2000, timeRangeTracker.getMax());
408      rd.close();
409    } finally {
410      if (writer != null && context != null) writer.close(context);
411      dir.getFileSystem(conf).delete(dir, true);
412    }
413  }
414
415  /**
416   * Run small MR job.
417   */
418  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
419  public void testWritingPEData() throws Exception {
420    Configuration conf = util.getConfiguration();
421    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
422    FileSystem fs = testDir.getFileSystem(conf);
423
    // Lower this value or we OOM in Eclipse.
425    conf.setInt("mapreduce.task.io.sort.mb", 20);
426    // Write a few files.
427    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
428
429    Job job = new Job(conf, "testWritingPEData");
430    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
433    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
434    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
435
436    Arrays.fill(startKey, (byte)0);
437    Arrays.fill(endKey, (byte)0xff);
438
439    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
440    // Set start and end rows for partitioner.
441    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
442    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
443    job.setReducerClass(CellSortReducer.class);
444    job.setOutputFormatClass(HFileOutputFormat2.class);
445    job.setNumReduceTasks(4);
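    // These serializations let the MapReduce framework serialize HBase Put/Result/Cell objects
    // as map output values during the shuffle; without them the job cannot move the mapper
    // output to the reducers.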
446    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
447        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
448        CellSerialization.class.getName());
449
450    FileOutputFormat.setOutputPath(job, testDir);
451    assertTrue(job.waitForCompletion(false));
452    FileStatus [] files = fs.listStatus(testDir);
453    assertTrue(files.length > 0);
454  }
455
456  /**
   * Test that the {@link HFileOutputFormat2} RecordWriter writes tags, such as TTL, into
   * the HFile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
463    Configuration conf = new Configuration(this.util.getConfiguration());
464    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
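    // Tags are only persisted from HFile format version 3 (MIN_FORMAT_VERSION_WITH_TAGS)
    // onwards, so force a tag-aware format version for this test.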
465    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
466    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
467    TaskAttemptContext context = null;
468    Path dir =
469        util.getDataTestDir("WritingTagData");
470    try {
471      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
472      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
473      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
474      Job job = new Job(conf);
475      FileOutputFormat.setOutputPath(job, dir);
476      context = createTestTaskAttemptContext(job);
477      HFileOutputFormat2 hof = new HFileOutputFormat2();
478      writer = hof.getRecordWriter(context);
479      final byte [] b = Bytes.toBytes("b");
480
      List<Tag> tags = new ArrayList<>();
482      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
483      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
484      writer.write(new ImmutableBytesWritable(), kv);
485      writer.close(context);
486      writer = null;
487      FileSystem fs = dir.getFileSystem(conf);
488      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
489      while(iterator.hasNext()) {
490        LocatedFileStatus keyFileStatus = iterator.next();
491        HFile.Reader reader =
492            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
493        HFileScanner scanner = reader.getScanner(false, false, false);
494        scanner.seekTo();
495        Cell cell = scanner.getCell();
496        List<Tag> tagsFromCell = TagUtil.asList(cell.getTagsArray(), cell.getTagsOffset(),
497            cell.getTagsLength());
498        assertTrue(tagsFromCell.size() > 0);
499        for (Tag tag : tagsFromCell) {
500          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
501        }
502      }
503    } finally {
504      if (writer != null && context != null) writer.close(context);
505      dir.getFileSystem(conf).delete(dir, true);
506    }
507  }
508
509  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
510  public void testJobConfiguration() throws Exception {
511    Configuration conf = new Configuration(this.util.getConfiguration());
512    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
513        .toString());
514    Job job = new Job(conf);
515    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
516    Table table = Mockito.mock(Table.class);
517    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
518    setupMockStartKeys(regionLocator);
519    setupMockTableName(regionLocator);
520    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
522  }
523
524  private byte [][] generateRandomStartKeys(int numKeys) {
525    Random random = new Random();
526    byte[][] ret = new byte[numKeys][];
527    // first region start key is always empty
528    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
529    for (int i = 1; i < numKeys; i++) {
530      ret[i] =
531        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
532    }
533    return ret;
534  }
535
536  private byte[][] generateRandomSplitKeys(int numKeys) {
537    Random random = new Random();
538    byte[][] ret = new byte[numKeys][];
539    for (int i = 0; i < numKeys; i++) {
540      ret[i] =
541          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
542    }
543    return ret;
544  }
545
546  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
547  public void testMRIncrementalLoad() throws Exception {
548    LOG.info("\nStarting test testMRIncrementalLoad\n");
549    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
550  }
551
552  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
553  public void testMRIncrementalLoadWithSplit() throws Exception {
554    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
555    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
556  }
557
  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
   * This test can only verify that the original logic still works when LOCALITY_SENSITIVE_CONF_KEY
   * is set to true. Because MiniHBaseCluster always runs with a single hostname (and different
   * ports), it is not possible to check region locality by comparing region locations with
   * datanode hostnames. Once MiniHBaseCluster supports an explicit hostnames parameter (just like
   * MiniDFSCluster does), region locality can be tested more thoroughly.
   */
566  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
567  public void testMRIncrementalLoadWithLocality() throws Exception {
568    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
569    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
570    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
571  }
572
574  @Test
575  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
576    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
577    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
578  }
579
  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
      boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
        Arrays.asList(tableStr));
  }
585
586  @Test
587  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
588    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
        Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
592  }
593
594  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
595      boolean putSortReducer, List<String> tableStr) throws Exception {
596    util = new HBaseTestingUtility();
597    Configuration conf = util.getConfiguration();
598    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
599    int hostCount = 1;
600    int regionNum = 5;
    if (shouldKeepLocality) {
      // Raise the host count above the HDFS replica count once MiniHBaseCluster supports an
      // explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }
607
608    String[] hostnames = new String[hostCount];
609    for (int i = 0; i < hostCount; ++i) {
610      hostnames[i] = "datanode_" + i;
611    }
612    util.startMiniCluster(1, hostCount, hostnames);
613
614    Map<String, Table> allTables = new HashMap<>(tableStr.size());
615    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
616    boolean writeMultipleTables = tableStr.size() > 1;
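    // When writing more than one table, MultiTableHFileOutputFormat lays out the job output as
    // one sub-directory per table under the output path; the directory checks below rely on
    // that layout.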
617    for (String tableStrSingle : tableStr) {
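      // regionNum - 1 split keys yield regionNum regions; the assertion below verifies this.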
618      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
619      TableName tableName = TableName.valueOf(tableStrSingle);
620      Table table = util.createTable(tableName, FAMILIES, splitKeys);
621
622      RegionLocator r = util.getConnection().getRegionLocator(tableName);
623      assertEquals("Should start with empty table", 0, util.countRows(table));
624      int numRegions = r.getStartKeys().length;
625      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
626
627      allTables.put(tableStrSingle, table);
628      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
629    }
630    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
631    // Generate the bulk load files
632    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
633
634    for (Table tableSingle : allTables.values()) {
635      // This doesn't write into the table, just makes files
636      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
637    }
638    int numTableDirs = 0;
639    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
640      Path tablePath = testDir;
641
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }
651
652      // Make sure that a directory was created for every CF
653      int dir = 0;
654      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
655        for (byte[] family : FAMILIES) {
656          if (Bytes.toString(family).equals(f.getPath().getName())) {
657            ++dir;
658          }
659        }
660      }
661      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
662    }
663    if (writeMultipleTables) {
664      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
665    }
666
667    Admin admin = util.getConnection().getAdmin();
668    try {
669      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
674        admin.disableTable(chosenTable.getName());
675        util.waitUntilNoRegionsInTransition();
676
677        util.deleteTable(chosenTable.getName());
678        byte[][] newSplitKeys = generateRandomSplitKeys(14);
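        // 14 split keys create 15 regions, matching the 15 region locations the loop below
        // waits for.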
679        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
680
681        while (util.getConnection().getRegionLocator(chosenTable.getName())
682                .getAllRegionLocations().size() != 15 ||
683                !admin.isTableAvailable(table.getName())) {
684          Thread.sleep(200);
685          LOG.info("Waiting for new region assignment to happen");
686        }
687      }
688
689      // Perform the actual load
690      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
691        Path tableDir = testDir;
692        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
693        LOG.info("Running LoadIncrementalHFiles on table" + tableNameStr);
694        if (writeMultipleTables) {
695          tableDir = new Path(testDir, tableNameStr);
696        }
697        Table currentTable = allTables.get(tableNameStr);
698        TableName currentTableName = currentTable.getName();
699        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
700                .getRegionLocator());
701
702        // Ensure data shows up
703        int expectedRows = 0;
        if (putSortReducer) {
          // the 1 ms TTL set by RandomPutGeneratingMapper has already expired every cell,
          // so no rows should be visible
706          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
707                  util.countRows(currentTable));
708        } else {
709          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
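          // Every map task writes ROWSPERSPLIT distinct rows per table, so the loaded table
          // should now hold numMapTasks * ROWSPERSPLIT rows.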
710          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
711                  util.countRows(currentTable));
712          Scan scan = new Scan();
713          ResultScanner results = currentTable.getScanner(scan);
714          for (Result res : results) {
715            assertEquals(FAMILIES.length, res.rawCells().length);
716            Cell first = res.rawCells()[0];
717            for (Cell kv : res.rawCells()) {
718              assertTrue(CellUtil.matchingRows(first, kv));
719              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
720            }
721          }
722          results.close();
723        }
724        String tableDigestBefore = util.checksumRows(currentTable);
725        // Check region locality
726        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
727        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
728          hbd.add(region.getHDFSBlocksDistribution());
729        }
730        for (String hostname : hostnames) {
731          float locality = hbd.getBlockLocalityIndex(hostname);
732          LOG.info("locality of [" + hostname + "]: " + locality);
733          assertEquals(100, (int) (locality * 100));
734        }
735
736        // Cause regions to reopen
737        admin.disableTable(currentTableName);
738        while (!admin.isTableDisabled(currentTableName)) {
739          Thread.sleep(200);
740          LOG.info("Waiting for table to disable");
741        }
742        admin.enableTable(currentTableName);
743        util.waitTableAvailable(currentTableName);
744        assertEquals("Data should remain after reopening of regions",
745                tableDigestBefore, util.checksumRows(currentTable));
746      }
747    } finally {
748      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
749          tableInfoSingle.getRegionLocator().close();
750      }
751      for (Entry<String, Table> singleTable : allTables.entrySet() ) {
752        singleTable.getValue().close();
753        util.deleteTable(singleTable.getValue().getName());
754      }
755      testDir.getFileSystem(conf).delete(testDir, true);
756      util.shutdownMiniCluster();
757    }
758  }
759
  private void runIncrementalPELoad(Configuration conf,
      List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
      throws IOException, InterruptedException, ClassNotFoundException {
763    Job job = new Job(conf, "testLocalMRIncrementalLoad");
764    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
765    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
766        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
767        CellSerialization.class.getName());
768    setupRandomGeneratorMapper(job, putSortReducer);
769    if (tableInfo.size() > 1) {
770      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
771      int sum = 0;
772      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
773        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
774      }
775      assertEquals(sum, job.getNumReduceTasks());
    } else {
778      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
779      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
780              regionLocator);
781      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
782    }
783
784    FileOutputFormat.setOutputPath(job, outDir);
785
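    // FileOutputFormat requires that the output directory does not exist yet; verify it is
    // absent before submitting the job.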
    assertFalse(util.getTestFileSystem().exists(outDir));
787
788    assertTrue(job.waitForCompletion(true));
789  }
790
791  /**
792   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
793   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
794   * Tests that the compression map is correctly serialized into
795   * and deserialized from configuration
796   *
797   * @throws IOException
798   */
799  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
800  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
801    for (int numCfs = 0; numCfs <= 3; numCfs++) {
802      Configuration conf = new Configuration(this.util.getConfiguration());
803      Map<String, Compression.Algorithm> familyToCompression =
804          getMockColumnFamiliesForCompression(numCfs);
805      Table table = Mockito.mock(Table.class);
806      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
              Arrays.asList(table.getTableDescriptor())));
811
812      // read back family specific compression setting from the configuration
813      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
814          .createFamilyCompressionMap(conf);
815
816      // test that we have a value for all column families that matches with the
817      // used mock values
818      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
819        assertEquals("Compression configuration incorrect for column family:"
820            + entry.getKey(), entry.getValue(),
821            retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
822      }
823    }
824  }
825
826  private void setupMockColumnFamiliesForCompression(Table table,
827      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
828    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
829    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
830      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
831          .setMaxVersions(1)
832          .setCompressionType(entry.getValue())
833          .setBlockCacheEnabled(false)
834          .setTimeToLive(0));
835    }
836    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
837  }
838
839  /**
840   * @return a map from column family names to compression algorithms for
841   *         testing column family compression. Column family names have special characters
842   */
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
845    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
846    // use column family names having special characters
847    if (numCfs-- > 0) {
848      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
849    }
850    if (numCfs-- > 0) {
851      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
852    }
853    if (numCfs-- > 0) {
854      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
855    }
856    if (numCfs-- > 0) {
857      familyToCompression.put("Family3", Compression.Algorithm.NONE);
858    }
859    return familyToCompression;
860  }
861
862
863  /**
864   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
865   * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
   * Tests that the bloom type map is correctly serialized into
   * and deserialized from configuration.
868   *
869   * @throws IOException
870   */
871  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
872  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
873    for (int numCfs = 0; numCfs <= 2; numCfs++) {
874      Configuration conf = new Configuration(this.util.getConfiguration());
875      Map<String, BloomType> familyToBloomType =
876          getMockColumnFamiliesForBloomType(numCfs);
877      Table table = Mockito.mock(Table.class);
878      setupMockColumnFamiliesForBloomType(table,
879          familyToBloomType);
880      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
881              HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
882              Arrays.asList(table.getTableDescriptor())));
883
      // read back family specific bloom filter settings from the
      // configuration
886      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
887          HFileOutputFormat2
888              .createFamilyBloomTypeMap(conf);
889
890      // test that we have a value for all column families that matches with the
891      // used mock values
892      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
893        assertEquals("BloomType configuration incorrect for column family:"
894            + entry.getKey(), entry.getValue(),
895            retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
896      }
897    }
898  }
899
  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
904      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
905          .setMaxVersions(1)
906          .setBloomFilterType(entry.getValue())
907          .setBlockCacheEnabled(false)
908          .setTimeToLive(0));
909    }
910    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
911  }
912
  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom filter settings. Column family names have special
   *         characters
   */
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
919    Map<String, BloomType> familyToBloomType = new HashMap<>();
920    // use column family names having special characters
921    if (numCfs-- > 0) {
922      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
923    }
924    if (numCfs-- > 0) {
925      familyToBloomType.put("Family2=asdads&!AASD",
926          BloomType.ROWCOL);
927    }
928    if (numCfs-- > 0) {
929      familyToBloomType.put("Family3", BloomType.NONE);
930    }
931    return familyToBloomType;
932  }
933
934  /**
935   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
936   * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
   * Tests that the block size map is correctly serialized into
   * and deserialized from configuration.
939   *
940   * @throws IOException
941   */
942  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
943  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
944    for (int numCfs = 0; numCfs <= 3; numCfs++) {
945      Configuration conf = new Configuration(this.util.getConfiguration());
946      Map<String, Integer> familyToBlockSize =
947          getMockColumnFamiliesForBlockSize(numCfs);
948      Table table = Mockito.mock(Table.class);
949      setupMockColumnFamiliesForBlockSize(table,
950          familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
              Arrays.asList(table.getTableDescriptor())));
955
      // read back family specific block size settings from the
      // configuration
958      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
959          HFileOutputFormat2
960              .createFamilyBlockSizeMap(conf);
961
962      // test that we have a value for all column families that matches with the
963      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
966        assertEquals("BlockSize configuration incorrect for column family:"
967            + entry.getKey(), entry.getValue(),
968            retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
969      }
970    }
971  }
972
  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
977      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
978          .setMaxVersions(1)
979          .setBlocksize(entry.getValue())
980          .setBlockCacheEnabled(false)
981          .setTimeToLive(0));
982    }
983    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
984  }
985
  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size settings. Column family names have special
   *         characters
   */
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
992    Map<String, Integer> familyToBlockSize = new HashMap<>();
993    // use column family names having special characters
994    if (numCfs-- > 0) {
995      familyToBlockSize.put("Family1!@#!@#&", 1234);
996    }
997    if (numCfs-- > 0) {
998      familyToBlockSize.put("Family2=asdads&!AASD",
999          Integer.MAX_VALUE);
1000    }
1001    if (numCfs-- > 0) {
1002      familyToBlockSize.put("Family2=asdads&!AASD",
1003          Integer.MAX_VALUE);
1004    }
1005    if (numCfs-- > 0) {
1006      familyToBlockSize.put("Family3", 0);
1007    }
1008    return familyToBlockSize;
1009  }
1010
1011  /**
1012   * Test for {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
1013   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
   * Tests that the data block encoding map is correctly serialized into
   * and deserialized from configuration.
1016   *
1017   * @throws IOException
1018   */
1019  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1020  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
1021    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1022      Configuration conf = new Configuration(this.util.getConfiguration());
1023      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
1024          getMockColumnFamiliesForDataBlockEncoding(numCfs);
1025      Table table = Mockito.mock(Table.class);
1026      setupMockColumnFamiliesForDataBlockEncoding(table,
1027          familyToDataBlockEncoding);
1028      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(
              HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));
1033
1034      // read back family specific data block encoding settings from the
1035      // configuration
1036      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
1037          HFileOutputFormat2
1038          .createFamilyDataBlockEncodingMap(conf);
1039
1040      // test that we have a value for all column families that matches with the
1041      // used mock values
1042      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1043        assertEquals("DataBlockEncoding configuration incorrect for column family:"
1044            + entry.getKey(), entry.getValue(),
1045            retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
1046      }
1047    }
1048  }
1049
1050  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
1051      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
1052    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
1053    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1054      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
1055          .setMaxVersions(1)
1056          .setDataBlockEncoding(entry.getValue())
1057          .setBlockCacheEnabled(false)
1058          .setTimeToLive(0));
1059    }
1060    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
1061  }
1062
  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding settings. Column family names have special
   *         characters
   */
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
1069    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
1070    // use column family names having special characters
1071    if (numCfs-- > 0) {
1072      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
1073    }
1074    if (numCfs-- > 0) {
1075      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1076          DataBlockEncoding.FAST_DIFF);
1077    }
1078    if (numCfs-- > 0) {
1079      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1080          DataBlockEncoding.PREFIX);
1081    }
1082    if (numCfs-- > 0) {
1083      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
1084    }
1085    return familyToDataBlockEncoding;
1086  }
1087
1088  private void setupMockStartKeys(RegionLocator table) throws IOException {
1089    byte[][] mockKeys = new byte[][] {
1090        HConstants.EMPTY_BYTE_ARRAY,
1091        Bytes.toBytes("aaa"),
1092        Bytes.toBytes("ggg"),
1093        Bytes.toBytes("zzz")
1094    };
1095    Mockito.doReturn(mockKeys).when(table).getStartKeys();
1096  }
1097
1098  private void setupMockTableName(RegionLocator table) throws IOException {
1099    TableName mockTableName = TableName.valueOf("mock_table");
1100    Mockito.doReturn(mockTableName).when(table).getName();
1101  }
1102
1103  /**
1104   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
1105   * bloom filter settings from the column family descriptor
1106   */
1107  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1108  public void testColumnFamilySettings() throws Exception {
1109    Configuration conf = new Configuration(this.util.getConfiguration());
1110    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1111    TaskAttemptContext context = null;
1112    Path dir = util.getDataTestDir("testColumnFamilySettings");
1113
1114    // Setup table descriptor
1115    Table table = Mockito.mock(Table.class);
1116    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
1117    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
1118    Mockito.doReturn(htd).when(table).getTableDescriptor();
1119    for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
1120      htd.addFamily(hcd);
1121    }
1122
1123    // set up the table to return some mock keys
1124    setupMockStartKeys(regionLocator);
1125
1126    try {
1127      // partial map red setup to get an operational writer for testing
1128      // We turn off the sequence file compression, because DefaultCodec
1129      // pollutes the GZip codec pool with an incompatible compressor.
1130      conf.set("io.seqfile.compression.type", "NONE");
1131      conf.set("hbase.fs.tmp.dir", dir.toString());
1132      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
1133      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1134
1135      Job job = new Job(conf, "testLocalMRIncrementalLoad");
1136      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
1137      setupRandomGeneratorMapper(job, false);
1138      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
1139      FileOutputFormat.setOutputPath(job, dir);
1140      context = createTestTaskAttemptContext(job);
1141      HFileOutputFormat2 hof = new HFileOutputFormat2();
1142      writer = hof.getRecordWriter(context);
1143
1144      // write out random rows
1145      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
1146      writer.close(context);
1147
1148      // Make sure that a directory was created for every CF
1149      FileSystem fs = dir.getFileSystem(conf);
1150
1151      // commit so that the filesystem has one directory per column family
1152      hof.getOutputCommitter(context).commitTask(context);
1153      hof.getOutputCommitter(context).commitJob(context);
1154      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
1155      assertEquals(htd.getFamilies().size(), families.length);
1156      for (FileStatus f : families) {
1157        String familyStr = f.getPath().getName();
1158        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
1159        // verify that the compression on this file matches the configured
1160        // compression
1161        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
1162        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
1163        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
1164
1165        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
1166        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
1167        assertEquals("Incorrect bloom filter used for column family " + familyStr +
1168          "(reader: " + reader + ")",
1169          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
1170        assertEquals("Incorrect compression used for column family " + familyStr +
1171          "(reader: " + reader + ")", hcd.getCompressionType(), reader.getFileContext().getCompression());
1172      }
1173    } finally {
1174      dir.getFileSystem(conf).delete(dir, true);
1175    }
1176  }
1177
  /**
   * Write random values to the writer, one cell per row for each of the given
   * column families.
   */
1182  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1183      TaskAttemptContext context, Set<byte[]> families, int numRows)
1184      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];
1188
1189    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
1191    final byte [] qualifier = Bytes.toBytes("data");
1192    Random random = new Random();
1193    for (int i = 0; i < numRows; i++) {
1194
1195      Bytes.putInt(keyBytes, 0, i);
1196      random.nextBytes(valBytes);
1197      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1198
1199      for (byte[] family : families) {
1200        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1201        writer.write(key, kv);
1202      }
1203    }
1204  }
1205
  /**
   * This test covers the scenario from HBASE-6901:
   * all files are bulk loaded and excluded from minor compaction.
   * Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException
   * would be thrown.
   */
1212  @Ignore ("Flakey: See HBASE-9051") @Test
1213  public void testExcludeAllFromMinorCompaction() throws Exception {
1214    Configuration conf = util.getConfiguration();
1215    conf.setInt("hbase.hstore.compaction.min", 2);
1216    generateRandomStartKeys(5);
1217
1218    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
1220        Admin admin = conn.getAdmin();
1221        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1222        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1223      final FileSystem fs = util.getDFSCluster().getFileSystem();
1224      assertEquals("Should start with empty table", 0, util.countRows(table));
1225
1226      // deep inspection: get the StoreFile dir
1227      final Path storePath = new Path(
1228        CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1229          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1230            Bytes.toString(FAMILIES[0])));
1231      assertEquals(0, fs.listStatus(storePath).length);
1232
1233      // Generate two bulk load files
1234      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1235          true);
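      // With this flag set, HFileOutputFormat2 should mark the generated HFiles as excluded
      // from minor compaction, so the minor compaction triggered further down leaves both
      // bulk-loaded store files in place.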
1236
1237      for (int i = 0; i < 2; i++) {
1238        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1239        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1240                .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
1241        // Perform the actual load
1242        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
1243      }
1244
1245      // Ensure data shows up
1246      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1247      assertEquals("LoadIncrementalHFiles should put expected data in table",
1248          expectedRows, util.countRows(table));
1249
1250      // should have a second StoreFile now
1251      assertEquals(2, fs.listStatus(storePath).length);
1252
1253      // minor compactions shouldn't get rid of the file
1254      admin.compact(TABLE_NAMES[0]);
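      // The poll below is expected to time out (quickPoll fails with an AssertionError)
      // because the bulk-loaded files are excluded from minor compaction. If the store
      // file count ever drops to one, the files were wrongly compacted and the
      // IOException fails the test.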
1255      try {
1256        quickPoll(new Callable<Boolean>() {
1257          @Override
1258          public Boolean call() throws Exception {
1259            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1260            for (HRegion region : regions) {
1261              for (HStore store : region.getStores()) {
1262                store.closeAndArchiveCompactedFiles();
1263              }
1264            }
1265            return fs.listStatus(storePath).length == 1;
1266          }
1267        }, 5000);
1268        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1269      } catch (AssertionError ae) {
1270        // this is expected behavior
1271      }
1272
1273      // a major compaction should work though
1274      admin.majorCompact(TABLE_NAMES[0]);
1275      quickPoll(new Callable<Boolean>() {
1276        @Override
1277        public Boolean call() throws Exception {
1278          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1279          for (HRegion region : regions) {
1280            for (HStore store : region.getStores()) {
1281              store.closeAndArchiveCompactedFiles();
1282            }
1283          }
1284          return fs.listStatus(storePath).length == 1;
1285        }
1286      }, 5000);
1287
1288    } finally {
1289      util.shutdownMiniCluster();
1290    }
1291  }
1292
1293  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1294  public void testExcludeMinorCompaction() throws Exception {
1295    Configuration conf = util.getConfiguration();
1296    conf.setInt("hbase.hstore.compaction.min", 2);
1297    generateRandomStartKeys(5);
1298
1299    util.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin()) {
1302      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1303      final FileSystem fs = util.getDFSCluster().getFileSystem();
1304      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1305      assertEquals("Should start with empty table", 0, util.countRows(table));
1306
1307      // deep inspection: get the StoreFile dir
1308      final Path storePath = new Path(
1309        CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1310          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1311            Bytes.toString(FAMILIES[0])));
1312      assertEquals(0, fs.listStatus(storePath).length);
1313
1314      // put some data in it and flush to create a storefile
1315      Put p = new Put(Bytes.toBytes("test"));
1316      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1317      table.put(p);
1318      admin.flush(TABLE_NAMES[0]);
1319      assertEquals(1, util.countRows(table));
1320      quickPoll(new Callable<Boolean>() {
1321        @Override
1322        public Boolean call() throws Exception {
1323          return fs.listStatus(storePath).length == 1;
1324        }
1325      }, 5000);
1326
1327      // Generate a bulk load file with more rows
1328      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1329          true);
1330
1331      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1332      runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1333                      .getTableDescriptor(), regionLocator)), testDir, false);
1334
1335      // Perform the actual load
1336      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1337
1338      // Ensure data shows up
1339      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1340      assertEquals("LoadIncrementalHFiles should put expected data in table",
1341          expectedRows + 1, util.countRows(table));
1342
1343      // should have a second StoreFile now
1344      assertEquals(2, fs.listStatus(storePath).length);
1345
1346      // minor compactions shouldn't get rid of the file
1347      admin.compact(TABLE_NAMES[0]);
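      // As in testExcludeAllFromMinorCompaction, the poll is expected to time out; if it
      // succeeds, the bulk-loaded file was wrongly minor-compacted and the IOException
      // fails the test.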
1348      try {
1349        quickPoll(new Callable<Boolean>() {
1350          @Override
1351          public Boolean call() throws Exception {
1352            return fs.listStatus(storePath).length == 1;
1353          }
1354        }, 5000);
1355        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1356      } catch (AssertionError ae) {
1357        // this is expected behavior
1358      }
1359
1360      // a major compaction should work though
1361      admin.majorCompact(TABLE_NAMES[0]);
1362      quickPoll(new Callable<Boolean>() {
1363        @Override
1364        public Boolean call() throws Exception {
1365          return fs.listStatus(storePath).length == 1;
1366        }
1367      }, 5000);
1368
1369    } finally {
1370      util.shutdownMiniCluster();
1371    }
1372  }
1373
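  /**
   * Repeatedly evaluates the callable every 10 ms until it returns true or roughly
   * {@code waitMs} milliseconds have elapsed, failing the test on timeout.
   */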
1374  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1375    int sleepMs = 10;
1376    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1377    while (retries-- > 0) {
1378      if (c.call().booleanValue()) {
1379        return;
1380      }
1381      Thread.sleep(sleepMs);
1382    }
1383    fail();
1384  }
1385
  public static void main(String[] args) throws Exception {
1387    new TestCellBasedHFileOutputFormat2().manualTest(args);
1388  }
1389
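  /**
   * Manual driver: pass "newtable tableName" to create a pre-split test table, or
   * "incremental tableName" to run an incremental-load job against an existing table.
   */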
  public void manualTest(String[] args) throws Exception {
1391    Configuration conf = HBaseConfiguration.create();
1392    util = new HBaseTestingUtility(conf);
1393    if ("newtable".equals(args[0])) {
1394      TableName tname = TableName.valueOf(args[1]);
1395      byte[][] splitKeys = generateRandomSplitKeys(4);
1396      Table table = util.createTable(tname, FAMILIES, splitKeys);
1397    } else if ("incremental".equals(args[0])) {
1398      TableName tname = TableName.valueOf(args[1]);
1399      try(Connection c = ConnectionFactory.createConnection(conf);
1400          Admin admin = c.getAdmin();
1401          RegionLocator regionLocator = c.getRegionLocator(tname)) {
1402        Path outDir = new Path("incremental-out");
1403        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
1404                .getTableDescriptor(tname), regionLocator)), outDir, false);
1405      }
1406    } else {
      throw new RuntimeException(
          "usage: TestCellBasedHFileOutputFormat2 newtable | incremental");
1409    }
1410  }
1411
1412  @Test
1413  public void testBlockStoragePolicy() throws Exception {
1414    util = new HBaseTestingUtility();
1415    Configuration conf = util.getConfiguration();
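    // Table-wide default policy is ALL_SSD; the per-family override below sets
    // FAMILIES[0] of TABLE_NAMES[0] to ONE_SSD, which the assertions verify.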
1416    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1417
1418    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
1419            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
1420                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
1421    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1422    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1423    util.startMiniDFSCluster(3);
1424    FileSystem fs = util.getDFSCluster().getFileSystem();
1425    try {
1426      fs.mkdirs(cf1Dir);
1427      fs.mkdirs(cf2Dir);
1428
      // the default block storage policy should be HOT
1430      String spA = getStoragePolicyName(fs, cf1Dir);
1431      String spB = getStoragePolicyName(fs, cf2Dir);
1432      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1433      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1434      assertEquals("HOT", spA);
1435      assertEquals("HOT", spB);
1436
      // apply the configured storage policies to the column family directories
1438      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1439              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1440      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1441              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1442      spA = getStoragePolicyName(fs, cf1Dir);
1443      spB = getStoragePolicyName(fs, cf2Dir);
1444      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1445      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1446      assertNotNull(spA);
1447      assertEquals("ONE_SSD", spA);
1448      assertNotNull(spB);
1449      assertEquals("ALL_SSD", spB);
1450    } finally {
1451      fs.delete(cf1Dir, true);
1452      fs.delete(cf2Dir, true);
1453      util.shutdownMiniDFSCluster();
1454    }
1455  }
1456
1457  private String getStoragePolicyName(FileSystem fs, Path path) {
1458    try {
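      // FileSystem#getStoragePolicy(Path) is not available on every supported Hadoop
      // version, so it is invoked reflectively; older releases fall through to the
      // catch block below.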
1459      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1460      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1461    } catch (Exception e) {
      // This may fail on an older HDFS version; fall back to the old way
1463      if (LOG.isTraceEnabled()) {
1464        LOG.trace("Failed to get policy directly", e);
1465      }
1466      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT is the default
1468    }
1469  }
1470
1471  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1472    try {
1473      if (fs instanceof DistributedFileSystem) {
1474        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1475        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (status != null) {
1477          byte storagePolicyId = status.getStoragePolicy();
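          // ID_UNSPECIFIED marks a path without an explicit policy; it is looked up
          // reflectively because the constant is not present in every supported Hadoop
          // version.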
1478          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1479          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1480            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1481            for (BlockStoragePolicy policy : policies) {
1482              if (policy.getId() == storagePolicyId) {
1483                return policy.getName();
1484              }
1485            }
1486          }
1487        }
1488      }
1489    } catch (Throwable e) {
1490      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1491    }
1492
1493    return null;
1494  }
1495}
1496