/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}.
 * Sets up and runs a mapreduce job that writes hfile output.
 * Creates a few inner classes to implement splits and an inputformat that
 * emits keys and values like those of {@link PerformanceEvaluation}.
 */
@Category({VerySlowMapReduceTests.class, LargeTests.class})
public class TestHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES = {
    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                 ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
              false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[]{TABLE_NAMES[0]};
      }
    }

    @Override
    protected void map(
        NullWritable n1, NullWritable n2,
        Mapper<NullWritable, NullWritable,
               ImmutableBytesWritable, Cell>.Context context)
        throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
      extends Mapper<NullWritable, NullWritable,
                 ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
              false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[]{TABLE_NAMES[0]};
      }
    }

    @Override
    protected void map(
            NullWritable n1, NullWritable n2,
            Mapper<NullWritable, NullWritable,
                    ImmutableBytesWritable, Put>.Context context)
            throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      Random random = new Random();
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          random.nextBytes(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          random.nextBytes(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set the TTL very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  It should be
      // changed by the call to write.  Check that everything in the kv is the same but the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotEquals(original.getTimestamp(), kv.getTimestamp());
      assertNotEquals(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has an explicit ts.  It should not be
      // changed by the call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
      job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
   * metadata used by time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Writing timerange data to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.loadFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Lower this value or we OOM in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, 64 * 1024);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte)0);
    Arrays.fill(endKey, (byte)0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as TTL into
   * the HFile.
   */
  @Test
  public void test_WritingTagData()
      throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
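    // Tags are only persisted to HFiles from format version 3 onwards, hence
    // MIN_FORMAT_VERSION_WITH_TAGS below.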
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(false, false, false);
        scanner.seekTo();
        Cell cell = scanner.getCell();
        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
        .toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
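    // The mocked RegionLocator reports four start keys, so configureIncrementalLoad should
    // configure one reduce task per region, i.e. four reducers.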
    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = new Random();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true.
   * This test can only check the correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY
   * is set to true. Because MiniHBaseCluster always runs with a single hostname (and different
   * ports), it is not possible to check region locality by comparing region locations and DN
   * hostnames. Once MiniHBaseCluster supports an explicit hostnames parameter (just like
   * MiniDFSCluster does), we can test the region locality features more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  //@Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
      boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
        Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
        Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
      boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster
      // supports an explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartMiniClusterOption option = StartMiniClusterOption.builder()
        .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
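    // MultiTableHFileOutputFormat nests each table's HFiles under the table's namespace
    // directory; the test tables are in the "default" namespace.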
    if (writeMultipleTables) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
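        // Recreate the table with 14 split keys, i.e. 15 regions, and wait below until all of
        // them are assigned.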
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (util.getConnection().getRegionLocator(chosenTable.getName())
                .getAllRegionLocations().size() != 15 ||
                !admin.isTableAvailable(table.getName())) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
        LOG.info("Running BulkLoadHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be visible; the PutSortReducer path wrote every cell with a tiny TTL,
          // so they have already expired
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
                  util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
                  util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions",
                tableDigestBefore, util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

  private void runIncrementalPELoad(Configuration conf,
      List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
      throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
        CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
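      // configureIncrementalLoad creates one reducer per region, so the reducer count should
      // equal the total number of regions across all the input tables.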
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
              regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
   * Tests that the family compression map is correctly serialized into
   * and deserialized from configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
          getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
              Arrays.asList(table.getDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
          .createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setCompressionType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for
   *         testing column family compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm>
      getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }


  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
   * Tests that the family bloom type map is correctly serialized into
   * and deserialized from configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType =
          getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table,
          familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
              Arrays.asList(table.getDescriptor())));

      // read back family specific bloom type settings from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
          HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
      Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBloomFilterType(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for
   *         testing column family bloom filter settings. Column family names have special
   *         characters
   */
  private Map<String, BloomType>
      getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD",
          BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
   * Tests that the family block size map is correctly serialized into
   * and deserialized from configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize =
          getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table,
          familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
              Arrays.asList(table.getDescriptor())));

      // read back family specific block size settings from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
          HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
      Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setBlocksize(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for
   *         testing column family block size settings. Column family names have special
   *         characters
   */
  private Map<String, Integer>
      getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD",
          Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
   * Tests that the family data block encoding map is correctly serialized into
   * and deserialized from configuration.
   *
   * @throws IOException
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
          getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table,
          familyToDataBlockEncoding);
      TableDescriptor tableDescriptor = table.getDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
          HFileOutputFormat2.serializeColumnFamilyAttribute(
              HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
          HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals("DataBlockEncoding configuration incorrect for column family:"
            + entry.getKey(), entry.getValue(),
            retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey())
          .setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue())
          .setBlockCacheEnabled(false)
          .setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for
   *         testing column family data block encoding settings. Column family names have special
   *         characters
   */
  private Map<String, DataBlockEncoding>
      getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
          DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] {
        HConstants.EMPTY_BYTE_ARRAY,
        Bytes.toBytes("aaa"),
        Bytes.toBytes("ggg"),
        Bytes.toBytes("zzz")
    };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
   * bloom filter settings from the column family descriptor.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = FSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals("Incorrect bloom filter used for column family " + familyStr +
            " (reader: " + reader + ")",
            hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals("Incorrect compression used for column family " + familyStr +
            " (reader: " + reader + ")", hcd.getCompressionType(),
            reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using
   * {@link #FAMILIES} as column family descriptors.
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
      TaskAttemptContext context, Set<byte[]> families, int numRows)
      throws IOException, InterruptedException {
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    Random random = new Random();
    for (int i = 0; i < numRows; i++) {

      Bytes.putInt(keyBytes, 0, i);
      random.nextBytes(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);

      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * This test covers the scenario from HBASE-6901: all files are bulk loaded and excluded from
   * minor compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException would be
   * thrown.
   */
  @Ignore("Flakey: See HBASE-9051") @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = util.getConfiguration();
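    // Lower the minimum number of store files needed before a (minor) compaction may run, so
    // that the two bulk-loaded files below would normally be candidates for compaction.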
1222    conf.setInt("hbase.hstore.compaction.min", 2);
1223    generateRandomStartKeys(5);
1224
1225    util.startMiniCluster();
1226    try (Connection conn = ConnectionFactory.createConnection();
1227        Admin admin = conn.getAdmin();
1228        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1229        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1230      final FileSystem fs = util.getDFSCluster().getFileSystem();
1231      assertEquals("Should start with empty table", 0, util.countRows(table));
1232
1233      // deep inspection: get the StoreFile dir
1234      final Path storePath = new Path(
1235        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1236          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1237            Bytes.toString(FAMILIES[0])));
1238      assertEquals(0, fs.listStatus(storePath).length);
1239
1240      // Generate two bulk load files
1241      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1242          true);
1243
1244      for (int i = 0; i < 2; i++) {
1245        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1246        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1247                .getDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
1248        // Perform the actual load
1249        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1250      }
1251
1252      // Ensure data shows up
1253      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1254      assertEquals("BulkLoadHFiles should put expected data in table",
1255          expectedRows, util.countRows(table));
1256
1257      // should have two StoreFiles now (one per bulk load)
1258      assertEquals(2, fs.listStatus(storePath).length);
1259
1260      // minor compactions shouldn't get rid of the file
1261      admin.compact(TABLE_NAMES[0]);
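      // quickPoll() calls fail() (throwing an AssertionError) if the condition never
      // becomes true. Since the bulk-loaded files were written with the compaction
      // exclude flag, the minor compaction requested above should leave both files in
      // place and the poll should time out; if it returns normally the files were
      // compacted and the IOException below fails the test.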
1262      try {
1263        quickPoll(new Callable<Boolean>() {
1264          @Override
1265          public Boolean call() throws Exception {
1266            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1267            for (HRegion region : regions) {
1268              for (HStore store : region.getStores()) {
1269                store.closeAndArchiveCompactedFiles();
1270              }
1271            }
1272            return fs.listStatus(storePath).length == 1;
1273          }
1274        }, 5000);
1275        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1276      } catch (AssertionError ae) {
1277        // this is expected behavior
1278      }
1279
1280      // a major compaction should work though
1281      admin.majorCompact(TABLE_NAMES[0]);
1282      quickPoll(new Callable<Boolean>() {
1283        @Override
1284        public Boolean call() throws Exception {
1285          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1286          for (HRegion region : regions) {
1287            for (HStore store : region.getStores()) {
1288              store.closeAndArchiveCompactedFiles();
1289            }
1290          }
1291          return fs.listStatus(storePath).length == 1;
1292        }
1293      }, 5000);
1294
1295    } finally {
1296      util.shutdownMiniCluster();
1297    }
1298  }
1299
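  /**
   * Like {@link #testExcludeAllFromMinorCompaction()}, but with a regular flushed
   * StoreFile present alongside the single bulk-loaded (compaction-excluded) file.
   */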
1300  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1301  public void testExcludeMinorCompaction() throws Exception {
1302    Configuration conf = util.getConfiguration();
1303    conf.setInt("hbase.hstore.compaction.min", 2);
1304    generateRandomStartKeys(5);
1305
1306    util.startMiniCluster();
1307    try (Connection conn = ConnectionFactory.createConnection(conf);
1308        Admin admin = conn.getAdmin()){
1309      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1310      final FileSystem fs = util.getDFSCluster().getFileSystem();
1311      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1312      assertEquals("Should start with empty table", 0, util.countRows(table));
1313
1314      // deep inspection: get the StoreFile dir
1315      final Path storePath = new Path(
1316        FSUtils.getTableDir(FSUtils.getRootDir(conf), TABLE_NAMES[0]),
1317          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1318            Bytes.toString(FAMILIES[0])));
1319      assertEquals(0, fs.listStatus(storePath).length);
1320
1321      // put some data in it and flush to create a storefile
1322      Put p = new Put(Bytes.toBytes("test"));
1323      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1324      table.put(p);
1325      admin.flush(TABLE_NAMES[0]);
1326      assertEquals(1, util.countRows(table));
1327      quickPoll(new Callable<Boolean>() {
1328        @Override
1329        public Boolean call() throws Exception {
1330          return fs.listStatus(storePath).length == 1;
1331        }
1332      }, 5000);
1333
1334      // Generate a bulk load file with more rows
1335      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1336          true);
1337
1338      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1339      runIncrementalPELoad(conf,
1340        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
1341        testDir, false);
1342
1343      // Perform the actual load
1344      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1345
1346      // Ensure data shows up
1347      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1348      assertEquals("BulkLoadHFiles should put expected data in table",
1349          expectedRows + 1, util.countRows(table));
1350
1351      // should have a second StoreFile now
1352      assertEquals(2, fs.listStatus(storePath).length);
1353
1354      // minor compactions shouldn't get rid of the file
1355      admin.compact(TABLE_NAMES[0]);
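      // As in testExcludeAllFromMinorCompaction(): the poll is expected to time out
      // because the bulk-loaded file is excluded from minor compaction.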
1356      try {
1357        quickPoll(new Callable<Boolean>() {
1358          @Override
1359          public Boolean call() throws Exception {
1360            return fs.listStatus(storePath).length == 1;
1361          }
1362        }, 5000);
1363        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1364      } catch (AssertionError ae) {
1365        // this is expected behavior
1366      }
1367
1368      // a major compaction should work though
1369      admin.majorCompact(TABLE_NAMES[0]);
1370      quickPoll(new Callable<Boolean>() {
1371        @Override
1372        public Boolean call() throws Exception {
1373          return fs.listStatus(storePath).length == 1;
1374        }
1375      }, 5000);
1376
1377    } finally {
1378      util.shutdownMiniCluster();
1379    }
1380  }
1381
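  /**
   * Polls {@code c} every 10ms until it returns true or roughly {@code waitMs}
   * milliseconds have elapsed; fails the test if the condition never becomes true.
   */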
1382  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1383    int sleepMs = 10;
1384    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1385    while (retries-- > 0) {
1386      if (c.call().booleanValue()) {
1387        return;
1388      }
1389      Thread.sleep(sleepMs);
1390    }
1391    fail();
1392  }
1393
1394  public static void main(String[] args) throws Exception {
1395    new TestHFileOutputFormat2().manualTest(args);
1396  }
1397
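  /**
   * Manual driver for ad-hoc runs, e.g.
   * <pre>
   *   TestHFileOutputFormat2 newtable &lt;tableName&gt;      # create a pre-split table
   *   TestHFileOutputFormat2 incremental &lt;tableName&gt;   # incremental load into ./incremental-out
   * </pre>
   */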
1398  public void manualTest(String[] args) throws Exception {
1399    Configuration conf = HBaseConfiguration.create();
1400    util = new HBaseTestingUtility(conf);
1401    if ("newtable".equals(args[0])) {
1402      TableName tname = TableName.valueOf(args[1]);
1403      byte[][] splitKeys = generateRandomSplitKeys(4);
1404      Table table = util.createTable(tname, FAMILIES, splitKeys);
1405    } else if ("incremental".equals(args[0])) {
1406      TableName tname = TableName.valueOf(args[1]);
1407      try(Connection c = ConnectionFactory.createConnection(conf);
1408          Admin admin = c.getAdmin();
1409          RegionLocator regionLocator = c.getRegionLocator(tname)) {
1410        Path outDir = new Path("incremental-out");
1411        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
1412                .getDescriptor(tname), regionLocator)), outDir, false);
1413      }
1414    } else {
1415      throw new RuntimeException(
1416          "usage: TestHFileOutputFormat2 newtable | incremental");
1417    }
1418  }
1419
1420  @Test
1421  public void testBlockStoragePolicy() throws Exception {
1422    util = new HBaseTestingUtility();
1423    Configuration conf = util.getConfiguration();
1424    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1425
1426    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
1427            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
1428                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
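    // The global policy is ALL_SSD; family 0 carries a per-family override of ONE_SSD,
    // which is what the assertions below expect for cf1Dir vs cf2Dir.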
1429    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1430    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1431    util.startMiniDFSCluster(3);
1432    FileSystem fs = util.getDFSCluster().getFileSystem();
1433    try {
1434      fs.mkdirs(cf1Dir);
1435      fs.mkdirs(cf2Dir);
1436
1437      // the default block storage policy is HOT
1438      String spA = getStoragePolicyName(fs, cf1Dir);
1439      String spB = getStoragePolicyName(fs, cf2Dir);
1440      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1441      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1442      assertEquals("HOT", spA);
1443      assertEquals("HOT", spB);
1444
1445      // apply the configured storage policies to the column family directories
1446      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1447              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1448      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1449              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1450      spA = getStoragePolicyName(fs, cf1Dir);
1451      spB = getStoragePolicyName(fs, cf2Dir);
1452      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1453      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1454      assertNotNull(spA);
1455      assertEquals("ONE_SSD", spA);
1456      assertNotNull(spB);
1457      assertEquals("ALL_SSD", spB);
1458    } finally {
1459      fs.delete(cf1Dir, true);
1460      fs.delete(cf2Dir, true);
1461      util.shutdownMiniDFSCluster();
1462    }
1463  }
1464
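  /**
   * Returns the storage policy name of {@code path}. Uses reflection so the call
   * compiles against Hadoop versions that lack {@code FileSystem#getStoragePolicy},
   * falling back to {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)}
   * (and ultimately "HOT") when the reflective call fails.
   */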
1465  private String getStoragePolicyName(FileSystem fs, Path path) {
1466    try {
1467      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1468      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1469    } catch (Exception e) {
1470      // May fail on an HDFS version without getStoragePolicy; fall back to the old way
1471      if (LOG.isTraceEnabled()) {
1472        LOG.trace("Failed to get policy directly", e);
1473      }
1474      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1475      return policy == null ? "HOT" : policy; // HOT by default
1476    }
1477  }
1478
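  /**
   * Fallback for older HDFS versions: resolves the file's storage policy id from
   * {@code HdfsFileStatus} against {@code dfs.getStoragePolicies()}; returns null
   * if the policy cannot be determined.
   */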
1479  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1480    try {
1481      if (fs instanceof DistributedFileSystem) {
1482        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1483        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1484        if (null != status) {
1485          byte storagePolicyId = status.getStoragePolicy();
1486          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1487          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1488            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1489            for (BlockStoragePolicy policy : policies) {
1490              if (policy.getId() == storagePolicyId) {
1491                return policy.getName();
1492              }
1493            }
1494          }
1495        }
1496      }
1497    } catch (Throwable e) {
1498      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1499    }
1500
1501    return null;
1502  }
1503
1504  @Test
1505  public void testConfigurePartitioner() throws IOException {
1506    Configuration conf = util.getConfiguration();
1507    // Create a user who is not the current user
1508    String fooUserName = "foo1234";
1509    String fooGroupName = "group1";
1510    UserGroupInformation
1511        ugi = UserGroupInformation.createUserForTesting(fooUserName, new String[]{fooGroupName});
1512    // Get user's home directory
1513    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
1514      @Override public Path run() {
1515        try (FileSystem fs = FileSystem.get(conf)) {
1516          return fs.makeQualified(fs.getHomeDirectory());
1517        } catch (IOException ioe) {
1518          LOG.error("Failed to get foo's home directory", ioe);
1519        }
1520        return null;
1521      }
1522    });
1523
1524    Job job = Mockito.mock(Job.class);
1525    Mockito.doReturn(conf).when(job).getConfiguration();
1526    ImmutableBytesWritable writable = new ImmutableBytesWritable();
1527    List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>();
1528    splitPoints.add(writable);
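    // A single (empty) split point is enough to make configurePartitioner() write a
    // partition file, which is what this test inspects below.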
1529
1530    ugi.doAs(new PrivilegedAction<Void>() {
1531      @Override public Void run() {
1532        try {
1533          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
1534        } catch (IOException ioe) {
1535          LOG.error("Failed to configure partitioner", ioe);
1536        }
1537        return null;
1538      }
1539    });
1540    FileSystem fs = FileSystem.get(conf);
1541    // verify that the job uses TotalOrderPartitioner
1542    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
1543    // verify that TotalOrderPartitioner.setPartitionFile() is called.
1544    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
1545    Assert.assertNotNull(partitionPathString);
1546    // Make sure the partition file is in foo1234's home directory, and that
1547    // the file exists.
1548    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
1549    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
1550  }
1551
1552  @Test
1553  public void testConfigureCompression() throws Exception {
1554    Configuration conf = new Configuration(this.util.getConfiguration());
1555    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1556    TaskAttemptContext context = null;
1557    Path dir = util.getDataTestDir("TestConfigureCompression");
1558    String hfileoutputformatCompression = "gz";
1559
1560    try {
1561      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1562      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1563
1564      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1565
1566      Job job = Job.getInstance(conf);
1567      FileOutputFormat.setOutputPath(job, dir);
1568      context = createTestTaskAttemptContext(job);
1569      HFileOutputFormat2 hof = new HFileOutputFormat2();
1570      writer = hof.getRecordWriter(context);
1571      final byte[] b = Bytes.toBytes("b");
1572
1573      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1574      writer.write(new ImmutableBytesWritable(), kv);
1575      writer.close(context);
1576      writer = null;
1577      FileSystem fs = dir.getFileSystem(conf);
1578      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1579      while (iterator.hasNext()) {
1580        LocatedFileStatus keyFileStatus = iterator.next();
1581        HFile.Reader reader =
1582            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1583        assertEquals(hfileoutputformatCompression, reader.getCompressionAlgorithm().getName());
1584      }
1585    } finally {
1586      if (writer != null && context != null) {
1587        writer.close(context);
1588      }
1589      dir.getFileSystem(conf).delete(dir, true);
1590    }
1591
1592  }
1593
1594}
1595