/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

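/**
 * Utility helpers for master procedure tests: restarting the master {@link ProcedureExecutor},
 * validating table creation/deletion on the filesystem and in meta, loading test data, and
 * driving recovery and rollback double-execution scenarios.
 */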
@InterfaceAudience.Private
public class MasterProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);

  private MasterProcedureTestingUtility() { }

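  /**
   * Restarts the master ProcedureExecutor against the same procedure store: stops the
   * AssignmentManager and marks the master as not initialized, restarts the executor and reloads
   * its procedures, re-attaches any unfinished TransitRegionStateProcedures as regions in
   * transition, then rejoins the cluster and marks the master as initialized again.
   */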
  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
      throws Exception {
    final MasterProcedureEnv env = procExec.getEnvironment();
    final HMaster master = (HMaster) env.getMasterServices();
    ProcedureTestingUtility.restart(procExec, true, true,
      // stop services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          // try to simulate a master restart by removing the ServerManager states about seqIDs
          for (RegionState regionState: am.getRegionStates().getRegionStates()) {
            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
          }
          am.stop();
          master.setInitialized(false);
          return null;
        }
      },
      // setup RIT before starting workers
      new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          am.start();
          // follow the same approach as HMaster.finishActiveMasterInitialization; see the
          // comments there
          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
            .filter(p -> p instanceof TransitRegionStateProcedure)
            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
          return null;
        }
      },
      // restart services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          try {
            am.joinCluster();
            am.wakeMetaLoadedEvent();
            master.setInitialized(true);
          } catch (Exception e) {
            LOG.warn("Failed to load meta", e);
          }
          return null;
        }
      });
  }

  // ==========================================================================
  //  Master failover utils
  // ==========================================================================
  public static void masterFailover(final HBaseTestingUtil testUtil)
      throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    // Kill the master
    HMaster oldMaster = cluster.getMaster();
    cluster.killMaster(cluster.getMaster().getServerName());

    // Wait for a backup master to become the active master
    waitBackupMaster(testUtil, oldMaster);
  }

  public static void waitBackupMaster(final HBaseTestingUtil testUtil,
      final HMaster oldMaster) throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    HMaster newMaster = cluster.getMaster();
    while (newMaster == null || newMaster == oldMaster) {
      Thread.sleep(250);
      newMaster = cluster.getMaster();
    }

    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
      Thread.sleep(250);
    }
  }

  // ==========================================================================
  //  Table Helpers
  // ==========================================================================
  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (int i = 0; i < family.length; ++i) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
    }
    return builder.build();
  }

  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
      final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
    TableDescriptor htd = createHTD(tableName, family);
    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
    long procId = ProcedureTestingUtility.submitAndWait(procExec,
      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
    return regions;
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
      final RegionInfo[] regions, String... family) throws IOException {
    validateTableCreation(master, tableName, regions, true, family);
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
      final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertTrue(fs.exists(tableDir));
    CommonFSUtils.logFileSystemState(fs, tableDir, LOG);
    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
    for (int i = 0; i < regions.length; ++i) {
      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
      assertTrue(unwantedRegionDirs.remove(regionDir));
      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
      for (int j = 0; j < family.length; ++j) {
        final Path familyDir = new Path(regionDir, family[j]);
        if (hasFamilyDirs) {
          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
          assertTrue(allFamilyDirs.remove(familyDir));
        } else {
          // TODO: WARN: Modify Table/Families does not create a family dir
          if (!fs.exists(familyDir)) {
            LOG.warn(family[j] + " family dir does not exist");
          }
          allFamilyDirs.remove(familyDir);
        }
      }
      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
    }
    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
    LOG.debug("Table directory layout is as expected.");

    // check meta
    assertTrue(tableExists(master.getConnection(), tableName));
    assertEquals(regions.length, countMetaRegions(master, tableName));

    // check htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue("table descriptor not found", htd != null);
    for (int i = 0; i < family.length; ++i) {
      assertTrue("family not found " + family[i],
        htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
    }
    assertEquals(family.length, htd.getColumnFamilyCount());

    // check that the store file tracker impl has been properly set in the htd
    String storeFileTrackerImpl =
      StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration());
    assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL));
  }

  public static void validateTableDeletion(
      final HMaster master, final TableName tableName) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertFalse(fs.exists(tableDir));

    // check meta
    assertFalse(tableExists(master.getConnection(), tableName));
    assertEquals(0, countMetaRegions(master, tableName));

    // check htd
    assertTrue("found htd of deleted table",
      master.getTableDescriptors().get(tableName) == null);
  }

  private static int countMetaRegions(final HMaster master, final TableName tableName)
      throws IOException {
    final AtomicInteger actualRegCount = new AtomicInteger(0);
    final ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result rowResult) throws IOException {
        RegionLocations list = CatalogFamilyFormat.getRegionLocations(rowResult);
        if (list == null) {
          LOG.warn("No serialized RegionInfo in " + rowResult);
          return true;
        }
        HRegionLocation l = list.getRegionLocation();
        if (l == null) {
          return true;
        }
        if (!l.getRegion().getTable().equals(tableName)) {
          return false;
        }
        if (l.getRegion().isOffline() || l.getRegion().isSplit()) {
          return true;
        }

        HRegionLocation[] locations = list.getRegionLocations();
        for (HRegionLocation location : locations) {
          if (location == null) continue;
          ServerName serverName = location.getServerName();
          // Make sure that regions are assigned to a server
          if (serverName != null && serverName.getAddress() != null) {
            actualRegCount.incrementAndGet();
          }
        }
        return true;
      }
    };
    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
    return actualRegCount.get();
  }

  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
      throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED));
  }

  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
      throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED));
  }

  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
      final String family) throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    assertTrue(htd.hasColumnFamily(Bytes.toBytes(family)));
  }

  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
      final String family) throws IOException {
    // verify htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);
    assertFalse(htd.hasColumnFamily(Bytes.toBytes(family)));

    // verify fs
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
      final Path familyDir = new Path(regionDir, family);
      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
    }
  }

  public static void validateColumnFamilyModification(final HMaster master,
      final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
      throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(Bytes.toBytes(family));
    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
  }

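  /**
   * Loads {@code rows} rows into the given table: one row per split key (so every region gets at
   * least one row) plus extra rows keyed by an MD5 hash of the value. Writes go through a
   * BufferedMutator with SKIP_WAL durability and are flushed before returning.
   */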
  public static void loadData(final Connection connection, final TableName tableName,
      int rows, final byte[][] splitKeys, final String... sfamilies) throws IOException {
    byte[][] families = new byte[sfamilies.length][];
    for (int i = 0; i < families.length; ++i) {
      families[i] = Bytes.toBytes(sfamilies[i]);
    }

    BufferedMutator mutator = connection.getBufferedMutator(tableName);

    // Ensure one row per region
    assertTrue(rows >= splitKeys.length);
    for (byte[] k: splitKeys) {
      byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k);
      byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
      mutator.mutate(createPut(families, key, value));
      rows--;
    }

    // Add the remaining extra rows; more rows mean more store files
    while (rows-- > 0) {
      byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()),
        Bytes.toBytes(rows));
      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
      mutator.mutate(createPut(families, key, value));
    }
    mutator.flush();
  }

  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(key);
    put.setDurability(Durability.SKIP_WAL);
    for (byte[] family: families) {
      put.addColumn(family, q, value);
    }
    return put;
  }

  // ==========================================================================
  //  Procedure Helpers
  // ==========================================================================
  public static long generateNonceGroup(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().getNonceGroup();
  }

  public static long generateNonce(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().newNonce();
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step; i.e. force a reread of the procedure store.
   *
   * <p>It does
   * <ol><li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   *
   * <p>This is a good test for finding state that needs persisting and steps that are not
   * idempotent. Use this version of the test when a procedure executes all flow steps from start
   * to finish.
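   *
   * <p>A minimal usage sketch; the kill-flag setup, the submitted procedure and the
   * {@code lastStep} ordinal are illustrative assumptions, not a prescription:
   * <pre>{@code
   * // Arrange for the executor to kill itself right before every store update.
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * // Replay every state twice up to lastStep; expectExecRunning=false means the executor is
   * // expected to still be stopped (killed) when lastStep is reached.
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(
   *   procExec, procId, lastStep, false);
   * }</pre>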
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
   */
  public static void testRecoveryAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep, final boolean expectExecRunning) throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertEquals(false, procExec.isRunning());

    // Restart the executor and execute the step twice
    //   execute step N - kill before store update
    //   restart executor/store
    //   execute step N - save on store
    // NOTE: currently we assume that states/steps are sequential. There are already procedures
    // that skip (don't use) intermediate states/steps. In the future, intermediate states/steps
    // could be added with an ordinal greater than lastStep. If and when that happens, the states
    // can no longer be treated as sequential steps and the condition in the following loop needs
    // to change. We could use an equals/not-equals check to test whether the procedure has
    // reached the user-specified state, but the loop may not get control back exactly when the
    // procedure is in lastStep. A proper fix would be to collect all states visited by the
    // procedure and then check whether the user-specified state is in that list. The current
    // assumption of sequential progression of steps/states is made in multiple places, so we keep
    // the loop condition below for simplicity.
    Procedure<?> proc = procExec.getProcedure(procId);
    int stepNum = proc instanceof StateMachineProcedure ?
        ((StateMachineProcedure) proc).getCurrentStateId() : 0;
    for (;;) {
      if (stepNum == lastStep) {
        break;
      }
      LOG.info("Restart " + stepNum + " exec state=" + proc);
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
      proc = procExec.getProcedure(procId);
      stepNum = proc instanceof StateMachineProcedure ?
          ((StateMachineProcedure) proc).getCurrentStateId() : stepNum + 1;
    }

    assertEquals(expectExecRunning, procExec.isRunning());
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the
   * procedure executor at each step; i.e. force a reread of the procedure store.
   *
   * <p>It does
   * <ol><li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Executes hook for each step twice
   * <li>Execute step N - and then save to store
   * </ol>
   *
   * <p>This is a good test for finding state that needs persisting and steps that are not
   * idempotent. Use this version of the test when the order in which flow steps are executed is
   * not start to finish; where the procedure may vary the flow steps dependent on circumstance
   * found.
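   *
   * <p>A minimal usage sketch; the submitted procedure and the hook body are illustrative
   * assumptions:
   * <pre>{@code
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * // The hook runs once per executor restart; returning false fails the test.
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, step -> {
   *   LOG.info("About to restart, step={}", step);
   *   return true;
   * });
   * }</pre>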
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
   */
  public static void testRecoveryAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
      throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertEquals(false, procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
      if (hook != null) {
        assertTrue(hook.execute(i));
      }
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    }
    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  public static void testRecoveryAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, null);
  }

  /**
   * Hook which will be executed on each step.
   */
  public interface StepHook {
    /**
     * @param step the step number at which the hook is executed
     * @return false if the test should fail, true otherwise
     * @throws IOException if the hook fails
     */
    boolean execute(int step) throws IOException;
  }

  /**
   * Execute the procedure up to "lastStep" and then the ProcedureExecutor
   * is restarted and an abort() is injected.
   * If the procedure implements abort() this should result in rollback being triggered.
   * Each rollback step is called twice, by restarting the executor after every step.
   * At the end of this call the procedure should be finished and rolled back.
   * This method asserts that the procedure terminated with an AbortException.
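   *
   * <p>An illustrative sketch; the submitted procedure and the intermediate step ordinal are
   * assumptions:
   * <pre>{@code
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * // Run forward up to the chosen step, then inject an abort and replay each rollback step
   * // twice; the call asserts that the procedure result is an abort exception.
   * MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, lastStep);
   * }</pre>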
   */
  public static void testRollbackAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep) throws Exception {
    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
  }

  public static void testRollbackAndDoubleExecution(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep, boolean waitForAsyncProcs) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // Restart the executor and rollback the step twice
    //   rollback step N - kill before store update
    //   restart executor/store
    //   rollback step N - save on store
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      for (int i = 0; !procExec.isFinished(procId); ++i) {
        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
        restartMasterProcedureExecutor(procExec);
        ProcedureTestingUtility.waitProcedure(procExec, procId);
      }
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }

    if (waitForAsyncProcs) {
      // Sometimes other procedures are still executing (including ones asynchronously spawned by
      // procId) and, due to the KillAndToggleBeforeStoreUpdate flag, the ProcedureExecutor is
      // stopped before the store update. Let all pending procedures finish normally.
      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
      // check 3 times to confirm that the procedure executor has not been killed
      for (int i = 0; i < 3; i++) {
        if (!procExec.isRunning()) {
          LOG.warn("ProcedureExecutor not running, may have been stopped by a pending procedure" +
            " due to the KillAndToggleBeforeStoreUpdate flag.");
          restartMasterProcedureExecutor(procExec);
          break;
        }
        Thread.sleep(1000);
      }
      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
    }

    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Execute the procedure up to "lastStep" and then the ProcedureExecutor
   * is restarted and an abort() is injected.
   * If the procedure implements abort() this should result in rollback being triggered.
   * At the end of this call the procedure should be finished and rolled back.
   * This method asserts that the procedure terminated with an AbortException.
   */
  public static void testRollbackRetriableFailure(
      final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId,
      final int lastStep) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // execute the rollback
    testRestartWithAbort(procExec, procId);

    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Restart the ProcedureExecutor and inject an abort to the specified procedure.
   * If the procedure implements abort() this should result in rollback being triggered.
   * At the end of this call the procedure should be finished and rolled back, if abort is
   * implemented.
   */
  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
      long procId) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      LOG.info("Restart and rollback procId=" + procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }
  }

  public static boolean tableExists(Connection conn, TableName tableName) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(tableName);
    }
  }

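  /**
   * ProcedureExecutorListener that calls abort() on the registered procedure ids as soon as they
   * are loaded from the store on executor restart. If no ids are registered, every loaded
   * procedure is aborted.
   */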
  public static class InjectAbortOnLoadListener
      implements ProcedureExecutor.ProcedureExecutorListener {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private TreeSet<Long> procsToAbort = null;

    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
      this.procExec = procExec;
    }

    public void addProcId(long procId) {
      if (procsToAbort == null) {
        procsToAbort = new TreeSet<>();
      }
      procsToAbort.add(procId);
    }

    @Override
    public void procedureLoaded(long procId) {
      if (procsToAbort != null && !procsToAbort.contains(procId)) {
        return;
      }
      procExec.abort(procId);
    }

    @Override
    public void procedureAdded(long procId) { /* no-op */ }

    @Override
    public void procedureFinished(long procId) { /* no-op */ }
  }
}