001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.master.procedure; 020 021import static org.junit.Assert.assertEquals; 022import static org.junit.Assert.assertFalse; 023import static org.junit.Assert.assertTrue; 024 025import java.io.IOException; 026import java.util.List; 027import java.util.TreeSet; 028import java.util.concurrent.Callable; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.fs.FileSystem; 031import org.apache.hadoop.fs.Path; 032import org.apache.hadoop.hbase.HBaseTestingUtility; 033import org.apache.hadoop.hbase.HRegionLocation; 034import org.apache.hadoop.hbase.MetaTableAccessor; 035import org.apache.hadoop.hbase.MiniHBaseCluster; 036import org.apache.hadoop.hbase.RegionLocations; 037import org.apache.hadoop.hbase.ServerName; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.BufferedMutator; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.Durability; 044import org.apache.hadoop.hbase.client.Put; 045import org.apache.hadoop.hbase.client.RegionInfo; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.TableDescriptor; 048import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 049import org.apache.hadoop.hbase.client.TableState; 050import org.apache.hadoop.hbase.master.HMaster; 051import org.apache.hadoop.hbase.master.RegionState; 052import org.apache.hadoop.hbase.master.TableStateManager; 053import org.apache.hadoop.hbase.master.assignment.AssignmentManager; 054import org.apache.hadoop.hbase.procedure2.Procedure; 055import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; 056import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 057import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.FSUtils; 060import org.apache.hadoop.hbase.util.MD5Hash; 061import org.apache.hadoop.hbase.util.ModifyRegionUtils; 062import org.apache.yetus.audience.InterfaceAudience; 063import org.slf4j.Logger; 064import org.slf4j.LoggerFactory; 065 066@InterfaceAudience.Private 067public class MasterProcedureTestingUtility { 068 private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class); 069 070 private MasterProcedureTestingUtility() { } 071 072 public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec) 073 throws Exception { 074 final MasterProcedureEnv env = procExec.getEnvironment(); 075 final HMaster master = (HMaster)env.getMasterServices(); 076 ProcedureTestingUtility.restart(procExec, true, true, 077 // stop services 078 new Callable<Void>() { 079 @Override 080 public Void call() throws Exception { 081 final AssignmentManager am = env.getAssignmentManager(); 082 // try to simulate a master restart by removing the ServerManager states about seqIDs 083 for (RegionState regionState: am.getRegionStates().getRegionStates()) { 084 env.getMasterServices().getServerManager().removeRegion(regionState.getRegion()); 085 } 086 am.stop(); 087 master.setInitialized(false); 088 return null; 089 } 090 }, 091 // restart services 092 new Callable<Void>() { 093 @Override 094 public Void call() throws Exception { 095 final AssignmentManager am = env.getAssignmentManager(); 096 am.start(); 097 am.joinCluster(); 098 master.setInitialized(true); 099 return null; 100 } 101 }); 102 } 103 104 // ========================================================================== 105 // Master failover utils 106 // ========================================================================== 107 public static void masterFailover(final HBaseTestingUtility testUtil) 108 throws Exception { 109 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 110 111 // Kill the master 112 HMaster oldMaster = cluster.getMaster(); 113 cluster.killMaster(cluster.getMaster().getServerName()); 114 115 // Wait the secondary 116 waitBackupMaster(testUtil, oldMaster); 117 } 118 119 public static void waitBackupMaster(final HBaseTestingUtility testUtil, 120 final HMaster oldMaster) throws Exception { 121 MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster(); 122 123 HMaster newMaster = cluster.getMaster(); 124 while (newMaster == null || newMaster == oldMaster) { 125 Thread.sleep(250); 126 newMaster = cluster.getMaster(); 127 } 128 129 while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) { 130 Thread.sleep(250); 131 } 132 } 133 134 // ========================================================================== 135 // Table Helpers 136 // ========================================================================== 137 public static TableDescriptor createHTD(final TableName tableName, final String... family) { 138 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); 139 for (int i = 0; i < family.length; ++i) { 140 builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i])); 141 } 142 return builder.build(); 143 } 144 145 public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec, 146 final TableName tableName, final byte[][] splitKeys, String... family) throws IOException { 147 TableDescriptor htd = createHTD(tableName, family); 148 RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys); 149 long procId = ProcedureTestingUtility.submitAndWait(procExec, 150 new CreateTableProcedure(procExec.getEnvironment(), htd, regions)); 151 ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId)); 152 return regions; 153 } 154 155 public static void validateTableCreation(final HMaster master, final TableName tableName, 156 final RegionInfo[] regions, String... family) throws IOException { 157 validateTableCreation(master, tableName, regions, true, family); 158 } 159 160 public static void validateTableCreation(final HMaster master, final TableName tableName, 161 final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException { 162 // check filesystem 163 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 164 final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 165 assertTrue(fs.exists(tableDir)); 166 FSUtils.logFileSystemState(fs, tableDir, LOG); 167 List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir); 168 for (int i = 0; i < regions.length; ++i) { 169 Path regionDir = new Path(tableDir, regions[i].getEncodedName()); 170 assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir)); 171 assertTrue(unwantedRegionDirs.remove(regionDir)); 172 List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir); 173 for (int j = 0; j < family.length; ++j) { 174 final Path familyDir = new Path(regionDir, family[j]); 175 if (hasFamilyDirs) { 176 assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir)); 177 assertTrue(allFamilyDirs.remove(familyDir)); 178 } else { 179 // TODO: WARN: Modify Table/Families does not create a family dir 180 if (!fs.exists(familyDir)) { 181 LOG.warn(family[j] + " family dir does not exist"); 182 } 183 allFamilyDirs.remove(familyDir); 184 } 185 } 186 assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty()); 187 } 188 assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty()); 189 LOG.debug("Table directory layout is as expected."); 190 191 // check meta 192 assertTrue(MetaTableAccessor.tableExists(master.getConnection(), tableName)); 193 assertEquals(regions.length, countMetaRegions(master, tableName)); 194 195 // check htd 196 TableDescriptor htd = master.getTableDescriptors().get(tableName); 197 assertTrue("table descriptor not found", htd != null); 198 for (int i = 0; i < family.length; ++i) { 199 assertTrue("family not found " + family[i], htd.getColumnFamily(Bytes.toBytes(family[i])) != null); 200 } 201 assertEquals(family.length, htd.getColumnFamilyCount()); 202 } 203 204 public static void validateTableDeletion( 205 final HMaster master, final TableName tableName) throws IOException { 206 // check filesystem 207 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 208 final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 209 assertFalse(fs.exists(tableDir)); 210 211 // check meta 212 assertFalse(MetaTableAccessor.tableExists(master.getConnection(), tableName)); 213 assertEquals(0, countMetaRegions(master, tableName)); 214 215 // check htd 216 assertTrue("found htd of deleted table", 217 master.getTableDescriptors().get(tableName) == null); 218 } 219 220 private static int countMetaRegions(final HMaster master, final TableName tableName) 221 throws IOException { 222 final AtomicInteger actualRegCount = new AtomicInteger(0); 223 final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { 224 @Override 225 public boolean visit(Result rowResult) throws IOException { 226 RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); 227 if (list == null) { 228 LOG.warn("No serialized RegionInfo in " + rowResult); 229 return true; 230 } 231 HRegionLocation l = list.getRegionLocation(); 232 if (l == null) { 233 return true; 234 } 235 if (!l.getRegionInfo().getTable().equals(tableName)) { 236 return false; 237 } 238 if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; 239 HRegionLocation[] locations = list.getRegionLocations(); 240 for (HRegionLocation location : locations) { 241 if (location == null) continue; 242 ServerName serverName = location.getServerName(); 243 // Make sure that regions are assigned to server 244 if (serverName != null && serverName.getHostAndPort() != null) { 245 actualRegCount.incrementAndGet(); 246 } 247 } 248 return true; 249 } 250 }; 251 MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName); 252 return actualRegCount.get(); 253 } 254 255 public static void validateTableIsEnabled(final HMaster master, final TableName tableName) 256 throws IOException { 257 TableStateManager tsm = master.getTableStateManager(); 258 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED)); 259 } 260 261 public static void validateTableIsDisabled(final HMaster master, final TableName tableName) 262 throws IOException { 263 TableStateManager tsm = master.getTableStateManager(); 264 assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED)); 265 } 266 267 public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName, 268 final String family) throws IOException { 269 TableDescriptor htd = master.getTableDescriptors().get(tableName); 270 assertTrue(htd != null); 271 272 assertTrue(htd.hasColumnFamily(family.getBytes())); 273 } 274 275 public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName, 276 final String family) throws IOException { 277 // verify htd 278 TableDescriptor htd = master.getTableDescriptors().get(tableName); 279 assertTrue(htd != null); 280 assertFalse(htd.hasColumnFamily(family.getBytes())); 281 282 // verify fs 283 final FileSystem fs = master.getMasterFileSystem().getFileSystem(); 284 final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName); 285 for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) { 286 final Path familyDir = new Path(regionDir, family); 287 assertFalse(family + " family dir should not exist", fs.exists(familyDir)); 288 } 289 } 290 291 public static void validateColumnFamilyModification(final HMaster master, 292 final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor) 293 throws IOException { 294 TableDescriptor htd = master.getTableDescriptors().get(tableName); 295 assertTrue(htd != null); 296 297 ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes()); 298 assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor)); 299 } 300 301 public static void loadData(final Connection connection, final TableName tableName, 302 int rows, final byte[][] splitKeys, final String... sfamilies) throws IOException { 303 byte[][] families = new byte[sfamilies.length][]; 304 for (int i = 0; i < families.length; ++i) { 305 families[i] = Bytes.toBytes(sfamilies[i]); 306 } 307 308 BufferedMutator mutator = connection.getBufferedMutator(tableName); 309 310 // Ensure one row per region 311 assertTrue(rows >= splitKeys.length); 312 for (byte[] k: splitKeys) { 313 byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k); 314 byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value))); 315 mutator.mutate(createPut(families, key, value)); 316 rows--; 317 } 318 319 // Add other extra rows. more rows, more files 320 while (rows-- > 0) { 321 byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows)); 322 byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value)); 323 mutator.mutate(createPut(families, key, value)); 324 } 325 mutator.flush(); 326 } 327 328 private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) { 329 byte[] q = Bytes.toBytes("q"); 330 Put put = new Put(key); 331 put.setDurability(Durability.SKIP_WAL); 332 for (byte[] family: families) { 333 put.addColumn(family, q, value); 334 } 335 return put; 336 } 337 338 // ========================================================================== 339 // Procedure Helpers 340 // ========================================================================== 341 public static long generateNonceGroup(final HMaster master) { 342 return master.getClusterConnection().getNonceGenerator().getNonceGroup(); 343 } 344 345 public static long generateNonce(final HMaster master) { 346 return master.getClusterConnection().getNonceGenerator().newNonce(); 347 } 348 349 /** 350 * Run through all procedure flow states TWICE while also restarting procedure executor at each 351 * step; i.e force a reread of procedure store. 352 * 353 *<p>It does 354 * <ol><li>Execute step N - kill the executor before store update 355 * <li>Restart executor/store 356 * <li>Execute step N - and then save to store 357 * </ol> 358 * 359 *<p>This is a good test for finding state that needs persisting and steps that are not 360 * idempotent. Use this version of the test when a procedure executes all flow steps from start to 361 * finish. 362 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long) 363 */ 364 public static void testRecoveryAndDoubleExecution( 365 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 366 final int lastStep, final boolean expectExecRunning) throws Exception { 367 ProcedureTestingUtility.waitProcedure(procExec, procId); 368 assertEquals(false, procExec.isRunning()); 369 370 // Restart the executor and execute the step twice 371 // execute step N - kill before store update 372 // restart executor/store 373 // execute step N - save on store 374 // NOTE: currently we make assumption that states/ steps are sequential. There are already 375 // instances of a procedures which skip (don't use) intermediate states/ steps. In future, 376 // intermediate states/ steps can be added with ordinal greater than lastStep. If and when 377 // that happens the states can not be treated as sequential steps and the condition in 378 // following while loop needs to be changed. We can use euqals/ not equals operator to check 379 // if the procedure has reached the user specified state. But there is a possibility that 380 // while loop may not get the control back exaclty when the procedure is in lastStep. Proper 381 // fix would be get all visited states by the procedure and then check if user speccified 382 // state is in that list. Current assumption of sequential proregression of steps/ states is 383 // made at multiple places so we can keep while condition below for simplicity. 384 Procedure<?> proc = procExec.getProcedure(procId); 385 int stepNum = proc instanceof StateMachineProcedure ? 386 ((StateMachineProcedure) proc).getCurrentStateId() : 0; 387 for (;;) { 388 if (stepNum == lastStep) { 389 break; 390 } 391 LOG.info("Restart " + stepNum + " exec state=" + proc); 392 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 393 restartMasterProcedureExecutor(procExec); 394 ProcedureTestingUtility.waitProcedure(procExec, procId); 395 // Old proc object is stale, need to get the new one after ProcedureExecutor restart 396 proc = procExec.getProcedure(procId); 397 stepNum = proc instanceof StateMachineProcedure ? 398 ((StateMachineProcedure) proc).getCurrentStateId() : stepNum + 1; 399 } 400 401 assertEquals(expectExecRunning, procExec.isRunning()); 402 } 403 404 /** 405 * Run through all procedure flow states TWICE while also restarting 406 * procedure executor at each step; i.e force a reread of procedure store. 407 * 408 *<p>It does 409 * <ol><li>Execute step N - kill the executor before store update 410 * <li>Restart executor/store 411 * <li>Executes hook for each step twice 412 * <li>Execute step N - and then save to store 413 * </ol> 414 * 415 *<p>This is a good test for finding state that needs persisting and steps that are not 416 * idempotent. Use this version of the test when the order in which flow steps are executed is 417 * not start to finish; where the procedure may vary the flow steps dependent on circumstance 418 * found. 419 * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean) 420 */ 421 public static void testRecoveryAndDoubleExecution( 422 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook) 423 throws Exception { 424 ProcedureTestingUtility.waitProcedure(procExec, procId); 425 assertEquals(false, procExec.isRunning()); 426 for (int i = 0; !procExec.isFinished(procId); ++i) { 427 LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId)); 428 if (hook != null) { 429 assertTrue(hook.execute(i)); 430 } 431 restartMasterProcedureExecutor(procExec); 432 ProcedureTestingUtility.waitProcedure(procExec, procId); 433 } 434 assertEquals(true, procExec.isRunning()); 435 ProcedureTestingUtility.assertProcNotFailed(procExec, procId); 436 } 437 438 public static void testRecoveryAndDoubleExecution( 439 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception { 440 testRecoveryAndDoubleExecution(procExec, procId, null); 441 } 442 443 /** 444 * Hook which will be executed on each step 445 */ 446 public interface StepHook{ 447 /** 448 * @param step Step no. at which this will be executed 449 * @return false if test should fail otherwise true 450 * @throws IOException 451 */ 452 boolean execute(int step) throws IOException; 453 } 454 455 /** 456 * Execute the procedure up to "lastStep" and then the ProcedureExecutor 457 * is restarted and an abort() is injected. 458 * If the procedure implement abort() this should result in rollback being triggered. 459 * Each rollback step is called twice, by restarting the executor after every step. 460 * At the end of this call the procedure should be finished and rolledback. 461 * This method assert on the procedure being terminated with an AbortException. 462 */ 463 public static void testRollbackAndDoubleExecution( 464 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 465 final int lastStep) throws Exception { 466 testRollbackAndDoubleExecution(procExec, procId, lastStep, false); 467 } 468 469 public static void testRollbackAndDoubleExecution( 470 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 471 final int lastStep, boolean waitForAsyncProcs) throws Exception { 472 // Execute up to last step 473 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 474 475 // Restart the executor and rollback the step twice 476 // rollback step N - kill before store update 477 // restart executor/store 478 // rollback step N - save on store 479 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 480 abortListener.addProcId(procId); 481 procExec.registerListener(abortListener); 482 try { 483 for (int i = 0; !procExec.isFinished(procId); ++i) { 484 LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId)); 485 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 486 restartMasterProcedureExecutor(procExec); 487 ProcedureTestingUtility.waitProcedure(procExec, procId); 488 } 489 } finally { 490 assertTrue(procExec.unregisterListener(abortListener)); 491 } 492 493 if (waitForAsyncProcs) { 494 // Sometimes there are other procedures still executing (including asynchronously spawned by 495 // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before 496 // store update. Let all pending procedures finish normally. 497 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 498 // check 3 times to confirm that the procedure executor has not been killed 499 for (int i = 0; i < 3; i++) { 500 if (!procExec.isRunning()) { 501 LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" + 502 " to KillAndToggleBeforeStoreUpdate flag."); 503 restartMasterProcedureExecutor(procExec); 504 break; 505 } 506 Thread.sleep(1000); 507 } 508 ProcedureTestingUtility.waitNoProcedureRunning(procExec); 509 } 510 511 assertEquals(true, procExec.isRunning()); 512 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 513 } 514 515 /** 516 * Execute the procedure up to "lastStep" and then the ProcedureExecutor 517 * is restarted and an abort() is injected. 518 * If the procedure implement abort() this should result in rollback being triggered. 519 * At the end of this call the procedure should be finished and rolledback. 520 * This method assert on the procedure being terminated with an AbortException. 521 */ 522 public static void testRollbackRetriableFailure( 523 final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, 524 final int lastStep) throws Exception { 525 // Execute up to last step 526 testRecoveryAndDoubleExecution(procExec, procId, lastStep, false); 527 528 // execute the rollback 529 testRestartWithAbort(procExec, procId); 530 531 assertEquals(true, procExec.isRunning()); 532 ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId)); 533 } 534 535 /** 536 * Restart the ProcedureExecutor and inject an abort to the specified procedure. 537 * If the procedure implement abort() this should result in rollback being triggered. 538 * At the end of this call the procedure should be finished and rolledback, if abort is implemnted 539 */ 540 public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec, 541 long procId) throws Exception { 542 ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false); 543 InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); 544 abortListener.addProcId(procId); 545 procExec.registerListener(abortListener); 546 try { 547 ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId); 548 LOG.info("Restart and rollback procId=" + procId); 549 restartMasterProcedureExecutor(procExec); 550 ProcedureTestingUtility.waitProcedure(procExec, procId); 551 } finally { 552 assertTrue(procExec.unregisterListener(abortListener)); 553 } 554 } 555 556 public static class InjectAbortOnLoadListener 557 implements ProcedureExecutor.ProcedureExecutorListener { 558 private final ProcedureExecutor<MasterProcedureEnv> procExec; 559 private TreeSet<Long> procsToAbort = null; 560 561 public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) { 562 this.procExec = procExec; 563 } 564 565 public void addProcId(long procId) { 566 if (procsToAbort == null) { 567 procsToAbort = new TreeSet<>(); 568 } 569 procsToAbort.add(procId); 570 } 571 572 @Override 573 public void procedureLoaded(long procId) { 574 if (procsToAbort != null && !procsToAbort.contains(procId)) { 575 return; 576 } 577 procExec.abort(procId); 578 } 579 580 @Override 581 public void procedureAdded(long procId) { /* no-op */ } 582 583 @Override 584 public void procedureFinished(long procId) { /* no-op */ } 585 } 586}