/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that verifies Procedure V2. DDL operations should go through (rollforward or
 * rollback) when primary master is killed by ChaosMonkey (default MASTER_KILLING).
 * <p>
 * Multiple Worker threads are started to randomly do the following Actions in loops.
 * </p>
 * Actions generating and populating tables:
 * <ul>
 * <li>CreateTableAction</li>
 * <li>DisableTableAction</li>
 * <li>EnableTableAction</li>
 * <li>DeleteTableAction</li>
 * <li>AddRowAction</li>
 * </ul>
 * Actions performing column family DDL operations:
 * <ul>
 * <li>AddColumnFamilyAction</li>
 * <li>AlterFamilyVersionsAction</li>
 * <li>AlterFamilyEncodingAction</li>
 * <li>DeleteColumnFamilyAction</li>
 * </ul>
 * Actions performing namespace DDL operations:
 * <ul>
 * <li>CreateNamespaceAction</li>
 * <li>ModifyNamespaceAction</li>
 * <li>DeleteNamespaceAction</li>
 * </ul>
 * <br/>
 * The threads run for a period of time (default 20 minutes) then are stopped at the end of runtime.
 * Verification is performed towards those checkpoints:
 * <ol>
 * <li>No Actions throw Exceptions.</li>
 * <li>No inconsistencies are detected in hbck.</li>
 * </ol>
 * <p>
 * This test should be run by the hbase user since it invokes hbck at the end
 * </p>
 * <p>
 * Usage: hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
 * -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
 * -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
 * -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
 * </p>
 */
