001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master.procedure; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.TreeSet; 028import java.util.concurrent.Callable; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HRegionLocation; 035import org.apache.hadoop.hbase.MetaTableAccessor; 036import org.apache.hadoop.hbase.MiniHBaseCluster; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.BufferedMutator; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.Durability; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.client.TableState; 052import org.apache.hadoop.hbase.master.HMaster; 053import org.apache.hadoop.hbase.master.RegionState; 054import org.apache.hadoop.hbase.master.TableStateManager; 055import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 056import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 057import org.apache.hadoop.hbase.procedure2.Procedure; 058import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 059import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 060import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 061import org.apache.hadoop.hbase.util.Bytes; 062import org.apache.hadoop.hbase.util.CommonFSUtils; 063import org.apache.hadoop.hbase.util.FSUtils; 064import org.apache.hadoop.hbase.util.MD5Hash; 065import org.apache.hadoop.hbase.util.ModifyRegionUtils; 066import org.apache.yetus.audience.InterfaceAudience; 067import org.slf4j.Logger; 068import org.slf4j.LoggerFactory; 069 070@InterfaceAudience.Private 071public class MasterProcedureTestingUtility { 072 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class); 073 074 private MasterProcedureTestingUtility() { } 075 076 public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec) 077 throws Exception { 078 final MasterProcedureEnv env = procExec.getEnvironment(); 079 final HMaster master = (HMaster) env.getMasterServices(); 080 ProcedureTestingUtility.restart(procExec, true, true, 081 // stop services 082 new Callable<Void>() { 083 @Override 084 public Void call() throws Exception { 085 AssignmentManager am = env.getAssignmentManager(); 086 // try to simulate a master restart by removing the ServerManager states about seqIDs 087 for (RegionState regionState: am.getRegionStates().getRegionStates()) { 088 env.getMasterServices().getServerManager().removeRegion(regionState.getRegion()); 089 } 090 am.stop(); 091 master.setInitialized(false); 092 return null; 093 } 094 }, 095 // setup RIT before starting workers 096 new Callable<Void>() { 097 098 @Override 099 public Void call() throws Exception { 100 AssignmentManager am = env.getAssignmentManager(); 101 am.start(); 102 // just follow the same way with HMaster.finishActiveMasterInitialization. See the 103 // comments there 104 am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess()) 105 .filter(p -> p instanceof TransitRegionStateProcedure) 106 .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList())); 107 return null; 108 } 109 }, 110 // restart services 111 new Callable<Void>() { 112 @Override 113 public Void call() throws Exception { 114 AssignmentManager am = env.getAssignmentManager(); 115 try { 116 am.joinCluster(); 117 am.wakeMetaLoadedEvent(); 118 master.setInitialized(true); 119 } catch (Exception e) { 120 LOG.warn("Failed to load meta", e); 121 } 122 return null; 123 } 124 }); 125 } 126 127 // ========================================================================== 128 // Master failover utils 129 // ========================================================================== 130 public static void masterFailover(final HBaseTestingUtility testUtil) 131 throws Exception { 132 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 133 134 // Kill the master 135 HMaster oldMaster = cluster.getMaster(); 136 cluster.killMaster(cluster.getMaster().getServerName()); 137 138 // Wait the secondary 139 waitBackupMaster(testUtil, oldMaster); 140 } 141 142 public static void waitBackupMaster(final HBaseTestingUtility testUtil, 143 final HMaster oldMaster) throws Exception { 144 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 145 146 HMaster newMaster = cluster.getMaster(); 147 while (newMaster == null || newMaster == oldMaster) { 148 Thread.sleep(250); 149 newMaster = cluster.getMaster(); 150 } 151 152 while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { 153 Thread.sleep(250); 154 } 155 } 156 157 // ========================================================================== 158 // Table Helpers 159 // ========================================================================== 160 public static TableDescriptor createHTD(final TableName tableName, final String... family) { 161 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 162 for (int i = 0; i < family.length; ++i) { 163 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i])); 164 } 165 return builder.build(); 166 } 167 168 public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec, 169 final TableName tableName, final byte[][] splitKeys, String... family) throws IOException { 170 TableDescriptor htd = createHTD(tableName, family); 171 RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys); 172 long procId = ProcedureTestingUtility.submitAndWait(procExec, 173 new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); 174 ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId)); 175 return regions; 176 } 177 178 public static void validateTableCreation(final HMaster master, final TableName tableName, 179 final RegionInfo[] regions, String... family) throws IOException { 180 validateTableCreation(master, tableName, regions, true, family); 181 } 182 183 public static void validateTableCreation(final HMaster master, final TableName tableName, 184 final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException { 185 // check filesystem 186 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 187 final Path tableDir = 188 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 189 assertTrue(fs.exists(tableDir)); 190 CommonFSUtils.logFileSystemState(fs, tableDir, LOG); 191 List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir); 192 for (int i = 0; i < regions.length; ++i) { 193 Path regionDir = new Path(tableDir, regions[i].getEncodedName()); 194 assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir)); 195 assertTrue(unwantedRegionDirs.remove(regionDir)); 196 List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir); 197 for (int j = 0; j < family.length; ++j) { 198 final Path familyDir = new Path(regionDir, family[j]); 199 if (hasFamilyDirs) { 200 assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir)); 201 assertTrue(allFamilyDirs.remove(familyDir)); 202 } else { 203 // TODO: WARN: Modify Table/Families does not create a family dir 204 if (!fs.exists(familyDir)) { 205 LOG.warn(family[j] + " family dir does not exist"); 206 } 207 allFamilyDirs.remove(familyDir); 208 } 209 } 210 assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty()); 211 } 212 assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty()); 213 LOG.debug("Table directory layout is as expected."); 214 215 // check meta 216 assertTrue(tableExists(master.getConnection(), tableName)); 217 assertEquals(regions.length, countMetaRegions(master, tableName)); 218 219 // check htd 220 TableDescriptor htd = master.getTableDescriptors().get(tableName); 221 assertTrue("table descriptor not found", htd != null); 222 for (int i = 0; i < family.length; ++i) { 223 assertTrue("family not found " + family[i], htd.getColumnFamily(Bytes.toBytes(family[i])) != null); 224 } 225 assertEquals(family.length, htd.getColumnFamilyCount()); 226 } 227 228 public static void validateTableDeletion( 229 final HMaster master, final TableName tableName) throws IOException { 230 // check filesystem 231 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 232 final Path tableDir = 233 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 234 assertFalse(fs.exists(tableDir)); 235 236 // check meta 237 assertFalse(tableExists(master.getConnection(), tableName)); 238 assertEquals(0, countMetaRegions(master, tableName)); 239 240 // check htd 241 assertTrue("found htd of deleted table", 242 master.getTableDescriptors().get(tableName) == null); 243 } 244 245 private static int countMetaRegions(final HMaster master, final TableName tableName) 246 throws IOException { 247 final AtomicInteger actualRegCount = new AtomicInteger(0); 248 final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { 249 @Override 250 public boolean visit(Result rowResult) throws IOException { 251 RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); 252 if (list == null) { 253 LOG.warn("No serialized RegionInfo in " + rowResult); 254 return true; 255 } 256 HRegionLocation l = list.getRegionLocation(); 257 if (l == null) { 258 return true; 259 } 260 if (!l.getRegionInfo().getTable().equals(tableName)) { 261 return false; 262 } 263 if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; 264 HRegionLocation[] locations = list.getRegionLocations(); 265 for (HRegionLocation location : locations) { 266 if (location == null) continue; 267 ServerName serverName = location.getServerName(); 268 // Make sure that regions are assigned to server 269 if (serverName != null && serverName.getHostAndPort() != null) { 270 actualRegCount.incrementAndGet(); 271 } 272 } 273 return true; 274 } 275 }; 276 MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName); 277 return actualRegCount.get(); 278 } 279 280 public static void validateTableIsEnabled(final HMaster master, final TableName tableName) 281 throws IOException { 282 TableStateManager tsm = master.getTableStateManager(); 283 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED)); 284 } 285 286 public static void validateTableIsDisabled(final HMaster master, final TableName tableName) 287 throws IOException { 288 TableStateManager tsm = master.getTableStateManager(); 289 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED)); 290 } 291 292 public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName, 293 final String family) throws IOException { 294 TableDescriptor htd = master.getTableDescriptors().get(tableName); 295 assertTrue(htd != null); 296 297 assertTrue(htd.hasColumnFamily(family.getBytes())); 298 } 299 300 public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName, 301 final String family) throws IOException { 302 // verify htd 303 TableDescriptor htd = master.getTableDescriptors().get(tableName); 304 assertTrue(htd != null); 305 assertFalse(htd.hasColumnFamily(family.getBytes())); 306 307 // verify fs 308 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 309 final Path tableDir = 310 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 311 for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) { 312 final Path familyDir = new Path(regionDir, family); 313 assertFalse(family + " family dir should not exist", fs.exists(familyDir)); 314 } 315 } 316 317 public static void validateColumnFamilyModification(final HMaster master, 318 final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor) 319 throws IOException { 320 TableDescriptor htd = master.getTableDescriptors().get(tableName); 321 assertTrue(htd != null); 322 323 ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes()); 324 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor)); 325 } 326 327 public static void loadData(final Connection connection, final TableName tableName, 328 int rows, final byte[][] splitKeys, final String... sfamilies) throws IOException { 329 byte[][] families = new byte[sfamilies.length][]; 330 for (int i = 0; i < families.length; ++i) { 331 families[i] = Bytes.toBytes(sfamilies[i]); 332 } 333 334 BufferedMutator mutator = connection.getBufferedMutator(tableName); 335 336 // Ensure one row per region 337 assertTrue(rows >= splitKeys.length); 338 for (byte[] k: splitKeys) { 339 byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k); 340 byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); 341 mutator.mutate(createPut(families, key, value)); 342 rows--; 343 } 344 345 // Add other extra rows. more rows, more files 346 while (rows-- > 0) { 347 byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows)); 348 byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); 349 mutator.mutate(createPut(families, key, value)); 350 } 351 mutator.flush(); 352 } 353 354 private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { 355 byte[] q = Bytes.toBytes("q"); 356 Put put = new Put(key); 357 put.setDurability(Durability.SKIP_WAL); 358 for (byte[] family: families) { 359 put.addColumn(family, q, value); 360 } 361 return put; 362 } 363 364 // ========================================================================== 365 // Procedure Helpers 366 // ========================================================================== 367 public static long generateNonceGroup(final HMaster master) { 368 return master.getClusterConnection().getNonceGenerator().getNonceGroup(); 369 } 370 371 public static long generateNonce(final HMaster master) { 372 return master.getClusterConnection().getNonceGenerator().newNonce(); 373 } 374 375 /** 376 * Run through all procedure flow states TWICE while also restarting procedure executor at each 377 * step; i.e force a reread of procedure store. 378 * 379 *<p>It does 380 * <ol><li>Execute step N - kill the executor before store update 381 * <li>Restart executor/store 382 * <li>Execute step N - and then save to store 383 * </ol> 384 * 385 *<p>This is a good test for finding state that needs persisting and steps that are not 386 * idempotent. Use this version of the test when a procedure executes all flow steps from start to 387 * finish. 388 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long) 389 */ 390 public static void testRecoveryAndDoubleExecution( 391 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 392 final int lastStep, final boolean expectExecRunning) throws Exception { 393 ProcedureTestingUtility.waitProcedure(procExec, procId); 394 assertEquals(false, procExec.isRunning()); 395 396 // Restart the executor and execute the step twice 397 // execute step N - kill before store update 398 // restart executor/store 399 // execute step N - save on store 400 // NOTE: currently we make assumption that states/ steps are sequential. There are already 401 // instances of a procedures which skip (don't use) intermediate states/ steps. In future, 402 // intermediate states/ steps can be added with ordinal greater than lastStep. If and when 403 // that happens the states can not be treated as sequential steps and the condition in 404 // following while loop needs to be changed. We can use euqals/ not equals operator to check 405 // if the procedure has reached the user specified state. But there is a possibility that 406 // while loop may not get the control back exaclty when the procedure is in lastStep. Proper 407 // fix would be get all visited states by the procedure and then check if user speccified 408 // state is in that list. Current assumption of sequential proregression of steps/ states is 409 // made at multiple places so we can keep while condition below for simplicity. 410 Procedure<?> proc = procExec.getProcedure(procId); 411 int stepNum = proc instanceof StateMachineProcedure ? 412 ((StateMachineProcedure) proc).getCurrentStateId() : 0; 413 for (;;) { 414 if (stepNum == lastStep) { 415 break; 416 } 417 LOG.info("Restart " + stepNum + " exec state=" + proc); 418 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 419 restartMasterProcedureExecutor(procExec); 420 ProcedureTestingUtility.waitProcedure(procExec, procId); 421 // Old proc object is stale, need to get the new one after ProcedureExecutor restart 422 proc = procExec.getProcedure(procId); 423 stepNum = proc instanceof StateMachineProcedure ? 424 ((StateMachineProcedure) proc).getCurrentStateId() : stepNum + 1; 425 } 426 427 assertEquals(expectExecRunning, procExec.isRunning()); 428 } 429 430 /** 431 * Run through all procedure flow states TWICE while also restarting 432 * procedure executor at each step; i.e force a reread of procedure store. 433 * 434 *<p>It does 435 * <ol><li>Execute step N - kill the executor before store update 436 * <li>Restart executor/store 437 * <li>Executes hook for each step twice 438 * <li>Execute step N - and then save to store 439 * </ol> 440 * 441 *<p>This is a good test for finding state that needs persisting and steps that are not 442 * idempotent. Use this version of the test when the order in which flow steps are executed is 443 * not start to finish; where the procedure may vary the flow steps dependent on circumstance 444 * found. 445 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean) 446 */ 447 public static void testRecoveryAndDoubleExecution( 448 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook) 449 throws Exception { 450 ProcedureTestingUtility.waitProcedure(procExec, procId); 451 assertEquals(false, procExec.isRunning()); 452 for (int i = 0; !procExec.isFinished(procId); ++i) { 453 LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId)); 454 if (hook != null) { 455 assertTrue(hook.execute(i)); 456 } 457 restartMasterProcedureExecutor(procExec); 458 ProcedureTestingUtility.waitProcedure(procExec, procId); 459 } 460 assertEquals(true, procExec.isRunning()); 461 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 462 } 463 464 public static void testRecoveryAndDoubleExecution( 465 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception { 466 testRecoveryAndDoubleExecution(procExec, procId, null); 467 } 468 469 /** 470 * Hook which will be executed on each step 471 */ 472 public interface StepHook{ 473 /** 474 * @param step Step no. at which this will be executed 475 * @return false if test should fail otherwise true 476 * @throws IOException 477 */ 478 boolean execute(int step) throws IOException; 479 } 480 481 /** 482 * Execute the procedure up to "lastStep" and then the ProcedureExecutor 483 * is restarted and an abort() is injected. 484 * If the procedure implement abort() this should result in rollback being triggered. 485 * Each rollback step is called twice, by restarting the executor after every step. 486 * At the end of this call the procedure should be finished and rolledback. 487 * This method assert on the procedure being terminated with an AbortException. 488 */ 489 public static void testRollbackAndDoubleExecution( 490 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 491 final int lastStep) throws Exception { 492 testRollbackAndDoubleExecution(procExec, procId, lastStep, false); 493 } 494 495 public static void testRollbackAndDoubleExecution( 496 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 497 final int lastStep, boolean waitForAsyncProcs) throws Exception { 498 // Execute up to last step 499 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 500 501 // Restart the executor and rollback the step twice 502 // rollback step N - kill before store update 503 // restart executor/store 504 // rollback step N - save on store 505 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 506 abortListener.addProcId(procId); 507 procExec.registerListener(abortListener); 508 try { 509 for (int i = 0; !procExec.isFinished(procId); ++i) { 510 LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId)); 511 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 512 restartMasterProcedureExecutor(procExec); 513 ProcedureTestingUtility.waitProcedure(procExec, procId); 514 } 515 } finally { 516 assertTrue(procExec.unregisterListener(abortListener)); 517 } 518 519 if (waitForAsyncProcs) { 520 // Sometimes there are other procedures still executing (including asynchronously spawned by 521 // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before 522 // store update. Let all pending procedures finish normally. 523 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 524 // check 3 times to confirm that the procedure executor has not been killed 525 for (int i = 0; i < 3; i++) { 526 if (!procExec.isRunning()) { 527 LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" + 528 " to KillAndToggleBeforeStoreUpdate flag."); 529 restartMasterProcedureExecutor(procExec); 530 break; 531 } 532 Thread.sleep(1000); 533 } 534 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 535 } 536 537 assertEquals(true, procExec.isRunning()); 538 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 539 } 540 541 /** 542 * Execute the procedure up to "lastStep" and then the ProcedureExecutor 543 * is restarted and an abort() is injected. 544 * If the procedure implement abort() this should result in rollback being triggered. 545 * At the end of this call the procedure should be finished and rolledback. 546 * This method assert on the procedure being terminated with an AbortException. 547 */ 548 public static void testRollbackRetriableFailure( 549 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 550 final int lastStep) throws Exception { 551 // Execute up to last step 552 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 553 554 // execute the rollback 555 testRestartWithAbort(procExec, procId); 556 557 assertEquals(true, procExec.isRunning()); 558 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 559 } 560 561 /** 562 * Restart the ProcedureExecutor and inject an abort to the specified procedure. 563 * If the procedure implement abort() this should result in rollback being triggered. 564 * At the end of this call the procedure should be finished and rolledback, if abort is implemnted 565 */ 566 public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec, 567 long procId) throws Exception { 568 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 569 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 570 abortListener.addProcId(procId); 571 procExec.registerListener(abortListener); 572 try { 573 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 574 LOG.info("Restart and rollback procId=" + procId); 575 restartMasterProcedureExecutor(procExec); 576 ProcedureTestingUtility.waitProcedure(procExec, procId); 577 } finally { 578 assertTrue(procExec.unregisterListener(abortListener)); 579 } 580 } 581 582 public static boolean tableExists(Connection conn, TableName tableName) throws IOException { 583 try (Admin admin = conn.getAdmin()) { 584 return admin.tableExists(tableName); 585 } 586 } 587 588 public static class InjectAbortOnLoadListener 589 implements ProcedureExecutor.ProcedureExecutorListener { 590 private final ProcedureExecutor<MasterProcedureEnv> procExec; 591 private TreeSet<Long> procsToAbort = null; 592 593 public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) { 594 this.procExec = procExec; 595 } 596 597 public void addProcId(long procId) { 598 if (procsToAbort == null) { 599 procsToAbort = new TreeSet<>(); 600 } 601 procsToAbort.add(procId); 602 } 603 604 @Override 605 public void procedureLoaded(long procId) { 606 if (procsToAbort != null && !procsToAbort.contains(procId)) { 607 return; 608 } 609 procExec.abort(procId); 610 } 611 612 @Override 613 public void procedureAdded(long procId) { /* no-op */ } 614 615 @Override 616 public void procedureFinished(long procId) { /* no-op */ } 617 } 618}