/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.List;
027import java.util.TreeSet;
028import java.util.concurrent.Callable;
029import java.util.concurrent.atomic.AtomicInteger;
030import org.apache.hadoop.fs.FileSystem;
031import org.apache.hadoop.fs.Path;
032import org.apache.hadoop.hbase.HBaseTestingUtility;
033import org.apache.hadoop.hbase.HRegionLocation;
034import org.apache.hadoop.hbase.MetaTableAccessor;
035import org.apache.hadoop.hbase.MiniHBaseCluster;
036import org.apache.hadoop.hbase.RegionLocations;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.BufferedMutator;
040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.Durability;
044import org.apache.hadoop.hbase.client.Put;
045import org.apache.hadoop.hbase.client.RegionInfo;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.TableDescriptor;
048import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
049import org.apache.hadoop.hbase.client.TableState;
050import org.apache.hadoop.hbase.master.HMaster;
051import org.apache.hadoop.hbase.master.RegionState;
052import org.apache.hadoop.hbase.master.TableStateManager;
053import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
054import org.apache.hadoop.hbase.procedure2.Procedure;
055import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
056import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
057import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
058import org.apache.hadoop.hbase.util.Bytes;
059import org.apache.hadoop.hbase.util.FSUtils;
060import org.apache.hadoop.hbase.util.MD5Hash;
061import org.apache.hadoop.hbase.util.ModifyRegionUtils;
062import org.apache.yetus.audience.InterfaceAudience;
063import org.slf4j.Logger;
064import org.slf4j.LoggerFactory;
065
066@InterfaceAudience.Private
067public class MasterProcedureTestingUtility {
068  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);
069
070  private MasterProcedureTestingUtility() { }
071
072  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
073      throws Exception {
074    final MasterProcedureEnv env = procExec.getEnvironment();
075    final HMaster master = (HMaster)env.getMasterServices();
076    ProcedureTestingUtility.restart(procExec, true, true,
077      // stop services
078      new Callable<Void>() {
079        @Override
080        public Void call() throws Exception {
081          final AssignmentManager am = env.getAssignmentManager();
082          // try to simulate a master restart by removing the ServerManager states about seqIDs
083          for (RegionState regionState: am.getRegionStates().getRegionStates()) {
084            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
085          }
086          am.stop();
087          master.setInitialized(false);
088          return null;
089        }
090      },
091      // restart services
092      new Callable<Void>() {
093        @Override
094        public Void call() throws Exception {
095          final AssignmentManager am = env.getAssignmentManager();
096          am.start();
097          am.joinCluster();
098          master.setInitialized(true);
099          return null;
100        }
101      });
102  }
103
104  // ==========================================================================
105  //  Master failover utils
106  // ==========================================================================
107  public static void masterFailover(final HBaseTestingUtility testUtil)
108      throws Exception {
109    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
110
111    // Kill the master
112    HMaster oldMaster = cluster.getMaster();
113    cluster.killMaster(cluster.getMaster().getServerName());
114
115    // Wait the secondary
116    waitBackupMaster(testUtil, oldMaster);
117  }
118
119  public static void waitBackupMaster(final HBaseTestingUtility testUtil,
120      final HMaster oldMaster) throws Exception {
121    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
122
123    HMaster newMaster = cluster.getMaster();
124    while (newMaster == null || newMaster == oldMaster) {
125      Thread.sleep(250);
126      newMaster = cluster.getMaster();
127    }
128
129    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
130      Thread.sleep(250);
131    }
132  }
133
134  // ==========================================================================
135  //  Table Helpers
136  // ==========================================================================
137  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
138    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
139    for (int i = 0; i < family.length; ++i) {
140      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
141    }
142    return builder.build();
143  }
144
145  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
146      final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
147    TableDescriptor htd = createHTD(tableName, family);
148    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
149    long procId = ProcedureTestingUtility.submitAndWait(procExec,
150      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
151    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
152    return regions;
153  }
154
155  public static void validateTableCreation(final HMaster master, final TableName tableName,
156      final RegionInfo[] regions, String... family) throws IOException {
157    validateTableCreation(master, tableName, regions, true, family);
158  }
159
160  public static void validateTableCreation(final HMaster master, final TableName tableName,
161      final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
162    // check filesystem
163    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
164    final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
165    assertTrue(fs.exists(tableDir));
166    FSUtils.logFileSystemState(fs, tableDir, LOG);
167    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
168    for (int i = 0; i < regions.length; ++i) {
169      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
170      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
171      assertTrue(unwantedRegionDirs.remove(regionDir));
172      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
173      for (int j = 0; j < family.length; ++j) {
174        final Path familyDir = new Path(regionDir, family[j]);
175        if (hasFamilyDirs) {
176          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
177          assertTrue(allFamilyDirs.remove(familyDir));
178        } else {
179          // TODO: WARN: Modify Table/Families does not create a family dir
180          if (!fs.exists(familyDir)) {
181            LOG.warn(family[j] + " family dir does not exist");
182          }
183          allFamilyDirs.remove(familyDir);
184        }
185      }
186      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
187    }
188    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
189    LOG.debug("Table directory layout is as expected.");
190
191    // check meta
192    assertTrue(MetaTableAccessor.tableExists(master.getConnection(), tableName));
193    assertEquals(regions.length, countMetaRegions(master, tableName));
194
195    // check htd
196    TableDescriptor htd = master.getTableDescriptors().get(tableName);
197    assertTrue("table descriptor not found", htd != null);
198    for (int i = 0; i < family.length; ++i) {
199      assertTrue("family not found " + family[i], htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
200    }
201    assertEquals(family.length, htd.getColumnFamilyCount());
202  }
203
204  public static void validateTableDeletion(
205      final HMaster master, final TableName tableName) throws IOException {
206    // check filesystem
207    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
208    final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
209    assertFalse(fs.exists(tableDir));
210
211    // check meta
212    assertFalse(MetaTableAccessor.tableExists(master.getConnection(), tableName));
213    assertEquals(0, countMetaRegions(master, tableName));
214
215    // check htd
216    assertTrue("found htd of deleted table",
217      master.getTableDescriptors().get(tableName) == null);
218  }
219
220  private static int countMetaRegions(final HMaster master, final TableName tableName)
221      throws IOException {
222    final AtomicInteger actualRegCount = new AtomicInteger(0);
223    final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
224      @Override
225      public boolean visit(Result rowResult) throws IOException {
226        RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
227        if (list == null) {
228          LOG.warn("No serialized RegionInfo in " + rowResult);
229          return true;
230        }
231        HRegionLocation l = list.getRegionLocation();
232        if (l == null) {
233          return true;
234        }
235        if (!l.getRegionInfo().getTable().equals(tableName)) {
236          return false;
237        }
238        if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
239        HRegionLocation[] locations = list.getRegionLocations();
240        for (HRegionLocation location : locations) {
241          if (location == null) continue;
242          ServerName serverName = location.getServerName();
243          // Make sure that regions are assigned to server
244          if (serverName != null && serverName.getHostAndPort() != null) {
245            actualRegCount.incrementAndGet();
246          }
247        }
248        return true;
249      }
250    };
251    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
252    return actualRegCount.get();
253  }
254
255  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
256      throws IOException {
257    TableStateManager tsm = master.getTableStateManager();
258    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED));
259  }
260
261  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
262      throws IOException {
263    TableStateManager tsm = master.getTableStateManager();
264    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED));
265  }
266
267  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
268      final String family) throws IOException {
269    TableDescriptor htd = master.getTableDescriptors().get(tableName);
270    assertTrue(htd != null);
271
272    assertTrue(htd.hasColumnFamily(family.getBytes()));
273  }
274
275  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
276      final String family) throws IOException {
277    // verify htd
278    TableDescriptor htd = master.getTableDescriptors().get(tableName);
279    assertTrue(htd != null);
280    assertFalse(htd.hasColumnFamily(family.getBytes()));
281
282    // verify fs
283    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
284    final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
285    for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
286      final Path familyDir = new Path(regionDir, family);
287      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
288    }
289  }
290
291  public static void validateColumnFamilyModification(final HMaster master,
292      final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
293      throws IOException {
294    TableDescriptor htd = master.getTableDescriptors().get(tableName);
295    assertTrue(htd != null);
296
297    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes());
298    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
299  }
300
301  public static void loadData(final Connection connection, final TableName tableName,
302      int rows, final byte[][] splitKeys,  final String... sfamilies) throws IOException {
303    byte[][] families = new byte[sfamilies.length][];
304    for (int i = 0; i < families.length; ++i) {
305      families[i] = Bytes.toBytes(sfamilies[i]);
306    }
307
308    BufferedMutator mutator = connection.getBufferedMutator(tableName);
309
310    // Ensure one row per region
311    assertTrue(rows >= splitKeys.length);
312    for (byte[] k: splitKeys) {
313      byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k);
314      byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
315      mutator.mutate(createPut(families, key, value));
316      rows--;
317    }
318
319    // Add other extra rows. more rows, more files
320    while (rows-- > 0) {
321      byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
322      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
323      mutator.mutate(createPut(families, key, value));
324    }
325    mutator.flush();
326  }
327
328  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
329    byte[] q = Bytes.toBytes("q");
330    Put put = new Put(key);
331    put.setDurability(Durability.SKIP_WAL);
332    for (byte[] family: families) {
333      put.addColumn(family, q, value);
334    }
335    return put;
336  }
337
338  // ==========================================================================
339  //  Procedure Helpers
340  // ==========================================================================
341  public static long generateNonceGroup(final HMaster master) {
342    return master.getClusterConnection().getNonceGenerator().getNonceGroup();
343  }
344
345  public static long generateNonce(final HMaster master) {
346    return master.getClusterConnection().getNonceGenerator().newNonce();
347  }
348
349  /**
350   * Run through all procedure flow states TWICE while also restarting procedure executor at each
351   * step; i.e force a reread of procedure store.
352   *
353   *<p>It does
354   * <ol><li>Execute step N - kill the executor before store update
355   * <li>Restart executor/store
356   * <li>Execute step N - and then save to store
357   * </ol>
358   *
359   *<p>This is a good test for finding state that needs persisting and steps that are not
360   * idempotent. Use this version of the test when a procedure executes all flow steps from start to
361   * finish.
362   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
363   */
364  public static void testRecoveryAndDoubleExecution(
365      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
366      final int lastStep, final boolean expectExecRunning) throws Exception {
367    ProcedureTestingUtility.waitProcedure(procExec, procId);
368    assertEquals(false, procExec.isRunning());
369
370    // Restart the executor and execute the step twice
371    //   execute step N - kill before store update
372    //   restart executor/store
373    //   execute step N - save on store
374    // NOTE: currently we make assumption that states/ steps are sequential. There are already
375    // instances of a procedures which skip (don't use) intermediate states/ steps. In future,
376    // intermediate states/ steps can be added with ordinal greater than lastStep. If and when
377    // that happens the states can not be treated as sequential steps and the condition in
378    // following while loop needs to be changed. We can use euqals/ not equals operator to check
379    // if the procedure has reached the user specified state. But there is a possibility that
380    // while loop may not get the control back exaclty when the procedure is in lastStep. Proper
381    // fix would be get all visited states by the procedure and then check if user speccified
382    // state is in that list. Current assumption of sequential proregression of steps/ states is
383    // made at multiple places so we can keep while condition below for simplicity.
384    Procedure<?> proc = procExec.getProcedure(procId);
385    int stepNum = proc instanceof StateMachineProcedure ?
386        ((StateMachineProcedure) proc).getCurrentStateId() : 0;
387    for (;;) {
388      if (stepNum == lastStep) {
389        break;
390      }
391      LOG.info("Restart " + stepNum + " exec state=" + proc);
392      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
393      restartMasterProcedureExecutor(procExec);
394      ProcedureTestingUtility.waitProcedure(procExec, procId);
395      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
396      proc = procExec.getProcedure(procId);
397      stepNum = proc instanceof StateMachineProcedure ?
398          ((StateMachineProcedure) proc).getCurrentStateId() : stepNum + 1;
399    }
400
401    assertEquals(expectExecRunning, procExec.isRunning());
402  }
403
404  /**
405   * Run through all procedure flow states TWICE while also restarting
406   * procedure executor at each step; i.e force a reread of procedure store.
407   *
408   *<p>It does
409   * <ol><li>Execute step N - kill the executor before store update
410   * <li>Restart executor/store
411   * <li>Executes hook for each step twice
412   * <li>Execute step N - and then save to store
413   * </ol>
414   *
415   *<p>This is a good test for finding state that needs persisting and steps that are not
416   * idempotent. Use this version of the test when the order in which flow steps are executed is
417   * not start to finish; where the procedure may vary the flow steps dependent on circumstance
418   * found.
419   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
420   */
421  public static void testRecoveryAndDoubleExecution(
422      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
423      throws Exception {
424    ProcedureTestingUtility.waitProcedure(procExec, procId);
425    assertEquals(false, procExec.isRunning());
426    for (int i = 0; !procExec.isFinished(procId); ++i) {
427      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
428      if (hook != null) {
429        assertTrue(hook.execute(i));
430      }
431      restartMasterProcedureExecutor(procExec);
432      ProcedureTestingUtility.waitProcedure(procExec, procId);
433    }
434    assertEquals(true, procExec.isRunning());
435    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
436  }
437
438  public static void testRecoveryAndDoubleExecution(
439      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
440    testRecoveryAndDoubleExecution(procExec, procId, null);
441  }
442
443  /**
444   * Hook which will be executed on each step
445   */
446  public interface StepHook{
447    /**
448     * @param step Step no. at which this will be executed
449     * @return false if test should fail otherwise true
450     * @throws IOException
451     */
452    boolean execute(int step) throws IOException;
453  }
454
455  /**
456   * Execute the procedure up to "lastStep" and then the ProcedureExecutor
457   * is restarted and an abort() is injected.
458   * If the procedure implement abort() this should result in rollback being triggered.
459   * Each rollback step is called twice, by restarting the executor after every step.
460   * At the end of this call the procedure should be finished and rolledback.
461   * This method assert on the procedure being terminated with an AbortException.
462   */
463  public static void testRollbackAndDoubleExecution(
464      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
465      final int lastStep) throws Exception {
466    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
467  }
468
469  public static void testRollbackAndDoubleExecution(
470      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
471      final int lastStep, boolean waitForAsyncProcs) throws Exception {
472    // Execute up to last step
473    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);
474
475    // Restart the executor and rollback the step twice
476    //   rollback step N - kill before store update
477    //   restart executor/store
478    //   rollback step N - save on store
479    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
480    abortListener.addProcId(procId);
481    procExec.registerListener(abortListener);
482    try {
483      for (int i = 0; !procExec.isFinished(procId); ++i) {
484        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
485        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
486        restartMasterProcedureExecutor(procExec);
487        ProcedureTestingUtility.waitProcedure(procExec, procId);
488      }
489    } finally {
490      assertTrue(procExec.unregisterListener(abortListener));
491    }
492
493    if (waitForAsyncProcs) {
494      // Sometimes there are other procedures still executing (including asynchronously spawned by
495      // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before
496      // store update. Let all pending procedures finish normally.
497      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
498      // check 3 times to confirm that the procedure executor has not been killed
499      for (int i = 0; i < 3; i++) {
500        if (!procExec.isRunning()) {
501          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" +
502            " to KillAndToggleBeforeStoreUpdate flag.");
503          restartMasterProcedureExecutor(procExec);
504          break;
505        }
506        Thread.sleep(1000);
507      }
508      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
509    }
510
511    assertEquals(true, procExec.isRunning());
512    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
513  }
514
515  /**
516   * Execute the procedure up to "lastStep" and then the ProcedureExecutor
517   * is restarted and an abort() is injected.
518   * If the procedure implement abort() this should result in rollback being triggered.
519   * At the end of this call the procedure should be finished and rolledback.
520   * This method assert on the procedure being terminated with an AbortException.
521   */
522  public static void testRollbackRetriableFailure(
523      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
524      final int lastStep) throws Exception {
525    // Execute up to last step
526    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);
527
528    // execute the rollback
529    testRestartWithAbort(procExec, procId);
530
531    assertEquals(true, procExec.isRunning());
532    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
533  }
534
535  /**
536   * Restart the ProcedureExecutor and inject an abort to the specified procedure.
537   * If the procedure implement abort() this should result in rollback being triggered.
538   * At the end of this call the procedure should be finished and rolledback, if abort is implemnted
539   */
540  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
541      long procId) throws Exception {
542    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
543    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
544    abortListener.addProcId(procId);
545    procExec.registerListener(abortListener);
546    try {
547      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
548      LOG.info("Restart and rollback procId=" + procId);
549      restartMasterProcedureExecutor(procExec);
550      ProcedureTestingUtility.waitProcedure(procExec, procId);
551    } finally {
552      assertTrue(procExec.unregisterListener(abortListener));
553    }
554  }
555
556  public static class InjectAbortOnLoadListener
557      implements ProcedureExecutor.ProcedureExecutorListener {
558    private final ProcedureExecutor<MasterProcedureEnv> procExec;
559    private TreeSet<Long> procsToAbort = null;
560
561    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
562      this.procExec = procExec;
563    }
564
565    public void addProcId(long procId) {
566      if (procsToAbort == null) {
567        procsToAbort = new TreeSet<>();
568      }
569      procsToAbort.add(procId);
570    }
571
572    @Override
573    public void procedureLoaded(long procId) {
574      if (procsToAbort != null && !procsToAbort.contains(procId)) {
575        return;
576      }
577      procExec.abort(procId);
578    }
579
580    @Override
581    public void procedureAdded(long procId) { /* no-op */ }
582
583    @Override
584    public void procedureFinished(long procId) { /* no-op */ }
585  }
586}