001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.mapreduce;
019
import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
027
028import java.io.IOException;
029import java.lang.reflect.Field;
030import java.util.ArrayList;
031import java.util.Arrays;
032import java.util.HashMap;
033import java.util.List;
034import java.util.Map;
035import java.util.Map.Entry;
036import java.util.Set;
037import java.util.UUID;
038import java.util.concurrent.Callable;
039import java.util.concurrent.ConcurrentHashMap;
040import java.util.concurrent.CopyOnWriteArrayList;
041import java.util.concurrent.ExecutorService;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.FileStatus;
044import org.apache.hadoop.fs.FileSystem;
045import org.apache.hadoop.fs.LocatedFileStatus;
046import org.apache.hadoop.fs.Path;
047import org.apache.hadoop.fs.RemoteIterator;
048import org.apache.hadoop.hbase.ArrayBackedTag;
049import org.apache.hadoop.hbase.Cell;
050import org.apache.hadoop.hbase.CellUtil;
051import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
052import org.apache.hadoop.hbase.ExtendedCell;
053import org.apache.hadoop.hbase.HBaseConfiguration;
054import org.apache.hadoop.hbase.HBaseTestingUtil;
055import org.apache.hadoop.hbase.HConstants;
056import org.apache.hadoop.hbase.HadoopShims;
057import org.apache.hadoop.hbase.KeyValue;
058import org.apache.hadoop.hbase.PrivateCellUtil;
059import org.apache.hadoop.hbase.ServerName;
060import org.apache.hadoop.hbase.StartTestingClusterOption;
061import org.apache.hadoop.hbase.TableName;
062import org.apache.hadoop.hbase.Tag;
063import org.apache.hadoop.hbase.TagType;
064import org.apache.hadoop.hbase.client.Admin;
065import org.apache.hadoop.hbase.client.AsyncConnection;
066import org.apache.hadoop.hbase.client.BufferedMutator;
067import org.apache.hadoop.hbase.client.BufferedMutatorParams;
068import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
069import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
070import org.apache.hadoop.hbase.client.Connection;
071import org.apache.hadoop.hbase.client.ConnectionFactory;
072import org.apache.hadoop.hbase.client.ConnectionRegistry;
073import org.apache.hadoop.hbase.client.ConnectionUtils;
074import org.apache.hadoop.hbase.client.Hbck;
075import org.apache.hadoop.hbase.client.Put;
076import org.apache.hadoop.hbase.client.RegionLocator;
077import org.apache.hadoop.hbase.client.Table;
078import org.apache.hadoop.hbase.client.TableBuilder;
079import org.apache.hadoop.hbase.client.TableDescriptor;
080import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
081import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
082import org.apache.hadoop.hbase.io.compress.Compression;
083import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
084import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
085import org.apache.hadoop.hbase.io.hfile.CacheConfig;
086import org.apache.hadoop.hbase.io.hfile.HFile;
087import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
088import org.apache.hadoop.hbase.io.hfile.HFileScanner;
089import org.apache.hadoop.hbase.regionserver.BloomType;
090import org.apache.hadoop.hbase.regionserver.HRegion;
091import org.apache.hadoop.hbase.regionserver.HStore;
092import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
093import org.apache.hadoop.hbase.security.User;
094import org.apache.hadoop.hbase.testclassification.LargeTests;
095import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
096import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
097import org.apache.hadoop.hbase.util.Bytes;
098import org.apache.hadoop.hbase.util.CommonFSUtils;
099import org.apache.hadoop.hbase.util.FSUtils;
100import org.apache.hadoop.hbase.util.FutureUtils;
101import org.apache.hadoop.hbase.util.ReflectionUtils;
102import org.apache.hadoop.hdfs.DistributedFileSystem;
103import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
104import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
105import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
106import org.apache.hadoop.mapreduce.Job;
107import org.apache.hadoop.mapreduce.RecordWriter;
108import org.apache.hadoop.mapreduce.TaskAttemptContext;
109import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
110import org.junit.jupiter.api.Disabled;
111import org.junit.jupiter.api.Test;
112import org.mockito.Mockito;
113import org.slf4j.Logger;
114import org.slf4j.LoggerFactory;
115
/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values.
 */
@org.junit.jupiter.api.Tag(VerySlowMapReduceTests.TAG)
@org.junit.jupiter.api.Tag(LargeTests.TAG)
public class TestHFileOutputFormat2 extends HFileOutputFormat2TestBase {

