001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.master.procedure;
020
021import static org.junit.Assert.assertEquals;
022import static org.junit.Assert.assertFalse;
023import static org.junit.Assert.assertTrue;
024
025import java.io.IOException;
026import java.util.List;
027import java.util.TreeSet;
028import java.util.concurrent.Callable;
029import java.util.concurrent.atomic.AtomicInteger;
030import java.util.stream.Collectors;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HRegionLocation;
035import org.apache.hadoop.hbase.MetaTableAccessor;
036import org.apache.hadoop.hbase.MiniHBaseCluster;
037import org.apache.hadoop.hbase.RegionLocations;
038import org.apache.hadoop.hbase.ServerName;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.client.BufferedMutator;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.Durability;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.RegionInfo;
047import org.apache.hadoop.hbase.client.Result;
048import org.apache.hadoop.hbase.client.TableDescriptor;
049import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
050import org.apache.hadoop.hbase.client.TableState;
051import org.apache.hadoop.hbase.master.HMaster;
052import org.apache.hadoop.hbase.master.RegionState;
053import org.apache.hadoop.hbase.master.TableStateManager;
054import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
055import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
056import org.apache.hadoop.hbase.procedure2.Procedure;
057import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
058import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
059import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
060import org.apache.hadoop.hbase.util.Bytes;
061import org.apache.hadoop.hbase.util.CommonFSUtils;
062import org.apache.hadoop.hbase.util.FSUtils;
063import org.apache.hadoop.hbase.util.MD5Hash;
064import org.apache.hadoop.hbase.util.ModifyRegionUtils;
065import org.apache.yetus.audience.InterfaceAudience;
066import org.slf4j.Logger;
067import org.slf4j.LoggerFactory;
068
069@InterfaceAudience.Private
070public class MasterProcedureTestingUtility {
071  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);
072
073  private MasterProcedureTestingUtility() { }
074
075  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
076      throws Exception {
077    final MasterProcedureEnv env = procExec.getEnvironment();
078    final HMaster master = (HMaster) env.getMasterServices();
079    ProcedureTestingUtility.restart(procExec, true, true,
080      // stop services
081      new Callable<Void>() {
082        @Override
083        public Void call() throws Exception {
084          AssignmentManager am = env.getAssignmentManager();
085          // try to simulate a master restart by removing the ServerManager states about seqIDs
086          for (RegionState regionState: am.getRegionStates().getRegionStates()) {
087            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
088          }
089          am.stop();
090          master.setInitialized(false);
091          return null;
092        }
093      },
094      // setup RIT before starting workers
095      new Callable<Void>() {
096
097        @Override
098        public Void call() throws Exception {
099          AssignmentManager am = env.getAssignmentManager();
100          am.start();
101          // just follow the same way with HMaster.finishActiveMasterInitialization. See the
102          // comments there
103          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
104            .filter(p -> p instanceof TransitRegionStateProcedure)
105            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
106          return null;
107        }
108      },
109      // restart services
110      new Callable<Void>() {
111        @Override
112        public Void call() throws Exception {
113          AssignmentManager am = env.getAssignmentManager();
114          try {
115            am.joinCluster();
116            master.setInitialized(true);
117          } catch (Exception e) {
118            LOG.warn("Failed to load meta", e);
119          }
120          return null;
121        }
122      });
123  }
124
125  // ==========================================================================
126  //  Master failover utils
127  // ==========================================================================
128  public static void masterFailover(final HBaseTestingUtility testUtil)
129      throws Exception {
130    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
131
132    // Kill the master
133    HMaster oldMaster = cluster.getMaster();
134    cluster.killMaster(cluster.getMaster().getServerName());
135
136    // Wait the secondary
137    waitBackupMaster(testUtil, oldMaster);
138  }
139
140  public static void waitBackupMaster(final HBaseTestingUtility testUtil,
141      final HMaster oldMaster) throws Exception {
142    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();
143
144    HMaster newMaster = cluster.getMaster();
145    while (newMaster == null || newMaster == oldMaster) {
146      Thread.sleep(250);
147      newMaster = cluster.getMaster();
148    }
149
150    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
151      Thread.sleep(250);
152    }
153  }
154
155  // ==========================================================================
156  //  Table Helpers
157  // ==========================================================================
158  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
159    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
160    for (int i = 0; i < family.length; ++i) {
161      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
162    }
163    return builder.build();
164  }
165
166  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
167      final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
168    TableDescriptor htd = createHTD(tableName, family);
169    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
170    long procId = ProcedureTestingUtility.submitAndWait(procExec,
171      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
172    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
173    return regions;
174  }
175
176  public static void validateTableCreation(final HMaster master, final TableName tableName,
177      final RegionInfo[] regions, String... family) throws IOException {
178    validateTableCreation(master, tableName, regions, true, family);
179  }
180
181  public static void validateTableCreation(final HMaster master, final TableName tableName,
182      final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
183    // check filesystem
184    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
185    final Path tableDir =
186      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
187    assertTrue(fs.exists(tableDir));
188    CommonFSUtils.logFileSystemState(fs, tableDir, LOG);
189    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
190    for (int i = 0; i < regions.length; ++i) {
191      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
192      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
193      assertTrue(unwantedRegionDirs.remove(regionDir));
194      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
195      for (int j = 0; j < family.length; ++j) {
196        final Path familyDir = new Path(regionDir, family[j]);
197        if (hasFamilyDirs) {
198          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
199          assertTrue(allFamilyDirs.remove(familyDir));
200        } else {
201          // TODO: WARN: Modify Table/Families does not create a family dir
202          if (!fs.exists(familyDir)) {
203            LOG.warn(family[j] + " family dir does not exist");
204          }
205          allFamilyDirs.remove(familyDir);
206        }
207      }
208      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
209    }
210    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
211    LOG.debug("Table directory layout is as expected.");
212
213    // check meta
214    assertTrue(MetaTableAccessor.tableExists(master.getConnection(), tableName));
215    assertEquals(regions.length, countMetaRegions(master, tableName));
216
217    // check htd
218    TableDescriptor htd = master.getTableDescriptors().get(tableName);
219    assertTrue("table descriptor not found", htd != null);
220    for (int i = 0; i < family.length; ++i) {
221      assertTrue("family not found " + family[i], htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
222    }
223    assertEquals(family.length, htd.getColumnFamilyCount());
224  }
225
226  public static void validateTableDeletion(
227      final HMaster master, final TableName tableName) throws IOException {
228    // check filesystem
229    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
230    final Path tableDir =
231      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
232    assertFalse(fs.exists(tableDir));
233
234    // check meta
235    assertFalse(MetaTableAccessor.tableExists(master.getConnection(), tableName));
236    assertEquals(0, countMetaRegions(master, tableName));
237
238    // check htd
239    assertTrue("found htd of deleted table",
240      master.getTableDescriptors().get(tableName) == null);
241  }
242
243  private static int countMetaRegions(final HMaster master, final TableName tableName)
244      throws IOException {
245    final AtomicInteger actualRegCount = new AtomicInteger(0);
246    final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
247      @Override
248      public boolean visit(Result rowResult) throws IOException {
249        RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
250        if (list == null) {
251          LOG.warn("No serialized RegionInfo in " + rowResult);
252          return true;
253        }
254        HRegionLocation l = list.getRegionLocation();
255        if (l == null) {
256          return true;
257        }
258        if (!l.getRegionInfo().getTable().equals(tableName)) {
259          return false;
260        }
261        if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
262        HRegionLocation[] locations = list.getRegionLocations();
263        for (HRegionLocation location : locations) {
264          if (location == null) continue;
265          ServerName serverName = location.getServerName();
266          // Make sure that regions are assigned to server
267          if (serverName != null && serverName.getHostAndPort() != null) {
268            actualRegCount.incrementAndGet();
269          }
270        }
271        return true;
272      }
273    };
274    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
275    return actualRegCount.get();
276  }
277
278  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
279      throws IOException {
280    TableStateManager tsm = master.getTableStateManager();
281    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED));
282  }
283
284  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
285      throws IOException {
286    TableStateManager tsm = master.getTableStateManager();
287    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED));
288  }
289
290  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
291      final String family) throws IOException {
292    TableDescriptor htd = master.getTableDescriptors().get(tableName);
293    assertTrue(htd != null);
294
295    assertTrue(htd.hasColumnFamily(family.getBytes()));
296  }
297
298  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
299      final String family) throws IOException {
300    // verify htd
301    TableDescriptor htd = master.getTableDescriptors().get(tableName);
302    assertTrue(htd != null);
303    assertFalse(htd.hasColumnFamily(family.getBytes()));
304
305    // verify fs
306    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
307    final Path tableDir =
308      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
309    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
310      final Path familyDir = new Path(regionDir, family);
311      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
312    }
313  }
314
315  public static void validateColumnFamilyModification(final HMaster master,
316      final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
317      throws IOException {
318    TableDescriptor htd = master.getTableDescriptors().get(tableName);
319    assertTrue(htd != null);
320
321    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes());
322    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
323  }
324
325  public static void loadData(final Connection connection, final TableName tableName,
326      int rows, final byte[][] splitKeys,  final String... sfamilies) throws IOException {
327    byte[][] families = new byte[sfamilies.length][];
328    for (int i = 0; i < families.length; ++i) {
329      families[i] = Bytes.toBytes(sfamilies[i]);
330    }
331
332    BufferedMutator mutator = connection.getBufferedMutator(tableName);
333
334    // Ensure one row per region
335    assertTrue(rows >= splitKeys.length);
336    for (byte[] k: splitKeys) {
337      byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), k);
338      byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
339      mutator.mutate(createPut(families, key, value));
340      rows--;
341    }
342
343    // Add other extra rows. more rows, more files
344    while (rows-- > 0) {
345      byte[] value = Bytes.add(Bytes.toBytes(System.currentTimeMillis()), Bytes.toBytes(rows));
346      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
347      mutator.mutate(createPut(families, key, value));
348    }
349    mutator.flush();
350  }
351
352  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
353    byte[] q = Bytes.toBytes("q");
354    Put put = new Put(key);
355    put.setDurability(Durability.SKIP_WAL);
356    for (byte[] family: families) {
357      put.addColumn(family, q, value);
358    }
359    return put;
360  }
361
362  // ==========================================================================
363  //  Procedure Helpers
364  // ==========================================================================
365  public static long generateNonceGroup(final HMaster master) {
366    return master.getClusterConnection().getNonceGenerator().getNonceGroup();
367  }
368
369  public static long generateNonce(final HMaster master) {
370    return master.getClusterConnection().getNonceGenerator().newNonce();
371  }
372
373  /**
374   * Run through all procedure flow states TWICE while also restarting procedure executor at each
375   * step; i.e force a reread of procedure store.
376   *
377   *<p>It does
378   * <ol><li>Execute step N - kill the executor before store update
379   * <li>Restart executor/store
380   * <li>Execute step N - and then save to store
381   * </ol>
382   *
383   *<p>This is a good test for finding state that needs persisting and steps that are not
384   * idempotent. Use this version of the test when a procedure executes all flow steps from start to
385   * finish.
386   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
387   */
388  public static void testRecoveryAndDoubleExecution(
389      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
390      final int lastStep, final boolean expectExecRunning) throws Exception {
391    ProcedureTestingUtility.waitProcedure(procExec, procId);
392    assertEquals(false, procExec.isRunning());
393
394    // Restart the executor and execute the step twice
395    //   execute step N - kill before store update
396    //   restart executor/store
397    //   execute step N - save on store
398    // NOTE: currently we make assumption that states/ steps are sequential. There are already
399    // instances of a procedures which skip (don't use) intermediate states/ steps. In future,
400    // intermediate states/ steps can be added with ordinal greater than lastStep. If and when
401    // that happens the states can not be treated as sequential steps and the condition in
402    // following while loop needs to be changed. We can use euqals/ not equals operator to check
403    // if the procedure has reached the user specified state. But there is a possibility that
404    // while loop may not get the control back exaclty when the procedure is in lastStep. Proper
405    // fix would be get all visited states by the procedure and then check if user speccified
406    // state is in that list. Current assumption of sequential proregression of steps/ states is
407    // made at multiple places so we can keep while condition below for simplicity.
408    Procedure<?> proc = procExec.getProcedure(procId);
409    int stepNum = proc instanceof StateMachineProcedure ?
410        ((StateMachineProcedure) proc).getCurrentStateId() : 0;
411    for (;;) {
412      if (stepNum == lastStep) {
413        break;
414      }
415      LOG.info("Restart " + stepNum + " exec state=" + proc);
416      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
417      restartMasterProcedureExecutor(procExec);
418      ProcedureTestingUtility.waitProcedure(procExec, procId);
419      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
420      proc = procExec.getProcedure(procId);
421      stepNum = proc instanceof StateMachineProcedure ?
422          ((StateMachineProcedure) proc).getCurrentStateId() : stepNum + 1;
423    }
424
425    assertEquals(expectExecRunning, procExec.isRunning());
426  }
427
428  /**
429   * Run through all procedure flow states TWICE while also restarting
430   * procedure executor at each step; i.e force a reread of procedure store.
431   *
432   *<p>It does
433   * <ol><li>Execute step N - kill the executor before store update
434   * <li>Restart executor/store
435   * <li>Executes hook for each step twice
436   * <li>Execute step N - and then save to store
437   * </ol>
438   *
439   *<p>This is a good test for finding state that needs persisting and steps that are not
440   * idempotent. Use this version of the test when the order in which flow steps are executed is
441   * not start to finish; where the procedure may vary the flow steps dependent on circumstance
442   * found.
443   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
444   */
445  public static void testRecoveryAndDoubleExecution(
446      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
447      throws Exception {
448    ProcedureTestingUtility.waitProcedure(procExec, procId);
449    assertEquals(false, procExec.isRunning());
450    for (int i = 0; !procExec.isFinished(procId); ++i) {
451      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
452      if (hook != null) {
453        assertTrue(hook.execute(i));
454      }
455      restartMasterProcedureExecutor(procExec);
456      ProcedureTestingUtility.waitProcedure(procExec, procId);
457    }
458    assertEquals(true, procExec.isRunning());
459    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
460  }
461
462  public static void testRecoveryAndDoubleExecution(
463      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
464    testRecoveryAndDoubleExecution(procExec, procId, null);
465  }
466
467  /**
468   * Hook which will be executed on each step
469   */
470  public interface StepHook{
471    /**
472     * @param step Step no. at which this will be executed
473     * @return false if test should fail otherwise true
474     * @throws IOException
475     */
476    boolean execute(int step) throws IOException;
477  }
478
479  /**
480   * Execute the procedure up to "lastStep" and then the ProcedureExecutor
481   * is restarted and an abort() is injected.
482   * If the procedure implement abort() this should result in rollback being triggered.
483   * Each rollback step is called twice, by restarting the executor after every step.
484   * At the end of this call the procedure should be finished and rolledback.
485   * This method assert on the procedure being terminated with an AbortException.
486   */
487  public static void testRollbackAndDoubleExecution(
488      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
489      final int lastStep) throws Exception {
490    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
491  }
492
493  public static void testRollbackAndDoubleExecution(
494      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
495      final int lastStep, boolean waitForAsyncProcs) throws Exception {
496    // Execute up to last step
497    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);
498
499    // Restart the executor and rollback the step twice
500    //   rollback step N - kill before store update
501    //   restart executor/store
502    //   rollback step N - save on store
503    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
504    abortListener.addProcId(procId);
505    procExec.registerListener(abortListener);
506    try {
507      for (int i = 0; !procExec.isFinished(procId); ++i) {
508        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
509        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
510        restartMasterProcedureExecutor(procExec);
511        ProcedureTestingUtility.waitProcedure(procExec, procId);
512      }
513    } finally {
514      assertTrue(procExec.unregisterListener(abortListener));
515    }
516
517    if (waitForAsyncProcs) {
518      // Sometimes there are other procedures still executing (including asynchronously spawned by
519      // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before
520      // store update. Let all pending procedures finish normally.
521      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
522      // check 3 times to confirm that the procedure executor has not been killed
523      for (int i = 0; i < 3; i++) {
524        if (!procExec.isRunning()) {
525          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due" +
526            " to KillAndToggleBeforeStoreUpdate flag.");
527          restartMasterProcedureExecutor(procExec);
528          break;
529        }
530        Thread.sleep(1000);
531      }
532      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
533    }
534
535    assertEquals(true, procExec.isRunning());
536    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
537  }
538
539  /**
540   * Execute the procedure up to "lastStep" and then the ProcedureExecutor
541   * is restarted and an abort() is injected.
542   * If the procedure implement abort() this should result in rollback being triggered.
543   * At the end of this call the procedure should be finished and rolledback.
544   * This method assert on the procedure being terminated with an AbortException.
545   */
546  public static void testRollbackRetriableFailure(
547      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
548      final int lastStep) throws Exception {
549    // Execute up to last step
550    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);
551
552    // execute the rollback
553    testRestartWithAbort(procExec, procId);
554
555    assertEquals(true, procExec.isRunning());
556    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
557  }
558
559  /**
560   * Restart the ProcedureExecutor and inject an abort to the specified procedure.
561   * If the procedure implement abort() this should result in rollback being triggered.
562   * At the end of this call the procedure should be finished and rolledback, if abort is implemnted
563   */
564  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
565      long procId) throws Exception {
566    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
567    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
568    abortListener.addProcId(procId);
569    procExec.registerListener(abortListener);
570    try {
571      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
572      LOG.info("Restart and rollback procId=" + procId);
573      restartMasterProcedureExecutor(procExec);
574      ProcedureTestingUtility.waitProcedure(procExec, procId);
575    } finally {
576      assertTrue(procExec.unregisterListener(abortListener));
577    }
578  }
579
580  public static class InjectAbortOnLoadListener
581      implements ProcedureExecutor.ProcedureExecutorListener {
582    private final ProcedureExecutor<MasterProcedureEnv> procExec;
583    private TreeSet<Long> procsToAbort = null;
584
585    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
586      this.procExec = procExec;
587    }
588
589    public void addProcId(long procId) {
590      if (procsToAbort == null) {
591        procsToAbort = new TreeSet<>();
592      }
593      procsToAbort.add(procId);
594    }
595
596    @Override
597    public void procedureLoaded(long procId) {
598      if (procsToAbort != null && !procsToAbort.contains(procId)) {
599        return;
600      }
601      procExec.abort(procId);
602    }
603
604    @Override
605    public void procedureAdded(long procId) { /* no-op */ }
606
607    @Override
608    public void procedureFinished(long procId) { /* no-op */ }
609  }
610}