001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase; 020 021import java.io.IOException; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.Set; 025import java.util.concurrent.ConcurrentHashMap; 026import java.util.concurrent.atomic.AtomicBoolean; 027 028import org.apache.commons.lang3.RandomStringUtils; 029import org.apache.commons.lang3.RandomUtils; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 033import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 034import org.apache.hadoop.hbase.client.Connection; 035import org.apache.hadoop.hbase.client.ConnectionFactory; 036import org.apache.hadoop.hbase.client.Put; 037import org.apache.hadoop.hbase.client.Table; 038import org.apache.hadoop.hbase.client.TableDescriptor; 039import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 040import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; 041import org.apache.hadoop.hbase.log.HBaseMarkers; 042import org.apache.hadoop.hbase.testclassification.IntegrationTests; 043import org.apache.hadoop.hbase.util.Bytes; 044import org.apache.hadoop.hbase.util.HBaseFsck; 045import org.apache.hadoop.hbase.util.Threads; 046import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil; 047import org.apache.hadoop.util.ToolRunner; 048import org.junit.Assert; 049import org.junit.Test; 050import org.junit.experimental.categories.Category; 051import org.slf4j.Logger; 052import org.slf4j.LoggerFactory; 053 054/** 055 * 056 * Integration test that verifies Procedure V2. 057 * 058 * DDL operations should go through (rollforward or rollback) when primary master is killed by 059 * ChaosMonkey (default MASTER_KILLING). 060 * 061 * <p></p>Multiple Worker threads are started to randomly do the following Actions in loops: 062 * Actions generating and populating tables: 063 * <ul> 064 * <li>CreateTableAction</li> 065 * <li>DisableTableAction</li> 066 * <li>EnableTableAction</li> 067 * <li>DeleteTableAction</li> 068 * <li>AddRowAction</li> 069 * </ul> 070 * Actions performing column family DDL operations: 071 * <ul> 072 * <li>AddColumnFamilyAction</li> 073 * <li>AlterColumnFamilyVersionsAction</li> 074 * <li>AlterColumnFamilyEncodingAction</li> 075 * <li>DeleteColumnFamilyAction</li> 076 * </ul> 077 * Actions performing namespace DDL operations: 078 * <ul> 079 * <li>AddNamespaceAction</li> 080 * <li>AlterNamespaceAction</li> 081 * <li>DeleteNamespaceAction</li> 082 * </ul> 083 * <br/> 084 * 085 * The threads run for a period of time (default 20 minutes) then are stopped at the end of 086 * runtime. Verification is performed towards those checkpoints: 087 * <ol> 088 * <li>No Actions throw Exceptions.</li> 089 * <li>No inconsistencies are detected in hbck.</li> 090 * </ol> 091 * 092 * <p> 093 * This test should be run by the hbase user since it invokes hbck at the end 094 * </p><p> 095 * Usage: 096 * hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover 097 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000 098 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20 099 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling 100 */ 101 102@Category(IntegrationTests.class) 103public class IntegrationTestDDLMasterFailover extends IntegrationTestBase { 104 105 private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class); 106 107 private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster 108 109 protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000; 110 111 protected static final int DEFAULT_NUM_THREADS = 20; 112 113 protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables 114 115 private boolean keepObjectsAtTheEnd = false; 116 protected HBaseCluster cluster; 117 118 protected Connection connection; 119 120 /** 121 * A soft limit on how long we should run 122 */ 123 protected static final String RUN_TIME_KEY = "hbase.%s.runtime"; 124 protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads"; 125 protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions"; 126 127 protected AtomicBoolean running = new AtomicBoolean(true); 128 129 protected AtomicBoolean create_table = new AtomicBoolean(true); 130 131 protected int numThreads, numRegions; 132 133 ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>(); 134 135 ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>(); 136 137 ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>(); 138 139 ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>(); 140 141 @Override 142 public void setUpCluster() throws Exception { 143 util = getTestingUtil(getConf()); 144 LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers"); 145 util.initializeCluster(getMinServerCount()); 146 LOG.debug("Done initializing/checking cluster"); 147 cluster = util.getHBaseClusterInterface(); 148 } 149 150 @Override 151 public void cleanUpCluster() throws Exception { 152 if (!keepObjectsAtTheEnd) { 153 Admin admin = util.getAdmin(); 154 admin.disableTables("ittable-\\d+"); 155 admin.deleteTables("ittable-\\d+"); 156 NamespaceDescriptor [] nsds = admin.listNamespaceDescriptors(); 157 for(NamespaceDescriptor nsd: nsds) { 158 if(nsd.getName().matches("itnamespace\\d+")) { 159 LOG.info("Removing namespace="+nsd.getName()); 160 admin.deleteNamespace(nsd.getName()); 161 } 162 } 163 } 164 165 enabledTables.clear(); 166 disabledTables.clear(); 167 deletedTables.clear(); 168 namespaceMap.clear(); 169 170 Connection connection = getConnection(); 171 connection.close(); 172 super.cleanUpCluster(); 173 } 174 175 protected int getMinServerCount() { 176 return SERVER_COUNT; 177 } 178 179 protected synchronized void setConnection(Connection connection){ 180 this.connection = connection; 181 } 182 183 protected synchronized Connection getConnection(){ 184 if (this.connection == null) { 185 try { 186 Connection connection = ConnectionFactory.createConnection(getConf()); 187 setConnection(connection); 188 } catch (IOException e) { 189 LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e); 190 } 191 } 192 return connection; 193 } 194 195 protected void verifyNamespaces() throws IOException{ 196 Connection connection = getConnection(); 197 Admin admin = connection.getAdmin(); 198 // iterating concurrent map 199 for (String nsName : namespaceMap.keySet()){ 200 try { 201 Assert.assertTrue( 202 "Namespace: " + nsName + " in namespaceMap does not exist", 203 admin.getNamespaceDescriptor(nsName) != null); 204 } catch (NamespaceNotFoundException nsnfe) { 205 Assert.fail( 206 "Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage()); 207 } 208 } 209 admin.close(); 210 } 211 212 protected void verifyTables() throws IOException{ 213 Connection connection = getConnection(); 214 Admin admin = connection.getAdmin(); 215 // iterating concurrent map 216 for (TableName tableName : enabledTables.keySet()){ 217 Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled", 218 admin.isTableEnabled(tableName)); 219 } 220 for (TableName tableName : disabledTables.keySet()){ 221 Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled", 222 admin.isTableDisabled(tableName)); 223 } 224 for (TableName tableName : deletedTables.keySet()){ 225 Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted", 226 admin.tableExists(tableName)); 227 } 228 admin.close(); 229 } 230 231 @Test 232 public void testAsUnitTest() throws Exception { 233 runTest(); 234 } 235 236 @Override 237 public int runTestFromCommandLine() throws Exception { 238 int ret = runTest(); 239 return ret; 240 } 241 242 private abstract class MasterAction{ 243 Connection connection = getConnection(); 244 245 abstract void perform() throws IOException; 246 } 247 248 private abstract class NamespaceAction extends MasterAction { 249 final String nsTestConfigKey = "hbase.namespace.testKey"; 250 251 // NamespaceAction has implemented selectNamespace() shared by multiple namespace Actions 252 protected NamespaceDescriptor selectNamespace( 253 ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) { 254 // synchronization to prevent removal from multiple threads 255 synchronized (namespaceMap) { 256 // randomly select namespace from namespaceMap 257 if (namespaceMap.isEmpty()) { 258 return null; 259 } 260 ArrayList<String> namespaceList = new ArrayList<>(namespaceMap.keySet()); 261 String randomKey = namespaceList.get(RandomUtils.nextInt(0, namespaceList.size())); 262 NamespaceDescriptor randomNsd = namespaceMap.get(randomKey); 263 // remove from namespaceMap 264 namespaceMap.remove(randomKey); 265 return randomNsd; 266 } 267 } 268 } 269 270 private class CreateNamespaceAction extends NamespaceAction { 271 @Override 272 void perform() throws IOException { 273 Admin admin = connection.getAdmin(); 274 try { 275 NamespaceDescriptor nsd; 276 while (true) { 277 nsd = createNamespaceDesc(); 278 try { 279 if (admin.getNamespaceDescriptor(nsd.getName()) != null) { 280 // the namespace has already existed. 281 continue; 282 } else { 283 // currently, the code never return null - always throws exception if 284 // namespace is not found - this just a defensive programming to make 285 // sure null situation is handled in case the method changes in the 286 // future. 287 break; 288 } 289 } catch (NamespaceNotFoundException nsnfe) { 290 // This is expected for a random generated NamespaceDescriptor 291 break; 292 } 293 } 294 LOG.info("Creating namespace:" + nsd); 295 admin.createNamespace(nsd); 296 NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName()); 297 Assert.assertTrue("Namespace: " + nsd + " was not created", freshNamespaceDesc != null); 298 namespaceMap.put(nsd.getName(), freshNamespaceDesc); 299 LOG.info("Created namespace:" + freshNamespaceDesc); 300 } catch (Exception e){ 301 LOG.warn("Caught exception in action: " + this.getClass()); 302 throw e; 303 } finally { 304 admin.close(); 305 } 306 } 307 308 private NamespaceDescriptor createNamespaceDesc() { 309 String namespaceName = "itnamespace" + String.format("%010d", 310 RandomUtils.nextInt()); 311 NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build(); 312 313 nsd.setConfiguration( 314 nsTestConfigKey, 315 String.format("%010d", RandomUtils.nextInt())); 316 return nsd; 317 } 318 } 319 320 private class ModifyNamespaceAction extends NamespaceAction { 321 @Override 322 void perform() throws IOException { 323 NamespaceDescriptor selected = selectNamespace(namespaceMap); 324 if (selected == null) { 325 return; 326 } 327 328 Admin admin = connection.getAdmin(); 329 try { 330 String namespaceName = selected.getName(); 331 LOG.info("Modifying namespace :" + selected); 332 NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build(); 333 String nsValueNew; 334 do { 335 nsValueNew = String.format("%010d", RandomUtils.nextInt()); 336 } while (selected.getConfigurationValue(nsTestConfigKey).equals(nsValueNew)); 337 modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew); 338 admin.modifyNamespace(modifiedNsd); 339 NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName); 340 Assert.assertTrue( 341 "Namespace: " + selected + " was not modified", 342 freshNamespaceDesc.getConfigurationValue(nsTestConfigKey).equals(nsValueNew)); 343 Assert.assertTrue( 344 "Namespace: " + namespaceName + " does not exist", 345 admin.getNamespaceDescriptor(namespaceName) != null); 346 namespaceMap.put(namespaceName, freshNamespaceDesc); 347 LOG.info("Modified namespace :" + freshNamespaceDesc); 348 } catch (Exception e){ 349 LOG.warn("Caught exception in action: " + this.getClass()); 350 throw e; 351 } finally { 352 admin.close(); 353 } 354 } 355 } 356 357 private class DeleteNamespaceAction extends NamespaceAction { 358 @Override 359 void perform() throws IOException { 360 NamespaceDescriptor selected = selectNamespace(namespaceMap); 361 if (selected == null) { 362 return; 363 } 364 365 Admin admin = connection.getAdmin(); 366 try { 367 String namespaceName = selected.getName(); 368 LOG.info("Deleting namespace :" + selected); 369 admin.deleteNamespace(namespaceName); 370 try { 371 if (admin.getNamespaceDescriptor(namespaceName) != null) { 372 // the namespace still exists. 373 Assert.assertTrue("Namespace: " + selected + " was not deleted", false); 374 } else { 375 LOG.info("Deleted namespace :" + selected); 376 } 377 } catch (NamespaceNotFoundException nsnfe) { 378 // This is expected result 379 LOG.info("Deleted namespace :" + selected); 380 } 381 } catch (Exception e){ 382 LOG.warn("Caught exception in action: " + this.getClass()); 383 throw e; 384 } finally { 385 admin.close(); 386 } 387 } 388 } 389 390 private abstract class TableAction extends MasterAction{ 391 // TableAction has implemented selectTable() shared by multiple table Actions 392 protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) 393 { 394 // synchronization to prevent removal from multiple threads 395 synchronized (tableMap){ 396 // randomly select table from tableMap 397 if (tableMap.isEmpty()) { 398 return null; 399 } 400 ArrayList<TableName> tableList = new ArrayList<>(tableMap.keySet()); 401 TableName randomKey = tableList.get(RandomUtils.nextInt(0, tableList.size())); 402 TableDescriptor randomTd = tableMap.remove(randomKey); 403 return randomTd; 404 } 405 } 406 } 407 408 private class CreateTableAction extends TableAction { 409 410 @Override 411 void perform() throws IOException { 412 Admin admin = connection.getAdmin(); 413 try { 414 TableDescriptor td = createTableDesc(); 415 TableName tableName = td.getTableName(); 416 if ( admin.tableExists(tableName)){ 417 return; 418 } 419 String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName()); 420 numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS); 421 byte[] startKey = Bytes.toBytes("row-0000000000"); 422 byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE); 423 LOG.info("Creating table:" + td); 424 admin.createTable(td, startKey, endKey, numRegions); 425 Assert.assertTrue("Table: " + td + " was not created", admin.tableExists(tableName)); 426 TableDescriptor freshTableDesc = admin.getDescriptor(tableName); 427 Assert.assertTrue( 428 "After create, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName)); 429 enabledTables.put(tableName, freshTableDesc); 430 LOG.info("Created table:" + freshTableDesc); 431 } catch (Exception e) { 432 LOG.warn("Caught exception in action: " + this.getClass()); 433 throw e; 434 } finally { 435 admin.close(); 436 } 437 } 438 439 private TableDescriptor createTableDesc() { 440 String tableName = String.format("ittable-%010d", RandomUtils.nextInt()); 441 String familyName = "cf-" + Math.abs(RandomUtils.nextInt()); 442 return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)) 443 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)) 444 .build(); 445 } 446 } 447 448 private class DisableTableAction extends TableAction { 449 450 @Override 451 void perform() throws IOException { 452 453 TableDescriptor selected = selectTable(enabledTables); 454 if (selected == null) { 455 return; 456 } 457 458 Admin admin = connection.getAdmin(); 459 try { 460 TableName tableName = selected.getTableName(); 461 LOG.info("Disabling table :" + selected); 462 admin.disableTable(tableName); 463 Assert.assertTrue("Table: " + selected + " was not disabled", 464 admin.isTableDisabled(tableName)); 465 TableDescriptor freshTableDesc = admin.getDescriptor(tableName); 466 Assert.assertTrue( 467 "After disable, Table: " + tableName + " is not disabled", 468 admin.isTableDisabled(tableName)); 469 disabledTables.put(tableName, freshTableDesc); 470 LOG.info("Disabled table :" + freshTableDesc); 471 } catch (Exception e){ 472 LOG.warn("Caught exception in action: " + this.getClass()); 473 // TODO workaround 474 // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync 475 // operations 476 // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node 477 // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes 478 // 2) if master failover happens in the middle of the enable/disable operation, the new 479 // master will try to recover the tables in ENABLING/DISABLING state, as programmed in 480 // AssignmentManager#recoverTableInEnablingState() and 481 // AssignmentManager#recoverTableInDisablingState() 482 // 3) after the new master initialization completes, the procedure tries to re-do the 483 // enable/disable operation, which was already done. Ignore those exceptions before change 484 // of behaviors of AssignmentManager in presence of PV2 485 if (e instanceof TableNotEnabledException) { 486 LOG.warn("Caught TableNotEnabledException in action: " + this.getClass()); 487 e.printStackTrace(); 488 } else { 489 throw e; 490 } 491 } finally { 492 admin.close(); 493 } 494 } 495 } 496 497 private class EnableTableAction extends TableAction { 498 499 @Override 500 void perform() throws IOException { 501 502 TableDescriptor selected = selectTable(disabledTables); 503 if (selected == null ) { 504 return; 505 } 506 507 Admin admin = connection.getAdmin(); 508 try { 509 TableName tableName = selected.getTableName(); 510 LOG.info("Enabling table :" + selected); 511 admin.enableTable(tableName); 512 Assert.assertTrue("Table: " + selected + " was not enabled", 513 admin.isTableEnabled(tableName)); 514 TableDescriptor freshTableDesc = admin.getDescriptor(tableName); 515 Assert.assertTrue( 516 "After enable, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName)); 517 enabledTables.put(tableName, freshTableDesc); 518 LOG.info("Enabled table :" + freshTableDesc); 519 } catch (Exception e){ 520 LOG.warn("Caught exception in action: " + this.getClass()); 521 // TODO workaround 522 // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync 523 // operations 1) when enable/disable starts, the table state is changed to 524 // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED 525 // once the operation completes 2) if master failover happens in the middle of the 526 // enable/disable operation, the new master will try to recover the tables in 527 // ENABLING/DISABLING state, as programmed in 528 // AssignmentManager#recoverTableInEnablingState() and 529 // AssignmentManager#recoverTableInDisablingState() 530 // 3) after the new master initialization completes, the procedure tries to re-do the 531 // enable/disable operation, which was already done. Ignore those exceptions before 532 // change of behaviors of AssignmentManager in presence of PV2 533 if (e instanceof TableNotDisabledException) { 534 LOG.warn("Caught TableNotDisabledException in action: " + this.getClass()); 535 e.printStackTrace(); 536 } else { 537 throw e; 538 } 539 } finally { 540 admin.close(); 541 } 542 } 543 } 544 545 private class DeleteTableAction extends TableAction { 546 547 @Override 548 void perform() throws IOException { 549 550 TableDescriptor selected = selectTable(disabledTables); 551 if (selected == null) { 552 return; 553 } 554 555 Admin admin = connection.getAdmin(); 556 try { 557 TableName tableName = selected.getTableName(); 558 LOG.info("Deleting table :" + selected); 559 admin.deleteTable(tableName); 560 Assert.assertFalse("Table: " + selected + " was not deleted", 561 admin.tableExists(tableName)); 562 deletedTables.put(tableName, selected); 563 LOG.info("Deleted table :" + selected); 564 } catch (Exception e) { 565 LOG.warn("Caught exception in action: " + this.getClass()); 566 throw e; 567 } finally { 568 admin.close(); 569 } 570 } 571 } 572 573 574 private abstract class ColumnAction extends TableAction{ 575 // ColumnAction has implemented selectFamily() shared by multiple family Actions 576 protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) { 577 if (td == null) { 578 return null; 579 } 580 ColumnFamilyDescriptor[] families = td.getColumnFamilies(); 581 if (families.length == 0){ 582 LOG.info("No column families in table: " + td); 583 return null; 584 } 585 ColumnFamilyDescriptor randomCfd = families[RandomUtils.nextInt(0, families.length)]; 586 return randomCfd; 587 } 588 } 589 590 private class AddColumnFamilyAction extends ColumnAction { 591 592 @Override 593 void perform() throws IOException { 594 TableDescriptor selected = selectTable(disabledTables); 595 if (selected == null) { 596 return; 597 } 598 599 Admin admin = connection.getAdmin(); 600 try { 601 ColumnFamilyDescriptor cfd = createFamilyDesc(); 602 if (selected.hasColumnFamily(cfd.getName())){ 603 LOG.info(new String(cfd.getName()) + " already exists in table " 604 + selected.getTableName()); 605 return; 606 } 607 TableName tableName = selected.getTableName(); 608 LOG.info("Adding column family: " + cfd + " to table: " + tableName); 609 admin.addColumnFamily(tableName, cfd); 610 // assertion 611 TableDescriptor freshTableDesc = admin.getDescriptor(tableName); 612 Assert.assertTrue("Column family: " + cfd + " was not added", 613 freshTableDesc.hasColumnFamily(cfd.getName())); 614 Assert.assertTrue( 615 "After add column family, Table: " + tableName + " is not disabled", 616 admin.isTableDisabled(tableName)); 617 disabledTables.put(tableName, freshTableDesc); 618 LOG.info("Added column family: " + cfd + " to table: " + tableName); 619 } catch (Exception e) { 620 LOG.warn("Caught exception in action: " + this.getClass()); 621 throw e; 622 } finally { 623 admin.close(); 624 } 625 } 626 627 private ColumnFamilyDescriptor createFamilyDesc() { 628 String familyName = String.format("cf-%010d", RandomUtils.nextInt()); 629 return ColumnFamilyDescriptorBuilder.of(familyName); 630 } 631 } 632 633 private class AlterFamilyVersionsAction extends ColumnAction { 634 635 @Override 636 void perform() throws IOException { 637 TableDescriptor selected = selectTable(disabledTables); 638 if (selected == null) { 639 return; 640 } 641 ColumnFamilyDescriptor columnDesc = selectFamily(selected); 642 if (columnDesc == null){ 643 return; 644 } 645 646 Admin admin = connection.getAdmin(); 647 int versions = RandomUtils.nextInt(0, 10) + 3; 648 try { 649 TableName tableName = selected.getTableName(); 650 LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions + 651 " in table: " + tableName); 652 653 ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc) 654 .setMinVersions(versions) 655 .setMaxVersions(versions) 656 .build(); 657 TableDescriptor td = TableDescriptorBuilder.newBuilder(selected) 658 .modifyColumnFamily(cfd) 659 .build(); 660 admin.modifyTable(td); 661 662 // assertion 663 TableDescriptor freshTableDesc = admin.getDescriptor(tableName); 664 ColumnFamilyDescriptor freshColumnDesc = freshTableDesc.getColumnFamily(columnDesc.getName()); 665 Assert.assertEquals("Column family: " + columnDesc + " was not altered", 666 freshColumnDesc.getMaxVersions(), versions); 667 Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered", 668 freshColumnDesc.getMinVersions(), versions); 669 Assert.assertTrue( 670 "After alter versions of column family, Table: " + tableName + " is not disabled", 671 admin.isTableDisabled(tableName)); 672 disabledTables.put(tableName, freshTableDesc); 673 LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions + 674 " in table: " + tableName); 675 } catch (Exception e) { 676 LOG.warn("Caught exception in action: " + this.getClass()); 677 throw e; 678 } finally { 679 admin.close(); 680 } 681 } 682 } 683 684 private class AlterFamilyEncodingAction extends ColumnAction { 685 686 @Override 687 void perform() throws IOException { 688 TableDescriptor selected = selectTable(disabledTables); 689 if (selected == null) { 690 return; 691 } 692 ColumnFamilyDescriptor columnDesc = selectFamily(selected); 693 if (columnDesc == null){ 694 return; 695 } 696 697 Admin admin = connection.getAdmin(); 698 try { 699 TableName tableName = selected.getTableName(); 700 // possible DataBlockEncoding ids 701 DataBlockEncoding[] possibleIds = {DataBlockEncoding.NONE, DataBlockEncoding.PREFIX, 702 DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1}; 703 short id = possibleIds[RandomUtils.nextInt(0, possibleIds.length)].getId(); 704 LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id + 705 " in table: " + tableName); 706 707 ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc) 708 .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)) 709 .build(); 710 TableDescriptor td = TableDescriptorBuilder.newBuilder(selected) 711 .modifyColumnFamily(cfd) 712 .build(); 713 admin.modifyTable(td); 714 715 // assertion 716 TableDescriptor freshTableDesc = admin.getTableDescriptor(tableName); 717 ColumnFamilyDescriptor freshColumnDesc = freshTableDesc.getColumnFamily(columnDesc.getName()); 718 Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered", 719 freshColumnDesc.getDataBlockEncoding().getId(), id); 720 Assert.assertTrue( 721 "After alter encoding of column family, Table: " + tableName + " is not disabled", 722 admin.isTableDisabled(tableName)); 723 disabledTables.put(tableName, freshTableDesc); 724 LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id + 725 " in table: " + tableName); 726 } catch (Exception e) { 727 LOG.warn("Caught exception in action: " + this.getClass()); 728 throw e; 729 } finally { 730 admin.close(); 731 } 732 } 733 } 734 735 private class DeleteColumnFamilyAction extends ColumnAction { 736 737 @Override 738 void perform() throws IOException { 739 TableDescriptor selected = selectTable(disabledTables); 740 ColumnFamilyDescriptor cfd = selectFamily(selected); 741 if (selected == null || cfd == null) { 742 return; 743 } 744 745 Admin admin = connection.getAdmin(); 746 try { 747 if (selected.getColumnFamilyCount() < 2) { 748 LOG.info("No enough column families to delete in table " + selected.getTableName()); 749 return; 750 } 751 TableName tableName = selected.getTableName(); 752 LOG.info("Deleting column family: " + cfd + " from table: " + tableName); 753 admin.deleteColumnFamily(tableName, cfd.getName()); 754 // assertion 755 TableDescriptor freshTableDesc = admin.getDescriptor(tableName); 756 Assert.assertFalse("Column family: " + cfd + " was not added", 757 freshTableDesc.hasColumnFamily(cfd.getName())); 758 Assert.assertTrue( 759 "After delete column family, Table: " + tableName + " is not disabled", 760 admin.isTableDisabled(tableName)); 761 disabledTables.put(tableName, freshTableDesc); 762 LOG.info("Deleted column family: " + cfd + " from table: " + tableName); 763 } catch (Exception e) { 764 LOG.warn("Caught exception in action: " + this.getClass()); 765 throw e; 766 } finally { 767 admin.close(); 768 } 769 } 770 } 771 772 private class AddRowAction extends ColumnAction { 773 // populate tables 774 @Override 775 void perform() throws IOException { 776 TableDescriptor selected = selectTable(enabledTables); 777 if (selected == null ) { 778 return; 779 } 780 781 Admin admin = connection.getAdmin(); 782 TableName tableName = selected.getTableName(); 783 try (Table table = connection.getTable(tableName)){ 784 ArrayList<HRegionInfo> regionInfos = new ArrayList<>(admin.getTableRegions( 785 selected.getTableName())); 786 int numRegions = regionInfos.size(); 787 // average number of rows to be added per action to each region 788 int average_rows = 1; 789 int numRows = average_rows * numRegions; 790 LOG.info("Adding " + numRows + " rows to table: " + selected); 791 for (int i = 0; i < numRows; i++){ 792 // nextInt(Integer.MAX_VALUE)) to return positive numbers only 793 byte[] rowKey = Bytes.toBytes( 794 "row-" + String.format("%010d", RandomUtils.nextInt())); 795 ColumnFamilyDescriptor cfd = selectFamily(selected); 796 if (cfd == null){ 797 return; 798 } 799 byte[] family = cfd.getName(); 800 byte[] qualifier = Bytes.toBytes("col-" + RandomUtils.nextInt() % 10); 801 byte[] value = Bytes.toBytes("val-" + RandomStringUtils.randomAlphanumeric(10)); 802 Put put = new Put(rowKey); 803 put.addColumn(family, qualifier, value); 804 table.put(put); 805 } 806 TableDescriptor freshTableDesc = admin.getDescriptor(tableName); 807 Assert.assertTrue( 808 "After insert, Table: " + tableName + " in not enabled", admin.isTableEnabled(tableName)); 809 enabledTables.put(tableName, freshTableDesc); 810 LOG.info("Added " + numRows + " rows to table: " + selected); 811 } catch (Exception e) { 812 LOG.warn("Caught exception in action: " + this.getClass()); 813 throw e; 814 } finally { 815 admin.close(); 816 } 817 } 818 } 819 820 private enum ACTION { 821 CREATE_NAMESPACE, 822 MODIFY_NAMESPACE, 823 DELETE_NAMESPACE, 824 CREATE_TABLE, 825 DISABLE_TABLE, 826 ENABLE_TABLE, 827 DELETE_TABLE, 828 ADD_COLUMNFAMILY, 829 DELETE_COLUMNFAMILY, 830 ALTER_FAMILYVERSIONS, 831 ALTER_FAMILYENCODING, 832 ADD_ROW 833 } 834 835 private class Worker extends Thread { 836 837 private Exception savedException; 838 839 private ACTION action; 840 841 @Override 842 public void run() { 843 while (running.get()) { 844 // select random action 845 ACTION selectedAction = ACTION.values()[RandomUtils.nextInt() % ACTION.values().length]; 846 this.action = selectedAction; 847 LOG.info("Performing Action: " + selectedAction); 848 849 try { 850 switch (selectedAction) { 851 case CREATE_NAMESPACE: 852 new CreateNamespaceAction().perform(); 853 break; 854 case MODIFY_NAMESPACE: 855 new ModifyNamespaceAction().perform(); 856 break; 857 case DELETE_NAMESPACE: 858 new DeleteNamespaceAction().perform(); 859 break; 860 case CREATE_TABLE: 861 // stop creating new tables in the later stage of the test to avoid too many empty 862 // tables 863 if (create_table.get()) { 864 new CreateTableAction().perform(); 865 } 866 break; 867 case ADD_ROW: 868 new AddRowAction().perform(); 869 break; 870 case DISABLE_TABLE: 871 new DisableTableAction().perform(); 872 break; 873 case ENABLE_TABLE: 874 new EnableTableAction().perform(); 875 break; 876 case DELETE_TABLE: 877 // reduce probability of deleting table to 20% 878 if (RandomUtils.nextInt(0, 100) < 20) { 879 new DeleteTableAction().perform(); 880 } 881 break; 882 case ADD_COLUMNFAMILY: 883 new AddColumnFamilyAction().perform(); 884 break; 885 case DELETE_COLUMNFAMILY: 886 // reduce probability of deleting column family to 20% 887 if (RandomUtils.nextInt(0, 100) < 20) { 888 new DeleteColumnFamilyAction().perform(); 889 } 890 break; 891 case ALTER_FAMILYVERSIONS: 892 new AlterFamilyVersionsAction().perform(); 893 break; 894 case ALTER_FAMILYENCODING: 895 new AlterFamilyEncodingAction().perform(); 896 break; 897 } 898 } catch (Exception ex) { 899 this.savedException = ex; 900 return; 901 } 902 } 903 LOG.info(this.getName() + " stopped"); 904 } 905 906 public Exception getSavedException(){ 907 return this.savedException; 908 } 909 910 public ACTION getAction(){ 911 return this.action; 912 } 913 } 914 915 private void checkException(List<Worker> workers){ 916 if(workers == null || workers.isEmpty()) 917 return; 918 for (Worker worker : workers){ 919 Exception e = worker.getSavedException(); 920 if (e != null) { 921 LOG.error("Found exception in thread: " + worker.getName()); 922 e.printStackTrace(); 923 } 924 Assert.assertNull("Action failed: " + worker.getAction() + " in thread: " 925 + worker.getName(), e); 926 } 927 } 928 929 private int runTest() throws Exception { 930 LOG.info("Starting the test"); 931 932 String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName()); 933 long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME); 934 935 String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName()); 936 numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS); 937 938 ArrayList<Worker> workers = new ArrayList<>(numThreads); 939 for (int i = 0; i < numThreads; i++) { 940 checkException(workers); 941 Worker worker = new Worker(); 942 LOG.info("Launching worker thread " + worker.getName()); 943 workers.add(worker); 944 worker.start(); 945 } 946 947 Threads.sleep(runtime / 2); 948 LOG.info("Stopping creating new tables"); 949 create_table.set(false); 950 Threads.sleep(runtime / 2); 951 LOG.info("Runtime is up"); 952 running.set(false); 953 954 checkException(workers); 955 956 for (Worker worker : workers) { 957 worker.join(); 958 } 959 LOG.info("All Worker threads stopped"); 960 961 // verify 962 LOG.info("Verify actions of all threads succeeded"); 963 checkException(workers); 964 LOG.info("Verify namespaces"); 965 verifyNamespaces(); 966 LOG.info("Verify states of all tables"); 967 verifyTables(); 968 969 // RUN HBCK 970 971 HBaseFsck hbck = null; 972 try { 973 LOG.info("Running hbck"); 974 hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false); 975 if (HbckTestingUtil.inconsistencyFound(hbck)) { 976 // Find the inconsistency during HBCK. Leave table and namespace undropped so that 977 // we can check outside the test. 978 keepObjectsAtTheEnd = true; 979 } 980 HbckTestingUtil.assertNoErrors(hbck); 981 LOG.info("Finished hbck"); 982 } finally { 983 if (hbck != null) { 984 hbck.close(); 985 } 986 } 987 return 0; 988 } 989 990 @Override 991 public TableName getTablename() { 992 return null; // This test is not inteded to run with stock Chaos Monkey 993 } 994 995 @Override 996 protected Set<String> getColumnFamilies() { 997 return null; // This test is not inteded to run with stock Chaos Monkey 998 } 999 1000 public static void main(String[] args) throws Exception { 1001 Configuration conf = HBaseConfiguration.create(); 1002 IntegrationTestingUtility.setUseDistributedCluster(conf); 1003 IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover(); 1004 Connection connection = null; 1005 int ret = 1; 1006 try { 1007 // Initialize connection once, then pass to Actions 1008 LOG.debug("Setting up connection ..."); 1009 connection = ConnectionFactory.createConnection(conf); 1010 masterFailover.setConnection(connection); 1011 ret = ToolRunner.run(conf, masterFailover, args); 1012 } catch (IOException e){ 1013 LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e); 1014 } finally { 1015 connection = masterFailover.getConnection(); 1016 if (connection != null){ 1017 connection.close(); 1018 } 1019 System.exit(ret); 1020 } 1021 } 1022}