001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.client.ConnectionFactory.createAsyncConnection;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNotSame;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.mockito.Mockito.verify;
029
030import java.io.IOException;
031import java.lang.reflect.Field;
032import java.security.PrivilegedAction;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.HashMap;
036import java.util.LinkedList;
037import java.util.List;
038import java.util.Map;
039import java.util.Map.Entry;
040import java.util.Random;
041import java.util.Set;
042import java.util.UUID;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.ExecutorService;
047import java.util.concurrent.ThreadLocalRandom;
048import java.util.stream.Collectors;
049import java.util.stream.Stream;
050import org.apache.hadoop.conf.Configuration;
051import org.apache.hadoop.fs.FileStatus;
052import org.apache.hadoop.fs.FileSystem;
053import org.apache.hadoop.fs.LocatedFileStatus;
054import org.apache.hadoop.fs.Path;
055import org.apache.hadoop.fs.RemoteIterator;
056import org.apache.hadoop.hbase.ArrayBackedTag;
057import org.apache.hadoop.hbase.Cell;
058import org.apache.hadoop.hbase.CellUtil;
059import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
060import org.apache.hadoop.hbase.HBaseClassTestRule;
061import org.apache.hadoop.hbase.HBaseConfiguration;
062import org.apache.hadoop.hbase.HBaseTestingUtil;
063import org.apache.hadoop.hbase.HConstants;
064import org.apache.hadoop.hbase.HDFSBlocksDistribution;
065import org.apache.hadoop.hbase.HadoopShims;
066import org.apache.hadoop.hbase.KeyValue;
067import org.apache.hadoop.hbase.PerformanceEvaluation;
068import org.apache.hadoop.hbase.PrivateCellUtil;
069import org.apache.hadoop.hbase.ServerName;
070import org.apache.hadoop.hbase.StartTestingClusterOption;
071import org.apache.hadoop.hbase.TableName;
072import org.apache.hadoop.hbase.Tag;
073import org.apache.hadoop.hbase.TagType;
074import org.apache.hadoop.hbase.client.Admin;
075import org.apache.hadoop.hbase.client.AsyncConnection;
076import org.apache.hadoop.hbase.client.BufferedMutator;
077import org.apache.hadoop.hbase.client.BufferedMutatorParams;
078import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
079import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
080import org.apache.hadoop.hbase.client.Connection;
081import org.apache.hadoop.hbase.client.ConnectionFactory;
082import org.apache.hadoop.hbase.client.ConnectionUtils;
083import org.apache.hadoop.hbase.client.Hbck;
084import org.apache.hadoop.hbase.client.Put;
085import org.apache.hadoop.hbase.client.RegionLocator;
086import org.apache.hadoop.hbase.client.Result;
087import org.apache.hadoop.hbase.client.ResultScanner;
088import org.apache.hadoop.hbase.client.Scan;
089import org.apache.hadoop.hbase.client.Table;
090import org.apache.hadoop.hbase.client.TableBuilder;
091import org.apache.hadoop.hbase.client.TableDescriptor;
092import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
093import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
094import org.apache.hadoop.hbase.io.compress.Compression;
095import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
096import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
097import org.apache.hadoop.hbase.io.hfile.CacheConfig;
098import org.apache.hadoop.hbase.io.hfile.HFile;
099import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
100import org.apache.hadoop.hbase.io.hfile.HFileScanner;
101import org.apache.hadoop.hbase.regionserver.BloomType;
102import org.apache.hadoop.hbase.regionserver.HRegion;
103import org.apache.hadoop.hbase.regionserver.HStore;
104import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
105import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
106import org.apache.hadoop.hbase.security.User;
107import org.apache.hadoop.hbase.testclassification.LargeTests;
108import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
109import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
110import org.apache.hadoop.hbase.util.Bytes;
111import org.apache.hadoop.hbase.util.CommonFSUtils;
112import org.apache.hadoop.hbase.util.FSUtils;
113import org.apache.hadoop.hbase.util.FutureUtils;
114import org.apache.hadoop.hbase.util.ReflectionUtils;
115import org.apache.hadoop.hdfs.DistributedFileSystem;
116import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
117import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
118import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
119import org.apache.hadoop.io.NullWritable;
120import org.apache.hadoop.mapreduce.Job;
121import org.apache.hadoop.mapreduce.Mapper;
122import org.apache.hadoop.mapreduce.RecordWriter;
123import org.apache.hadoop.mapreduce.TaskAttemptContext;
124import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
125import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
126import org.apache.hadoop.security.UserGroupInformation;
127import org.junit.Assert;
128import org.junit.ClassRule;
129import org.junit.Ignore;
130import org.junit.Test;
131import org.junit.experimental.categories.Category;
132import org.mockito.Mockito;
133import org.slf4j.Logger;
134import org.slf4j.LoggerFactory;
135
136/**
137 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
138 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
139 * values like those of {@link PerformanceEvaluation}.
140 */
141@Category({ VerySlowMapReduceTests.class, LargeTests.class })
142public class TestHFileOutputFormat2 {
143
144  @ClassRule
145  public static final HBaseClassTestRule CLASS_RULE =
146    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);
147
148  private final static int ROWSPERSPLIT = 1024;
149
150  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
151  private static final byte[][] FAMILIES =
152    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
153  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
154    .map(TableName::valueOf).toArray(TableName[]::new);
155
156  private HBaseTestingUtil util = new HBaseTestingUtil();
157
158  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
159
160  /**
161   * Simple mapper that makes KeyValue output.
162   */
163  static class RandomKVGeneratingMapper
164    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {
165
166    private int keyLength;
167    private static final int KEYLEN_DEFAULT = 10;
168    private static final String KEYLEN_CONF = "randomkv.key.length";
169
170    private int valLength;
171    private static final int VALLEN_DEFAULT = 10;
172    private static final String VALLEN_CONF = "randomkv.val.length";
173    private static final byte[] QUALIFIER = Bytes.toBytes("data");
174    private boolean multiTableMapper = false;
175    private TableName[] tables = null;
176
177    @Override
178    protected void setup(Context context) throws IOException, InterruptedException {
179      super.setup(context);
180
181      Configuration conf = context.getConfiguration();
182      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
183      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
184      multiTableMapper =
185        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
186      if (multiTableMapper) {
187        tables = TABLE_NAMES;
188      } else {
189        tables = new TableName[] { TABLE_NAMES[0] };
190      }
191    }
192
193    @Override
194    protected void map(NullWritable n1, NullWritable n2,
195      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
196      throws java.io.IOException, InterruptedException {
197
198      byte keyBytes[] = new byte[keyLength];
199      byte valBytes[] = new byte[valLength];
200
201      int taskId = context.getTaskAttemptID().getTaskID().getId();
202      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
203      byte[] key;
204      for (int j = 0; j < tables.length; ++j) {
205        for (int i = 0; i < ROWSPERSPLIT; i++) {
206          Bytes.random(keyBytes);
207          // Ensure that unique tasks generate unique keys
208          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
209          Bytes.random(valBytes);
210          key = keyBytes;
211          if (multiTableMapper) {
212            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
213          }
214
215          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
216            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
217            context.write(new ImmutableBytesWritable(key), kv);
218          }
219        }
220      }
221    }
222  }
223
224  /**
225   * Simple mapper that makes Put output.
226   */
227  static class RandomPutGeneratingMapper
228    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {
229
230    private int keyLength;
231    private static final int KEYLEN_DEFAULT = 10;
232    private static final String KEYLEN_CONF = "randomkv.key.length";
233
234    private int valLength;
235    private static final int VALLEN_DEFAULT = 10;
236    private static final String VALLEN_CONF = "randomkv.val.length";
237    private static final byte[] QUALIFIER = Bytes.toBytes("data");
238    private boolean multiTableMapper = false;
239    private TableName[] tables = null;
240
241    @Override
242    protected void setup(Context context) throws IOException, InterruptedException {
243      super.setup(context);
244
245      Configuration conf = context.getConfiguration();
246      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
247      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
248      multiTableMapper =
249        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
250      if (multiTableMapper) {
251        tables = TABLE_NAMES;
252      } else {
253        tables = new TableName[] { TABLE_NAMES[0] };
254      }
255    }
256
257    @Override
258    protected void map(NullWritable n1, NullWritable n2,
259      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
260      throws java.io.IOException, InterruptedException {
261
262      byte keyBytes[] = new byte[keyLength];
263      byte valBytes[] = new byte[valLength];
264
265      int taskId = context.getTaskAttemptID().getTaskID().getId();
266      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
267
268      byte[] key;
269      for (int j = 0; j < tables.length; ++j) {
270        for (int i = 0; i < ROWSPERSPLIT; i++) {
271          Bytes.random(keyBytes);
272          // Ensure that unique tasks generate unique keys
273          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
274          Bytes.random(valBytes);
275          key = keyBytes;
276          if (multiTableMapper) {
277            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
278          }
279
280          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
281            Put p = new Put(keyBytes);
282            p.addColumn(family, QUALIFIER, valBytes);
283            // set TTL to very low so that the scan does not return any value
284            p.setTTL(1l);
285            context.write(new ImmutableBytesWritable(key), p);
286          }
287        }
288      }
289    }
290  }
291
292  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
293    if (putSortReducer) {
294      job.setInputFormatClass(NMapInputFormat.class);
295      job.setMapperClass(RandomPutGeneratingMapper.class);
296      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
297      job.setMapOutputValueClass(Put.class);
298    } else {
299      job.setInputFormatClass(NMapInputFormat.class);
300      job.setMapperClass(RandomKVGeneratingMapper.class);
301      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
302      job.setMapOutputValueClass(KeyValue.class);
303    }
304  }
305
306  /**
307   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
308   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
309   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
310   */
311  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
312  @Test
313  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
314    Configuration conf = new Configuration(this.util.getConfiguration());
315    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
316    TaskAttemptContext context = null;
317    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
318    try {
319      Job job = new Job(conf);
320      FileOutputFormat.setOutputPath(job, dir);
321      context = createTestTaskAttemptContext(job);
322      HFileOutputFormat2 hof = new HFileOutputFormat2();
323      writer = hof.getRecordWriter(context);
324      final byte[] b = Bytes.toBytes("b");
325
326      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
327      // changed by call to write. Check all in kv is same but ts.
328      KeyValue kv = new KeyValue(b, b, b);
329      KeyValue original = kv.clone();
330      writer.write(new ImmutableBytesWritable(), kv);
331      assertFalse(original.equals(kv));
332      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
333      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
334      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
335      assertNotSame(original.getTimestamp(), kv.getTimestamp());
336      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
337
338      // Test 2. Now test passing a kv that has explicit ts. It should not be
339      // changed by call to record write.
340      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
341      original = kv.clone();
342      writer.write(new ImmutableBytesWritable(), kv);
343      assertTrue(original.equals(kv));
344    } finally {
345      if (writer != null && context != null) writer.close(context);
346      dir.getFileSystem(conf).delete(dir, true);
347    }
348  }
349
350  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
351    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
352    TaskAttemptContext context =
353      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
354    return context;
355  }
356
357  /*
358   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
359   * time-restricted scans.
360   */
361  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
362  @Test
363  public void test_TIMERANGE() throws Exception {
364    Configuration conf = new Configuration(this.util.getConfiguration());
365    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
366    TaskAttemptContext context = null;
367    Path dir = util.getDataTestDir("test_TIMERANGE_present");
368    LOG.info("Timerange dir writing to dir: " + dir);
369    try {
370      // build a record writer using HFileOutputFormat2
371      Job job = new Job(conf);
372      FileOutputFormat.setOutputPath(job, dir);
373      context = createTestTaskAttemptContext(job);
374      HFileOutputFormat2 hof = new HFileOutputFormat2();
375      writer = hof.getRecordWriter(context);
376
377      // Pass two key values with explicit times stamps
378      final byte[] b = Bytes.toBytes("b");
379
380      // value 1 with timestamp 2000
381      KeyValue kv = new KeyValue(b, b, b, 2000, b);
382      KeyValue original = kv.clone();
383      writer.write(new ImmutableBytesWritable(), kv);
384      assertEquals(original, kv);
385
386      // value 2 with timestamp 1000
387      kv = new KeyValue(b, b, b, 1000, b);
388      original = kv.clone();
389      writer.write(new ImmutableBytesWritable(), kv);
390      assertEquals(original, kv);
391
392      // verify that the file has the proper FileInfo.
393      writer.close(context);
394
395      // the generated file lives 1 directory down from the attempt directory
396      // and is the only file, e.g.
397      // _attempt__0000_r_000000_0/b/1979617994050536795
398      FileSystem fs = FileSystem.get(conf);
399      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
400      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
401      FileStatus[] file = fs.listStatus(sub1[0].getPath());
402
403      // open as HFile Reader and pull out TIMERANGE FileInfo.
404      HFile.Reader rd =
405        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
406      Map<byte[], byte[]> finfo = rd.getHFileInfo();
407      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
408      assertNotNull(range);
409
410      // unmarshall and check values.
411      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
412      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
413      assertEquals(1000, timeRangeTracker.getMin());
414      assertEquals(2000, timeRangeTracker.getMax());
415      rd.close();
416    } finally {
417      if (writer != null && context != null) writer.close(context);
418      dir.getFileSystem(conf).delete(dir, true);
419    }
420  }
421
422  /**
423   * Run small MR job.
424   */
425  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
426  @Test
427  public void testWritingPEData() throws Exception {
428    Configuration conf = util.getConfiguration();
429    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
430    FileSystem fs = testDir.getFileSystem(conf);
431
432    // Set down this value or we OOME in eclipse.
433    conf.setInt("mapreduce.task.io.sort.mb", 20);
434    // Write a few files.
435    long hregionMaxFilesize = 10 * 1024;
436    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
437
438    Job job = new Job(conf, "testWritingPEData");
439    setupRandomGeneratorMapper(job, false);
440    // This partitioner doesn't work well for number keys but using it anyways
441    // just to demonstrate how to configure it.
442    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
443    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
444
445    Arrays.fill(startKey, (byte) 0);
446    Arrays.fill(endKey, (byte) 0xff);
447
448    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
449    // Set start and end rows for partitioner.
450    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
451    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
452    job.setReducerClass(CellSortReducer.class);
453    job.setOutputFormatClass(HFileOutputFormat2.class);
454    job.setNumReduceTasks(4);
455    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
456      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
457      CellSerialization.class.getName());
458
459    FileOutputFormat.setOutputPath(job, testDir);
460    assertTrue(job.waitForCompletion(false));
461    FileStatus[] files = fs.listStatus(testDir);
462    assertTrue(files.length > 0);
463
464    // check output file num and size.
465    for (byte[] family : FAMILIES) {
466      long kvCount = 0;
467      RemoteIterator<LocatedFileStatus> iterator =
468        fs.listFiles(testDir.suffix("/" + new String(family)), true);
469      while (iterator.hasNext()) {
470        LocatedFileStatus keyFileStatus = iterator.next();
471        HFile.Reader reader =
472          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
473        HFileScanner scanner = reader.getScanner(conf, false, false, false);
474
475        kvCount += reader.getEntries();
476        scanner.seekTo();
477        long perKVSize = scanner.getCell().getSerializedSize();
478        assertTrue("Data size of each file should not be too large.",
479          perKVSize * reader.getEntries() <= hregionMaxFilesize);
480      }
481      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
482    }
483  }
484
485  /**
486   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
487   */
488  @Test
489  public void test_WritingTagData() throws Exception {
490    Configuration conf = new Configuration(this.util.getConfiguration());
491    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
492    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
493    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
494    TaskAttemptContext context = null;
495    Path dir = util.getDataTestDir("WritingTagData");
496    try {
497      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
498      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
499      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
500      Job job = new Job(conf);
501      FileOutputFormat.setOutputPath(job, dir);
502      context = createTestTaskAttemptContext(job);
503      HFileOutputFormat2 hof = new HFileOutputFormat2();
504      writer = hof.getRecordWriter(context);
505      final byte[] b = Bytes.toBytes("b");
506
507      List<Tag> tags = new ArrayList<>();
508      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
509      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
510      writer.write(new ImmutableBytesWritable(), kv);
511      writer.close(context);
512      writer = null;
513      FileSystem fs = dir.getFileSystem(conf);
514      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
515      while (iterator.hasNext()) {
516        LocatedFileStatus keyFileStatus = iterator.next();
517        HFile.Reader reader =
518          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
519        HFileScanner scanner = reader.getScanner(conf, false, false, false);
520        scanner.seekTo();
521        Cell cell = scanner.getCell();
522        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
523        assertTrue(tagsFromCell.size() > 0);
524        for (Tag tag : tagsFromCell) {
525          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
526        }
527      }
528    } finally {
529      if (writer != null && context != null) writer.close(context);
530      dir.getFileSystem(conf).delete(dir, true);
531    }
532  }
533
534  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
535  @Test
536  public void testJobConfiguration() throws Exception {
537    Configuration conf = new Configuration(this.util.getConfiguration());
538    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
539      util.getDataTestDir("testJobConfiguration").toString());
540    Job job = new Job(conf);
541    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
542    Table table = Mockito.mock(Table.class);
543    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
544    setupMockStartKeys(regionLocator);
545    setupMockTableName(regionLocator);
546    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
547    assertEquals(job.getNumReduceTasks(), 4);
548  }
549
550  private byte[][] generateRandomStartKeys(int numKeys) {
551    Random random = ThreadLocalRandom.current();
552    byte[][] ret = new byte[numKeys][];
553    // first region start key is always empty
554    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
555    for (int i = 1; i < numKeys; i++) {
556      ret[i] =
557        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
558    }
559    return ret;
560  }
561
562  private byte[][] generateRandomSplitKeys(int numKeys) {
563    Random random = ThreadLocalRandom.current();
564    byte[][] ret = new byte[numKeys][];
565    for (int i = 0; i < numKeys; i++) {
566      ret[i] =
567        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
568    }
569    return ret;
570  }
571
572  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
573  @Test
574  public void testMRIncrementalLoad() throws Exception {
575    LOG.info("\nStarting test testMRIncrementalLoad\n");
576    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
577  }
578
579  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
580  @Test
581  public void testMRIncrementalLoadWithSplit() throws Exception {
582    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
583    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
584  }
585
586  /**
587   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true This test could only check the
588   * correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
589   * MiniHBaseCluster always run with single hostname (and different ports), it's not possible to
590   * check the region locality by comparing region locations and DN hostnames. When MiniHBaseCluster
591   * supports explicit hostnames parameter (just like MiniDFSCluster does), we could test region
592   * locality features more easily.
593   */
594  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
595  @Test
596  public void testMRIncrementalLoadWithLocality() throws Exception {
597    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
598    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
599    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
600  }
601
602  // @Ignore("Wahtevs")
603  @Test
604  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
605    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
606    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
607  }
608
609  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
610    boolean putSortReducer, String tableStr) throws Exception {
611    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
612      Arrays.asList(tableStr));
613  }
614
615  @Test
616  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
617    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
618    doIncrementalLoadTest(false, false, true,
619      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
620  }
621
622  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
623    boolean putSortReducer, List<String> tableStr) throws Exception {
624    util = new HBaseTestingUtil();
625    Configuration conf = util.getConfiguration();
626    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
627    int hostCount = 1;
628    int regionNum = 5;
629    if (shouldKeepLocality) {
630      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
631      // explicit hostnames parameter just like MiniDFSCluster does.
632      hostCount = 3;
633      regionNum = 20;
634    }
635
636    String[] hostnames = new String[hostCount];
637    for (int i = 0; i < hostCount; ++i) {
638      hostnames[i] = "datanode_" + i;
639    }
640    StartTestingClusterOption option = StartTestingClusterOption.builder()
641      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
642    util.startMiniCluster(option);
643
644    Map<String, Table> allTables = new HashMap<>(tableStr.size());
645    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
646    boolean writeMultipleTables = tableStr.size() > 1;
647    for (String tableStrSingle : tableStr) {
648      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
649      TableName tableName = TableName.valueOf(tableStrSingle);
650      Table table = util.createTable(tableName, FAMILIES, splitKeys);
651
652      RegionLocator r = util.getConnection().getRegionLocator(tableName);
653      assertEquals("Should start with empty table", 0, util.countRows(table));
654      int numRegions = r.getStartKeys().length;
655      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
656
657      allTables.put(tableStrSingle, table);
658      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
659    }
660    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
661    // Generate the bulk load files
662    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
663    if (writeMultipleTables) {
664      testDir = new Path(testDir, "default");
665    }
666
667    for (Table tableSingle : allTables.values()) {
668      // This doesn't write into the table, just makes files
669      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
670    }
671    int numTableDirs = 0;
672    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
673    for (FileStatus tf : fss) {
674      Path tablePath = testDir;
675      if (writeMultipleTables) {
676        if (allTables.containsKey(tf.getPath().getName())) {
677          ++numTableDirs;
678          tablePath = tf.getPath();
679        } else {
680          continue;
681        }
682      }
683
684      // Make sure that a directory was created for every CF
685      int dir = 0;
686      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
687      for (FileStatus f : fss) {
688        for (byte[] family : FAMILIES) {
689          if (Bytes.toString(family).equals(f.getPath().getName())) {
690            ++dir;
691          }
692        }
693      }
694      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
695    }
696    if (writeMultipleTables) {
697      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
698    }
699
700    Admin admin = util.getConnection().getAdmin();
701    try {
702      // handle the split case
703      if (shouldChangeRegions) {
704        Table chosenTable = allTables.values().iterator().next();
705        // Choose a semi-random table if multiple tables are available
706        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
707        admin.disableTable(chosenTable.getName());
708        util.waitUntilNoRegionsInTransition();
709
710        util.deleteTable(chosenTable.getName());
711        byte[][] newSplitKeys = generateRandomSplitKeys(14);
712        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
713
714        while (
715          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
716            .size() != 15 || !admin.isTableAvailable(table.getName())
717        ) {
718          Thread.sleep(200);
719          LOG.info("Waiting for new region assignment to happen");
720        }
721      }
722
723      // Perform the actual load
724      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
725        Path tableDir = testDir;
726        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
727        LOG.info("Running BulkLoadHFiles on table" + tableNameStr);
728        if (writeMultipleTables) {
729          tableDir = new Path(testDir, tableNameStr);
730        }
731        Table currentTable = allTables.get(tableNameStr);
732        TableName currentTableName = currentTable.getName();
733        BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);
734
735        // Ensure data shows up
736        int expectedRows = 0;
737        if (putSortReducer) {
738          // no rows should be extracted
739          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
740            util.countRows(currentTable));
741        } else {
742          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
743          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
744            util.countRows(currentTable));
745          Scan scan = new Scan();
746          ResultScanner results = currentTable.getScanner(scan);
747          for (Result res : results) {
748            assertEquals(FAMILIES.length, res.rawCells().length);
749            Cell first = res.rawCells()[0];
750            for (Cell kv : res.rawCells()) {
751              assertTrue(CellUtil.matchingRows(first, kv));
752              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
753            }
754          }
755          results.close();
756        }
757        String tableDigestBefore = util.checksumRows(currentTable);
758        // Check region locality
759        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
760        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
761          hbd.add(region.getHDFSBlocksDistribution());
762        }
763        for (String hostname : hostnames) {
764          float locality = hbd.getBlockLocalityIndex(hostname);
765          LOG.info("locality of [" + hostname + "]: " + locality);
766          assertEquals(100, (int) (locality * 100));
767        }
768
769        // Cause regions to reopen
770        admin.disableTable(currentTableName);
771        while (!admin.isTableDisabled(currentTableName)) {
772          Thread.sleep(200);
773          LOG.info("Waiting for table to disable");
774        }
775        admin.enableTable(currentTableName);
776        util.waitTableAvailable(currentTableName);
777        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
778          util.checksumRows(currentTable));
779      }
780    } finally {
781      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
782        tableInfoSingle.getRegionLocator().close();
783      }
784      for (Entry<String, Table> singleTable : allTables.entrySet()) {
785        singleTable.getValue().close();
786        util.deleteTable(singleTable.getValue().getName());
787      }
788      testDir.getFileSystem(conf).delete(testDir, true);
789      util.shutdownMiniCluster();
790    }
791  }
792
793  private void runIncrementalPELoad(Configuration conf,
794    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
795    throws IOException, InterruptedException, ClassNotFoundException {
796    Job job = new Job(conf, "testLocalMRIncrementalLoad");
797    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
798    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
799      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
800      CellSerialization.class.getName());
801    setupRandomGeneratorMapper(job, putSortReducer);
802    if (tableInfo.size() > 1) {
803      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
804      int sum = 0;
805      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
806        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
807      }
808      assertEquals(sum, job.getNumReduceTasks());
809    } else {
810      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
811      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
812        regionLocator);
813      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
814    }
815
816    FileOutputFormat.setOutputPath(job, outDir);
817
818    assertFalse(util.getTestFileSystem().exists(outDir));
819
820    assertTrue(job.waitForCompletion(true));
821  }
822
823  /**
824   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
825   * family compression map is correctly serialized into and deserialized from configuration
826   */
827  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
828  @Test
829  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
830    for (int numCfs = 0; numCfs <= 3; numCfs++) {
831      Configuration conf = new Configuration(this.util.getConfiguration());
832      Map<String, Compression.Algorithm> familyToCompression =
833        getMockColumnFamiliesForCompression(numCfs);
834      Table table = Mockito.mock(Table.class);
835      setupMockColumnFamiliesForCompression(table, familyToCompression);
836      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
837        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
838          Arrays.asList(table.getDescriptor())));
839
840      // read back family specific compression setting from the configuration
841      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
842        HFileOutputFormat2.createFamilyCompressionMap(conf);
843
844      // test that we have a value for all column families that matches with the
845      // used mock values
846      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
847        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
848          entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
849      }
850    }
851  }
852
853  private void setupMockColumnFamiliesForCompression(Table table,
854    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
855
856    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
857    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
858      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
859        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
860        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
861
862      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
863    }
864    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
865  }
866
867  /**
868   * @return a map from column family names to compression algorithms for testing column family
869   *         compression. Column family names have special characters
870   */
871  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
872    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
873    // use column family names having special characters
874    if (numCfs-- > 0) {
875      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
876    }
877    if (numCfs-- > 0) {
878      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
879    }
880    if (numCfs-- > 0) {
881      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
882    }
883    if (numCfs-- > 0) {
884      familyToCompression.put("Family3", Compression.Algorithm.NONE);
885    }
886    return familyToCompression;
887  }
888
889  /**
890   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
891   * family bloom type map is correctly serialized into and deserialized from configuration
892   */
893  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
894  @Test
895  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
896    for (int numCfs = 0; numCfs <= 2; numCfs++) {
897      Configuration conf = new Configuration(this.util.getConfiguration());
898      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
899      Table table = Mockito.mock(Table.class);
900      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
901      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
902        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
903          Arrays.asList(table.getDescriptor())));
904
905      // read back family specific data block encoding settings from the
906      // configuration
907      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
908        HFileOutputFormat2.createFamilyBloomTypeMap(conf);
909
910      // test that we have a value for all column families that matches with the
911      // used mock values
912      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
913        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
914          entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
915      }
916    }
917  }
918
919  private void setupMockColumnFamiliesForBloomType(Table table,
920    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
921    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
922    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
923      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
924        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
925        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
926      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
927    }
928    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
929  }
930
931  /**
932   * @return a map from column family names to compression algorithms for testing column family
933   *         compression. Column family names have special characters
934   */
935  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
936    Map<String, BloomType> familyToBloomType = new HashMap<>();
937    // use column family names having special characters
938    if (numCfs-- > 0) {
939      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
940    }
941    if (numCfs-- > 0) {
942      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
943    }
944    if (numCfs-- > 0) {
945      familyToBloomType.put("Family3", BloomType.NONE);
946    }
947    return familyToBloomType;
948  }
949
950  /**
951   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
952   * family block size map is correctly serialized into and deserialized from configuration
953   */
954  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
955  @Test
956  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
957    for (int numCfs = 0; numCfs <= 3; numCfs++) {
958      Configuration conf = new Configuration(this.util.getConfiguration());
959      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
960      Table table = Mockito.mock(Table.class);
961      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
962      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
963        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
964          Arrays.asList(table.getDescriptor())));
965
966      // read back family specific data block encoding settings from the
967      // configuration
968      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
969        HFileOutputFormat2.createFamilyBlockSizeMap(conf);
970
971      // test that we have a value for all column families that matches with the
972      // used mock values
973      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
974        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
975          entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
976      }
977    }
978  }
979
980  private void setupMockColumnFamiliesForBlockSize(Table table,
981    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
982    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
983    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
984      ColumnFamilyDescriptor columnFamilyDescriptor =
985        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
986          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
987      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
988    }
989    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
990  }
991
992  /**
993   * @return a map from column family names to compression algorithms for testing column family
994   *         compression. Column family names have special characters
995   */
996  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
997    Map<String, Integer> familyToBlockSize = new HashMap<>();
998    // use column family names having special characters
999    if (numCfs-- > 0) {
1000      familyToBlockSize.put("Family1!@#!@#&", 1234);
1001    }
1002    if (numCfs-- > 0) {
1003      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
1004    }
1005    if (numCfs-- > 0) {
1006      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
1007    }
1008    if (numCfs-- > 0) {
1009      familyToBlockSize.put("Family3", 0);
1010    }
1011    return familyToBlockSize;
1012  }
1013
1014  /**
1015   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
1016   * the family data block encoding map is correctly serialized into and deserialized from
1017   * configuration
1018   */
1019  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1020  @Test
1021  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
1022    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1023      Configuration conf = new Configuration(this.util.getConfiguration());
1024      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
1025        getMockColumnFamiliesForDataBlockEncoding(numCfs);
1026      Table table = Mockito.mock(Table.class);
1027      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
1028      TableDescriptor tableDescriptor = table.getDescriptor();
1029      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
1030        HFileOutputFormat2.serializeColumnFamilyAttribute(
1031          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));
1032
1033      // read back family specific data block encoding settings from the
1034      // configuration
1035      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
1036        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);
1037
1038      // test that we have a value for all column families that matches with the
1039      // used mock values
1040      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1041        assertEquals(
1042          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
1043          entry.getValue(),
1044          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
1045      }
1046    }
1047  }
1048
1049  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
1050    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
1051    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1052    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1053      ColumnFamilyDescriptor columnFamilyDescriptor =
1054        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
1055          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
1056          .build();
1057      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
1058    }
1059    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
1060  }
1061
1062  /**
1063   * @return a map from column family names to compression algorithms for testing column family
1064   *         compression. Column family names have special characters
1065   */
1066  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
1067    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
1068    // use column family names having special characters
1069    if (numCfs-- > 0) {
1070      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
1071    }
1072    if (numCfs-- > 0) {
1073      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
1074    }
1075    if (numCfs-- > 0) {
1076      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
1077    }
1078    if (numCfs-- > 0) {
1079      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
1080    }
1081    return familyToDataBlockEncoding;
1082  }
1083
1084  private void setupMockStartKeys(RegionLocator table) throws IOException {
1085    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
1086      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
1087    Mockito.doReturn(mockKeys).when(table).getStartKeys();
1088  }
1089
1090  private void setupMockTableName(RegionLocator table) throws IOException {
1091    TableName mockTableName = TableName.valueOf("mock_table");
1092    Mockito.doReturn(mockTableName).when(table).getName();
1093  }
1094
1095  /**
1096   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
1097   * from the column family descriptor
1098   */
1099  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1100  @Test
1101  public void testColumnFamilySettings() throws Exception {
1102    Configuration conf = new Configuration(this.util.getConfiguration());
1103    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1104    TaskAttemptContext context = null;
1105    Path dir = util.getDataTestDir("testColumnFamilySettings");
1106
1107    // Setup table descriptor
1108    Table table = Mockito.mock(Table.class);
1109    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
1110    TableDescriptorBuilder tableDescriptorBuilder =
1111      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1112
1113    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();
1114    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
1115      tableDescriptorBuilder.setColumnFamily(hcd);
1116    }
1117
1118    // set up the table to return some mock keys
1119    setupMockStartKeys(regionLocator);
1120
1121    try {
1122      // partial map red setup to get an operational writer for testing
1123      // We turn off the sequence file compression, because DefaultCodec
1124      // pollutes the GZip codec pool with an incompatible compressor.
1125      conf.set("io.seqfile.compression.type", "NONE");
1126      conf.set("hbase.fs.tmp.dir", dir.toString());
1127      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
1128      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1129
1130      Job job = new Job(conf, "testLocalMRIncrementalLoad");
1131      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
1132      setupRandomGeneratorMapper(job, false);
1133      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
1134      FileOutputFormat.setOutputPath(job, dir);
1135      context = createTestTaskAttemptContext(job);
1136      HFileOutputFormat2 hof = new HFileOutputFormat2();
1137      writer = hof.getRecordWriter(context);
1138
1139      // write out random rows
1140      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
1141        ROWSPERSPLIT);
1142      writer.close(context);
1143
1144      // Make sure that a directory was created for every CF
1145      FileSystem fs = dir.getFileSystem(conf);
1146
1147      // commit so that the filesystem has one directory per column family
1148      hof.getOutputCommitter(context).commitTask(context);
1149      hof.getOutputCommitter(context).commitJob(context);
1150      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
1151      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
1152      for (FileStatus f : families) {
1153        String familyStr = f.getPath().getName();
1154        ColumnFamilyDescriptor hcd =
1155          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
1156        // verify that the compression on this file matches the configured
1157        // compression
1158        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
1159        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
1160        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
1161
1162        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
1163        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
1164        assertEquals(
1165          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
1166          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
1167        assertEquals(
1168          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
1169          hcd.getCompressionType(), reader.getFileContext().getCompression());
1170      }
1171    } finally {
1172      dir.getFileSystem(conf).delete(dir, true);
1173    }
1174  }
1175
1176  /**
1177   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
1178   * family descriptors
1179   */
1180  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1181    TaskAttemptContext context, Set<byte[]> families, int numRows)
1182    throws IOException, InterruptedException {
1183    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
1184    int valLength = 10;
1185    byte valBytes[] = new byte[valLength];
1186
1187    int taskId = context.getTaskAttemptID().getTaskID().getId();
1188    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
1189    final byte[] qualifier = Bytes.toBytes("data");
1190    for (int i = 0; i < numRows; i++) {
1191      Bytes.putInt(keyBytes, 0, i);
1192      Bytes.random(valBytes);
1193      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1194      for (byte[] family : families) {
1195        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1196        writer.write(key, kv);
1197      }
1198    }
1199  }
1200
1201  /**
1202   * This test covers the scenario reported in HBASE-6901: all files are bulk loaded and
1203   * excluded from minor compaction. Without the fix for HBASE-6901, an
1204   * ArrayIndexOutOfBoundsException would be thrown.
1205   */
1206  @Ignore("Flakey: See HBASE-9051")
1207  @Test
1208  public void testExcludeAllFromMinorCompaction() throws Exception {
1209    Configuration conf = util.getConfiguration();
1210    conf.setInt("hbase.hstore.compaction.min", 2);
1211    generateRandomStartKeys(5);
1212
1213    util.startMiniCluster();
1214    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
1215      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1216      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1217      final FileSystem fs = util.getDFSCluster().getFileSystem();
1218      assertEquals("Should start with empty table", 0, util.countRows(table));
1219
1220      // deep inspection: get the StoreFile dir
1221      final Path storePath =
1222        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1223          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1224            Bytes.toString(FAMILIES[0])));
1225      assertEquals(0, fs.listStatus(storePath).length);
1226
1227      // Generate two bulk load files
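      // Setting this flag makes HFileOutputFormat2 mark every HFile it writes so that the
      // bulk-loaded files are skipped by minor compactions.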
1228      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1229
1230      for (int i = 0; i < 2; i++) {
1231        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1232        runIncrementalPELoad(conf,
1233          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
1234            conn.getRegionLocator(TABLE_NAMES[0]))),
1235          testDir, false);
1236        // Perform the actual load
1237        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1238      }
1239
1240      // Ensure data shows up
1241      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1242      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
1243        util.countRows(table));
1244
1245      // should have a second StoreFile now
1246      assertEquals(2, fs.listStatus(storePath).length);
1247
1248      // minor compactions shouldn't get rid of the file
1249      admin.compact(TABLE_NAMES[0]);
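      // quickPoll() fails with an AssertionError when its condition never becomes true. Here that
      // failure is the expected outcome, because the excluded files must survive minor compaction;
      // if the poll succeeds (store file count drops to 1), the IOException below flags the bug.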
1250      try {
1251        quickPoll(new Callable<Boolean>() {
1252          @Override
1253          public Boolean call() throws Exception {
1254            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1255            for (HRegion region : regions) {
1256              for (HStore store : region.getStores()) {
1257                store.closeAndArchiveCompactedFiles();
1258              }
1259            }
1260            return fs.listStatus(storePath).length == 1;
1261          }
1262        }, 5000);
1263        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1264      } catch (AssertionError ae) {
1265        // this is expected behavior
1266      }
1267
1268      // a major compaction should work though
1269      admin.majorCompact(TABLE_NAMES[0]);
1270      quickPoll(new Callable<Boolean>() {
1271        @Override
1272        public Boolean call() throws Exception {
1273          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1274          for (HRegion region : regions) {
1275            for (HStore store : region.getStores()) {
1276              store.closeAndArchiveCompactedFiles();
1277            }
1278          }
1279          return fs.listStatus(storePath).length == 1;
1280        }
1281      }, 5000);
1282
1283    } finally {
1284      util.shutdownMiniCluster();
1285    }
1286  }
1287
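  /**
   * Like testExcludeAllFromMinorCompaction, but with one normally flushed store file plus one
   * bulk-loaded file: the bulk-loaded file is excluded from minor compaction and only a major
   * compaction reduces the store to a single file.
   */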
1288  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1289  @Test
1290  public void testExcludeMinorCompaction() throws Exception {
1291    Configuration conf = util.getConfiguration();
1292    conf.setInt("hbase.hstore.compaction.min", 2);
1293    generateRandomStartKeys(5);
1294
1295    util.startMiniCluster();
1296    try (Connection conn = ConnectionFactory.createConnection(conf);
1297      Admin admin = conn.getAdmin()) {
1298      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1299      final FileSystem fs = util.getDFSCluster().getFileSystem();
1300      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1301      assertEquals("Should start with empty table", 0, util.countRows(table));
1302
1303      // deep inspection: get the StoreFile dir
1304      final Path storePath =
1305        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1306          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1307            Bytes.toString(FAMILIES[0])));
1308      assertEquals(0, fs.listStatus(storePath).length);
1309
1310      // put some data in it and flush to create a storefile
1311      Put p = new Put(Bytes.toBytes("test"));
1312      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1313      table.put(p);
1314      admin.flush(TABLE_NAMES[0]);
1315      assertEquals(1, util.countRows(table));
1316      quickPoll(new Callable<Boolean>() {
1317        @Override
1318        public Boolean call() throws Exception {
1319          return fs.listStatus(storePath).length == 1;
1320        }
1321      }, 5000);
1322
1323      // Generate a bulk load file with more rows
1324      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1325
1326      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1327      runIncrementalPELoad(conf,
1328        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
1329        testDir, false);
1330
1331      // Perform the actual load
1332      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1333
1334      // Ensure data shows up
1335      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1336      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1,
1337        util.countRows(table));
1338
1339      // should have a second StoreFile now
1340      assertEquals(2, fs.listStatus(storePath).length);
1341
1342      // minor compactions shouldn't get rid of the file
1343      admin.compact(TABLE_NAMES[0]);
1344      try {
1345        quickPoll(new Callable<Boolean>() {
1346          @Override
1347          public Boolean call() throws Exception {
1348            return fs.listStatus(storePath).length == 1;
1349          }
1350        }, 5000);
1351        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1352      } catch (AssertionError ae) {
1353        // this is expected behavior
1354      }
1355
1356      // a major compaction should work though
1357      admin.majorCompact(TABLE_NAMES[0]);
1358      quickPoll(new Callable<Boolean>() {
1359        @Override
1360        public Boolean call() throws Exception {
1361          return fs.listStatus(storePath).length == 1;
1362        }
1363      }, 5000);
1364
1365    } finally {
1366      util.shutdownMiniCluster();
1367    }
1368  }
1369
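  /**
   * Poll the given condition roughly every 10 ms until it returns true or waitMs elapses, failing
   * the test if it never becomes true.
   */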
1370  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1371    int sleepMs = 10;
1372    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1373    while (retries-- > 0) {
1374      if (c.call().booleanValue()) {
1375        return;
1376      }
1377      Thread.sleep(sleepMs);
1378    }
1379    fail();
1380  }
1381
1382  public static void main(String[] args) throws Exception {
1383    new TestHFileOutputFormat2().manualTest(args);
1384  }
1385
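  /**
   * Entry point used by {@link #main(String[])} for manual runs against a real cluster:
   * "newtable" plus a table name creates a pre-split table, while "incremental" plus a table name
   * runs the incremental-load MapReduce job against an existing table.
   */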
1386  public void manualTest(String[] args) throws Exception {
1387    Configuration conf = HBaseConfiguration.create();
1388    util = new HBaseTestingUtil(conf);
1389    if ("newtable".equals(args[0])) {
1390      TableName tname = TableName.valueOf(args[1]);
1391      byte[][] splitKeys = generateRandomSplitKeys(4);
1392      util.createTable(tname, FAMILIES, splitKeys);
1393    } else if ("incremental".equals(args[0])) {
1394      TableName tname = TableName.valueOf(args[1]);
1395      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1396        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1397        Path outDir = new Path("incremental-out");
1398        runIncrementalPELoad(conf,
1399          Arrays
1400            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
1401          outDir, false);
1402      }
1403    } else {
1404      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable <tablename> | incremental <tablename>");
1405    }
1406  }
1407
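  /**
   * Verify that configureStoragePolicy() applies the policies configured in the job Configuration:
   * the default policy (ALL_SSD here) to column family directories without an override, and the
   * per-family override (ONE_SSD for FAMILIES[0]) to the matching directory.
   */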
1408  @Test
1409  public void testBlockStoragePolicy() throws Exception {
1410    util = new HBaseTestingUtil();
1411    Configuration conf = util.getConfiguration();
1412    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1413
1414    conf.set(
1415      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1416        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1417      "ONE_SSD");
1418    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1419    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1420    util.startMiniDFSCluster(3);
1421    FileSystem fs = util.getDFSCluster().getFileSystem();
1422    try {
1423      fs.mkdirs(cf1Dir);
1424      fs.mkdirs(cf2Dir);
1425
1426      // newly created directories start with the default block storage policy, HOT
1427      String spA = getStoragePolicyName(fs, cf1Dir);
1428      String spB = getStoragePolicyName(fs, cf2Dir);
1429      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1430      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1431      assertEquals("HOT", spA);
1432      assertEquals("HOT", spB);
1433
1434      // apply the configured storage policies to the per-family output directories
1435      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1436        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1437      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1438        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1439      spA = getStoragePolicyName(fs, cf1Dir);
1440      spB = getStoragePolicyName(fs, cf2Dir);
1441      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1442      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1443      assertNotNull(spA);
1444      assertEquals("ONE_SSD", spA);
1445      assertNotNull(spB);
1446      assertEquals("ALL_SSD", spB);
1447    } finally {
1448      fs.delete(cf1Dir, true);
1449      fs.delete(cf2Dir, true);
1450      util.shutdownMiniDFSCluster();
1451    }
1452  }
1453
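  /**
   * Look up the storage policy name of a path, preferring the FileSystem getStoragePolicy API
   * (invoked reflectively) and falling back to the DFSClient-based lookup on HDFS versions that do
   * not have it.
   */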
1454  private String getStoragePolicyName(FileSystem fs, Path path) {
1455    try {
1456      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1457      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1458    } catch (Exception e) {
1459      // This can fail on older HDFS versions that lack getStoragePolicy(); try the old way
1460      if (LOG.isTraceEnabled()) {
1461        LOG.trace("Failed to get policy directly", e);
1462      }
1463      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1464      return policy == null ? "HOT" : policy; // HOT by default
1465    }
1466  }
1467
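  /**
   * Fallback for HDFS versions without FileSystem#getStoragePolicy: resolve the policy id from the
   * client-side file status against the cluster's policy suite, or return null if it cannot be
   * determined.
   */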
1468  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1469    try {
1470      if (fs instanceof DistributedFileSystem) {
1471        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1472        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1473        if (null != status) {
1474          byte storagePolicyId = status.getStoragePolicy();
1475          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1476          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1477            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1478            for (BlockStoragePolicy policy : policies) {
1479              if (policy.getId() == storagePolicyId) {
1480                return policy.getName();
1481              }
1482            }
1483          }
1484        }
1485      }
1486    } catch (Throwable e) {
1487      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1488    }
1489
1490    return null;
1491  }
1492
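  /**
   * Verify that configurePartitioner() sets up TotalOrderPartitioner and writes the partition file
   * under the home directory of the user submitting the job (the test user "foo1234" here) rather
   * than under the current user's.
   */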
1493  @Test
1494  public void testConfigurePartitioner() throws IOException {
1495    Configuration conf = util.getConfiguration();
1496    // Create a user who is not the current user
1497    String fooUserName = "foo1234";
1498    String fooGroupName = "group1";
1499    UserGroupInformation ugi =
1500      UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
1501    // Get user's home directory
1502    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
1503      @Override
1504      public Path run() {
1505        try (FileSystem fs = FileSystem.get(conf)) {
1506          return fs.makeQualified(fs.getHomeDirectory());
1507        } catch (IOException ioe) {
1508          LOG.error("Failed to get foo's home directory", ioe);
1509        }
1510        return null;
1511      }
1512    });
1513
1514    Job job = Mockito.mock(Job.class);
1515    Mockito.doReturn(conf).when(job).getConfiguration();
1516    ImmutableBytesWritable writable = new ImmutableBytesWritable();
1517    List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>();
1518    splitPoints.add(writable);
1519
1520    ugi.doAs(new PrivilegedAction<Void>() {
1521      @Override
1522      public Void run() {
1523        try {
1524          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
1525        } catch (IOException ioe) {
1526          LOG.error("Failed to configure partitioner", ioe);
1527        }
1528        return null;
1529      }
1530    });
1531    FileSystem fs = FileSystem.get(conf);
1532    // verify that the job uses TotalOrderPartitioner
1533    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
1534    // verify that TotalOrderPartitioner.setPartitionFile() is called.
1535    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
1536    Assert.assertNotNull(partitionPathString);
1537    // Make sure the partition file is in foo1234's home directory, and that
1538    // the file exists.
1539    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
1540    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
1541  }
1542
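  /**
   * Verify that the compression override configured via COMPRESSION_OVERRIDE_CONF_KEY ("gz" here)
   * is applied to the HFiles written by the record writer, as reported by the HFile trailer.
   */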
1543  @Test
1544  public void testConfigureCompression() throws Exception {
1545    Configuration conf = new Configuration(this.util.getConfiguration());
1546    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1547    TaskAttemptContext context = null;
1548    Path dir = util.getDataTestDir("TestConfigureCompression");
1549    String hfileoutputformatCompression = "gz";
1550
1551    try {
1552      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1553      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1554
1555      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1556
1557      Job job = Job.getInstance(conf);
1558      FileOutputFormat.setOutputPath(job, dir);
1559      context = createTestTaskAttemptContext(job);
1560      HFileOutputFormat2 hof = new HFileOutputFormat2();
1561      writer = hof.getRecordWriter(context);
1562      final byte[] b = Bytes.toBytes("b");
1563
1564      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1565      writer.write(new ImmutableBytesWritable(), kv);
1566      writer.close(context);
1567      writer = null;
1568      FileSystem fs = dir.getFileSystem(conf);
1569      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1570      while (iterator.hasNext()) {
1571        LocatedFileStatus keyFileStatus = iterator.next();
1572        HFile.Reader reader =
1573          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1574        assertEquals(hfileoutputformatCompression,
1575          reader.getTrailer().getCompressionCodec().getName());
1576      }
1577    } finally {
1578      if (writer != null && context != null) {
1579        writer.close(context);
1580      }
1581      dir.getFileSystem(conf).delete(dir, true);
1582    }
1583
1584  }
1585
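  /**
   * Two mini clusters: the MapReduce job is configured from cluster A while the target table lives
   * in cluster B. configureIncrementalLoad() should copy cluster B's ZooKeeper settings into the
   * job as remote-cluster properties, and connections created while the job runs (captured via
   * ConfigurationCaptorConnection) should carry those settings plus any REMOTE_CLUSTER_CONF_PREFIX
   * overrides.
   */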
1586  @Test
1587  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
1588    // Start cluster A
1589    util = new HBaseTestingUtil();
1590    Configuration confA = util.getConfiguration();
1591    int hostCount = 3;
1592    int regionNum = 20;
1593    String[] hostnames = new String[hostCount];
1594    for (int i = 0; i < hostCount; ++i) {
1595      hostnames[i] = "datanode_" + i;
1596    }
1597    StartTestingClusterOption option = StartTestingClusterOption.builder()
1598      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
1599    util.startMiniCluster(option);
1600
1601    // Start cluster B
1602    HBaseTestingUtil utilB = new HBaseTestingUtil();
1603    Configuration confB = utilB.getConfiguration();
1604    utilB.startMiniCluster(option);
1605
1606    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
1607
1608    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
1609    TableName tableName = TableName.valueOf("table");
1610    // Create table in cluster B
1611    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
1612      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
1613      // Generate the bulk load files
1614      // Job has zookeeper configuration for cluster A
1615      // The job is assumed to read from cluster A via TableInputFormat and write HFiles for cluster B
1616      Job job = Job.getInstance(confA, "testMRIncrementalLoadWithLocalityMultiCluster");
1617      Configuration jobConf = job.getConfiguration();
1618      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
1619      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
1620      setupRandomGeneratorMapper(job, false);
1621      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
1622
1623      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1624        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
1625      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1626        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
1627      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1628        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
1629
1630      String bSpecificConfigKey = "my.override.config.for.b";
1631      String bSpecificConfigValue = "b-specific-value";
1632      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
1633        bSpecificConfigValue);
1634
1635      FileOutputFormat.setOutputPath(job, testDir);
1636
1637      assertFalse(util.getTestFileSystem().exists(testDir));
1638
1639      assertTrue(job.waitForCompletion(true));
1640
1641      final List<Configuration> configs =
1642        ConfigurationCaptorConnection.getCapturedConfigurations(key);
1643
1644      assertFalse(configs.isEmpty());
1645      for (Configuration config : configs) {
1646        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1647          config.get(HConstants.ZOOKEEPER_QUORUM));
1648        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1649          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
1650        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1651          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
1652
1653        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
1654      }
1655    } finally {
1656      utilB.deleteTable(tableName);
1657      testDir.getFileSystem(confA).delete(testDir, true);
1658      util.shutdownMiniCluster();
1659      utilB.shutdownMiniCluster();
1660    }
1661  }
1662
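  /**
   * A Connection implementation that is registered as the client connection implementation on a
   * job configuration and delegates to a real connection, while recording every Configuration used
   * to create one, keyed by a UUID stored in that configuration. Tests use the captured
   * configurations to assert which settings actually reached the connections created by the job.
   */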
1663  private static class ConfigurationCaptorConnection implements Connection {
1664    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1665
1666    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1667
1668    private final Connection delegate;
1669
1670    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
1671      Map<String, byte[]> connectionAttributes) throws IOException {
1672      delegate =
1673        FutureUtils.get(createAsyncConnection(conf, user, connectionAttributes)).toConnection();
1674
1675      final String uuid = conf.get(UUID_KEY);
1676      if (uuid != null) {
1677        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1678      }
1679    }
1680
1681    static UUID configureConnectionImpl(Configuration conf) {
1682      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
1683        ConfigurationCaptorConnection.class, Connection.class);
1684
1685      final UUID uuid = UUID.randomUUID();
1686      conf.set(UUID_KEY, uuid.toString());
1687      return uuid;
1688    }
1689
1690    static List<Configuration> getCapturedConfigurations(UUID key) {
1691      return confs.get(key);
1692    }
1693
1694    @Override
1695    public Configuration getConfiguration() {
1696      return delegate.getConfiguration();
1697    }
1698
1699    @Override
1700    public Table getTable(TableName tableName) throws IOException {
1701      return delegate.getTable(tableName);
1702    }
1703
1704    @Override
1705    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1706      return delegate.getTable(tableName, pool);
1707    }
1708
1709    @Override
1710    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1711      return delegate.getBufferedMutator(tableName);
1712    }
1713
1714    @Override
1715    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1716      return delegate.getBufferedMutator(params);
1717    }
1718
1719    @Override
1720    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1721      return delegate.getRegionLocator(tableName);
1722    }
1723
1724    @Override
1725    public void clearRegionLocationCache() {
1726      delegate.clearRegionLocationCache();
1727    }
1728
1729    @Override
1730    public Admin getAdmin() throws IOException {
1731      return delegate.getAdmin();
1732    }
1733
1734    @Override
1735    public void close() throws IOException {
1736      delegate.close();
1737    }
1738
1739    @Override
1740    public boolean isClosed() {
1741      return delegate.isClosed();
1742    }
1743
1744    @Override
1745    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1746      return delegate.getTableBuilder(tableName, pool);
1747    }
1748
1749    @Override
1750    public AsyncConnection toAsyncConnection() {
1751      return delegate.toAsyncConnection();
1752    }
1753
1754    @Override
1755    public String getClusterId() {
1756      return delegate.getClusterId();
1757    }
1758
1759    @Override
1760    public Hbck getHbck() throws IOException {
1761      return delegate.getHbck();
1762    }
1763
1764    @Override
1765    public Hbck getHbck(ServerName masterServer) throws IOException {
1766      return delegate.getHbck(masterServer);
1767    }
1768
1769    @Override
1770    public void abort(String why, Throwable e) {
1771      delegate.abort(why, e);
1772    }
1773
1774    @Override
1775    public boolean isAborted() {
1776      return delegate.isAborted();
1777    }
1778  }
1779
1780}