  // Shared SLF4J logger for diagnostic output from the tests below.
  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
126
127  /**
128   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
129   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
130   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
131   */
132  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
133  @Test
134  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
135    Configuration conf = new Configuration(this.UTIL.getConfiguration());
136    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
137    TaskAttemptContext context = null;
138    Path dir = UTIL.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
139    try {
140      Job job = new Job(conf);
141      FileOutputFormat.setOutputPath(job, dir);
142      context = createTestTaskAttemptContext(job);
143      HFileOutputFormat2 hof = new HFileOutputFormat2();
144      writer = hof.getRecordWriter(context);
145      final byte[] b = Bytes.toBytes("b");
146
147      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
148      // changed by call to write. Check all in kv is same but ts.
149      KeyValue kv = new KeyValue(b, b, b);
150      KeyValue original = kv.clone();
151      writer.write(new ImmutableBytesWritable(), kv);
152      assertFalse(original.equals(kv));
153      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
154      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
155      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
156      assertNotSame(original.getTimestamp(), kv.getTimestamp());
157      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());
158
159      // Test 2. Now test passing a kv that has explicit ts. It should not be
160      // changed by call to record write.
161      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
162      original = kv.clone();
163      writer.write(new ImmutableBytesWritable(), kv);
164      assertTrue(original.equals(kv));
165    } finally {
166      if (writer != null && context != null) writer.close(context);
167      dir.getFileSystem(conf).delete(dir, true);
168    }
169  }
170
171  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
172    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
173    TaskAttemptContext context =
174      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
175    return context;
176  }
177
  /**
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = UTIL.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit times stamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      // Explicit timestamps must not be rewritten by the record writer.
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values: min/max must reflect the two timestamps written above.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      // Re-close the writer if an assertion fired before the explicit close above,
      // then always clean up the output directory.
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }
242
243  /**
244   * Run small MR job.
245   */
246  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
247  @Test
248  public void testWritingPEData() throws Exception {
249    Configuration conf = UTIL.getConfiguration();
250    Path testDir = UTIL.getDataTestDirOnTestFS("testWritingPEData");
251    FileSystem fs = testDir.getFileSystem(conf);
252
253    // Set down this value or we OOME in eclipse.
254    conf.setInt("mapreduce.task.io.sort.mb", 20);
255    // Write a few files.
256    long hregionMaxFilesize = 10 * 1024;
257    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);
258
259    Job job = new Job(conf, "testWritingPEData");
260    setupRandomGeneratorMapper(job, false);
261    // This partitioner doesn't work well for number keys but using it anyways
262    // just to demonstrate how to configure it.
263    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
264    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
265
266    Arrays.fill(startKey, (byte) 0);
267    Arrays.fill(endKey, (byte) 0xff);
268
269    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
270    // Set start and end rows for partitioner.
271    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
272    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
273    job.setReducerClass(CellSortReducer.class);
274    job.setOutputFormatClass(HFileOutputFormat2.class);
275    job.setNumReduceTasks(4);
276    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
277      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
278      CellSerialization.class.getName());
279
280    FileOutputFormat.setOutputPath(job, testDir);
281    assertTrue(job.waitForCompletion(false));
282    FileStatus[] files = fs.listStatus(testDir);
283    assertTrue(files.length > 0);
284
285    // check output file num and size.
286    for (byte[] family : FAMILIES) {
287      long kvCount = 0;
288      RemoteIterator<LocatedFileStatus> iterator =
289        fs.listFiles(testDir.suffix("/" + new String(family)), true);
290      while (iterator.hasNext()) {
291        LocatedFileStatus keyFileStatus = iterator.next();
292        HFile.Reader reader =
293          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
294        HFileScanner scanner = reader.getScanner(conf, false, false, false);
295
296        kvCount += reader.getEntries();
297        scanner.seekTo();
298        long perKVSize = scanner.getCell().getSerializedSize();
299        assertTrue(perKVSize * reader.getEntries() <= hregionMaxFilesize,
300          "Data size of each file should not be too large.");
301      }
302      assertEquals(ROWSPERSPLIT, kvCount, "Should write expected data in output file.");
303    }
304  }
305
306  /**
307   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
308   */
309  @Test
310  public void test_WritingTagData() throws Exception {
311    Configuration conf = new Configuration(this.UTIL.getConfiguration());
312    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
313    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
314    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
315    TaskAttemptContext context = null;
316    Path dir = UTIL.getDataTestDir("WritingTagData");
317    try {
318      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
319      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
320      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
321      Job job = new Job(conf);
322      FileOutputFormat.setOutputPath(job, dir);
323      context = createTestTaskAttemptContext(job);
324      HFileOutputFormat2 hof = new HFileOutputFormat2();
325      writer = hof.getRecordWriter(context);
326      final byte[] b = Bytes.toBytes("b");
327
328      List<Tag> tags = new ArrayList<>();
329      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
330      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
331      writer.write(new ImmutableBytesWritable(), kv);
332      writer.close(context);
333      writer = null;
334      FileSystem fs = dir.getFileSystem(conf);
335      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
336      while (iterator.hasNext()) {
337        LocatedFileStatus keyFileStatus = iterator.next();
338        HFile.Reader reader =
339          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
340        HFileScanner scanner = reader.getScanner(conf, false, false, false);
341        scanner.seekTo();
342        ExtendedCell cell = scanner.getCell();
343        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
344        assertTrue(tagsFromCell.size() > 0);
345        for (Tag tag : tagsFromCell) {
346          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
347        }
348      }
349    } finally {
350      if (writer != null && context != null) writer.close(context);
351      dir.getFileSystem(conf).delete(dir, true);
352    }
353  }
354
355  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
356  @Test
357  public void testJobConfiguration() throws Exception {
358    Configuration conf = new Configuration(this.UTIL.getConfiguration());
359    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
360      UTIL.getDataTestDir("testJobConfiguration").toString());
361    Job job = new Job(conf);
362    job.setWorkingDirectory(UTIL.getDataTestDir("testJobConfiguration"));
363    Table table = Mockito.mock(Table.class);
364    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
365    setupMockStartKeys(regionLocator);
366    setupMockTableName(regionLocator);
367    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
368    assertEquals(4, job.getNumReduceTasks());
369  }
370
371  /**
372   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
373   * family compression map is correctly serialized into and deserialized from configuration
374   */
375  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
376  @Test
377  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
378    for (int numCfs = 0; numCfs <= 3; numCfs++) {
379      Configuration conf = new Configuration(this.UTIL.getConfiguration());
380      Map<String, Compression.Algorithm> familyToCompression =
381        getMockColumnFamiliesForCompression(numCfs);
382      Table table = Mockito.mock(Table.class);
383      setupMockColumnFamiliesForCompression(table, familyToCompression);
384      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
385        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
386          Arrays.asList(table.getDescriptor())));
387
388      // read back family specific compression setting from the configuration
389      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
390        HFileOutputFormat2.createFamilyCompressionMap(conf);
391
392      // test that we have a value for all column families that matches with the
393      // used mock values
394      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
395        assertEquals(entry.getValue(),
396          retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())),
397          "Compression configuration incorrect for column family:" + entry.getKey());
398      }
399    }
400  }
401
402  private void setupMockColumnFamiliesForCompression(Table table,
403    Map<String, Compression.Algorithm> familyToCompression) throws IOException {
404
405    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
406    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
407      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
408        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
409        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
410
411      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
412    }
413    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
414  }
415
416  /**
417   * @return a map from column family names to compression algorithms for testing column family
418   *         compression. Column family names have special characters
419   */
420  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
421    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
422    // use column family names having special characters
423    if (numCfs-- > 0) {
424      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
425    }
426    if (numCfs-- > 0) {
427      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
428    }
429    if (numCfs-- > 0) {
430      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
431    }
432    if (numCfs-- > 0) {
433      familyToCompression.put("Family3", Compression.Algorithm.NONE);
434    }
435    return familyToCompression;
436  }
437
438  /**
439   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
440   * family bloom type map is correctly serialized into and deserialized from configuration
441   */
442  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
443  @Test
444  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
445    for (int numCfs = 0; numCfs <= 2; numCfs++) {
446      Configuration conf = new Configuration(this.UTIL.getConfiguration());
447      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
448      Table table = Mockito.mock(Table.class);
449      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
450      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
451        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
452          Arrays.asList(table.getDescriptor())));
453
454      // read back family specific data block encoding settings from the
455      // configuration
456      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
457        HFileOutputFormat2.createFamilyBloomTypeMap(conf);
458
459      // test that we have a value for all column families that matches with the
460      // used mock values
461      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
462        assertEquals(entry.getValue(),
463          retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())),
464          "BloomType configuration incorrect for column family:" + entry.getKey());
465      }
466    }
467  }
468
469  private void setupMockColumnFamiliesForBloomType(Table table,
470    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
471    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
472    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
473      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
474        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
475        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
476      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
477    }
478    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
479  }
480
481  /**
482   * @return a map from column family names to compression algorithms for testing column family
483   *         compression. Column family names have special characters
484   */
485  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
486    Map<String, BloomType> familyToBloomType = new HashMap<>();
487    // use column family names having special characters
488    if (numCfs-- > 0) {
489      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
490    }
491    if (numCfs-- > 0) {
492      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
493    }
494    if (numCfs-- > 0) {
495      familyToBloomType.put("Family3", BloomType.NONE);
496    }
497    return familyToBloomType;
498  }
499
500  /**
501   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
502   * family block size map is correctly serialized into and deserialized from configuration
503   */
504  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
505  @Test
506  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
507    for (int numCfs = 0; numCfs <= 3; numCfs++) {
508      Configuration conf = new Configuration(this.UTIL.getConfiguration());
509      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
510      Table table = Mockito.mock(Table.class);
511      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
512      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
513        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
514          Arrays.asList(table.getDescriptor())));
515
516      // read back family specific data block encoding settings from the
517      // configuration
518      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
519        HFileOutputFormat2.createFamilyBlockSizeMap(conf);
520
521      // test that we have a value for all column families that matches with the
522      // used mock values
523      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
524        assertEquals(entry.getValue(),
525          retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())),
526          "BlockSize configuration incorrect for column family:" + entry.getKey());
527      }
528    }
529  }
530
531  private void setupMockColumnFamiliesForBlockSize(Table table,
532    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
533    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
534    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
535      ColumnFamilyDescriptor columnFamilyDescriptor =
536        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
537          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
538      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
539    }
540    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
541  }
542
543  /**
544   * @return a map from column family names to compression algorithms for testing column family
545   *         compression. Column family names have special characters
546   */
547  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
548    Map<String, Integer> familyToBlockSize = new HashMap<>();
549    // use column family names having special characters
550    if (numCfs-- > 0) {
551      familyToBlockSize.put("Family1!@#!@#&", 1234);
552    }
553    if (numCfs-- > 0) {
554      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
555    }
556    if (numCfs-- > 0) {
557      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
558    }
559    if (numCfs-- > 0) {
560      familyToBlockSize.put("Family3", 0);
561    }
562    return familyToBlockSize;
563  }
564
565  /**
566   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
567   * the family data block encoding map is correctly serialized into and deserialized from
568   * configuration
569   */
570  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
571  @Test
572  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
573    for (int numCfs = 0; numCfs <= 3; numCfs++) {
574      Configuration conf = new Configuration(this.UTIL.getConfiguration());
575      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
576        getMockColumnFamiliesForDataBlockEncoding(numCfs);
577      Table table = Mockito.mock(Table.class);
578      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
579      TableDescriptor tableDescriptor = table.getDescriptor();
580      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
581        HFileOutputFormat2.serializeColumnFamilyAttribute(
582          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));
583
584      // read back family specific data block encoding settings from the
585      // configuration
586      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
587        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);
588
589      // test that we have a value for all column families that matches with the
590      // used mock values
591      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
592        assertEquals(entry.getValue(),
593          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())),
594          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey());
595      }
596    }
597  }
598
599  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
600    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
601    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
602    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
603      ColumnFamilyDescriptor columnFamilyDescriptor =
604        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
605          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
606          .build();
607      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
608    }
609    Mockito.doReturn(mockTableDescriptor).when(table).getDescriptor();
610  }
611
612  /**
613   * @return a map from column family names to compression algorithms for testing column family
614   *         compression. Column family names have special characters
615   */
616  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
617    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
618    // use column family names having special characters
619    if (numCfs-- > 0) {
620      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
621    }
622    if (numCfs-- > 0) {
623      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
624    }
625    if (numCfs-- > 0) {
626      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
627    }
628    if (numCfs-- > 0) {
629      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
630    }
631    return familyToDataBlockEncoding;
632  }
633
634  private void setupMockStartKeys(RegionLocator table) throws IOException {
635    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
636      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
637    Mockito.doReturn(mockKeys).when(table).getStartKeys();
638  }
639
640  private void setupMockTableName(RegionLocator table) throws IOException {
641    TableName mockTableName = TableName.valueOf("mock_table");
642    Mockito.doReturn(mockTableName).when(table).getName();
643  }
644
645  /**
646   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
647   * from the column family descriptor
648   */
649  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
650  @Test
651  public void testColumnFamilySettings() throws Exception {
652    Configuration conf = new Configuration(this.UTIL.getConfiguration());
653    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
654    TaskAttemptContext context = null;
655    Path dir = UTIL.getDataTestDir("testColumnFamilySettings");
656
657    // Setup table descriptor
658    Table table = Mockito.mock(Table.class);
659    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
660    TableDescriptorBuilder tableDescriptorBuilder =
661      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
662
663    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();
664    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
665      tableDescriptorBuilder.setColumnFamily(hcd);
666    }
667
668    // set up the table to return some mock keys
669    setupMockStartKeys(regionLocator);
670
671    try {
672      // partial map red setup to get an operational writer for testing
673      // We turn off the sequence file compression, because DefaultCodec
674      // pollutes the GZip codec pool with an incompatible compressor.
675      conf.set("io.seqfile.compression.type", "NONE");
676      conf.set("hbase.fs.tmp.dir", dir.toString());
677      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
678      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
679
680      Job job = new Job(conf, "testLocalMRIncrementalLoad");
681      job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("testColumnFamilySettings"));
682      setupRandomGeneratorMapper(job, false);
683      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
684      FileOutputFormat.setOutputPath(job, dir);
685      context = createTestTaskAttemptContext(job);
686      HFileOutputFormat2 hof = new HFileOutputFormat2();
687      writer = hof.getRecordWriter(context);
688
689      // write out random rows
690      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
691        ROWSPERSPLIT);
692      writer.close(context);
693
694      // Make sure that a directory was created for every CF
695      FileSystem fs = dir.getFileSystem(conf);
696
697      // commit so that the filesystem has one directory per column family
698      hof.getOutputCommitter(context).commitTask(context);
699      hof.getOutputCommitter(context).commitJob(context);
700      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
701      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
702      for (FileStatus f : families) {
703        String familyStr = f.getPath().getName();
704        ColumnFamilyDescriptor hcd =
705          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
706        // verify that the compression on this file matches the configured
707        // compression
708        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
709        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
710        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();
711
712        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
713        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
714        assertEquals(hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)),
715          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader
716            + ")");
717        assertEquals(hcd.getCompressionType(), reader.getFileContext().getCompression(),
718          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")");
719      }
720    } finally {
721      dir.getFileSystem(conf).delete(dir, true);
722    }
723  }
724
725  /**
726   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
727   * family descriptors
728   */
729  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
730    TaskAttemptContext context, Set<byte[]> families, int numRows)
731    throws IOException, InterruptedException {
732    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
733    int valLength = 10;
734    byte valBytes[] = new byte[valLength];
735
736    int taskId = context.getTaskAttemptID().getTaskID().getId();
737    assert taskId < Byte.MAX_VALUE : "Unit tests dont support > 127 tasks!";
738    final byte[] qualifier = Bytes.toBytes("data");
739    for (int i = 0; i < numRows; i++) {
740      Bytes.putInt(keyBytes, 0, i);
741      Bytes.random(valBytes);
742      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
743      for (byte[] family : families) {
744        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
745        writer.write(key, kv);
746      }
747    }
748  }
749
750  /**
751   * This test is to test the scenario happened in HBASE-6901. All files are bulk loaded and
752   * excluded from minor compaction. Without the fix of HBASE-6901, an
753   * ArrayIndexOutOfBoundsException will be thrown.
754   */
755  @Disabled("Flakey: See HBASE-9051")
756  @Test
757  public void testExcludeAllFromMinorCompaction() throws Exception {
758    Configuration conf = UTIL.getConfiguration();
759    conf.setInt("hbase.hstore.compaction.min", 2);
760    generateRandomStartKeys(5);
761
762    UTIL.startMiniCluster();
763    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
764      Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
765      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
766      final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
767      assertEquals(0, UTIL.countRows(table), "Should start with empty table");
768
769      // deep inspection: get the StoreFile dir
770      final Path storePath =
771        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
772          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
773            Bytes.toString(FAMILIES[0])));
774      assertEquals(0, fs.listStatus(storePath).length);
775
776      // Generate two bulk load files
777      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
778
779      for (int i = 0; i < 2; i++) {
780        Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
781        runIncrementalPELoad(conf,
782          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
783            conn.getRegionLocator(TABLE_NAMES[0]))),
784          testDir, false);
785        // Perform the actual load
786        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
787      }
788
789      // Ensure data shows up
790      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
791      assertEquals(expectedRows, UTIL.countRows(table),
792        "BulkLoadHFiles should put expected data in table");
793
794      // should have a second StoreFile now
795      assertEquals(2, fs.listStatus(storePath).length);
796
797      // minor compactions shouldn't get rid of the file
798      admin.compact(TABLE_NAMES[0]);
799      try {
800        quickPoll(new Callable<Boolean>() {
801          @Override
802          public Boolean call() throws Exception {
803            List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
804            for (HRegion region : regions) {
805              for (HStore store : region.getStores()) {
806                store.closeAndArchiveCompactedFiles();
807              }
808            }
809            return fs.listStatus(storePath).length == 1;
810          }
811        }, 5000);
812        throw new IOException("SF# = " + fs.listStatus(storePath).length);
813      } catch (AssertionError ae) {
814        // this is expected behavior
815      }
816
817      // a major compaction should work though
818      admin.majorCompact(TABLE_NAMES[0]);
819      quickPoll(new Callable<Boolean>() {
820        @Override
821        public Boolean call() throws Exception {
822          List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
823          for (HRegion region : regions) {
824            for (HStore store : region.getStores()) {
825              store.closeAndArchiveCompactedFiles();
826            }
827          }
828          return fs.listStatus(storePath).length == 1;
829        }
830      }, 5000);
831
832    } finally {
833      UTIL.shutdownMiniCluster();
834    }
835  }
836
837  @Disabled("Goes zombie too frequently; needs work. See HBASE-14563")
838  @Test
839  public void testExcludeMinorCompaction() throws Exception {
840    Configuration conf = UTIL.getConfiguration();
841    conf.setInt("hbase.hstore.compaction.min", 2);
842    generateRandomStartKeys(5);
843
844    UTIL.startMiniCluster();
845    try (Connection conn = ConnectionFactory.createConnection(conf);
846      Admin admin = conn.getAdmin()) {
847      Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeMinorCompaction");
848      final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
849      Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
850      assertEquals(0, UTIL.countRows(table), "Should start with empty table");
851
852      // deep inspection: get the StoreFile dir
853      final Path storePath =
854        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
855          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
856            Bytes.toString(FAMILIES[0])));
857      assertEquals(0, fs.listStatus(storePath).length);
858
859      // put some data in it and flush to create a storefile
860      Put p = new Put(Bytes.toBytes("test"));
861      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
862      table.put(p);
863      admin.flush(TABLE_NAMES[0]);
864      assertEquals(1, UTIL.countRows(table));
865      quickPoll(new Callable<Boolean>() {
866        @Override
867        public Boolean call() throws Exception {
868          return fs.listStatus(storePath).length == 1;
869        }
870      }, 5000);
871
872      // Generate a bulk load file with more rows
873      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);
874
875      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
876      runIncrementalPELoad(conf,
877        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
878        testDir, false);
879
880      // Perform the actual load
881      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
882
883      // Ensure data shows up
884      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
885      assertEquals(expectedRows + 1, UTIL.countRows(table),
886        "BulkLoadHFiles should put expected data in table");
887
888      // should have a second StoreFile now
889      assertEquals(2, fs.listStatus(storePath).length);
890
891      // minor compactions shouldn't get rid of the file
892      admin.compact(TABLE_NAMES[0]);
893      try {
894        quickPoll(new Callable<Boolean>() {
895          @Override
896          public Boolean call() throws Exception {
897            return fs.listStatus(storePath).length == 1;
898          }
899        }, 5000);
900        throw new IOException("SF# = " + fs.listStatus(storePath).length);
901      } catch (AssertionError ae) {
902        // this is expected behavior
903      }
904
905      // a major compaction should work though
906      admin.majorCompact(TABLE_NAMES[0]);
907      quickPoll(new Callable<Boolean>() {
908        @Override
909        public Boolean call() throws Exception {
910          return fs.listStatus(storePath).length == 1;
911        }
912      }, 5000);
913
914    } finally {
915      UTIL.shutdownMiniCluster();
916    }
917  }
918
919  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
920    int sleepMs = 10;
921    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
922    while (retries-- > 0) {
923      if (c.call().booleanValue()) {
924        return;
925      }
926      Thread.sleep(sleepMs);
927    }
928    fail();
929  }
930
931  public static void main(String args[]) throws Exception {
932    new TestHFileOutputFormat2().manualTest(args);
933  }
934
935  public void manualTest(String args[]) throws Exception {
936    Configuration conf = HBaseConfiguration.create();
937    UTIL = new HBaseTestingUtil(conf);
938    if ("newtable".equals(args[0])) {
939      TableName tname = TableName.valueOf(args[1]);
940      byte[][] splitKeys = generateRandomSplitKeys(4);
941      Table table = UTIL.createTable(tname, FAMILIES, splitKeys);
942    } else if ("incremental".equals(args[0])) {
943      TableName tname = TableName.valueOf(args[1]);
944      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
945        RegionLocator regionLocator = c.getRegionLocator(tname)) {
946        Path outDir = new Path("incremental-out");
947        runIncrementalPELoad(conf,
948          Arrays
949            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
950          outDir, false);
951      }
952    } else {
953      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
954    }
955  }
956
  @Test
  public void testBlockStoragePolicy() throws Exception {
    // Verifies configureStoragePolicy(): a per-CF override ("ONE_SSD") wins for FAMILIES[0],
    // while FAMILIES[1] falls back to the table-wide default ("ALL_SSD").
    UTIL = new HBaseTestingUtil();
    Configuration conf = UTIL.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    // per-column-family override, keyed by the prefix plus "<table suffix>:<family>" bytes
    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    // storage policies are an HDFS feature, so a real mini DFS cluster is required
    UTIL.startMiniDFSCluster(3);
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy would be HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // alter table cf schema to change storage policies
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      UTIL.shutdownMiniDFSCluster();
    }
  }
