001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.TreeSet; 028import java.util.concurrent.Callable; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HRegionLocation; 035import org.apache.hadoop.hbase.MetaTableAccessor; 036import org.apache.hadoop.hbase.MiniHBaseCluster; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.Admin; 041import org.apache.hadoop.hbase.client.BufferedMutator; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 043import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 044import org.apache.hadoop.hbase.client.Connection; 045import org.apache.hadoop.hbase.client.Durability; 046import org.apache.hadoop.hbase.client.Put; 047import org.apache.hadoop.hbase.client.RegionInfo; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.TableDescriptor; 050import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 051import org.apache.hadoop.hbase.client.TableState; 052import org.apache.hadoop.hbase.master.HMaster; 053import org.apache.hadoop.hbase.master.RegionState; 054import org.apache.hadoop.hbase.master.TableStateManager; 055import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 056import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 057import org.apache.hadoop.hbase.procedure2.Procedure; 058import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 059import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 060import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 061import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 062import org.apache.hadoop.hbase.util.Bytes; 063import org.apache.hadoop.hbase.util.CommonFSUtils; 064import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 065import org.apache.hadoop.hbase.util.FSUtils; 066import org.apache.hadoop.hbase.util.MD5Hash; 067import org.apache.hadoop.hbase.util.ModifyRegionUtils; 068import org.apache.yetus.audience.InterfaceAudience; 069import org.slf4j.Logger; 070import org.slf4j.LoggerFactory; 071 072@InterfaceAudience.Private 073public class MasterProcedureTestingUtility { 074 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class); 075 076 private MasterProcedureTestingUtility() { 077 } 078 079 public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec) 080 throws Exception { 081 final MasterProcedureEnv env = procExec.getEnvironment(); 082 final HMaster master = (HMaster) env.getMasterServices(); 083 ProcedureTestingUtility.restart(procExec, true, true, 084 // stop services 085 new Callable<Void>() { 086 @Override 087 public Void call() throws Exception { 088 AssignmentManager am = env.getAssignmentManager(); 089 // try to simulate a master restart by removing the ServerManager states about seqIDs 090 for (RegionState regionState : am.getRegionStates().getRegionStates()) { 091 env.getMasterServices().getServerManager().removeRegion(regionState.getRegion()); 092 } 093 am.stop(); 094 master.setInitialized(false); 095 return null; 096 } 097 }, 098 // setup RIT before starting workers 099 new Callable<Void>() { 100 101 @Override 102 public Void call() throws Exception { 103 AssignmentManager am = env.getAssignmentManager(); 104 am.start(); 105 // just follow the same way with HMaster.finishActiveMasterInitialization. See the 106 // comments there 107 am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess()) 108 .filter(p -> p instanceof TransitRegionStateProcedure) 109 .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList())); 110 return null; 111 } 112 }, 113 // restart services 114 new Callable<Void>() { 115 @Override 116 public Void call() throws Exception { 117 AssignmentManager am = env.getAssignmentManager(); 118 try { 119 am.joinCluster(); 120 am.wakeMetaLoadedEvent(); 121 master.setInitialized(true); 122 } catch (Exception e) { 123 LOG.warn("Failed to load meta", e); 124 } 125 return null; 126 } 127 }); 128 } 129 130 // ========================================================================== 131 // Master failover utils 132 // ========================================================================== 133 public static void masterFailover(final HBaseTestingUtility testUtil) throws Exception { 134 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 135 136 // Kill the master 137 HMaster oldMaster = cluster.getMaster(); 138 cluster.killMaster(cluster.getMaster().getServerName()); 139 140 // Wait the secondary 141 waitBackupMaster(testUtil, oldMaster); 142 } 143 144 public static void waitBackupMaster(final HBaseTestingUtility testUtil, final HMaster oldMaster) 145 throws Exception { 146 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 147 148 HMaster newMaster = cluster.getMaster(); 149 while (newMaster == null || newMaster == oldMaster) { 150 Thread.sleep(250); 151 newMaster = cluster.getMaster(); 152 } 153 154 while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { 155 Thread.sleep(250); 156 } 157 } 158 159 // ========================================================================== 160 // Table Helpers 161 // ========================================================================== 162 public static TableDescriptor createHTD(final TableName tableName, final String... family) { 163 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 164 for (int i = 0; i < family.length; ++i) { 165 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i])); 166 } 167 return builder.build(); 168 } 169 170 public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec, 171 final TableName tableName, final byte[][] splitKeys, String... family) throws IOException { 172 TableDescriptor htd = createHTD(tableName, family); 173 RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys); 174 long procId = ProcedureTestingUtility.submitAndWait(procExec, 175 new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); 176 ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId)); 177 return regions; 178 } 179 180 public static void validateTableCreation(final HMaster master, final TableName tableName, 181 final RegionInfo[] regions, String... family) throws IOException { 182 validateTableCreation(master, tableName, regions, true, family); 183 } 184 185 public static void validateTableCreation(final HMaster master, final TableName tableName, 186 final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException { 187 // check filesystem 188 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 189 final Path tableDir = 190 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 191 assertTrue(fs.exists(tableDir)); 192 CommonFSUtils.logFileSystemState(fs, tableDir, LOG); 193 List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir); 194 for (int i = 0; i < regions.length; ++i) { 195 Path regionDir = new Path(tableDir, regions[i].getEncodedName()); 196 assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir)); 197 assertTrue(unwantedRegionDirs.remove(regionDir)); 198 List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir); 199 for (int j = 0; j < family.length; ++j) { 200 final Path familyDir = new Path(regionDir, family[j]); 201 if (hasFamilyDirs) { 202 assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir)); 203 assertTrue(allFamilyDirs.remove(familyDir)); 204 } else { 205 // TODO: WARN: Modify Table/Families does not create a family dir 206 if (!fs.exists(familyDir)) { 207 LOG.warn(family[j] + " family dir does not exist"); 208 } 209 allFamilyDirs.remove(familyDir); 210 } 211 } 212 assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty()); 213 } 214 assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty()); 215 LOG.debug("Table directory layout is as expected."); 216 217 // check meta 218 assertTrue(tableExists(master.getConnection(), tableName)); 219 assertEquals(regions.length, countMetaRegions(master, tableName)); 220 221 // check htd 222 TableDescriptor htd = master.getTableDescriptors().get(tableName); 223 assertTrue("table descriptor not found", htd != null); 224 for (int i = 0; i < family.length; ++i) { 225 assertTrue("family not found " + family[i], 226 htd.getColumnFamily(Bytes.toBytes(family[i])) != null); 227 } 228 assertEquals(family.length, htd.getColumnFamilyCount()); 229 230 // checks store file tracker impl has been properly set in htd 231 String storeFileTrackerImpl = 232 StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration()); 233 assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL)); 234 } 235 236 public static void validateTableDeletion(final HMaster master, final TableName tableName) 237 throws IOException { 238 // check filesystem 239 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 240 final Path tableDir = 241 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 242 assertFalse(fs.exists(tableDir)); 243 244 // check meta 245 assertFalse(tableExists(master.getConnection(), tableName)); 246 assertEquals(0, countMetaRegions(master, tableName)); 247 248 // check htd 249 assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null); 250 } 251 252 private static int countMetaRegions(final HMaster master, final TableName tableName) 253 throws IOException { 254 final AtomicInteger actualRegCount = new AtomicInteger(0); 255 final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { 256 @Override 257 public boolean visit(Result rowResult) throws IOException { 258 RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); 259 if (list == null) { 260 LOG.warn("No serialized RegionInfo in " + rowResult); 261 return true; 262 } 263 HRegionLocation l = list.getRegionLocation(); 264 if (l == null) { 265 return true; 266 } 267 if (!l.getRegionInfo().getTable().equals(tableName)) { 268 return false; 269 } 270 if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; 271 HRegionLocation[] locations = list.getRegionLocations(); 272 for (HRegionLocation location : locations) { 273 if (location == null) continue; 274 ServerName serverName = location.getServerName(); 275 // Make sure that regions are assigned to server 276 if (serverName != null && serverName.getAddress() != null) { 277 actualRegCount.incrementAndGet(); 278 } 279 } 280 return true; 281 } 282 }; 283 MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName); 284 return actualRegCount.get(); 285 } 286 287 public static void validateTableIsEnabled(final HMaster master, final TableName tableName) 288 throws IOException { 289 TableStateManager tsm = master.getTableStateManager(); 290 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED)); 291 } 292 293 public static void validateTableIsDisabled(final HMaster master, final TableName tableName) 294 throws IOException { 295 TableStateManager tsm = master.getTableStateManager(); 296 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED)); 297 } 298 299 public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName, 300 final String family) throws IOException { 301 TableDescriptor htd = master.getTableDescriptors().get(tableName); 302 assertTrue(htd != null); 303 304 assertTrue(htd.hasColumnFamily(family.getBytes())); 305 } 306 307 public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName, 308 final String family) throws IOException { 309 // verify htd 310 TableDescriptor htd = master.getTableDescriptors().get(tableName); 311 assertTrue(htd != null); 312 assertFalse(htd.hasColumnFamily(family.getBytes())); 313 314 // verify fs 315 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 316 final Path tableDir = 317 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 318 for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) { 319 final Path familyDir = new Path(regionDir, family); 320 assertFalse(family + " family dir should not exist", fs.exists(familyDir)); 321 } 322 } 323 324 public static void validateColumnFamilyModification(final HMaster master, 325 final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor) 326 throws IOException { 327 TableDescriptor htd = master.getTableDescriptors().get(tableName); 328 assertTrue(htd != null); 329 330 ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes()); 331 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor)); 332 } 333 334 public static void loadData(final Connection connection, final TableName tableName, int rows, 335 final byte[][] splitKeys, final String... sfamilies) throws IOException { 336 byte[][] families = new byte[sfamilies.length][]; 337 for (int i = 0; i < families.length; ++i) { 338 families[i] = Bytes.toBytes(sfamilies[i]); 339 } 340 341 BufferedMutator mutator = connection.getBufferedMutator(tableName); 342 343 // Ensure one row per region 344 assertTrue(rows >= splitKeys.length); 345 for (byte[] k : splitKeys) { 346 byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k); 347 byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); 348 mutator.mutate(createPut(families, key, value)); 349 rows--; 350 } 351 352 // Add other extra rows. more rows, more files 353 while (rows-- > 0) { 354 byte[] value = 355 Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows)); 356 byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); 357 mutator.mutate(createPut(families, key, value)); 358 } 359 mutator.flush(); 360 } 361 362 private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { 363 byte[] q = Bytes.toBytes("q"); 364 Put put = new Put(key); 365 put.setDurability(Durability.SKIP_WAL); 366 for (byte[] family : families) { 367 put.addColumn(family, q, value); 368 } 369 return put; 370 } 371 372 // ========================================================================== 373 // Procedure Helpers 374 // ========================================================================== 375 public static long generateNonceGroup(final HMaster master) { 376 return master.getClusterConnection().getNonceGenerator().getNonceGroup(); 377 } 378 379 public static long generateNonce(final HMaster master) { 380 return master.getClusterConnection().getNonceGenerator().newNonce(); 381 } 382 383 /** 384 * Run through all procedure flow states TWICE while also restarting procedure executor at each 385 * step; i.e force a reread of procedure store. 386 * <p> 387 * It does 388 * <ol> 389 * <li>Execute step N - kill the executor before store update 390 * <li>Restart executor/store 391 * <li>Execute step N - and then save to store 392 * </ol> 393 * <p> 394 * This is a good test for finding state that needs persisting and steps that are not idempotent. 395 * Use this version of the test when a procedure executes all flow steps from start to finish. 396 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long) 397 */ 398 public static void testRecoveryAndDoubleExecution( 399 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep, 400 final boolean expectExecRunning) throws Exception { 401 ProcedureTestingUtility.waitProcedure(procExec, procId); 402 assertEquals(false, procExec.isRunning()); 403 404 // Restart the executor and execute the step twice 405 // execute step N - kill before store update 406 // restart executor/store 407 // execute step N - save on store 408 // NOTE: currently we make assumption that states/ steps are sequential. There are already 409 // instances of a procedures which skip (don't use) intermediate states/ steps. In future, 410 // intermediate states/ steps can be added with ordinal greater than lastStep. If and when 411 // that happens the states can not be treated as sequential steps and the condition in 412 // following while loop needs to be changed. We can use euqals/ not equals operator to check 413 // if the procedure has reached the user specified state. But there is a possibility that 414 // while loop may not get the control back exaclty when the procedure is in lastStep. Proper 415 // fix would be get all visited states by the procedure and then check if user speccified 416 // state is in that list. Current assumption of sequential proregression of steps/ states is 417 // made at multiple places so we can keep while condition below for simplicity. 418 Procedure<?> proc = procExec.getProcedure(procId); 419 int stepNum = proc instanceof StateMachineProcedure 420 ? ((StateMachineProcedure) proc).getCurrentStateId() 421 : 0; 422 for (;;) { 423 if (stepNum == lastStep) { 424 break; 425 } 426 LOG.info("Restart " + stepNum + " exec state=" + proc); 427 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 428 restartMasterProcedureExecutor(procExec); 429 ProcedureTestingUtility.waitProcedure(procExec, procId); 430 // Old proc object is stale, need to get the new one after ProcedureExecutor restart 431 proc = procExec.getProcedure(procId); 432 stepNum = proc instanceof StateMachineProcedure 433 ? ((StateMachineProcedure) proc).getCurrentStateId() 434 : stepNum + 1; 435 } 436 437 assertEquals(expectExecRunning, procExec.isRunning()); 438 } 439 440 /** 441 * Run through all procedure flow states TWICE while also restarting procedure executor at each 442 * step; i.e force a reread of procedure store. 443 * <p> 444 * It does 445 * <ol> 446 * <li>Execute step N - kill the executor before store update 447 * <li>Restart executor/store 448 * <li>Executes hook for each step twice 449 * <li>Execute step N - and then save to store 450 * </ol> 451 * <p> 452 * This is a good test for finding state that needs persisting and steps that are not idempotent. 453 * Use this version of the test when the order in which flow steps are executed is not start to 454 * finish; where the procedure may vary the flow steps dependent on circumstance found. 455 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean) 456 */ 457 public static void testRecoveryAndDoubleExecution( 458 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook) 459 throws Exception { 460 ProcedureTestingUtility.waitProcedure(procExec, procId); 461 assertEquals(false, procExec.isRunning()); 462 for (int i = 0; !procExec.isFinished(procId); ++i) { 463 LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId)); 464 if (hook != null) { 465 assertTrue(hook.execute(i)); 466 } 467 restartMasterProcedureExecutor(procExec); 468 ProcedureTestingUtility.waitProcedure(procExec, procId); 469 } 470 assertEquals(true, procExec.isRunning()); 471 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 472 } 473 474 public static void testRecoveryAndDoubleExecution( 475 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception { 476 testRecoveryAndDoubleExecution(procExec, procId, null); 477 } 478 479 /** 480 * Hook which will be executed on each step 481 */ 482 public interface StepHook { 483 /** 484 * @param step Step no. at which this will be executed 485 * @return false if test should fail otherwise true n 486 */ 487 boolean execute(int step) throws IOException; 488 } 489 490 /** 491 * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an 492 * abort() is injected. If the procedure implement abort() this should result in rollback being 493 * triggered. Each rollback step is called twice, by restarting the executor after every step. At 494 * the end of this call the procedure should be finished and rolledback. This method assert on the 495 * procedure being terminated with an AbortException. 496 */ 497 public static void testRollbackAndDoubleExecution( 498 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep) 499 throws Exception { 500 testRollbackAndDoubleExecution(procExec, procId, lastStep, false); 501 } 502 503 public static void testRollbackAndDoubleExecution( 504 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep, 505 boolean waitForAsyncProcs) throws Exception { 506 // Execute up to last step 507 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 508 509 // Restart the executor and rollback the step twice 510 // rollback step N - kill before store update 511 // restart executor/store 512 // rollback step N - save on store 513 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 514 abortListener.addProcId(procId); 515 procExec.registerListener(abortListener); 516 try { 517 for (int i = 0; !procExec.isFinished(procId); ++i) { 518 LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId)); 519 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 520 restartMasterProcedureExecutor(procExec); 521 ProcedureTestingUtility.waitProcedure(procExec, procId); 522 } 523 } finally { 524 assertTrue(procExec.unregisterListener(abortListener)); 525 } 526 527 if (waitForAsyncProcs) { 528 // Sometimes there are other procedures still executing (including asynchronously spawned by 529 // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before 530 // store update. Let all pending procedures finish normally. 531 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 532 // check 3 times to confirm that the procedure executor has not been killed 533 for (int i = 0; i < 3; i++) { 534 if (!procExec.isRunning()) { 535 LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" 536 + " to KillAndToggleBeforeStoreUpdate flag."); 537 restartMasterProcedureExecutor(procExec); 538 break; 539 } 540 Thread.sleep(1000); 541 } 542 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 543 } 544 545 assertEquals(true, procExec.isRunning()); 546 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 547 } 548 549 /** 550 * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an 551 * abort() is injected. If the procedure implement abort() this should result in rollback being 552 * triggered. At the end of this call the procedure should be finished and rolledback. This method 553 * assert on the procedure being terminated with an AbortException. 554 */ 555 public static void testRollbackRetriableFailure( 556 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep) 557 throws Exception { 558 // Execute up to last step 559 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 560 561 // execute the rollback 562 testRestartWithAbort(procExec, procId); 563 564 assertEquals(true, procExec.isRunning()); 565 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 566 } 567 568 /** 569 * Restart the ProcedureExecutor and inject an abort to the specified procedure. If the procedure 570 * implement abort() this should result in rollback being triggered. At the end of this call the 571 * procedure should be finished and rolledback, if abort is implemnted 572 */ 573 public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec, 574 long procId) throws Exception { 575 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 576 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 577 abortListener.addProcId(procId); 578 procExec.registerListener(abortListener); 579 try { 580 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 581 LOG.info("Restart and rollback procId=" + procId); 582 restartMasterProcedureExecutor(procExec); 583 ProcedureTestingUtility.waitProcedure(procExec, procId); 584 } finally { 585 assertTrue(procExec.unregisterListener(abortListener)); 586 } 587 } 588 589 public static boolean tableExists(Connection conn, TableName tableName) throws IOException { 590 try (Admin admin = conn.getAdmin()) { 591 return admin.tableExists(tableName); 592 } 593 } 594 595 public static class InjectAbortOnLoadListener 596 implements ProcedureExecutor.ProcedureExecutorListener { 597 private final ProcedureExecutor<MasterProcedureEnv> procExec; 598 private TreeSet<Long> procsToAbort = null; 599 600 public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) { 601 this.procExec = procExec; 602 } 603 604 public void addProcId(long procId) { 605 if (procsToAbort == null) { 606 procsToAbort = new TreeSet<>(); 607 } 608 procsToAbort.add(procId); 609 } 610 611 @Override 612 public void procedureLoaded(long procId) { 613 if (procsToAbort != null && !procsToAbort.contains(procId)) { 614 return; 615 } 616 procExec.abort(procId); 617 } 618 619 @Override 620 public void procedureAdded(long procId) { 621 /* no-op */ } 622 623 @Override 624 public void procedureFinished(long procId) { 625 /* no-op */ } 626 } 627}