/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.util.ToolRunner;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that verifies Procedure V2. DDL operations should go through (rollforward or
 * rollback) when primary master is killed by ChaosMonkey (default MASTER_KILLING).
 * <p>
 * Multiple Worker threads are started to randomly do the following Actions in loops.
 * </p>
 * Actions generating and populating tables:
 * <ul>
 * <li>CreateTableAction</li>
 * <li>DisableTableAction</li>
 * <li>EnableTableAction</li>
 * <li>DeleteTableAction</li>
 * <li>AddRowAction</li>
 * </ul>
 * Actions performing column family DDL operations:
 * <ul>
 * <li>AddColumnFamilyAction</li>
 * <li>AlterFamilyVersionsAction</li>
 * <li>AlterFamilyEncodingAction</li>
 * <li>DeleteColumnFamilyAction</li>
 * </ul>
 * Actions performing namespace DDL operations:
 * <ul>
 * <li>CreateNamespaceAction</li>
 * <li>ModifyNamespaceAction</li>
 * <li>DeleteNamespaceAction</li>
 * </ul>
 * <p>
 * The threads run for a period of time (default 20 minutes) then are stopped at the end of runtime.
 * Verification is performed towards those checkpoints:
 * </p>
 * <ol>
 * <li>No Actions throw Exceptions or fail an assertion.</li>
 * <li>No inconsistencies are detected in hbck.</li>
 * </ol>
 * <p>
 * This test should be run by the hbase user since it invokes hbck at the end.
 * </p>
 * <p>
 * Usage: hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
 * </p>
 */
