/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class MasterProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);

  private MasterProcedureTestingUtility() {
  }

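  /**
   * Restart the master ProcedureExecutor while simulating a master restart: the AssignmentManager
   * is stopped and the master marked uninitialized before the restart, the regions in transition
   * are re-attached to their TransitRegionStateProcedures before the workers start, and the
   * AssignmentManager re-joins the cluster afterwards.
   */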
  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
    throws Exception {
    final MasterProcedureEnv env = procExec.getEnvironment();
    final HMaster master = (HMaster) env.getMasterServices();
    ProcedureTestingUtility.restart(procExec, true, true,
      // stop services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          // try to simulate a master restart by removing the ServerManager states about seqIDs
          for (RegionState regionState : am.getRegionStates().getRegionStates()) {
            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
          }
          am.stop();
          master.setInitialized(false);
          return null;
        }
      },
      // setup RIT before starting workers
      new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          am.start();
          // follow the same approach as HMaster.finishActiveMasterInitialization; see the
          // comments there
          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
            .filter(p -> p instanceof TransitRegionStateProcedure)
            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
          return null;
        }
      },
      // restart services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          try {
            am.joinCluster();
            am.wakeMetaLoadedEvent();
            master.setInitialized(true);
          } catch (Exception e) {
            LOG.warn("Failed to load meta", e);
          }
          return null;
        }
      });
  }

  // ==========================================================================
  // Master failover utils
  // ==========================================================================
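  /**
   * Kill the active master and wait for a backup master to take over and finish initialization.
   */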
  public static void masterFailover(final HBaseTestingUtility testUtil) throws Exception {
    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    // Kill the master
    HMaster oldMaster = cluster.getMaster();
    cluster.killMaster(cluster.getMaster().getServerName());

    // Wait for a backup master to take over
    waitBackupMaster(testUtil, oldMaster);
  }

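  /**
   * Wait until a master other than {@code oldMaster} is active and fully initialized.
   */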
  public static void waitBackupMaster(final HBaseTestingUtility testUtil, final HMaster oldMaster)
    throws Exception {
    MiniHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    HMaster newMaster = cluster.getMaster();
    while (newMaster == null || newMaster == oldMaster) {
      Thread.sleep(250);
      newMaster = cluster.getMaster();
    }

    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
      Thread.sleep(250);
    }
  }

  // ==========================================================================
  // Table Helpers
  // ==========================================================================
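  /**
   * Build a TableDescriptor for {@code tableName} with one default-configured column family per
   * name in {@code family}, e.g. {@code createHTD(TableName.valueOf("t"), "f1", "f2")}.
   */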
  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (int i = 0; i < family.length; ++i) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
    }
    return builder.build();
  }

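  /**
   * Create the table by submitting a CreateTableProcedure and waiting for it; asserts the
   * procedure did not fail and returns the RegionInfos it created.
   */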
  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
    final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
    TableDescriptor htd = createHTD(tableName, family);
    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
    long procId = ProcedureTestingUtility.submitAndWait(procExec,
      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
    return regions;
  }

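  /**
   * Verify that the table was fully created: the expected region and family directories exist on
   * the filesystem, the regions are present in meta, and the table descriptor lists exactly the
   * given families with the configured store file tracker.
   */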
  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, String... family) throws IOException {
    validateTableCreation(master, tableName, regions, true, family);
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertTrue(fs.exists(tableDir));
    CommonFSUtils.logFileSystemState(fs, tableDir, LOG);
    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
    for (int i = 0; i < regions.length; ++i) {
      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
      assertTrue(unwantedRegionDirs.remove(regionDir));
      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
      for (int j = 0; j < family.length; ++j) {
        final Path familyDir = new Path(regionDir, family[j]);
        if (hasFamilyDirs) {
          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
          assertTrue(allFamilyDirs.remove(familyDir));
        } else {
          // TODO: WARN: Modify Table/Families does not create a family dir
          if (!fs.exists(familyDir)) {
            LOG.warn(family[j] + " family dir does not exist");
          }
          allFamilyDirs.remove(familyDir);
        }
      }
      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
    }
    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
    LOG.debug("Table directory layout is as expected.");

    // check meta
    assertTrue(tableExists(master.getConnection(), tableName));
    assertEquals(regions.length, countMetaRegions(master, tableName));

    // check htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue("table descriptor not found", htd != null);
    for (int i = 0; i < family.length; ++i) {
      assertTrue("family not found " + family[i],
        htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
    }
    assertEquals(family.length, htd.getColumnFamilyCount());

    // check that the store file tracker impl has been properly set in the htd
    String storeFileTrackerImpl =
      StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration());
    assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL));
  }

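  /**
   * Verify that the table was fully deleted: no table directory on the filesystem, no rows in
   * meta, and no table descriptor cached on the master.
   */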
  public static void validateTableDeletion(final HMaster master, final TableName tableName)
    throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertFalse(fs.exists(tableDir));

    // check meta
    assertFalse(tableExists(master.getConnection(), tableName));
    assertEquals(0, countMetaRegions(master, tableName));

    // check htd
    assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null);
  }

  private static int countMetaRegions(final HMaster master, final TableName tableName)
    throws IOException {
    final AtomicInteger actualRegCount = new AtomicInteger(0);
    final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result rowResult) throws IOException {
        RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult);
        if (list == null) {
          LOG.warn("No serialized RegionInfo in " + rowResult);
          return true;
        }
        HRegionLocation l = list.getRegionLocation();
        if (l == null) {
          return true;
        }
        if (!l.getRegionInfo().getTable().equals(tableName)) {
          return false;
        }
        if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true;
        HRegionLocation[] locations = list.getRegionLocations();
        for (HRegionLocation location : locations) {
          if (location == null) continue;
          ServerName serverName = location.getServerName();
          // Make sure that regions are assigned to server
          if (serverName != null && serverName.getAddress() != null) {
            actualRegCount.incrementAndGet();
          }
        }
        return true;
      }
    };
    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
    return actualRegCount.get();
  }

  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED));
  }

  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED));
  }

  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    assertTrue(htd.hasColumnFamily(family.getBytes()));
  }

  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    // verify htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);
    assertFalse(htd.hasColumnFamily(family.getBytes()));

    // verify fs
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
      final Path familyDir = new Path(regionDir, family);
      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
    }
  }

  public static void validateColumnFamilyModification(final HMaster master,
    final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
    throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(family.getBytes());
    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
  }

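  /**
   * Load {@code rows} rows into the table through a BufferedMutator: one row per split key first,
   * so every region gets data, then generated keys for the remainder. Writes skip the WAL.
   */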
  public static void loadData(final Connection connection, final TableName tableName, int rows,
    final byte[][] splitKeys, final String... sfamilies) throws IOException {
    byte[][] families = new byte[sfamilies.length][];
    for (int i = 0; i < families.length; ++i) {
      families[i] = Bytes.toBytes(sfamilies[i]);
    }

    BufferedMutator mutator = connection.getBufferedMutator(tableName);

    // Ensure one row per region
    assertTrue(rows >= splitKeys.length);
    for (byte[] k : splitKeys) {
      byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k);
      byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
      mutator.mutate(createPut(families, key, value));
      rows--;
    }

    // Add the remaining rows; more rows mean more store files
    while (rows-- > 0) {
      byte[] value =
        Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows));
      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
      mutator.mutate(createPut(families, key, value));
    }
    mutator.flush();
  }

  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(key);
    put.setDurability(Durability.SKIP_WAL);
    for (byte[] family : families) {
      put.addColumn(family, q, value);
    }
    return put;
  }

  // ==========================================================================
  // Procedure Helpers
  // ==========================================================================
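  // Nonce helpers: obtain a nonce group / nonce from the master connection's nonce generator.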
  public static long generateNonceGroup(final HMaster master) {
    return master.getClusterConnection().getNonceGenerator().getNonceGroup();
  }

  public static long generateNonce(final HMaster master) {
    return master.getClusterConnection().getNonceGenerator().newNonce();
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step, i.e. force a re-read of the procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when a procedure executes all flow steps from start to finish.
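   * <p>
   * An illustrative usage sketch (the submitted procedure and the step count are hypothetical):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(new SomeStateMachineProcedure(...));
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, lastStep, true);
   * </pre>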
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    final boolean expectExecRunning) throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertEquals(false, procExec.isRunning());

    // Restart the executor and execute the step twice
    // execute step N - kill before store update
    // restart executor/store
    // execute step N - save on store
    // NOTE: currently we assume that states/steps are sequential. There are already instances of
    // procedures which skip (don't use) intermediate states/steps. In the future, intermediate
    // states/steps may be added with an ordinal greater than lastStep. If and when that happens
    // the states can no longer be treated as sequential steps and the condition in the following
    // loop needs to be changed. We could use the equals/not-equals operator to check whether the
    // procedure has reached the user-specified state, but the loop may not get control back
    // exactly when the procedure is in lastStep. A proper fix would be to collect all states
    // visited by the procedure and then check whether the user-specified state is in that list.
    // The assumption of sequential progression of steps/states is made in multiple places, so we
    // keep the loop condition below for simplicity.
    Procedure<?> proc = procExec.getProcedure(procId);
    int stepNum = proc instanceof StateMachineProcedure
      ? ((StateMachineProcedure) proc).getCurrentStateId()
      : 0;
    for (;;) {
      if (stepNum == lastStep) {
        break;
      }
      LOG.info("Restart " + stepNum + " exec state=" + proc);
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
      proc = procExec.getProcedure(procId);
      stepNum = proc instanceof StateMachineProcedure
        ? ((StateMachineProcedure) proc).getCurrentStateId()
        : stepNum + 1;
    }

    assertEquals(expectExecRunning, procExec.isRunning());
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step, i.e. force a re-read of the procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute the hook for each step twice
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when the order in which flow steps are executed is not start to
   * finish, i.e. where the procedure may vary the flow steps depending on the circumstances found.
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
    throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertEquals(false, procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
      if (hook != null) {
        assertTrue(hook.execute(i));
      }
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    }
    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, null);
  }

  /**
   * Hook which will be executed on each step
   */
  public interface StepHook {
    /**
     * @param step Step number at which this hook is executed
     * @return false if the test should fail, otherwise true
     */
    boolean execute(int step) throws IOException;
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort(). If the procedure implements abort() this should result in rollback being triggered.
   * Each rollback step is called twice, by restarting the executor after every step. At the end of
   * this call the procedure should be finished and rolled back. This method asserts that the
   * procedure terminated with an AbortException.
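   * <p>
   * An illustrative usage sketch (the submitted procedure and the abort step are hypothetical):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(new SomeStateMachineProcedure(...));
   * MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, abortStep);
   * </pre>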
   */
  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
  }

  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    boolean waitForAsyncProcs) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // Restart the executor and rollback the step twice
    // rollback step N - kill before store update
    // restart executor/store
    // rollback step N - save on store
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      for (int i = 0; !procExec.isFinished(procId); ++i) {
        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
        restartMasterProcedureExecutor(procExec);
        ProcedureTestingUtility.waitProcedure(procExec, procId);
      }
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }

    if (waitForAsyncProcs) {
      // Sometimes there are other procedures still executing (including asynchronously spawned by
      // procId) and due to KillAndToggleBeforeStoreUpdate flag ProcedureExecutor is stopped before
      // store update. Let all pending procedures finish normally.
      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
      // check 3 times to confirm that the procedure executor has not been killed
      for (int i = 0; i < 3; i++) {
        if (!procExec.isRunning()) {
          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due"
            + " to KillAndToggleBeforeStoreUpdate flag.");
          restartMasterProcedureExecutor(procExec);
          break;
        }
        Thread.sleep(1000);
      }
      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
    }

    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort(). If the procedure implements abort() this should result in rollback being triggered.
   * At the end of this call the procedure should be finished and rolled back. This method asserts
   * that the procedure terminated with an AbortException.
   */
  public static void testRollbackRetriableFailure(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // execute the rollback
    testRestartWithAbort(procExec, procId);

    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Restart the ProcedureExecutor and inject an abort to the specified procedure. If the procedure
   * implements abort() this should result in rollback being triggered. At the end of this call the
   * procedure should be finished and rolled back, if abort() is implemented.
   */
  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
    long procId) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      LOG.info("Restart and rollback procId=" + procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }
  }

  public static boolean tableExists(Connection conn, TableName tableName) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(tableName);
    }
  }

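  /**
   * ProcedureExecutorListener that calls abort() on the registered procedure ids as they are
   * loaded from the procedure store on executor restart. If no ids have been registered, every
   * loaded procedure is aborted.
   */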
  public static class InjectAbortOnLoadListener
    implements ProcedureExecutor.ProcedureExecutorListener {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private TreeSet<Long> procsToAbort = null;

    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
      this.procExec = procExec;
    }

    public void addProcId(long procId) {
      if (procsToAbort == null) {
        procsToAbort = new TreeSet<>();
      }
      procsToAbort.add(procId);
    }

    @Override
    public void procedureLoaded(long procId) {
      if (procsToAbort != null && !procsToAbort.contains(procId)) {
        return;
      }
      procExec.abort(procId);
    }

    @Override
    public void procedureAdded(long procId) {
      /* no-op */ }

    @Override
    public void procedureFinished(long procId) {
      /* no-op */ }
  }
}