/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.procedure;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class MasterProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);

  private MasterProcedureTestingUtility() { }

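  /**
   * Restarts the master's {@link ProcedureExecutor} in place to simulate a master restart without
   * an actual failover: the {@link AssignmentManager} is stopped and per-region state is cleared
   * from the ServerManager, procedures are reloaded from the procedure store,
   * regions-in-transition are re-seeded from any unfinished {@link TransitRegionStateProcedure}s,
   * and the AssignmentManager then rejoins the cluster. A sketch of typical usage ({@code procId}
   * refers to a previously submitted procedure):
   * <pre>
   * MasterProcedureTestingUtility.restartMasterProcedureExecutor(procExec);
   * ProcedureTestingUtility.waitProcedure(procExec, procId);
   * </pre>
   */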
  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
      throws Exception {
    final MasterProcedureEnv env = procExec.getEnvironment();
    final HMaster master = (HMaster) env.getMasterServices();
    ProcedureTestingUtility.restart(procExec, true, true,
      // stop services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          // simulate a master restart by removing the ServerManager's per-region state (seqIds)
          for (RegionState regionState: am.getRegionStates().getRegionStates()) {
            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
          }
          am.stop();
          master.setInitialized(false);
          return null;
        }
      },
      // setup RIT before starting workers
      new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          am.start();
          // follow the same approach as HMaster.finishActiveMasterInitialization; see the
          // comments there
          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
            .filter(p -> p instanceof TransitRegionStateProcedure)
            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
          return null;
        }
      },
      // restart services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          try {
            am.joinCluster();
            master.setInitialized(true);
          } catch (Exception e) {
            LOG.warn("Failed to load meta", e);
          }
          return null;
        }
      });
  }

  // ==========================================================================
  //  Master failover utils
  // ==========================================================================
  public static void masterFailover(final HBaseTestingUtility testUtil)
      throws Exception {
    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    // Kill the master
    HMaster oldMaster = cluster.getMaster();
    cluster.killMaster(cluster.getMaster().getServerName());

    // Wait for a backup master to become active
    waitBackupMaster(testUtil, oldMaster);
  }

  public static void waitBackupMaster(final HBaseTestingUtility testUtil,
      final HMaster oldMaster) throws Exception {
    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    HMaster newMaster = cluster.getMaster();
    while (newMaster == null || newMaster == oldMaster) {
      Thread.sleep(250);
      newMaster = cluster.getMaster();
    }

    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
      Thread.sleep(250);
    }
  }

  // ==========================================================================
  //  Table Helpers
  // ==========================================================================
  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (int i = 0; i < family.length; ++i) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
    }
    return builder.build();
  }

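  /**
   * Creates the table by submitting a {@link CreateTableProcedure} and waiting for it to complete
   * successfully. A sketch of typical usage (two split keys yield three regions):
   * <pre>
   * byte[][] splitKeys = { Bytes.toBytes("b"), Bytes.toBytes("m") };
   * RegionInfo[] regions =
   *   MasterProcedureTestingUtility.createTable(procExec, tableName, splitKeys, "f1");
   * </pre>
   */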
  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
      final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
    TableDescriptor htd = createHTD(tableName, family);
    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
    long procId = ProcedureTestingUtility.submitAndWait(procExec,
      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
    return regions;
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
      final RegionInfo[] regions, String... family) throws IOException {
    validateTableCreation(master, tableName, regions, true, family);
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
      final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertTrue(fs.exists(tableDir));
    FSUtils.logFileSystemState(fs, tableDir, LOG);
    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
    for (int i = 0; i < regions.length; ++i) {
      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
      assertTrue(unwantedRegionDirs.remove(regionDir));
      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
      for (int j = 0; j < family.length; ++j) {
        final Path familyDir = new Path(regionDir, family[j]);
        if (hasFamilyDirs) {
          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
          assertTrue(allFamilyDirs.remove(familyDir));
        } else {
          // TODO: WARN: Modify Table/Families does not create a family dir
          if (!fs.exists(familyDir)) {
            LOG.warn(family[j] + " family dir does not exist");
          }
          allFamilyDirs.remove(familyDir);
        }
      }
      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
    }
    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
    LOG.debug("Table directory layout is as expected.");

    // check meta
    assertTrue(MetaTableAccessor.tableExists(master.getConnection(), tableName));
    assertEquals(regions.length, countMetaRegions(master, tableName));

    // check htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue("table descriptor not found", htd != null);
    for (int i = 0; i < family.length; ++i) {
      assertTrue("family not found " + family[i],
        htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
    }
    assertEquals(family.length, htd.getColumnFamilyCount());
  }

  public static void validateTableDeletion(
      final HMaster master, final TableName tableName) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertFalse(fs.exists(tableDir));

    // check meta
    assertFalse(MetaTableAccessor.tableExists(master.getConnection(), tableName));
    assertEquals(0, countMetaRegions(master, tableName));

    // check htd
    assertTrue("found htd of deleted table",
      master.getTableDescriptors().get(tableName) == null);
  }

  private static int countMetaRegions(final HMaster master, final TableName tableName)
      throws IOException {
    final AtomicInteger actualRegCount = new AtomicInteger(0);
    final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result rowResult) throws IOException {
        RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
        if (list == null) {
          LOG.warn("No serialized RegionInfo in " + rowResult);
          return true;
        }
        HRegionLocation l = list.getRegionLocation();
        if (l == null) {
          return true;
        }
        if (!l.getRegion().getTable().equals(tableName)) {
          return false;
        }
        if (l.getRegion().isOffline() || l.getRegion().isSplit()) {
          return true;
        }

        HRegionLocation[] locations = list.getRegionLocations();
        for (HRegionLocation location : locations) {
          if (location == null) continue;
          ServerName serverName = location.getServerName();
          // Make sure the region is assigned to a server
          if (serverName != null && serverName.getHostAndPort() != null) {
            actualRegCount.incrementAndGet();
          }
        }
        return true;
      }
    };
    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
    return actualRegCount.get();
  }

  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
      throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED));
  }

  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
      throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED));
  }

  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
      final String family) throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    assertTrue(htd.hasColumnFamily(Bytes.toBytes(family)));
  }

  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
      final String family) throws IOException {
    // verify htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);
    assertFalse(htd.hasColumnFamily(Bytes.toBytes(family)));

    // verify fs
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir = FSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    for (Path regionDir: FSUtils.getRegionDirs(fs, tableDir)) {
      final Path familyDir = new Path(regionDir, family);
      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
    }
  }

  public static void validateColumnFamilyModification(final HMaster master,
      final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
      throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(Bytes.toBytes(family));
    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
  }

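  /**
   * Loads {@code rows} rows into the given table: first one row per split key, so every region
   * gets at least one row, then filler rows with MD5-derived keys. Writes go through a
   * {@link BufferedMutator} with {@link Durability#SKIP_WAL}. A sketch of typical usage (assumes
   * the table already exists with the given split keys and families):
   * <pre>
   * byte[][] splitKeys = { Bytes.toBytes("b"), Bytes.toBytes("m") };
   * MasterProcedureTestingUtility.loadData(connection, tableName, 100, splitKeys, "f1", "f2");
   * </pre>
   */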
  public static void loadData(final Connection connection, final TableName tableName,
      int rows, final byte[][] splitKeys, final String... sfamilies) throws IOException {
    byte[][] families = new byte[sfamilies.length][];
    for (int i = 0; i < families.length; ++i) {
      families[i] = Bytes.toBytes(sfamilies[i]);
    }

    try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
      // Ensure one row per region
      assertTrue(rows >= splitKeys.length);
      for (byte[] k: splitKeys) {
        byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k);
        byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
        mutator.mutate(createPut(families, key, value));
        rows--;
      }

      // Add the remaining filler rows; more rows mean more store files
      while (rows-- > 0) {
        byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
        byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
        mutator.mutate(createPut(families, key, value));
      }
      mutator.flush();
    }
  }

  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(key);
    put.setDurability(Durability.SKIP_WAL);
    for (byte[] family: families) {
      put.addColumn(family, q, value);
    }
    return put;
  }

  // ==========================================================================
  //  Procedure Helpers
  // ==========================================================================
  public static long generateNonceGroup(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().getNonceGroup();
  }

  public static long generateNonce(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().newNonce();
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step; i.e. force a re-read of the procedure store.
   *
   * <p>It does
   * <ol><li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   *
   * <p>This is a good test for finding state that needs persisting and steps that are not
   * idempotent. Use this version of the test when a procedure executes all flow steps from start
   * to finish.
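   *
   * <p>A sketch of typical usage ({@code MyStateMachineProcedure} and the step number are
   * illustrative):
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(new MyStateMachineProcedure(env, args));
   * // drive the procedure, with a restart at every step, up to (illustrative) step 3
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, 3, true);
   * </pre>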
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
   */
  public static void testRecoveryAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep, final boolean expectExecRunning) throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());

    // Restart the executor and execute the step twice
    //   execute step N - kill before store update
    //   restart executor/store
    //   execute step N - save on store
    // NOTE: currently we assume that states/steps are sequential. There are already instances of
    // procedures which skip (don't use) intermediate states/steps. In the future, intermediate
    // states/steps could be added with an ordinal greater than lastStep. If and when that happens
    // the states can no longer be treated as sequential steps and the condition in the following
    // loop needs to change. We could use an equals/not-equals check to see whether the procedure
    // has reached the user-specified state, but then the loop may not get control back exactly
    // when the procedure is in lastStep. A proper fix would be to collect all states visited by
    // the procedure and check whether the user-specified state is in that list. The assumption of
    // sequential progression of steps/states is made in multiple places, so we keep the simpler
    // condition below.
    Procedure<?> proc = procExec.getProcedure(procId);
    int stepNum = proc instanceof StateMachineProcedure ?
        ((StateMachineProcedure) proc).getCurrentStateId() : 0;
    for (;;) {
      if (stepNum == lastStep) {
        break;
      }
      LOG.info("Restart " + stepNum + " exec state=" + proc);
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
      proc = procExec.getProcedure(procId);
      stepNum = proc instanceof StateMachineProcedure ?
          ((StateMachineProcedure) proc).getCurrentStateId() : stepNum + 1;
    }

    assertEquals(expectExecRunning, procExec.isRunning());
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the
   * procedure executor at each step; i.e. force a re-read of the procedure store.
   *
   * <p>It does
   * <ol><li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute the hook for each step
   * <li>Execute step N - and then save to store
   * </ol>
   *
   * <p>This is a good test for finding state that needs persisting and steps that are not
   * idempotent. Use this version of the test when the flow steps are not executed strictly from
   * start to finish; i.e. when the procedure may vary its flow steps depending on the
   * circumstances it finds.
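   *
   * <p>A sketch of typical usage ({@code MyStateMachineProcedure} is illustrative; the
   * {@link StepHook} runs before each restart and can fail the test by returning false):
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(new MyStateMachineProcedure(env, args));
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId,
   *   step -> {
   *     LOG.info("about to restart, iteration=" + step);
   *     return true;
   *   });
   * </pre>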
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
   */
  public static void testRecoveryAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
      throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
      if (hook != null) {
        assertTrue(hook.execute(i));
      }
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    }
    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  public static void testRecoveryAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, null);
  }

  /**
   * Hook which will be executed on each step.
   */
  public interface StepHook {
    /**
     * @param step step number at which this hook is executed
     * @return false if the test should fail, otherwise true
     * @throws IOException if the hook fails
     */
    boolean execute(int step) throws IOException;
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort().
   * If the procedure implements abort() this should result in rollback being triggered.
   * Each rollback step is executed twice, by restarting the executor after every step.
   * At the end of this call the procedure should be finished and rolled back.
   * This method asserts that the procedure terminated with an AbortException.
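   *
   * <p>A sketch of typical usage ({@code MyStateMachineProcedure} is illustrative; the step
   * number is illustrative and should be one at which the procedure can still roll back):
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(new MyStateMachineProcedure(env, args));
   * MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, 3);
   * </pre>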
   */
  public static void testRollbackAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep) throws Exception {
    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
  }

  public static void testRollbackAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep, boolean waitForAsyncProcs) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // Restart the executor and rollback the step twice
    //   rollback step N - kill before store update
    //   restart executor/store
    //   rollback step N - save on store
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      for (int i = 0; !procExec.isFinished(procId); ++i) {
        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
        restartMasterProcedureExecutor(procExec);
        ProcedureTestingUtility.waitProcedure(procExec, procId);
      }
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }

    if (waitForAsyncProcs) {
      // Sometimes there are other procedures still executing (including ones asynchronously
      // spawned by procId) and, due to the KillAndToggleBeforeStoreUpdate flag, the
      // ProcedureExecutor is stopped before the store update. Let all pending procedures finish
      // normally.
      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
      // check 3 times to confirm that the procedure executor has not been killed
      for (int i = 0; i < 3; i++) {
        if (!procExec.isRunning()) {
          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" +
            " to KillAndToggleBeforeStoreUpdate flag.");
          restartMasterProcedureExecutor(procExec);
          break;
        }
        Thread.sleep(1000);
      }
      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
    }

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort().
   * If the procedure implements abort() this should result in rollback being triggered.
   * At the end of this call the procedure should be finished and rolled back.
   * This method asserts that the procedure terminated with an AbortException.
   */
  public static void testRollbackRetriableFailure(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // execute the rollback
    testRestartWithAbort(procExec, procId);

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Restart the ProcedureExecutor and inject an abort to the specified procedure.
   * If the procedure implements abort() this should result in rollback being triggered.
   * At the end of this call the procedure should be finished and rolled back, if abort is
   * implemented.
   */
  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
      long procId) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      LOG.info("Restart and rollback procId=" + procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }
  }

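  /**
   * {@link ProcedureExecutor.ProcedureExecutorListener} that calls {@code abort()} on procedures
   * as they are reloaded from the procedure store after a restart. If no procedure ids have been
   * registered via {@link #addProcId(long)} every loaded procedure is aborted, otherwise only the
   * registered ones are. See
   * {@link MasterProcedureTestingUtility#testRestartWithAbort(ProcedureExecutor, long)} for the
   * typical register/restart/unregister pattern.
   */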
  public static class InjectAbortOnLoadListener
      implements ProcedureExecutor.ProcedureExecutorListener {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private TreeSet<Long> procsToAbort = null;

    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
      this.procExec = procExec;
    }

    public void addProcId(long procId) {
      if (procsToAbort == null) {
        procsToAbort = new TreeSet<>();
      }
      procsToAbort.add(procId);
    }

    @Override
    public void procedureLoaded(long procId) {
      if (procsToAbort != null && !procsToAbort.contains(procId)) {
        return;
      }
      procExec.abort(procId);
    }

    @Override
    public void procedureAdded(long procId) { /* no-op */ }

    @Override
    public void procedureFinished(long procId) { /* no-op */ }
  }
}