@Tag(IntegrationTests.TAG)
public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);

  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster

  /** Matches the names produced by {@code CreateTableAction}; used to find tables to clean up. */
  private static final Pattern TEST_TABLE_PATTERN = Pattern.compile("ittable-\\d+");

  /** Matches the names produced by {@code CreateNamespaceAction}; used during clean up. */
  private static final Pattern TEST_NAMESPACE_PATTERN = Pattern.compile("itnamespace\\d+");

  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;

  protected static final int DEFAULT_NUM_THREADS = 20;

  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables

  /** Set when hbck reports inconsistencies so the test objects are left in place for inspection. */
  private boolean keepObjectsAtTheEnd = false;

  protected HBaseClusterInterface cluster;

  protected Connection connection;

  /**
   * A soft limit on how long we should run. The {@code %s} placeholder of each key is filled with
   * the simple name of the test class.
   */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";

  /** Workers keep looping while this is true. */
  protected AtomicBoolean running = new AtomicBoolean(true);

  /** Cleared half way through the run so that workers stop creating new tables. */
  protected AtomicBoolean create_table = new AtomicBoolean(true);

  protected int numThreads, numRegions;

  // Bookkeeping of the objects the workers believe to exist, keyed by name. An action takes an
  // entry out of a map while it works on it and puts it (back) into the map matching the new state.
  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();

  /**
   * Returns a zero-padded, ten character, non-negative random number. Bounding the random value
   * keeps a minus sign out of generated names, so they match the clean up patterns above.
   */
  private static String randomTenDigits() {
    return String.format("%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    int minServerCount = getMinServerCount();
    LOG.debug("Initializing/checking cluster has {} servers", minServerCount);
    util.initializeCluster(minServerCount);
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
  }

  /**
   * Drops the tables and namespaces created by the test (unless hbck found inconsistencies),
   * clears the bookkeeping maps and closes the connection used by the actions.
   */
  @Override
  public void cleanUpCluster() throws Exception {
    if (!keepObjectsAtTheEnd) {
      Admin admin = util.getAdmin();
      for (TableName tableName : admin.listTableNames(TEST_TABLE_PATTERN)) {
        // The test leaves some tables disabled; only disable the ones that are still enabled.
        if (admin.isTableEnabled(tableName)) {
          admin.disableTable(tableName);
        }
        admin.deleteTable(tableName);
      }
      for (NamespaceDescriptor nsd : admin.listNamespaceDescriptors()) {
        if (TEST_NAMESPACE_PATTERN.matcher(nsd.getName()).matches()) {
          LOG.info("Removing namespace={}", nsd.getName());
          admin.deleteNamespace(nsd.getName());
        }
      }
    }

    enabledTables.clear();
    disabledTables.clear();
    deletedTables.clear();
    namespaceMap.clear();

    // Close the connection only if one was ever established; do not create one just to close it.
    Connection toClose;
    synchronized (this) {
      toClose = this.connection;
      this.connection = null;
    }
    if (toClose != null) {
      toClose.close();
    }
    super.cleanUpCluster();
  }

  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

  protected synchronized void setConnection(Connection connection) {
    this.connection = connection;
  }

  /**
   * Returns the shared connection, creating it on first use.
   * @return the connection, or {@code null} when it could not be established (the failure is
   *         logged)
   */
  protected synchronized Connection getConnection() {
    if (this.connection == null) {
      try {
        setConnection(ConnectionFactory.createConnection(getConf()));
      } catch (IOException e) {
        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
      }
    }
    return connection;
  }

  /** Asserts that every namespace tracked in {@link #namespaceMap} exists in the cluster. */
  protected void verifyNamespaces() throws IOException {
    try (Admin admin = getConnection().getAdmin()) {
      // iterating concurrent map
      for (String nsName : namespaceMap.keySet()) {
        try {
          assertTrue(admin.getNamespaceDescriptor(nsName) != null,
            "Namespace: " + nsName + " in namespaceMap does not exist");
        } catch (NamespaceNotFoundException nsnfe) {
          fail("Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
        }
      }
    }
  }

  /** Asserts that every tracked table is in the state implied by the map that tracks it. */
  protected void verifyTables() throws IOException {
    try (Admin admin = getConnection().getAdmin()) {
      // iterating concurrent map
      for (TableName tableName : enabledTables.keySet()) {
        assertTrue(admin.isTableEnabled(tableName),
          "Table: " + tableName + " in enabledTables is not enabled");
      }
      for (TableName tableName : disabledTables.keySet()) {
        assertTrue(admin.isTableDisabled(tableName),
          "Table: " + tableName + " in disabledTables is not disabled");
      }
      for (TableName tableName : deletedTables.keySet()) {
        assertFalse(admin.tableExists(tableName),
          "Table: " + tableName + " in deletedTables is not deleted");
      }
    }
  }

  @Test
  public void testAsUnitTest() throws Exception {
    runTest();
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return runTest();
  }

  /** Base of all actions; captures the shared connection when the action is created. */
  private abstract class MasterAction {
    Connection connection = getConnection();

    abstract void perform() throws IOException;
  }

  private abstract class NamespaceAction extends MasterAction {
    final String nsTestConfigKey = "hbase.namespace.testKey";

    /**
     * Randomly picks a namespace and removes it from the map, so that no other worker operates on
     * it at the same time.
     * @return the removed descriptor, or {@code null} if the map is empty
     */
    protected NamespaceDescriptor
      selectNamespace(ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) {
      // synchronization to prevent removal from multiple threads
      synchronized (namespaceMap) {
        if (namespaceMap.isEmpty()) {
          return null;
        }
        List<String> namespaceList = new ArrayList<>(namespaceMap.keySet());
        String randomKey =
          namespaceList.get(ThreadLocalRandom.current().nextInt(namespaceList.size()));
        return namespaceMap.remove(randomKey);
      }
    }

    /**
     * Tells whether the namespace exists. The lookup is expected to throw
     * {@link NamespaceNotFoundException} for a missing namespace; a {@code null} result is treated
     * the same way in case that ever changes.
     */
    protected boolean namespaceExists(Admin admin, String namespaceName) throws IOException {
      try {
        return admin.getNamespaceDescriptor(namespaceName) != null;
      } catch (NamespaceNotFoundException nsnfe) {
        return false;
      }
    }
  }

  private class CreateNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      try (Admin admin = connection.getAdmin()) {
        // Regenerate until the randomly generated name is not taken yet.
        NamespaceDescriptor nsd = createNamespaceDesc();
        while (namespaceExists(admin, nsd.getName())) {
          nsd = createNamespaceDesc();
        }
        LOG.info("Creating namespace:{}", nsd);
        admin.createNamespace(nsd);
        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
        assertTrue(freshNamespaceDesc != null, "Namespace: " + nsd + " was not created");
        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
        LOG.info("Created namespace:{}", freshNamespaceDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }

    private NamespaceDescriptor createNamespaceDesc() {
      NamespaceDescriptor nsd =
        NamespaceDescriptor.create("itnamespace" + randomTenDigits()).build();
      nsd.setConfiguration(nsTestConfigKey, randomTenDigits());
      return nsd;
    }
  }

  private class ModifyNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      NamespaceDescriptor selected = selectNamespace(namespaceMap);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        String namespaceName = selected.getName();
        LOG.info("Modifying namespace :{}", selected);
        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
        // Pick a value that differs from the current one so the modification is observable.
        String nsValueNew;
        do {
          nsValueNew = randomTenDigits();
        } while (nsValueNew.equals(selected.getConfigurationValue(nsTestConfigKey)));
        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
        admin.modifyNamespace(modifiedNsd);
        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
        assertTrue(freshNamespaceDesc != null, "Namespace: " + namespaceName + " does not exist");
        assertEquals(nsValueNew, freshNamespaceDesc.getConfigurationValue(nsTestConfigKey),
          "Namespace: " + selected + " was not modified");
        namespaceMap.put(namespaceName, freshNamespaceDesc);
        LOG.info("Modified namespace :{}", freshNamespaceDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class DeleteNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      NamespaceDescriptor selected = selectNamespace(namespaceMap);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        String namespaceName = selected.getName();
        LOG.info("Deleting namespace :{}", selected);
        admin.deleteNamespace(namespaceName);
        assertFalse(namespaceExists(admin, namespaceName),
          "Namespace: " + selected + " was not deleted");
        LOG.info("Deleted namespace :{}", selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private abstract class TableAction extends MasterAction {
    /**
     * Randomly picks a table and removes it from the map, so that no other worker operates on it
     * at the same time. Callers that end up not changing the table should put it back.
     * @return the removed descriptor, or {@code null} if the map is empty
     */
    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) {
      // synchronization to prevent removal from multiple threads
      synchronized (tableMap) {
        if (tableMap.isEmpty()) {
          return null;
        }
        List<TableName> tableList = new ArrayList<>(tableMap.keySet());
        TableName key = tableList.get(ThreadLocalRandom.current().nextInt(tableList.size()));
        return tableMap.remove(key);
      }
    }
  }

  private class CreateTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      try (Admin admin = connection.getAdmin()) {
        TableDescriptor td = createTableDesc();
        TableName tableName = td.getTableName();
        if (admin.tableExists(tableName)) {
          return;
        }
        // The key is derived from the test class, not from this action class, so that it matches
        // the documented -Dhbase.<TestClass>.numRegions option.
        String numRegionKey = String.format(NUM_REGIONS_KEY,
          IntegrationTestDDLMasterFailover.this.getClass().getSimpleName());
        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
        byte[] startKey = Bytes.toBytes("row-0000000000");
        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
        LOG.info("Creating table:{}", td);
        admin.createTable(td, startKey, endKey, numRegions);
        assertTrue(admin.tableExists(tableName), "Table: " + td + " was not created");
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        assertTrue(admin.isTableEnabled(tableName),
          "After create, Table: " + tableName + " in not enabled");
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Created table:{}", freshTableDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }

    private TableDescriptor createTableDesc() {
      String tableName = "ittable-" + randomTenDigits();
      String familyName = "cf-" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)).build();
    }
  }

  private class DisableTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(enabledTables);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        TableName tableName = selected.getTableName();
        LOG.info("Disabling table :{}", selected);
        admin.disableTable(tableName);
        assertTrue(admin.isTableDisabled(tableName), "Table: " + selected + " was not disabled");
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        assertTrue(admin.isTableDisabled(tableName),
          "After disable, Table: " + tableName + " is not disabled");
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Disabled table :{}", freshTableDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        // TODO workaround
        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
        // operations
        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
        // 2) if master failover happens in the middle of the enable/disable operation, the new
        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
        // AssignmentManager#recoverTableInEnablingState() and
        // AssignmentManager#recoverTableInDisablingState()
        // 3) after the new master initialization completes, the procedure tries to re-do the
        // enable/disable operation, which was already done. Ignore those exceptions before change
        // of behaviors of AssignmentManager in presence of PV2
        if (e instanceof TableNotEnabledException) {
          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass(), e);
        } else {
          throw e;
        }
      }
    }
  }

  private class EnableTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        TableName tableName = selected.getTableName();
        LOG.info("Enabling table :{}", selected);
        admin.enableTable(tableName);
        assertTrue(admin.isTableEnabled(tableName), "Table: " + selected + " was not enabled");
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        assertTrue(admin.isTableEnabled(tableName),
          "After enable, Table: " + tableName + " in not enabled");
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Enabled table :{}", freshTableDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        // TODO workaround
        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
        // operations
        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
        // 2) if master failover happens in the middle of the enable/disable operation, the new
        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
        // AssignmentManager#recoverTableInEnablingState() and
        // AssignmentManager#recoverTableInDisablingState()
        // 3) after the new master initialization completes, the procedure tries to re-do the
        // enable/disable operation, which was already done. Ignore those exceptions before
        // change of behaviors of AssignmentManager in presence of PV2
        if (e instanceof TableNotDisabledException) {
          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass(), e);
        } else {
          throw e;
        }
      }
    }
  }

  private class DeleteTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        TableName tableName = selected.getTableName();
        LOG.info("Deleting table :{}", selected);
        admin.deleteTable(tableName);
        assertFalse(admin.tableExists(tableName), "Table: " + selected + " was not deleted");
        deletedTables.put(tableName, selected);
        LOG.info("Deleted table :{}", selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private abstract class ColumnAction extends TableAction {
    /**
     * Randomly picks one column family of the given table.
     * @return the family, or {@code null} if {@code td} is {@code null} or has no families
     */
    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
      if (td == null) {
        return null;
      }
      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
      if (families.length == 0) {
        LOG.info("No column families in table: {}", td);
        return null;
      }
      return families[ThreadLocalRandom.current().nextInt(families.length)];
    }
  }

  private class AddColumnFamilyAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }

      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor cfd = createFamilyDesc();
      if (selected.hasColumnFamily(cfd.getName())) {
        LOG.info("{} already exists in table {}", Bytes.toString(cfd.getName()), tableName);
        // Nothing was changed: keep tracking the table instead of silently dropping it.
        disabledTables.put(tableName, selected);
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        LOG.info("Adding column family: {} to table: {}", cfd, tableName);
        admin.addColumnFamily(tableName, cfd);
        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        assertTrue(freshTableDesc.hasColumnFamily(cfd.getName()),
          "Column family: " + cfd + " was not added");
        assertTrue(admin.isTableDisabled(tableName),
          "After add column family, Table: " + tableName + " is not disabled");
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Added column family: {} to table: {}", cfd, tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }

    private ColumnFamilyDescriptor createFamilyDesc() {
      return ColumnFamilyDescriptorBuilder.of("cf-" + randomTenDigits());
    }
  }

  private class AlterFamilyVersionsAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
      if (columnDesc == null) {
        // Nothing was changed: keep tracking the table instead of silently dropping it.
        disabledTables.put(tableName, selected);
        return;
      }

      int versions = ThreadLocalRandom.current().nextInt(10) + 3;
      try (Admin admin = connection.getAdmin()) {
        LOG.info("Altering versions of column family: {} to: {} in table: {}", columnDesc,
          versions, tableName);

        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
          .setMinVersions(versions).setMaxVersions(versions).build();
        TableDescriptor td =
          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
        admin.modifyTable(td);

        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        ColumnFamilyDescriptor freshColumnDesc =
          freshTableDesc.getColumnFamily(columnDesc.getName());
        assertEquals(versions, freshColumnDesc.getMaxVersions(),
          "Column family: " + columnDesc + " was not altered");
        assertEquals(versions, freshColumnDesc.getMinVersions(),
          "Column family: " + freshColumnDesc + " was not altered");
        assertTrue(admin.isTableDisabled(tableName),
          "After alter versions of column family, Table: " + tableName + " is not disabled");
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Altered versions of column family: {} to: {} in table: {}", columnDesc,
          versions, tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class AlterFamilyEncodingAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
      if (columnDesc == null) {
        // Nothing was changed: keep tracking the table instead of silently dropping it.
        disabledTables.put(tableName, selected);
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        // possible DataBlockEncoding ids
        DataBlockEncoding[] possibleIds = { DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
          DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1 };
        short id = possibleIds[ThreadLocalRandom.current().nextInt(possibleIds.length)].getId();
        LOG.info("Altering encoding of column family: {} to: {} in table: {}", columnDesc, id,
          tableName);

        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
          .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)).build();
        TableDescriptor td =
          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
        admin.modifyTable(td);

        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        ColumnFamilyDescriptor freshColumnDesc =
          freshTableDesc.getColumnFamily(columnDesc.getName());
        assertEquals(id, freshColumnDesc.getDataBlockEncoding().getId(),
          "Encoding of column family: " + columnDesc + " was not altered");
        assertTrue(admin.isTableDisabled(tableName),
          "After alter encoding of column family, Table: " + tableName + " is not disabled");
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Altered encoding of column family: {} to: {} in table: {}", freshColumnDesc, id,
          tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class DeleteColumnFamilyAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor cfd = selectFamily(selected);
      if (cfd == null) {
        // Nothing was changed: keep tracking the table instead of silently dropping it.
        disabledTables.put(tableName, selected);
        return;
      }
      if (selected.getColumnFamilyCount() < 2) {
        LOG.info("No enough column families to delete in table {}", tableName);
        // Nothing was changed: keep tracking the table instead of silently dropping it.
        disabledTables.put(tableName, selected);
        return;
      }

      try (Admin admin = connection.getAdmin()) {
        LOG.info("Deleting column family: {} from table: {}", cfd, tableName);
        admin.deleteColumnFamily(tableName, cfd.getName());
        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        assertFalse(freshTableDesc.hasColumnFamily(cfd.getName()),
          "Column family: " + cfd + " was not deleted");
        assertTrue(admin.isTableDisabled(tableName),
          "After delete column family, Table: " + tableName + " is not disabled");
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Deleted column family: {} from table: {}", cfd, tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private class AddRowAction extends ColumnAction {
    // populate tables
    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(enabledTables);
      if (selected == null) {
        return;
      }

      TableName tableName = selected.getTableName();
      try (Admin admin = connection.getAdmin(); Table table = connection.getTable(tableName)) {
        List<RegionInfo> regionInfos = admin.getRegions(tableName);
        // average number of rows to be added per action to each region
        int averageRows = 1;
        int numRows = averageRows * regionInfos.size();
        LOG.info("Adding {} rows to table: {}", numRows, selected);
        byte[] value = new byte[10];
        for (int i = 0; i < numRows; i++) {
          // non-negative numbers only, so row keys have the same shape as the table split keys
          byte[] rowKey = Bytes.toBytes("row-" + randomTenDigits());
          ColumnFamilyDescriptor cfd = selectFamily(selected);
          if (cfd == null) {
            // Cannot write without a family: keep tracking the table and give up on this round.
            enabledTables.put(tableName, selected);
            return;
          }
          byte[] family = cfd.getName();
          byte[] qualifier = Bytes.toBytes("col-" + ThreadLocalRandom.current().nextInt(10));
          Bytes.random(value);
          Put put = new Put(rowKey);
          put.addColumn(family, qualifier, value);
          table.put(put);
        }
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        assertTrue(admin.isTableEnabled(tableName),
          "After insert, Table: " + tableName + " in not enabled");
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Added {} rows to table: {}", numRows, selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: {}", this.getClass());
        throw e;
      }
    }
  }

  private enum ACTION {
    CREATE_NAMESPACE,
    MODIFY_NAMESPACE,
    DELETE_NAMESPACE,
    CREATE_TABLE,
    DISABLE_TABLE,
    ENABLE_TABLE,
    DELETE_TABLE,
    ADD_COLUMNFAMILY,
    DELETE_COLUMNFAMILY,
    ALTER_FAMILYVERSIONS,
    ALTER_FAMILYENCODING,
    ADD_ROW
  }

  /**
   * Thread that keeps performing randomly chosen actions until {@link #running} is cleared or an
   * action fails. The failure, if any, is kept for {@link #checkException(List)}.
   */
  private class Worker extends Thread {

    // Both fields are written by the worker and read by the test thread while the worker may
    // still be running, hence volatile.
    private volatile Throwable savedException;

    private volatile ACTION action;

    @Override
    public void run() {
      ACTION[] actions = ACTION.values();
      while (running.get()) {
        // select random action
        ACTION selectedAction = actions[ThreadLocalRandom.current().nextInt(actions.length)];
        this.action = selectedAction;
        LOG.info("Performing Action: {}", selectedAction);

        try {
          switch (selectedAction) {
            case CREATE_NAMESPACE:
              new CreateNamespaceAction().perform();
              break;
            case MODIFY_NAMESPACE:
              new ModifyNamespaceAction().perform();
              break;
            case DELETE_NAMESPACE:
              new DeleteNamespaceAction().perform();
              break;
            case CREATE_TABLE:
              // stop creating new tables in the later stage of the test to avoid too many empty
              // tables
              if (create_table.get()) {
                new CreateTableAction().perform();
              }
              break;
            case ADD_ROW:
              new AddRowAction().perform();
              break;
            case DISABLE_TABLE:
              new DisableTableAction().perform();
              break;
            case ENABLE_TABLE:
              new EnableTableAction().perform();
              break;
            case DELETE_TABLE:
              // reduce probability of deleting table to 20%
              if (ThreadLocalRandom.current().nextInt(100) < 20) {
                new DeleteTableAction().perform();
              }
              break;
            case ADD_COLUMNFAMILY:
              new AddColumnFamilyAction().perform();
              break;
            case DELETE_COLUMNFAMILY:
              // reduce probability of deleting column family to 20%
              if (ThreadLocalRandom.current().nextInt(100) < 20) {
                new DeleteColumnFamilyAction().perform();
              }
              break;
            case ALTER_FAMILYVERSIONS:
              new AlterFamilyVersionsAction().perform();
              break;
            case ALTER_FAMILYENCODING:
              new AlterFamilyEncodingAction().perform();
              break;
          }
        } catch (Exception | AssertionError ex) {
          // Failed assertions are AssertionErrors, not Exceptions; record them as well, otherwise
          // the thread would die silently and the failure would never be reported.
          this.savedException = ex;
          return;
        }
      }
      LOG.info("{} stopped", this.getName());
    }

    public Throwable getSavedException() {
      return this.savedException;
    }

    public ACTION getAction() {
      return this.action;
    }
  }

  /** Fails the test if any of the given workers recorded a failure. */
  private void checkException(List<Worker> workers) {
    if (workers == null || workers.isEmpty()) {
      return;
    }
    for (Worker worker : workers) {
      Throwable e = worker.getSavedException();
      if (e != null) {
        LOG.error("Found exception in thread: " + worker.getName(), e);
      }
      assertNull(e, "Action failed: " + worker.getAction() + " in thread: " + worker.getName());
    }
  }

  /**
   * Runs the workers for the configured time, then verifies that no action failed, that the
   * tracked namespaces and tables are in the expected state and that hbck reports no errors.
   * @return 0 on success; failures surface as assertion errors or exceptions
   */
  private int runTest() throws Exception {
    LOG.info("Starting the test");

    String testName = this.getClass().getSimpleName();
    long runtime =
      util.getConfiguration().getLong(String.format(RUN_TIME_KEY, testName), DEFAULT_RUN_TIME);
    numThreads =
      util.getConfiguration().getInt(String.format(NUM_THREADS_KEY, testName), DEFAULT_NUM_THREADS);

    List<Worker> workers = new ArrayList<>(numThreads);
    try {
      for (int i = 0; i < numThreads; i++) {
        checkException(workers);
        Worker worker = new Worker();
        LOG.info("Launching worker thread {}", worker.getName());
        workers.add(worker);
        worker.start();
      }

      Threads.sleep(runtime / 2);
      LOG.info("Stopping creating new tables");
      create_table.set(false);
      Threads.sleep(runtime / 2);
      LOG.info("Runtime is up");
    } finally {
      // Always tell the workers to stop, also when the launch phase aborted, so that no worker
      // keeps running after the test is over.
      running.set(false);
    }

    checkException(workers);

    for (Worker worker : workers) {
      worker.join();
    }
    LOG.info("All Worker threads stopped");

    // verify
    LOG.info("Verify actions of all threads succeeded");
    checkException(workers);
    LOG.info("Verify namespaces");
    verifyNamespaces();
    LOG.info("Verify states of all tables");
    verifyTables();

    // RUN HBCK

    HBaseFsck hbck = null;
    try {
      LOG.info("Running hbck");
      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
      if (HbckTestingUtil.inconsistencyFound(hbck)) {
        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
        // we can check outside the test.
        keepObjectsAtTheEnd = true;
      }
      HbckTestingUtil.assertNoErrors(hbck);
      LOG.info("Finished hbck");
    } finally {
      if (hbck != null) {
        hbck.close();
      }
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }

  /**
   * Command line entry point. Exits the JVM with the tool's return code, or 1 if the run failed
   * with an exception or assertion error.
   */
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
    int ret = 1;
    // Initialize connection once, then pass to Actions
    LOG.debug("Setting up connection ...");
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      masterFailover.setConnection(connection);
      ret = ToolRunner.run(conf, masterFailover, args);
    } catch (IOException e) {
      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
    } catch (Exception | AssertionError e) {
      // System.exit below would otherwise discard the failure without any trace.
      LOG.error("Test failed", e);
    } finally {
      System.exit(ret);
    }
  }
}