@Category(IntegrationTests.class)
public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {

  private static final Logger LOG = LoggerFactory.getLogger(IntegrationTestDDLMasterFailover.class);

  private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster

  protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;

  protected static final int DEFAULT_NUM_THREADS = 20;

  protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables

  /** Set when hbck reports an inconsistency so the objects survive cleanup for inspection. */
  private boolean keepObjectsAtTheEnd = false;

  protected HBaseCluster cluster;

  protected Connection connection;

  /**
   * A soft limit on how long we should run. The {@code %s} placeholder of each key is filled with
   * the simple name of the test class.
   */
  protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";

  /** Workers keep looping while this flag is true. */
  protected AtomicBoolean running = new AtomicBoolean(true);

  /** Workers only create new tables while this flag is true. */
  protected AtomicBoolean create_table = new AtomicBoolean(true);

  protected int numThreads, numRegions;

  // Bookkeeping of the objects this test believes exist, keyed by name. An action removes an
  // entry while it works on the object and puts it back (into the map matching the new state)
  // when it is done.
  ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> enabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> disabledTables = new ConcurrentHashMap<>();

  ConcurrentHashMap<TableName, TableDescriptor> deletedTables = new ConcurrentHashMap<>();

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    // Log the count that is actually requested; subclasses may override getMinServerCount().
    LOG.debug("Initializing/checking cluster has " + getMinServerCount() + " servers");
    util.initializeCluster(getMinServerCount());
    LOG.debug("Done initializing/checking cluster");
    cluster = util.getHBaseClusterInterface();
  }

  /**
   * Drops the tables and namespaces created by this test (unless hbck found an inconsistency),
   * clears the bookkeeping maps and closes the shared connection.
   */
  @Override
  public void cleanUpCluster() throws Exception {
    if (!keepObjectsAtTheEnd) {
      Admin admin = util.getAdmin();
      admin.disableTables("ittable-\\d+");
      admin.deleteTables("ittable-\\d+");
      NamespaceDescriptor[] nsds = admin.listNamespaceDescriptors();
      for (NamespaceDescriptor nsd : nsds) {
        if (nsd.getName().matches("itnamespace\\d+")) {
          LOG.info("Removing namespace=" + nsd.getName());
          admin.deleteNamespace(nsd.getName());
        }
      }
    }

    enabledTables.clear();
    disabledTables.clear();
    deletedTables.clear();
    namespaceMap.clear();

    // getConnection() returns null when the connection could not be established.
    Connection conn = getConnection();
    if (conn != null) {
      conn.close();
    }
    super.cleanUpCluster();
  }

  protected int getMinServerCount() {
    return SERVER_COUNT;
  }

  protected synchronized void setConnection(Connection connection) {
    this.connection = connection;
  }

  /**
   * Returns the shared connection, creating it on first use.
   * @return the connection, or {@code null} if it could not be established (the failure is logged)
   */
  protected synchronized Connection getConnection() {
    if (this.connection == null) {
      try {
        setConnection(ConnectionFactory.createConnection(getConf()));
      } catch (IOException e) {
        LOG.error(HBaseMarkers.FATAL, "Failed to establish connection.", e);
      }
    }
    return this.connection;
  }

  /**
   * Asserts that every namespace tracked in {@link #namespaceMap} exists on the cluster.
   * @throws IOException if an admin call fails
   */
  protected void verifyNamespaces() throws IOException {
    // try-with-resources so the Admin is released even when an assertion fails
    try (Admin admin = getConnection().getAdmin()) {
      // iterating concurrent map
      for (String nsName : namespaceMap.keySet()) {
        try {
          Assert.assertTrue("Namespace: " + nsName + " in namespaceMap does not exist",
            admin.getNamespaceDescriptor(nsName) != null);
        } catch (NamespaceNotFoundException nsnfe) {
          Assert
            .fail("Namespace: " + nsName + " in namespaceMap does not exist: " + nsnfe.getMessage());
        }
      }
    }
  }

  /**
   * Asserts that the cluster state of every tracked table matches the map it is tracked in.
   * @throws IOException if an admin call fails
   */
  protected void verifyTables() throws IOException {
    // try-with-resources so the Admin is released even when an assertion fails
    try (Admin admin = getConnection().getAdmin()) {
      // iterating concurrent map
      for (TableName tableName : enabledTables.keySet()) {
        Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
          admin.isTableEnabled(tableName));
      }
      for (TableName tableName : disabledTables.keySet()) {
        Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
          admin.isTableDisabled(tableName));
      }
      for (TableName tableName : deletedTables.keySet()) {
        Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
          admin.tableExists(tableName));
      }
    }
  }

  @Test
  public void testAsUnitTest() throws Exception {
    runTest();
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    return runTest();
  }

  /** Base class of all actions; captures the shared connection when the action is created. */
  private abstract class MasterAction {
    final Connection connection = getConnection();

    abstract void perform() throws IOException;
  }

  private abstract class NamespaceAction extends MasterAction {
    final String nsTestConfigKey = "hbase.namespace.testKey";

    /**
     * Randomly picks a namespace and removes it from the given map, so that no other worker can
     * operate on it concurrently. Shared by multiple namespace Actions.
     * @return the removed descriptor, or {@code null} if the map is empty
     */
    protected NamespaceDescriptor
      selectNamespace(ConcurrentHashMap<String, NamespaceDescriptor> namespaceMap) {
      // synchronization to prevent removal from multiple threads
      synchronized (namespaceMap) {
        // randomly select namespace from namespaceMap
        if (namespaceMap.isEmpty()) {
          return null;
        }
        List<String> namespaceList = new ArrayList<>(namespaceMap.keySet());
        String randomKey =
          namespaceList.get(ThreadLocalRandom.current().nextInt(namespaceList.size()));
        // remove from namespaceMap
        return namespaceMap.remove(randomKey);
      }
    }
  }

  private class CreateNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      Admin admin = connection.getAdmin();
      try {
        NamespaceDescriptor nsd;
        while (true) {
          nsd = createNamespaceDesc();
          try {
            if (admin.getNamespaceDescriptor(nsd.getName()) != null) {
              // the namespace has already existed.
              continue;
            } else {
              // currently, the code never return null - always throws exception if
              // namespace is not found - this just a defensive programming to make
              // sure null situation is handled in case the method changes in the
              // future.
              break;
            }
          } catch (NamespaceNotFoundException nsnfe) {
            // This is expected for a random generated NamespaceDescriptor
            break;
          }
        }
        LOG.info("Creating namespace:" + nsd);
        admin.createNamespace(nsd);
        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(nsd.getName());
        Assert.assertTrue("Namespace: " + nsd + " was not created", freshNamespaceDesc != null);
        namespaceMap.put(nsd.getName(), freshNamespaceDesc);
        LOG.info("Created namespace:" + freshNamespaceDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }

    private NamespaceDescriptor createNamespaceDesc() {
      // nextInt(Integer.MAX_VALUE) returns non-negative numbers only. A negative number would
      // put a '-' into the name, which no longer matches the "itnamespace\\d+" pattern that
      // cleanUpCluster() uses to find the namespaces created by this test.
      String namespaceName = "itnamespace"
        + String.format("%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
      NamespaceDescriptor nsd = NamespaceDescriptor.create(namespaceName).build();

      nsd.setConfiguration(nsTestConfigKey,
        String.format("%010d", ThreadLocalRandom.current().nextInt()));
      return nsd;
    }
  }

  private class ModifyNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      NamespaceDescriptor selected = selectNamespace(namespaceMap);
      if (selected == null) {
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        String namespaceName = selected.getName();
        LOG.info("Modifying namespace :" + selected);
        NamespaceDescriptor modifiedNsd = NamespaceDescriptor.create(namespaceName).build();
        String nsValueNew;
        do {
          nsValueNew = String.format("%010d", ThreadLocalRandom.current().nextInt());
          // compare from the non-null side so a missing configuration value cannot cause an NPE
        } while (nsValueNew.equals(selected.getConfigurationValue(nsTestConfigKey)));
        modifiedNsd.setConfiguration(nsTestConfigKey, nsValueNew);
        admin.modifyNamespace(modifiedNsd);
        NamespaceDescriptor freshNamespaceDesc = admin.getNamespaceDescriptor(namespaceName);
        Assert.assertTrue("Namespace: " + selected + " was not modified",
          nsValueNew.equals(freshNamespaceDesc.getConfigurationValue(nsTestConfigKey)));
        Assert.assertTrue("Namespace: " + namespaceName + " does not exist",
          admin.getNamespaceDescriptor(namespaceName) != null);
        namespaceMap.put(namespaceName, freshNamespaceDesc);
        LOG.info("Modified namespace :" + freshNamespaceDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }
  }

  private class DeleteNamespaceAction extends NamespaceAction {
    @Override
    void perform() throws IOException {
      NamespaceDescriptor selected = selectNamespace(namespaceMap);
      if (selected == null) {
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        String namespaceName = selected.getName();
        LOG.info("Deleting namespace :" + selected);
        admin.deleteNamespace(namespaceName);
        try {
          if (admin.getNamespaceDescriptor(namespaceName) != null) {
            // the namespace still exists.
            Assert.fail("Namespace: " + selected + " was not deleted");
          } else {
            LOG.info("Deleted namespace :" + selected);
          }
        } catch (NamespaceNotFoundException nsnfe) {
          // This is expected result
          LOG.info("Deleted namespace :" + selected);
        }
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }
  }

  private abstract class TableAction extends MasterAction {
    /**
     * Randomly picks a table and removes it from the given map, so that no other worker can
     * operate on it concurrently. Shared by multiple table Actions. The caller is responsible
     * for putting the table back into the map that matches its state afterwards.
     * @return the removed descriptor, or {@code null} if the map is empty
     */
    protected TableDescriptor selectTable(ConcurrentHashMap<TableName, TableDescriptor> tableMap) {
      // synchronization to prevent removal from multiple threads
      synchronized (tableMap) {
        // randomly select table from tableMap
        if (tableMap.isEmpty()) {
          return null;
        }
        List<TableName> tableList = new ArrayList<>(tableMap.keySet());
        TableName key = tableList.get(ThreadLocalRandom.current().nextInt(tableList.size()));
        return tableMap.remove(key);
      }
    }
  }

  private class CreateTableAction extends TableAction {

    @Override
    void perform() throws IOException {
      Admin admin = connection.getAdmin();
      try {
        TableDescriptor td = createTableDesc();
        TableName tableName = td.getTableName();
        if (admin.tableExists(tableName)) {
          return;
        }
        // Build the key from the enclosing test class, as runTest() does for its keys. Using
        // this.getClass() here would yield "hbase.CreateTableAction.numRegions" and silently
        // ignore the documented -Dhbase.IntegrationTestDDLMasterFailover.numRegions option.
        String numRegionKey = String.format(NUM_REGIONS_KEY,
          IntegrationTestDDLMasterFailover.this.getClass().getSimpleName());
        numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
        byte[] startKey = Bytes.toBytes("row-0000000000");
        byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
        LOG.info("Creating table:" + td);
        admin.createTable(td, startKey, endKey, numRegions);
        Assert.assertTrue("Table: " + td + " was not created", admin.tableExists(tableName));
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After create, Table: " + tableName + " in not enabled",
          admin.isTableEnabled(tableName));
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Created table:" + freshTableDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }

    private TableDescriptor createTableDesc() {
      String tableName =
        String.format("ittable-%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
      String familyName = "cf-" + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
      return TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(familyName)).build();
    }
  }

  private class DisableTableAction extends TableAction {

    @Override
    void perform() throws IOException {

      TableDescriptor selected = selectTable(enabledTables);
      if (selected == null) {
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        TableName tableName = selected.getTableName();
        LOG.info("Disabling table :" + selected);
        admin.disableTable(tableName);
        Assert.assertTrue("Table: " + selected + " was not disabled",
          admin.isTableDisabled(tableName));
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After disable, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Disabled table :" + freshTableDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        // TODO workaround
        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
        // operations
        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
        // 2) if master failover happens in the middle of the enable/disable operation, the new
        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
        // AssignmentManager#recoverTableInEnablingState() and
        // AssignmentManager#recoverTableInDisablingState()
        // 3) after the new master initialization completes, the procedure tries to re-do the
        // enable/disable operation, which was already done. Ignore those exceptions before change
        // of behaviors of AssignmentManager in presence of PV2
        if (e instanceof TableNotEnabledException) {
          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass(), e);
        } else {
          throw e;
        }
      } finally {
        admin.close();
      }
    }
  }

  private class EnableTableAction extends TableAction {

    @Override
    void perform() throws IOException {

      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        TableName tableName = selected.getTableName();
        LOG.info("Enabling table :" + selected);
        admin.enableTable(tableName);
        Assert.assertTrue("Table: " + selected + " was not enabled",
          admin.isTableEnabled(tableName));
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After enable, Table: " + tableName + " in not enabled",
          admin.isTableEnabled(tableName));
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Enabled table :" + freshTableDesc);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        // TODO workaround
        // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
        // operations
        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
        // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
        // 2) if master failover happens in the middle of the enable/disable operation, the new
        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
        // AssignmentManager#recoverTableInEnablingState() and
        // AssignmentManager#recoverTableInDisablingState()
        // 3) after the new master initialization completes, the procedure tries to re-do the
        // enable/disable operation, which was already done. Ignore those exceptions before
        // change of behaviors of AssignmentManager in presence of PV2
        if (e instanceof TableNotDisabledException) {
          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass(), e);
        } else {
          throw e;
        }
      } finally {
        admin.close();
      }
    }
  }

  private class DeleteTableAction extends TableAction {

    @Override
    void perform() throws IOException {

      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        TableName tableName = selected.getTableName();
        LOG.info("Deleting table :" + selected);
        admin.deleteTable(tableName);
        Assert.assertFalse("Table: " + selected + " was not deleted", admin.tableExists(tableName));
        deletedTables.put(tableName, selected);
        LOG.info("Deleted table :" + selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }
  }

  private abstract class ColumnAction extends TableAction {
    /**
     * Randomly picks a column family of the given table. Shared by multiple family Actions.
     * @return a family descriptor, or {@code null} if {@code td} is null or has no families
     */
    protected ColumnFamilyDescriptor selectFamily(TableDescriptor td) {
      if (td == null) {
        return null;
      }
      ColumnFamilyDescriptor[] families = td.getColumnFamilies();
      if (families.length == 0) {
        LOG.info("No column families in table: " + td);
        return null;
      }
      return families[ThreadLocalRandom.current().nextInt(families.length)];
    }
  }

  private class AddColumnFamilyAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor cfd = createFamilyDesc();
      if (selected.hasColumnFamily(cfd.getName())) {
        LOG.info(
          Bytes.toString(cfd.getName()) + " already exists in table " + selected.getTableName());
        // nothing was changed: hand the table back so it does not drop out of the bookkeeping
        disabledTables.put(tableName, selected);
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        LOG.info("Adding column family: " + cfd + " to table: " + tableName);
        admin.addColumnFamily(tableName, cfd);
        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("Column family: " + cfd + " was not added",
          freshTableDesc.hasColumnFamily(cfd.getName()));
        Assert.assertTrue("After add column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Added column family: " + cfd + " to table: " + tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }

    private ColumnFamilyDescriptor createFamilyDesc() {
      // non-negative numbers only, consistent with the other generated names
      String familyName =
        String.format("cf-%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
      return ColumnFamilyDescriptorBuilder.of(familyName);
    }
  }

  private class AlterFamilyVersionsAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
      if (columnDesc == null) {
        // nothing was changed: hand the table back so it does not drop out of the bookkeeping
        disabledTables.put(tableName, selected);
        return;
      }

      Admin admin = connection.getAdmin();
      int versions = ThreadLocalRandom.current().nextInt(10) + 3;
      try {
        LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions
          + " in table: " + tableName);

        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
          .setMinVersions(versions).setMaxVersions(versions).build();
        TableDescriptor td =
          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
        admin.modifyTable(td);

        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        ColumnFamilyDescriptor freshColumnDesc =
          freshTableDesc.getColumnFamily(columnDesc.getName());
        Assert.assertEquals("Column family: " + columnDesc + " was not altered", versions,
          freshColumnDesc.getMaxVersions());
        Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered", versions,
          freshColumnDesc.getMinVersions());
        Assert.assertTrue(
          "After alter versions of column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions
          + " in table: " + tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }
  }

  private class AlterFamilyEncodingAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor columnDesc = selectFamily(selected);
      if (columnDesc == null) {
        // nothing was changed: hand the table back so it does not drop out of the bookkeeping
        disabledTables.put(tableName, selected);
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        // possible DataBlockEncoding ids
        DataBlockEncoding[] possibleIds = { DataBlockEncoding.NONE, DataBlockEncoding.PREFIX,
          DataBlockEncoding.DIFF, DataBlockEncoding.FAST_DIFF, DataBlockEncoding.ROW_INDEX_V1 };
        short id = possibleIds[ThreadLocalRandom.current().nextInt(possibleIds.length)].getId();
        LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id + " in table: "
          + tableName);

        ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.newBuilder(columnDesc)
          .setDataBlockEncoding(DataBlockEncoding.getEncodingById(id)).build();
        TableDescriptor td =
          TableDescriptorBuilder.newBuilder(selected).modifyColumnFamily(cfd).build();
        admin.modifyTable(td);

        // assertion; getDescriptor() is what every other action in this test uses
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        ColumnFamilyDescriptor freshColumnDesc =
          freshTableDesc.getColumnFamily(columnDesc.getName());
        Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered", id,
          freshColumnDesc.getDataBlockEncoding().getId());
        Assert.assertTrue(
          "After alter encoding of column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id
          + " in table: " + tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }
  }

  private class DeleteColumnFamilyAction extends ColumnAction {

    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(disabledTables);
      if (selected == null) {
        return;
      }
      TableName tableName = selected.getTableName();
      ColumnFamilyDescriptor cfd = selectFamily(selected);
      if (cfd == null) {
        // nothing was changed: hand the table back so it does not drop out of the bookkeeping
        disabledTables.put(tableName, selected);
        return;
      }
      if (selected.getColumnFamilyCount() < 2) {
        LOG.info("No enough column families to delete in table " + selected.getTableName());
        // the table is kept as is, so it must stay tracked as disabled
        disabledTables.put(tableName, selected);
        return;
      }

      Admin admin = connection.getAdmin();
      try {
        LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
        admin.deleteColumnFamily(tableName, cfd.getName());
        // assertion
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertFalse("Column family: " + cfd + " was not deleted",
          freshTableDesc.hasColumnFamily(cfd.getName()));
        Assert.assertTrue("After delete column family, Table: " + tableName + " is not disabled",
          admin.isTableDisabled(tableName));
        disabledTables.put(tableName, freshTableDesc);
        LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      } finally {
        admin.close();
      }
    }
  }

  private class AddRowAction extends ColumnAction {
    // populate tables
    @Override
    void perform() throws IOException {
      TableDescriptor selected = selectTable(enabledTables);
      if (selected == null) {
        return;
      }

      TableName tableName = selected.getTableName();
      try (Admin admin = connection.getAdmin(); Table table = connection.getTable(tableName)) {
        int regionCount = admin.getTableRegions(tableName).size();
        // average number of rows to be added per action to each region
        int averageRows = 1;
        int numRows = averageRows * regionCount;
        LOG.info("Adding " + numRows + " rows to table: " + selected);
        byte[] value = new byte[10];
        for (int i = 0; i < numRows; i++) {
          // nextInt(Integer.MAX_VALUE) to return non-negative numbers only, so that the row keys
          // fall into the "row-0000000000".."row-2147483647" range the table was pre-split on
          byte[] rowKey = Bytes.toBytes(
            "row-" + String.format("%010d", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
          ColumnFamilyDescriptor cfd = selectFamily(selected);
          if (cfd == null) {
            // the table is still enabled: keep it tracked
            enabledTables.put(tableName, selected);
            return;
          }
          byte[] family = cfd.getName();
          byte[] qualifier = Bytes.toBytes("col-" + ThreadLocalRandom.current().nextInt(10));
          Bytes.random(value);
          Put put = new Put(rowKey);
          put.addColumn(family, qualifier, value);
          table.put(put);
        }
        TableDescriptor freshTableDesc = admin.getDescriptor(tableName);
        Assert.assertTrue("After insert, Table: " + tableName + " in not enabled",
          admin.isTableEnabled(tableName));
        enabledTables.put(tableName, freshTableDesc);
        LOG.info("Added " + numRows + " rows to table: " + selected);
      } catch (Exception e) {
        LOG.warn("Caught exception in action: " + this.getClass());
        throw e;
      }
    }
  }

  private enum ACTION {
    CREATE_NAMESPACE,
    MODIFY_NAMESPACE,
    DELETE_NAMESPACE,
    CREATE_TABLE,
    DISABLE_TABLE,
    ENABLE_TABLE,
    DELETE_TABLE,
    ADD_COLUMNFAMILY,
    DELETE_COLUMNFAMILY,
    ALTER_FAMILYVERSIONS,
    ALTER_FAMILYENCODING,
    ADD_ROW
  }

  /** Thread that keeps performing randomly selected actions until the test stops or one fails. */
  private class Worker extends Thread {

    // Both fields are written by the worker thread and read by the main thread in
    // checkException() while the worker may still be running, hence volatile.
    private volatile Exception savedException;

    private volatile ACTION action;

    @Override
    public void run() {
      while (running.get()) {
        // select random action
        ACTION[] actions = ACTION.values();
        ACTION selectedAction = actions[ThreadLocalRandom.current().nextInt(actions.length)];
        this.action = selectedAction;
        LOG.info("Performing Action: " + selectedAction);

        try {
          switch (selectedAction) {
            case CREATE_NAMESPACE:
              new CreateNamespaceAction().perform();
              break;
            case MODIFY_NAMESPACE:
              new ModifyNamespaceAction().perform();
              break;
            case DELETE_NAMESPACE:
              new DeleteNamespaceAction().perform();
              break;
            case CREATE_TABLE:
              // stop creating new tables in the later stage of the test to avoid too many empty
              // tables
              if (create_table.get()) {
                new CreateTableAction().perform();
              }
              break;
            case ADD_ROW:
              new AddRowAction().perform();
              break;
            case DISABLE_TABLE:
              new DisableTableAction().perform();
              break;
            case ENABLE_TABLE:
              new EnableTableAction().perform();
              break;
            case DELETE_TABLE:
              // reduce probability of deleting table to 20%
              if (ThreadLocalRandom.current().nextInt(100) < 20) {
                new DeleteTableAction().perform();
              }
              break;
            case ADD_COLUMNFAMILY:
              new AddColumnFamilyAction().perform();
              break;
            case DELETE_COLUMNFAMILY:
              // reduce probability of deleting column family to 20%
              if (ThreadLocalRandom.current().nextInt(100) < 20) {
                new DeleteColumnFamilyAction().perform();
              }
              break;
            case ALTER_FAMILYVERSIONS:
              new AlterFamilyVersionsAction().perform();
              break;
            case ALTER_FAMILYENCODING:
              new AlterFamilyEncodingAction().perform();
              break;
          }
        } catch (Exception ex) {
          // remember the failure for checkException() and stop this worker
          this.savedException = ex;
          return;
        }
      }
      LOG.info(this.getName() + " stopped");
    }

    public Exception getSavedException() {
      return this.savedException;
    }

    public ACTION getAction() {
      return this.action;
    }
  }

  /** Fails the test if any of the given workers has recorded an exception. */
  private void checkException(List<Worker> workers) {
    if (workers == null || workers.isEmpty()) {
      return;
    }
    for (Worker worker : workers) {
      Exception e = worker.getSavedException();
      if (e != null) {
        LOG.error("Found exception in thread: " + worker.getName(), e);
      }
      Assert.assertNull("Action failed: " + worker.getAction() + " in thread: " + worker.getName(),
        e);
    }
  }

  /**
   * Starts the workers, lets them run for the configured runtime (table creation stops after the
   * first half), then verifies the worker results, the tracked namespaces and tables, and finally
   * runs hbck.
   * @return 0 on success; failures are reported through assertions
   */
  private int runTest() throws Exception {
    LOG.info("Starting the test");

    String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
    long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);

    String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
    numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);

    List<Worker> workers = new ArrayList<>(numThreads);
    for (int i = 0; i < numThreads; i++) {
      checkException(workers);
      Worker worker = new Worker();
      LOG.info("Launching worker thread " + worker.getName());
      workers.add(worker);
      worker.start();
    }

    Threads.sleep(runtime / 2);
    LOG.info("Stopping creating new tables");
    create_table.set(false);
    Threads.sleep(runtime / 2);
    LOG.info("Runtime is up");
    running.set(false);

    checkException(workers);

    for (Worker worker : workers) {
      worker.join();
    }
    LOG.info("All Worker threads stopped");

    // verify
    LOG.info("Verify actions of all threads succeeded");
    checkException(workers);
    LOG.info("Verify namespaces");
    verifyNamespaces();
    LOG.info("Verify states of all tables");
    verifyTables();

    // RUN HBCK

    HBaseFsck hbck = null;
    try {
      LOG.info("Running hbck");
      hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
      if (HbckTestingUtil.inconsistencyFound(hbck)) {
        // Find the inconsistency during HBCK. Leave table and namespace undropped so that
        // we can check outside the test.
        keepObjectsAtTheEnd = true;
      }
      HbckTestingUtil.assertNoErrors(hbck);
      LOG.info("Finished hbck");
    } finally {
      if (hbck != null) {
        hbck.close();
      }
    }
    return 0;
  }

  @Override
  public TableName getTablename() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return null; // This test is not intended to run with stock Chaos Monkey
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
    Connection connection = null;
    int ret = 1;
    try {
      // Initialize connection once, then pass to Actions
      LOG.debug("Setting up connection ...");
      connection = ConnectionFactory.createConnection(conf);
      masterFailover.setConnection(connection);
      ret = ToolRunner.run(conf, masterFailover, args);
    } catch (IOException e) {
      LOG.error(HBaseMarkers.FATAL, "Failed to establish connection. Aborting test ...", e);
    } finally {
      connection = masterFailover.getConnection();
      if (connection != null) {
        connection.close();
      }
      System.exit(ret);
    }
  }
}