001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
020import static org.apache.hadoop.hbase.client.ConnectionFactory.createAsyncConnection;
021import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
022import static org.junit.Assert.assertEquals;
023import static org.junit.Assert.assertFalse;
024import static org.junit.Assert.assertNotNull;
025import static org.junit.Assert.assertNotSame;
026import static org.junit.Assert.assertTrue;
027import static org.junit.Assert.fail;
028import static org.mockito.Mockito.verify;
029
030import java.io.IOException;
031import java.lang.reflect.Field;
032import java.security.PrivilegedAction;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.HashMap;
036import java.util.LinkedList;
037import java.util.List;
038import java.util.Map;
039import java.util.Map.Entry;
040import java.util.Random;
041import java.util.Set;
042import java.util.UUID;
043import java.util.concurrent.Callable;
044import java.util.concurrent.ConcurrentHashMap;
045import java.util.concurrent.CopyOnWriteArrayList;
046import java.util.concurrent.ExecutorService;
047import java.util.stream.Collectors;
048import java.util.stream.Stream;
049import org.apache.hadoop.conf.Configuration;
050import org.apache.hadoop.fs.FileStatus;
051import org.apache.hadoop.fs.FileSystem;
052import org.apache.hadoop.fs.LocatedFileStatus;
053import org.apache.hadoop.fs.Path;
054import org.apache.hadoop.fs.RemoteIterator;
055import org.apache.hadoop.hbase.ArrayBackedTag;
056import org.apache.hadoop.hbase.Cell;
057import org.apache.hadoop.hbase.CellUtil;
058import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
059import org.apache.hadoop.hbase.HBaseClassTestRule;
060import org.apache.hadoop.hbase.HBaseConfiguration;
061import org.apache.hadoop.hbase.HBaseTestingUtil;
062import org.apache.hadoop.hbase.HConstants;
063import org.apache.hadoop.hbase.HDFSBlocksDistribution;
064import org.apache.hadoop.hbase.HadoopShims;
065import org.apache.hadoop.hbase.KeyValue;
066import org.apache.hadoop.hbase.PerformanceEvaluation;
067import org.apache.hadoop.hbase.PrivateCellUtil;
068import org.apache.hadoop.hbase.ServerName;
069import org.apache.hadoop.hbase.StartTestingClusterOption;
070import org.apache.hadoop.hbase.TableName;
071import org.apache.hadoop.hbase.Tag;
072import org.apache.hadoop.hbase.TagType;
073import org.apache.hadoop.hbase.client.Admin;
074import org.apache.hadoop.hbase.client.AsyncConnection;
075import org.apache.hadoop.hbase.client.BufferedMutator;
076import org.apache.hadoop.hbase.client.BufferedMutatorParams;
077import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
078import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
079import org.apache.hadoop.hbase.client.Connection;
080import org.apache.hadoop.hbase.client.ConnectionFactory;
081import org.apache.hadoop.hbase.client.ConnectionUtils;
082import org.apache.hadoop.hbase.client.Hbck;
083import org.apache.hadoop.hbase.client.Put;
084import org.apache.hadoop.hbase.client.RegionLocator;
085import org.apache.hadoop.hbase.client.Result;
086import org.apache.hadoop.hbase.client.ResultScanner;
087import org.apache.hadoop.hbase.client.Scan;
088import org.apache.hadoop.hbase.client.Table;
089import org.apache.hadoop.hbase.client.TableBuilder;
090import org.apache.hadoop.hbase.client.TableDescriptor;
091import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
092import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
093import org.apache.hadoop.hbase.io.compress.Compression;
094import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
095import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
096import org.apache.hadoop.hbase.io.hfile.CacheConfig;
097import org.apache.hadoop.hbase.io.hfile.HFile;
098import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
099import org.apache.hadoop.hbase.io.hfile.HFileScanner;
100import org.apache.hadoop.hbase.regionserver.BloomType;
101import org.apache.hadoop.hbase.regionserver.HRegion;
102import org.apache.hadoop.hbase.regionserver.HStore;
103import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
104import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
105import org.apache.hadoop.hbase.security.SecurityConstants;
106import org.apache.hadoop.hbase.security.User;
107import org.apache.hadoop.hbase.testclassification.LargeTests;
108import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
109import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
110import org.apache.hadoop.hbase.util.Bytes;
111import org.apache.hadoop.hbase.util.CommonFSUtils;
112import org.apache.hadoop.hbase.util.FSUtils;
113import org.apache.hadoop.hbase.util.FutureUtils;
114import org.apache.hadoop.hbase.util.ReflectionUtils;
115import org.apache.hadoop.hdfs.DistributedFileSystem;
116import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
117import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
118import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
119import org.apache.hadoop.io.NullWritable;
120import org.apache.hadoop.mapreduce.Job;
121import org.apache.hadoop.mapreduce.Mapper;
122import org.apache.hadoop.mapreduce.RecordWriter;
123import org.apache.hadoop.mapreduce.TaskAttemptContext;
124import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
125import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
126import org.apache.hadoop.security.UserGroupInformation;
127import org.junit.Assert;
128import org.junit.ClassRule;
129import org.junit.Ignore;
130import org.junit.Test;
131import org.junit.experimental.categories.Category;
132import org.mockito.Mockito;
133import org.slf4j.Logger;
134import org.slf4j.LoggerFactory;
135
136/**
137 * Simple test for {@link HFileOutputFormat2}.
138 * Sets up and runs a mapreduce job that writes hfile output.
139 * Creates a few inner classes to implement splits and an inputformat that
140 * emits keys and values like those of {@link PerformanceEvaluation}.
141 */
142@Category({VerySlowMapReduceTests.class, LargeTests.class})
143public class TestHFileOutputFormat2  {
144
145  @ClassRule
146  public static final HBaseClassTestRule CLASS_RULE =
147      HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);
148
149  private final static int ROWSPERSPLIT = 1024;
150
151  public static final byte[] FAMILY_NAME = TestHRegionFileSystem.FAMILY_NAME;
152  private static final byte[][] FAMILIES = {
153    Bytes.add(FAMILY_NAME, Bytes.toBytes("-A")), Bytes.add(FAMILY_NAME, Bytes.toBytes("-B"))};
154  private static final TableName[] TABLE_NAMES = Stream.of("TestTable", "TestTable2",
155          "TestTable3").map(TableName::valueOf).toArray(TableName[]::new);
156
157  private HBaseTestingUtil util = new HBaseTestingUtil();
158
159  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
160
161  /**
162   * Simple mapper that makes KeyValue output.
163   */
164  static class RandomKVGeneratingMapper
165      extends Mapper<NullWritable, NullWritable,
166                 ImmutableBytesWritable, Cell> {
167
168    private int keyLength;
169    private static final int KEYLEN_DEFAULT=10;
170    private static final String KEYLEN_CONF="randomkv.key.length";
171
172    private int valLength;
173    private static final int VALLEN_DEFAULT=10;
174    private static final String VALLEN_CONF="randomkv.val.length";
175    private static final byte [] QUALIFIER = Bytes.toBytes("data");
176    private boolean multiTableMapper = false;
177    private TableName[] tables = null;
178
179
180    @Override
181    protected void setup(Context context) throws IOException,
182        InterruptedException {
183      super.setup(context);
184
185      Configuration conf = context.getConfiguration();
186      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
187      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
188      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
189              false);
190      if (multiTableMapper) {
191        tables = TABLE_NAMES;
192      } else {
193        tables = new TableName[]{TABLE_NAMES[0]};
194      }
195    }
196
197    @Override
198    protected void map(
199        NullWritable n1, NullWritable n2,
200        Mapper<NullWritable, NullWritable,
201               ImmutableBytesWritable,Cell>.Context context)
202        throws java.io.IOException ,InterruptedException
203    {
204
205      byte keyBytes[] = new byte[keyLength];
206      byte valBytes[] = new byte[valLength];
207
208      int taskId = context.getTaskAttemptID().getTaskID().getId();
209      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
210      Random random = new Random();
211      byte[] key;
212      for (int j = 0; j < tables.length; ++j) {
213        for (int i = 0; i < ROWSPERSPLIT; i++) {
214          random.nextBytes(keyBytes);
215          // Ensure that unique tasks generate unique keys
216          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
217          random.nextBytes(valBytes);
218          key = keyBytes;
219          if (multiTableMapper) {
220            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
221          }
222
223          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
224            Cell kv = new KeyValue(keyBytes, family, QUALIFIER, valBytes);
225            context.write(new ImmutableBytesWritable(key), kv);
226          }
227        }
228      }
229    }
230  }
231
232  /**
233   * Simple mapper that makes Put output.
234   */
235  static class RandomPutGeneratingMapper
236      extends Mapper<NullWritable, NullWritable,
237                 ImmutableBytesWritable, Put> {
238
239    private int keyLength;
240    private static final int KEYLEN_DEFAULT = 10;
241    private static final String KEYLEN_CONF = "randomkv.key.length";
242
243    private int valLength;
244    private static final int VALLEN_DEFAULT = 10;
245    private static final String VALLEN_CONF = "randomkv.val.length";
246    private static final byte[] QUALIFIER = Bytes.toBytes("data");
247    private boolean multiTableMapper = false;
248    private TableName[] tables = null;
249
250    @Override
251    protected void setup(Context context) throws IOException,
252            InterruptedException {
253      super.setup(context);
254
255      Configuration conf = context.getConfiguration();
256      keyLength = conf.getInt(KEYLEN_CONF, KEYLEN_DEFAULT);
257      valLength = conf.getInt(VALLEN_CONF, VALLEN_DEFAULT);
258      multiTableMapper = conf.getBoolean(HFileOutputFormat2.MULTI_TABLE_HFILEOUTPUTFORMAT_CONF_KEY,
259              false);
260      if (multiTableMapper) {
261        tables = TABLE_NAMES;
262      } else {
263        tables = new TableName[]{TABLE_NAMES[0]};
264      }
265    }
266
267    @Override
268    protected void map(
269            NullWritable n1, NullWritable n2,
270            Mapper<NullWritable, NullWritable,
271                    ImmutableBytesWritable, Put>.Context context)
272            throws java.io.IOException, InterruptedException {
273
274      byte keyBytes[] = new byte[keyLength];
275      byte valBytes[] = new byte[valLength];
276
277      int taskId = context.getTaskAttemptID().getTaskID().getId();
278      assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
279
280      Random random = new Random();
281      byte[] key;
282      for (int j = 0; j < tables.length; ++j) {
283        for (int i = 0; i < ROWSPERSPLIT; i++) {
284          random.nextBytes(keyBytes);
285          // Ensure that unique tasks generate unique keys
286          keyBytes[keyLength - 1] = (byte) (taskId & 0xFF);
287          random.nextBytes(valBytes);
288          key = keyBytes;
289          if (multiTableMapper) {
290            key = MultiTableHFileOutputFormat.createCompositeKey(tables[j].getName(), keyBytes);
291          }
292
293          for (byte[] family : TestHFileOutputFormat2.FAMILIES) {
294            Put p = new Put(keyBytes);
295            p.addColumn(family, QUALIFIER, valBytes);
296            // set TTL to very low so that the scan does not return any value
297            p.setTTL(1l);
298            context.write(new ImmutableBytesWritable(key), p);
299          }
300        }
301      }
302    }
303  }
304
305  private void setupRandomGeneratorMapper(Job job, boolean putSortReducer) {
306    if (putSortReducer) {
307      job.setInputFormatClass(NMapInputFormat.class);
308      job.setMapperClass(RandomPutGeneratingMapper.class);
309      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
310      job.setMapOutputValueClass(Put.class);
311    } else {
312      job.setInputFormatClass(NMapInputFormat.class);
313      job.setMapperClass(RandomKVGeneratingMapper.class);
314      job.setMapOutputKeyClass(ImmutableBytesWritable.class);
315      job.setMapOutputValueClass(KeyValue.class);
316    }
317  }
318
319  /**
320   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if
321   * passed a keyvalue whose timestamp is {@link HConstants#LATEST_TIMESTAMP}.
322   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
323   */
324  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
325  public void test_LATEST_TIMESTAMP_isReplaced()
326  throws Exception {
327    Configuration conf = new Configuration(this.util.getConfiguration());
328    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
329    TaskAttemptContext context = null;
330    Path dir =
331      util.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
332    try {
333      Job job = new Job(conf);
334      FileOutputFormat.setOutputPath(job, dir);
335      context = createTestTaskAttemptContext(job);
336      HFileOutputFormat2 hof = new HFileOutputFormat2();
337      writer = hof.getRecordWriter(context);
338      final byte [] b = Bytes.toBytes("b");
339
340      // Test 1.  Pass a KV that has a ts of LATEST_TIMESTAMP.  It should be
341      // changed by call to write.  Check all in kv is same but ts.
342      KeyValue kv = new KeyValue(b, b, b);
343      KeyValue original = kv.clone();
344      writer.write(new ImmutableBytesWritable(), kv);
345      assertFalse(original.equals(kv));
346      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
347      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
348      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
349      assertNotSame(original.getTimestamp(), kv.getTimestamp());
350      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
351
352      // Test 2. Now test passing a kv that has explicit ts.  It should not be
353      // changed by call to record write.
354      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
355      original = kv.clone();
356      writer.write(new ImmutableBytesWritable(), kv);
357      assertTrue(original.equals(kv));
358    } finally {
359      if (writer != null && context != null) writer.close(context);
360      dir.getFileSystem(conf).delete(dir, true);
361    }
362  }
363
364  private TaskAttemptContext createTestTaskAttemptContext(final Job job)
365  throws Exception {
366    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
367    TaskAttemptContext context = hadoop.createTestTaskAttemptContext(
368      job, "attempt_201402131733_0001_m_000000_0");
369    return context;
370  }
371
372  /*
373   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE
374   * metadata used by time-restricted scans.
375   */
376  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
377  public void test_TIMERANGE() throws Exception {
378    Configuration conf = new Configuration(this.util.getConfiguration());
379    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
380    TaskAttemptContext context = null;
381    Path dir =
382      util.getDataTestDir("test_TIMERANGE_present");
383    LOG.info("Timerange dir writing to dir: "+ dir);
384    try {
385      // build a record writer using HFileOutputFormat2
386      Job job = new Job(conf);
387      FileOutputFormat.setOutputPath(job, dir);
388      context = createTestTaskAttemptContext(job);
389      HFileOutputFormat2 hof = new HFileOutputFormat2();
390      writer = hof.getRecordWriter(context);
391
392      // Pass two key values with explicit times stamps
393      final byte [] b = Bytes.toBytes("b");
394
395      // value 1 with timestamp 2000
396      KeyValue kv = new KeyValue(b, b, b, 2000, b);
397      KeyValue original = kv.clone();
398      writer.write(new ImmutableBytesWritable(), kv);
399      assertEquals(original,kv);
400
401      // value 2 with timestamp 1000
402      kv = new KeyValue(b, b, b, 1000, b);
403      original = kv.clone();
404      writer.write(new ImmutableBytesWritable(), kv);
405      assertEquals(original, kv);
406
407      // verify that the file has the proper FileInfo.
408      writer.close(context);
409
410      // the generated file lives 1 directory down from the attempt directory
411      // and is the only file, e.g.
412      // _attempt__0000_r_000000_0/b/1979617994050536795
413      FileSystem fs = FileSystem.get(conf);
414      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
415      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
416      FileStatus[] file = fs.listStatus(sub1[0].getPath());
417
418      // open as HFile Reader and pull out TIMERANGE FileInfo.
419      HFile.Reader rd =
420          HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
421      Map<byte[],byte[]> finfo = rd.getHFileInfo();
422      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
423      assertNotNull(range);
424
425      // unmarshall and check values.
426      TimeRangeTracker timeRangeTracker =TimeRangeTracker.parseFrom(range);
427      LOG.info(timeRangeTracker.getMin() +
428          "...." + timeRangeTracker.getMax());
429      assertEquals(1000, timeRangeTracker.getMin());
430      assertEquals(2000, timeRangeTracker.getMax());
431      rd.close();
432    } finally {
433      if (writer != null && context != null) writer.close(context);
434      dir.getFileSystem(conf).delete(dir, true);
435    }
436  }
437
438  /**
439   * Run small MR job.
440   */
441  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
442  public void testWritingPEData() throws Exception {
443    Configuration conf = util.getConfiguration();
444    Path testDir = util.getDataTestDirOnTestFS("testWritingPEData");
445    FileSystem fs = testDir.getFileSystem(conf);
446
447    // Set down this value or we OOME in eclipse.
448    conf.setInt("mapreduce.task.io.sort.mb", 20);
449    // Write a few files.
450    long hregionMaxFilesize = 10 * 1024;
451    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
452
453    Job job = new Job(conf, "testWritingPEData");
454    setupRandomGeneratorMapper(job, false);
455    // This partitioner doesn't work well for number keys but using it anyways
456    // just to demonstrate how to configure it.
457    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
458    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
459
460    Arrays.fill(startKey, (byte)0);
461    Arrays.fill(endKey, (byte)0xff);
462
463    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
464    // Set start and end rows for partitioner.
465    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
466    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
467    job.setReducerClass(CellSortReducer.class);
468    job.setOutputFormatClass(HFileOutputFormat2.class);
469    job.setNumReduceTasks(4);
470    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
471        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
472        CellSerialization.class.getName());
473
474    FileOutputFormat.setOutputPath(job, testDir);
475    assertTrue(job.waitForCompletion(false));
476    FileStatus [] files = fs.listStatus(testDir);
477    assertTrue(files.length > 0);
478
479    //check output file num and size.
480    for (byte[] family : FAMILIES) {
481      long kvCount= 0;
482      RemoteIterator<LocatedFileStatus> iterator =
483              fs.listFiles(testDir.suffix("/" + new String(family)), true);
484      while (iterator.hasNext()) {
485        LocatedFileStatus keyFileStatus = iterator.next();
486        HFile.Reader reader =
487                HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
488        HFileScanner scanner = reader.getScanner(conf, false, false, false);
489
490        kvCount += reader.getEntries();
491        scanner.seekTo();
492        long perKVSize = scanner.getCell().getSerializedSize();
493        assertTrue("Data size of each file should not be too large.",
494                perKVSize * reader.getEntries() <= hregionMaxFilesize);
495      }
496      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
497    }
498  }
499
500  /**
501   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into
502   * hfile.
503   */
504  @Test
505  public void test_WritingTagData()
506      throws Exception {
507    Configuration conf = new Configuration(this.util.getConfiguration());
508    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
509    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
510    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
511    TaskAttemptContext context = null;
512    Path dir =
513        util.getDataTestDir("WritingTagData");
514    try {
515      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
516      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
517      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
518      Job job = new Job(conf);
519      FileOutputFormat.setOutputPath(job, dir);
520      context = createTestTaskAttemptContext(job);
521      HFileOutputFormat2 hof = new HFileOutputFormat2();
522      writer = hof.getRecordWriter(context);
523      final byte [] b = Bytes.toBytes("b");
524
525      List< Tag > tags = new ArrayList<>();
526      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
527      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
528      writer.write(new ImmutableBytesWritable(), kv);
529      writer.close(context);
530      writer = null;
531      FileSystem fs = dir.getFileSystem(conf);
532      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
533      while(iterator.hasNext()) {
534        LocatedFileStatus keyFileStatus = iterator.next();
535        HFile.Reader reader =
536            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
537        HFileScanner scanner = reader.getScanner(conf, false, false, false);
538        scanner.seekTo();
539        Cell cell = scanner.getCell();
540        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
541        assertTrue(tagsFromCell.size() > 0);
542        for (Tag tag : tagsFromCell) {
543          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
544        }
545      }
546    } finally {
547      if (writer != null && context != null) writer.close(context);
548      dir.getFileSystem(conf).delete(dir, true);
549    }
550  }
551
552  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
553  public void testJobConfiguration() throws Exception {
554    Configuration conf = new Configuration(this.util.getConfiguration());
555    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY, util.getDataTestDir("testJobConfiguration")
556        .toString());
557    Job job = new Job(conf);
558    job.setWorkingDirectory(util.getDataTestDir("testJobConfiguration"));
559    Table table = Mockito.mock(Table.class);
560    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
561    setupMockStartKeys(regionLocator);
562    setupMockTableName(regionLocator);
563    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
564    assertEquals(job.getNumReduceTasks(), 4);
565  }
566
567  private byte [][] generateRandomStartKeys(int numKeys) {
568    Random random = new Random();
569    byte[][] ret = new byte[numKeys][];
570    // first region start key is always empty
571    ret[0] = HConstants.EMPTY_BYTE_ARRAY;
572    for (int i = 1; i < numKeys; i++) {
573      ret[i] =
574        PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
575    }
576    return ret;
577  }
578
579  private byte[][] generateRandomSplitKeys(int numKeys) {
580    Random random = new Random();
581    byte[][] ret = new byte[numKeys][];
582    for (int i = 0; i < numKeys; i++) {
583      ret[i] =
584          PerformanceEvaluation.generateData(random, PerformanceEvaluation.DEFAULT_VALUE_LENGTH);
585    }
586    return ret;
587  }
588
589  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
590  public void testMRIncrementalLoad() throws Exception {
591    LOG.info("\nStarting test testMRIncrementalLoad\n");
592    doIncrementalLoadTest(false, false, false, "testMRIncrementalLoad");
593  }
594
595  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
596  public void testMRIncrementalLoadWithSplit() throws Exception {
597    LOG.info("\nStarting test testMRIncrementalLoadWithSplit\n");
598    doIncrementalLoadTest(true, false, false, "testMRIncrementalLoadWithSplit");
599  }
600
601  /**
602   * Test for HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY = true
603   * This test could only check the correctness of original logic if LOCALITY_SENSITIVE_CONF_KEY
604   * is set to true. Because MiniHBaseCluster always run with single hostname (and different ports),
605   * it's not possible to check the region locality by comparing region locations and DN hostnames.
606   * When MiniHBaseCluster supports explicit hostnames parameter (just like MiniDFSCluster does),
607   * we could test region locality features more easily.
608   */
609  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
610  public void testMRIncrementalLoadWithLocality() throws Exception {
611    LOG.info("\nStarting test testMRIncrementalLoadWithLocality\n");
612    doIncrementalLoadTest(false, true, false, "testMRIncrementalLoadWithLocality1");
613    doIncrementalLoadTest(true, true, false, "testMRIncrementalLoadWithLocality2");
614  }
615
616  //@Ignore("Wahtevs")
617  @Test
618  public void testMRIncrementalLoadWithPutSortReducer() throws Exception {
619    LOG.info("\nStarting test testMRIncrementalLoadWithPutSortReducer\n");
620    doIncrementalLoadTest(false, false, true, "testMRIncrementalLoadWithPutSortReducer");
621  }
622
623  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
624                                     boolean putSortReducer, String tableStr) throws Exception {
625      doIncrementalLoadTest(shouldChangeRegions, shouldKeepLocality, putSortReducer,
626              Arrays.asList(tableStr));
627  }
628
629  @Test
630  public void testMultiMRIncrementalLoadWithPutSortReducer() throws Exception {
631    LOG.info("\nStarting test testMultiMRIncrementalLoadWithPutSortReducer\n");
632    doIncrementalLoadTest(false, false, true,
633            Arrays.stream(TABLE_NAMES).map(TableName::getNameAsString).collect(Collectors.toList
634                    ()));
635  }
636
637  private void doIncrementalLoadTest(boolean shouldChangeRegions, boolean shouldKeepLocality,
638      boolean putSortReducer, List<String> tableStr) throws Exception {
639    util = new HBaseTestingUtil();
640    Configuration conf = util.getConfiguration();
641    conf.setBoolean(MultiTableHFileOutputFormat.LOCALITY_SENSITIVE_CONF_KEY, shouldKeepLocality);
642    int hostCount = 1;
643    int regionNum = 5;
644    if (shouldKeepLocality) {
645      // We should change host count higher than hdfs replica count when MiniHBaseCluster supports
646      // explicit hostnames parameter just like MiniDFSCluster does.
647      hostCount = 3;
648      regionNum = 20;
649    }
650
651    String[] hostnames = new String[hostCount];
652    for (int i = 0; i < hostCount; ++i) {
653      hostnames[i] = "datanode_" + i;
654    }
655    StartTestingClusterOption option = StartTestingClusterOption.builder()
656        .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
657    util.startMiniCluster(option);
658
659    Map<String, Table> allTables = new HashMap<>(tableStr.size());
660    List<HFileOutputFormat2.TableInfo> tableInfo = new ArrayList<>(tableStr.size());
661    boolean writeMultipleTables = tableStr.size() > 1;
662    for (String tableStrSingle : tableStr) {
663      byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
664      TableName tableName = TableName.valueOf(tableStrSingle);
665      Table table = util.createTable(tableName, FAMILIES, splitKeys);
666
667      RegionLocator r = util.getConnection().getRegionLocator(tableName);
668      assertEquals("Should start with empty table", 0, util.countRows(table));
669      int numRegions = r.getStartKeys().length;
670      assertEquals("Should make " + regionNum + " regions", numRegions, regionNum);
671
672      allTables.put(tableStrSingle, table);
673      tableInfo.add(new HFileOutputFormat2.TableInfo(table.getDescriptor(), r));
674    }
675    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
676    // Generate the bulk load files
677    runIncrementalPELoad(conf, tableInfo, testDir, putSortReducer);
678    if (writeMultipleTables) {
679      testDir = new Path(testDir, "default");
680    }
681
682    for (Table tableSingle : allTables.values()) {
683      // This doesn't write into the table, just makes files
684      assertEquals("HFOF should not touch actual table", 0, util.countRows(tableSingle));
685    }
686    int numTableDirs = 0;
687    FileStatus[] fss =
688        testDir.getFileSystem(conf).listStatus(testDir);
689    for (FileStatus tf: fss) {
690      Path tablePath = testDir;
691      if (writeMultipleTables) {
692        if (allTables.containsKey(tf.getPath().getName())) {
693          ++numTableDirs;
694          tablePath = tf.getPath();
695        }
696        else {
697          continue;
698        }
699      }
700
701      // Make sure that a directory was created for every CF
702      int dir = 0;
703      fss = tablePath.getFileSystem(conf).listStatus(tablePath);
704      for (FileStatus f: fss) {
705        for (byte[] family : FAMILIES) {
706          if (Bytes.toString(family).equals(f.getPath().getName())) {
707            ++dir;
708          }
709        }
710      }
711      assertEquals("Column family not found in FS.", FAMILIES.length, dir);
712    }
713    if (writeMultipleTables) {
714      assertEquals("Dir for all input tables not created", numTableDirs, allTables.size());
715    }
716
717    Admin admin = util.getConnection().getAdmin();
718    try {
719      // handle the split case
720      if (shouldChangeRegions) {
721        Table chosenTable = allTables.values().iterator().next();
722        // Choose a semi-random table if multiple tables are available
723        LOG.info("Changing regions in table " + chosenTable.getName().getNameAsString());
724        admin.disableTable(chosenTable.getName());
725        util.waitUntilNoRegionsInTransition();
726
727        util.deleteTable(chosenTable.getName());
728        byte[][] newSplitKeys = generateRandomSplitKeys(14);
729        Table table = util.createTable(chosenTable.getName(), FAMILIES, newSplitKeys);
730
731        while (util.getConnection().getRegionLocator(chosenTable.getName())
732                .getAllRegionLocations().size() != 15 ||
733                !admin.isTableAvailable(table.getName())) {
734          Thread.sleep(200);
735          LOG.info("Waiting for new region assignment to happen");
736        }
737      }
738
739      // Perform the actual load
740      for (HFileOutputFormat2.TableInfo singleTableInfo : tableInfo) {
741        Path tableDir = testDir;
742        String tableNameStr = singleTableInfo.getTableDescriptor().getTableName().getNameAsString();
743        LOG.info("Running BulkLoadHFiles on table" + tableNameStr);
744        if (writeMultipleTables) {
745          tableDir = new Path(testDir, tableNameStr);
746        }
747        Table currentTable = allTables.get(tableNameStr);
748        TableName currentTableName = currentTable.getName();
749        BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);
750
751        // Ensure data shows up
752        int expectedRows = 0;
753        if (putSortReducer) {
754          // no rows should be extracted
755          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
756                  util.countRows(currentTable));
757        } else {
758          expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
759          assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
760                  util.countRows(currentTable));
761          Scan scan = new Scan();
762          ResultScanner results = currentTable.getScanner(scan);
763          for (Result res : results) {
764            assertEquals(FAMILIES.length, res.rawCells().length);
765            Cell first = res.rawCells()[0];
766            for (Cell kv : res.rawCells()) {
767              assertTrue(CellUtil.matchingRows(first, kv));
768              assertTrue(Bytes.equals(CellUtil.cloneValue(first), CellUtil.cloneValue(kv)));
769            }
770          }
771          results.close();
772        }
773        String tableDigestBefore = util.checksumRows(currentTable);
774        // Check region locality
775        HDFSBlocksDistribution hbd = new HDFSBlocksDistribution();
776        for (HRegion region : util.getHBaseCluster().getRegions(currentTableName)) {
777          hbd.add(region.getHDFSBlocksDistribution());
778        }
779        for (String hostname : hostnames) {
780          float locality = hbd.getBlockLocalityIndex(hostname);
781          LOG.info("locality of [" + hostname + "]: " + locality);
782          assertEquals(100, (int) (locality * 100));
783        }
784
785        // Cause regions to reopen
786        admin.disableTable(currentTableName);
787        while (!admin.isTableDisabled(currentTableName)) {
788          Thread.sleep(200);
789          LOG.info("Waiting for table to disable");
790        }
791        admin.enableTable(currentTableName);
792        util.waitTableAvailable(currentTableName);
793        assertEquals("Data should remain after reopening of regions",
794                tableDigestBefore, util.checksumRows(currentTable));
795      }
796    } finally {
797      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
798          tableInfoSingle.getRegionLocator().close();
799      }
800      for (Entry<String, Table> singleTable : allTables.entrySet() ) {
801        singleTable.getValue().close();
802        util.deleteTable(singleTable.getValue().getName());
803      }
804      testDir.getFileSystem(conf).delete(testDir, true);
805      util.shutdownMiniCluster();
806    }
807  }
808
809  private void runIncrementalPELoad(Configuration conf, List<HFileOutputFormat2.TableInfo> tableInfo, Path outDir,
810                                    boolean putSortReducer) throws IOException,
811          InterruptedException, ClassNotFoundException {
812    Job job = new Job(conf, "testLocalMRIncrementalLoad");
813    job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
814    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
815        MutationSerialization.class.getName(), ResultSerialization.class.getName(),
816        CellSerialization.class.getName());
817    setupRandomGeneratorMapper(job, putSortReducer);
818    if (tableInfo.size() > 1) {
819      MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableInfo);
820      int sum = 0;
821      for (HFileOutputFormat2.TableInfo tableInfoSingle : tableInfo) {
822        sum += tableInfoSingle.getRegionLocator().getAllRegionLocations().size();
823      }
824      assertEquals(sum, job.getNumReduceTasks());
825    }
826    else {
827      RegionLocator regionLocator = tableInfo.get(0).getRegionLocator();
828      HFileOutputFormat2.configureIncrementalLoad(job, tableInfo.get(0).getTableDescriptor(),
829              regionLocator);
830      assertEquals(regionLocator.getAllRegionLocations().size(), job.getNumReduceTasks());
831    }
832
833    FileOutputFormat.setOutputPath(job, outDir);
834
835    assertFalse(util.getTestFileSystem().exists(outDir)) ;
836
837    assertTrue(job.waitForCompletion(true));
838  }
839
840  /**
841   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}.
842   * Tests that the family compression map is correctly serialized into
843   * and deserialized from configuration
844   *
845   * @throws IOException
846   */
847  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
848  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
849    for (int numCfs = 0; numCfs <= 3; numCfs++) {
850      Configuration conf = new Configuration(this.util.getConfiguration());
851      Map<String, Compression.Algorithm> familyToCompression =
852          getMockColumnFamiliesForCompression(numCfs);
853      Table table = Mockito.mock(Table.class);
854      setupMockColumnFamiliesForCompression(table, familyToCompression);
855      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
856              HFileOutputFormat2.serializeColumnFamilyAttribute
857                      (HFileOutputFormat2.compressionDetails,
858                              Arrays.asList(table.getDescriptor())));
859
860      // read back family specific compression setting from the configuration
861      Map<byte[], Algorithm> retrievedFamilyToCompressionMap = HFileOutputFormat2
862          .createFamilyCompressionMap(conf);
863
864      // test that we have a value for all column families that matches with the
865      // used mock values
866      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
867        assertEquals("Compression configuration incorrect for column family:"
868            + entry.getKey(), entry.getValue(),
869            retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
870      }
871    }
872  }
873
874  private void setupMockColumnFamiliesForCompression(Table table,
875      Map<String, Compression.Algorithm> familyToCompression) throws IOException {
876
877    TableDescriptorBuilder mockTableDescriptor =
878      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
879    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
880      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
881        .newBuilder(Bytes.toBytes(entry.getKey()))
882        .setMaxVersions(1)
883        .setCompressionType(entry.getValue())
884        .setBlockCacheEnabled(false)
885        .setTimeToLive(0)
886        .build();
887
888      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
889    }
890    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
891  }
892
893  /**
894   * @return a map from column family names to compression algorithms for
895   *         testing column family compression. Column family names have special characters
896   */
897  private Map<String, Compression.Algorithm>
898      getMockColumnFamiliesForCompression (int numCfs) {
899    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
900    // use column family names having special characters
901    if (numCfs-- > 0) {
902      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
903    }
904    if (numCfs-- > 0) {
905      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
906    }
907    if (numCfs-- > 0) {
908      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
909    }
910    if (numCfs-- > 0) {
911      familyToCompression.put("Family3", Compression.Algorithm.NONE);
912    }
913    return familyToCompression;
914  }
915
916
917  /**
918   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}.
919   * Tests that the family bloom type map is correctly serialized into
920   * and deserialized from configuration
921   *
922   * @throws IOException
923   */
924  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
925  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
926    for (int numCfs = 0; numCfs <= 2; numCfs++) {
927      Configuration conf = new Configuration(this.util.getConfiguration());
928      Map<String, BloomType> familyToBloomType =
929          getMockColumnFamiliesForBloomType(numCfs);
930      Table table = Mockito.mock(Table.class);
931      setupMockColumnFamiliesForBloomType(table,
932          familyToBloomType);
933      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
934              HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
935              Arrays.asList(table.getDescriptor())));
936
937      // read back family specific data block encoding settings from the
938      // configuration
939      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
940          HFileOutputFormat2
941              .createFamilyBloomTypeMap(conf);
942
943      // test that we have a value for all column families that matches with the
944      // used mock values
945      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
946        assertEquals("BloomType configuration incorrect for column family:"
947            + entry.getKey(), entry.getValue(),
948            retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
949      }
950    }
951  }
952
953  private void setupMockColumnFamiliesForBloomType(Table table,
954      Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
955    TableDescriptorBuilder mockTableDescriptor =
956      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
957    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
958      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
959        .newBuilder(Bytes.toBytes(entry.getKey()))
960        .setMaxVersions(1)
961        .setBloomFilterType(entry.getValue())
962        .setBlockCacheEnabled(false)
963        .setTimeToLive(0).build();
964      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
965    }
966    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
967  }
968
969  /**
970   * @return a map from column family names to compression algorithms for
971   *         testing column family compression. Column family names have special characters
972   */
973  private Map<String, BloomType>
974  getMockColumnFamiliesForBloomType (int numCfs) {
975    Map<String, BloomType> familyToBloomType = new HashMap<>();
976    // use column family names having special characters
977    if (numCfs-- > 0) {
978      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
979    }
980    if (numCfs-- > 0) {
981      familyToBloomType.put("Family2=asdads&!AASD",
982          BloomType.ROWCOL);
983    }
984    if (numCfs-- > 0) {
985      familyToBloomType.put("Family3", BloomType.NONE);
986    }
987    return familyToBloomType;
988  }
989
990  /**
991   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}.
992   * Tests that the family block size map is correctly serialized into
993   * and deserialized from configuration
994   *
995   * @throws IOException
996   */
997  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
998  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
999    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1000      Configuration conf = new Configuration(this.util.getConfiguration());
1001      Map<String, Integer> familyToBlockSize =
1002          getMockColumnFamiliesForBlockSize(numCfs);
1003      Table table = Mockito.mock(Table.class);
1004      setupMockColumnFamiliesForBlockSize(table,
1005          familyToBlockSize);
1006      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
1007              HFileOutputFormat2.serializeColumnFamilyAttribute
1008                      (HFileOutputFormat2.blockSizeDetails, Arrays.asList(table
1009                              .getDescriptor())));
1010
1011      // read back family specific data block encoding settings from the
1012      // configuration
1013      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
1014          HFileOutputFormat2
1015              .createFamilyBlockSizeMap(conf);
1016
1017      // test that we have a value for all column families that matches with the
1018      // used mock values
1019      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()
1020          ) {
1021        assertEquals("BlockSize configuration incorrect for column family:"
1022            + entry.getKey(), entry.getValue(),
1023            retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
1024      }
1025    }
1026  }
1027
1028  private void setupMockColumnFamiliesForBlockSize(Table table,
1029      Map<String, Integer> familyToDataBlockEncoding) throws IOException {
1030    TableDescriptorBuilder mockTableDescriptor =
1031      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1032    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
1033      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
1034        .newBuilder(Bytes.toBytes(entry.getKey()))
1035        .setMaxVersions(1)
1036        .setBlocksize(entry.getValue())
1037        .setBlockCacheEnabled(false)
1038        .setTimeToLive(0).build();
1039      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
1040    }
1041    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
1042  }
1043
1044  /**
1045   * @return a map from column family names to compression algorithms for
1046   *         testing column family compression. Column family names have special characters
1047   */
1048  private Map<String, Integer>
1049  getMockColumnFamiliesForBlockSize (int numCfs) {
1050    Map<String, Integer> familyToBlockSize = new HashMap<>();
1051    // use column family names having special characters
1052    if (numCfs-- > 0) {
1053      familyToBlockSize.put("Family1!@#!@#&", 1234);
1054    }
1055    if (numCfs-- > 0) {
1056      familyToBlockSize.put("Family2=asdads&!AASD",
1057          Integer.MAX_VALUE);
1058    }
1059    if (numCfs-- > 0) {
1060      familyToBlockSize.put("Family2=asdads&!AASD",
1061          Integer.MAX_VALUE);
1062    }
1063    if (numCfs-- > 0) {
1064      familyToBlockSize.put("Family3", 0);
1065    }
1066    return familyToBlockSize;
1067  }
1068
1069  /**
1070   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}.
1071   * Tests that the family data block encoding map is correctly serialized into
1072   * and deserialized from configuration
1073   *
1074   * @throws IOException
1075   */
1076  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1077  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
1078    for (int numCfs = 0; numCfs <= 3; numCfs++) {
1079      Configuration conf = new Configuration(this.util.getConfiguration());
1080      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
1081          getMockColumnFamiliesForDataBlockEncoding(numCfs);
1082      Table table = Mockito.mock(Table.class);
1083      setupMockColumnFamiliesForDataBlockEncoding(table,
1084          familyToDataBlockEncoding);
1085      TableDescriptor tableDescriptor = table.getDescriptor();
1086      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
1087              HFileOutputFormat2.serializeColumnFamilyAttribute
1088                      (HFileOutputFormat2.dataBlockEncodingDetails, Arrays
1089                      .asList(tableDescriptor)));
1090
1091      // read back family specific data block encoding settings from the
1092      // configuration
1093      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
1094          HFileOutputFormat2
1095          .createFamilyDataBlockEncodingMap(conf);
1096
1097      // test that we have a value for all column families that matches with the
1098      // used mock values
1099      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1100        assertEquals("DataBlockEncoding configuration incorrect for column family:"
1101            + entry.getKey(), entry.getValue(),
1102            retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
1103      }
1104    }
1105  }
1106
1107  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
1108      Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
1109    TableDescriptorBuilder mockTableDescriptor =
1110      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1111    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
1112      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
1113        .newBuilder(Bytes.toBytes(entry.getKey()))
1114        .setMaxVersions(1)
1115        .setDataBlockEncoding(entry.getValue())
1116        .setBlockCacheEnabled(false)
1117        .setTimeToLive(0).build();
1118      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
1119    }
1120    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
1121  }
1122
1123  /**
1124   * @return a map from column family names to compression algorithms for
1125   *         testing column family compression. Column family names have special characters
1126   */
1127  private Map<String, DataBlockEncoding>
1128      getMockColumnFamiliesForDataBlockEncoding (int numCfs) {
1129    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
1130    // use column family names having special characters
1131    if (numCfs-- > 0) {
1132      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
1133    }
1134    if (numCfs-- > 0) {
1135      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1136          DataBlockEncoding.FAST_DIFF);
1137    }
1138    if (numCfs-- > 0) {
1139      familyToDataBlockEncoding.put("Family2=asdads&!AASD",
1140          DataBlockEncoding.PREFIX);
1141    }
1142    if (numCfs-- > 0) {
1143      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
1144    }
1145    return familyToDataBlockEncoding;
1146  }
1147
1148  private void setupMockStartKeys(RegionLocator table) throws IOException {
1149    byte[][] mockKeys = new byte[][] {
1150        HConstants.EMPTY_BYTE_ARRAY,
1151        Bytes.toBytes("aaa"),
1152        Bytes.toBytes("ggg"),
1153        Bytes.toBytes("zzz")
1154    };
1155    Mockito.doReturn(mockKeys).when(table).getStartKeys();
1156  }
1157
1158  private void setupMockTableName(RegionLocator table) throws IOException {
1159    TableName mockTableName = TableName.valueOf("mock_table");
1160    Mockito.doReturn(mockTableName).when(table).getName();
1161  }
1162
1163  /**
1164   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and
1165   * bloom filter settings from the column family descriptor
1166   */
1167  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1168  public void testColumnFamilySettings() throws Exception {
1169    Configuration conf = new Configuration(this.util.getConfiguration());
1170    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1171    TaskAttemptContext context = null;
1172    Path dir = util.getDataTestDir("testColumnFamilySettings");
1173
1174    // Setup table descriptor
1175    Table table = Mockito.mock(Table.class);
1176    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
1177    TableDescriptorBuilder tableDescriptorBuilder =
1178      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
1179
1180    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();
1181    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
1182      tableDescriptorBuilder.setColumnFamily(hcd);
1183    }
1184
1185    // set up the table to return some mock keys
1186    setupMockStartKeys(regionLocator);
1187
1188    try {
1189      // partial map red setup to get an operational writer for testing
1190      // We turn off the sequence file compression, because DefaultCodec
1191      // pollutes the GZip codec pool with an incompatible compressor.
1192      conf.set("io.seqfile.compression.type", "NONE");
1193      conf.set("hbase.fs.tmp.dir", dir.toString());
1194      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
1195      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1196
1197      Job job = new Job(conf, "testLocalMRIncrementalLoad");
1198      job.setWorkingDirectory(util.getDataTestDirOnTestFS("testColumnFamilySettings"));
1199      setupRandomGeneratorMapper(job, false);
1200      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
1201      FileOutputFormat.setOutputPath(job, dir);
1202      context = createTestTaskAttemptContext(job);
1203      HFileOutputFormat2 hof = new HFileOutputFormat2();
1204      writer = hof.getRecordWriter(context);
1205
1206      // write out random rows
1207      writeRandomKeyValues(writer, context,
1208        tableDescriptorBuilder.build().getColumnFamilyNames(), ROWSPERSPLIT);
1209      writer.close(context);
1210
1211      // Make sure that a directory was created for every CF
1212      FileSystem fs = dir.getFileSystem(conf);
1213
1214      // commit so that the filesystem has one directory per column family
1215      hof.getOutputCommitter(context).commitTask(context);
1216      hof.getOutputCommitter(context).commitJob(context);
1217      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
1218      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
1219      for (FileStatus f : families) {
1220        String familyStr = f.getPath().getName();
1221        ColumnFamilyDescriptor hcd = tableDescriptorBuilder.build()
1222          .getColumnFamily(Bytes.toBytes(familyStr));
1223        // verify that the compression on this file matches the configured
1224        // compression
1225        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
1226        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
1227        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
1228
1229        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
1230        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
1231        assertEquals("Incorrect bloom filter used for column family " + familyStr +
1232          "(reader: " + reader + ")",
1233          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
1234        assertEquals(
1235          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
1236          hcd.getCompressionType(), reader.getFileContext().getCompression());
1237      }
1238    } finally {
1239      dir.getFileSystem(conf).delete(dir, true);
1240    }
1241  }
1242
1243  /**
1244   * Write random values to the writer assuming a table created using
1245   * {@link #FAMILIES} as column family descriptors
1246   */
1247  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
1248      TaskAttemptContext context, Set<byte[]> families, int numRows)
1249      throws IOException, InterruptedException {
1250    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
1251    int valLength = 10;
1252    byte valBytes[] = new byte[valLength];
1253
1254    int taskId = context.getTaskAttemptID().getTaskID().getId();
1255    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
1256    final byte [] qualifier = Bytes.toBytes("data");
1257    Random random = new Random();
1258    for (int i = 0; i < numRows; i++) {
1259
1260      Bytes.putInt(keyBytes, 0, i);
1261      random.nextBytes(valBytes);
1262      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
1263
1264      for (byte[] family : families) {
1265        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
1266        writer.write(key, kv);
1267      }
1268    }
1269  }
1270
1271  /**
1272   * This test is to test the scenario happened in HBASE-6901.
1273   * All files are bulk loaded and excluded from minor compaction.
1274   * Without the fix of HBASE-6901, an ArrayIndexOutOfBoundsException
1275   * will be thrown.
1276   */
1277  @Ignore ("Flakey: See HBASE-9051") @Test
1278  public void testExcludeAllFromMinorCompaction() throws Exception {
1279    Configuration conf = util.getConfiguration();
1280    conf.setInt("hbase.hstore.compaction.min", 2);
1281    generateRandomStartKeys(5);
1282
1283    util.startMiniCluster();
1284    try (Connection conn = ConnectionFactory.createConnection();
1285        Admin admin = conn.getAdmin();
1286        Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1287        RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
1288      final FileSystem fs = util.getDFSCluster().getFileSystem();
1289      assertEquals("Should start with empty table", 0, util.countRows(table));
1290
1291      // deep inspection: get the StoreFile dir
1292      final Path storePath = new Path(
1293        CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1294          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1295            Bytes.toString(FAMILIES[0])));
1296      assertEquals(0, fs.listStatus(storePath).length);
1297
1298      // Generate two bulk load files
1299      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1300          true);
1301
1302      for (int i = 0; i < 2; i++) {
1303        Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
1304        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
1305                .getDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
1306        // Perform the actual load
1307        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1308      }
1309
1310      // Ensure data shows up
1311      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1312      assertEquals("BulkLoadHFiles should put expected data in table",
1313          expectedRows, util.countRows(table));
1314
1315      // should have a second StoreFile now
1316      assertEquals(2, fs.listStatus(storePath).length);
1317
1318      // minor compactions shouldn't get rid of the file
1319      admin.compact(TABLE_NAMES[0]);
1320      try {
1321        quickPoll(new Callable<Boolean>() {
1322          @Override
1323          public Boolean call() throws Exception {
1324            List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1325            for (HRegion region : regions) {
1326              for (HStore store : region.getStores()) {
1327                store.closeAndArchiveCompactedFiles();
1328              }
1329            }
1330            return fs.listStatus(storePath).length == 1;
1331          }
1332        }, 5000);
1333        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1334      } catch (AssertionError ae) {
1335        // this is expected behavior
1336      }
1337
1338      // a major compaction should work though
1339      admin.majorCompact(TABLE_NAMES[0]);
1340      quickPoll(new Callable<Boolean>() {
1341        @Override
1342        public Boolean call() throws Exception {
1343          List<HRegion> regions = util.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
1344          for (HRegion region : regions) {
1345            for (HStore store : region.getStores()) {
1346              store.closeAndArchiveCompactedFiles();
1347            }
1348          }
1349          return fs.listStatus(storePath).length == 1;
1350        }
1351      }, 5000);
1352
1353    } finally {
1354      util.shutdownMiniCluster();
1355    }
1356  }
1357
1358  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563") @Test
1359  public void testExcludeMinorCompaction() throws Exception {
1360    Configuration conf = util.getConfiguration();
1361    conf.setInt("hbase.hstore.compaction.min", 2);
1362    generateRandomStartKeys(5);
1363
1364    util.startMiniCluster();
1365    try (Connection conn = ConnectionFactory.createConnection(conf);
1366        Admin admin = conn.getAdmin()){
1367      Path testDir = util.getDataTestDirOnTestFS("testExcludeMinorCompaction");
1368      final FileSystem fs = util.getDFSCluster().getFileSystem();
1369      Table table = util.createTable(TABLE_NAMES[0], FAMILIES);
1370      assertEquals("Should start with empty table", 0, util.countRows(table));
1371
1372      // deep inspection: get the StoreFile dir
1373      final Path storePath = new Path(
1374        CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
1375          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
1376            Bytes.toString(FAMILIES[0])));
1377      assertEquals(0, fs.listStatus(storePath).length);
1378
1379      // put some data in it and flush to create a storefile
1380      Put p = new Put(Bytes.toBytes("test"));
1381      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
1382      table.put(p);
1383      admin.flush(TABLE_NAMES[0]);
1384      assertEquals(1, util.countRows(table));
1385      quickPoll(new Callable<Boolean>() {
1386        @Override
1387        public Boolean call() throws Exception {
1388          return fs.listStatus(storePath).length == 1;
1389        }
1390      }, 5000);
1391
1392      // Generate a bulk load file with more rows
1393      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
1394          true);
1395
1396      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
1397      runIncrementalPELoad(conf,
1398        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
1399        testDir, false);
1400
1401      // Perform the actual load
1402      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
1403
1404      // Ensure data shows up
1405      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
1406      assertEquals("BulkLoadHFiles should put expected data in table",
1407          expectedRows + 1, util.countRows(table));
1408
1409      // should have a second StoreFile now
1410      assertEquals(2, fs.listStatus(storePath).length);
1411
1412      // minor compactions shouldn't get rid of the file
1413      admin.compact(TABLE_NAMES[0]);
1414      try {
1415        quickPoll(new Callable<Boolean>() {
1416          @Override
1417          public Boolean call() throws Exception {
1418            return fs.listStatus(storePath).length == 1;
1419          }
1420        }, 5000);
1421        throw new IOException("SF# = " + fs.listStatus(storePath).length);
1422      } catch (AssertionError ae) {
1423        // this is expected behavior
1424      }
1425
1426      // a major compaction should work though
1427      admin.majorCompact(TABLE_NAMES[0]);
1428      quickPoll(new Callable<Boolean>() {
1429        @Override
1430        public Boolean call() throws Exception {
1431          return fs.listStatus(storePath).length == 1;
1432        }
1433      }, 5000);
1434
1435    } finally {
1436      util.shutdownMiniCluster();
1437    }
1438  }
1439
1440  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
1441    int sleepMs = 10;
1442    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
1443    while (retries-- > 0) {
1444      if (c.call().booleanValue()) {
1445        return;
1446      }
1447      Thread.sleep(sleepMs);
1448    }
1449    fail();
1450  }
1451
1452  public static void main(String args[]) throws Exception {
1453    new TestHFileOutputFormat2().manualTest(args);
1454  }
1455
1456  public void manualTest(String args[]) throws Exception {
1457    Configuration conf = HBaseConfiguration.create();
1458    util = new HBaseTestingUtil(conf);
1459    if ("newtable".equals(args[0])) {
1460      TableName tname = TableName.valueOf(args[1]);
1461      byte[][] splitKeys = generateRandomSplitKeys(4);
1462      Table table = util.createTable(tname, FAMILIES, splitKeys);
1463    } else if ("incremental".equals(args[0])) {
1464      TableName tname = TableName.valueOf(args[1]);
1465      try(Connection c = ConnectionFactory.createConnection(conf);
1466          Admin admin = c.getAdmin();
1467          RegionLocator regionLocator = c.getRegionLocator(tname)) {
1468        Path outDir = new Path("incremental-out");
1469        runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(admin
1470                .getDescriptor(tname), regionLocator)), outDir, false);
1471      }
1472    } else {
1473      throw new RuntimeException(
1474          "usage: TestHFileOutputFormat2 newtable | incremental");
1475    }
1476  }
1477
1478  @Test
1479  public void testBlockStoragePolicy() throws Exception {
1480    util = new HBaseTestingUtil();
1481    Configuration conf = util.getConfiguration();
1482    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");
1483
1484    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX +
1485            Bytes.toString(HFileOutputFormat2.combineTableNameSuffix(
1486                    TABLE_NAMES[0].getName(), FAMILIES[0])), "ONE_SSD");
1487    Path cf1Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[0]));
1488    Path cf2Dir = new Path(util.getDataTestDir(), Bytes.toString(FAMILIES[1]));
1489    util.startMiniDFSCluster(3);
1490    FileSystem fs = util.getDFSCluster().getFileSystem();
1491    try {
1492      fs.mkdirs(cf1Dir);
1493      fs.mkdirs(cf2Dir);
1494
1495      // the original block storage policy would be HOT
1496      String spA = getStoragePolicyName(fs, cf1Dir);
1497      String spB = getStoragePolicyName(fs, cf2Dir);
1498      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1499      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1500      assertEquals("HOT", spA);
1501      assertEquals("HOT", spB);
1502
1503      // alter table cf schema to change storage policies
1504      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1505              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
1506      HFileOutputFormat2.configureStoragePolicy(conf, fs,
1507              HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
1508      spA = getStoragePolicyName(fs, cf1Dir);
1509      spB = getStoragePolicyName(fs, cf2Dir);
1510      LOG.debug("Storage policy of cf 0: [" + spA + "].");
1511      LOG.debug("Storage policy of cf 1: [" + spB + "].");
1512      assertNotNull(spA);
1513      assertEquals("ONE_SSD", spA);
1514      assertNotNull(spB);
1515      assertEquals("ALL_SSD", spB);
1516    } finally {
1517      fs.delete(cf1Dir, true);
1518      fs.delete(cf2Dir, true);
1519      util.shutdownMiniDFSCluster();
1520    }
1521  }
1522
1523  private String getStoragePolicyName(FileSystem fs, Path path) {
1524    try {
1525      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1526      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1527    } catch (Exception e) {
1528      // Maybe fail because of using old HDFS version, try the old way
1529      if (LOG.isTraceEnabled()) {
1530        LOG.trace("Failed to get policy directly", e);
1531      }
1532      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1533      return policy == null ? "HOT" : policy;// HOT by default
1534    }
1535  }
1536
1537  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
1538    try {
1539      if (fs instanceof DistributedFileSystem) {
1540        DistributedFileSystem dfs = (DistributedFileSystem) fs;
1541        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
1542        if (null != status) {
1543          byte storagePolicyId = status.getStoragePolicy();
1544          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
1545          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
1546            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
1547            for (BlockStoragePolicy policy : policies) {
1548              if (policy.getId() == storagePolicyId) {
1549                return policy.getName();
1550              }
1551            }
1552          }
1553        }
1554      }
1555    } catch (Throwable e) {
1556      LOG.warn("failed to get block storage policy of [" + path + "]", e);
1557    }
1558
1559    return null;
1560  }
1561
1562  @Test
1563  public void TestConfigurePartitioner() throws IOException {
1564    Configuration conf = util.getConfiguration();
1565    // Create a user who is not the current user
1566    String fooUserName = "foo1234";
1567    String fooGroupName = "group1";
1568    UserGroupInformation
1569        ugi = UserGroupInformation.createUserForTesting(fooUserName, new String[]{fooGroupName});
1570    // Get user's home directory
1571    Path fooHomeDirectory = ugi.doAs(new PrivilegedAction<Path>() {
1572      @Override public Path run() {
1573        try (FileSystem fs = FileSystem.get(conf)) {
1574          return fs.makeQualified(fs.getHomeDirectory());
1575        } catch (IOException ioe) {
1576          LOG.error("Failed to get foo's home directory", ioe);
1577        }
1578        return null;
1579      }
1580    });
1581
1582    Job job = Mockito.mock(Job.class);
1583    Mockito.doReturn(conf).when(job).getConfiguration();
1584    ImmutableBytesWritable writable = new ImmutableBytesWritable();
1585    List<ImmutableBytesWritable> splitPoints = new LinkedList<ImmutableBytesWritable>();
1586    splitPoints.add(writable);
1587
1588    ugi.doAs(new PrivilegedAction<Void>() {
1589      @Override public Void run() {
1590        try {
1591          HFileOutputFormat2.configurePartitioner(job, splitPoints, false);
1592        } catch (IOException ioe) {
1593          LOG.error("Failed to configure partitioner", ioe);
1594        }
1595        return null;
1596      }
1597    });
1598    FileSystem fs = FileSystem.get(conf);
1599    // verify that the job uses TotalOrderPartitioner
1600    verify(job).setPartitionerClass(TotalOrderPartitioner.class);
1601    // verify that TotalOrderPartitioner.setPartitionFile() is called.
1602    String partitionPathString = conf.get("mapreduce.totalorderpartitioner.path");
1603    Assert.assertNotNull(partitionPathString);
1604    // Make sure the partion file is in foo1234's home directory, and that
1605    // the file exists.
1606    Assert.assertTrue(partitionPathString.startsWith(fooHomeDirectory.toString()));
1607    Assert.assertTrue(fs.exists(new Path(partitionPathString)));
1608  }
1609
1610  @Test
1611  public void TestConfigureCompression() throws Exception {
1612    Configuration conf = new Configuration(this.util.getConfiguration());
1613    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1614    TaskAttemptContext context = null;
1615    Path dir = util.getDataTestDir("TestConfigureCompression");
1616    String hfileoutputformatCompression = "gz";
1617
1618    try {
1619      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1620      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1621
1622      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1623
1624      Job job = Job.getInstance(conf);
1625      FileOutputFormat.setOutputPath(job, dir);
1626      context = createTestTaskAttemptContext(job);
1627      HFileOutputFormat2 hof = new HFileOutputFormat2();
1628      writer = hof.getRecordWriter(context);
1629      final byte[] b = Bytes.toBytes("b");
1630
1631      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1632      writer.write(new ImmutableBytesWritable(), kv);
1633      writer.close(context);
1634      writer = null;
1635      FileSystem fs = dir.getFileSystem(conf);
1636      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1637      while (iterator.hasNext()) {
1638        LocatedFileStatus keyFileStatus = iterator.next();
1639        HFile.Reader reader =
1640            HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1641        assertEquals(reader.getTrailer().getCompressionCodec().getName(),
1642            hfileoutputformatCompression);
1643      }
1644    } finally {
1645      if (writer != null && context != null) {
1646        writer.close(context);
1647      }
1648      dir.getFileSystem(conf).delete(dir, true);
1649    }
1650
1651  }
1652
1653  @Test
1654  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
1655    // Start cluster A
1656    util = new HBaseTestingUtil();
1657    Configuration confA = util.getConfiguration();
1658    int hostCount = 3;
1659    int regionNum = 20;
1660    String[] hostnames = new String[hostCount];
1661    for (int i = 0; i < hostCount; ++i) {
1662      hostnames[i] = "datanode_" + i;
1663    }
1664    StartTestingClusterOption option = StartTestingClusterOption.builder()
1665      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
1666    util.startMiniCluster(option);
1667
1668    // Start cluster B
1669    HBaseTestingUtil utilB = new HBaseTestingUtil();
1670    Configuration confB = utilB.getConfiguration();
1671    utilB.startMiniCluster(option);
1672
1673    Path testDir = util.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");
1674
1675    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
1676    TableName tableName = TableName.valueOf("table");
1677    // Create table in cluster B
1678    try (Table table = utilB.createTable(tableName, FAMILIES, splitKeys);
1679      RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
1680      // Generate the bulk load files
1681      // Job has zookeeper configuration for cluster A
1682      // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
1683      Job job = new Job(confA, "testLocalMRIncrementalLoad");
1684      Configuration jobConf = job.getConfiguration();
1685      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
1686      job.setWorkingDirectory(util.getDataTestDirOnTestFS("runIncrementalPELoad"));
1687      setupRandomGeneratorMapper(job, false);
1688      HFileOutputFormat2.configureIncrementalLoad(job, table, r);
1689
1690      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1691        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
1692      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1693        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
1694      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1695        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));
1696
1697      String bSpecificConfigKey = "my.override.config.for.b";
1698      String bSpecificConfigValue = "b-specific-value";
1699      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
1700        bSpecificConfigValue);
1701
1702      FileOutputFormat.setOutputPath(job, testDir);
1703
1704      assertFalse(util.getTestFileSystem().exists(testDir));
1705
1706      assertTrue(job.waitForCompletion(true));
1707
1708      final List<Configuration> configs =
1709        ConfigurationCaptorConnection.getCapturedConfigarutions(key);
1710
1711      assertFalse(configs.isEmpty());
1712      for (Configuration config : configs) {
1713        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
1714          config.get(HConstants.ZOOKEEPER_QUORUM));
1715        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
1716          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
1717        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
1718          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
1719
1720        assertEquals(bSpecificConfigValue,
1721          config.get(bSpecificConfigKey));
1722      }
1723    } finally {
1724      utilB.deleteTable(tableName);
1725      testDir.getFileSystem(confA).delete(testDir, true);
1726      util.shutdownMiniCluster();
1727      utilB.shutdownMiniCluster();
1728    }
1729  }
1730
1731  private static class ConfigurationCaptorConnection implements Connection {
1732    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1733
1734    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1735
1736    private final Connection delegate;
1737
1738    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user)
1739      throws IOException {
1740      delegate = FutureUtils.get(createAsyncConnection(conf, user)).toConnection();
1741
1742      final String uuid = conf.get(UUID_KEY);
1743      if (uuid != null) {
1744        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1745      }
1746    }
1747
1748    static UUID configureConnectionImpl(Configuration conf) {
1749      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
1750        ConfigurationCaptorConnection.class, Connection.class);
1751
1752      final UUID uuid = UUID.randomUUID();
1753      conf.set(UUID_KEY, uuid.toString());
1754      return uuid;
1755    }
1756
1757    static List<Configuration> getCapturedConfigarutions(UUID key) {
1758      return confs.get(key);
1759    }
1760
1761    @Override
1762    public Configuration getConfiguration() {
1763      return delegate.getConfiguration();
1764    }
1765
1766    @Override
1767    public Table getTable(TableName tableName) throws IOException {
1768      return delegate.getTable(tableName);
1769    }
1770
1771    @Override
1772    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1773      return delegate.getTable(tableName, pool);
1774    }
1775
1776    @Override
1777    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1778      return delegate.getBufferedMutator(tableName);
1779    }
1780
1781    @Override
1782    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1783      return delegate.getBufferedMutator(params);
1784    }
1785
1786    @Override
1787    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1788      return delegate.getRegionLocator(tableName);
1789    }
1790
1791    @Override
1792    public void clearRegionLocationCache() {
1793      delegate.clearRegionLocationCache();
1794    }
1795
1796    @Override
1797    public Admin getAdmin() throws IOException {
1798      return delegate.getAdmin();
1799    }
1800
1801    @Override
1802    public void close() throws IOException {
1803      delegate.close();
1804    }
1805
1806    @Override
1807    public boolean isClosed() {
1808      return delegate.isClosed();
1809    }
1810
1811    @Override
1812    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1813      return delegate.getTableBuilder(tableName, pool);
1814    }
1815
1816    @Override
1817    public AsyncConnection toAsyncConnection() {
1818      return delegate.toAsyncConnection();
1819    }
1820
1821    @Override
1822    public String getClusterId() {
1823      return delegate.getClusterId();
1824    }
1825
1826    @Override
1827    public Hbck getHbck()
1828      throws IOException {
1829      return delegate.getHbck();
1830    }
1831
1832    @Override
1833    public Hbck getHbck(ServerName masterServer) throws IOException {
1834      return delegate.getHbck(masterServer);
1835    }
1836
1837    @Override
1838    public void abort(String why, Throwable e) {
1839      delegate.abort(why, e);
1840    }
1841
1842    @Override
1843    public boolean isAborted() {
1844      return delegate.isAborted();
1845    }
1846  }
1847
1848}
1849