/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class MasterProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);

  private MasterProcedureTestingUtility() {
  }

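  /**
   * Restart the master ProcedureExecutor while simulating a master restart: services are stopped
   * and the AssignmentManager is shut down before the procedure store is reloaded, regions in
   * transition are re-attached from the recovered TransitRegionStateProcedures, and finally the
   * AssignmentManager re-joins the cluster.
   */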
  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
    throws Exception {
    final MasterProcedureEnv env = procExec.getEnvironment();
    final HMaster master = (HMaster) env.getMasterServices();
    ProcedureTestingUtility.restart(procExec, true, true,
      // stop services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          master.setServiceStarted(false);
          AssignmentManager am = env.getAssignmentManager();
          // try to simulate a master restart by removing the ServerManager states about seqIDs
          for (RegionState regionState : am.getRegionStates().getRegionStates()) {
            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
          }
          am.stop();
          master.setInitialized(false);
          return null;
        }
      },
      // setup RIT before starting workers
      new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          am.start();
          // follow the same approach as in HMaster.finishActiveMasterInitialization; see the
          // comments there
          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
            .filter(p -> p instanceof TransitRegionStateProcedure)
            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
          // create server state node, to simulate master start up
          env.getMasterServices().getServerManager().getOnlineServersList()
            .forEach(am.getRegionStates()::createServer);
          am.initializationPostMetaOnline();
          master.setServiceStarted(true);
          return null;
        }
      },
      // restart services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          try {
            am.joinCluster();
            am.wakeMetaLoadedEvent();
            master.setInitialized(true);
          } catch (Exception e) {
            LOG.warn("Failed to load meta", e);
          }
          return null;
        }
      });
  }

  // ==========================================================================
  //  Master failover utils
  // ==========================================================================
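  /**
   * Kill the active master and wait until a backup master has taken over and finished
   * initialization.
   */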
  public static void masterFailover(final HBaseTestingUtil testUtil) throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    // Kill the master
    HMaster oldMaster = cluster.getMaster();
    cluster.killMaster(cluster.getMaster().getServerName());

    // Wait for the backup master to become active
    waitBackupMaster(testUtil, oldMaster);
  }

  public static void waitBackupMaster(final HBaseTestingUtil testUtil, final HMaster oldMaster)
    throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    HMaster newMaster = cluster.getMaster();
    while (newMaster == null || newMaster == oldMaster) {
      Thread.sleep(250);
      newMaster = cluster.getMaster();
    }

    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
      Thread.sleep(250);
    }
  }

  // ==========================================================================
  //  Table Helpers
  // ==========================================================================
  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (int i = 0; i < family.length; ++i) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
    }
    return builder.build();
  }

  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
    final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
    TableDescriptor htd = createHTD(tableName, family);
    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
    long procId = ProcedureTestingUtility.submitAndWait(procExec,
      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
    return regions;
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, String... family) throws IOException {
    validateTableCreation(master, tableName, regions, true, family);
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertTrue(fs.exists(tableDir));
    CommonFSUtils.logFileSystemState(fs, tableDir, LOG);
    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
    for (int i = 0; i < regions.length; ++i) {
      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
      assertTrue(unwantedRegionDirs.remove(regionDir));
      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
      for (int j = 0; j < family.length; ++j) {
        final Path familyDir = new Path(regionDir, family[j]);
        if (hasFamilyDirs) {
          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
          assertTrue(allFamilyDirs.remove(familyDir));
        } else {
          // TODO: WARN: Modify Table/Families does not create a family dir
          if (!fs.exists(familyDir)) {
            LOG.warn(family[j] + " family dir does not exist");
          }
          allFamilyDirs.remove(familyDir);
        }
      }
      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
    }
    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
    LOG.debug("Table directory layout is as expected.");

    // check meta
    assertTrue(tableExists(master.getConnection(), tableName));
    assertEquals(regions.length, countMetaRegions(master, tableName));

    // check htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue("table descriptor not found", htd != null);
    for (int i = 0; i < family.length; ++i) {
      assertTrue("family not found " + family[i],
        htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
    }
    assertEquals(family.length, htd.getColumnFamilyCount());

    // check that the store file tracker impl has been properly set in the htd
    String storeFileTrackerImpl =
      StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration());
    assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL));
  }

  public static void validateTableDeletion(final HMaster master, final TableName tableName)
    throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertFalse(fs.exists(tableDir));

    // check meta
    assertFalse(tableExists(master.getConnection(), tableName));
    assertEquals(0, countMetaRegions(master, tableName));

    // check htd
    assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null);
  }

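  /**
   * Count, by scanning hbase:meta, the region replicas of the given table that are assigned to a
   * server; offline regions and split parents are skipped.
   */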
  private static int countMetaRegions(final HMaster master, final TableName tableName)
    throws IOException {
    final AtomicInteger actualRegCount = new AtomicInteger(0);
    final ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result rowResult) throws IOException {
        RegionLocations list = CatalogFamilyFormat.getRegionLocations(rowResult);
        if (list == null) {
          LOG.warn("No serialized RegionInfo in " + rowResult);
          return true;
        }
        HRegionLocation l = list.getRegionLocation();
        if (l == null) {
          return true;
        }
        if (!l.getRegion().getTable().equals(tableName)) {
          return false;
        }
        if (l.getRegion().isOffline() || l.getRegion().isSplit()) {
          return true;
        }

        HRegionLocation[] locations = list.getRegionLocations();
        for (HRegionLocation location : locations) {
          if (location == null) continue;
          ServerName serverName = location.getServerName();
          // Make sure that the region is assigned to a server
          if (serverName != null && serverName.getAddress() != null) {
            actualRegCount.incrementAndGet();
          }
        }
        return true;
      }
    };
    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
    return actualRegCount.get();
  }

  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.ENABLED));
  }

  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertTrue(tsm.getTableState(tableName).getState().equals(TableState.State.DISABLED));
  }

  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    assertTrue(htd.hasColumnFamily(Bytes.toBytes(family)));
  }

  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    // verify htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);
    assertFalse(htd.hasColumnFamily(Bytes.toBytes(family)));

    // verify fs
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
      final Path familyDir = new Path(regionDir, family);
      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
    }
  }

  public static void validateColumnFamilyModification(final HMaster master,
    final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
    throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(Bytes.toBytes(family));
    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
  }

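  /**
   * Load {@code rows} rows into the table through a single BufferedMutator: one row is written per
   * split key (so every region gets at least one row) and the remaining rows use MD5-based keys so
   * they spread across regions. All puts skip the WAL.
   */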
  public static void loadData(final Connection connection, final TableName tableName, int rows,
    final byte[][] splitKeys, final String... sfamilies) throws IOException {
    byte[][] families = new byte[sfamilies.length][];
    for (int i = 0; i < families.length; ++i) {
      families[i] = Bytes.toBytes(sfamilies[i]);
    }

    BufferedMutator mutator = connection.getBufferedMutator(tableName);

    // Ensure one row per region
    assertTrue(rows >= splitKeys.length);
    for (byte[] k : splitKeys) {
      byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k);
      byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
      mutator.mutate(createPut(families, key, value));
      rows--;
    }

    // Add the remaining rows; more rows mean more store files
    while (rows-- > 0) {
      byte[] value =
        Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows));
      byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
      mutator.mutate(createPut(families, key, value));
    }
    mutator.flush();
  }

  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(key);
    put.setDurability(Durability.SKIP_WAL);
    for (byte[] family : families) {
      put.addColumn(family, q, value);
    }
    return put;
  }

  // ==========================================================================
  //  Procedure Helpers
  // ==========================================================================
  public static long generateNonceGroup(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().getNonceGroup();
  }

  public static long generateNonce(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().newNonce();
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step, i.e. forcing a reread of the procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when a procedure executes all flow steps from start to finish.
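   * <p>
   * A minimal usage sketch (the submitted procedure below is only an example):
   *
   * <pre>
   * // make the executor kill itself before every store update so each step is replayed
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * // lastStep is the ordinal of the final state the procedure is expected to reach
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, lastStep,
   *   expectExecRunning);
   * </pre>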
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    final boolean expectExecRunning) throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertEquals(false, procExec.isRunning());

    // Restart the executor and execute the step twice
    // execute step N - kill before store update
    // restart executor/store
    // execute step N - save on store
    // NOTE: currently we assume that states/steps are sequential. There are already instances of
    // procedures which skip (don't use) intermediate states/steps. In the future, intermediate
    // states/steps could be added with an ordinal greater than lastStep. If and when that happens
    // the states can no longer be treated as sequential steps and the condition in the following
    // while loop needs to be changed. We could use the equals/not-equals operator to check whether
    // the procedure has reached the user-specified state, but the loop may not regain control
    // exactly when the procedure is in lastStep. A proper fix would be to collect all states
    // visited by the procedure and then check whether the user-specified state is in that list.
    // The current assumption of sequential progression of steps/states is made in multiple places,
    // so we keep the while condition below for simplicity.
    Procedure<?> proc = procExec.getProcedure(procId);
    int stepNum = proc instanceof StateMachineProcedure
      ? ((StateMachineProcedure) proc).getCurrentStateId()
      : 0;
    for (;;) {
      if (stepNum == lastStep) {
        break;
      }
      LOG.info("Restart " + stepNum + " exec state=" + proc);
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
      // Old proc object is stale, need to get the new one after ProcedureExecutor restart
      proc = procExec.getProcedure(procId);
      stepNum = proc instanceof StateMachineProcedure
        ? ((StateMachineProcedure) proc).getCurrentStateId()
        : stepNum + 1;
    }

    assertEquals(expectExecRunning, procExec.isRunning());
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step, i.e. forcing a reread of the procedure store.
   * <p>
   * It does
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute the hook for each step twice
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when the flow steps are not executed strictly from start to
   * finish, i.e. when the procedure may vary its flow steps depending on the circumstances found.
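   * <p>
   * A minimal usage sketch (the submitted procedure and the hook body are examples only):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * // a hook returning false fails the test at that step
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, step -> true);
   * </pre>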
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
    throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertEquals(false, procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
      if (hook != null) {
        assertTrue(hook.execute(i));
      }
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    }
    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, null);
  }

  /**
   * Hook which will be executed on each step
   */
  public interface StepHook {
    /**
     * @param step Step no. at which this will be executed
     * @return false if the test should fail, otherwise true
     */
    boolean execute(int step) throws IOException;
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort(). If the procedure implements abort() this should result in rollback being triggered.
   * Each rollback step is executed twice, by restarting the executor after every step. At the end
   * of this call the procedure should be finished and rolled back. This method asserts that the
   * procedure terminated with an AbortException.
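   * <p>
   * A minimal usage sketch (the submitted procedure below is only an example):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * // run up to lastStep, then an abort is injected and the rollback path is exercised
   * MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, lastStep);
   * </pre>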
   */
  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
  }

  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    boolean waitForAsyncProcs) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // Restart the executor and rollback the step twice
    // rollback step N - kill before store update
    // restart executor/store
    // rollback step N - save on store
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      for (int i = 0; !procExec.isFinished(procId); ++i) {
        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
        restartMasterProcedureExecutor(procExec);
        ProcedureTestingUtility.waitProcedure(procExec, procId);
      }
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }

    if (waitForAsyncProcs) {
      // Sometimes other procedures are still executing (including ones asynchronously spawned by
      // procId) and, due to the KillAndToggleBeforeStoreUpdate flag, the ProcedureExecutor is
      // stopped before a store update. Let all pending procedures finish normally.
      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
      // check 3 times to confirm that the procedure executor has not been killed
      for (int i = 0; i < 3; i++) {
        if (!procExec.isRunning()) {
          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due"
            + " to KillAndToggleBeforeStoreUpdate flag.");
          restartMasterProcedureExecutor(procExec);
          break;
        }
        Thread.sleep(1000);
      }
      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
    }

    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort(). If the procedure implements abort() this should result in rollback being triggered.
   * At the end of this call the procedure should be finished and rolled back. This method asserts
   * that the procedure terminated with an AbortException.
   */
  public static void testRollbackRetriableFailure(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // execute the rollback
    testRestartWithAbort(procExec, procId);

    assertEquals(true, procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Restart the ProcedureExecutor and inject an abort into the specified procedure. If the
   * procedure implements abort() this should result in rollback being triggered. At the end of
   * this call the procedure should be finished and rolled back, if abort is implemented.
   */
  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
    long procId) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      LOG.info("Restart and rollback procId=" + procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }
  }

  public static boolean tableExists(Connection conn, TableName tableName) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(tableName);
    }
  }

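  /**
   * ProcedureExecutor listener that calls abort() on the registered procedure ids (or on every
   * loaded procedure if none were registered) as soon as they are loaded back from the procedure
   * store, e.g. after an executor restart.
   */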
  public static class InjectAbortOnLoadListener
    implements ProcedureExecutor.ProcedureExecutorListener {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private TreeSet<Long> procsToAbort = null;

    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
      this.procExec = procExec;
    }

    public void addProcId(long procId) {
      if (procsToAbort == null) {
        procsToAbort = new TreeSet<>();
      }
      procsToAbort.add(procId);
    }

    @Override
    public void procedureLoaded(long procId) {
      if (procsToAbort != null && !procsToAbort.contains(procId)) {
        return;
      }
      procExec.abort(procId);
    }

    @Override
    public void procedureAdded(long procId) {
      /* no-op */
    }

    @Override
    public void procedureFinished(long procId) {
      /* no-op */
    }
  }
}