001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master.procedure; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.TreeSet; 028import java.util.concurrent.Callable; 029import java.util.concurrent.atomic.AtomicInteger; 030import java.util.stream.Collectors; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.HBaseTestingUtility; 034import org.apache.hadoop.hbase.HRegionLocation; 035import org.apache.hadoop.hbase.MetaTableAccessor; 036import org.apache.hadoop.hbase.MiniHBaseCluster; 037import org.apache.hadoop.hbase.RegionLocations; 038import org.apache.hadoop.hbase.ServerName; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.BufferedMutator; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 043import org.apache.hadoop.hbase.client.Connection; 044import org.apache.hadoop.hbase.client.Durability; 045import org.apache.hadoop.hbase.client.Put; 046import org.apache.hadoop.hbase.client.RegionInfo; 047import org.apache.hadoop.hbase.client.Result; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.client.TableState; 051import org.apache.hadoop.hbase.master.HMaster; 052import org.apache.hadoop.hbase.master.RegionState; 053import org.apache.hadoop.hbase.master.TableStateManager; 054import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 055import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; 056import org.apache.hadoop.hbase.procedure2.Procedure; 057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 058import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 059import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 060import org.apache.hadoop.hbase.util.Bytes; 061import org.apache.hadoop.hbase.util.FSUtils; 062import org.apache.hadoop.hbase.util.MD5Hash; 063import org.apache.hadoop.hbase.util.ModifyRegionUtils; 064import org.apache.yetus.audience.InterfaceAudience; 065import org.slf4j.Logger; 066import org.slf4j.LoggerFactory; 067 068@InterfaceAudience.Private 069public class MasterProcedureTestingUtility { 070 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class); 071 072 private MasterProcedureTestingUtility() { } 073 074 public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec) 075 throws Exception { 076 final MasterProcedureEnv env = procExec.getEnvironment(); 077 final HMaster master = (HMaster) env.getMasterServices(); 078 ProcedureTestingUtility.restart(procExec, true, true, 079 // stop services 080 new Callable<Void>() { 081 @Override 082 public Void call() throws Exception { 083 AssignmentManager am = env.getAssignmentManager(); 084 // try to simulate a master restart by removing the ServerManager states about seqIDs 085 for (RegionState regionState: am.getRegionStates().getRegionStates()) { 086 env.getMasterServices().getServerManager().removeRegion(regionState.getRegion()); 087 } 088 am.stop(); 089 master.setInitialized(false); 090 return null; 091 } 092 }, 093 // setup RIT before starting workers 094 new Callable<Void>() { 095 096 @Override 097 public Void call() throws Exception { 098 AssignmentManager am = env.getAssignmentManager(); 099 am.start(); 100 // just follow the same way with HMaster.finishActiveMasterInitialization. See the 101 // comments there 102 am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess()) 103 .filter(p -> p instanceof TransitRegionStateProcedure) 104 .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList())); 105 return null; 106 } 107 }, 108 // restart services 109 new Callable<Void>() { 110 @Override 111 public Void call() throws Exception { 112 AssignmentManager am = env.getAssignmentManager(); 113 try { 114 am.joinCluster(); 115 master.setInitialized(true); 116 } catch (Exception e) { 117 LOG.warn("Failed to load meta", e); 118 } 119 return null; 120 } 121 }); 122 } 123 124 // ========================================================================== 125 // Master failover utils 126 // ========================================================================== 127 public static void masterFailover(final HBaseTestingUtility testUtil) 128 throws Exception { 129 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 130 131 // Kill the master 132 HMaster oldMaster = cluster.getMaster(); 133 cluster.killMaster(cluster.getMaster().getServerName()); 134 135 // Wait the secondary 136 waitBackupMaster(testUtil, oldMaster); 137 } 138 139 public static void waitBackupMaster(final HBaseTestingUtility testUtil, 140 final HMaster oldMaster) throws Exception { 141 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 142 143 HMaster newMaster = cluster.getMaster(); 144 while (newMaster == null || newMaster == oldMaster) { 145 Thread.sleep(250); 146 newMaster = cluster.getMaster(); 147 } 148 149 while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { 150 Thread.sleep(250); 151 } 152 } 153 154 // ========================================================================== 155 // Table Helpers 156 // ========================================================================== 157 public static TableDescriptor createHTD(final TableName tableName, final String... family) { 158 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 159 for (int i = 0; i < family.length; ++i) { 160 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i])); 161 } 162 return builder.build(); 163 } 164 165 public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec, 166 final TableName tableName, final byte[][] splitKeys, String... family) throws IOException { 167 TableDescriptor htd = createHTD(tableName, family); 168 RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys); 169 long procId = ProcedureTestingUtility.submitAndWait(procExec, 170 new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); 171 ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId)); 172 return regions; 173 } 174 175 public static void validateTableCreation(final HMaster master, final TableName tableName, 176 final RegionInfo[] regions, String... family) throws IOException { 177 validateTableCreation(master, tableName, regions, true, family); 178 } 179 180 public static void validateTableCreation(final HMaster master, final TableName tableName, 181 final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException { 182 // check filesystem 183 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 184 final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 185 assertTrue(fs.exists(tableDir)); 186 FSUtils.logFileSystemState(fs, tableDir, LOG); 187 List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir); 188 for (int i = 0; i < regions.length; ++i) { 189 Path regionDir = new Path(tableDir, regions[i].getEncodedName()); 190 assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir)); 191 assertTrue(unwantedRegionDirs.remove(regionDir)); 192 List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir); 193 for (int j = 0; j < family.length; ++j) { 194 final Path familyDir = new Path(regionDir, family[j]); 195 if (hasFamilyDirs) { 196 assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir)); 197 assertTrue(allFamilyDirs.remove(familyDir)); 198 } else { 199 // TODO: WARN: Modify Table/Families does not create a family dir 200 if (!fs.exists(familyDir)) { 201 LOG.warn(family[j] + " family dir does not exist"); 202 } 203 allFamilyDirs.remove(familyDir); 204 } 205 } 206 assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty()); 207 } 208 assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty()); 209 LOG.debug("Table directory layout is as expected."); 210 211 // check meta 212 assertTrue(MetaTableAccessor.tableExists(master.getConnection(), tableName)); 213 assertEquals(regions.length, countMetaRegions(master, tableName)); 214 215 // check htd 216 TableDescriptor htd = master.getTableDescriptors().get(tableName); 217 assertTrue("table descriptor not found", htd != null); 218 for (int i = 0; i < family.length; ++i) { 219 assertTrue("family not found " + family[i], htd.getColumnFamily(Bytes.toBytes(family[i])) != null); 220 } 221 assertEquals(family.length, htd.getColumnFamilyCount()); 222 } 223 224 public static void validateTableDeletion( 225 final HMaster master, final TableName tableName) throws IOException { 226 // check filesystem 227 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 228 final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 229 assertFalse(fs.exists(tableDir)); 230 231 // check meta 232 assertFalse(MetaTableAccessor.tableExists(master.getConnection(), tableName)); 233 assertEquals(0, countMetaRegions(master, tableName)); 234 235 // check htd 236 assertTrue("found htd of deleted table", 237 master.getTableDescriptors().get(tableName) == null); 238 } 239 240 private static int countMetaRegions(final HMaster master, final TableName tableName) 241 throws IOException { 242 final AtomicInteger actualRegCount = new AtomicInteger(0); 243 final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { 244 @Override 245 public boolean visit(Result rowResult) throws IOException { 246 RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); 247 if (list == null) { 248 LOG.warn("No serialized RegionInfo in " + rowResult); 249 return true; 250 } 251 HRegionLocation l = list.getRegionLocation(); 252 if (l == null) { 253 return true; 254 } 255 if (!l.getRegionInfo().getTable().equals(tableName)) { 256 return false; 257 } 258 if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; 259 HRegionLocation[] locations = list.getRegionLocations(); 260 for (HRegionLocation location : locations) { 261 if (location == null) continue; 262 ServerName serverName = location.getServerName(); 263 // Make sure that regions are assigned to server 264 if (serverName != null && serverName.getHostAndPort() != null) { 265 actualRegCount.incrementAndGet(); 266 } 267 } 268 return true; 269 } 270 }; 271 MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName); 272 return actualRegCount.get(); 273 } 274 275 public static void validateTableIsEnabled(final HMaster master, final TableName tableName) 276 throws IOException { 277 TableStateManager tsm = master.getTableStateManager(); 278 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED)); 279 } 280 281 public static void validateTableIsDisabled(final HMaster master, final TableName tableName) 282 throws IOException { 283 TableStateManager tsm = master.getTableStateManager(); 284 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED)); 285 } 286 287 public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName, 288 final String family) throws IOException { 289 TableDescriptor htd = master.getTableDescriptors().get(tableName); 290 assertTrue(htd != null); 291 292 assertTrue(htd.hasColumnFamily(family.getBytes())); 293 } 294 295 public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName, 296 final String family) throws IOException { 297 // verify htd 298 TableDescriptor htd = master.getTableDescriptors().get(tableName); 299 assertTrue(htd != null); 300 assertFalse(htd.hasColumnFamily(family.getBytes())); 301 302 // verify fs 303 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 304 final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 305 for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) { 306 final Path familyDir = new Path(regionDir, family); 307 assertFalse(family + " family dir should not exist", fs.exists(familyDir)); 308 } 309 } 310 311 public static void validateColumnFamilyModification(final HMaster master, 312 final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor) 313 throws IOException { 314 TableDescriptor htd = master.getTableDescriptors().get(tableName); 315 assertTrue(htd != null); 316 317 ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes()); 318 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor)); 319 } 320 321 public static void loadData(final Connection connection, final TableName tableName, 322 int rows, final byte[][] splitKeys, final String... sfamilies) throws IOException { 323 byte[][] families = new byte[sfamilies.length][]; 324 for (int i = 0; i < families.length; ++i) { 325 families[i] = Bytes.toBytes(sfamilies[i]); 326 } 327 328 BufferedMutator mutator = connection.getBufferedMutator(tableName); 329 330 // Ensure one row per region 331 assertTrue(rows >= splitKeys.length); 332 for (byte[] k: splitKeys) { 333 byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k); 334 byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); 335 mutator.mutate(createPut(families, key, value)); 336 rows--; 337 } 338 339 // Add other extra rows. more rows, more files 340 while (rows-- > 0) { 341 byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows)); 342 byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); 343 mutator.mutate(createPut(families, key, value)); 344 } 345 mutator.flush(); 346 } 347 348 private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { 349 byte[] q = Bytes.toBytes("q"); 350 Put put = new Put(key); 351 put.setDurability(Durability.SKIP_WAL); 352 for (byte[] family: families) { 353 put.addColumn(family, q, value); 354 } 355 return put; 356 } 357 358 // ========================================================================== 359 // Procedure Helpers 360 // ========================================================================== 361 public static long generateNonceGroup(final HMaster master) { 362 return master.getClusterConnection().getNonceGenerator().getNonceGroup(); 363 } 364 365 public static long generateNonce(final HMaster master) { 366 return master.getClusterConnection().getNonceGenerator().newNonce(); 367 } 368 369 /** 370 * Run through all procedure flow states TWICE while also restarting procedure executor at each 371 * step; i.e force a reread of procedure store. 372 * 373 *<p>It does 374 * <ol><li>Execute step N - kill the executor before store update 375 * <li>Restart executor/store 376 * <li>Execute step N - and then save to store 377 * </ol> 378 * 379 *<p>This is a good test for finding state that needs persisting and steps that are not 380 * idempotent. Use this version of the test when a procedure executes all flow steps from start to 381 * finish. 382 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long) 383 */ 384 public static void testRecoveryAndDoubleExecution( 385 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 386 final int lastStep, final boolean expectExecRunning) throws Exception { 387 ProcedureTestingUtility.waitProcedure(procExec, procId); 388 assertEquals(false, procExec.isRunning()); 389 390 // Restart the executor and execute the step twice 391 // execute step N - kill before store update 392 // restart executor/store 393 // execute step N - save on store 394 // NOTE: currently we make assumption that states/ steps are sequential. There are already 395 // instances of a procedures which skip (don't use) intermediate states/ steps. In future, 396 // intermediate states/ steps can be added with ordinal greater than lastStep. If and when 397 // that happens the states can not be treated as sequential steps and the condition in 398 // following while loop needs to be changed. We can use euqals/ not equals operator to check 399 // if the procedure has reached the user specified state. But there is a possibility that 400 // while loop may not get the control back exaclty when the procedure is in lastStep. Proper 401 // fix would be get all visited states by the procedure and then check if user speccified 402 // state is in that list. Current assumption of sequential proregression of steps/ states is 403 // made at multiple places so we can keep while condition below for simplicity. 404 Procedure<?> proc = procExec.getProcedure(procId); 405 int stepNum = proc instanceof StateMachineProcedure ? 406 ((StateMachineProcedure) proc).getCurrentStateId() : 0; 407 for (;;) { 408 if (stepNum == lastStep) { 409 break; 410 } 411 LOG.info("Restart " + stepNum + " exec state=" + proc); 412 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 413 restartMasterProcedureExecutor(procExec); 414 ProcedureTestingUtility.waitProcedure(procExec, procId); 415 // Old proc object is stale, need to get the new one after ProcedureExecutor restart 416 proc = procExec.getProcedure(procId); 417 stepNum = proc instanceof StateMachineProcedure ? 418 ((StateMachineProcedure) proc).getCurrentStateId() : stepNum + 1; 419 } 420 421 assertEquals(expectExecRunning, procExec.isRunning()); 422 } 423 424 /** 425 * Run through all procedure flow states TWICE while also restarting 426 * procedure executor at each step; i.e force a reread of procedure store. 427 * 428 *<p>It does 429 * <ol><li>Execute step N - kill the executor before store update 430 * <li>Restart executor/store 431 * <li>Executes hook for each step twice 432 * <li>Execute step N - and then save to store 433 * </ol> 434 * 435 *<p>This is a good test for finding state that needs persisting and steps that are not 436 * idempotent. Use this version of the test when the order in which flow steps are executed is 437 * not start to finish; where the procedure may vary the flow steps dependent on circumstance 438 * found. 439 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean) 440 */ 441 public static void testRecoveryAndDoubleExecution( 442 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook) 443 throws Exception { 444 ProcedureTestingUtility.waitProcedure(procExec, procId); 445 assertEquals(false, procExec.isRunning()); 446 for (int i = 0; !procExec.isFinished(procId); ++i) { 447 LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId)); 448 if (hook != null) { 449 assertTrue(hook.execute(i)); 450 } 451 restartMasterProcedureExecutor(procExec); 452 ProcedureTestingUtility.waitProcedure(procExec, procId); 453 } 454 assertEquals(true, procExec.isRunning()); 455 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 456 } 457 458 public static void testRecoveryAndDoubleExecution( 459 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception { 460 testRecoveryAndDoubleExecution(procExec, procId, null); 461 } 462 463 /** 464 * Hook which will be executed on each step 465 */ 466 public interface StepHook{ 467 /** 468 * @param step Step no. at which this will be executed 469 * @return false if test should fail otherwise true 470 * @throws IOException 471 */ 472 boolean execute(int step) throws IOException; 473 } 474 475 /** 476 * Execute the procedure up to "lastStep" and then the ProcedureExecutor 477 * is restarted and an abort() is injected. 478 * If the procedure implement abort() this should result in rollback being triggered. 479 * Each rollback step is called twice, by restarting the executor after every step. 480 * At the end of this call the procedure should be finished and rolledback. 481 * This method assert on the procedure being terminated with an AbortException. 482 */ 483 public static void testRollbackAndDoubleExecution( 484 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 485 final int lastStep) throws Exception { 486 testRollbackAndDoubleExecution(procExec, procId, lastStep, false); 487 } 488 489 public static void testRollbackAndDoubleExecution( 490 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 491 final int lastStep, boolean waitForAsyncProcs) throws Exception { 492 // Execute up to last step 493 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 494 495 // Restart the executor and rollback the step twice 496 // rollback step N - kill before store update 497 // restart executor/store 498 // rollback step N - save on store 499 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 500 abortListener.addProcId(procId); 501 procExec.registerListener(abortListener); 502 try { 503 for (int i = 0; !procExec.isFinished(procId); ++i) { 504 LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId)); 505 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 506 restartMasterProcedureExecutor(procExec); 507 ProcedureTestingUtility.waitProcedure(procExec, procId); 508 } 509 } finally { 510 assertTrue(procExec.unregisterListener(abortListener)); 511 } 512 513 if (waitForAsyncProcs) { 514 // Sometimes there are other procedures still executing (including asynchronously spawned by 515 // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before 516 // store update. Let all pending procedures finish normally. 517 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 518 // check 3 times to confirm that the procedure executor has not been killed 519 for (int i = 0; i < 3; i++) { 520 if (!procExec.isRunning()) { 521 LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" + 522 " to KillAndToggleBeforeStoreUpdate flag."); 523 restartMasterProcedureExecutor(procExec); 524 break; 525 } 526 Thread.sleep(1000); 527 } 528 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 529 } 530 531 assertEquals(true, procExec.isRunning()); 532 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 533 } 534 535 /** 536 * Execute the procedure up to "lastStep" and then the ProcedureExecutor 537 * is restarted and an abort() is injected. 538 * If the procedure implement abort() this should result in rollback being triggered. 539 * At the end of this call the procedure should be finished and rolledback. 540 * This method assert on the procedure being terminated with an AbortException. 541 */ 542 public static void testRollbackRetriableFailure( 543 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 544 final int lastStep) throws Exception { 545 // Execute up to last step 546 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 547 548 // execute the rollback 549 testRestartWithAbort(procExec, procId); 550 551 assertEquals(true, procExec.isRunning()); 552 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 553 } 554 555 /** 556 * Restart the ProcedureExecutor and inject an abort to the specified procedure. 557 * If the procedure implement abort() this should result in rollback being triggered. 558 * At the end of this call the procedure should be finished and rolledback, if abort is implemnted 559 */ 560 public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec, 561 long procId) throws Exception { 562 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 563 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 564 abortListener.addProcId(procId); 565 procExec.registerListener(abortListener); 566 try { 567 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 568 LOG.info("Restart and rollback procId=" + procId); 569 restartMasterProcedureExecutor(procExec); 570 ProcedureTestingUtility.waitProcedure(procExec, procId); 571 } finally { 572 assertTrue(procExec.unregisterListener(abortListener)); 573 } 574 } 575 576 public static class InjectAbortOnLoadListener 577 implements ProcedureExecutor.ProcedureExecutorListener { 578 private final ProcedureExecutor<MasterProcedureEnv> procExec; 579 private TreeSet<Long> procsToAbort = null; 580 581 public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) { 582 this.procExec = procExec; 583 } 584 585 public void addProcId(long procId) { 586 if (procsToAbort == null) { 587 procsToAbort = new TreeSet<>(); 588 } 589 procsToAbort.add(procId); 590 } 591 592 @Override 593 public void procedureLoaded(long procId) { 594 if (procsToAbort != null && !procsToAbort.contains(procId)) { 595 return; 596 } 597 procExec.abort(procId); 598 } 599 600 @Override 601 public void procedureAdded(long procId) { /* no-op */ } 602 603 @Override 604 public void procedureFinished(long procId) { /* no-op */ } 605 } 606}