1002
1003  private String getStoragePolicyName(FileSystem fs, Path path) {
1004    try {
1005      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
1006      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
1007    } catch (Exception e) {
1008      // Maybe fail because of using old HDFS version, try the old way
1009      if (LOG.isTraceEnabled()) {
1010        LOG.trace("Failed to get policy directly", e);
1011      }
1012      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
1013      return policy == null ? "HOT" : policy;// HOT by default
1014    }
1015  }
1016
  /**
   * Storage-policy lookup for HDFS versions predating FileSystem#getStoragePolicy: reads the
   * file's policy id via the DFSClient and matches it against the cluster's policy table.
   * Returns null when the policy cannot be determined (non-DFS filesystem, unspecified policy,
   * or any reflection/RPC failure) — the caller treats null as "use the default".
   */
  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
    try {
      if (fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          byte storagePolicyId = status.getStoragePolicy();
          // ID_UNSPECIFIED is read reflectively because its declaration moved across
          // Hadoop versions
          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
            // map the numeric policy id back to its symbolic name
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      // best-effort probe: swallow and log, since the caller handles null
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }
1041
1042  @Test
1043  public void TestConfigureCompression() throws Exception {
1044    Configuration conf = new Configuration(this.UTIL.getConfiguration());
1045    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
1046    TaskAttemptContext context = null;
1047    Path dir = UTIL.getDataTestDir("TestConfigureCompression");
1048    String hfileoutputformatCompression = "gz";
1049
1050    try {
1051      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
1052      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
1053
1054      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);
1055
1056      Job job = Job.getInstance(conf);
1057      FileOutputFormat.setOutputPath(job, dir);
1058      context = createTestTaskAttemptContext(job);
1059      HFileOutputFormat2 hof = new HFileOutputFormat2();
1060      writer = hof.getRecordWriter(context);
1061      final byte[] b = Bytes.toBytes("b");
1062
1063      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
1064      writer.write(new ImmutableBytesWritable(), kv);
1065      writer.close(context);
1066      writer = null;
1067      FileSystem fs = dir.getFileSystem(conf);
1068      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
1069      while (iterator.hasNext()) {
1070        LocatedFileStatus keyFileStatus = iterator.next();
1071        HFile.Reader reader =
1072          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
1073        assertEquals(hfileoutputformatCompression,
1074          reader.getTrailer().getCompressionCodec().getName());
1075      }
1076    } finally {
1077      if (writer != null && context != null) {
1078        writer.close(context);
1079      }
1080      dir.getFileSystem(conf).delete(dir, true);
1081    }
1082
1083  }
1084
  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Two mini-clusters: the job runs with cluster A's conf but must write HFiles for a table on
    // cluster B; verifies that configureIncrementalLoad propagates B's ZK settings into the job
    // and that connections opened by the job actually use them.
    // Start cluster A
    UTIL = new HBaseTestingUtil();
    Configuration confA = UTIL.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    UTIL.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtil UTILB = new HBaseTestingUtil();
    Configuration confB = UTILB.getConfiguration();
    UTILB.startMiniCluster(option);

    Path testDir = UTIL.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = UTILB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = UTILB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files
      // Job has zookeeper configuration for cluster A
      // Assume reading from cluster A by TableInputFormat and creating hfiles to cluster B
      Job job = new Job(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      // install ConfigurationCaptorConnection so every conf used to open a Connection is recorded
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      // configureIncrementalLoad must have copied cluster B's ZK coordinates into the job conf
      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

      // remote-cluster-prefixed keys should also flow through to the job's connections
      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(UTIL.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigarutions(key);

      // every connection the job opened must have targeted cluster B, not cluster A
      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      UTILB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      UTIL.shutdownMiniCluster();
      UTILB.shutdownMiniCluster();
    }
  }
