/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.mapreduce;

import static org.apache.hadoop.hbase.regionserver.HStoreFile.BLOOM_FILTER_TYPE_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HadoopShims;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Simple test for {@link HFileOutputFormat2}. Sets up and runs a mapreduce job that writes hfile
 * output. Creates a few inner classes to implement splits and an inputformat that emits keys and
 * values.
 */
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
public class TestHFileOutputFormat2 extends HFileOutputFormat2TestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHFileOutputFormat2.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestHFileOutputFormat2.class);
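
  /*
   * For orientation, a minimal sketch of how this output format is typically wired up, based on
   * the calls exercised by the tests below ("outputDir" is just a placeholder path):
   *
   *   Job job = Job.getInstance(conf);
   *   HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
   *   FileOutputFormat.setOutputPath(job, outputDir);
   *   // run the job, then hand the generated HFiles to the bulk load tool:
   *   BulkLoadHFiles.create(conf).bulkLoad(table.getName(), outputDir);
   */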

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter amends timestamps if passed a keyvalue whose
   * timestamp is {@link HConstants#LATEST_TIMESTAMP}.
   * @see <a href="https://issues.apache.org/jira/browse/HBASE-2615">HBASE-2615</a>
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_LATEST_TIMESTAMP_isReplaced() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = UTIL.getDataTestDir("test_LATEST_TIMESTAMP_isReplaced");
    try {
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      // Test 1. Pass a KV that has a ts of LATEST_TIMESTAMP. It should be
      // changed by call to write. Check all in kv is same but ts.
      KeyValue kv = new KeyValue(b, b, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertFalse(original.equals(kv));
      assertTrue(Bytes.equals(CellUtil.cloneRow(original), CellUtil.cloneRow(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneFamily(original), CellUtil.cloneFamily(kv)));
      assertTrue(Bytes.equals(CellUtil.cloneQualifier(original), CellUtil.cloneQualifier(kv)));
      assertNotSame(original.getTimestamp(), kv.getTimestamp());
      assertNotSame(HConstants.LATEST_TIMESTAMP, kv.getTimestamp());

      // Test 2. Now test passing a kv that has explicit ts. It should not be
      // changed by call to record write.
      kv = new KeyValue(b, b, b, kv.getTimestamp() - 1, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertTrue(original.equals(kv));
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  private TaskAttemptContext createTestTaskAttemptContext(final Job job) throws Exception {
    HadoopShims hadoop = CompatibilitySingletonFactory.getInstance(HadoopShims.class);
    TaskAttemptContext context =
      hadoop.createTestTaskAttemptContext(job, "attempt_201402131733_0001_m_000000_0");
    return context;
  }

  /*
   * Test that {@link HFileOutputFormat2} creates an HFile with TIMERANGE metadata used by
   * time-restricted scans.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void test_TIMERANGE() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = UTIL.getDataTestDir("test_TIMERANGE_present");
    LOG.info("Timerange dir writing to dir: " + dir);
    try {
      // build a record writer using HFileOutputFormat2
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // Pass two key values with explicit timestamps
      final byte[] b = Bytes.toBytes("b");

      // value 1 with timestamp 2000
      KeyValue kv = new KeyValue(b, b, b, 2000, b);
      KeyValue original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // value 2 with timestamp 1000
      kv = new KeyValue(b, b, b, 1000, b);
      original = kv.clone();
      writer.write(new ImmutableBytesWritable(), kv);
      assertEquals(original, kv);

      // verify that the file has the proper FileInfo.
      writer.close(context);

      // the generated file lives 1 directory down from the attempt directory
      // and is the only file, e.g.
      // _attempt__0000_r_000000_0/b/1979617994050536795
      FileSystem fs = FileSystem.get(conf);
      Path attemptDirectory = hof.getDefaultWorkFile(context, "").getParent();
      FileStatus[] sub1 = fs.listStatus(attemptDirectory);
      FileStatus[] file = fs.listStatus(sub1[0].getPath());

      // open as HFile Reader and pull out TIMERANGE FileInfo.
      HFile.Reader rd =
        HFile.createReader(fs, file[0].getPath(), new CacheConfig(conf), true, conf);
      Map<byte[], byte[]> finfo = rd.getHFileInfo();
      byte[] range = finfo.get(Bytes.toBytes("TIMERANGE"));
      assertNotNull(range);

      // unmarshall and check values.
      TimeRangeTracker timeRangeTracker = TimeRangeTracker.parseFrom(range);
      LOG.info(timeRangeTracker.getMin() + "...." + timeRangeTracker.getMax());
      assertEquals(1000, timeRangeTracker.getMin());
      assertEquals(2000, timeRangeTracker.getMax());
      rd.close();
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Run small MR job.
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testWritingPEData() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    Path testDir = UTIL.getDataTestDirOnTestFS("testWritingPEData");
    FileSystem fs = testDir.getFileSystem(conf);

    // Set down this value or we OOME in eclipse.
    conf.setInt("mapreduce.task.io.sort.mb", 20);
    // Write a few files.
    long hregionMaxFilesize = 10 * 1024;
    conf.setLong(HConstants.HREGION_MAX_FILESIZE, hregionMaxFilesize);

    Job job = new Job(conf, "testWritingPEData");
    setupRandomGeneratorMapper(job, false);
    // This partitioner doesn't work well for number keys but using it anyways
    // just to demonstrate how to configure it.
    byte[] startKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];
    byte[] endKey = new byte[RandomKVGeneratingMapper.KEYLEN_DEFAULT];

    Arrays.fill(startKey, (byte) 0);
    Arrays.fill(endKey, (byte) 0xff);

    job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
    // Set start and end rows for partitioner.
    SimpleTotalOrderPartitioner.setStartKey(job.getConfiguration(), startKey);
    SimpleTotalOrderPartitioner.setEndKey(job.getConfiguration(), endKey);
    job.setReducerClass(CellSortReducer.class);
    job.setOutputFormatClass(HFileOutputFormat2.class);
    job.setNumReduceTasks(4);
    job.getConfiguration().setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      CellSerialization.class.getName());

    FileOutputFormat.setOutputPath(job, testDir);
    assertTrue(job.waitForCompletion(false));
    FileStatus[] files = fs.listStatus(testDir);
    assertTrue(files.length > 0);

    // check output file num and size.
    for (byte[] family : FAMILIES) {
      long kvCount = 0;
      RemoteIterator<LocatedFileStatus> iterator =
        fs.listFiles(testDir.suffix("/" + new String(family)), true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);

        kvCount += reader.getEntries();
        scanner.seekTo();
        long perKVSize = scanner.getCell().getSerializedSize();
        assertTrue("Data size of each file should not be too large.",
          perKVSize * reader.getEntries() <= hregionMaxFilesize);
      }
      assertEquals("Should write expected data in output file.", ROWSPERSPLIT, kvCount);
    }
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter writes tags such as ttl into hfile.
   */
  @Test
  public void test_WritingTagData() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    final String HFILE_FORMAT_VERSION_CONF_KEY = "hfile.format.version";
    conf.setInt(HFILE_FORMAT_VERSION_CONF_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = UTIL.getDataTestDir("WritingTagData");
    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);
      Job job = new Job(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      List<Tag> tags = new ArrayList<>();
      tags.add(new ArrayBackedTag(TagType.TTL_TAG_TYPE, Bytes.toBytes(978670)));
      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b, tags);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        HFileScanner scanner = reader.getScanner(conf, false, false, false);
        scanner.seekTo();
        ExtendedCell cell = scanner.getCell();
        List<Tag> tagsFromCell = PrivateCellUtil.getTags(cell);
        assertTrue(tagsFromCell.size() > 0);
        for (Tag tag : tagsFromCell) {
          assertTrue(tag.getType() == TagType.TTL_TAG_TYPE);
        }
      }
    } finally {
      if (writer != null && context != null) writer.close(context);
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testJobConfiguration() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    conf.set(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
      UTIL.getDataTestDir("testJobConfiguration").toString());
    Job job = new Job(conf);
    job.setWorkingDirectory(UTIL.getDataTestDir("testJobConfiguration"));
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    setupMockStartKeys(regionLocator);
    setupMockTableName(regionLocator);
    HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
    assertEquals(job.getNumReduceTasks(), 4);
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyCompressionMap(Configuration)}. Tests that the
   * family compression map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyCompressionMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.UTIL.getConfiguration());
      Map<String, Compression.Algorithm> familyToCompression =
        getMockColumnFamiliesForCompression(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForCompression(table, familyToCompression);
      conf.set(HFileOutputFormat2.COMPRESSION_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.compressionDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific compression setting from the configuration
      Map<byte[], Algorithm> retrievedFamilyToCompressionMap =
        HFileOutputFormat2.createFamilyCompressionMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Algorithm> entry : familyToCompression.entrySet()) {
        assertEquals("Compression configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToCompressionMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForCompression(Table table,
    Map<String, Compression.Algorithm> familyToCompression) throws IOException {

    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Compression.Algorithm> entry : familyToCompression.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setCompressionType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();

      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to compression algorithms for testing column family
   *         compression. Column family names have special characters
   */
  private Map<String, Compression.Algorithm> getMockColumnFamiliesForCompression(int numCfs) {
    Map<String, Compression.Algorithm> familyToCompression = new HashMap<>();
    // use column family names having special characters
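    // (the odd characters are meant to exercise the escaping applied when the per-family map is
    // serialized into the configuration and parsed back out)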
    if (numCfs-- > 0) {
      familyToCompression.put("Family1!@#!@#&", Compression.Algorithm.LZO);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.SNAPPY);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family2=asdads&!AASD", Compression.Algorithm.GZ);
    }
    if (numCfs-- > 0) {
      familyToCompression.put("Family3", Compression.Algorithm.NONE);
    }
    return familyToCompression;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBloomTypeMap(Configuration)}. Tests that the
   * family bloom type map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBloomTypeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 2; numCfs++) {
      Configuration conf = new Configuration(this.UTIL.getConfiguration());
      Map<String, BloomType> familyToBloomType = getMockColumnFamiliesForBloomType(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBloomType(table, familyToBloomType);
      conf.set(HFileOutputFormat2.BLOOM_TYPE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.bloomTypeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific bloom filter settings from the
      // configuration
      Map<byte[], BloomType> retrievedFamilyToBloomTypeMap =
        HFileOutputFormat2.createFamilyBloomTypeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, BloomType> entry : familyToBloomType.entrySet()) {
        assertEquals("BloomType configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBloomTypeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBloomType(Table table,
    Map<String, BloomType> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, BloomType> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
        .setBloomFilterType(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to bloom filter types for testing column family bloom
   *         filter settings. Column family names have special characters
   */
  private Map<String, BloomType> getMockColumnFamiliesForBloomType(int numCfs) {
    Map<String, BloomType> familyToBloomType = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBloomType.put("Family1!@#!@#&", BloomType.ROW);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family2=asdads&!AASD", BloomType.ROWCOL);
    }
    if (numCfs-- > 0) {
      familyToBloomType.put("Family3", BloomType.NONE);
    }
    return familyToBloomType;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyBlockSizeMap(Configuration)}. Tests that the
   * family block size map is correctly serialized into and deserialized from configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyBlockSizeMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.UTIL.getConfiguration());
      Map<String, Integer> familyToBlockSize = getMockColumnFamiliesForBlockSize(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForBlockSize(table, familyToBlockSize);
      conf.set(HFileOutputFormat2.BLOCK_SIZE_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(HFileOutputFormat2.blockSizeDetails,
          Arrays.asList(table.getDescriptor())));

      // read back family specific block size settings from the
      // configuration
      Map<byte[], Integer> retrievedFamilyToBlockSizeMap =
        HFileOutputFormat2.createFamilyBlockSizeMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, Integer> entry : familyToBlockSize.entrySet()) {
        assertEquals("BlockSize configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(), retrievedFamilyToBlockSizeMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForBlockSize(Table table,
    Map<String, Integer> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, Integer> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setBlocksize(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0).build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to block sizes for testing column family block size
   *         settings. Column family names have special characters
   */
  private Map<String, Integer> getMockColumnFamiliesForBlockSize(int numCfs) {
    Map<String, Integer> familyToBlockSize = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family1!@#!@#&", 1234);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family2=asdads&!AASD", Integer.MAX_VALUE);
    }
    if (numCfs-- > 0) {
      familyToBlockSize.put("Family3", 0);
    }
    return familyToBlockSize;
  }

  /**
   * Test for {@link HFileOutputFormat2#createFamilyDataBlockEncodingMap(Configuration)}. Tests that
   * the family data block encoding map is correctly serialized into and deserialized from
   * configuration
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testSerializeDeserializeFamilyDataBlockEncodingMap() throws IOException {
    for (int numCfs = 0; numCfs <= 3; numCfs++) {
      Configuration conf = new Configuration(this.UTIL.getConfiguration());
      Map<String, DataBlockEncoding> familyToDataBlockEncoding =
        getMockColumnFamiliesForDataBlockEncoding(numCfs);
      Table table = Mockito.mock(Table.class);
      setupMockColumnFamiliesForDataBlockEncoding(table, familyToDataBlockEncoding);
      TableDescriptor tableDescriptor = table.getDescriptor();
      conf.set(HFileOutputFormat2.DATABLOCK_ENCODING_FAMILIES_CONF_KEY,
        HFileOutputFormat2.serializeColumnFamilyAttribute(
          HFileOutputFormat2.dataBlockEncodingDetails, Arrays.asList(tableDescriptor)));

      // read back family specific data block encoding settings from the
      // configuration
      Map<byte[], DataBlockEncoding> retrievedFamilyToDataBlockEncodingMap =
        HFileOutputFormat2.createFamilyDataBlockEncodingMap(conf);

      // test that we have a value for all column families that matches with the
      // used mock values
      for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
        assertEquals(
          "DataBlockEncoding configuration incorrect for column family:" + entry.getKey(),
          entry.getValue(),
          retrievedFamilyToDataBlockEncodingMap.get(Bytes.toBytes(entry.getKey())));
      }
    }
  }

  private void setupMockColumnFamiliesForDataBlockEncoding(Table table,
    Map<String, DataBlockEncoding> familyToDataBlockEncoding) throws IOException {
    TableDescriptorBuilder mockTableDescriptor = TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);
    for (Entry<String, DataBlockEncoding> entry : familyToDataBlockEncoding.entrySet()) {
      ColumnFamilyDescriptor columnFamilyDescriptor =
        ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(entry.getKey())).setMaxVersions(1)
          .setDataBlockEncoding(entry.getValue()).setBlockCacheEnabled(false).setTimeToLive(0)
          .build();
      mockTableDescriptor.setColumnFamily(columnFamilyDescriptor);
    }
    Mockito.doReturn(mockTableDescriptor.build()).when(table).getDescriptor();
  }

  /**
   * @return a map from column family names to data block encodings for testing column family data
   *         block encoding settings. Column family names have special characters
   */
  private Map<String, DataBlockEncoding> getMockColumnFamiliesForDataBlockEncoding(int numCfs) {
    Map<String, DataBlockEncoding> familyToDataBlockEncoding = new HashMap<>();
    // use column family names having special characters
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family1!@#!@#&", DataBlockEncoding.DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.FAST_DIFF);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family2=asdads&!AASD", DataBlockEncoding.PREFIX);
    }
    if (numCfs-- > 0) {
      familyToDataBlockEncoding.put("Family3", DataBlockEncoding.NONE);
    }
    return familyToDataBlockEncoding;
  }

  private void setupMockStartKeys(RegionLocator table) throws IOException {
    byte[][] mockKeys = new byte[][] { HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("aaa"),
      Bytes.toBytes("ggg"), Bytes.toBytes("zzz") };
    Mockito.doReturn(mockKeys).when(table).getStartKeys();
  }

  private void setupMockTableName(RegionLocator table) throws IOException {
    TableName mockTableName = TableName.valueOf("mock_table");
    Mockito.doReturn(mockTableName).when(table).getName();
  }

  /**
   * Test that {@link HFileOutputFormat2} RecordWriter uses compression and bloom filter settings
   * from the column family descriptor
   */
  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testColumnFamilySettings() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = UTIL.getDataTestDir("testColumnFamilySettings");

    // Setup table descriptor
    Table table = Mockito.mock(Table.class);
    RegionLocator regionLocator = Mockito.mock(RegionLocator.class);
    TableDescriptorBuilder tableDescriptorBuilder =
      TableDescriptorBuilder.newBuilder(TABLE_NAMES[0]);

    for (ColumnFamilyDescriptor hcd : HBaseTestingUtil.generateColumnDescriptors()) {
      tableDescriptorBuilder.setColumnFamily(hcd);
    }
    // build (and stub) the descriptor only after all column families have been added
    Mockito.doReturn(tableDescriptorBuilder.build()).when(table).getDescriptor();

    // set up the table to return some mock keys
    setupMockStartKeys(regionLocator);

    try {
      // partial map red setup to get an operational writer for testing
      // We turn off the sequence file compression, because DefaultCodec
      // pollutes the GZip codec pool with an incompatible compressor.
      conf.set("io.seqfile.compression.type", "NONE");
      conf.set("hbase.fs.tmp.dir", dir.toString());
      // turn locality off to eliminate getRegionLocation fail-and-retry time when writing kvs
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      Job job = new Job(conf, "testLocalMRIncrementalLoad");
      job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("testColumnFamilySettings"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table.getDescriptor(), regionLocator);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);

      // write out random rows
      writeRandomKeyValues(writer, context, tableDescriptorBuilder.build().getColumnFamilyNames(),
        ROWSPERSPLIT);
      writer.close(context);

      // Make sure that a directory was created for every CF
      FileSystem fs = dir.getFileSystem(conf);

      // commit so that the filesystem has one directory per column family
      hof.getOutputCommitter(context).commitTask(context);
      hof.getOutputCommitter(context).commitJob(context);
      FileStatus[] families = CommonFSUtils.listStatus(fs, dir, new FSUtils.FamilyDirFilter(fs));
      assertEquals(tableDescriptorBuilder.build().getColumnFamilies().length, families.length);
      for (FileStatus f : families) {
        String familyStr = f.getPath().getName();
        ColumnFamilyDescriptor hcd =
          tableDescriptorBuilder.build().getColumnFamily(Bytes.toBytes(familyStr));
        // verify that the compression on this file matches the configured
        // compression
        Path dataFilePath = fs.listStatus(f.getPath())[0].getPath();
        Reader reader = HFile.createReader(fs, dataFilePath, new CacheConfig(conf), true, conf);
        Map<byte[], byte[]> fileInfo = reader.getHFileInfo();

        byte[] bloomFilter = fileInfo.get(BLOOM_FILTER_TYPE_KEY);
        if (bloomFilter == null) bloomFilter = Bytes.toBytes("NONE");
        assertEquals(
          "Incorrect bloom filter used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getBloomFilterType(), BloomType.valueOf(Bytes.toString(bloomFilter)));
        assertEquals(
          "Incorrect compression used for column family " + familyStr + "(reader: " + reader + ")",
          hcd.getCompressionType(), reader.getFileContext().getCompression());
      }
    } finally {
      dir.getFileSystem(conf).delete(dir, true);
    }
  }

  /**
   * Write random values to the writer assuming a table created using {@link #FAMILIES} as column
   * family descriptors
   */
  private void writeRandomKeyValues(RecordWriter<ImmutableBytesWritable, Cell> writer,
    TaskAttemptContext context, Set<byte[]> families, int numRows)
    throws IOException, InterruptedException {
    byte keyBytes[] = new byte[Bytes.SIZEOF_INT];
    int valLength = 10;
    byte valBytes[] = new byte[valLength];

    int taskId = context.getTaskAttemptID().getTaskID().getId();
    assert taskId < Byte.MAX_VALUE : "Unit tests don't support > 127 tasks!";
    final byte[] qualifier = Bytes.toBytes("data");
    for (int i = 0; i < numRows; i++) {
      Bytes.putInt(keyBytes, 0, i);
      Bytes.random(valBytes);
      ImmutableBytesWritable key = new ImmutableBytesWritable(keyBytes);
      for (byte[] family : families) {
        Cell kv = new KeyValue(keyBytes, family, qualifier, valBytes);
        writer.write(key, kv);
      }
    }
  }

  /**
   * Covers the scenario reported in HBASE-6901: all files are bulk loaded and excluded from minor
   * compaction. Without the fix for HBASE-6901, an ArrayIndexOutOfBoundsException would be thrown.
   */
  @Ignore("Flakey: See HBASE-9051")
  @Test
  public void testExcludeAllFromMinorCompaction() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    UTIL.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(); Admin admin = conn.getAdmin();
      Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
      RegionLocator locator = conn.getRegionLocator(TABLE_NAMES[0])) {
      final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
      assertEquals("Should start with empty table", 0, UTIL.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // Generate two bulk load files
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      for (int i = 0; i < 2; i++) {
        Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
        runIncrementalPELoad(conf,
          Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(),
            conn.getRegionLocator(TABLE_NAMES[0]))),
          testDir, false);
        // Perform the actual load
        BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
      }

      // Ensure data shows up
      int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
        UTIL.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
            for (HRegion region : regions) {
              for (HStore store : region.getStores()) {
                store.closeAndArchiveCompactedFiles();
              }
            }
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          List<HRegion> regions = UTIL.getMiniHBaseCluster().getRegions(TABLE_NAMES[0]);
          for (HRegion region : regions) {
            for (HStore store : region.getStores()) {
              store.closeAndArchiveCompactedFiles();
            }
          }
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

  @Ignore("Goes zombie too frequently; needs work. See HBASE-14563")
  @Test
  public void testExcludeMinorCompaction() throws Exception {
    Configuration conf = UTIL.getConfiguration();
    conf.setInt("hbase.hstore.compaction.min", 2);
    generateRandomStartKeys(5);

    UTIL.startMiniCluster();
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Admin admin = conn.getAdmin()) {
      Path testDir = UTIL.getDataTestDirOnTestFS("testExcludeMinorCompaction");
      final FileSystem fs = UTIL.getDFSCluster().getFileSystem();
      Table table = UTIL.createTable(TABLE_NAMES[0], FAMILIES);
      assertEquals("Should start with empty table", 0, UTIL.countRows(table));

      // deep inspection: get the StoreFile dir
      final Path storePath =
        new Path(CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), TABLE_NAMES[0]),
          new Path(admin.getRegions(TABLE_NAMES[0]).get(0).getEncodedName(),
            Bytes.toString(FAMILIES[0])));
      assertEquals(0, fs.listStatus(storePath).length);

      // put some data in it and flush to create a storefile
      Put p = new Put(Bytes.toBytes("test"));
      p.addColumn(FAMILIES[0], Bytes.toBytes("1"), Bytes.toBytes("1"));
      table.put(p);
      admin.flush(TABLE_NAMES[0]);
      assertEquals(1, UTIL.countRows(table));
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

      // Generate a bulk load file with more rows
      conf.setBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", true);

      RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
      runIncrementalPELoad(conf,
        Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
        testDir, false);

      // Perform the actual load
      BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);

      // Ensure data shows up
      int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
      assertEquals("BulkLoadHFiles should put expected data in table", expectedRows + 1,
        UTIL.countRows(table));

      // should have a second StoreFile now
      assertEquals(2, fs.listStatus(storePath).length);

      // minor compactions shouldn't get rid of the file
      admin.compact(TABLE_NAMES[0]);
      try {
        quickPoll(new Callable<Boolean>() {
          @Override
          public Boolean call() throws Exception {
            return fs.listStatus(storePath).length == 1;
          }
        }, 5000);
        throw new IOException("SF# = " + fs.listStatus(storePath).length);
      } catch (AssertionError ae) {
        // this is expected behavior
      }

      // a major compaction should work though
      admin.majorCompact(TABLE_NAMES[0]);
      quickPoll(new Callable<Boolean>() {
        @Override
        public Boolean call() throws Exception {
          return fs.listStatus(storePath).length == 1;
        }
      }, 5000);

    } finally {
      UTIL.shutdownMiniCluster();
    }
  }

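  /**
   * Polls {@code c} every 10ms until it returns true or {@code waitMs} has elapsed, failing the
   * test on timeout.
   */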
  private void quickPoll(Callable<Boolean> c, int waitMs) throws Exception {
    int sleepMs = 10;
    int retries = (int) Math.ceil(((double) waitMs) / sleepMs);
    while (retries-- > 0) {
      if (c.call().booleanValue()) {
        return;
      }
      Thread.sleep(sleepMs);
    }
    fail();
  }

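  /**
   * Manual driver: {@code newtable <tablename>} creates a pre-split test table, while
   * {@code incremental <tablename>} runs an incremental-load MR job against an existing table
   * (see {@link #manualTest(String[])}).
   */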
  public static void main(String args[]) throws Exception {
    new TestHFileOutputFormat2().manualTest(args);
  }

  public void manualTest(String args[]) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    UTIL = new HBaseTestingUtil(conf);
    if ("newtable".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      byte[][] splitKeys = generateRandomSplitKeys(4);
      Table table = UTIL.createTable(tname, FAMILIES, splitKeys);
    } else if ("incremental".equals(args[0])) {
      TableName tname = TableName.valueOf(args[1]);
      try (Connection c = ConnectionFactory.createConnection(conf); Admin admin = c.getAdmin();
        RegionLocator regionLocator = c.getRegionLocator(tname)) {
        Path outDir = new Path("incremental-out");
        runIncrementalPELoad(conf,
          Arrays
            .asList(new HFileOutputFormat2.TableInfo(admin.getDescriptor(tname), regionLocator)),
          outDir, false);
      }
    } else {
      throw new RuntimeException("usage: TestHFileOutputFormat2 newtable | incremental");
    }
  }

  @Test
  public void testBlockStoragePolicy() throws Exception {
    UTIL = new HBaseTestingUtil();
    Configuration conf = UTIL.getConfiguration();
    conf.set(HFileOutputFormat2.STORAGE_POLICY_PROPERTY, "ALL_SSD");

    conf.set(
      HFileOutputFormat2.STORAGE_POLICY_PROPERTY_CF_PREFIX + Bytes
        .toString(HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0])),
      "ONE_SSD");
    Path cf1Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[0]));
    Path cf2Dir = new Path(UTIL.getDataTestDir(), Bytes.toString(FAMILIES[1]));
    UTIL.startMiniDFSCluster(3);
    FileSystem fs = UTIL.getDFSCluster().getFileSystem();
    try {
      fs.mkdirs(cf1Dir);
      fs.mkdirs(cf2Dir);

      // the original block storage policy would be HOT
      String spA = getStoragePolicyName(fs, cf1Dir);
      String spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertEquals("HOT", spA);
      assertEquals("HOT", spB);

      // alter table cf schema to change storage policies
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[0]), cf1Dir);
      HFileOutputFormat2.configureStoragePolicy(conf, fs,
        HFileOutputFormat2.combineTableNameSuffix(TABLE_NAMES[0].getName(), FAMILIES[1]), cf2Dir);
      spA = getStoragePolicyName(fs, cf1Dir);
      spB = getStoragePolicyName(fs, cf2Dir);
      LOG.debug("Storage policy of cf 0: [" + spA + "].");
      LOG.debug("Storage policy of cf 1: [" + spB + "].");
      assertNotNull(spA);
      assertEquals("ONE_SSD", spA);
      assertNotNull(spB);
      assertEquals("ALL_SSD", spB);
    } finally {
      fs.delete(cf1Dir, true);
      fs.delete(cf2Dir, true);
      UTIL.shutdownMiniDFSCluster();
    }
  }

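  /**
   * Returns the storage policy name of {@code path}, first via reflection on
   * FileSystem#getStoragePolicy (only available on newer Hadoop versions) and, failing that,
   * through the DistributedFileSystem-specific lookup below, defaulting to "HOT".
   */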
  private String getStoragePolicyName(FileSystem fs, Path path) {
    try {
      Object blockStoragePolicySpi = ReflectionUtils.invokeMethod(fs, "getStoragePolicy", path);
      return (String) ReflectionUtils.invokeMethod(blockStoragePolicySpi, "getName");
    } catch (Exception e) {
      // Maybe fail because of using old HDFS version, try the old way
      if (LOG.isTraceEnabled()) {
        LOG.trace("Failed to get policy directly", e);
      }
      String policy = getStoragePolicyNameForOldHDFSVersion(fs, path);
      return policy == null ? "HOT" : policy;// HOT by default
    }
  }

  private String getStoragePolicyNameForOldHDFSVersion(FileSystem fs, Path path) {
    try {
      if (fs instanceof DistributedFileSystem) {
        DistributedFileSystem dfs = (DistributedFileSystem) fs;
        HdfsFileStatus status = dfs.getClient().getFileInfo(path.toUri().getPath());
        if (null != status) {
          byte storagePolicyId = status.getStoragePolicy();
          Field idUnspecified = BlockStoragePolicySuite.class.getField("ID_UNSPECIFIED");
          if (storagePolicyId != idUnspecified.getByte(BlockStoragePolicySuite.class)) {
            BlockStoragePolicy[] policies = dfs.getStoragePolicies();
            for (BlockStoragePolicy policy : policies) {
              if (policy.getId() == storagePolicyId) {
                return policy.getName();
              }
            }
          }
        }
      }
    } catch (Throwable e) {
      LOG.warn("failed to get block storage policy of [" + path + "]", e);
    }

    return null;
  }

  @Test
  public void TestConfigureCompression() throws Exception {
    Configuration conf = new Configuration(this.UTIL.getConfiguration());
    RecordWriter<ImmutableBytesWritable, Cell> writer = null;
    TaskAttemptContext context = null;
    Path dir = UTIL.getDataTestDir("TestConfigureCompression");
    String hfileoutputformatCompression = "gz";

    try {
      conf.set(HFileOutputFormat2.OUTPUT_TABLE_NAME_CONF_KEY, TABLE_NAMES[0].getNameAsString());
      conf.setBoolean(HFileOutputFormat2.LOCALITY_SENSITIVE_CONF_KEY, false);

      conf.set(HFileOutputFormat2.COMPRESSION_OVERRIDE_CONF_KEY, hfileoutputformatCompression);

      Job job = Job.getInstance(conf);
      FileOutputFormat.setOutputPath(job, dir);
      context = createTestTaskAttemptContext(job);
      HFileOutputFormat2 hof = new HFileOutputFormat2();
      writer = hof.getRecordWriter(context);
      final byte[] b = Bytes.toBytes("b");

      KeyValue kv = new KeyValue(b, b, b, HConstants.LATEST_TIMESTAMP, b);
      writer.write(new ImmutableBytesWritable(), kv);
      writer.close(context);
      writer = null;
      FileSystem fs = dir.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(dir, true);
      while (iterator.hasNext()) {
        LocatedFileStatus keyFileStatus = iterator.next();
        HFile.Reader reader =
          HFile.createReader(fs, keyFileStatus.getPath(), new CacheConfig(conf), true, conf);
        assertEquals(reader.getTrailer().getCompressionCodec().getName(),
          hfileoutputformatCompression);
      }
    } finally {
      if (writer != null && context != null) {
        writer.close(context);
      }
      dir.getFileSystem(conf).delete(dir, true);
    }

  }

  @Test
  public void testMRIncrementalLoadWithLocalityMultiCluster() throws Exception {
    // Start cluster A
    UTIL = new HBaseTestingUtil();
    Configuration confA = UTIL.getConfiguration();
    int hostCount = 3;
    int regionNum = 20;
    String[] hostnames = new String[hostCount];
    for (int i = 0; i < hostCount; ++i) {
      hostnames[i] = "datanode_" + i;
    }
    StartTestingClusterOption option = StartTestingClusterOption.builder()
      .numRegionServers(hostCount).dataNodeHosts(hostnames).build();
    UTIL.startMiniCluster(option);

    // Start cluster B
    HBaseTestingUtil UTILB = new HBaseTestingUtil();
    Configuration confB = UTILB.getConfiguration();
    UTILB.startMiniCluster(option);

    Path testDir = UTIL.getDataTestDirOnTestFS("testLocalMRIncrementalLoad");

    byte[][] splitKeys = generateRandomSplitKeys(regionNum - 1);
    TableName tableName = TableName.valueOf("table");
    // Create table in cluster B
    try (Table table = UTILB.createTable(tableName, FAMILIES, splitKeys);
      RegionLocator r = UTILB.getConnection().getRegionLocator(tableName)) {
      // Generate the bulk load files
      // Job has zookeeper configuration for cluster A
      // Assume we read from cluster A with TableInputFormat and write hfiles destined for cluster B
      Job job = new Job(confA, "testLocalMRIncrementalLoad");
      Configuration jobConf = job.getConfiguration();
      final UUID key = ConfigurationCaptorConnection.configureConnectionImpl(jobConf);
      job.setWorkingDirectory(UTIL.getDataTestDirOnTestFS("runIncrementalPELoad"));
      setupRandomGeneratorMapper(job, false);
      HFileOutputFormat2.configureIncrementalLoad(job, table, r);

      assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_QUORUM_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY));
      assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
        jobConf.get(HFileOutputFormat2.REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY));

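      // Keys set under REMOTE_CLUSTER_CONF_PREFIX should have the prefix stripped and be applied
      // to the connection used for writing into cluster B (verified on the captured
      // configurations below).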
      String bSpecificConfigKey = "my.override.config.for.b";
      String bSpecificConfigValue = "b-specific-value";
      jobConf.set(HFileOutputFormat2.REMOTE_CLUSTER_CONF_PREFIX + bSpecificConfigKey,
        bSpecificConfigValue);

      FileOutputFormat.setOutputPath(job, testDir);

      assertFalse(UTIL.getTestFileSystem().exists(testDir));

      assertTrue(job.waitForCompletion(true));

      final List<Configuration> configs =
        ConfigurationCaptorConnection.getCapturedConfigurations(key);

      assertFalse(configs.isEmpty());
      for (Configuration config : configs) {
        assertEquals(confB.get(HConstants.ZOOKEEPER_QUORUM),
          config.get(HConstants.ZOOKEEPER_QUORUM));
        assertEquals(confB.get(HConstants.ZOOKEEPER_CLIENT_PORT),
          config.get(HConstants.ZOOKEEPER_CLIENT_PORT));
        assertEquals(confB.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
          config.get(HConstants.ZOOKEEPER_ZNODE_PARENT));

        assertEquals(bSpecificConfigValue, config.get(bSpecificConfigKey));
      }
    } finally {
      UTILB.deleteTable(tableName);
      testDir.getFileSystem(confA).delete(testDir, true);
      UTIL.shutdownMiniCluster();
      UTILB.shutdownMiniCluster();
    }
  }

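  /**
   * {@link Connection} implementation that records every Configuration it is constructed with
   * (keyed by a UUID planted in the job configuration) while delegating all operations to a real
   * connection, so the test above can inspect the configuration actually used for remote writes.
   */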
  private static class ConfigurationCaptorConnection implements Connection {
    private static final String UUID_KEY = "ConfigurationCaptorConnection.uuid";

    private static final Map<UUID, List<Configuration>> confs = new ConcurrentHashMap<>();

    private final Connection delegate;

    public ConfigurationCaptorConnection(Configuration conf, ExecutorService es, User user,
      ConnectionRegistry registry, Map<String, byte[]> connectionAttributes) throws IOException {
      // here we do not use this registry, so close it...
      registry.close();
      // use createAsyncConnection here to avoid infinite recursion, since the Connection
      // implementation is overridden in configureConnectionImpl below
      delegate =
        FutureUtils.get(ConnectionFactory.createAsyncConnection(conf, user, connectionAttributes))
          .toConnection();

      final String uuid = conf.get(UUID_KEY);
      if (uuid != null) {
        confs.computeIfAbsent(UUID.fromString(uuid), u -> new CopyOnWriteArrayList<>()).add(conf);
      }
    }

    static UUID configureConnectionImpl(Configuration conf) {
      conf.setClass(ConnectionUtils.HBASE_CLIENT_CONNECTION_IMPL,
        ConfigurationCaptorConnection.class, Connection.class);

      final UUID uuid = UUID.randomUUID();
      conf.set(UUID_KEY, uuid.toString());
      return uuid;
    }

    static List<Configuration> getCapturedConfigurations(UUID key) {
      return confs.get(key);
    }

    @Override
    public Configuration getConfiguration() {
      return delegate.getConfiguration();
    }

    @Override
    public Table getTable(TableName tableName) throws IOException {
      return delegate.getTable(tableName);
    }

    @Override
    public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
      return delegate.getTable(tableName, pool);
    }

    @Override
    public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
      return delegate.getBufferedMutator(tableName);
    }

    @Override
    public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
      return delegate.getBufferedMutator(params);
    }

    @Override
    public RegionLocator getRegionLocator(TableName tableName) throws IOException {
      return delegate.getRegionLocator(tableName);
    }

    @Override
    public void clearRegionLocationCache() {
      delegate.clearRegionLocationCache();
    }

    @Override
    public Admin getAdmin() throws IOException {
      return delegate.getAdmin();
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public boolean isClosed() {
      return delegate.isClosed();
    }

    @Override
    public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
      return delegate.getTableBuilder(tableName, pool);
    }

    @Override
    public AsyncConnection toAsyncConnection() {
      return delegate.toAsyncConnection();
    }

    @Override
    public String getClusterId() {
      return delegate.getClusterId();
    }

    @Override
    public Hbck getHbck() throws IOException {
      return delegate.getHbck();
    }

    @Override
    public Hbck getHbck(ServerName masterServer) throws IOException {
      return delegate.getHbck(masterServer);
    }

    @Override
    public void abort(String why, Throwable e) {
      delegate.abort(why, e);
    }

    @Override
    public boolean isAborted() {
      return delegate.isAborted();
    }
  }

}