001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Objects;
030import java.util.concurrent.ThreadLocalRandom;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.stream.Collectors;
033import org.apache.hadoop.conf.Configuration;
034import org.apache.hadoop.fs.FileSystem;
035import org.apache.hadoop.fs.Path;
036import org.apache.hadoop.hbase.CatalogFamilyFormat;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtil;
039import org.apache.hadoop.hbase.MetaTableAccessor;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
042import org.apache.hadoop.hbase.StartTestingClusterOption;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionReplicaUtil;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.exceptions.MergeRegionException;
057import org.apache.hadoop.hbase.master.HMaster;
058import org.apache.hadoop.hbase.master.MasterRpcServices;
059import org.apache.hadoop.hbase.master.RegionState;
060import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
061import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
062import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTracker;
063import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
064import org.apache.hadoop.hbase.testclassification.LargeTests;
065import org.apache.hadoop.hbase.testclassification.RegionServerTests;
066import org.apache.hadoop.hbase.util.Bytes;
067import org.apache.hadoop.hbase.util.CommonFSUtils;
068import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
069import org.apache.hadoop.hbase.util.FutureUtils;
070import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
071import org.apache.hadoop.hbase.util.Pair;
072import org.apache.hadoop.hbase.util.PairOfSameType;
073import org.apache.hadoop.hbase.util.Threads;
074import org.apache.hadoop.util.StringUtils;
075import org.apache.zookeeper.KeeperException;
076import org.junit.AfterClass;
077import org.junit.BeforeClass;
078import org.junit.ClassRule;
079import org.junit.Rule;
080import org.junit.Test;
081import org.junit.experimental.categories.Category;
082import org.junit.rules.TestName;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
087import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
088import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
089
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
091import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
092import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
093
094@Category({ RegionServerTests.class, LargeTests.class })
095public class TestRegionMergeTransactionOnCluster {
096
097  @ClassRule
098  public static final HBaseClassTestRule CLASS_RULE =
099    HBaseClassTestRule.forClass(TestRegionMergeTransactionOnCluster.class);
100
101  private static final Logger LOG =
102    LoggerFactory.getLogger(TestRegionMergeTransactionOnCluster.class);
103
104  @Rule
105  public TestName name = new TestName();
106
107  private static final int NB_SERVERS = 3;
108
109  private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
110  private static final byte[] QUALIFIER = Bytes.toBytes("q");
111
112  private static byte[] ROW = Bytes.toBytes("testRow");
113  private static final int INITIAL_REGION_NUM = 10;
114  private static final int ROWSIZE = 200;
115  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
116
117  private static int waitTime = 60 * 1000;
118
119  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
120
121  private static HMaster MASTER;
122  private static Admin ADMIN;
123
124  @BeforeClass
125  public static void beforeAllTests() throws Exception {
126    // Start a cluster
127    StartTestingClusterOption option = StartTestingClusterOption.builder()
128      .masterClass(MyMaster.class).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
129    TEST_UTIL.startMiniCluster(option);
130    SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
131    MASTER = cluster.getMaster();
132    MASTER.balanceSwitch(false);
133    ADMIN = TEST_UTIL.getConnection().getAdmin();
134  }
135
136  @AfterClass
137  public static void afterAllTests() throws Exception {
138    TEST_UTIL.shutdownMiniCluster();
139    if (ADMIN != null) {
140      ADMIN.close();
141    }
142  }
143
144  @Test
145  public void testWholesomeMerge() throws Exception {
146    LOG.info("Starting " + name.getMethodName());
147    final TableName tableName = TableName.valueOf(name.getMethodName());
148
149    try {
150      // Create table and load data.
151      Table table = createTableAndLoadData(MASTER, tableName);
152      // Merge 1st and 2nd region
153      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
154
155      // Merge 2nd and 3th region
156      PairOfSameType<RegionInfo> mergedRegions =
157        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 1, 2, INITIAL_REGION_NUM - 2);
158
159      verifyRowCount(table, ROWSIZE);
160
161      // Randomly choose one of the two merged regions
162      RegionInfo hri = ThreadLocalRandom.current().nextBoolean()
163        ? mergedRegions.getFirst()
164        : mergedRegions.getSecond();
165      SingleProcessHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
166      AssignmentManager am = cluster.getMaster().getAssignmentManager();
167
168      // We should not be able to assign it again, but we are able to do it here. Assertions are
169      // poor here and missing that assign is possible here. Created HBASE-29692 for resolving this.
170      am.assign(hri);
171      assertFalse("Merged region can't be assigned",
172        am.getRegionStates().getRegionStateNode(hri).isTransitionScheduled());
173
174      // We should not be able to unassign it either
175      am.unassign(hri);
176      assertFalse("Merged region can't be unassigned",
177        am.getRegionStates().getRegionStateNode(hri).isTransitionScheduled());
178
179      table.close();
180    } finally {
181      TEST_UTIL.deleteTable(tableName);
182    }
183  }
184
185  /**
186   * Not really restarting the master. Simulate it by clear of new region state since it is not
187   * persisted, will be lost after master restarts.
188   */
189  @Test
190  public void testMergeAndRestartingMaster() throws Exception {
191    final TableName tableName = TableName.valueOf(name.getMethodName());
192
193    try {
194      // Create table and load data.
195      Table table = createTableAndLoadData(MASTER, tableName);
196
197      try {
198        MyMasterRpcServices.enabled.set(true);
199
200        // Merge 1st and 2nd region
201        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
202      } finally {
203        MyMasterRpcServices.enabled.set(false);
204      }
205
206      table.close();
207    } finally {
208      TEST_UTIL.deleteTable(tableName);
209    }
210  }
211
212  @Test
213  public void testCleanMergeReference() throws Exception {
214    LOG.info("Starting " + name.getMethodName());
215    ADMIN.catalogJanitorSwitch(false);
216    final TableName tableName = TableName.valueOf(name.getMethodName());
217    try {
218      // Create table and load data.
219      Table table = createTableAndLoadData(MASTER, tableName);
220      // Merge 1st and 2nd region
221      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
222      verifyRowCount(table, ROWSIZE);
223      table.close();
224
225      List<Pair<RegionInfo, ServerName>> tableRegions =
226        MetaTableAccessor.getTableRegionsAndLocations(MASTER.getConnection(), tableName);
227      RegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
228      TableDescriptor tableDescriptor = MASTER.getTableDescriptors().get(tableName);
229      Result mergedRegionResult =
230        MetaTableAccessor.getRegionResult(MASTER.getConnection(), mergedRegionInfo);
231
232      // contains merge reference in META
233      assertTrue(CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells()));
234
235      // merging regions' directory are in the file system all the same
236      List<RegionInfo> p = CatalogFamilyFormat.getMergeRegions(mergedRegionResult.rawCells());
237      RegionInfo regionA = p.get(0);
238      RegionInfo regionB = p.get(1);
239      FileSystem fs = MASTER.getMasterFileSystem().getFileSystem();
240      Path rootDir = MASTER.getMasterFileSystem().getRootDir();
241
242      Path tabledir = CommonFSUtils.getTableDir(rootDir, mergedRegionInfo.getTable());
243      Path regionAdir = new Path(tabledir, regionA.getEncodedName());
244      Path regionBdir = new Path(tabledir, regionB.getEncodedName());
245      assertTrue(fs.exists(regionAdir));
246      assertTrue(fs.exists(regionBdir));
247
248      ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
249      HRegionFileSystem hrfs =
250        new HRegionFileSystem(TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
251      int count = 0;
252      for (ColumnFamilyDescriptor colFamily : columnFamilies) {
253        StoreFileTracker sft = StoreFileTrackerFactory.create(TEST_UTIL.getConfiguration(),
254          tableDescriptor, colFamily, hrfs, false);
255        count += sft.load().size();
256      }
257      ADMIN.compactRegion(mergedRegionInfo.getRegionName());
258      // clean up the merged region store files
259      // wait until merged region have reference file
260      long timeout = EnvironmentEdgeManager.currentTime() + waitTime;
261      int newcount = 0;
262      while (EnvironmentEdgeManager.currentTime() < timeout) {
263        for (ColumnFamilyDescriptor colFamily : columnFamilies) {
264          StoreFileTracker sft = StoreFileTrackerFactory.create(TEST_UTIL.getConfiguration(),
265            tableDescriptor, colFamily, hrfs, false);
266          newcount += sft.load().size();
267        }
268        if (newcount > count) {
269          break;
270        }
271        Thread.sleep(50);
272      }
273      assertTrue(newcount > count);
274      List<RegionServerThread> regionServerThreads =
275        TEST_UTIL.getHBaseCluster().getRegionServerThreads();
276      for (RegionServerThread rs : regionServerThreads) {
277        CompactedHFilesDischarger cleaner =
278          new CompactedHFilesDischarger(100, null, rs.getRegionServer(), false);
279        cleaner.chore();
280        Thread.sleep(1000);
281      }
282      while (EnvironmentEdgeManager.currentTime() < timeout) {
283        int newcount1 = 0;
284        for (ColumnFamilyDescriptor colFamily : columnFamilies) {
285          StoreFileTracker sft = StoreFileTrackerFactory.create(TEST_UTIL.getConfiguration(),
286            tableDescriptor, colFamily, hrfs, false);
287          newcount1 += sft.load().size();
288        }
289        if (newcount1 <= 1) {
290          break;
291        }
292        Thread.sleep(50);
293      }
294      // run CatalogJanitor to clean merge references in hbase:meta and archive the
295      // files of merging regions
296      int cleaned = 0;
297      while (cleaned == 0) {
298        cleaned = ADMIN.runCatalogJanitor();
299        LOG.debug("catalog janitor returned " + cleaned);
300        Thread.sleep(50);
301        // Cleanup is async so wait till all procedures are done running.
302        ProcedureTestingUtility.waitNoProcedureRunning(
303          TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor());
304      }
305      // We used to check for existence of region in fs but sometimes the region dir was
306      // cleaned up by the time we got here making the test sometimes flakey.
307      assertTrue(cleaned > 0);
308
309      // Wait around a bit to give stuff a chance to complete.
310      while (true) {
311        mergedRegionResult =
312          MetaTableAccessor.getRegionResult(TEST_UTIL.getConnection(), mergedRegionInfo);
313        if (CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells())) {
314          LOG.info("Waiting on cleanup of merge columns {}",
315            Arrays.asList(mergedRegionResult.rawCells()).stream().map(c -> c.toString())
316              .collect(Collectors.joining(",")));
317          Threads.sleep(50);
318        } else {
319          break;
320        }
321      }
322      assertFalse(CatalogFamilyFormat.hasMergeRegions(mergedRegionResult.rawCells()));
323    } finally {
324      ADMIN.catalogJanitorSwitch(true);
325      TEST_UTIL.deleteTable(tableName);
326    }
327  }
328
329  /**
330   * This test tests 1, merging region not online; 2, merging same two regions; 3, merging unknown
331   * regions. They are in one test case so that we don't have to create many tables, and these tests
332   * are simple.
333   */
334  @Test
335  public void testMerge() throws Exception {
336    LOG.info("Starting " + name.getMethodName());
337    final TableName tableName = TableName.valueOf(name.getMethodName());
338    final Admin admin = TEST_UTIL.getAdmin();
339
340    try {
341      // Create table and load data.
342      Table table = createTableAndLoadData(MASTER, tableName);
343      AssignmentManager am = MASTER.getAssignmentManager();
344      List<RegionInfo> regions = am.getRegionStates().getRegionsOfTable(tableName);
345      // Fake offline one region
346      RegionInfo a = regions.get(0);
347      RegionInfo b = regions.get(1);
348      am.unassign(b);
349      am.offlineRegion(b);
350      try {
351        // Merge offline region. Region a is offline here
352        FutureUtils.get(
353          admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false));
354        fail("Offline regions should not be able to merge");
355      } catch (DoNotRetryRegionException ie) {
356        System.out.println(ie);
357        assertTrue(ie instanceof MergeRegionException);
358      }
359
360      try {
361        // Merge the same region: b and b.
362        FutureUtils
363          .get(admin.mergeRegionsAsync(b.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), true));
364        fail("A region should not be able to merge with itself, even forcfully");
365      } catch (IOException ie) {
366        assertTrue("Exception should mention regions not online",
367          StringUtils.stringifyException(ie).contains("region to itself")
368            && ie instanceof MergeRegionException);
369      }
370
371      try {
372        // Merge unknown regions
373        FutureUtils.get(admin.mergeRegionsAsync(Bytes.toBytes("-f1"), Bytes.toBytes("-f2"), true));
374        fail("Unknown region could not be merged");
375      } catch (IOException ie) {
376        assertTrue("UnknownRegionException should be thrown", ie instanceof UnknownRegionException);
377      }
378      table.close();
379    } finally {
380      TEST_UTIL.deleteTable(tableName);
381    }
382  }
383
384  @Test
385  public void testMergeWithReplicas() throws Exception {
386    final TableName tableName = TableName.valueOf(name.getMethodName());
387    try {
388      // Create table and load data.
389      Table table = createTableAndLoadData(MASTER, tableName, 5, 2);
390      List<Pair<RegionInfo, ServerName>> initialRegionToServers =
391        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
392      // Merge 1st and 2nd region
393      PairOfSameType<RegionInfo> mergedRegions =
394        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 2, 5 * 2 - 2);
395      List<Pair<RegionInfo, ServerName>> currentRegionToServers =
396        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
397      List<RegionInfo> initialRegions = new ArrayList<>();
398      for (Pair<RegionInfo, ServerName> p : initialRegionToServers) {
399        initialRegions.add(p.getFirst());
400      }
401      List<RegionInfo> currentRegions = new ArrayList<>();
402      for (Pair<RegionInfo, ServerName> p : currentRegionToServers) {
403        currentRegions.add(p.getFirst());
404      }
405      // this is the first region
406      assertTrue(initialRegions.contains(mergedRegions.getFirst()));
407      // this is the replica of the first region
408      assertTrue(initialRegions
409        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1)));
410      // this is the second region
411      assertTrue(initialRegions.contains(mergedRegions.getSecond()));
412      // this is the replica of the second region
413      assertTrue(initialRegions
414        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1)));
415      // this is the new region
416      assertTrue(!initialRegions.contains(currentRegions.get(0)));
417      // replica of the new region
418      assertTrue(!initialRegions
419        .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1)));
420      // replica of the new region
421      assertTrue(currentRegions
422        .contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1)));
423      // replica of the merged region
424      assertTrue(!currentRegions
425        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1)));
426      // replica of the merged region
427      assertTrue(!currentRegions
428        .contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1)));
429      table.close();
430    } finally {
431      TEST_UTIL.deleteTable(tableName);
432    }
433  }
434
435  private PairOfSameType<RegionInfo> mergeRegionsAndVerifyRegionNum(HMaster master,
436    TableName tablename, int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
437    PairOfSameType<RegionInfo> mergedRegions =
438      requestMergeRegion(master, tablename, regionAnum, regionBnum);
439    waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
440    return mergedRegions;
441  }
442
443  private PairOfSameType<RegionInfo> requestMergeRegion(HMaster master, TableName tablename,
444    int regionAnum, int regionBnum) throws Exception {
445    List<Pair<RegionInfo, ServerName>> tableRegions =
446      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
447    RegionInfo regionA = tableRegions.get(regionAnum).getFirst();
448    RegionInfo regionB = tableRegions.get(regionBnum).getFirst();
449    ADMIN.mergeRegionsAsync(regionA.getEncodedNameAsBytes(), regionB.getEncodedNameAsBytes(),
450      false);
451    return new PairOfSameType<>(regionA, regionB);
452  }
453
454  private void waitAndVerifyRegionNum(HMaster master, TableName tablename, int expectedRegionNum)
455    throws Exception {
456    List<Pair<RegionInfo, ServerName>> tableRegionsInMeta;
457    List<RegionInfo> tableRegionsInMaster;
458    long timeout = EnvironmentEdgeManager.currentTime() + waitTime;
459    while (EnvironmentEdgeManager.currentTime() < timeout) {
460      tableRegionsInMeta =
461        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
462      tableRegionsInMaster =
463        master.getAssignmentManager().getRegionStates().getRegionsOfTable(tablename);
464      LOG.info(Objects.toString(tableRegionsInMaster));
465      LOG.info(Objects.toString(tableRegionsInMeta));
466      int tableRegionsInMetaSize = tableRegionsInMeta.size();
467      int tableRegionsInMasterSize = tableRegionsInMaster.size();
468      if (
469        tableRegionsInMetaSize == expectedRegionNum && tableRegionsInMasterSize == expectedRegionNum
470      ) {
471        break;
472      }
473      Thread.sleep(250);
474    }
475
476    tableRegionsInMeta =
477      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
478    LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegionsInMeta));
479    assertEquals(expectedRegionNum, tableRegionsInMeta.size());
480  }
481
482  private Table createTableAndLoadData(HMaster master, TableName tablename) throws Exception {
483    return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1);
484  }
485
486  private Table createTableAndLoadData(HMaster master, TableName tablename, int numRegions,
487    int replication) throws Exception {
488    assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
489    byte[][] splitRows = new byte[numRegions - 1][];
490    for (int i = 0; i < splitRows.length; i++) {
491      splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions];
492    }
493
494    Table table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
495    LOG.info("Created " + table.getName());
496    if (replication > 1) {
497      HBaseTestingUtil.setReplicas(ADMIN, tablename, replication);
498      LOG.info("Set replication of " + replication + " on " + table.getName());
499    }
500    loadData(table);
501    LOG.info("Loaded " + table.getName());
502    verifyRowCount(table, ROWSIZE);
503    LOG.info("Verified " + table.getName());
504
505    List<Pair<RegionInfo, ServerName>> tableRegions;
506    TEST_UTIL.waitUntilAllRegionsAssigned(tablename);
507    LOG.info("All regions assigned for table - " + table.getName());
508    tableRegions =
509      MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
510    assertEquals("Wrong number of regions in table " + tablename, numRegions * replication,
511      tableRegions.size());
512    LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions));
513    assertEquals(numRegions * replication, tableRegions.size());
514    return table;
515  }
516
517  private static byte[][] makeN(byte[] base, int n) {
518    byte[][] ret = new byte[n][];
519    for (int i = 0; i < n; i++) {
520      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
521    }
522    return ret;
523  }
524
525  private void loadData(Table table) throws IOException {
526    for (int i = 0; i < ROWSIZE; i++) {
527      Put put = new Put(ROWS[i]);
528      put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(i));
529      table.put(put);
530    }
531  }
532
533  private void verifyRowCount(Table table, int expectedRegionNum) throws IOException {
534    ResultScanner scanner = table.getScanner(new Scan());
535    int rowCount = 0;
536    while (scanner.next() != null) {
537      rowCount++;
538    }
539    assertEquals(expectedRegionNum, rowCount);
540    scanner.close();
541  }
542
543  // Make it public so that JVMClusterUtil can access it.
544  public static class MyMaster extends HMaster {
545    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
546      super(conf);
547    }
548
549    @Override
550    protected MasterRpcServices createRpcServices() throws IOException {
551      return new MyMasterRpcServices(this);
552    }
553  }
554
555  static class MyMasterRpcServices extends MasterRpcServices {
556    static AtomicBoolean enabled = new AtomicBoolean(false);
557
558    private HMaster myMaster;
559
560    public MyMasterRpcServices(HMaster master) throws IOException {
561      super(master);
562      myMaster = master;
563    }
564
565    @Override
566    public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
567      ReportRegionStateTransitionRequest req) throws ServiceException {
568      ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req);
569      if (
570        enabled.get() && req.getTransition(0).getTransitionCode() == TransitionCode.READY_TO_MERGE
571          && !resp.hasErrorMessage()
572      ) {
573        AssignmentManager am = myMaster.getAssignmentManager();
574        for (RegionState regionState : am.getRegionsStateInTransition()) {
575          // Find the merging_new region and remove it
576          if (regionState.isMergingNew()) {
577            am.getRegionStates().deleteRegion(regionState.getRegion());
578          }
579        }
580      }
581      return resp;
582    }
583  }
584}