/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.client.ConnectionFactory.createConnection;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
// TODO : Remove this in 3.0
public class TestHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtility util = new HBaseTestingUtility();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte[] keyBytes = new byte[keyLength];
      byte[] valBytes = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

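  /**
   * Points the job at one of the random data generating mappers above: the Put-emitting mapper
   * when a PutSortReducer run is requested, otherwise the KeyValue-emitting mapper.
   */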
  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be changed by the call to
      // write. Check that everything in the kv is the same except the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has an explicit ts. It should not be
      // changed by the call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

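  /**
   * Creates a TaskAttemptContext for the given job through the {@link HadoopShims} compatibility
   * layer, using a fixed task attempt id.
   */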
  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Writing TIMERANGE test output to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Set down this value or we OOME in eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    long hregionMaxFilesize = 10 * 1024;
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(KeyValueSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);

    // check output file num and size.
    for (byte[] family : FAMILIES) {
      long kvCount = 0;
      RemoteIterator<LocatedFileStatus> iterator =
        fs.listFiles(testDir.suffix("/" + new String(family)), true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);

        kvCount += reader.getEntries();
        scanner.seekTo();
        long perKVSize = scanner.getCell().getSerializedSize();
        assertTrue("Data size of each file should not be too large.",
          perKVSize * reader.getEntries() <= hregionMaxFilesize);
      }
      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
    }
  }
475
476  /**
477   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
478   */
479  @Test
480  public void test_WritingTagData() throws Exception {
481    Configuration conf = new Configuration(this.util.getConfiguration());
482    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
483    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
484    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
485    TaskAttemptContext context = null;
486    Path dir = util.getDataTestDir("WritingTagData");
487    try {
488      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
489      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
490      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
491      Job job = new Job(conf);
492      FileOutputFormat.setOutputPath(job, dir);
493      context = createTestTaskAttemptContext(job);
494      HFileOutputFormat2 hof = new HFileOutputFormat2();
495      writer = hof.getRecordWriter(context);
496      final byte[] b = Bytes.toBytes("b");
497
498      List<Tag> tags = new ArrayList<>();
499      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
500      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
501      writer.write(new ImmutableBytesWritable(), kv);
502      writer.close(context);
503      writer = null;
504      FileSystem fs = dir.getFileSystem(conf);
505      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
506      while (iterator.hasNext()) {
507        LocatedFileStatus keyFileStatus = iterator.next();
508        HFile.Reader reader =
509          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
510        HFileScanner scanner = reader.getScanner(conf, false, false, false);
511        scanner.seekTo();
512        Cell cell = scanner.getCell();
513        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
514        assertTrue(tagsFromCell.size() > 0);
515        for (Tag tag : tagsFromCell) {
516          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
517        }
518      }
519    } finally {
520      if (writer != null && context != null) writer.close(context);
521      dir.getFileSystem(conf).delete(dir, true);
522    }
523  }
524
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
    assertEquals(4, job.getNumReduceTasks());
  }

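  /**
   * Returns {@code numKeys} random region start keys; the first key is always empty so the first
   * region covers the start of the keyspace.
   */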
  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic when LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations and DN hostnames. Once MiniHBaseCluster
   * supports an explicit hostnames parameter (just like MiniDFSCluster does), we can test the
   * region locality feature more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

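  /**
   * Core incremental-load scenario: starts a mini cluster, creates the requested tables, runs the
   * MR job that writes HFiles, bulk loads them with LoadIncrementalHFiles, and then verifies row
   * counts, block locality and data survival across a region reopen.
   */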
  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtility();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
      // explicit hostnames parameter just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", regionNum, numRegions);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    for (FileStatus tf : testDir.getFileSystem(conf).listStatus(testDir)) {
      Path tablePath = testDir;

      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      for (FileStatus f : tablePath.getFileSystem(conf).listStatus(tablePath)) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", allTables.size(), numTableDirs);
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getHTableDescriptor().getNameAsString();
        LOG.info("Running LoadIncrementalHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable,
          singleTableInfo.getRegionLocator());

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be extracted
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

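  /**
   * Configures and runs the MR job that generates HFiles for the given table(s) under
   * {@code outDir}, asserting that the number of reduce tasks matches the total region count.
   */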
  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getHTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * family compression map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

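  /**
   * Stubs {@code table.getTableDescriptor()} to return a descriptor with one column family per map
   * entry, each configured with the requested compression algorithm.
   */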
  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific bloom filter settings from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToBloomType) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from the configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getTableDescriptor())));

      // read back family specific block size settings from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToBlockSize) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests
   * that the family data block encoding map is correctly serialized into and deserialized from the
   * configuration.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      HTableDescriptor tableDescriptor = table.getTableDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(entry.getKey().getBytes("UTF-8")));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    HTableDescriptor mockTableDescriptor = new HTableDescriptor(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      mockTableDescriptor.addFamily(new HColumnDescriptor(entry.getKey()).setMaxVersions(1)
        .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0));
    }
    Mockito.doReturn(mockTableDescriptor).when(table).getTableDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding settings. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

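  /**
   * Stubs the mock {@link RegionLocator} with a fixed set of region start keys.
   */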
  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    HTableDescriptor htd = new HTableDescriptor(TABLE_NAMES[0]);
    Mockito.doReturn(htd).when(table).getTableDescriptor();
    for (HColumnDescriptor hcd : HBaseTestingUtility.generateColumnDescriptors()) {
      htd.addFamily(hcd);
    }

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, htd.getFamiliesKeys(), ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(htd.getFamilies().size(), families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        HColumnDescriptor hcd = htd.getFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + " (reader: " + reader
            + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + " (reader: " + reader
            + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

1149  /**
1150   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
1151   * family descriptors
1152   */
1153  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1154    TaskAttemptContext context, Set<byte[]> families, int numRows)
1155    throws IOException, InterruptedException {
1156    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
1157    int valLength = 10;
1158    byte valBytes[] = new byte[valLength];
1159
1160    int taskId = context.getTaskAttemptID().getTaskID().getId();
1161    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
1162    final byte[] qualifier = Bytes.toBytes("data");
1163    for (int i = 0; i < numRows; i++) {
1164      Bytes.putInt(keyBytes, 0, i);
1165      Bytes.random(valBytes);
1166      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1167      for (byte[] family : families) {
1168        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1169        writer.write(key, kv);
1170      }
1171    }
1172  }
1173
1174  /**
1175   * This test is to test the scenario happened in HBASE-6901. All files are bulk loaded and
1176   * excluded from minor compaction. Without the fix of HBASE-6901, an
1177   * ArrayIndexOutOfBoundsException will be thrown.
1178   */
1179  @Ignore("Flakey: See HBASE-9051")
1180  @Test
1181  public void testExcludeAllFromMinorCompaction() throws Exception {
1182    Configuration conf = util.getConfiguration();
1183    conf.setInt("hbase.hstore.compaction.min", 2);
1184    generateRandomStartKeys(5);
1185
1186    util.startMiniCluster();
1187    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
1188      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1189      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1190      final FileSystem fs = util.getDFSCluster().getFileSystem();
1191      assertEquals("Should start with empty table", 0, util.countRows(table));
1192
1193      // deep inspection: get the StoreFile dir
1194      final Path storePath =
1195        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1196          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1197            Bytes.toString(FAMILIES[0])));
1198      assertEquals(0, fs.listStatus(storePath).length);
1199
1200      // Generate two bulk load files
1201      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1202
1203      for (int i = 0; i < 2; i++) {
1204        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1205        runIncrementalPELoad(conf,
1206          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(),
1207            conn.getRegionLocator(TABLE_NAMES[0]))),
1208          testDir, false);
1209        // Perform the actual load
1210        new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
1211      }
1212
1213      // Ensure data shows up
1214      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1215      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
1216        util.countRows(table));
1217
1218      // should have a second StoreFile now
1219      assertEquals(2, fs.listStatus(storePath).length);
1220
1221      // minor compactions shouldn't get rid of the file
1222      admin.compact(TABLE_NAMES[0]);
1223      try {
1224        quickPoll(new Callable<Boolean>() {
1225          @Override
1226          public Boolean call() throws Exception {
1227            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1228            for (HRegion region : regions) {
1229              for (HStore store : region.getStores()) {
1230                store.closeAndArchiveCompactedFiles();
1231              }
1232            }
1233            return fs.listStatus(storePath).length == 1;
1234          }
1235        }, 5000);
1236        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1237      } catch (AssertionError ae) {
1238        // expected: quickPoll timed out because the minor compaction left both store files
1239      }
1240
1241      // a major compaction should work though
1242      admin.majorCompact(TABLE_NAMES[0]);
1243      quickPoll(new Callable<Boolean>() {
1244        @Override
1245        public Boolean call() throws Exception {
1246          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1247          for (HRegion region : regions) {
1248            for (HStore store : region.getStores()) {
1249              store.closeAndArchiveCompactedFiles();
1250            }
1251          }
1252          return fs.listStatus(storePath).length == 1;
1253        }
1254      }, 5000);
1255
1256    } finally {
1257      util.shutdownMiniCluster();
1258    }
1259  }
1260
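  /**
   * Like {@link #testExcludeAllFromMinorCompaction()}, but only one of the two store files comes
   * from a bulk load marked for exclusion (the other comes from a regular put and flush). A minor
   * compaction must leave both files in place; a major compaction collapses them into one.
   */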
1261  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1262  @Test
1263  public void testExcludeMinorCompaction() throws Exception {
1264    Configuration conf = util.getConfiguration();
1265    conf.setInt("hbase.hstore.compaction.min", 2);
1266    generateRandomStartKeys(5);
1267
1268    util.startMiniCluster();
1269    try (Connection conn = ConnectionFactory.createConnection(conf);
1270      Admin admin = conn.getAdmin()) {
1271      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1272      final FileSystem fs = util.getDFSCluster().getFileSystem();
1273      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1274      assertEquals("Should start with empty table", 0, util.countRows(table));
1275
1276      // deep inspection: get the StoreFile dir
1277      final Path storePath =
1278        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1279          new Path(admin.getTableRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1280            Bytes.toString(FAMILIES[0])));
1281      assertEquals(0, fs.listStatus(storePath).length);
1282
1283      // put some data in it and flush to create a storefile
1284      Put p = new Put(Bytes.toBytes("test"));
1285      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1286      table.put(p);
1287      admin.flush(TABLE_NAMES[0]);
1288      assertEquals(1, util.countRows(table));
1289      quickPoll(new Callable<Boolean>() {
1290        @Override
1291        public Boolean call() throws Exception {
1292          return fs.listStatus(storePath).length == 1;
1293        }
1294      }, 5000);
1295
1296      // Generate a bulk load file with more rows
1297      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1298
1299      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1300      runIncrementalPELoad(conf,
1301        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getTableDescriptor(), regionLocator)),
1302        testDir, false);
1303
1304      // Perform the actual load
1305      new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
1306
1307      // Ensure data shows up
1308      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1309      assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows + 1,
1310        util.countRows(table));
1311
1312      // should have a second StoreFile now
1313      assertEquals(2, fs.listStatus(storePath).length);
1314
1315      // minor compactions shouldn't get rid of the file
1316      admin.compact(TABLE_NAMES[0]);
1317      try {
1318        quickPoll(new Callable<Boolean>() {
1319          @Override
1320          public Boolean call() throws Exception {
1321            return fs.listStatus(storePath).length == 1;
1322          }
1323        }, 5000);
1324        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1325      } catch (AssertionError ae) {
1326        // expected: quickPoll timed out because the minor compaction left both store files
1327      }
1328
1329      // a major compaction should work though
1330      admin.majorCompact(TABLE_NAMES[0]);
1331      quickPoll(new Callable<Boolean>() {
1332        @Override
1333        public Boolean call() throws Exception {
1334          return fs.listStatus(storePath).length == 1;
1335        }
1336      }, 5000);
1337
1338    } finally {
1339      util.shutdownMiniCluster();
1340    }
1341  }
1342
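  /**
   * Polls {@code c} every 10ms until it returns true or {@code waitMs} milliseconds have elapsed,
   * failing the test on timeout.
   */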
1343  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1344    int sleepMs = 10;
1345    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1346    while (retries-- > 0) {
1347      if (c.call().booleanValue()) {
1348        return;
1349      }
1350      Thread.sleep(sleepMs);
1351    }
1352    fail();
1353  }
1354
1355  public static void main(String[] args) throws Exception {
1356    new TestHFileOutputFormat2().manualTest(args);
1357  }
1358
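  /**
   * Manual driver invoked by {@link #main(String[])}: with "newtable" it creates a pre-split table
   * named by the second argument using {@link #FAMILIES}; with "incremental" it runs the
   * incremental load job against the named existing table, writing HFiles to "incremental-out".
   */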
1359  public void manualTest(String[] args) throws Exception {
1360    Configuration conf = HBaseConfiguration.create();
1361    util = new HBaseTestingUtility(conf);
1362    if ("newtable".equals(args[0])) {
1363      TableName tname = TableName.valueOf(args[1]);
1364      byte[][] splitKeys = generateRandomSplitKeys(4);
1365      Table table = util.createTable(tname, FAMILIES, splitKeys);
1366    } else if ("incremental".equals(args[0])) {
1367      TableName tname = TableName.valueOf(args[1]);
1368      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1369        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1370        Path outDir = new Path("incremental-out");
1371        runIncrementalPELoad(conf,
1372          Arrays.asList(
1373            new HFileOutputFormat2.TableInfo(admin.getTableDescriptor(tname), regionLocator)),
1374          outDir, false);
1375      }
1376    } else {
1377      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
1378    }
1379  }
1380
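  /**
   * Verifies that {@link HFileOutputFormat2#configureStoragePolicy} applies the configured HDFS
   * storage policies to the column family directories: the per-family override (ONE_SSD for the
   * first family) and the table-wide default (ALL_SSD) for families without an override.
   */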
1381  @Test
1382  public void testBlockStoragePolicy() throws Exception {
1383    util = new HBaseTestingUtility();
1384    Configuration conf = util.getConfiguration();
1385    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1386
1387    conf.set(
1388      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1389        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1390      "ONE_SSD");
1391    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1392    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1393    util.startMiniDFSCluster(3);
1394    FileSystem fs = util.getDFSCluster().getFileSystem();
1395    try {
1396      fs.mkdirs(cf1Dir);
1397      fs.mkdirs(cf2Dir);
1398
1399      // the original block storage policy would be HOT
1400      String spA = getStoragePolicyName(fs, cf1Dir);
1401      String spB = getStoragePolicyName(fs, cf2Dir);
1402      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1403      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1404      assertEquals("HOT", spA);
1405      assertEquals("HOT", spB);
1406
1407      // alter table cf schema to change storage policies
1408      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1409        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1410      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1411        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1412      spA = getStoragePolicyName(fs, cf1Dir);
1413      spB = getStoragePolicyName(fs, cf2Dir);
1414      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1415      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1416      assertNotNull(spA);
1417      assertEquals("ONE_SSD", spA);
1418      assertNotNull(spB);
1419      assertEquals("ALL_SSD", spB);
1420    } finally {
1421      fs.delete(cf1Dir, true);
1422      fs.delete(cf2Dir, true);
1423      util.shutdownMiniDFSCluster();
1424    }
1425  }
1426
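  /**
   * Reads the storage policy of {@code path} by reflectively calling FileSystem#getStoragePolicy
   * (newer HDFS), falling back to {@link #getStoragePolicyNameForOldHDFSVersion(FileSystem, Path)}
   * and defaulting to "HOT" when the policy cannot be determined.
   */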
1427  private String getStoragePolicyName(FileSystem fs, Path path) {
1428    try {
1429      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1430      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1431    } catch (Exception e) {
1432      // Maybe fail because of using old HDFS version, try the old way
1433      if (LOG.isTraceEnabled()) {
1434        LOG.trace("Failed to get policy directly", e);
1435      }
1436      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1437      return policy == null ? "HOT" : policy; // HOT by default
1438    }
1439  }
1440
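  /**
   * Fallback for older HDFS clients: reads the storage policy id from the file status and resolves
   * it against the policies reported by the DistributedFileSystem, or returns null if it cannot be
   * resolved.
   */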
1441  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1442    try {
1443      if (fs instanceof DistributedFileSystem) {
1444        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1445        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1446        if (null != status) {
1447          byte storagePolicyId = status.getStoragePolicy();
1448          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1449          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1450            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1451            for (BlockStoragePolicy policy : policies) {
1452              if (policy.getId() == storagePolicyId) {
1453                return policy.getName();
1454              }
1455            }
1456          }
1457        }
1458      }
1459    } catch (Throwable e) {
1460      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1461    }
1462
1463    return null;
1464  }
1465
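  /**
   * Verifies that {@link HFileOutputFormat2#COMPRESSION_OVERRIDE_CONF_KEY} is honored: with the
   * override set to "gz", every HFile written by the record writer must report that codec in its
   * trailer.
   */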
1466  @Test
1467  public void testConfigureCompression() throws Exception {
1468    Configuration conf = new Configuration(this.util.getConfiguration());
1469    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1470    TaskAttemptContext context = null;
1471    Path dir = util.getDataTestDir("TestConfigureCompression");
1472    String hfileoutputformatCompression = "gz";
1473
1474    try {
1475      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1476      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1477
1478      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1479
1480      Job job = Job.getInstance(conf);
1481      FileOutputFormat.setOutputPath(job, dir);
1482      context = createTestTaskAttemptContext(job);
1483      HFileOutputFormat2 hof = new HFileOutputFormat2();
1484      writer = hof.getRecordWriter(context);
1485      final byte[] b = Bytes.toBytes("b");
1486
1487      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1488      writer.write(new ImmutableBytesWritable(), kv);
1489      writer.close(context);
1490      writer = null;
1491      FileSystem fs = dir.getFileSystem(conf);
1492      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1493      while (iterator.hasNext()) {
1494        LocatedFileStatus keyFileStatus = iterator.next();
1495        HFile.Reader reader =
1496          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1497        assertEquals(hfileoutputformatCompression,
1498          reader.getTrailer().getCompressionCodec().getName());
1499      }
1500    } finally {
1501      if (writer != null && context != null) {
1502        writer.close(context);
1503      }
1504      dir.getFileSystem(conf).delete(dir, true);
1505    }
1506
1507  }
1508
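  /**
   * Runs the bulk load job against cluster A while the target table lives in cluster B:
   * configureIncrementalLoad must copy cluster B's ZooKeeper settings into the job configuration
   * as the REMOTE_CLUSTER_* keys, and the connections opened by the output format (captured via
   * {@link ConfigurationCaptorConnection}) must actually use those settings.
   */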
1509  @Test
1510  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
1511    // Start cluster A
1512    util = new HBaseTestingUtility();
1513    Configuration confA = util.getConfiguration();
1514    int hostCount = 3;
1515    int regionNum = 20;
1516    String[] hostnames = new String[hostCount];
1517    for (int i = 0; i < hostCount; ++i) {
1518      hostnames[i] = "datanode_" + i;
1519    }
1520    StartMiniClusterOption option =
1521      StartMiniClusterOption.builder().numRegionServers(hostCount).dataNodeHosts(hostnames).build();
1522    util.startMiniCluster(option);
1523
1524    // Start cluster B
1525    HBaseTestingUtility utilB = new HBaseTestingUtility();
1526    Configuration confB = utilB.getConfiguration();
1527    utilB.startMiniCluster(option);
1528
1529    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
1530
1531    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
1532    TableName tableName = TableName.valueOf("table");
1533    // Create table in cluster B
1534    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
1535      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
1536      // Generate the bulk load files
1537      // Job has zookeeper configuration for cluster A
1538      // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
1539      Job job = Job.getInstance(confA, "testLocalMRIncrementalLoad");
1540      Configuration jobConf = job.getConfiguration();
1541      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
1542      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
1543      setupRandomGeneratorMapper(job, false);
1544      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
1545
1546      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1547        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
1548      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1549        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
1550      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1551        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
1552
1553      String bSpecificConfigKey = "my.override.config.for.b";
1554      String bSpecificConfigValue = "b-specific-value";
1555      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
1556        bSpecificConfigValue);
1557
1558      FileOutputFormat.setOutputPath(job, testDir);
1559
1560      assertFalse(util.getTestFileSystem().exists(testDir));
1561
1562      assertTrue(job.waitForCompletion(true));
1563
1564      final List<Configuration> configs =
1565        ConfigurationCaptorConnection.getCapturedConfigurations(key);
1566
1567      assertFalse(configs.isEmpty());
1568      for (Configuration config : configs) {
1569        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1570          config.get(HConstants.ZOOKEEPER_QUORUM));
1571        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1572          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
1573        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1574          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
1575
1576        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
1577      }
1578    } finally {
1579      utilB.deleteTable(tableName);
1580      testDir.getFileSystem(confA).delete(testDir, true);
1581      util.shutdownMiniCluster();
1582      utilB.shutdownMiniCluster();
1583    }
1584  }
1585
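  /**
   * Connection wrapper registered through {@link ClusterConnection#HBASE_CLIENT_CONNECTION_IMPL}
   * that records the Configuration each instance was created with, keyed by a UUID stored in the
   * job configuration, so tests can assert which cluster settings were actually used.
   */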
1586  private static class ConfigurationCaptorConnection implements Connection {
1587    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1588
1589    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1590
1591    private final Connection delegate;
1592
1593    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
1594      throws IOException {
1595      Configuration confForDelegate = new Configuration(conf);
1596      confForDelegate.unset(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL);
1597      delegate = createConnection(confForDelegate, es, user);
1598
1599      final String uuid = conf.get(UUID_KEY);
1600      if (uuid != null) {
1601        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1602      }
1603    }
1604
1605    static UUID configureConnectionImpl(Configuration conf) {
1606      conf.setClass(ClusterConnection.HBASE_CLIENT_CONNECTION_IMPL,
1607        ConfigurationCaptorConnection.class, Connection.class);
1608
1609      final UUID uuid = UUID.randomUUID();
1610      conf.set(UUID_KEY, uuid.toString());
1611      return uuid;
1612    }
1613
1614    static List<Configuration> getCapturedConfigurations(UUID key) {
1615      return confs.get(key);
1616    }
1617
1618    @Override
1619    public Configuration getConfiguration() {
1620      return delegate.getConfiguration();
1621    }
1622
1623    @Override
1624    public Table getTable(TableName tableName) throws IOException {
1625      return delegate.getTable(tableName);
1626    }
1627
1628    @Override
1629    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1630      return delegate.getTable(tableName, pool);
1631    }
1632
1633    @Override
1634    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1635      return delegate.getBufferedMutator(tableName);
1636    }
1637
1638    @Override
1639    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1640      return delegate.getBufferedMutator(params);
1641    }
1642
1643    @Override
1644    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1645      return delegate.getRegionLocator(tableName);
1646    }
1647
1648    @Override
1649    public void clearRegionLocationCache() {
1650      delegate.clearRegionLocationCache();
1651    }
1652
1653    @Override
1654    public Admin getAdmin() throws IOException {
1655      return delegate.getAdmin();
1656    }
1657
1658    @Override
1659    public void close() throws IOException {
1660      delegate.close();
1661    }
1662
1663    @Override
1664    public boolean isClosed() {
1665      return delegate.isClosed();
1666    }
1667
1668    @Override
1669    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1670      return delegate.getTableBuilder(tableName, pool);
1671    }
1672
1673    @Override
1674    public Hbck getHbck() throws IOException {
1675      return delegate.getHbck();
1676    }
1677
1678    @Override
1679    public Hbck getHbck(ServerName masterServer) throws IOException {
1680      return delegate.getHbck(masterServer);
1681    }
1682
1683    @Override
1684    public void abort(String why, Throwable e) {
1685      delegate.abort(why, e);
1686    }
1687
1688    @Override
1689    public boolean isAborted() {
1690      return delegate.isAborted();
1691    }
1692
1693    @Override
1694    public String getClusterId() {
1695      return delegate.getClusterId();
1696    }
1697  }
1698}