001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.mob.compactions; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertTrue; 022 023import java.io.IOException; 024import java.security.Key; 025import java.security.SecureRandom; 026import java.util.ArrayList; 027import java.util.Arrays; 028import java.util.Calendar; 029import java.util.Collections; 030import java.util.List; 031import java.util.Objects; 032import java.util.Optional; 033import java.util.Random; 034import java.util.concurrent.ExecutorService; 035import java.util.concurrent.RejectedExecutionException; 036import java.util.concurrent.SynchronousQueue; 037import java.util.concurrent.ThreadPoolExecutor; 038import java.util.concurrent.TimeUnit; 039import javax.crypto.spec.SecretKeySpec; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileStatus; 042import org.apache.hadoop.fs.FileSystem; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.Cell; 045import org.apache.hadoop.hbase.CellUtil; 046import org.apache.hadoop.hbase.HBaseClassTestRule; 047import org.apache.hadoop.hbase.HBaseTestingUtility; 048import org.apache.hadoop.hbase.HColumnDescriptor; 049import org.apache.hadoop.hbase.HConstants; 050import org.apache.hadoop.hbase.HTableDescriptor; 051import org.apache.hadoop.hbase.NamespaceDescriptor; 052import org.apache.hadoop.hbase.TableName; 053import org.apache.hadoop.hbase.client.Admin; 054import org.apache.hadoop.hbase.client.BufferedMutator; 055import org.apache.hadoop.hbase.client.CompactType; 056import org.apache.hadoop.hbase.client.CompactionState; 057import org.apache.hadoop.hbase.client.Connection; 058import org.apache.hadoop.hbase.client.ConnectionFactory; 059import org.apache.hadoop.hbase.client.Delete; 060import org.apache.hadoop.hbase.client.Durability; 061import org.apache.hadoop.hbase.client.Get; 062import org.apache.hadoop.hbase.client.MobCompactPartitionPolicy; 063import org.apache.hadoop.hbase.client.Put; 064import org.apache.hadoop.hbase.client.Result; 065import org.apache.hadoop.hbase.client.ResultScanner; 066import org.apache.hadoop.hbase.client.Scan; 067import org.apache.hadoop.hbase.client.Table; 068import org.apache.hadoop.hbase.coprocessor.ObserverContext; 069import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 070import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 071import org.apache.hadoop.hbase.coprocessor.RegionObserver; 072import org.apache.hadoop.hbase.io.HFileLink; 073import org.apache.hadoop.hbase.io.crypto.KeyProviderForTesting; 074import org.apache.hadoop.hbase.io.crypto.aes.AES; 075import org.apache.hadoop.hbase.io.hfile.CacheConfig; 076import org.apache.hadoop.hbase.io.hfile.HFile; 077import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; 078import org.apache.hadoop.hbase.mob.MobConstants; 079import org.apache.hadoop.hbase.mob.MobFileName; 080import org.apache.hadoop.hbase.mob.MobUtils; 081import org.apache.hadoop.hbase.regionserver.BloomType; 082import org.apache.hadoop.hbase.regionserver.HRegion; 083import org.apache.hadoop.hbase.regionserver.HStoreFile; 084import org.apache.hadoop.hbase.regionserver.Store; 085import org.apache.hadoop.hbase.regionserver.StoreFile; 086import org.apache.hadoop.hbase.regionserver.StoreFileInfo; 087import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; 088import org.apache.hadoop.hbase.security.EncryptionUtil; 089import org.apache.hadoop.hbase.security.User; 090import org.apache.hadoop.hbase.testclassification.LargeTests; 091import org.apache.hadoop.hbase.util.Bytes; 092import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 093import org.apache.hadoop.hbase.util.Pair; 094import org.apache.hadoop.hbase.util.Threads; 095import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 096import org.junit.AfterClass; 097import org.junit.Assert; 098import org.junit.BeforeClass; 099import org.junit.ClassRule; 100import org.junit.Rule; 101import org.junit.Test; 102import org.junit.experimental.categories.Category; 103import org.junit.rules.TestName; 104import org.slf4j.Logger; 105import org.slf4j.LoggerFactory; 106 107@Category(LargeTests.class) 108public class TestMobCompactor { 109 110 @ClassRule 111 public static final HBaseClassTestRule CLASS_RULE = 112 HBaseClassTestRule.forClass(TestMobCompactor.class); 113 114 private static final Logger LOG = LoggerFactory.getLogger(TestMobCompactor.class); 115 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 116 private static Configuration conf = null; 117 private TableName tableName; 118 private static Connection conn; 119 private BufferedMutator bufMut; 120 private Table table; 121 private static Admin admin; 122 private HTableDescriptor desc; 123 private HColumnDescriptor hcd1; 124 private HColumnDescriptor hcd2; 125 private static FileSystem fs; 126 private static final String family1 = "family1"; 127 private static final String family2 = "family2"; 128 private static final String qf1 = "qualifier1"; 129 private static final String qf2 = "qualifier2"; 130 131 private static long tsFor20150907Monday; 132 private static long tsFor20151120Sunday; 133 private static long tsFor20151128Saturday; 134 private static long tsFor20151130Monday; 135 private static long tsFor20151201Tuesday; 136 private static long tsFor20151205Saturday; 137 private static long tsFor20151228Monday; 138 private static long tsFor20151231Thursday; 139 private static long tsFor20160101Friday; 140 private static long tsFor20160103Sunday; 141 142 private static final byte[] mobKey01 = Bytes.toBytes("r01"); 143 private static final byte[] mobKey02 = Bytes.toBytes("r02"); 144 private static final byte[] mobKey03 = Bytes.toBytes("r03"); 145 private static final byte[] mobKey04 = Bytes.toBytes("r04"); 146 private static final byte[] mobKey05 = Bytes.toBytes("r05"); 147 private static final byte[] mobKey06 = Bytes.toBytes("r05"); 148 private static final byte[] mobKey1 = Bytes.toBytes("r1"); 149 private static final byte[] mobKey2 = Bytes.toBytes("r2"); 150 private static final byte[] mobKey3 = Bytes.toBytes("r3"); 151 private static final byte[] mobKey4 = Bytes.toBytes("r4"); 152 private static final byte[] mobKey5 = Bytes.toBytes("r5"); 153 private static final byte[] mobKey6 = Bytes.toBytes("r6"); 154 private static final byte[] mobKey7 = Bytes.toBytes("r7"); 155 private static final byte[] mobKey8 = Bytes.toBytes("r8"); 156 private static final String mobValue0 = "mobValue00000000000000000000000000"; 157 private static final String mobValue1 = "mobValue00000111111111111111111111"; 158 private static final String mobValue2 = "mobValue00000222222222222222222222"; 159 private static final String mobValue3 = "mobValue00000333333333333333333333"; 160 private static final String mobValue4 = "mobValue00000444444444444444444444"; 161 private static final String mobValue5 = "mobValue00000666666666666666666666"; 162 private static final String mobValue6 = "mobValue00000777777777777777777777"; 163 private static final String mobValue7 = "mobValue00000888888888888888888888"; 164 private static final String mobValue8 = "mobValue00000888888888888888888899"; 165 166 private static byte[] KEYS = Bytes.toBytes("012"); 167 private static int regionNum = KEYS.length; 168 private static int delRowNum = 1; 169 private static int delCellNum = 6; 170 private static int cellNumPerRow = 3; 171 private static int rowNumPerFile = 2; 172 private static ExecutorService pool; 173 174 @Rule 175 public TestName name = new TestName(); 176 177 @BeforeClass 178 public static void setUpBeforeClass() throws Exception { 179 TEST_UTIL.getConfiguration().setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 5000); 180 TEST_UTIL.getConfiguration() 181 .set(HConstants.CRYPTO_KEYPROVIDER_CONF_KEY, KeyProviderForTesting.class.getName()); 182 TEST_UTIL.getConfiguration().set(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, "hbase"); 183 TEST_UTIL.getConfiguration().setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); 184 TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 1); 185 TEST_UTIL.getConfiguration().setInt("hbase.hfile.compaction.discharger.interval", 100); 186 TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); 187 TEST_UTIL.startMiniCluster(1); 188 pool = createThreadPool(TEST_UTIL.getConfiguration()); 189 conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), pool); 190 fs = TEST_UTIL.getTestFileSystem(); 191 conf = TEST_UTIL.getConfiguration(); 192 admin = TEST_UTIL.getAdmin(); 193 194 // Initialize timestamps for these days 195 Calendar calendar = Calendar.getInstance(); 196 calendar.set(2015, 8, 7, 10, 20); 197 tsFor20150907Monday = calendar.getTimeInMillis(); 198 199 calendar.set(2015, 10, 20, 10, 20); 200 tsFor20151120Sunday = calendar.getTimeInMillis(); 201 202 calendar.set(2015, 10, 28, 10, 20); 203 tsFor20151128Saturday = calendar.getTimeInMillis(); 204 205 calendar.set(2015, 10, 30, 10, 20); 206 tsFor20151130Monday = calendar.getTimeInMillis(); 207 208 calendar.set(2015, 11, 1, 10, 20); 209 tsFor20151201Tuesday = calendar.getTimeInMillis(); 210 211 calendar.set(2015, 11, 5, 10, 20); 212 tsFor20151205Saturday = calendar.getTimeInMillis(); 213 214 calendar.set(2015, 11, 28, 10, 20); 215 tsFor20151228Monday = calendar.getTimeInMillis(); 216 217 calendar.set(2015, 11, 31, 10, 20); 218 tsFor20151231Thursday = calendar.getTimeInMillis(); 219 220 calendar.set(2016, 0, 1, 10, 20); 221 tsFor20160101Friday = calendar.getTimeInMillis(); 222 223 calendar.set(2016, 0, 3, 10, 20); 224 tsFor20160103Sunday = calendar.getTimeInMillis(); 225 } 226 227 @AfterClass 228 public static void tearDownAfterClass() throws Exception { 229 pool.shutdown(); 230 conn.close(); 231 TEST_UTIL.shutdownMiniCluster(); 232 } 233 234 public void setUp(String tableNameAsString) throws IOException { 235 tableName = TableName.valueOf(tableNameAsString); 236 hcd1 = new HColumnDescriptor(family1); 237 hcd1.setMobEnabled(true); 238 hcd1.setMobThreshold(5); 239 hcd2 = new HColumnDescriptor(family2); 240 hcd2.setMobEnabled(true); 241 hcd2.setMobThreshold(5); 242 desc = new HTableDescriptor(tableName); 243 desc.addFamily(hcd1); 244 desc.addFamily(hcd2); 245 admin.createTable(desc, getSplitKeys()); 246 table = conn.getTable(tableName); 247 bufMut = conn.getBufferedMutator(tableName); 248 } 249 250 // Set up for mob compaction policy testing 251 private void setUpForPolicyTest(String tableNameAsString, MobCompactPartitionPolicy type) 252 throws IOException { 253 tableName = TableName.valueOf(tableNameAsString); 254 hcd1 = new HColumnDescriptor(family1); 255 hcd1.setMobEnabled(true); 256 hcd1.setMobThreshold(10); 257 hcd1.setMobCompactPartitionPolicy(type); 258 desc = new HTableDescriptor(tableName); 259 desc.addFamily(hcd1); 260 admin.createTable(desc); 261 table = conn.getTable(tableName); 262 bufMut = conn.getBufferedMutator(tableName); 263 } 264 265 // alter mob compaction policy 266 private void alterForPolicyTest(final MobCompactPartitionPolicy type) 267 throws Exception { 268 269 hcd1.setMobCompactPartitionPolicy(type); 270 desc.modifyFamily(hcd1); 271 admin.modifyTable(tableName, desc); 272 Pair<Integer, Integer> st; 273 274 while (null != (st = admin.getAlterStatus(tableName)) && st.getFirst() > 0) { 275 LOG.debug(st.getFirst() + " regions left to update"); 276 Thread.sleep(40); 277 } 278 LOG.info("alter status finished"); 279 } 280 281 @Test 282 public void testMinorCompaction() throws Exception { 283 resetConf(); 284 int mergeSize = 5000; 285 // change the mob compaction merge size 286 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); 287 288 // create a table with namespace 289 NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("ns").build(); 290 String tableNameAsString = "ns:testMinorCompaction"; 291 admin.createNamespace(namespaceDescriptor); 292 setUp(tableNameAsString); 293 int count = 4; 294 // generate mob files 295 loadData(admin, bufMut, tableName, count, rowNumPerFile); 296 int rowNumPerRegion = count * rowNumPerFile; 297 298 assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion, 299 countMobRows(table)); 300 assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion, 301 countMobCells(table)); 302 assertEquals("Before deleting: mob file count", regionNum * count, 303 countFiles(tableName, true, family1)); 304 305 int largeFilesCount = countLargeFiles(mergeSize, tableName, family1); 306 createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1)); 307 308 assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), 309 countMobRows(table)); 310 assertEquals("Before compaction: mob cells count", regionNum 311 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table)); 312 assertEquals("Before compaction: family1 mob file count", regionNum * count, 313 countFiles(tableName, true, family1)); 314 assertEquals("Before compaction: family2 mob file count", regionNum * count, 315 countFiles(tableName, true, family2)); 316 assertEquals("Before compaction: family1 del file count", regionNum, 317 countFiles(tableName, false, family1)); 318 assertEquals("Before compaction: family2 del file count", regionNum, 319 countFiles(tableName, false, family2)); 320 321 // do the mob file compaction 322 MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); 323 compactor.compact(); 324 325 assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), 326 countMobRows(table)); 327 assertEquals("After compaction: mob cells count", regionNum 328 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table)); 329 // After the compaction, the files smaller than the mob compaction merge size 330 // is merge to one file 331 assertEquals("After compaction: family1 mob file count", largeFilesCount + regionNum, 332 countFiles(tableName, true, family1)); 333 assertEquals("After compaction: family2 mob file count", regionNum * count, 334 countFiles(tableName, true, family2)); 335 assertEquals("After compaction: family1 del file count", regionNum, 336 countFiles(tableName, false, family1)); 337 assertEquals("After compaction: family2 del file count", regionNum, 338 countFiles(tableName, false, family2)); 339 } 340 341 @Test 342 public void testMinorCompactionWithWeeklyPolicy() throws Exception { 343 resetConf(); 344 int mergeSize = 5000; 345 // change the mob compaction merge size 346 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); 347 348 commonPolicyTestLogic("testMinorCompactionWithWeeklyPolicy", 349 MobCompactPartitionPolicy.WEEKLY, false, 6, 350 new String[] { "20150907", "20151120", "20151128", "20151130", "20151205", "20160103" }, 351 true); 352 } 353 354 @Test 355 public void testMajorCompactionWithWeeklyPolicy() throws Exception { 356 resetConf(); 357 358 commonPolicyTestLogic("testMajorCompactionWithWeeklyPolicy", 359 MobCompactPartitionPolicy.WEEKLY, true, 5, 360 new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); 361 } 362 363 @Test 364 public void testMinorCompactionWithMonthlyPolicy() throws Exception { 365 resetConf(); 366 int mergeSize = 5000; 367 // change the mob compaction merge size 368 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); 369 370 commonPolicyTestLogic("testMinorCompactionWithMonthlyPolicy", 371 MobCompactPartitionPolicy.MONTHLY, false, 4, 372 new String[] { "20150907", "20151130", "20151231", "20160103" }, true); 373 } 374 375 @Test 376 public void testMajorCompactionWithMonthlyPolicy() throws Exception { 377 resetConf(); 378 379 commonPolicyTestLogic("testMajorCompactionWithMonthlyPolicy", 380 MobCompactPartitionPolicy.MONTHLY, true, 4, 381 new String[] {"20150907", "20151130", "20151231", "20160103"}, true); 382 } 383 384 @Test 385 public void testMajorCompactionWithWeeklyFollowedByMonthly() throws Exception { 386 resetConf(); 387 388 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly", 389 MobCompactPartitionPolicy.WEEKLY, true, 5, 390 new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); 391 392 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthly", 393 MobCompactPartitionPolicy.MONTHLY, true, 4, 394 new String[] {"20150907", "20151128", "20151205", "20160103" }, false); 395 } 396 397 @Test 398 public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly() throws Exception { 399 resetConf(); 400 401 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly", 402 MobCompactPartitionPolicy.WEEKLY, true, 5, 403 new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); 404 405 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly", 406 MobCompactPartitionPolicy.MONTHLY, true, 4, 407 new String[] { "20150907", "20151128", "20151205", "20160103" }, false); 408 409 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByWeekly", 410 MobCompactPartitionPolicy.WEEKLY, true, 4, 411 new String[] { "20150907", "20151128", "20151205", "20160103" }, false); 412 } 413 414 @Test 415 public void testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily() throws Exception { 416 resetConf(); 417 418 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily", 419 MobCompactPartitionPolicy.WEEKLY, true, 5, 420 new String[] { "20150907", "20151120", "20151128", "20151205", "20160103" }, true); 421 422 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily", 423 MobCompactPartitionPolicy.MONTHLY, true, 4, 424 new String[] { "20150907", "20151128", "20151205", "20160103" }, false); 425 426 commonPolicyTestLogic("testMajorCompactionWithWeeklyFollowedByMonthlyFollowedByDaily", 427 MobCompactPartitionPolicy.DAILY, true, 4, 428 new String[] { "20150907", "20151128", "20151205", "20160103" }, false); 429 } 430 431 @Test 432 public void testCompactionWithHFileLink() throws IOException, InterruptedException { 433 resetConf(); 434 String tableNameAsString = "testCompactionWithHFileLink"; 435 setUp(tableNameAsString); 436 int count = 4; 437 // generate mob files 438 loadData(admin, bufMut, tableName, count, rowNumPerFile); 439 int rowNumPerRegion = count * rowNumPerFile; 440 441 long tid = System.currentTimeMillis(); 442 byte[] snapshotName1 = Bytes.toBytes("snaptb-" + tid); 443 // take a snapshot 444 admin.snapshot(snapshotName1, tableName); 445 446 createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1)); 447 448 assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), 449 countMobRows(table)); 450 assertEquals("Before compaction: mob cells count", regionNum 451 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table)); 452 assertEquals("Before compaction: family1 mob file count", regionNum * count, 453 countFiles(tableName, true, family1)); 454 assertEquals("Before compaction: family2 mob file count", regionNum * count, 455 countFiles(tableName, true, family2)); 456 assertEquals("Before compaction: family1 del file count", regionNum, 457 countFiles(tableName, false, family1)); 458 assertEquals("Before compaction: family2 del file count", regionNum, 459 countFiles(tableName, false, family2)); 460 461 // do the mob compaction 462 MobCompactor compactor = new PartitionedMobCompactor(conf, fs, tableName, hcd1, pool); 463 compactor.compact(); 464 465 assertEquals("After first compaction: mob rows count", regionNum 466 * (rowNumPerRegion - delRowNum), countMobRows(table)); 467 assertEquals("After first compaction: mob cells count", regionNum 468 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table)); 469 assertEquals("After first compaction: family1 mob file count", regionNum, 470 countFiles(tableName, true, family1)); 471 assertEquals("After first compaction: family2 mob file count", regionNum * count, 472 countFiles(tableName, true, family2)); 473 assertEquals("After first compaction: family1 del file count", 0, 474 countFiles(tableName, false, family1)); 475 assertEquals("After first compaction: family2 del file count", regionNum, 476 countFiles(tableName, false, family2)); 477 assertEquals("After first compaction: family1 hfilelink count", 0, countHFileLinks(family1)); 478 assertEquals("After first compaction: family2 hfilelink count", 0, countHFileLinks(family2)); 479 480 admin.disableTable(tableName); 481 // Restore from snapshot, the hfilelink will exist in mob dir 482 admin.restoreSnapshot(snapshotName1); 483 admin.enableTable(tableName); 484 485 assertEquals("After restoring snapshot: mob rows count", regionNum * rowNumPerRegion, 486 countMobRows(table)); 487 assertEquals("After restoring snapshot: mob cells count", regionNum * cellNumPerRow 488 * rowNumPerRegion, countMobCells(table)); 489 assertEquals("After restoring snapshot: family1 mob file count", regionNum * count, 490 countFiles(tableName, true, family1)); 491 assertEquals("After restoring snapshot: family2 mob file count", regionNum * count, 492 countFiles(tableName, true, family2)); 493 assertEquals("After restoring snapshot: family1 del file count", 0, 494 countFiles(tableName, false, family1)); 495 assertEquals("After restoring snapshot: family2 del file count", 0, 496 countFiles(tableName, false, family2)); 497 assertEquals("After restoring snapshot: family1 hfilelink count", regionNum * count, 498 countHFileLinks(family1)); 499 assertEquals("After restoring snapshot: family2 hfilelink count", 0, countHFileLinks(family2)); 500 501 compactor.compact(); 502 503 assertEquals("After second compaction: mob rows count", regionNum * rowNumPerRegion, 504 countMobRows(table)); 505 assertEquals("After second compaction: mob cells count", regionNum * cellNumPerRow 506 * rowNumPerRegion, countMobCells(table)); 507 assertEquals("After second compaction: family1 mob file count", regionNum, 508 countFiles(tableName, true, family1)); 509 assertEquals("After second compaction: family2 mob file count", regionNum * count, 510 countFiles(tableName, true, family2)); 511 assertEquals("After second compaction: family1 del file count", 0, 512 countFiles(tableName, false, family1)); 513 assertEquals("After second compaction: family2 del file count", 0, 514 countFiles(tableName, false, family2)); 515 assertEquals("After second compaction: family1 hfilelink count", 0, countHFileLinks(family1)); 516 assertEquals("After second compaction: family2 hfilelink count", 0, countHFileLinks(family2)); 517 assertRefFileNameEqual(family1); 518 } 519 520 @Test 521 public void testMajorCompactionFromAdmin() throws Exception { 522 resetConf(); 523 int mergeSize = 5000; 524 // change the mob compaction merge size 525 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, mergeSize); 526 SecureRandom rng = new SecureRandom(); 527 byte[] keyBytes = new byte[AES.KEY_LENGTH]; 528 rng.nextBytes(keyBytes); 529 String algorithm = conf.get(HConstants.CRYPTO_KEY_ALGORITHM_CONF_KEY, HConstants.CIPHER_AES); 530 Key cfKey = new SecretKeySpec(keyBytes, algorithm); 531 byte[] encryptionKey = EncryptionUtil.wrapKey(conf, 532 conf.get(HConstants.CRYPTO_MASTERKEY_NAME_CONF_KEY, User.getCurrent().getShortName()), cfKey); 533 final TableName tableName = TableName.valueOf(name.getMethodName()); 534 HTableDescriptor desc = new HTableDescriptor(tableName); 535 HColumnDescriptor hcd1 = new HColumnDescriptor(family1); 536 hcd1.setMobEnabled(true); 537 hcd1.setMobThreshold(0); 538 hcd1.setEncryptionType(algorithm); 539 hcd1.setEncryptionKey(encryptionKey); 540 HColumnDescriptor hcd2 = new HColumnDescriptor(family2); 541 hcd2.setMobEnabled(true); 542 hcd2.setMobThreshold(0); 543 desc.addFamily(hcd1); 544 desc.addFamily(hcd2); 545 admin.createTable(desc, getSplitKeys()); 546 Table table = conn.getTable(tableName); 547 BufferedMutator bufMut = conn.getBufferedMutator(tableName); 548 int count = 4; 549 // generate mob files 550 loadData(admin, bufMut, tableName, count, rowNumPerFile); 551 int rowNumPerRegion = count * rowNumPerFile; 552 553 assertEquals("Before deleting: mob rows count", regionNum * rowNumPerRegion, 554 countMobRows(table)); 555 assertEquals("Before deleting: mob cells count", regionNum * cellNumPerRow * rowNumPerRegion, 556 countMobCells(table)); 557 assertEquals("Before deleting: mob file count", regionNum * count, 558 countFiles(tableName, true, family1)); 559 560 createDelFile(table, tableName, Bytes.toBytes(family1), Bytes.toBytes(qf1)); 561 562 assertEquals("Before compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), 563 countMobRows(table)); 564 assertEquals("Before compaction: mob cells count", regionNum 565 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table)); 566 assertEquals("Before compaction: family1 mob file count", regionNum * count, 567 countFiles(tableName, true, family1)); 568 assertEquals("Before compaction: family2 mob file count", regionNum * count, 569 countFiles(tableName, true, family2)); 570 assertEquals("Before compaction: family1 del file count", regionNum, 571 countFiles(tableName, false, family1)); 572 assertEquals("Before compaction: family2 del file count", regionNum, 573 countFiles(tableName, false, family2)); 574 575 // do the major mob compaction, it will force all files to compaction 576 admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB); 577 578 waitUntilMobCompactionFinished(tableName); 579 assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum), 580 countMobRows(table)); 581 assertEquals("After compaction: mob cells count", regionNum 582 * (cellNumPerRow * rowNumPerRegion - delCellNum), countMobCells(table)); 583 assertEquals("After compaction: family1 mob file count", regionNum, 584 countFiles(tableName, true, family1)); 585 assertEquals("After compaction: family2 mob file count", regionNum * count, 586 countFiles(tableName, true, family2)); 587 assertEquals("After compaction: family1 del file count", 0, 588 countFiles(tableName, false, family1)); 589 assertEquals("After compaction: family2 del file count", regionNum, 590 countFiles(tableName, false, family2)); 591 Assert.assertTrue(verifyEncryption(tableName, family1)); 592 table.close(); 593 } 594 595 @Test 596 public void testScannerOnBulkLoadRefHFiles() throws Exception { 597 resetConf(); 598 setUp("testScannerOnBulkLoadRefHFiles"); 599 long ts = EnvironmentEdgeManager.currentTime(); 600 byte[] key0 = Bytes.toBytes("k0"); 601 byte[] key1 = Bytes.toBytes("k1"); 602 String value0 = "mobValue0"; 603 String value1 = "mobValue1"; 604 String newValue0 = "new"; 605 Put put0 = new Put(key0); 606 put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0)); 607 loadData(admin, bufMut, tableName, new Put[] { put0 }); 608 put0 = new Put(key0); 609 put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0)); 610 Put put1 = new Put(key1); 611 put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1)); 612 loadData(admin, bufMut, tableName, new Put[] { put0, put1 }); 613 // read the latest cell of key0. 614 Get get = new Get(key0); 615 Result result = table.get(get); 616 Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1)); 617 assertEquals("Before compaction: mob value of k0", newValue0, 618 Bytes.toString(CellUtil.cloneValue(cell))); 619 admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB); 620 waitUntilMobCompactionFinished(tableName); 621 // read the latest cell of key0, the cell seqId in bulk loaded file is not reset in the 622 // scanner. The cell that has "new" value is still visible. 623 result = table.get(get); 624 cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1)); 625 assertEquals("After compaction: mob value of k0", newValue0, 626 Bytes.toString(CellUtil.cloneValue(cell))); 627 // read the ref cell, not read further to the mob cell. 628 get = new Get(key1); 629 get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true)); 630 result = table.get(get); 631 cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1)); 632 // the ref name is the new file 633 Path mobFamilyPath = 634 MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), tableName, hcd1.getNameAsString()); 635 List<Path> paths = new ArrayList<>(); 636 if (fs.exists(mobFamilyPath)) { 637 FileStatus[] files = fs.listStatus(mobFamilyPath); 638 for (FileStatus file : files) { 639 if (!StoreFileInfo.isDelFile(file.getPath())) { 640 paths.add(file.getPath()); 641 } 642 } 643 } 644 assertEquals("After compaction: number of mob files:", 1, paths.size()); 645 assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0) 646 .getName()); 647 } 648 649 /** 650 * This case tests the following mob compaction and normal compaction scenario, 651 * after mob compaction, the mob reference in new bulkloaded hfile will win even after it 652 * is compacted with some other normal hfiles. This is to make sure the mvcc is included 653 * after compaction for mob enabled store files. 654 */ 655 @Test 656 public void testGetAfterCompaction() throws Exception { 657 resetConf(); 658 conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, 0); 659 String famStr = "f1"; 660 byte[] fam = Bytes.toBytes(famStr); 661 byte[] qualifier = Bytes.toBytes("q1"); 662 byte[] mobVal = Bytes.toBytes("01234567890"); 663 HTableDescriptor hdt = new HTableDescriptor(TableName.valueOf(name.getMethodName())); 664 hdt.addCoprocessor(CompactTwoLatestHfilesCopro.class.getName()); 665 HColumnDescriptor hcd = new HColumnDescriptor(fam); 666 hcd.setMobEnabled(true); 667 hcd.setMobThreshold(10); 668 hcd.setMaxVersions(1); 669 hdt.addFamily(hcd); 670 try { 671 Table table = TEST_UTIL.createTable(hdt, null); 672 HRegion r = TEST_UTIL.getMiniHBaseCluster().getRegions(hdt.getTableName()).get(0); 673 Put p = new Put(Bytes.toBytes("r1")); 674 p.addColumn(fam, qualifier, mobVal); 675 table.put(p); 676 // Create mob file mob1 and reference file ref1 677 TEST_UTIL.flush(table.getName()); 678 // Make sure that it is flushed. 679 FileSystem fs = r.getRegionFileSystem().getFileSystem(); 680 Path path = r.getRegionFileSystem().getStoreDir(famStr); 681 waitUntilFilesShowup(fs, path, 1); 682 683 p = new Put(Bytes.toBytes("r2")); 684 p.addColumn(fam, qualifier, mobVal); 685 table.put(p); 686 // Create mob file mob2 and reference file ref2 687 TEST_UTIL.flush(table.getName()); 688 waitUntilFilesShowup(fs, path, 2); 689 // Do mob compaction to create mob3 and ref3 690 TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam, CompactType.MOB); 691 waitUntilFilesShowup(fs, path, 3); 692 693 // Compact ref3 and ref2 into ref4 694 TEST_UTIL.getAdmin().compact(hdt.getTableName(), fam); 695 waitUntilFilesShowup(fs, path, 2); 696 697 // Sleep for some time, since TimeToLiveHFileCleaner is 0, the next run of 698 // clean chore is guaranteed to clean up files in archive 699 Thread.sleep(100); 700 // Run cleaner to make sure that files in archive directory are cleaned up 701 TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); 702 703 // Get "r2" 704 Get get = new Get(Bytes.toBytes("r2")); 705 try { 706 Result result = table.get(get); 707 assertTrue(Arrays.equals(result.getValue(fam, qualifier), mobVal)); 708 } catch (IOException e) { 709 assertTrue("The MOB file doesn't exist", false); 710 } 711 } finally { 712 TEST_UTIL.deleteTable(hdt.getTableName()); 713 } 714 } 715 716 private void waitUntilFilesShowup(final FileSystem fs, final Path path, final int num) 717 throws InterruptedException, IOException { 718 FileStatus[] fileList = fs.listStatus(path); 719 while (fileList.length != num) { 720 Thread.sleep(50); 721 fileList = fs.listStatus(path); 722 for (FileStatus fileStatus: fileList) { 723 LOG.info(Objects.toString(fileStatus)); 724 } 725 } 726 } 727 728 /** 729 * This copro overwrites the default compaction policy. It always chooses two latest hfiles and 730 * compacts them into a new one. 731 */ 732 public static class CompactTwoLatestHfilesCopro implements RegionCoprocessor, RegionObserver { 733 734 @Override 735 public Optional<RegionObserver> getRegionObserver() { 736 return Optional.of(this); 737 } 738 739 @Override 740 public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, Store store, 741 List<? extends StoreFile> candidates, CompactionLifeCycleTracker tracker) 742 throws IOException { 743 int count = candidates.size(); 744 if (count >= 2) { 745 for (int i = 0; i < count - 2; i++) { 746 candidates.remove(0); 747 } 748 c.bypass(); 749 } 750 } 751 } 752 753 private void waitUntilMobCompactionFinished(TableName tableName) throws IOException, 754 InterruptedException { 755 long finished = EnvironmentEdgeManager.currentTime() + 60000; 756 CompactionState state = admin.getCompactionState(tableName, CompactType.MOB); 757 while (EnvironmentEdgeManager.currentTime() < finished) { 758 if (state == CompactionState.NONE) { 759 break; 760 } 761 state = admin.getCompactionState(tableName, CompactType.MOB); 762 Thread.sleep(10); 763 } 764 assertEquals(CompactionState.NONE, state); 765 } 766 767 /** 768 * Gets the number of rows in the given table. 769 * @param table to get the scanner 770 * @return the number of rows 771 */ 772 private int countMobRows(final Table table) throws IOException { 773 Scan scan = new Scan(); 774 // Do not retrieve the mob data when scanning 775 scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); 776 return TEST_UTIL.countRows(table, scan); 777 } 778 779 /** 780 * Gets the number of cells in the given table. 781 * @param table to get the scanner 782 * @return the number of cells 783 */ 784 private int countMobCells(final Table table) throws IOException { 785 Scan scan = new Scan(); 786 // Do not retrieve the mob data when scanning 787 scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); 788 ResultScanner results = table.getScanner(scan); 789 int count = 0; 790 for (Result res : results) { 791 count += res.size(); 792 } 793 results.close(); 794 return count; 795 } 796 797 /** 798 * Gets the number of files in the mob path. 799 * @param isMobFile gets number of the mob files or del files 800 * @param familyName the family name 801 * @return the number of the files 802 */ 803 private int countFiles(TableName tableName, boolean isMobFile, String familyName) 804 throws IOException { 805 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName); 806 int count = 0; 807 if (fs.exists(mobDirPath)) { 808 FileStatus[] files = fs.listStatus(mobDirPath); 809 for (FileStatus file : files) { 810 if (isMobFile == true) { 811 if (!StoreFileInfo.isDelFile(file.getPath())) { 812 count++; 813 } 814 } else { 815 if (StoreFileInfo.isDelFile(file.getPath())) { 816 count++; 817 } 818 } 819 } 820 } 821 return count; 822 } 823 824 private boolean verifyEncryption(TableName tableName, String familyName) throws IOException { 825 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName); 826 boolean hasFiles = false; 827 if (fs.exists(mobDirPath)) { 828 FileStatus[] files = fs.listStatus(mobDirPath); 829 hasFiles = files != null && files.length > 0; 830 Assert.assertTrue(hasFiles); 831 Path path = files[0].getPath(); 832 CacheConfig cacheConf = new CacheConfig(conf); 833 HStoreFile sf = new HStoreFile(TEST_UTIL.getTestFileSystem(), path, conf, cacheConf, 834 BloomType.NONE, true); 835 sf.initReader(); 836 HFile.Reader reader = sf.getReader().getHFileReader(); 837 byte[] encryptionKey = reader.getTrailer().getEncryptionKey(); 838 Assert.assertTrue(null != encryptionKey); 839 Assert.assertTrue(reader.getFileContext().getEncryptionContext().getCipher().getName() 840 .equals(HConstants.CIPHER_AES)); 841 } 842 return hasFiles; 843 } 844 845 /** 846 * Gets the number of HFileLink in the mob path. 847 * @param familyName the family name 848 * @return the number of the HFileLink 849 */ 850 private int countHFileLinks(String familyName) throws IOException { 851 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName); 852 int count = 0; 853 if (fs.exists(mobDirPath)) { 854 FileStatus[] files = fs.listStatus(mobDirPath); 855 for (FileStatus file : files) { 856 if (HFileLink.isHFileLink(file.getPath())) { 857 count++; 858 } 859 } 860 } 861 return count; 862 } 863 864 /** 865 * Gets the number of files. 866 * @param size the size of the file 867 * @param tableName the current table name 868 * @param familyName the family name 869 * @return the number of files large than the size 870 */ 871 private int countLargeFiles(int size, TableName tableName, String familyName) throws IOException { 872 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, familyName); 873 int count = 0; 874 if (fs.exists(mobDirPath)) { 875 FileStatus[] files = fs.listStatus(mobDirPath); 876 for (FileStatus file : files) { 877 // ignore the del files in the mob path 878 if ((!StoreFileInfo.isDelFile(file.getPath())) && (file.getLen() > size)) { 879 count++; 880 } 881 } 882 } 883 return count; 884 } 885 886 /** 887 * loads some data to the table. 888 */ 889 private void loadData(Admin admin, BufferedMutator table, TableName tableName, int fileNum, 890 int rowNumPerFile) throws IOException, InterruptedException { 891 if (fileNum <= 0) { 892 throw new IllegalArgumentException(); 893 } 894 for (int i = 0; i < fileNum * rowNumPerFile; i++) { 895 for (byte k0 : KEYS) { 896 byte[] k = new byte[] { k0 }; 897 byte[] key = Bytes.add(k, Bytes.toBytes(i)); 898 byte[] mobVal = makeDummyData(10 * (i + 1)); 899 Put put = new Put(key); 900 put.setDurability(Durability.SKIP_WAL); 901 put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), mobVal); 902 put.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf2), mobVal); 903 put.addColumn(Bytes.toBytes(family2), Bytes.toBytes(qf1), mobVal); 904 table.mutate(put); 905 } 906 if ((i + 1) % rowNumPerFile == 0) { 907 table.flush(); 908 admin.flush(tableName); 909 } 910 } 911 } 912 913 private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts) 914 throws IOException { 915 table.mutate(Arrays.asList(puts)); 916 table.flush(); 917 admin.flush(tableName); 918 } 919 920 private void loadDataForPartitionPolicy(Admin admin, BufferedMutator table, TableName tableName) 921 throws IOException { 922 923 Put[] pArray = new Put[1000]; 924 925 for (int i = 0; i < 1000; i ++) { 926 Put put0 = new Put(Bytes.toBytes("r0" + i)); 927 put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151130Monday, Bytes.toBytes(mobValue0)); 928 pArray[i] = put0; 929 } 930 loadData(admin, bufMut, tableName, pArray); 931 932 Put put06 = new Put(mobKey06); 933 put06.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151128Saturday, Bytes.toBytes(mobValue0)); 934 935 loadData(admin, bufMut, tableName, new Put[] { put06 }); 936 937 Put put1 = new Put(mobKey1); 938 put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151201Tuesday, 939 Bytes.toBytes(mobValue1)); 940 loadData(admin, bufMut, tableName, new Put[] { put1 }); 941 942 Put put2 = new Put(mobKey2); 943 put2.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151205Saturday, 944 Bytes.toBytes(mobValue2)); 945 loadData(admin, bufMut, tableName, new Put[] { put2 }); 946 947 Put put3 = new Put(mobKey3); 948 put3.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151228Monday, 949 Bytes.toBytes(mobValue3)); 950 loadData(admin, bufMut, tableName, new Put[] { put3 }); 951 952 Put put4 = new Put(mobKey4); 953 put4.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151231Thursday, 954 Bytes.toBytes(mobValue4)); 955 loadData(admin, bufMut, tableName, new Put[] { put4 }); 956 957 Put put5 = new Put(mobKey5); 958 put5.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160101Friday, 959 Bytes.toBytes(mobValue5)); 960 loadData(admin, bufMut, tableName, new Put[] { put5 }); 961 962 Put put6 = new Put(mobKey6); 963 put6.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20160103Sunday, 964 Bytes.toBytes(mobValue6)); 965 loadData(admin, bufMut, tableName, new Put[] { put6 }); 966 967 Put put7 = new Put(mobKey7); 968 put7.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20150907Monday, 969 Bytes.toBytes(mobValue7)); 970 loadData(admin, bufMut, tableName, new Put[] { put7 }); 971 972 Put put8 = new Put(mobKey8); 973 put8.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), tsFor20151120Sunday, 974 Bytes.toBytes(mobValue8)); 975 loadData(admin, bufMut, tableName, new Put[] { put8 }); 976 } 977 978 979 /** 980 * delete the row, family and cell to create the del file 981 */ 982 private void createDelFile(Table table, TableName tableName, byte[] family, byte[] qf) 983 throws IOException, InterruptedException { 984 for (byte k0 : KEYS) { 985 byte[] k = new byte[] { k0 }; 986 // delete a family 987 byte[] key1 = Bytes.add(k, Bytes.toBytes(0)); 988 Delete delete1 = new Delete(key1); 989 delete1.addFamily(family); 990 table.delete(delete1); 991 // delete one row 992 byte[] key2 = Bytes.add(k, Bytes.toBytes(2)); 993 Delete delete2 = new Delete(key2); 994 table.delete(delete2); 995 // delete one cell 996 byte[] key3 = Bytes.add(k, Bytes.toBytes(4)); 997 Delete delete3 = new Delete(key3); 998 delete3.addColumn(family, qf); 999 table.delete(delete3); 1000 } 1001 admin.flush(tableName); 1002 List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName); 1003 for (HRegion region : regions) { 1004 region.waitForFlushesAndCompactions(); 1005 region.compact(true); 1006 } 1007 } 1008 /** 1009 * Creates the dummy data with a specific size. 1010 * @param size the size of value 1011 * @return the dummy data 1012 */ 1013 private byte[] makeDummyData(int size) { 1014 byte[] dummyData = new byte[size]; 1015 new Random().nextBytes(dummyData); 1016 return dummyData; 1017 } 1018 1019 /** 1020 * Gets the split keys 1021 */ 1022 private byte[][] getSplitKeys() { 1023 byte[][] splitKeys = new byte[KEYS.length - 1][]; 1024 for (int i = 0; i < splitKeys.length; ++i) { 1025 splitKeys[i] = new byte[] { KEYS[i + 1] }; 1026 } 1027 return splitKeys; 1028 } 1029 1030 private static ExecutorService createThreadPool(Configuration conf) { 1031 int maxThreads = 10; 1032 long keepAliveTime = 60; 1033 final SynchronousQueue<Runnable> queue = new SynchronousQueue<>(); 1034 ThreadPoolExecutor pool = 1035 new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, queue, 1036 new ThreadFactoryBuilder().setNameFormat("MobFileCompactionChore-pool-%d") 1037 .setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 1038 (r, executor) -> { 1039 try { 1040 // waiting for a thread to pick up instead of throwing exceptions. 1041 queue.put(r); 1042 } catch (InterruptedException e) { 1043 throw new RejectedExecutionException(e); 1044 } 1045 }); 1046 pool.allowCoreThreadTimeOut(true); 1047 return pool; 1048 } 1049 1050 private void assertRefFileNameEqual(String familyName) throws IOException { 1051 Scan scan = new Scan(); 1052 scan.addFamily(Bytes.toBytes(familyName)); 1053 // Do not retrieve the mob data when scanning 1054 scan.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(Boolean.TRUE)); 1055 ResultScanner results = table.getScanner(scan); 1056 Path mobFamilyPath = MobUtils.getMobFamilyPath(TEST_UTIL.getConfiguration(), 1057 tableName, familyName); 1058 List<Path> actualFilePaths = new ArrayList<>(); 1059 List<Path> expectFilePaths = new ArrayList<>(); 1060 for (Result res : results) { 1061 for (Cell cell : res.listCells()) { 1062 byte[] referenceValue = CellUtil.cloneValue(cell); 1063 String fileName = Bytes.toString(referenceValue, Bytes.SIZEOF_INT, 1064 referenceValue.length - Bytes.SIZEOF_INT); 1065 Path targetPath = new Path(mobFamilyPath, fileName); 1066 if(!actualFilePaths.contains(targetPath)) { 1067 actualFilePaths.add(targetPath); 1068 } 1069 } 1070 } 1071 results.close(); 1072 if (fs.exists(mobFamilyPath)) { 1073 FileStatus[] files = fs.listStatus(mobFamilyPath); 1074 for (FileStatus file : files) { 1075 if (!StoreFileInfo.isDelFile(file.getPath())) { 1076 expectFilePaths.add(file.getPath()); 1077 } 1078 } 1079 } 1080 Collections.sort(actualFilePaths); 1081 Collections.sort(expectFilePaths); 1082 assertEquals(expectFilePaths, actualFilePaths); 1083 } 1084 1085 /** 1086 * Resets the configuration. 1087 */ 1088 private void resetConf() { 1089 conf.setLong(MobConstants.MOB_COMPACTION_MERGEABLE_THRESHOLD, 1090 MobConstants.DEFAULT_MOB_COMPACTION_MERGEABLE_THRESHOLD); 1091 conf.setInt(MobConstants.MOB_COMPACTION_BATCH_SIZE, 1092 MobConstants.DEFAULT_MOB_COMPACTION_BATCH_SIZE); 1093 } 1094 1095 /** 1096 * Verify mob partition policy compaction values. 1097 */ 1098 private void verifyPolicyValues() throws Exception { 1099 Get get = new Get(mobKey01); 1100 Result result = table.get(get); 1101 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1102 Bytes.toBytes(mobValue0))); 1103 1104 get = new Get(mobKey02); 1105 result = table.get(get); 1106 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1107 Bytes.toBytes(mobValue0))); 1108 1109 get = new Get(mobKey03); 1110 result = table.get(get); 1111 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1112 Bytes.toBytes(mobValue0))); 1113 1114 get = new Get(mobKey04); 1115 result = table.get(get); 1116 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1117 Bytes.toBytes(mobValue0))); 1118 1119 get = new Get(mobKey05); 1120 result = table.get(get); 1121 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1122 Bytes.toBytes(mobValue0))); 1123 1124 get = new Get(mobKey06); 1125 result = table.get(get); 1126 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1127 Bytes.toBytes(mobValue0))); 1128 1129 get = new Get(mobKey1); 1130 result = table.get(get); 1131 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1132 Bytes.toBytes(mobValue1))); 1133 1134 get = new Get(mobKey2); 1135 result = table.get(get); 1136 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1137 Bytes.toBytes(mobValue2))); 1138 1139 get = new Get(mobKey3); 1140 result = table.get(get); 1141 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1142 Bytes.toBytes(mobValue3))); 1143 1144 get = new Get(mobKey4); 1145 result = table.get(get); 1146 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1147 Bytes.toBytes(mobValue4))); 1148 1149 get = new Get(mobKey5); 1150 result = table.get(get); 1151 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1152 Bytes.toBytes(mobValue5))); 1153 1154 get = new Get(mobKey6); 1155 result = table.get(get); 1156 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1157 Bytes.toBytes(mobValue6))); 1158 1159 get = new Get(mobKey7); 1160 result = table.get(get); 1161 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1162 Bytes.toBytes(mobValue7))); 1163 1164 get = new Get(mobKey8); 1165 result = table.get(get); 1166 assertTrue(Arrays.equals(result.getValue(Bytes.toBytes(family1), Bytes.toBytes(qf1)), 1167 Bytes.toBytes(mobValue8))); 1168 } 1169 1170 private void commonPolicyTestLogic (final String tableNameAsString, 1171 final MobCompactPartitionPolicy pType, final boolean majorCompact, 1172 final int expectedFileNumbers, final String[] expectedFileNames, 1173 final boolean setupAndLoadData 1174 ) throws Exception { 1175 if (setupAndLoadData) { 1176 setUpForPolicyTest(tableNameAsString, pType); 1177 1178 loadDataForPartitionPolicy(admin, bufMut, tableName); 1179 } else { 1180 alterForPolicyTest(pType); 1181 } 1182 1183 if (majorCompact) { 1184 admin.majorCompact(tableName, hcd1.getName(), CompactType.MOB); 1185 } else { 1186 admin.compact(tableName, hcd1.getName(), CompactType.MOB); 1187 } 1188 1189 waitUntilMobCompactionFinished(tableName); 1190 1191 // Run cleaner to make sure that files in archive directory are cleaned up 1192 TEST_UTIL.getMiniHBaseCluster().getMaster().getHFileCleaner().choreForTesting(); 1193 1194 //check the number of files 1195 Path mobDirPath = MobUtils.getMobFamilyPath(conf, tableName, family1); 1196 FileStatus[] fileList = fs.listStatus(mobDirPath); 1197 1198 assertTrue(fileList.length == expectedFileNumbers); 1199 1200 // the file names are expected 1201 ArrayList<String> fileNames = new ArrayList<>(expectedFileNumbers); 1202 for (FileStatus file : fileList) { 1203 fileNames.add(MobFileName.getDateFromName(file.getPath().getName())); 1204 } 1205 int index = 0; 1206 for (String fileName : expectedFileNames) { 1207 index = fileNames.indexOf(fileName); 1208 assertTrue(index >= 0); 1209 fileNames.remove(index); 1210 } 1211 1212 // Check daily mob files are removed from the mobdir, and only weekly mob files are there. 1213 // Also check that there is no data loss. 1214 1215 verifyPolicyValues(); 1216 } 1217 }