/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CatalogFamilyFormat;
import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class MasterProcedureTestingUtility {
  private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureTestingUtility.class);

  private MasterProcedureTestingUtility() {
  }

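  /**
   * Restart the master procedure executor while simulating a master restart: the master services
   * and the AssignmentManager are stopped before the procedure store is reloaded, regions in
   * transition are rebuilt from the reloaded {@link TransitRegionStateProcedure}s, server state
   * nodes are recreated, and the AssignmentManager then rejoins the cluster, mirroring
   * HMaster.finishActiveMasterInitialization.
   */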
  public static void restartMasterProcedureExecutor(ProcedureExecutor<MasterProcedureEnv> procExec)
    throws Exception {
    final MasterProcedureEnv env = procExec.getEnvironment();
    final HMaster master = (HMaster) env.getMasterServices();
    ProcedureTestingUtility.restart(procExec, true, true,
      // stop services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          master.setServiceStarted(false);
          AssignmentManager am = env.getAssignmentManager();
          // try to simulate a master restart by removing the ServerManager states about seqIDs
          for (RegionState regionState : am.getRegionStates().getRegionStates()) {
            env.getMasterServices().getServerManager().removeRegion(regionState.getRegion());
          }
          am.stop();
          master.setInitialized(false);
          return null;
        }
      },
      // setup RIT before starting workers
      new Callable<Void>() {

        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          am.start();
          // follow the same approach as HMaster.finishActiveMasterInitialization; see the
          // comments there
          am.setupRIT(procExec.getActiveProceduresNoCopy().stream().filter(p -> !p.isSuccess())
            .filter(p -> p instanceof TransitRegionStateProcedure)
            .map(p -> (TransitRegionStateProcedure) p).collect(Collectors.toList()));
          // create server state nodes, to simulate master start up
          env.getMasterServices().getServerManager().getOnlineServersList()
            .forEach(am.getRegionStates()::createServer);
          master.setServiceStarted(true);
          return null;
        }
      },
      // restart services
      new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          AssignmentManager am = env.getAssignmentManager();
          try {
            am.joinCluster();
            am.wakeMetaLoadedEvent();
            master.setInitialized(true);
          } catch (Exception e) {
            LOG.warn("Failed to load meta", e);
          }
          return null;
        }
      });
  }

  // ==========================================================================
  // Master failover utils
  // ==========================================================================
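  /**
   * Kill the active master and wait until a backup master has taken over and finished
   * initialization.
   */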
  public static void masterFailover(final HBaseTestingUtil testUtil) throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    // Kill the master
    HMaster oldMaster = cluster.getMaster();
    cluster.killMaster(cluster.getMaster().getServerName());

    // Wait for the backup master to take over
    waitBackupMaster(testUtil, oldMaster);
  }

  public static void waitBackupMaster(final HBaseTestingUtil testUtil, final HMaster oldMaster)
    throws Exception {
    SingleProcessHBaseCluster cluster = testUtil.getMiniHBaseCluster();

    HMaster newMaster = cluster.getMaster();
    while (newMaster == null || newMaster == oldMaster) {
      Thread.sleep(250);
      newMaster = cluster.getMaster();
    }

    while (!(newMaster.isActiveMaster() && newMaster.isInitialized())) {
      Thread.sleep(250);
    }
  }

  // ==========================================================================
  // Table Helpers
  // ==========================================================================
  public static TableDescriptor createHTD(final TableName tableName, final String... family) {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (int i = 0; i < family.length; ++i) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(family[i]));
    }
    return builder.build();
  }

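  /**
   * Create a table through a {@link CreateTableProcedure}, wait for it to complete and assert
   * that it did not fail; returns the regions that were created.
   */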
  public static RegionInfo[] createTable(final ProcedureExecutor<MasterProcedureEnv> procExec,
    final TableName tableName, final byte[][] splitKeys, String... family) throws IOException {
    TableDescriptor htd = createHTD(tableName, family);
    RegionInfo[] regions = ModifyRegionUtils.createRegionInfos(htd, splitKeys);
    long procId = ProcedureTestingUtility.submitAndWait(procExec,
      new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
    ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
    return regions;
  }

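  /**
   * Assert that the table was created as expected: region and family directories exist on the
   * filesystem, the regions are present in meta, and the table descriptor contains exactly the
   * given families plus the configured store file tracker implementation.
   */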
  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, String... family) throws IOException {
    validateTableCreation(master, tableName, regions, true, family);
  }

  public static void validateTableCreation(final HMaster master, final TableName tableName,
    final RegionInfo[] regions, boolean hasFamilyDirs, String... family) throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertTrue(fs.exists(tableDir));
    CommonFSUtils.logFileSystemState(fs, tableDir, LOG);
    List<Path> unwantedRegionDirs = FSUtils.getRegionDirs(fs, tableDir);
    for (int i = 0; i < regions.length; ++i) {
      Path regionDir = new Path(tableDir, regions[i].getEncodedName());
      assertTrue(regions[i] + " region dir does not exist", fs.exists(regionDir));
      assertTrue(unwantedRegionDirs.remove(regionDir));
      List<Path> allFamilyDirs = FSUtils.getFamilyDirs(fs, regionDir);
      for (int j = 0; j < family.length; ++j) {
        final Path familyDir = new Path(regionDir, family[j]);
        if (hasFamilyDirs) {
          assertTrue(family[j] + " family dir does not exist", fs.exists(familyDir));
          assertTrue(allFamilyDirs.remove(familyDir));
        } else {
          // TODO: WARN: Modify Table/Families does not create a family dir
          if (!fs.exists(familyDir)) {
            LOG.warn(family[j] + " family dir does not exist");
          }
          allFamilyDirs.remove(familyDir);
        }
      }
      assertTrue("found extraneous families: " + allFamilyDirs, allFamilyDirs.isEmpty());
    }
    assertTrue("found extraneous regions: " + unwantedRegionDirs, unwantedRegionDirs.isEmpty());
    LOG.debug("Table directory layout is as expected.");

    // check meta
    assertTrue(tableExists(master.getConnection(), tableName));
    assertEquals(regions.length, countMetaRegions(master, tableName));

    // check htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue("table descriptor not found", htd != null);
    for (int i = 0; i < family.length; ++i) {
      assertTrue("family not found " + family[i],
        htd.getColumnFamily(Bytes.toBytes(family[i])) != null);
    }
    assertEquals(family.length, htd.getColumnFamilyCount());

    // check that the store file tracker impl has been properly set in the htd
    String storeFileTrackerImpl =
      StoreFileTrackerFactory.getStoreFileTrackerName(master.getConfiguration());
    assertEquals(storeFileTrackerImpl, htd.getValue(TRACKER_IMPL));
  }

  public static void validateTableDeletion(final HMaster master, final TableName tableName)
    throws IOException {
    // check filesystem
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    assertFalse(fs.exists(tableDir));

    // check meta
    assertFalse(tableExists(master.getConnection(), tableName));
    assertEquals(0, countMetaRegions(master, tableName));

    // check htd
    assertTrue("found htd of deleted table", master.getTableDescriptors().get(tableName) == null);
  }

  private static int countMetaRegions(final HMaster master, final TableName tableName)
    throws IOException {
    final AtomicInteger actualRegCount = new AtomicInteger(0);
    final ClientMetaTableAccessor.Visitor visitor = new ClientMetaTableAccessor.Visitor() {
      @Override
      public boolean visit(Result rowResult) throws IOException {
        RegionLocations list = CatalogFamilyFormat.getRegionLocations(rowResult);
        if (list == null) {
          LOG.warn("No serialized RegionInfo in " + rowResult);
          return true;
        }
        HRegionLocation l = list.getRegionLocation();
        if (l == null) {
          return true;
        }
        if (!l.getRegion().getTable().equals(tableName)) {
          return false;
        }
        if (l.getRegion().isOffline() || l.getRegion().isSplit()) {
          return true;
        }

        HRegionLocation[] locations = list.getRegionLocations();
        for (HRegionLocation location : locations) {
          if (location == null) continue;
          ServerName serverName = location.getServerName();
          // Make sure that the regions are assigned to a server
          if (serverName != null && serverName.getAddress() != null) {
            actualRegCount.incrementAndGet();
          }
        }
        return true;
      }
    };
    MetaTableAccessor.scanMetaForTableRegions(master.getConnection(), visitor, tableName);
    return actualRegCount.get();
  }

  public static void validateTableIsEnabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertEquals(TableState.State.ENABLED, tsm.getTableState(tableName).getState());
  }

  public static void validateTableIsDisabled(final HMaster master, final TableName tableName)
    throws IOException {
    TableStateManager tsm = master.getTableStateManager();
    assertEquals(TableState.State.DISABLED, tsm.getTableState(tableName).getState());
  }

  public static void validateColumnFamilyAddition(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    assertTrue(htd.hasColumnFamily(Bytes.toBytes(family)));
  }

  public static void validateColumnFamilyDeletion(final HMaster master, final TableName tableName,
    final String family) throws IOException {
    // verify htd
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);
    assertFalse(htd.hasColumnFamily(Bytes.toBytes(family)));

    // verify fs
    final FileSystem fs = master.getMasterFileSystem().getFileSystem();
    final Path tableDir =
      CommonFSUtils.getTableDir(master.getMasterFileSystem().getRootDir(), tableName);
    for (Path regionDir : FSUtils.getRegionDirs(fs, tableDir)) {
      final Path familyDir = new Path(regionDir, family);
      assertFalse(family + " family dir should not exist", fs.exists(familyDir));
    }
  }

  public static void validateColumnFamilyModification(final HMaster master,
    final TableName tableName, final String family, ColumnFamilyDescriptor columnDescriptor)
    throws IOException {
    TableDescriptor htd = master.getTableDescriptors().get(tableName);
    assertTrue(htd != null);

    ColumnFamilyDescriptor hcfd = htd.getColumnFamily(Bytes.toBytes(family));
    assertEquals(0, ColumnFamilyDescriptor.COMPARATOR.compare(hcfd, columnDescriptor));
  }

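  /**
   * Load {@code rows} rows into the table, writing at least one row per split key so that every
   * region receives data; rows are written with {@link Durability#SKIP_WAL} through a
   * {@link BufferedMutator} and flushed at the end.
   */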
  public static void loadData(final Connection connection, final TableName tableName, int rows,
    final byte[][] splitKeys, final String... sfamilies) throws IOException {
    byte[][] families = new byte[sfamilies.length][];
    for (int i = 0; i < families.length; ++i) {
      families[i] = Bytes.toBytes(sfamilies[i]);
    }

    // Use try-with-resources so the mutator is always closed
    try (BufferedMutator mutator = connection.getBufferedMutator(tableName)) {
      // Ensure at least one row per region
      assertTrue(rows >= splitKeys.length);
      for (byte[] k : splitKeys) {
        byte[] value = Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), k);
        byte[] key = Bytes.add(k, Bytes.toBytes(MD5Hash.getMD5AsHex(value)));
        mutator.mutate(createPut(families, key, value));
        rows--;
      }

      // Add the remaining rows; more rows mean more store files
      while (rows-- > 0) {
        byte[] value =
          Bytes.add(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), Bytes.toBytes(rows));
        byte[] key = Bytes.toBytes(MD5Hash.getMD5AsHex(value));
        mutator.mutate(createPut(families, key, value));
      }
      mutator.flush();
    }
  }

  private static Put createPut(final byte[][] families, final byte[] key, final byte[] value) {
    byte[] q = Bytes.toBytes("q");
    Put put = new Put(key);
    put.setDurability(Durability.SKIP_WAL);
    for (byte[] family : families) {
      put.addColumn(family, q, value);
    }
    return put;
  }

  // ==========================================================================
  // Procedure Helpers
  // ==========================================================================
  public static long generateNonceGroup(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().getNonceGroup();
  }

  public static long generateNonce(final HMaster master) {
    return master.getAsyncClusterConnection().getNonceGenerator().newNonce();
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step; i.e. force a reread of the procedure store.
   * <p>
   * It does:
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when a procedure executes all flow steps from start to finish.
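   * <p>
   * A minimal usage sketch (assuming the caller has already enabled the kill-before-store-update
   * flag and created {@code htd}/{@code regions}; {@code lastStep} and the other names are
   * illustrative only):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(
   *   new CreateTableProcedure(procExec.getEnvironment(), htd, regions));
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, lastStep, true);
   * </pre>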
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    final boolean expectExecRunning) throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());

    // Restart the executor and execute the step twice
    // execute step N - kill before store update
    // restart executor/store
    // execute step N - save on store
    // NOTE: we currently assume that states/steps are sequential. There are already instances of
    // procedures which skip (don't use) intermediate states/steps. In the future, intermediate
    // states/steps could be added with an ordinal greater than lastStep. If and when that happens
    // the states can no longer be treated as sequential steps and the condition in the following
    // loop needs to be changed. We could use an equals/not-equals check to see whether the
    // procedure has reached the user-specified state, but the loop may not regain control exactly
    // when the procedure is in lastStep. A proper fix would be to collect all states visited by
    // the procedure and then check whether the user-specified state is in that list. The
    // assumption of sequential progression of steps/states is made in multiple places, so we keep
    // the loop condition below for simplicity.
    Procedure<?> proc = procExec.getProcedure(procId);
    int stepNum = proc instanceof StateMachineProcedure
      ? ((StateMachineProcedure) proc).getCurrentStateId()
      : 0;
    for (;;) {
      if (stepNum == lastStep) {
        break;
      }
      LOG.info("Restart " + stepNum + " exec state=" + proc);
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
      // Old proc object is stale, need to get the new one after the ProcedureExecutor restart
      proc = procExec.getProcedure(procId);
      stepNum = proc instanceof StateMachineProcedure
        ? ((StateMachineProcedure) proc).getCurrentStateId()
        : stepNum + 1;
    }

    assertEquals(expectExecRunning, procExec.isRunning());
  }

  /**
   * Run through all procedure flow states TWICE while also restarting the procedure executor at
   * each step; i.e. force a reread of the procedure store.
   * <p>
   * It does:
   * <ol>
   * <li>Execute step N - kill the executor before store update
   * <li>Restart executor/store
   * <li>Execute the hook for each step twice
   * <li>Execute step N - and then save to store
   * </ol>
   * <p>
   * This is a good test for finding state that needs persisting and steps that are not idempotent.
   * Use this version of the test when the flow steps are not executed strictly from start to
   * finish, i.e. when the procedure may vary its flow steps depending on the circumstances it
   * finds.
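   * <p>
   * A minimal usage sketch with a {@link StepHook} (the submitted procedure and the kill flag
   * setup are assumed to be provided by the caller; names are illustrative only):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(proc);
   * MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, step -> {
   *   // assert per-step invariants here; returning false fails the test
   *   return true;
   * });
   * </pre>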
   * @see #testRecoveryAndDoubleExecution(ProcedureExecutor, long, int, boolean)
   */
  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final StepHook hook)
    throws Exception {
    ProcedureTestingUtility.waitProcedure(procExec, procId);
    assertFalse(procExec.isRunning());
    for (int i = 0; !procExec.isFinished(procId); ++i) {
      LOG.info("Restart " + i + " exec state=" + procExec.getProcedure(procId));
      if (hook != null) {
        assertTrue(hook.execute(i));
      }
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    }
    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertProcNotFailed(procExec, procId);
  }

  public static void testRecoveryAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId) throws Exception {
    testRecoveryAndDoubleExecution(procExec, procId, null);
  }

  /**
   * Hook which will be executed on each step
   */
  public interface StepHook {
    /**
     * @param step the step number at which this hook is executed
     * @return false if the test should fail, otherwise true
     */
    boolean execute(int step) throws IOException;
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort(). If the procedure implements abort() this should result in rollback being triggered.
   * Each rollback step is executed twice, by restarting the executor after every step. At the end
   * of this call the procedure should be finished and rolled back. This method asserts that the
   * procedure terminated with an AbortException.
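   * <p>
   * A minimal usage sketch (the procedure submission and kill flag setup are assumed to be done
   * by the caller; {@code lastStep} is the state ordinal to roll back from and is illustrative
   * only):
   *
   * <pre>
   * ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
   * long procId = procExec.submitProcedure(proc);
   * MasterProcedureTestingUtility.testRollbackAndDoubleExecution(procExec, procId, lastStep);
   * </pre>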
   */
  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    testRollbackAndDoubleExecution(procExec, procId, lastStep, false);
  }

  public static void testRollbackAndDoubleExecution(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep,
    boolean waitForAsyncProcs) throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // Restart the executor and rollback the step twice
    // rollback step N - kill before store update
    // restart executor/store
    // rollback step N - save on store
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      for (int i = 0; !procExec.isFinished(procId); ++i) {
        LOG.info("Restart " + i + " rollback state: " + procExec.getProcedure(procId));
        ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
        restartMasterProcedureExecutor(procExec);
        ProcedureTestingUtility.waitProcedure(procExec, procId);
      }
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }

    if (waitForAsyncProcs) {
      // Sometimes there are other procedures still executing (including ones asynchronously
      // spawned by procId) and, due to the KillAndToggleBeforeStoreUpdate flag, the
      // ProcedureExecutor is stopped before a store update. Let all pending procedures finish
      // normally.
      ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
      // check 3 times to confirm that the procedure executor has not been killed
      for (int i = 0; i < 3; i++) {
        if (!procExec.isRunning()) {
          LOG.warn("ProcedureExecutor not running, may have been stopped by pending procedure due"
            + " to KillAndToggleBeforeStoreUpdate flag.");
          restartMasterProcedureExecutor(procExec);
          break;
        }
        Thread.sleep(1000);
      }
      ProcedureTestingUtility.waitNoProcedureRunning(procExec);
    }

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Execute the procedure up to "lastStep", then restart the ProcedureExecutor and inject an
   * abort(). If the procedure implements abort() this should result in rollback being triggered.
   * At the end of this call the procedure should be finished and rolled back. This method asserts
   * that the procedure terminated with an AbortException.
   */
  public static void testRollbackRetriableFailure(
    final ProcedureExecutor<MasterProcedureEnv> procExec, final long procId, final int lastStep)
    throws Exception {
    // Execute up to last step
    testRecoveryAndDoubleExecution(procExec, procId, lastStep, false);

    // execute the rollback
    testRestartWithAbort(procExec, procId);

    assertTrue(procExec.isRunning());
    ProcedureTestingUtility.assertIsAbortException(procExec.getResult(procId));
  }

  /**
   * Restart the ProcedureExecutor and inject an abort into the specified procedure. If the
   * procedure implements abort() this should result in rollback being triggered; at the end of
   * this call the procedure should be finished and rolled back.
   */
  public static void testRestartWithAbort(ProcedureExecutor<MasterProcedureEnv> procExec,
    long procId) throws Exception {
    ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, false);
    InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec);
    abortListener.addProcId(procId);
    procExec.registerListener(abortListener);
    try {
      ProcedureTestingUtility.assertProcNotYetCompleted(procExec, procId);
      LOG.info("Restart and rollback procId=" + procId);
      restartMasterProcedureExecutor(procExec);
      ProcedureTestingUtility.waitProcedure(procExec, procId);
    } finally {
      assertTrue(procExec.unregisterListener(abortListener));
    }
  }

  public static boolean tableExists(Connection conn, TableName tableName) throws IOException {
    try (Admin admin = conn.getAdmin()) {
      return admin.tableExists(tableName);
    }
  }

  public static class InjectAbortOnLoadListener
    implements ProcedureExecutor.ProcedureExecutorListener {
    private final ProcedureExecutor<MasterProcedureEnv> procExec;
    private TreeSet<Long> procsToAbort = null;

    public InjectAbortOnLoadListener(final ProcedureExecutor<MasterProcedureEnv> procExec) {
      this.procExec = procExec;
    }

    public void addProcId(long procId) {
      if (procsToAbort == null) {
        procsToAbort = new TreeSet<>();
      }
      procsToAbort.add(procId);
    }

    @Override
    public void procedureLoaded(long procId) {
      if (procsToAbort != null && !procsToAbort.contains(procId)) {
        return;
      }
      procExec.abort(procId);
    }

    @Override
    public void procedureAdded(long procId) {
      /* no-op */
    }

    @Override
    public void procedureFinished(long procId) {
      /* no-op */
    }
  }
}