001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertNotNull;
024import static org.junit.Assert.assertNotSame;
025import static org.junit.Assert.assertTrue;
026import static org.junit.Assert.fail;
027import static org.mockito.Mockito.verify;
028
029import java.io.IOException;
030import java.lang.reflect.Field;
031import java.security.PrivilegedAction;
032import java.util.ArrayList;
033import java.util.Arrays;
034import java.util.HashMap;
035import java.util.LinkedList;
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.Random;
040import java.util.Set;
041import java.util.concurrent.Callable;
042import java.util.stream.Collectors;
043import java.util.stream.Stream;
044import org.apache.hadoop.conf.Configuration;
045import org.apache.hadoop.fs.FileStatus;
046import org.apache.hadoop.fs.FileSystem;
047import org.apache.hadoop.fs.LocatedFileStatus;
048import org.apache.hadoop.fs.Path;
049import org.apache.hadoop.fs.RemoteIterator;
050import org.apache.hadoop.hbase.ArrayBackedTag;
051import org.apache.hadoop.hbase.Cell;
052import org.apache.hadoop.hbase.CellUtil;
053import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
054import org.apache.hadoop.hbase.HBaseClassTestRule;
055import org.apache.hadoop.hbase.HBaseConfiguration;
056import org.apache.hadoop.hbase.HBaseTestingUtility;
057import org.apache.hadoop.hbase.HColumnDescriptor;
058import org.apache.hadoop.hbase.HConstants;
059import org.apache.hadoop.hbase.HDFSBlocksDistribution;
060import org.apache.hadoop.hbase.HTableDescriptor;
061import org.apache.hadoop.hbase.HadoopShims;
062import org.apache.hadoop.hbase.KeyValue;
063import org.apache.hadoop.hbase.PerformanceEvaluation;
064import org.apache.hadoop.hbase.PrivateCellUtil;
065import org.apache.hadoop.hbase.StartMiniClusterOption;
066import org.apache.hadoop.hbase.TableName;
067import org.apache.hadoop.hbase.Tag;
068import org.apache.hadoop.hbase.TagType;
069import org.apache.hadoop.hbase.client.Admin;
070import org.apache.hadoop.hbase.client.Connection;
071import org.apache.hadoop.hbase.client.ConnectionFactory;
072import org.apache.hadoop.hbase.client.Put;
073import org.apache.hadoop.hbase.client.RegionLocator;
074import org.apache.hadoop.hbase.client.Result;
075import org.apache.hadoop.hbase.client.ResultScanner;
076import org.apache.hadoop.hbase.client.Scan;
077import org.apache.hadoop.hbase.client.Table;
078import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
079import org.apache.hadoop.hbase.io.compress.Compression;
080import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
081import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
082import org.apache.hadoop.hbase.io.hfile.CacheConfig;
083import org.apache.hadoop.hbase.io.hfile.HFile;
084import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
085import org.apache.hadoop.hbase.io.hfile.HFileScanner;
086import org.apache.hadoop.hbase.regionserver.BloomType;
087import org.apache.hadoop.hbase.regionserver.HRegion;
088import org.apache.hadoop.hbase.regionserver.HStore;
089import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
090import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
091import org.apache.hadoop.hbase.testclassification.LargeTests;
092import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
093import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
094import org.apache.hadoop.hbase.util.Bytes;
095import org.apache.hadoop.hbase.util.FSUtils;
096import org.apache.hadoop.hbase.util.ReflectionUtils;
097import org.apache.hadoop.hdfs.DistributedFileSystem;
098import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
099import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
100import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
101import org.apache.hadoop.io.NullWritable;
102import org.apache.hadoop.mapreduce.Job;
103import org.apache.hadoop.mapreduce.Mapper;
104import org.apache.hadoop.mapreduce.RecordWriter;
105import org.apache.hadoop.mapreduce.TaskAttemptContext;
106import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
107import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
108import org.apache.hadoop.security.UserGroupInformation;
109import org.junit.Assert;
110import org.junit.ClassRule;
111import org.junit.Ignore;
112import org.junit.Test;
113import org.junit.experimental.categories.Category;
114import org.mockito.Mockito;
115import org.slf4j.Logger;
116import org.slf4j.LoggerFactory;
117
118/**
119 * Simple test for {@link HFileOutputFormat2}.
120 * Sets up and runs a mapreduce job that writes hfile output.
121 * Creates a few inner classes to implement splits and an inputformat that
122 * emits keys and values like those of {@link PerformanceEvaluation}.
123 */
124@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestHFileOutputFormat2 {
126
127  @ClassRule
128  public static final HBaseClassTestRule CLASS_RULE =
129      HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);
130
131  private final static int ROWSPERSPLIT = 1024;
132
133  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
134  private static final byte[][] FAMILIES = {
135    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
136  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
137          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
138
139  private HBaseTestingUtility util = new HBaseTestingUtility();
140
141  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
142
143  /**
144   * Simple mapper that makes KeyValue output.
145   */
146  static class RandomKVGeneratingMapper
147      extends Mapper<NullWritable, NullWritable,
148                 ImmutableBytesWritable, Cell> {
149
    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
158    private boolean multiTableMapper = false;
159    private TableName[] tables = null;
160
161
162    @Override
163    protected void setup(Context context) throws IOException,
164        InterruptedException {
165      super.setup(context);
166
167      Configuration conf = context.getConfiguration();
168      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
169      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
170      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
171              false);
172      if (multiTableMapper) {
173        tables = TABLE_NAMES;
174      } else {
175        tables = new TableName[]{TABLE_NAMES[0]};
176      }
177    }
178
179    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable,
               ImmutableBytesWritable, Cell>.Context context)
        throws java.io.IOException, InterruptedException {
186
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
192      Random random = new Random();
193      byte[] key;
194      for (int j = 0; j < tables.length; ++j) {
195        for (int i = 0; i < ROWSPERSPLIT; i++) {
196          random.nextBytes(keyBytes);
197          // Ensure that unique tasks generate unique keys
198          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
199          random.nextBytes(valBytes);
200          key = keyBytes;
201          if (multiTableMapper) {
202            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
203          }
204
205          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
206            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
207            context.write(new ImmutableBytesWritable(key), kv);
208          }
209        }
210      }
211    }
212  }
213
214  /**
215   * Simple mapper that makes Put output.
216   */
217  static class RandomPutGeneratingMapper
218      extends Mapper<NullWritable, NullWritable,
219                 ImmutableBytesWritable, Put> {
220
221    private int keyLength;
222    private static final int KEYLEN_DEFAULT = 10;
223    private static final String KEYLEN_CONF = "randomkv.key.length";
224
225    private int valLength;
226    private static final int VALLEN_DEFAULT = 10;
227    private static final String VALLEN_CONF = "randomkv.val.length";
228    private static final byte[] QUALIFIER = Bytes.toBytes("data");
229    private boolean multiTableMapper = false;
230    private TableName[] tables = null;
231
232    @Override
233    protected void setup(Context context) throws IOException,
234            InterruptedException {
235      super.setup(context);
236
237      Configuration conf = context.getConfiguration();
238      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
239      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
240      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
241              false);
242      if (multiTableMapper) {
243        tables = TABLE_NAMES;
244      } else {
245        tables = new TableName[]{TABLE_NAMES[0]};
246      }
247    }
248
249    @Override
250    protected void map(
251            NullWritable n1, NullWritable n2,
252            Mapper<NullWritable, NullWritable,
253                    ImmutableBytesWritable, Put>.Context context)
254            throws java.io.IOException, InterruptedException {
255
      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
261
262      Random random = new Random();
263      byte[] key;
264      for (int j = 0; j < tables.length; ++j) {
265        for (int i = 0; i < ROWSPERSPLIT; i++) {
266          random.nextBytes(keyBytes);
267          // Ensure that unique tasks generate unique keys
268          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
269          random.nextBytes(valBytes);
270          key = keyBytes;
271          if (multiTableMapper) {
272            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
273          }
274
275          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
276            Put p = new Put(keyBytes);
277            p.addColumn(family, QUALIFIER, valBytes);
            // Set the TTL very low (1 ms) so that a later scan does not return any values.
            p.setTTL(1L);
280            context.write(new ImmutableBytesWritable(key), p);
281          }
282        }
283      }
284    }
285  }
286
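  /**
   * Configures the job with one of the random data generating mappers above: with
   * putSortReducer the map output value is Put (every cell carrying a 1 ms TTL), otherwise
   * it is KeyValue.
   */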
287  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
288    if (putSortReducer) {
289      job.setInputFormatClass(NMapInputFormat.class);
290      job.setMapperClass(RandomPutGeneratingMapper.class);
291      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
292      job.setMapOutputValueClass(Put.class);
293    } else {
294      job.setInputFormatClass(NMapInputFormat.class);
295      job.setMapperClass(RandomKVGeneratingMapper.class);
296      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
297      job.setMapOutputValueClass(KeyValue.class);
298    }
299  }
300
301  /**
302   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
303   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
304   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
305   */
306  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
307  public void test_LATEST_TIMESTAMP_isReplaced()
308  throws Exception {
309    Configuration conf = new Configuration(this.util.getConfiguration());
310    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
311    TaskAttemptContext context = null;
312    Path dir =
313      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
314    try {
315      Job job = new Job(conf);
316      FileOutputFormat.setOutputPath(job, dir);
317      context = createTestTaskAttemptContext(job);
318      HFileOutputFormat2 hof = new HFileOutputFormat2();
319      writer = hof.getRecordWriter(context);
320      final byte [] b = Bytes.toBytes("b");
321
      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. The write call should
      // replace the timestamp. Check that everything else in the kv is unchanged.
324      KeyValue kv = new KeyValue(b, b, b);
325      KeyValue original = kv.clone();
326      writer.write(new ImmutableBytesWritable(), kv);
327      assertFalse(original.equals(kv));
328      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
329      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
330      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
331      assertNotSame(original.getTimestamp(), kv.getTimestamp());
332      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
333
      // Test 2. Now pass a kv that has an explicit ts. It should not be
      // changed by the write call.
336      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
337      original = kv.clone();
338      writer.write(new ImmutableBytesWritable(), kv);
339      assertTrue(original.equals(kv));
340    } finally {
341      if (writer != null && context != null) writer.close(context);
342      dir.getFileSystem(conf).delete(dir, true);
343    }
344  }
345
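  /**
   * Creates a TaskAttemptContext with a fixed attempt id through the HadoopShims
   * compatibility layer, so that a record writer can be exercised outside of a real
   * MapReduce job.
   */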
346  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
347  throws Exception {
348    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
349    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
350      job, "attempt_201402131733_0001_m_000000_0");
351    return context;
352  }
353
  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
   * metadata used by time-restricted scans.
   */
358  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
359  public void test_TIMERANGE() throws Exception {
360    Configuration conf = new Configuration(this.util.getConfiguration());
361    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
362    TaskAttemptContext context = null;
363    Path dir =
364      util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Writing TIMERANGE test output to dir: " + dir);
366    try {
367      // build a record writer using HFileOutputFormat2
368      Job job = new Job(conf);
369      FileOutputFormat.setOutputPath(job, dir);
370      context = createTestTaskAttemptContext(job);
371      HFileOutputFormat2 hof = new HFileOutputFormat2();
372      writer = hof.getRecordWriter(context);
373
      // Pass two key values with explicit timestamps.
375      final byte [] b = Bytes.toBytes("b");
376
377      // value 1 with timestamp 2000
378      KeyValue kv = new KeyValue(b, b, b, 2000, b);
379      KeyValue original = kv.clone();
380      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);
382
383      // value 2 with timestamp 1000
384      kv = new KeyValue(b, b, b, 1000, b);
385      original = kv.clone();
386      writer.write(new ImmutableBytesWritable(), kv);
387      assertEquals(original, kv);
388
389      // verify that the file has the proper FileInfo.
390      writer.close(context);
391
392      // the generated file lives 1 directory down from the attempt directory
393      // and is the only file, e.g.
394      // _attempt__0000_r_000000_0/b/1979617994050536795
395      FileSystem fs = FileSystem.get(conf);
396      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
397      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
398      FileStatus[] file = fs.listStatus(sub1[0].getPath());
399
400      // open as HFile Reader and pull out TIMERANGE FileInfo.
401      HFile.Reader rd =
402          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
403      Map<byte[],byte[]> finfo = rd.loadFileInfo();
404      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
405      assertNotNull(range);
406
407      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
411      assertEquals(1000, timeRangeTracker.getMin());
412      assertEquals(2000, timeRangeTracker.getMax());
413      rd.close();
414    } finally {
415      if (writer != null && context != null) writer.close(context);
416      dir.getFileSystem(conf).delete(dir, true);
417    }
418  }
419
420  /**
421   * Run small MR job.
422   */
423  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
424  public void testWritingPEData() throws Exception {
425    Configuration conf = util.getConfiguration();
426    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
427    FileSystem fs = testDir.getFileSystem(conf);
428
    // Lower this value or we OOME in Eclipse.
430    conf.setInt("mapreduce.task.io.sort.mb", 20);
431    // Write a few files.
432    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);
433
434    Job job = new Job(conf, "testWritingPEData");
435    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys but we use it anyway
    // just to demonstrate how to configure it.
438    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
439    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
440
441    Arrays.fill(startKey, (byte)0);
442    Arrays.fill(endKey, (byte)0xff);
443
444    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
445    // Set start and end rows for partitioner.
446    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
447    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
448    job.setReducerClass(CellSortReducer.class);
449    job.setOutputFormatClass(HFileOutputFormat2.class);
450    job.setNumReduceTasks(4);
451    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
452        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
453        CellSerialization.class.getName());
454
455    FileOutputFormat.setOutputPath(job, testDir);
456    assertTrue(job.waitForCompletion(false));
457    FileStatus [] files = fs.listStatus(testDir);
458    assertTrue(files.length > 0);
459  }
460
461  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as TTL into
   * the hfile.
464   */
465  @Test
466  public void test_WritingTagData()
467      throws Exception {
468    Configuration conf = new Configuration(this.util.getConfiguration());
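    // Cell tags are only supported by newer HFile format versions, so pin the format
    // version to the minimum that supports tags for this test.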
469    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
470    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
471    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
472    TaskAttemptContext context = null;
473    Path dir =
474        util.getDataTestDir("WritingTagData");
475    try {
476      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
477      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
478      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
479      Job job = new Job(conf);
480      FileOutputFormat.setOutputPath(job, dir);
481      context = createTestTaskAttemptContext(job);
482      HFileOutputFormat2 hof = new HFileOutputFormat2();
483      writer = hof.getRecordWriter(context);
484      final byte [] b = Bytes.toBytes("b");
485
      List<Tag> tags = new ArrayList<>();
487      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
488      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
489      writer.write(new ImmutableBytesWritable(), kv);
490      writer.close(context);
491      writer = null;
492      FileSystem fs = dir.getFileSystem(conf);
493      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
495        LocatedFileStatus keyFileStatus = iterator.next();
496        HFile.Reader reader =
497            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
498        HFileScanner scanner = reader.getScanner(false, false, false);
499        scanner.seekTo();
500        Cell cell = scanner.getCell();
501        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
502        assertTrue(tagsFromCell.size() > 0);
503        for (Tag tag : tagsFromCell) {
504          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
505        }
506      }
507    } finally {
508      if (writer != null && context != null) writer.close(context);
509      dir.getFileSystem(conf).delete(dir, true);
510    }
511  }
512
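  /**
   * Verifies that configureIncrementalLoad sets the number of reduce tasks to match the
   * number of mocked region start keys.
   */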
513  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
514  public void testJobConfiguration() throws Exception {
515    Configuration conf = new Configuration(this.util.getConfiguration());
516    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
517        .toString());
518    Job job = new Job(conf);
519    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
520    Table table = Mockito.mock(Table.class);
521    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
522    setupMockStartKeys(regionLocator);
523    setupMockTableName(regionLocator);
524    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
526  }
527
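  /**
   * Generates {@code numKeys} random region start keys; the first key is always empty,
   * mirroring how the first region of a table has no start key.
   */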
528  private byte [][] generateRandomStartKeys(int numKeys) {
529    Random random = new Random();
530    byte[][] ret = new byte[numKeys][];
531    // first region start key is always empty
532    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
533    for (int i = 1; i < numKeys; i++) {
534      ret[i] =
535        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
536    }
537    return ret;
538  }
539
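  /**
   * Generates {@code numKeys} random split keys used to pre-split the test tables.
   */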
540  private byte[][] generateRandomSplitKeys(int numKeys) {
541    Random random = new Random();
542    byte[][] ret = new byte[numKeys][];
543    for (int i = 0; i < numKeys; i++) {
544      ret[i] =
545          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
546    }
547    return ret;
548  }
549
550  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
551  public void testMRIncrementalLoad() throws Exception {
552    LOG.info("\nStarting test testMRIncrementalLoad\n");
553    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
554  }
555
556  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
557  public void testMRIncrementalLoadWithSplit() throws Exception {
558    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
559    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
560  }
561
562  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
   * This test can only verify the correctness of the original logic when
   * LOCALITY_SENSITIVE_CONF_KEY is set to true. Because MiniHBaseCluster always runs with a
   * single hostname (and different ports), it is not possible to check region locality by
   * comparing region locations with DataNode hostnames. Once MiniHBaseCluster supports an
   * explicit hostnames parameter (just like MiniDFSCluster does), region locality features
   * could be tested more easily.
569   */
570  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
571  public void testMRIncrementalLoadWithLocality() throws Exception {
572    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
573    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
574    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
575  }
576
578  @Test
579  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
580    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
581    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
582  }
583
  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
      boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
        Arrays.asList(tableStr));
  }
589
590  @Test
591  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
592    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
        Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
596  }
597
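  /**
   * End-to-end incremental load test: starts a mini cluster, runs the random-data MR job
   * through HFileOutputFormat2 to produce HFiles, bulk loads them with LoadIncrementalHFiles,
   * and verifies row counts, per-family output directories, region locality, and that the
   * data survives a disable/enable cycle of the regions.
   */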
598  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
599      boolean putSortReducer, List<String> tableStr) throws Exception {
600    util = new HBaseTestingUtility();
601    Configuration conf = util.getConfiguration();
602    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
603    int hostCount = 1;
604    int regionNum = 5;
605    if (shouldKeepLocality) {
      // The host count should be raised above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
608      hostCount = 3;
609      regionNum = 20;
610    }
611
612    String[] hostnames = new String[hostCount];
613    for (int i = 0; i < hostCount; ++i) {
614      hostnames[i] = "datanode_" + i;
615    }
616    StartMiniClusterOption option = StartMiniClusterOption.builder()
617        .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
618    util.startMiniCluster(option);
619
620    Map<String, Table> allTables = new HashMap<>(tableStr.size());
621    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
622    boolean writeMultipleTables = tableStr.size() > 1;
623    for (String tableStrSingle : tableStr) {
624      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
625      TableName tableName = TableName.valueOf(tableStrSingle);
626      Table table = util.createTable(tableName, FAMILIES, splitKeys);
627
628      RegionLocator r = util.getConnection().getRegionLocator(tableName);
629      assertEquals("Should start with empty table", 0, util.countRows(table));
630      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);
632
633      allTables.put(tableStrSingle, table);
634      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
635    }
636    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
637    // Generate the bulk load files
638    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
639    if (writeMultipleTables) {
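      // MultiTableHFileOutputFormat organizes its output per namespace; all of the test
      // tables live in the "default" namespace.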
640      testDir = new Path(testDir, "default");
641    }
642
643    for (Table tableSingle : allTables.values()) {
644      // This doesn't write into the table, just makes files
645      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
646    }
647    int numTableDirs = 0;
648    FileStatus[] fss =
649        testDir.getFileSystem(conf).listStatus(testDir);
650    for (FileStatus tf: fss) {
651      Path tablePath = testDir;
652      if (writeMultipleTables) {
653        if (allTables.containsKey(tf.getPath().getName())) {
654          ++numTableDirs;
655          tablePath = tf.getPath();
        } else {
          continue;
        }
660      }
661
662      // Make sure that a directory was created for every CF
663      int dir = 0;
664      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
665      for (FileStatus f: fss) {
666        for (byte[] family : FAMILIES) {
667          if (Bytes.toString(family).equals(f.getPath().getName())) {
668            ++dir;
669          }
670        }
671      }
672      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
673    }
674    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
676    }
677
678    Admin admin = util.getConnection().getAdmin();
679    try {
680      // handle the split case
681      if (shouldChangeRegions) {
        // Pick an arbitrary table (the first one) when multiple tables are available.
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
685        admin.disableTable(chosenTable.getName());
686        util.waitUntilNoRegionsInTransition();
687
688        util.deleteTable(chosenTable.getName());
689        byte[][] newSplitKeys = generateRandomSplitKeys(14);
690        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
691
692        while (util.getConnection().getRegionLocator(chosenTable.getName())
693                .getAllRegionLocations().size() != 15 ||
694                !admin.isTableAvailable(table.getName())) {
695          Thread.sleep(200);
696          LOG.info("Waiting for new region assignment to happen");
697        }
698      }
699
700      // Perform the actual load
701      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
702        Path tableDir = testDir;
703        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
705        if (writeMultipleTables) {
706          tableDir = new Path(testDir, tableNameStr);
707        }
708        Table currentTable = allTables.get(tableNameStr);
709        TableName currentTableName = currentTable.getName();
710        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
711                .getRegionLocator());
712
713        // Ensure data shows up
714        int expectedRows = 0;
        if (putSortReducer) {
          // Every cell was written with a 1 ms TTL by RandomPutGeneratingMapper,
          // so no rows should be visible in the table.
717          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
718                  util.countRows(currentTable));
719        } else {
720          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
721          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
722                  util.countRows(currentTable));
723          Scan scan = new Scan();
724          ResultScanner results = currentTable.getScanner(scan);
725          for (Result res : results) {
726            assertEquals(FAMILIES.length, res.rawCells().length);
727            Cell first = res.rawCells()[0];
728            for (Cell kv : res.rawCells()) {
729              assertTrue(CellUtil.matchingRows(first, kv));
730              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
731            }
732          }
733          results.close();
734        }
735        String tableDigestBefore = util.checksumRows(currentTable);
736        // Check region locality
737        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
738        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
739          hbd.add(region.getHDFSBlocksDistribution());
740        }
741        for (String hostname : hostnames) {
742          float locality = hbd.getBlockLocalityIndex(hostname);
743          LOG.info("locality of [" + hostname + "]: " + locality);
744          assertEquals(100, (int) (locality * 100));
745        }
746
747        // Cause regions to reopen
748        admin.disableTable(currentTableName);
749        while (!admin.isTableDisabled(currentTableName)) {
750          Thread.sleep(200);
751          LOG.info("Waiting for table to disable");
752        }
753        admin.enableTable(currentTableName);
754        util.waitTableAvailable(currentTableName);
755        assertEquals("Data should remain after reopening of regions",
756                tableDigestBefore, util.checksumRows(currentTable));
757      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
766      testDir.getFileSystem(conf).delete(testDir, true);
767      util.shutdownMiniCluster();
768    }
769  }
770
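  /**
   * Runs the random-data MR job configured for incremental load against the given tables,
   * writing HFiles under {@code outDir}, and asserts that the number of reduce tasks matches
   * the total region count.
   */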
  private void runIncrementalPELoad(Configuration conf,
      List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
      throws IOException, InterruptedException, ClassNotFoundException {
774    Job job = new Job(conf, "testLocalMRIncrementalLoad");
775    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
776    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
777        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
778        CellSerialization.class.getName());
779    setupRandomGeneratorMapper(job, putSortReducer);
780    if (tableInfo.size() > 1) {
781      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
782      int sum = 0;
783      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
784        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
785      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
790      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
791              regionLocator);
792      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
793    }
794
795    FileOutputFormat.setOutputPath(job, outDir);
796
    assertFalse(util.getTestFileSystem().exists(outDir));
798
799    assertTrue(job.waitForCompletion(true));
800  }
801
802  /**
803   * Test for {@link HFileOutputFormat2#configureCompression(Configuration, HTableDescriptor)} and
804   * {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
805   * Tests that the compression map is correctly serialized into
   * and deserialized from the configuration.
807   *
808   * @throws IOException
809   */
810  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
811  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
812    for (int numCfs = 0; numCfs <= 3; numCfs++) {
813      Configuration conf = new Configuration(this.util.getConfiguration());
814      Map<String, Compression.Algorithm> familyToCompression =
815          getMockColumnFamiliesForCompression(numCfs);
816      Table table = Mockito.mock(Table.class);
817      setupMockColumnFamiliesForCompression(table, familyToCompression);
818      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
819              HFileOutputFormat2.serializeColumnFamilyAttribute
820                      (HFileOutputFormat2.compressionDetails,
821                              Arrays.asList(table.getTableDescriptor())));
822
823      // read back family specific compression setting from the configuration
824      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
825          .createFamilyCompressionMap(conf);
826
827      // test that we have a value for all column families that matches with the
828      // used mock values
829      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
830        assertEquals("Compression configuration incorrect for column family:"
831            + entry.getKey(), entry.getValue(),
832            retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
833      }
834    }
835  }
836
837  private void setupMockColumnFamiliesForCompression(Table table,
838      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
839    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
840    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
841      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
842          .setMaxVersions(1)
843          .setCompressionType(entry.getValue())
844          .setBlockCacheEnabled(false)
845          .setTimeToLive(0));
846    }
847    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
848  }
849
  /**
   * @return a map from column family names to compression algorithms for
   *         testing column family compression. Column family names have special characters.
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
856    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
857    // use column family names having special characters
858    if (numCfs-- > 0) {
859      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
860    }
861    if (numCfs-- > 0) {
862      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
863    }
864    if (numCfs-- > 0) {
865      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
866    }
867    if (numCfs-- > 0) {
868      familyToCompression.put("Family3", Compression.Algorithm.NONE);
869    }
870    return familyToCompression;
871  }
872
873
  /**
   * Test for {@link HFileOutputFormat2#configureBloomType(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
   * Tests that the bloom type map is correctly serialized into
   * and deserialized from the configuration.
   *
   * @throws IOException
   */
882  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
883  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
884    for (int numCfs = 0; numCfs <= 2; numCfs++) {
885      Configuration conf = new Configuration(this.util.getConfiguration());
886      Map<String, BloomType> familyToBloomType =
887          getMockColumnFamiliesForBloomType(numCfs);
888      Table table = Mockito.mock(Table.class);
889      setupMockColumnFamiliesForBloomType(table,
890          familyToBloomType);
891      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
892              HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
893              Arrays.asList(table.getTableDescriptor())));
894
895      // read back family specific data block encoding settings from the
896      // configuration
897      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
898          HFileOutputFormat2
899              .createFamilyBloomTypeMap(conf);
900
901      // test that we have a value for all column families that matches with the
902      // used mock values
903      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
904        assertEquals("BloomType configuration incorrect for column family:"
905            + entry.getKey(), entry.getValue(),
906            retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
907      }
908    }
909  }
910
911  private void setupMockColumnFamiliesForBloomType(Table table,
912      Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
913    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
914    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
915      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
916          .setMaxVersions(1)
917          .setBloomFilterType(entry.getValue())
918          .setBlockCacheEnabled(false)
919          .setTimeToLive(0));
920    }
921    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
922  }
923
  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom filter settings. Column family names have special
   *         characters.
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
930    Map<String, BloomType> familyToBloomType = new HashMap<>();
931    // use column family names having special characters
932    if (numCfs-- > 0) {
933      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
934    }
935    if (numCfs-- > 0) {
936      familyToBloomType.put("Family2=asdads&!AASD",
937          BloomType.ROWCOL);
938    }
939    if (numCfs-- > 0) {
940      familyToBloomType.put("Family3", BloomType.NONE);
941    }
942    return familyToBloomType;
943  }
944
  /**
   * Test for {@link HFileOutputFormat2#configureBlockSize(HTableDescriptor, Configuration)} and
   * {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
   * Tests that the block size map is correctly serialized into
   * and deserialized from the configuration.
   *
   * @throws IOException
   */
953  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
954  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
955    for (int numCfs = 0; numCfs <= 3; numCfs++) {
956      Configuration conf = new Configuration(this.util.getConfiguration());
957      Map<String, Integer> familyToBlockSize =
958          getMockColumnFamiliesForBlockSize(numCfs);
959      Table table = Mockito.mock(Table.class);
960      setupMockColumnFamiliesForBlockSize(table,
961          familyToBlockSize);
962      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
963              HFileOutputFormat2.serializeColumnFamilyAttribute
964                      (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table
965                              .getTableDescriptor())));
966
967      // read back family specific data block encoding settings from the
968      // configuration
969      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
970          HFileOutputFormat2
971              .createFamilyBlockSizeMap(conf);
972
973      // test that we have a value for all column families that matches with the
974      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
977        assertEquals("BlockSize configuration incorrect for column family:"
978            + entry.getKey(), entry.getValue(),
979            retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
980      }
981    }
982  }
983
984  private void setupMockColumnFamiliesForBlockSize(Table table,
985      Map<String, Integer> familyToDataBlockEncoding) throws IOException {
986    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
987    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
988      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
989          .setMaxVersions(1)
990          .setBlocksize(entry.getValue())
991          .setBlockCacheEnabled(false)
992          .setTimeToLive(0));
993    }
994    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
995  }
996
  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size settings. Column family names have special
   *         characters.
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
1003    Map<String, Integer> familyToBlockSize = new HashMap<>();
1004    // use column family names having special characters
1005    if (numCfs-- > 0) {
1006      familyToBlockSize.put("Family1!@#!@#&", 1234);
1007    }
1008    if (numCfs-- > 0) {
1009      familyToBlockSize.put("Family2=asdads&!AASD",
1010          Integer.MAX_VALUE);
1011    }
1012    if (numCfs-- > 0) {
1013      familyToBlockSize.put("Family2=asdads&!AASD",
1014          Integer.MAX_VALUE);
1015    }
1016    if (numCfs-- > 0) {
1017      familyToBlockSize.put("Family3", 0);
1018    }
1019    return familyToBlockSize;
1020  }
1021
  /**
   * Test for
   * {@link HFileOutputFormat2#configureDataBlockEncoding(HTableDescriptor, Configuration)}
   * and {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
   * Tests that the data block encoding map is correctly serialized into
   * and deserialized from the configuration.
   *
   * @throws IOException
   */
1030  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1031  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
1032    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1033      Configuration conf = new Configuration(this.util.getConfiguration());
1034      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
1035          getMockColumnFamiliesForDataBlockEncoding(numCfs);
1036      Table table = Mockito.mock(Table.class);
1037      setupMockColumnFamiliesForDataBlockEncoding(table,
1038          familyToDataBlockEncoding);
1039      HTableDescriptor tableDescriptor = table.getTableDescriptor();
1040      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
1041              HFileOutputFormat2.serializeColumnFamilyAttribute
1042                      (HFileOutputFormat2.dataBlockEncodingDetails, Arrays
1043                      .asList(tableDescriptor)));
1044
1045      // read back family specific data block encoding settings from the
1046      // configuration
1047      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
1048          HFileOutputFormat2
1049          .createFamilyDataBlockEncodingMap(conf);
1050
1051      // test that we have a value for all column families that matches with the
1052      // used mock values
1053      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1054        assertEquals("DataBlockEncoding configuration incorrect for column family:"
1055            + entry.getKey(), entry.getValue(),
1056            retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
1057      }
1058    }
1059  }
1060
1061  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
1062      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
1063    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
1064    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1065      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
1066          .setMaxVersions(1)
1067          .setDataBlockEncoding(entry.getValue())
1068          .setBlockCacheEnabled(false)
1069          .setTimeToLive(0));
1070    }
1071    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
1072  }
1073
  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding settings. Column family names have
   *         special characters.
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
1080    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
1081    // use column family names having special characters
1082    if (numCfs-- > 0) {
1083      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
1084    }
1085    if (numCfs-- > 0) {
1086      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1087          DataBlockEncoding.FAST_DIFF);
1088    }
1089    if (numCfs-- > 0) {
1090      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1091          DataBlockEncoding.PREFIX);
1092    }
1093    if (numCfs-- > 0) {
1094      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
1095    }
1096    return familyToDataBlockEncoding;
1097  }
1098
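  /** Makes the mock RegionLocator return a fixed set of four region start keys. */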
1099  private void setupMockStartKeys(RegionLocator table) throws IOException {
1100    byte[][] mockKeys = new byte[][] {
1101        HConstants.EMPTY_BYTE_ARRAY,
1102        Bytes.toBytes("aaa"),
1103        Bytes.toBytes("ggg"),
1104        Bytes.toBytes("zzz")
1105    };
1106    Mockito.doReturn(mockKeys).when(table).getStartKeys();
1107  }
1108
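  /** Makes the mock RegionLocator return a fixed table name. */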
1109  private void setupMockTableName(RegionLocator table) throws IOException {
1110    TableName mockTableName = TableName.valueOf("mock_table");
1111    Mockito.doReturn(mockTableName).when(table).getName();
1112  }
1113
1114  /**
1115   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
1116   * bloom filter settings from the column family descriptor
1117   */
1118  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1119  public void testColumnFamilySettings() throws Exception {
1120    Configuration conf = new Configuration(this.util.getConfiguration());
1121    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1122    TaskAttemptContext context = null;
1123    Path dir = util.getDataTestDir("testColumnFamilySettings");
1124
1125    // Setup table descriptor
1126    Table table = Mockito.mock(Table.class);
1127    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
1128    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
1129    Mockito.doReturn(htd).when(table).getTableDescriptor();
1130    for (HColumnDescriptor hcd: HBaseTestingUtility.generateColumnDescriptors()) {
1131      htd.addFamily(hcd);
1132    }
1133
1134    // set up the table to return some mock keys
1135    setupMockStartKeys(regionLocator);
1136
1137    try {
1138      // partial map red setup to get an operational writer for testing
1139      // We turn off the sequence file compression, because DefaultCodec
1140      // pollutes the GZip codec pool with an incompatible compressor.
1141      conf.set("io.seqfile.compression.type", "NONE");
1142      conf.set("hbase.fs.tmp.dir", dir.toString());
1143      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
1144      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1145
1146      Job job = new Job(conf, "testLocalMRIncrementalLoad");
1147      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
1148      setupRandomGeneratorMapper(job, false);
1149      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
1150      FileOutputFormat.setOutputPath(job, dir);
1151      context = createTestTaskAttemptContext(job);
1152      HFileOutputFormat2 hof = new HFileOutputFormat2();
1153      writer = hof.getRecordWriter(context);
1154
1155      // write out random rows
1156      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
1157      writer.close(context);
1158
1159      // Make sure that a directory was created for every CF
1160      FileSystem fs = dir.getFileSystem(conf);
1161
1162      // commit so that the filesystem has one directory per column family
1163      hof.getOutputCommitter(context).commitTask(context);
1164      hof.getOutputCommitter(context).commitJob(context);
1165      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
1166      assertEquals(htd.getFamilies().size(), families.length);
1167      for (FileStatus f : families) {
1168        String familyStr = f.getPath().getName();
1169        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
1170        // verify that the compression on this file matches the configured
1171        // compression
1172        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
1173        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
1174        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
1175
1176        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
1177        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
1178        assertEquals("Incorrect bloom filter used for column family " + familyStr +
1179          "(reader: " + reader + ")",
1180          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
          "(reader: " + reader + ")", hcd.getCompressionType(),
          reader.getFileContext().getCompression());
1183      }
1184    } finally {
1185      dir.getFileSystem(conf).delete(dir, true);
1186    }
1187  }
1188
1189  /**
1190   * Write random values to the writer assuming a table created using
1191   * {@link #FAMILIES} as column family descriptors
1192   */
1193  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1194      TaskAttemptContext context, Set<byte[]> families, int numRows)
1195      throws IOException, InterruptedException {
    byte[] keyBytes = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte[] valBytes = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
1202    final byte [] qualifier = Bytes.toBytes("data");
1203    Random random = new Random();
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
1207      random.nextBytes(valBytes);
1208      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1209
1210      for (byte[] family : families) {
1211        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1212        writer.write(key, kv);
1213      }
1214    }
1215  }
1216
  /**
   * This test exercises the scenario from HBASE-6901:
   * all files are bulk loaded and excluded from minor compaction.
   * Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException
   * would be thrown.
   */
  @Ignore("Flaky: See HBASE-9051") @Test
1224  public void testExcludeAllFromMinorCompaction() throws Exception {
1225    Configuration conf = util.getConfiguration();
1226    conf.setInt("hbase.hstore.compaction.min", 2);
1227    generateRandomStartKeys(5);
1228
1229    util.startMiniCluster();
1230    try (Connection conn = ConnectionFactory.createConnection();
1231        Admin admin = conn.getAdmin();
1232        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1233        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1234      final FileSystem fs = util.getDFSCluster().getFileSystem();
1235      assertEquals("Should start with empty table", 0, util.countRows(table));
1236
1237      // deep inspection: get the StoreFile dir
1238      final Path storePath = new Path(
1239        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1240          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1241            Bytes.toString(FAMILIES[0])));
1242      assertEquals(0, fs.listStatus(storePath).length);
1243
1244      // Generate two bulk load files
1245      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1246          true);
1247
1248      for (int i = 0; i < 2; i++) {
1249        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1250        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1251                .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
1252        // Perform the actual load
1253        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
1254      }
1255
1256      // Ensure data shows up
1257      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1258      assertEquals("LoadIncrementalHFiles should put expected data in table",
1259          expectedRows, util.countRows(table));
1260
1261      // should have a second StoreFile now
1262      assertEquals(2, fs.listStatus(storePath).length);
1263
1264      // minor compactions shouldn't get rid of the file
1265      admin.compact(TABLE_NAMES[0]);
1266      try {
1267        quickPoll(new Callable<Boolean>() {
1268          @Override
1269          public Boolean call() throws Exception {
1270            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1271            for (HRegion region : regions) {
1272              for (HStore store : region.getStores()) {
1273                store.closeAndArchiveCompactedFiles();
1274              }
1275            }
1276            return fs.listStatus(storePath).length == 1;
1277          }
1278        }, 5000);
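        // quickPoll returning normally means the excluded files were minor-compacted away: fail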
1279        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1280      } catch (AssertionError ae) {
1281        // expected: the minor compaction must not remove the bulk-loaded, excluded files
1282      }
1283
1284      // a major compaction should work though
1285      admin.majorCompact(TABLE_NAMES[0]);
1286      quickPoll(new Callable<Boolean>() {
1287        @Override
1288        public Boolean call() throws Exception {
1289          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1290          for (HRegion region : regions) {
1291            for (HStore store : region.getStores()) {
1292              store.closeAndArchiveCompactedFiles();
1293            }
1294          }
1295          return fs.listStatus(storePath).length == 1;
1296        }
1297      }, 5000);
1298
1299    } finally {
1300      util.shutdownMiniCluster();
1301    }
1302  }
1303
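  /**
   * Verifies that a store file created by a flush and a bulk-loaded HFile written with
   * "hbase.mapreduce.hfileoutputformat.compaction.exclude" both survive a minor compaction,
   * while a subsequent major compaction rewrites them into a single store file.
   */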
1304  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1305  public void testExcludeMinorCompaction() throws Exception {
1306    Configuration conf = util.getConfiguration();
1307    conf.setInt("hbase.hstore.compaction.min", 2);
1308    generateRandomStartKeys(5);
1309
1310    util.startMiniCluster();
1311    try (Connection conn = ConnectionFactory.createConnection(conf);
1312        Admin admin = conn.getAdmin()){
1313      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1314      final FileSystem fs = util.getDFSCluster().getFileSystem();
1315      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1316      assertEquals("Should start with empty table", 0, util.countRows(table));
1317
1318      // deep inspection: get the StoreFile dir
1319      final Path storePath = new Path(
1320        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1321          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1322            Bytes.toString(FAMILIES[0])));
1323      assertEquals(0, fs.listStatus(storePath).length);
1324
1325      // put some data in it and flush to create a storefile
1326      Put p = new Put(Bytes.toBytes("test"));
1327      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1328      table.put(p);
1329      admin.flush(TABLE_NAMES[0]);
1330      assertEquals(1, util.countRows(table));
1331      quickPoll(new Callable<Boolean>() {
1332        @Override
1333        public Boolean call() throws Exception {
1334          return fs.listStatus(storePath).length == 1;
1335        }
1336      }, 5000);
1337
1338      // Generate a bulk load file with more rows
1339      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1340          true);
1341
1342      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1343      runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1344                      .getTableDescriptor(), regionLocator)), testDir, false);
1345
1346      // Perform the actual load
1347      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1348
1349      // Ensure data shows up
1350      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1351      assertEquals("LoadIncrementalHFiles should put expected data in table",
1352          expectedRows + 1, util.countRows(table));
1353
1354      // should have a second StoreFile now
1355      assertEquals(2, fs.listStatus(storePath).length);
1356
1357      // minor compactions shouldn't get rid of the file
1358      admin.compact(TABLE_NAMES[0]);
1359      try {
1360        quickPoll(new Callable<Boolean>() {
1361          @Override
1362          public Boolean call() throws Exception {
1363            return fs.listStatus(storePath).length == 1;
1364          }
1365        }, 5000);
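        // quickPoll returning normally means the excluded file was minor-compacted away: fail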
1366        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1367      } catch (AssertionError ae) {
1368        // expected: the minor compaction must not remove the excluded store file
1369      }
1370
1371      // a major compaction should work though
1372      admin.majorCompact(TABLE_NAMES[0]);
1373      quickPoll(new Callable<Boolean>() {
1374        @Override
1375        public Boolean call() throws Exception {
1376          return fs.listStatus(storePath).length == 1;
1377        }
1378      }, 5000);
1379
1380    } finally {
1381      util.shutdownMiniCluster();
1382    }
1383  }
1384
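  /**
   * Polls the given condition roughly every 10ms, returning once it evaluates to true and
   * failing the test if it does not become true within {@code waitMs} milliseconds.
   */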
1385  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1386    int sleepMs = 10;
1387    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1388    while (retries-- > 0) {
1389      if (c.call().booleanValue()) {
1390        return;
1391      }
1392      Thread.sleep(sleepMs);
1393    }
1394    fail();
1395  }
1396
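  /**
   * Command-line entry point for {@link #manualTest(String[])}.
   */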
1397  public static void main(String args[]) throws Exception {
1398    new TestHFileOutputFormat2().manualTest(args);
1399  }
1400
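  /**
   * Manual driver: "newtable &lt;tablename&gt;" creates a pre-split table using {@link #FAMILIES},
   * while "incremental &lt;tablename&gt;" runs an incremental PE load against an existing table and
   * writes the HFiles to the "incremental-out" directory.
   */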
1401  public void manualTest(String args[]) throws Exception {
1402    Configuration conf = HBaseConfiguration.create();
1403    util = new HBaseTestingUtility(conf);
1404    if ("newtable".equals(args[0])) {
1405      TableName tname = TableName.valueOf(args[1]);
1406      byte[][] splitKeys = generateRandomSplitKeys(4);
1407      Table table = util.createTable(tname, FAMILIES, splitKeys);
1408    } else if ("incremental".equals(args[0])) {
1409      TableName tname = TableName.valueOf(args[1]);
1410      try(Connection c = ConnectionFactory.createConnection(conf);
1411          Admin admin = c.getAdmin();
1412          RegionLocator regionLocator = c.getRegionLocator(tname)) {
1413        Path outDir = new Path("incremental-out");
1414        runIncrementalPELoad(conf,
1415          Arrays
1416            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
1417          outDir, false);
1418      }
1419    } else {
1420      throw new RuntimeException(
1421          "usage: TestHFileOutputFormat2 newtable | incremental");
1422    }
1423  }
1424
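  /**
   * Verifies that HFileOutputFormat2.configureStoragePolicy applies the per-family override
   * (ONE_SSD for the first family) and falls back to the default policy configured via
   * STORAGE_POLICY_PROPERTY (ALL_SSD) for families without an explicit override.
   */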
1425  @Test
1426  public void testBlockStoragePolicy() throws Exception {
1427    util = new HBaseTestingUtility();
1428    Configuration conf = util.getConfiguration();
1429    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1430
1431    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
1432            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
1433                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
1434    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1435    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1436    util.startMiniDFSCluster(3);
1437    FileSystem fs = util.getDFSCluster().getFileSystem();
1438    try {
1439      fs.mkdirs(cf1Dir);
1440      fs.mkdirs(cf2Dir);
1441
1442      // the default block storage policy should be HOT
1443      String spA = getStoragePolicyName(fs, cf1Dir);
1444      String spB = getStoragePolicyName(fs, cf2Dir);
1445      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1446      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1447      assertEquals("HOT", spA);
1448      assertEquals("HOT", spB);
1449
1450      // apply the configured storage policies to the column family directories
1451      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1452              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1453      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1454              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1455      spA = getStoragePolicyName(fs, cf1Dir);
1456      spB = getStoragePolicyName(fs, cf2Dir);
1457      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1458      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1459      assertNotNull(spA);
1460      assertEquals("ONE_SSD", spA);
1461      assertNotNull(spB);
1462      assertEquals("ALL_SSD", spB);
1463    } finally {
1464      fs.delete(cf1Dir, true);
1465      fs.delete(cf2Dir, true);
1466      util.shutdownMiniDFSCluster();
1467    }
1468  }
1469
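  /**
   * Returns the storage policy name of the given path, trying FileSystem#getStoragePolicy via
   * reflection first and falling back to
   * {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)} (defaulting to "HOT") when
   * the method is not available.
   */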
1470  private String getStoragePolicyName(FileSystem fs, Path path) {
1471    try {
1472      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1473      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1474    } catch (Exception e) {
1475      // May fail on older HDFS versions; fall back to the old way
1476      if (LOG.isTraceEnabled()) {
1477        LOG.trace("Failed to get policy directly", e);
1478      }
1479      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1480      return policy == null ? "HOT" : policy; // HOT is the default policy
1481    }
1482  }
1483
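  /**
   * Looks up the storage policy for HDFS versions whose FileSystem does not expose
   * getStoragePolicy: reads the policy id from the HdfsFileStatus and matches it against the
   * policies reported by the DistributedFileSystem, returning null if it cannot be resolved.
   */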
1484  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1485    try {
1486      if (fs instanceof DistributedFileSystem) {
1487        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1488        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1489        if (null != status) {
1490          byte storagePolicyId = status.getStoragePolicy();
1491          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1492          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1493            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1494            for (BlockStoragePolicy policy : policies) {
1495              if (policy.getId() == storagePolicyId) {
1496                return policy.getName();
1497              }
1498            }
1499          }
1500        }
1501      }
1502    } catch (Throwable e) {
1503      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1504    }
1505
1506    return null;
1507  }
1508
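  /**
   * Verifies that configurePartitioner sets TotalOrderPartitioner on the job and writes the
   * partition file under the calling user's home directory (here a test user created with
   * UserGroupInformation.createUserForTesting).
   */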
1509  @Test
1510  public void testConfigurePartitioner() throws IOException {
1511    Configuration conf = util.getConfiguration();
1512    // Create a user who is not the current user
1513    String fooUserName = "foo1234";
1514    String fooGroupName = "group1";
1515    UserGroupInformation
1516        ugi = UserGroupInformation.createUserForTesting(fooUserName, new String[]{fooGroupName});
1517    // Get user's home directory
1518    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
1519      @Override public Path run() {
1520        try (FileSystem fs = FileSystem.get(conf)) {
1521          return fs.makeQualified(fs.getHomeDirectory());
1522        } catch (IOException ioe) {
1523          LOG.error("Failed to get foo's home directory", ioe);
1524        }
1525        return null;
1526      }
1527    });
1528
1529    Job job = Mockito.mock(Job.class);
1530    Mockito.doReturn(conf).when(job).getConfiguration();
1531    ImmutableBytesWritable writable = new ImmutableBytesWritable();
1532    List<ImmutableBytesWritable> splitPoints = new LinkedList<>();
1533    splitPoints.add(writable);
1534
1535    ugi.doAs(new PrivilegedAction<Void>() {
1536      @Override public Void run() {
1537        try {
1538          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
1539        } catch (IOException ioe) {
1540          LOG.error("Failed to configure partitioner", ioe);
1541        }
1542        return null;
1543      }
1544    });
1545    FileSystem fs = FileSystem.get(conf);
1546    // verify that the job uses TotalOrderPartitioner
1547    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
1548    // verify that TotalOrderPartitioner.setPartitionFile() is called.
1549    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
1550    Assert.assertNotNull(partitionPathString);
1551    // Make sure the partition file is in foo1234's home directory, and that
1552    // the file exists.
1553    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
1554    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
1555  }
1556
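  /**
   * Verifies that HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY ("gz" here) overrides the
   * compression algorithm used for the HFiles written by the record writer.
   */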
1557  @Test
1558  public void testConfigureCompression() throws Exception {
1559    Configuration conf = new Configuration(this.util.getConfiguration());
1560    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1561    TaskAttemptContext context = null;
1562    Path dir = util.getDataTestDir("TestConfigureCompression");
1563    String hfileoutputformatCompression = "gz";
1564
1565    try {
1566      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1567      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1568
1569      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1570
1571      Job job = Job.getInstance(conf);
1572      FileOutputFormat.setOutputPath(job, dir);
1573      context = createTestTaskAttemptContext(job);
1574      HFileOutputFormat2 hof = new HFileOutputFormat2();
1575      writer = hof.getRecordWriter(context);
1576      final byte[] b = Bytes.toBytes("b");
1577
1578      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1579      writer.write(new ImmutableBytesWritable(), kv);
1580      writer.close(context);
1581      writer = null;
1582      FileSystem fs = dir.getFileSystem(conf);
1583      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1584      while (iterator.hasNext()) {
1585        LocatedFileStatus keyFileStatus = iterator.next();
1586        HFile.Reader reader =
1587            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1588        assertEquals(hfileoutputformatCompression, reader.getCompressionAlgorithm().getName());
1589      }
1590    } finally {
1591      if (writer != null && context != null) {
1592        writer.close(context);
1593      }
1594      dir.getFileSystem(conf).delete(dir, true);
1595    }
1596
1597  }
1598
1599}
1600