1161
1162  private static class ConfigurationCaptorConnection implements Connection {
1163    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";
1164
1165    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();
1166
1167    private final Connection delegate;
1168
1169    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
1170      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
1171      // here we do not use this registry, so close it...
1172      registry.close();
1173      // here we use createAsyncConnection, to avoid infinite recursive as we reset the Connection
1174      // implementation in below method
1175      delegate =
1176        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
1177          .toConnection();
1178
1179      final String uuid = conf.get(UUID_KEY);
1180      if (uuid != null) {
1181        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
1182      }
1183    }
1184
1185    static UUID configureConnectionImpl(Configuration conf) {
1186      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
1187        ConfigurationCaptorConnection.class, Connection.class);
1188
1189      final UUID uuid = UUID.randomUUID();
1190      conf.set(UUID_KEY, uuid.toString());
1191      return uuid;
1192    }
1193
1194    static List<Configuration> getCapturedConfigarutions(UUID key) {
1195      return confs.get(key);
1196    }
1197
1198    @Override
1199    public Configuration getConfiguration() {
1200      return delegate.getConfiguration();
1201    }
1202
1203    @Override
1204    public Table getTable(TableName tableName) throws IOException {
1205      return delegate.getTable(tableName);
1206    }
1207
1208    @Override
1209    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
1210      return delegate.getTable(tableName, pool);
1211    }
1212
1213    @Override
1214    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
1215      return delegate.getBufferedMutator(tableName);
1216    }
1217
1218    @Override
1219    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
1220      return delegate.getBufferedMutator(params);
1221    }
1222
1223    @Override
1224    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
1225      return delegate.getRegionLocator(tableName);
1226    }
1227
1228    @Override
1229    public void clearRegionLocationCache() {
1230      delegate.clearRegionLocationCache();
1231    }
1232
1233    @Override
1234    public Admin getAdmin() throws IOException {
1235      return delegate.getAdmin();
1236    }
1237
1238    @Override
1239    public void close() throws IOException {
1240      delegate.close();
1241    }
1242
1243    @Override
1244    public boolean isClosed() {
1245      return delegate.isClosed();
1246    }
1247
1248    @Override
1249    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
1250      return delegate.getTableBuilder(tableName, pool);
1251    }
1252
1253    @Override
1254    public AsyncConnection toAsyncConnection() {
1255      return delegate.toAsyncConnection();
1256    }
1257
1258    @Override
1259    public String getClusterId() {
1260      return delegate.getClusterId();
1261    }
1262
1263    @Override
1264    public Hbck getHbck() throws IOException {
1265      return delegate.getHbck();
1266    }
1267
1268    @Override
1269    public Hbck getHbck(ServerName masterServer) throws IOException {
1270      return delegate.getHbck(masterServer);
1271    }
1272
1273    @Override
1274    public void abort(String why, Throwable e) {
1275      delegate.abort(why, e);
1276    }
1277
1278    @Override
1279    public boolean isAborted() {
1280      return delegate.isAborted();
1281    }
1282  }
1283
1284}