/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;

import java.io.IOException;
import java.lang.reflect.Field;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values like those of {@link PerformanceEvaluation}.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestHFileOutputFormat2 {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private final static int ROWSPERSPLIT = 1024;

  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
  private static final byte[][] FAMILIES =
    { Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B")) };
  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2", "TestTable3")
    .map(TableName::valueOf).toArray(TableName[]::new);

  private HBaseTestingUtil util = new HBaseTestingUtil();

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);

  /**
   * Simple mapper that makes KeyValue output.
   */
  static class RandomKVGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Cell>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
            context.write(new ImmutableBytesWritable(key), kv);
          }
        }
      }
    }
  }

  /**
   * Simple mapper that makes Put output.
   */
  static class RandomPutGeneratingMapper
    extends Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put> {

    private int keyLength;
    private static final int KEYLEN_DEFAULT = 10;
    private static final String KEYLEN_CONF = "randomkv.key.length";

    private int valLength;
    private static final int VALLEN_DEFAULT = 10;
    private static final String VALLEN_CONF = "randomkv.val.length";
    private static final byte[] QUALIFIER = Bytes.toBytes("data");
    private boolean multiTableMapper = false;
    private TableName[] tables = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      super.setup(context);

      Configuration conf = context.getConfiguration();
      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
      multiTableMapper =
        conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY, false);
      if (multiTableMapper) {
        tables = TABLE_NAMES;
      } else {
        tables = new TableName[] { TABLE_NAMES[0] };
      }
    }

    @Override
    protected void map(NullWritable n1, NullWritable n2,
      Mapper<NullWritable, NullWritable, ImmutableBytesWritable, Put>.Context context)
      throws java.io.IOException, InterruptedException {

      byte keyBytes[] = new byte[keyLength];
      byte valBytes[] = new byte[valLength];

      int taskId = context.getTaskAttemptID().getTaskID().getId();
      assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";

      byte[] key;
      for (int j = 0; j < tables.length; ++j) {
        for (int i = 0; i < ROWSPERSPLIT; i++) {
          Bytes.random(keyBytes);
          // Ensure that unique tasks generate unique keys
          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
          Bytes.random(valBytes);
          key = keyBytes;
          if (multiTableMapper) {
            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
          }

          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
            Put p = new Put(keyBytes);
            p.addColumn(family, QUALIFIER, valBytes);
            // set TTL to very low so that the scan does not return any value
            p.setTTL(1L);
            context.write(new ImmutableBytesWritable(key), p);
          }
        }
      }
    }
  }

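  /**
   * Wires the random-data mapper into the job: {@link RandomPutGeneratingMapper} (Put output) when
   * putSortReducer is true, otherwise {@link RandomKVGeneratingMapper} (KeyValue output), both fed
   * by {@link NMapInputFormat}.
   */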
  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
    if (putSortReducer) {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomPutGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(Put.class);
    } else {
      job.setInputFormatClass(NMapInputFormat.class);
      job.setMapperClass(RandomKVGeneratingMapper.class);
      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
      job.setMapOutputValueClass(KeyValue.class);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be changed by the call to
      // write. Check that everything in the kv is the same except the ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has an explicit ts. It should not be
      // changed by the call to write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

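  /**
   * Creates a TaskAttemptContext for the given job with a fixed attempt id, via the
   * {@link HadoopShims} compatibility factory.
   */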
  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = util.getConfiguration();
    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Lower this value or we OOME in Eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    long hregionMaxFilesize = 10 * 1024;
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys, but we use it anyway
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);

    // check output file num and size.
    for (byte[] family : FAMILIES) {
      long kvCount = 0;
      RemoteIterator<LocatedFileStatus> iterator =
        fs.listFiles(testDir.suffix("/" + new String(family)), true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);

        kvCount += reader.getEntries();
        scanner.seekTo();
        long perKVSize = scanner.getCell().getSerializedSize();
        assertTrue("Data size of each file should not be too large.",
          perKVSize * reader.getEntries() <= hregionMaxFilesize);
      }
      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as TTL into the HFile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        ExtendedCell cell = scanner.getCell();
        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      util.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    assertEquals(job.getNumReduceTasks(), 4);
  }

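  /**
   * Returns {@code numKeys} region start keys: the first is always empty, the rest are random
   * values from {@link PerformanceEvaluation#generateData}.
   */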
  private byte[][] generateRandomStartKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    // first region start key is always empty
    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
    for (int i = 1; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

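  /**
   * Returns {@code numKeys} random split keys generated with
   * {@link PerformanceEvaluation#generateData}.
   */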
  private byte[][] generateRandomSplitKeys(int numKeys) {
    Random random = ThreadLocalRandom.current();
    byte[][] ret = new byte[numKeys][];
    for (int i = 0; i < numKeys; i++) {
      ret[i] =
        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
    }
    return ret;
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoad() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoad\n");
    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithSplit() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
  }

  /**
   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true. This test can only check the
   * correctness of the original logic if LOCALITY_SENSITIVE_CONF_KEY is set to true. Because
   * MiniHBaseCluster always runs with a single hostname (and different ports), it is not possible
   * to check region locality by comparing region locations and DN hostnames. Once MiniHBaseCluster
   * supports an explicit hostnames parameter (just like MiniDFSCluster does), we could test region
   * locality features more easily.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testMRIncrementalLoadWithLocality() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
  }

  // @Ignore("Wahtevs")
  @Test
  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
  }

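  // Convenience overload that runs the incremental load test against a single table.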
  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, String tableStr) throws Exception {
    doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
      Arrays.asList(tableStr));
  }

  @Test
  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
    doIncrementalLoadTest(false, false, true,
      Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList()));
  }

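  /**
   * Core incremental-load scenario: starts a mini cluster, creates the requested tables with
   * random split keys, runs the MR job that writes HFiles, optionally recreates one table with a
   * different region count, bulk loads the HFiles, and verifies row counts, locality and that the
   * data survives a region reopen.
   */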
  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
    boolean putSortReducer, List<String> tableStr) throws Exception {
    util = new HBaseTestingUtil();
    Configuration conf = util.getConfiguration();
    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
    int hostCount = 1;
    int regionNum = 5;
    if (shouldKeepLocality) {
      // We should raise the host count above the HDFS replica count once MiniHBaseCluster supports
      // an explicit hostnames parameter, just like MiniDFSCluster does.
      hostCount = 3;
      regionNum = 20;
    }

    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    util.startMiniCluster(option);

    Map<String, Table> allTables = new HashMap<>(tableStr.size());
    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
    boolean writeMultipleTables = tableStr.size() > 1;
    for (String tableStrSingle : tableStr) {
      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
      TableName tableName = TableName.valueOf(tableStrSingle);
      Table table = util.createTable(tableName, FAMILIES, splitKeys);

      RegionLocator r = util.getConnection().getRegionLocator(tableName);
      assertEquals("Should start with empty table", 0, util.countRows(table));
      int numRegions = r.getStartKeys().length;
      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);

      allTables.put(tableStrSingle, table);
      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
    }
    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
    // Generate the bulk load files
    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
    if (writeMultipleTables) {
      testDir = new Path(testDir, "default");
    }

    for (Table tableSingle : allTables.values()) {
      // This doesn't write into the table, just makes files
      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
    }
    int numTableDirs = 0;
    FileStatus[] fss = testDir.getFileSystem(conf).listStatus(testDir);
    for (FileStatus tf : fss) {
      Path tablePath = testDir;
      if (writeMultipleTables) {
        if (allTables.containsKey(tf.getPath().getName())) {
          ++numTableDirs;
          tablePath = tf.getPath();
        } else {
          continue;
        }
      }

      // Make sure that a directory was created for every CF
      int dir = 0;
      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
      for (FileStatus f : fss) {
        for (byte[] family : FAMILIES) {
          if (Bytes.toString(family).equals(f.getPath().getName())) {
            ++dir;
          }
        }
      }
      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
    }
    if (writeMultipleTables) {
      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
    }

    Admin admin = util.getConnection().getAdmin();
    try {
      // handle the split case
      if (shouldChangeRegions) {
        // Choose a semi-random table if multiple tables are available
        Table chosenTable = allTables.values().iterator().next();
        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
        admin.disableTable(chosenTable.getName());
        util.waitUntilNoRegionsInTransition();

        util.deleteTable(chosenTable.getName());
        byte[][] newSplitKeys = generateRandomSplitKeys(14);
        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);

        while (
          util.getConnection().getRegionLocator(chosenTable.getName()).getAllRegionLocations()
            .size() != 15 || !admin.isTableAvailable(table.getName())
        ) {
          Thread.sleep(200);
          LOG.info("Waiting for new region assignment to happen");
        }
      }

      // Perform the actual load
      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
        Path tableDir = testDir;
        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
        LOG.info("Running BulkLoadHFiles on table " + tableNameStr);
        if (writeMultipleTables) {
          tableDir = new Path(testDir, tableNameStr);
        }
        Table currentTable = allTables.get(tableNameStr);
        TableName currentTableName = currentTable.getName();
        BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);

        // Ensure data shows up
        int expectedRows = 0;
        if (putSortReducer) {
          // no rows should be extracted
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
        } else {
          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
            util.countRows(currentTable));
          Scan scan = new Scan();
          ResultScanner results = currentTable.getScanner(scan);
          for (Result res : results) {
            assertEquals(FAMILIES.length, res.rawCells().length);
            Cell first = res.rawCells()[0];
            for (Cell kv : res.rawCells()) {
              assertTrue(CellUtil.matchingRows(first, kv));
              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
            }
          }
          results.close();
        }
        String tableDigestBefore = util.checksumRows(currentTable);
        // Check region locality
        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
          hbd.add(region.getHDFSBlocksDistribution());
        }
        for (String hostname : hostnames) {
          float locality = hbd.getBlockLocalityIndex(hostname);
          LOG.info("locality of [" + hostname + "]: " + locality);
          assertEquals(100, (int) (locality * 100));
        }

        // Cause regions to reopen
        admin.disableTable(currentTableName);
        while (!admin.isTableDisabled(currentTableName)) {
          Thread.sleep(200);
          LOG.info("Waiting for table to disable");
        }
        admin.enableTable(currentTableName);
        util.waitTableAvailable(currentTableName);
        assertEquals("Data should remain after reopening of regions", tableDigestBefore,
          util.checksumRows(currentTable));
      }
    } finally {
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        tableInfoSingle.getRegionLocator().close();
      }
      for (Entry<String, Table> singleTable : allTables.entrySet()) {
        singleTable.getValue().close();
        util.deleteTable(singleTable.getValue().getName());
      }
      testDir.getFileSystem(conf).delete(testDir, true);
      util.shutdownMiniCluster();
    }
  }

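  /**
   * Configures and runs the MR job that generates the bulk load HFiles for the given tables, and
   * asserts that the number of reduce tasks matches the total region count.
   */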
  private void runIncrementalPELoad(Configuration conf,
    List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir, boolean putSortReducer)
    throws IOException, InterruptedException, ClassNotFoundException {
    Job job = new Job(conf, "testLocalMRIncrementalLoad");
    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());
    setupRandomGeneratorMapper(job, putSortReducer);
    if (tableInfo.size() > 1) {
      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
      int sum = 0;
      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
      }
      assertEquals(sum, job.getNumReduceTasks());
    } else {
      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
        regionLocator);
      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
    }

    FileOutputFormat.setOutputPath(job, outDir);

    assertFalse(util.getTestFileSystem().exists(outDir));

    assertTrue(job.waitForCompletion(true));
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * family compression map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {

    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();

      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific bloom type settings from the configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToBloomType) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific block size settings from the configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToBlockSize) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
   * the family data block encoding map is correctly serialized into and deserialized from
   * configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.util.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      TableDescriptor tableDescriptor = table.getDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
          .build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

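  // Stubs the mock RegionLocator to return a fixed set of region start keys.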
  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

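  // Stubs the mock RegionLocator to return a fixed table name.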
  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.util.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = util.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    TableDescriptorBuilder tableDescriptorBuilder =
      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);

    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
      tableDescriptorBuilder.setColumnFamily(hcd);
    }
    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
        ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        ColumnFamilyDescriptor hcd =
          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
      Bytes.random(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

1202  /**
   * Covers the scenario from HBASE-6901: all files are bulk loaded and excluded from minor
   * compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException would be thrown.
1206   */
1207  @Ignore("Flakey: See HBASE-9051")
1208  @Test
1209  public void testExcludeAllFromMinorCompaction() throws Exception {
1210    Configuration conf = util.getConfiguration();
1211    conf.setInt("hbase.hstore.compaction.min", 2);
1212    generateRandomStartKeys(5);
1213
1214    util.startMiniCluster();
1215    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
1216      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1217      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1218      final FileSystem fs = util.getDFSCluster().getFileSystem();
1219      assertEquals("Should start with empty table", 0, util.countRows(table));
1220
1221      // deep inspection: get the StoreFile dir
1222      final Path storePath =
1223        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1224          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1225            Bytes.toString(FAMILIES[0])));
1226      assertEquals(0, fs.listStatus(storePath).length);
1227
1228      // Generate two bulk load files
1229      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1230
1231      for (int i = 0; i < 2; i++) {
1232        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1233        runIncrementalPELoad(conf,
1234          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
1235            conn.getRegionLocator(TABLE_NAMES[0]))),
1236          testDir, false);
1237        // Perform the actual load
1238        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1239      }
1240
1241      // Ensure data shows up
1242      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1243      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
1244        util.countRows(table));
1245
1246      // should have a second StoreFile now
1247      assertEquals(2, fs.listStatus(storePath).length);
1248
1249      // minor compactions shouldn't get rid of the file
1250      admin.compact(TABLE_NAMES[0]);
1251      try {
1252        quickPoll(new Callable<Boolean>() {
1253          @Override
1254          public Boolean call() throws Exception {
1255            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1256            for (HRegion region : regions) {
1257              for (HStore store : region.getStores()) {
1258                store.closeAndArchiveCompactedFiles();
1259              }
1260            }
1261            return fs.listStatus(storePath).length == 1;
1262          }
1263        }, 5000);
1264        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1265      } catch (AssertionError ae) {
        // expected: quickPoll times out because minor compaction does not remove the excluded
        // bulk-loaded files
1267      }
1268
1269      // a major compaction should work though
1270      admin.majorCompact(TABLE_NAMES[0]);
1271      quickPoll(new Callable<Boolean>() {
1272        @Override
1273        public Boolean call() throws Exception {
1274          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1275          for (HRegion region : regions) {
1276            for (HStore store : region.getStores()) {
1277              store.closeAndArchiveCompactedFiles();
1278            }
1279          }
1280          return fs.listStatus(storePath).length == 1;
1281        }
1282      }, 5000);
1283
1284    } finally {
1285      util.shutdownMiniCluster();
1286    }
1287  }
1288
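  /**
   * Verify that an HFile bulk loaded with {@code hbase.mapreduce.hfileoutputformat.compaction.exclude}
   * set is skipped by minor compactions but still compacted away by a major compaction.
   */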
1289  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
1290  @Test
1291  public void testExcludeMinorCompaction() throws Exception {
1292    Configuration conf = util.getConfiguration();
1293    conf.setInt("hbase.hstore.compaction.min", 2);
1294    generateRandomStartKeys(5);
1295
1296    util.startMiniCluster();
1297    try (Connection conn = ConnectionFactory.createConnection(conf);
1298      Admin admin = conn.getAdmin()) {
1299      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1300      final FileSystem fs = util.getDFSCluster().getFileSystem();
1301      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1302      assertEquals("Should start with empty table", 0, util.countRows(table));
1303
1304      // deep inspection: get the StoreFile dir
1305      final Path storePath =
1306        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1307          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1308            Bytes.toString(FAMILIES[0])));
1309      assertEquals(0, fs.listStatus(storePath).length);
1310
1311      // put some data in it and flush to create a storefile
1312      Put p = new Put(Bytes.toBytes("test"));
1313      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1314      table.put(p);
1315      admin.flush(TABLE_NAMES[0]);
1316      assertEquals(1, util.countRows(table));
1317      quickPoll(new Callable<Boolean>() {
1318        @Override
1319        public Boolean call() throws Exception {
1320          return fs.listStatus(storePath).length == 1;
1321        }
1322      }, 5000);
1323
1324      // Generate a bulk load file with more rows
1325      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
1326
1327      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1328      runIncrementalPELoad(conf,
1329        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
1330        testDir, false);
1331
1332      // Perform the actual load
1333      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1334
1335      // Ensure data shows up
1336      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1337      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1,
1338        util.countRows(table));
1339
1340      // should have a second StoreFile now
1341      assertEquals(2, fs.listStatus(storePath).length);
1342
1343      // minor compactions shouldn't get rid of the file
1344      admin.compact(TABLE_NAMES[0]);
1345      try {
1346        quickPoll(new Callable<Boolean>() {
1347          @Override
1348          public Boolean call() throws Exception {
1349            return fs.listStatus(storePath).length == 1;
1350          }
1351        }, 5000);
1352        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1353      } catch (AssertionError ae) {
        // expected: quickPoll times out because the excluded bulk-loaded file is not removed by
        // the minor compaction
1355      }
1356
1357      // a major compaction should work though
1358      admin.majorCompact(TABLE_NAMES[0]);
1359      quickPoll(new Callable<Boolean>() {
1360        @Override
1361        public Boolean call() throws Exception {
1362          return fs.listStatus(storePath).length == 1;
1363        }
1364      }, 5000);
1365
1366    } finally {
1367      util.shutdownMiniCluster();
1368    }
1369  }
1370
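  /**
   * Repeatedly evaluates the given condition, sleeping 10ms between attempts, until it returns
   * true or roughly {@code waitMs} milliseconds have elapsed; fails the test on timeout.
   */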
1371  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1372    int sleepMs = 10;
1373    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1374    while (retries-- > 0) {
1375      if (c.call().booleanValue()) {
1376        return;
1377      }
1378      Thread.sleep(sleepMs);
1379    }
1380    fail();
1381  }
1382
1383  public static void main(String args[]) throws Exception {
1384    new TestHFileOutputFormat2().manualTest(args);
1385  }
1386
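  /**
   * Manual driver used by {@link #main(String[])}: {@code newtable <tableName>} creates a
   * pre-split test table, while {@code incremental <tableName>} runs an incremental load against
   * an existing table and writes the HFiles to the "incremental-out" directory.
   */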
1387  public void manualTest(String args[]) throws Exception {
1388    Configuration conf = HBaseConfiguration.create();
1389    util = new HBaseTestingUtil(conf);
1390    if ("newtable".equals(args[0])) {
1391      TableName tname = TableName.valueOf(args[1]);
1392      byte[][] splitKeys = generateRandomSplitKeys(4);
1393      Table table = util.createTable(tname, FAMILIES, splitKeys);
1394    } else if ("incremental".equals(args[0])) {
1395      TableName tname = TableName.valueOf(args[1]);
1396      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
1397        RegionLocator regionLocator = c.getRegionLocator(tname)) {
1398        Path outDir = new Path("incremental-out");
1399        runIncrementalPELoad(conf,
1400          Arrays
1401            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
1402          outDir, false);
1403      }
1404    } else {
1405      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
1406    }
1407  }
1408
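  /**
   * Verify that {@code HFileOutputFormat2.configureStoragePolicy} applies the column-family
   * specific storage policy when one is configured, and otherwise falls back to the table-wide
   * policy set via {@code STORAGE_POLICY_PROPERTY}.
   */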
1409  @Test
1410  public void testBlockStoragePolicy() throws Exception {
1411    util = new HBaseTestingUtil();
1412    Configuration conf = util.getConfiguration();
1413    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1414
1415    conf.set(
1416      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
1417        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
1418      "ONE_SSD");
1419    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1420    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1421    util.startMiniDFSCluster(3);
1422    FileSystem fs = util.getDFSCluster().getFileSystem();
1423    try {
1424      fs.mkdirs(cf1Dir);
1425      fs.mkdirs(cf2Dir);
1426
      // newly created directories start with the default block storage policy, HOT
1428      String spA = getStoragePolicyName(fs, cf1Dir);
1429      String spB = getStoragePolicyName(fs, cf2Dir);
1430      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1431      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1432      assertEquals("HOT", spA);
1433      assertEquals("HOT", spB);
1434
1435      // alter table cf schema to change storage policies
1436      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1437        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1438      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1439        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1440      spA = getStoragePolicyName(fs, cf1Dir);
1441      spB = getStoragePolicyName(fs, cf2Dir);
1442      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1443      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1444      assertNotNull(spA);
1445      assertEquals("ONE_SSD", spA);
1446      assertNotNull(spB);
1447      assertEquals("ALL_SSD", spB);
1448    } finally {
1449      fs.delete(cf1Dir, true);
1450      fs.delete(cf2Dir, true);
1451      util.shutdownMiniDFSCluster();
1452    }
1453  }
1454
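  /**
   * Return the storage policy name of the given path, preferring the reflective
   * {@code getStoragePolicy} FileSystem call and falling back to the DFSClient-based lookup for
   * older HDFS versions (defaulting to "HOT" if nothing can be determined).
   */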
1455  private String getStoragePolicyName(FileSystem fs, Path path) {
1456    try {
1457      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1458      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1459    } catch (Exception e) {
      // May fail when running against an older HDFS version; fall back to the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy; // HOT by default
1466    }
1467  }
1468
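  /**
   * Fallback lookup for HDFS versions without {@code FileSystem#getStoragePolicy}: read the
   * storage policy id from the file status and resolve it against the policies known to the
   * DistributedFileSystem. Returns null if the policy cannot be determined.
   */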
1469  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1470    try {
1471      if (fs instanceof DistributedFileSystem) {
1472        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1473        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1474        if (null != status) {
1475          byte storagePolicyId = status.getStoragePolicy();
1476          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1477          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1478            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1479            for (BlockStoragePolicy policy : policies) {
1480              if (policy.getId() == storagePolicyId) {
1481                return policy.getName();
1482              }
1483            }
1484          }
1485        }
1486      }
1487    } catch (Throwable e) {
1488      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1489    }
1490
1491    return null;
1492  }
1493
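  /**
   * Verify that configurePartitioner installs TotalOrderPartitioner and writes the partition file
   * under the home directory of the user running the job (here the test user foo1234) rather than
   * the current user's.
   */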
1494  @Test
  public void testConfigurePartitioner() throws IOException {
1496    Configuration conf = util.getConfiguration();
1497    // Create a user who is not the current user
1498    String fooUserName = "foo1234";
1499    String fooGroupName = "group1";
1500    UserGroupInformation ugi =
1501      UserGroupInformation.createUserForTesting(fooUserName, new String[] { fooGroupName });
1502    // Get user's home directory
1503    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
1504      @Override
1505      public Path run() {
1506        try (FileSystem fs = FileSystem.get(conf)) {
1507          return fs.makeQualified(fs.getHomeDirectory());
1508        } catch (IOException ioe) {
1509          LOG.error("Failed to get foo's home directory", ioe);
1510        }
1511        return null;
1512      }
1513    });
1514
1515    Job job = Mockito.mock(Job.class);
1516    Mockito.doReturn(conf).when(job).getConfiguration();
1517    ImmutableBytesWritable writable = new ImmutableBytesWritable();
1518    List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>();
1519    splitPoints.add(writable);
1520
1521    ugi.doAs(new PrivilegedAction<Void>() {
1522      @Override
1523      public Void run() {
1524        try {
1525          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
1526        } catch (IOException ioe) {
1527          LOG.error("Failed to configure partitioner", ioe);
1528        }
1529        return null;
1530      }
1531    });
1532    FileSystem fs = FileSystem.get(conf);
1533    // verify that the job uses TotalOrderPartitioner
1534    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
1535    // verify that TotalOrderPartitioner.setPartitionFile() is called.
1536    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
1537    Assert.assertNotNull(partitionPathString);
    // Make sure the partition file is in foo1234's home directory, and that
    // the file exists.
1540    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
1541    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
1542  }
1543
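  /**
   * Verify that the compression algorithm set via {@code COMPRESSION_OVERRIDE_CONF_KEY} ("gz"
   * here) is applied to the HFiles produced by the record writer.
   */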
1544  @Test
  public void testConfigureCompression() throws Exception {
1546    Configuration conf = new Configuration(this.util.getConfiguration());
1547    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1548    TaskAttemptContext context = null;
1549    Path dir = util.getDataTestDir("TestConfigureCompression");
1550    String hfileoutputformatCompression = "gz";
1551
1552    try {
1553      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1554      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1555
1556      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1557
1558      Job job = Job.getInstance(conf);
1559      FileOutputFormat.setOutputPath(job, dir);
1560      context = createTestTaskAttemptContext(job);
1561      HFileOutputFormat2 hof = new HFileOutputFormat2();
1562      writer = hof.getRecordWriter(context);
1563      final byte[] b = Bytes.toBytes("b");
1564
1565      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1566      writer.write(new ImmutableBytesWritable(), kv);
1567      writer.close(context);
1568      writer = null;
1569      FileSystem fs = dir.getFileSystem(conf);
1570      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1571      while (iterator.hasNext()) {
1572        LocatedFileStatus keyFileStatus = iterator.next();
1573        HFile.Reader reader =
1574          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        assertEquals(hfileoutputformatCompression,
          reader.getTrailer().getCompressionCodec().getName());
1577      }
1578    } finally {
1579      if (writer != null && context != null) {
1580        writer.close(context);
1581      }
1582      dir.getFileSystem(conf).delete(dir, true);
1583    }
1584
1585  }
1586
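  /**
   * Run an incremental load job whose MR configuration points at cluster A while the target table
   * lives on cluster B, and verify that cluster B's ZooKeeper settings (plus any
   * {@code REMOTE_CLUSTER_CONF_PREFIX} overrides) are propagated to the connections the job
   * actually creates.
   */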
1587  @Test
1588  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
1589    // Start cluster A
1590    util = new HBaseTestingUtil();
1591    Configuration confA = util.getConfiguration();
1592    int hostCount = 3;
1593    int regionNum = 20;
1594    String[] hostnames = new String[hostCount];
1595    for (int i = 0; i < hostCount; ++i) {
1596      hostnames[i] = "datanode_" + i;
1597    }
1598    StartTestingClusterOption option = StartTestingClusterOption.builder()
1599      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
1600    util.startMiniCluster(option);
1601
1602    // Start cluster B
1603    HBaseTestingUtil utilB = new HBaseTestingUtil();
1604    Configuration confB = utilB.getConfiguration();
1605    utilB.startMiniCluster(option);
1606
1607    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
1608
1609    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
1610    TableName tableName = TableName.valueOf("table");
1611    // Create table in cluster B
1612    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
1613      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
1614      // Generate the bulk load files
      // The job carries the ZooKeeper configuration for cluster A; assume we read from cluster A
      // via TableInputFormat and write HFiles destined for the table on cluster B
      Job job = Job.getInstance(confA, "testLocalMRIncrementalLoad");
1618      Configuration jobConf = job.getConfiguration();
1619      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
1620      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
1621      setupRandomGeneratorMapper(job, false);
1622      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
1623
1624      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1625        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
1626      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1627        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
1628      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1629        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
1630
1631      String bSpecificConfigKey = "my.override.config.for.b";
1632      String bSpecificConfigValue = "b-specific-value";
1633      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
1634        bSpecificConfigValue);
1635
1636      FileOutputFormat.setOutputPath(job, testDir);
1637
1638      assertFalse(util.getTestFileSystem().exists(testDir));
1639
1640      assertTrue(job.waitForCompletion(true));
1641
1642      final List<Configuration> configs =
1643        ConfigurationCaptorConnection.getCapturedConfigarutions(key);
1644
1645      assertFalse(configs.isEmpty());
1646      for (Configuration config : configs) {
1647        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1648          config.get(HConstants.ZOOKEEPER_QUORUM));
1649        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1650          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
1651        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1652          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
1653
1654        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
1655      }
1656    } finally {
1657      utilB.deleteTable(tableName);
1658      testDir.getFileSystem(confA).delete(testDir, true);
1659      util.shutdownMiniCluster();
1660      utilB.shutdownMiniCluster();
1661    }
1662  }
1663
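  /**
   * Connection implementation that records the Configuration it was created with, keyed by a UUID
   * stored in that configuration, and delegates all operations to a real connection. Used to
   * verify which cluster the MR job actually talks to.
   */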
1664  private static class ConfigurationCaptorConnection implements Connection {
1665    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1666
1667    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1668
1669    private final Connection delegate;
1670
1671    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
1672      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
1673      // here we do not use this registry, so close it...
1674      registry.close();
      // here we use createAsyncConnection to avoid infinite recursion, as we reset the Connection
      // implementation in configureConnectionImpl below
1677      delegate =
1678        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
1679          .toConnection();
1680
1681      final String uuid = conf.get(UUID_KEY);
1682      if (uuid != null) {
1683        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1684      }
1685    }
1686
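    /**
     * Register this class as the Connection implementation on the given configuration and tag the
     * configuration with a fresh UUID so that captured configurations can later be looked up by
     * that key.
     */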
1687    static UUID configureConnectionImpl(Configuration conf) {
1688      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
1689        ConfigurationCaptorConnection.class, Connection.class);
1690
1691      final UUID uuid = UUID.randomUUID();
1692      conf.set(UUID_KEY, uuid.toString());
1693      return uuid;
1694    }
1695
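    /** Returns the configurations captured under the given key, or null if none were recorded. */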
1696    static List<Configuration> getCapturedConfigarutions(UUID key) {
1697      return confs.get(key);
1698    }
1699
1700    @Override
1701    public Configuration getConfiguration() {
1702      return delegate.getConfiguration();
1703    }
1704
1705    @Override
1706    public Table getTable(TableName tableName) throws IOException {
1707      return delegate.getTable(tableName);
1708    }
1709
1710    @Override
1711    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1712      return delegate.getTable(tableName, pool);
1713    }
1714
1715    @Override
1716    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1717      return delegate.getBufferedMutator(tableName);
1718    }
1719
1720    @Override
1721    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1722      return delegate.getBufferedMutator(params);
1723    }
1724
1725    @Override
1726    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1727      return delegate.getRegionLocator(tableName);
1728    }
1729
1730    @Override
1731    public void clearRegionLocationCache() {
1732      delegate.clearRegionLocationCache();
1733    }
1734
1735    @Override
1736    public Admin getAdmin() throws IOException {
1737      return delegate.getAdmin();
1738    }
1739
1740    @Override
1741    public void close() throws IOException {
1742      delegate.close();
1743    }
1744
1745    @Override
1746    public boolean isClosed() {
1747      return delegate.isClosed();
1748    }
1749
1750    @Override
1751    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1752      return delegate.getTableBuilder(tableName, pool);
1753    }
1754
1755    @Override
1756    public AsyncConnection toAsyncConnection() {
1757      return delegate.toAsyncConnection();
1758    }
1759
1760    @Override
1761    public String getClusterId() {
1762      return delegate.getClusterId();
1763    }
1764
1765    @Override
1766    public Hbck getHbck() throws IOException {
1767      return delegate.getHbck();
1768    }
1769
1770    @Override
1771    public Hbck getHbck(ServerName masterServer) throws IOException {
1772      return delegate.getHbck(masterServer);
1773    }
1774
1775    @Override
1776    public void abort(String why, Throwable e) {
1777      delegate.abort(why, e);
1778    }
1779
1780    @Override
1781    public boolean isAborted() {
1782      return delegate.isAborted();
1783    }
1784  }
1785
1786}