001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.procedure; 019 020import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL; 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.TreeSet; 028import java.util.concurrent.Callable; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.CatalogFamilyFormat; 034import org.apache.hadoop.hbase.ClientMetaTableAccessor; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HRegionLocation; 037import org.apache.hadoop.hbase.MetaTableAccessor; 038import org.apache.hadoop.hbase.RegionLocations; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 041import org.apache.hadoop.hbase.TableName; 042import org.apache.hadoop.hbase.client.Admin; 043import org.apache.hadoop.hbase.client.BufferedMutator; 044import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 045import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 046import org.apache.hadoop.hbase.client.Connection; 047import org.apache.hadoop.hbase.client.Durability; 048import org.apache.hadoop.hbase.client.Put; 049import org.apache.hadoop.hbase.client.RegionInfo; 050import org.apache.hadoop.hbase.client.Result; 051import org.apache.hadoop.hbase.client.TableDescriptor; 052import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 053import org.apache.hadoop.hbase.client.TableState; 054import org.apache.hadoop.hbase.master.HMaster; 055import org.apache.hadoop.hbase.master.RegionState; 056import org.apache.hadoop.hbase.master.TableStateManager; 057import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 058import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 059import org.apache.hadoop.hbase.procedure2.Procedure; 060import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 061import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 062import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 063import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory; 064import org.apache.hadoop.hbase.util.Bytes; 065import org.apache.hadoop.hbase.util.CommonFSUtils; 066import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 067import org.apache.hadoop.hbase.util.FSUtils; 068import org.apache.hadoop.hbase.util.MD5Hash; 069import org.apache.hadoop.hbase.util.ModifyRegionUtils; 070import org.apache.yetus.audience.InterfaceAudience; 071import org.slf4j.Logger; 072import org.slf4j.LoggerFactory; 073 074@InterfaceAudience.Private 075public class MasterProcedureTestingUtility { 076 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class); 077 078 private MasterProcedureTestingUtility() { 079 } 080 081 public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec) 082 throws Exception { 083 final MasterProcedureEnv env = procExec.getEnvironment(); 084 final HMaster master = (HMaster) env.getMasterServices(); 085 ProcedureTestingUtility.restart(procExec, true, true, 086 // stop services 087 new Callable<Void>() { 088 @Override 089 public Void call() throws Exception { 090 master.setServiceStarted(false); 091 AssignmentManager am = env.getAssignmentManager(); 092 // try to simulate a master restart by removing the ServerManager states about seqIDs 093 for (RegionState regionState : am.getRegionStates().getRegionStates()) { 094 env.getMasterServices().getServerManager().removeRegion(regionState.getRegion()); 095 } 096 am.stop(); 097 master.setInitialized(false); 098 return null; 099 } 100 }, 101 // setup RIT before starting workers 102 new Callable<Void>() { 103 104 @Override 105 public Void call() throws Exception { 106 AssignmentManager am = env.getAssignmentManager(); 107 am.start(); 108 // just follow the same way with HMaster.finishActiveMasterInitialization. See the 109 // comments there 110 am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess()) 111 .filter(p -> p instanceof TransitRegionStateProcedure) 112 .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList())); 113 // create server state node, to simulate master start up 114 env.getMasterServices().getServerManager().getOnlineServersList() 115 .forEach(am.getRegionStates()::createServer); 116 am.initializationPostMetaOnline(); 117 master.setServiceStarted(true); 118 return null; 119 } 120 }, 121 // restart services 122 new Callable<Void>() { 123 @Override 124 public Void call() throws Exception { 125 AssignmentManager am = env.getAssignmentManager(); 126 try { 127 am.joinCluster(); 128 am.wakeMetaLoadedEvent(); 129 master.setInitialized(true); 130 } catch (Exception e) { 131 LOG.warn("Failed to load meta", e); 132 } 133 return null; 134 } 135 }); 136 } 137 138 // ========================================================================== 139 // Master failover utils 140 // ========================================================================== 141 public static void masterFailover(final HBaseTestingUtil testUtil) throws Exception { 142 SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 143 144 // Kill the master 145 HMaster oldMaster = cluster.getMaster(); 146 cluster.killMaster(cluster.getMaster().getServerName()); 147 148 // Wait the secondary 149 waitBackupMaster(testUtil, oldMaster); 150 } 151 152 public static void waitBackupMaster(final HBaseTestingUtil testUtil, final HMaster oldMaster) 153 throws Exception { 154 SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 155 156 HMaster newMaster = cluster.getMaster(); 157 while (newMaster == null || newMaster == oldMaster) { 158 Thread.sleep(250); 159 newMaster = cluster.getMaster(); 160 } 161 162 while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { 163 Thread.sleep(250); 164 } 165 } 166 167 // ========================================================================== 168 // Table Helpers 169 // ========================================================================== 170 public static TableDescriptor createHTD(final TableName tableName, final String... family) { 171 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 172 for (int i = 0; i < family.length; ++i) { 173 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i])); 174 } 175 return builder.build(); 176 } 177 178 public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec, 179 final TableName tableName, final byte[][] splitKeys, String... family) throws IOException { 180 TableDescriptor htd = createHTD(tableName, family); 181 RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys); 182 long procId = ProcedureTestingUtility.submitAndWait(procExec, 183 new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); 184 ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId)); 185 return regions; 186 } 187 188 public static void validateTableCreation(final HMaster master, final TableName tableName, 189 final RegionInfo[] regions, String... family) throws IOException { 190 validateTableCreation(master, tableName, regions, true, family); 191 } 192 193 public static void validateTableCreation(final HMaster master, final TableName tableName, 194 final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException { 195 // check filesystem 196 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 197 final Path tableDir = 198 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 199 assertTrue(fs.exists(tableDir)); 200 CommonFSUtils.logFileSystemState(fs, tableDir, LOG); 201 List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir); 202 for (int i = 0; i < regions.length; ++i) { 203 Path regionDir = new Path(tableDir, regions[i].getEncodedName()); 204 assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir)); 205 assertTrue(unwantedRegionDirs.remove(regionDir)); 206 List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir); 207 for (int j = 0; j < family.length; ++j) { 208 final Path familyDir = new Path(regionDir, family[j]); 209 if (hasFamilyDirs) { 210 assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir)); 211 assertTrue(allFamilyDirs.remove(familyDir)); 212 } else { 213 // TODO: WARN: Modify Table/Families does not create a family dir 214 if (!fs.exists(familyDir)) { 215 LOG.warn(family[j] + " family dir does not exist"); 216 } 217 allFamilyDirs.remove(familyDir); 218 } 219 } 220 assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty()); 221 } 222 assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty()); 223 LOG.debug("Table directory layout is as expected."); 224 225 // check meta 226 assertTrue(tableExists(master.getConnection(), tableName)); 227 assertEquals(regions.length, countMetaRegions(master, tableName)); 228 229 // check htd 230 TableDescriptor htd = master.getTableDescriptors().get(tableName); 231 assertTrue("table descriptor not found", htd != null); 232 for (int i = 0; i < family.length; ++i) { 233 assertTrue("family not found " + family[i], 234 htd.getColumnFamily(Bytes.toBytes(family[i])) != null); 235 } 236 assertEquals(family.length, htd.getColumnFamilyCount()); 237 238 // checks store file tracker impl has been properly set in htd 239 String storeFileTrackerImpl = 240 StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration()); 241 assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL)); 242 } 243 244 public static void validateTableDeletion(final HMaster master, final TableName tableName) 245 throws IOException { 246 // check filesystem 247 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 248 final Path tableDir = 249 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 250 assertFalse(fs.exists(tableDir)); 251 252 // check meta 253 assertFalse(tableExists(master.getConnection(), tableName)); 254 assertEquals(0, countMetaRegions(master, tableName)); 255 256 // check htd 257 assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null); 258 } 259 260 private static int countMetaRegions(final HMaster master, final TableName tableName) 261 throws IOException { 262 final AtomicInteger actualRegCount = new AtomicInteger(0); 263 final ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() { 264 @Override 265 public boolean visit(Result rowResult) throws IOException { 266 RegionLocations list = CatalogFamilyFormat.getRegionLocations(rowResult); 267 if (list == null) { 268 LOG.warn("No serialized RegionInfo in " + rowResult); 269 return true; 270 } 271 HRegionLocation l = list.getRegionLocation(); 272 if (l == null) { 273 return true; 274 } 275 if (!l.getRegion().getTable().equals(tableName)) { 276 return false; 277 } 278 if (l.getRegion().isOffline() || l.getRegion().isSplit()) { 279 return true; 280 } 281 282 HRegionLocation[] locations = list.getRegionLocations(); 283 for (HRegionLocation location : locations) { 284 if (location == null) continue; 285 ServerName serverName = location.getServerName(); 286 // Make sure that regions are assigned to server 287 if (serverName != null && serverName.getAddress() != null) { 288 actualRegCount.incrementAndGet(); 289 } 290 } 291 return true; 292 } 293 }; 294 MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName); 295 return actualRegCount.get(); 296 } 297 298 public static void validateTableIsEnabled(final HMaster master, final TableName tableName) 299 throws IOException { 300 TableStateManager tsm = master.getTableStateManager(); 301 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED)); 302 } 303 304 public static void validateTableIsDisabled(final HMaster master, final TableName tableName) 305 throws IOException { 306 TableStateManager tsm = master.getTableStateManager(); 307 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED)); 308 } 309 310 public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName, 311 final String family) throws IOException { 312 TableDescriptor htd = master.getTableDescriptors().get(tableName); 313 assertTrue(htd != null); 314 315 assertTrue(htd.hasColumnFamily(Bytes.toBytes(family))); 316 } 317 318 public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName, 319 final String family) throws IOException { 320 // verify htd 321 TableDescriptor htd = master.getTableDescriptors().get(tableName); 322 assertTrue(htd != null); 323 assertFalse(htd.hasColumnFamily(Bytes.toBytes(family))); 324 325 // verify fs 326 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 327 final Path tableDir = 328 CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 329 for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) { 330 final Path familyDir = new Path(regionDir, family); 331 assertFalse(family + " family dir should not exist", fs.exists(familyDir)); 332 } 333 } 334 335 public static void validateColumnFamilyModification(final HMaster master, 336 final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor) 337 throws IOException { 338 TableDescriptor htd = master.getTableDescriptors().get(tableName); 339 assertTrue(htd != null); 340 341 ColumnFamilyDescriptor hcfd = htd.getColumnFamily(Bytes.toBytes(family)); 342 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor)); 343 } 344 345 public static void loadData(final Connection connection, final TableName tableName, int rows, 346 final byte[][] splitKeys, final String... sfamilies) throws IOException { 347 byte[][] families = new byte[sfamilies.length][]; 348 for (int i = 0; i < families.length; ++i) { 349 families[i] = Bytes.toBytes(sfamilies[i]); 350 } 351 352 BufferedMutator mutator = connection.getBufferedMutator(tableName); 353 354 // Ensure one row per region 355 assertTrue(rows >= splitKeys.length); 356 for (byte[] k : splitKeys) { 357 byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k); 358 byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); 359 mutator.mutate(createPut(families, key, value)); 360 rows--; 361 } 362 363 // Add other extra rows. more rows, more files 364 while (rows-- > 0) { 365 byte[] value = 366 Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows)); 367 byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); 368 mutator.mutate(createPut(families, key, value)); 369 } 370 mutator.flush(); 371 } 372 373 private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { 374 byte[] q = Bytes.toBytes("q"); 375 Put put = new Put(key); 376 put.setDurability(Durability.SKIP_WAL); 377 for (byte[] family : families) { 378 put.addColumn(family, q, value); 379 } 380 return put; 381 } 382 383 // ========================================================================== 384 // Procedure Helpers 385 // ========================================================================== 386 public static long generateNonceGroup(final HMaster master) { 387 return master.getAsyncClusterConnection().getNonceGenerator().getNonceGroup(); 388 } 389 390 public static long generateNonce(final HMaster master) { 391 return master.getAsyncClusterConnection().getNonceGenerator().newNonce(); 392 } 393 394 /** 395 * Run through all procedure flow states TWICE while also restarting procedure executor at each 396 * step; i.e force a reread of procedure store. 397 * <p> 398 * It does 399 * <ol> 400 * <li>Execute step N - kill the executor before store update 401 * <li>Restart executor/store 402 * <li>Execute step N - and then save to store 403 * </ol> 404 * <p> 405 * This is a good test for finding state that needs persisting and steps that are not idempotent. 406 * Use this version of the test when a procedure executes all flow steps from start to finish. 407 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long) 408 */ 409 public static void testRecoveryAndDoubleExecution( 410 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep, 411 final boolean expectExecRunning) throws Exception { 412 ProcedureTestingUtility.waitProcedure(procExec, procId); 413 assertEquals(false, procExec.isRunning()); 414 415 // Restart the executor and execute the step twice 416 // execute step N - kill before store update 417 // restart executor/store 418 // execute step N - save on store 419 // NOTE: currently we make assumption that states/ steps are sequential. There are already 420 // instances of a procedures which skip (don't use) intermediate states/ steps. In future, 421 // intermediate states/ steps can be added with ordinal greater than lastStep. If and when 422 // that happens the states can not be treated as sequential steps and the condition in 423 // following while loop needs to be changed. We can use euqals/ not equals operator to check 424 // if the procedure has reached the user specified state. But there is a possibility that 425 // while loop may not get the control back exaclty when the procedure is in lastStep. Proper 426 // fix would be get all visited states by the procedure and then check if user speccified 427 // state is in that list. Current assumption of sequential proregression of steps/ states is 428 // made at multiple places so we can keep while condition below for simplicity. 429 Procedure<?> proc = procExec.getProcedure(procId); 430 int stepNum = proc instanceof StateMachineProcedure 431 ? ((StateMachineProcedure) proc).getCurrentStateId() 432 : 0; 433 for (;;) { 434 if (stepNum == lastStep) { 435 break; 436 } 437 LOG.info("Restart " + stepNum + " exec state=" + proc); 438 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 439 restartMasterProcedureExecutor(procExec); 440 ProcedureTestingUtility.waitProcedure(procExec, procId); 441 // Old proc object is stale, need to get the new one after ProcedureExecutor restart 442 proc = procExec.getProcedure(procId); 443 stepNum = proc instanceof StateMachineProcedure 444 ? ((StateMachineProcedure) proc).getCurrentStateId() 445 : stepNum + 1; 446 } 447 448 assertEquals(expectExecRunning, procExec.isRunning()); 449 } 450 451 /** 452 * Run through all procedure flow states TWICE while also restarting procedure executor at each 453 * step; i.e force a reread of procedure store. 454 * <p> 455 * It does 456 * <ol> 457 * <li>Execute step N - kill the executor before store update 458 * <li>Restart executor/store 459 * <li>Executes hook for each step twice 460 * <li>Execute step N - and then save to store 461 * </ol> 462 * <p> 463 * This is a good test for finding state that needs persisting and steps that are not idempotent. 464 * Use this version of the test when the order in which flow steps are executed is not start to 465 * finish; where the procedure may vary the flow steps dependent on circumstance found. 466 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean) 467 */ 468 public static void testRecoveryAndDoubleExecution( 469 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook) 470 throws Exception { 471 ProcedureTestingUtility.waitProcedure(procExec, procId); 472 assertEquals(false, procExec.isRunning()); 473 for (int i = 0; !procExec.isFinished(procId); ++i) { 474 LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId)); 475 if (hook != null) { 476 assertTrue(hook.execute(i)); 477 } 478 restartMasterProcedureExecutor(procExec); 479 ProcedureTestingUtility.waitProcedure(procExec, procId); 480 } 481 assertEquals(true, procExec.isRunning()); 482 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 483 } 484 485 public static void testRecoveryAndDoubleExecution( 486 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception { 487 testRecoveryAndDoubleExecution(procExec, procId, null); 488 } 489 490 /** 491 * Hook which will be executed on each step 492 */ 493 public interface StepHook { 494 /** 495 * @param step Step no. at which this will be executed 496 * @return false if test should fail otherwise true 497 */ 498 boolean execute(int step) throws IOException; 499 } 500 501 /** 502 * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an 503 * abort() is injected. If the procedure implement abort() this should result in rollback being 504 * triggered. Each rollback step is called twice, by restarting the executor after every step. At 505 * the end of this call the procedure should be finished and rolledback. This method assert on the 506 * procedure being terminated with an AbortException. 507 */ 508 public static void testRollbackAndDoubleExecution( 509 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep) 510 throws Exception { 511 testRollbackAndDoubleExecution(procExec, procId, lastStep, false); 512 } 513 514 public static void testRollbackAndDoubleExecution( 515 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep, 516 boolean waitForAsyncProcs) throws Exception { 517 // Execute up to last step 518 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 519 520 // Restart the executor and rollback the step twice 521 // rollback step N - kill before store update 522 // restart executor/store 523 // rollback step N - save on store 524 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 525 abortListener.addProcId(procId); 526 procExec.registerListener(abortListener); 527 try { 528 for (int i = 0; !procExec.isFinished(procId); ++i) { 529 LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId)); 530 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 531 restartMasterProcedureExecutor(procExec); 532 ProcedureTestingUtility.waitProcedure(procExec, procId); 533 } 534 } finally { 535 assertTrue(procExec.unregisterListener(abortListener)); 536 } 537 538 if (waitForAsyncProcs) { 539 // Sometimes there are other procedures still executing (including asynchronously spawned by 540 // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before 541 // store update. Let all pending procedures finish normally. 542 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 543 // check 3 times to confirm that the procedure executor has not been killed 544 for (int i = 0; i < 3; i++) { 545 if (!procExec.isRunning()) { 546 LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" 547 + " to KillAndToggleBeforeStoreUpdate flag."); 548 restartMasterProcedureExecutor(procExec); 549 break; 550 } 551 Thread.sleep(1000); 552 } 553 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 554 } 555 556 assertEquals(true, procExec.isRunning()); 557 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 558 } 559 560 /** 561 * Execute the procedure up to "lastStep" and then the ProcedureExecutor is restarted and an 562 * abort() is injected. If the procedure implement abort() this should result in rollback being 563 * triggered. At the end of this call the procedure should be finished and rolledback. This method 564 * assert on the procedure being terminated with an AbortException. 565 */ 566 public static void testRollbackRetriableFailure( 567 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep) 568 throws Exception { 569 // Execute up to last step 570 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 571 572 // execute the rollback 573 testRestartWithAbort(procExec, procId); 574 575 assertEquals(true, procExec.isRunning()); 576 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 577 } 578 579 /** 580 * Restart the ProcedureExecutor and inject an abort to the specified procedure. If the procedure 581 * implement abort() this should result in rollback being triggered. At the end of this call the 582 * procedure should be finished and rolledback, if abort is implemnted 583 */ 584 public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec, 585 long procId) throws Exception { 586 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 587 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 588 abortListener.addProcId(procId); 589 procExec.registerListener(abortListener); 590 try { 591 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 592 LOG.info("Restart and rollback procId=" + procId); 593 restartMasterProcedureExecutor(procExec); 594 ProcedureTestingUtility.waitProcedure(procExec, procId); 595 } finally { 596 assertTrue(procExec.unregisterListener(abortListener)); 597 } 598 } 599 600 public static boolean tableExists(Connection conn, TableName tableName) throws IOException { 601 try (Admin admin = conn.getAdmin()) { 602 return admin.tableExists(tableName); 603 } 604 } 605 606 public static class InjectAbortOnLoadListener 607 implements ProcedureExecutor.ProcedureExecutorListener { 608 private final ProcedureExecutor<MasterProcedureEnv> procExec; 609 private TreeSet<Long> procsToAbort = null; 610 611 public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) { 612 this.procExec = procExec; 613 } 614 615 public void addProcId(long procId) { 616 if (procsToAbort == null) { 617 procsToAbort = new TreeSet<>(); 618 } 619 procsToAbort.add(procId); 620 } 621 622 @Override 623 public void procedureLoaded(long procId) { 624 if (procsToAbort != null && !procsToAbort.contains(procId)) { 625 return; 626 } 627 procExec.abort(procId); 628 } 629 630 @Override 631 public void procedureAdded(long procId) { 632 /* no-op */ } 633 634 @Override 635 public void procedureFinished(long procId) { 636 /* no-op */ } 637 } 638}