001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertFalse;
022import static org.junit.Assert.assertTrue;
023import static org.junit.Assert.fail;
024
025import java.io.IOException;
026import java.util.ArrayList;
027import java.util.Arrays;
028import java.util.List;
029import java.util.Objects;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.stream.Collectors;
033import org.apache.commons.lang3.RandomUtils;
034import org.apache.hadoop.conf.Configuration;
035import org.apache.hadoop.fs.FileSystem;
036import org.apache.hadoop.fs.Path;
037import org.apache.hadoop.hbase.HBaseClassTestRule;
038import org.apache.hadoop.hbase.HBaseTestingUtility;
039import org.apache.hadoop.hbase.MetaTableAccessor;
040import org.apache.hadoop.hbase.MiniHBaseCluster;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.StartMiniClusterOption;
043import org.apache.hadoop.hbase.TableName;
044import org.apache.hadoop.hbase.UnknownRegionException;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
047import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionReplicaUtil;
051import org.apache.hadoop.hbase.client.Result;
052import org.apache.hadoop.hbase.client.ResultScanner;
053import org.apache.hadoop.hbase.client.Scan;
054import org.apache.hadoop.hbase.client.Table;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.exceptions.MergeRegionException;
057import org.apache.hadoop.hbase.master.HMaster;
058import org.apache.hadoop.hbase.master.MasterRpcServices;
059import org.apache.hadoop.hbase.master.RegionState;
060import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
061import org.apache.hadoop.hbase.master.assignment.RegionStates;
062import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
063import org.apache.hadoop.hbase.testclassification.LargeTests;
064import org.apache.hadoop.hbase.testclassification.RegionServerTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.CommonFSUtils;
067import org.apache.hadoop.hbase.util.FutureUtils;
068import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
069import org.apache.hadoop.hbase.util.Pair;
070import org.apache.hadoop.hbase.util.PairOfSameType;
071import org.apache.hadoop.hbase.util.Threads;
072import org.apache.hadoop.util.StringUtils;
073import org.apache.zookeeper.KeeperException;
074import org.junit.AfterClass;
075import org.junit.BeforeClass;
076import org.junit.ClassRule;
077import org.junit.Rule;
078import org.junit.Test;
079import org.junit.experimental.categories.Category;
080import org.junit.rules.TestName;
081import org.slf4j.Logger;
082import org.slf4j.LoggerFactory;
083
084import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
085import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
086import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
087
088import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
089import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
090import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
091
092@Category({RegionServerTests.class, LargeTests.class})
093public class TestRegionMergeTransactionOnCluster {
094
095  @ClassRule
096  public static final HBaseClassTestRule CLASS_RULE =
097      HBaseClassTestRule.forClass(TestRegionMergeTransactionOnCluster.class);
098
099  private static final Logger LOG =
100      LoggerFactory.getLogger(TestRegionMergeTransactionOnCluster.class);
101
102  @Rule public TestName name = new TestName();
103
104  private static final int NB_SERVERS = 3;
105
106  private static final byte[] FAMILYNAME = Bytes.toBytes("fam");
107  private static final byte[] QUALIFIER = Bytes.toBytes("q");
108
109  private static byte[] ROW = Bytes.toBytes("testRow");
110  private static final int INITIAL_REGION_NUM = 10;
111  private static final int ROWSIZE = 200;
112  private static byte[][] ROWS = makeN(ROW, ROWSIZE);
113
114  private static int waitTime = 60 * 1000;
115
116  static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
117
118  private static HMaster MASTER;
119  private static Admin ADMIN;
120
121  @BeforeClass
122  public static void beforeAllTests() throws Exception {
123    // Start a cluster
124    StartMiniClusterOption option = StartMiniClusterOption.builder()
125        .masterClass(MyMaster.class).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
126    TEST_UTIL.startMiniCluster(option);
127    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
128    MASTER = cluster.getMaster();
129    MASTER.balanceSwitch(false);
130    ADMIN = TEST_UTIL.getConnection().getAdmin();
131  }
132
133  @AfterClass
134  public static void afterAllTests() throws Exception {
135    TEST_UTIL.shutdownMiniCluster();
136    if (ADMIN != null) {
137      ADMIN.close();
138    }
139  }
140
141  @Test
142  public void testWholesomeMerge() throws Exception {
143    LOG.info("Starting " + name.getMethodName());
144    final TableName tableName = TableName.valueOf(name.getMethodName());
145
146    try {
147      // Create table and load data.
148      Table table = createTableAndLoadData(MASTER, tableName);
149      // Merge 1st and 2nd region
150      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
151
152      // Merge 2nd and 3th region
153      PairOfSameType<RegionInfo> mergedRegions =
154        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 1, 2, INITIAL_REGION_NUM - 2);
155
156      verifyRowCount(table, ROWSIZE);
157
158      // Randomly choose one of the two merged regions
159      RegionInfo hri = RandomUtils.nextBoolean() ? mergedRegions.getFirst() : mergedRegions.getSecond();
160      MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
161      AssignmentManager am = cluster.getMaster().getAssignmentManager();
162      RegionStates regionStates = am.getRegionStates();
163
164      // We should not be able to assign it again
165      am.assign(hri);
166      assertFalse("Merged region can't be assigned", regionStates.isRegionInTransition(hri));
167
168      // We should not be able to unassign it either
169      am.unassign(hri);
170      assertFalse("Merged region can't be unassigned", regionStates.isRegionInTransition(hri));
171
172      table.close();
173    } finally {
174      TEST_UTIL.deleteTable(tableName);
175    }
176  }
177
178  /**
179   * Not really restarting the master. Simulate it by clear of new region
180   * state since it is not persisted, will be lost after master restarts.
181   */
182  @Test
183  public void testMergeAndRestartingMaster() throws Exception {
184    final TableName tableName = TableName.valueOf(name.getMethodName());
185
186    try {
187      // Create table and load data.
188      Table table = createTableAndLoadData(MASTER, tableName);
189
190      try {
191        MyMasterRpcServices.enabled.set(true);
192
193        // Merge 1st and 2nd region
194        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
195      } finally {
196        MyMasterRpcServices.enabled.set(false);
197      }
198
199      table.close();
200    } finally {
201      TEST_UTIL.deleteTable(tableName);
202    }
203  }
204
205  @Test
206  public void testCleanMergeReference() throws Exception {
207    LOG.info("Starting " + name.getMethodName());
208    ADMIN.enableCatalogJanitor(false);
209    final TableName tableName = TableName.valueOf(name.getMethodName());
210    try {
211      // Create table and load data.
212      Table table = createTableAndLoadData(MASTER, tableName);
213      // Merge 1st and 2nd region
214      mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 1, INITIAL_REGION_NUM - 1);
215      verifyRowCount(table, ROWSIZE);
216      table.close();
217
218      List<Pair<RegionInfo, ServerName>> tableRegions = MetaTableAccessor
219          .getTableRegionsAndLocations(MASTER.getConnection(), tableName);
220      RegionInfo mergedRegionInfo = tableRegions.get(0).getFirst();
221      TableDescriptor tableDescriptor = MASTER.getTableDescriptors().get(
222          tableName);
223      Result mergedRegionResult = MetaTableAccessor.getRegionResult(
224        MASTER.getConnection(), mergedRegionInfo.getRegionName());
225
226      // contains merge reference in META
227      assertTrue(MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells()));
228
229      // merging regions' directory are in the file system all the same
230      List<RegionInfo> p = MetaTableAccessor.getMergeRegions(mergedRegionResult.rawCells());
231      RegionInfo regionA = p.get(0);
232      RegionInfo regionB = p.get(1);
233      FileSystem fs = MASTER.getMasterFileSystem().getFileSystem();
234      Path rootDir = MASTER.getMasterFileSystem().getRootDir();
235
236      Path tabledir = CommonFSUtils.getTableDir(rootDir, mergedRegionInfo.getTable());
237      Path regionAdir = new Path(tabledir, regionA.getEncodedName());
238      Path regionBdir = new Path(tabledir, regionB.getEncodedName());
239      assertTrue(fs.exists(regionAdir));
240      assertTrue(fs.exists(regionBdir));
241
242      ColumnFamilyDescriptor[] columnFamilies = tableDescriptor.getColumnFamilies();
243      HRegionFileSystem hrfs = new HRegionFileSystem(
244        TEST_UTIL.getConfiguration(), fs, tabledir, mergedRegionInfo);
245      int count = 0;
246      for(ColumnFamilyDescriptor colFamily : columnFamilies) {
247        count += hrfs.getStoreFiles(colFamily.getName()).size();
248      }
249      ADMIN.compactRegion(mergedRegionInfo.getRegionName());
250      // clean up the merged region store files
251      // wait until merged region have reference file
252      long timeout = System.currentTimeMillis() + waitTime;
253      int newcount = 0;
254      while (System.currentTimeMillis() < timeout) {
255        for(ColumnFamilyDescriptor colFamily : columnFamilies) {
256          newcount += hrfs.getStoreFiles(colFamily.getName()).size();
257        }
258        if(newcount > count) {
259          break;
260        }
261        Thread.sleep(50);
262      }
263      assertTrue(newcount > count);
264      List<RegionServerThread> regionServerThreads = TEST_UTIL.getHBaseCluster()
265          .getRegionServerThreads();
266      for (RegionServerThread rs : regionServerThreads) {
267        CompactedHFilesDischarger cleaner = new CompactedHFilesDischarger(100, null,
268            rs.getRegionServer(), false);
269        cleaner.chore();
270        Thread.sleep(1000);
271      }
272      while (System.currentTimeMillis() < timeout) {
273        int newcount1 = 0;
274        for(ColumnFamilyDescriptor colFamily : columnFamilies) {
275          newcount1 += hrfs.getStoreFiles(colFamily.getName()).size();
276        }
277        if(newcount1 <= 1) {
278          break;
279        }
280        Thread.sleep(50);
281      }
282      // run CatalogJanitor to clean merge references in hbase:meta and archive the
283      // files of merging regions
284      int cleaned = 0;
285      while (cleaned == 0) {
286        cleaned = ADMIN.runCatalogScan();
287        LOG.debug("catalog janitor returned " + cleaned);
288        Thread.sleep(50);
289        // Cleanup is async so wait till all procedures are done running.
290        ProcedureTestingUtility.waitNoProcedureRunning(
291            TEST_UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor());
292      }
293      // We used to check for existence of region in fs but sometimes the region dir was
294      // cleaned up by the time we got here making the test sometimes flakey.
295      assertTrue(cleaned > 0);
296
297      // Wait around a bit to give stuff a chance to complete.
298      while (true) {
299        mergedRegionResult = MetaTableAccessor
300          .getRegionResult(TEST_UTIL.getConnection(), mergedRegionInfo.getRegionName());
301        if (MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells())) {
302          LOG.info("Waiting on cleanup of merge columns {}",
303            Arrays.asList(mergedRegionResult.rawCells()).stream().
304              map(c -> c.toString()).collect(Collectors.joining(",")));
305          Threads.sleep(50);
306        } else {
307          break;
308        }
309      }
310      assertFalse(MetaTableAccessor.hasMergeRegions(mergedRegionResult.rawCells()));
311    } finally {
312      ADMIN.enableCatalogJanitor(true);
313      TEST_UTIL.deleteTable(tableName);
314    }
315  }
316
317  /**
318   * This test tests 1, merging region not online;
319   * 2, merging same two regions; 3, merging unknown regions.
320   * They are in one test case so that we don't have to create
321   * many tables, and these tests are simple.
322   */
323  @Test
324  public void testMerge() throws Exception {
325    LOG.info("Starting " + name.getMethodName());
326    final TableName tableName = TableName.valueOf(name.getMethodName());
327    final Admin admin = TEST_UTIL.getAdmin();
328    final int syncWaitTimeout = 10 * 60000; // 10min
329
330    try {
331      // Create table and load data.
332      Table table = createTableAndLoadData(MASTER, tableName);
333      AssignmentManager am = MASTER.getAssignmentManager();
334      List<RegionInfo> regions = am.getRegionStates().getRegionsOfTable(tableName);
335      // Fake offline one region
336      RegionInfo a = regions.get(0);
337      RegionInfo b = regions.get(1);
338      am.unassign(b);
339      am.offlineRegion(b);
340      try {
341        // Merge offline region. Region a is offline here
342        admin.mergeRegionsAsync(a.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), false)
343                .get(syncWaitTimeout, TimeUnit.MILLISECONDS);
344        fail("Offline regions should not be able to merge");
345      } catch (DoNotRetryRegionException ie) {
346        System.out.println(ie);
347        assertTrue(ie instanceof MergeRegionException);
348      }
349
350      try {
351        // Merge the same region: b and b.
352        FutureUtils
353          .get(admin.mergeRegionsAsync(b.getEncodedNameAsBytes(), b.getEncodedNameAsBytes(), true));
354        fail("A region should not be able to merge with itself, even forcfully");
355      } catch (IOException ie) {
356        assertTrue("Exception should mention regions not online",
357          StringUtils.stringifyException(ie).contains("region to itself")
358            && ie instanceof MergeRegionException);
359      }
360
361      try {
362        // Merge unknown regions
363        admin.mergeRegionsAsync(Bytes.toBytes("-f1"), Bytes.toBytes("-f2"), true);
364        fail("Unknown region could not be merged");
365      } catch (IOException ie) {
366        assertTrue("UnknownRegionException should be thrown",
367          ie instanceof UnknownRegionException);
368      }
369      table.close();
370    } finally {
371      TEST_UTIL.deleteTable(tableName);
372    }
373  }
374
375  @Test
376  public void testMergeWithReplicas() throws Exception {
377    final TableName tableName = TableName.valueOf(name.getMethodName());
378    try {
379      // Create table and load data.
380      Table table = createTableAndLoadData(MASTER, tableName, 5, 2);
381      List<Pair<RegionInfo, ServerName>> initialRegionToServers =
382        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
383      // Merge 1st and 2nd region
384      PairOfSameType<RegionInfo> mergedRegions =
385        mergeRegionsAndVerifyRegionNum(MASTER, tableName, 0, 2, 5 * 2 - 2);
386      List<Pair<RegionInfo, ServerName>> currentRegionToServers =
387        MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tableName);
388      List<RegionInfo> initialRegions = new ArrayList<>();
389      for (Pair<RegionInfo, ServerName> p : initialRegionToServers) {
390        initialRegions.add(p.getFirst());
391      }
392      List<RegionInfo> currentRegions = new ArrayList<>();
393      for (Pair<RegionInfo, ServerName> p : currentRegionToServers) {
394        currentRegions.add(p.getFirst());
395      }
396      assertTrue(initialRegions.contains(mergedRegions.getFirst())); //this is the first region
397      assertTrue(initialRegions.contains(RegionReplicaUtil
398        .getRegionInfoForReplica(mergedRegions.getFirst(), 1))); //this is the replica of the first region
399      assertTrue(initialRegions.contains(mergedRegions.getSecond())); //this is the second region
400      assertTrue(initialRegions.contains(RegionReplicaUtil
401        .getRegionInfoForReplica(mergedRegions.getSecond(), 1))); //this is the replica of the second region
402      assertTrue(!initialRegions.contains(currentRegions.get(0))); //this is the new region
403      assertTrue(!initialRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1))); //replica of the new region
404      assertTrue(currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(currentRegions.get(0), 1))); //replica of the new region
405      assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getFirst(), 1))); //replica of the merged region
406      assertTrue(!currentRegions.contains(RegionReplicaUtil.getRegionInfoForReplica(mergedRegions.getSecond(), 1))); //replica of the merged region
407      table.close();
408    } finally {
409      TEST_UTIL.deleteTable(tableName);
410    }
411  }
412
413  private PairOfSameType<RegionInfo> mergeRegionsAndVerifyRegionNum(
414      HMaster master, TableName tablename,
415      int regionAnum, int regionBnum, int expectedRegionNum) throws Exception {
416    PairOfSameType<RegionInfo> mergedRegions =
417      requestMergeRegion(master, tablename, regionAnum, regionBnum);
418    waitAndVerifyRegionNum(master, tablename, expectedRegionNum);
419    return mergedRegions;
420  }
421
422  private PairOfSameType<RegionInfo> requestMergeRegion(
423      HMaster master, TableName tablename,
424      int regionAnum, int regionBnum) throws Exception {
425    List<Pair<RegionInfo, ServerName>> tableRegions = MetaTableAccessor
426        .getTableRegionsAndLocations(
427            TEST_UTIL.getConnection(), tablename);
428    RegionInfo regionA = tableRegions.get(regionAnum).getFirst();
429    RegionInfo regionB = tableRegions.get(regionBnum).getFirst();
430    ADMIN.mergeRegionsAsync(
431      regionA.getEncodedNameAsBytes(),
432      regionB.getEncodedNameAsBytes(), false);
433    return new PairOfSameType<>(regionA, regionB);
434  }
435
436  private void waitAndVerifyRegionNum(HMaster master, TableName tablename,
437      int expectedRegionNum) throws Exception {
438    List<Pair<RegionInfo, ServerName>> tableRegionsInMeta;
439    List<RegionInfo> tableRegionsInMaster;
440    long timeout = System.currentTimeMillis() + waitTime;
441    while (System.currentTimeMillis() < timeout) {
442      tableRegionsInMeta =
443          MetaTableAccessor.getTableRegionsAndLocations(TEST_UTIL.getConnection(), tablename);
444      tableRegionsInMaster =
445          master.getAssignmentManager().getRegionStates().getRegionsOfTable(tablename);
446      LOG.info(Objects.toString(tableRegionsInMaster));
447      LOG.info(Objects.toString(tableRegionsInMeta));
448      int tableRegionsInMetaSize = tableRegionsInMeta.size();
449      int tableRegionsInMasterSize = tableRegionsInMaster.size();
450      if (tableRegionsInMetaSize == expectedRegionNum
451          && tableRegionsInMasterSize == expectedRegionNum) {
452        break;
453      }
454      Thread.sleep(250);
455    }
456
457    tableRegionsInMeta = MetaTableAccessor.getTableRegionsAndLocations(
458        TEST_UTIL.getConnection(), tablename);
459    LOG.info("Regions after merge:" + Joiner.on(',').join(tableRegionsInMeta));
460    assertEquals(expectedRegionNum, tableRegionsInMeta.size());
461  }
462
463  private Table createTableAndLoadData(HMaster master, TableName tablename)
464      throws Exception {
465    return createTableAndLoadData(master, tablename, INITIAL_REGION_NUM, 1);
466  }
467
468  private Table createTableAndLoadData(HMaster master, TableName tablename,
469      int numRegions, int replication) throws Exception {
470    assertTrue("ROWSIZE must > numregions:" + numRegions, ROWSIZE > numRegions);
471    byte[][] splitRows = new byte[numRegions - 1][];
472    for (int i = 0; i < splitRows.length; i++) {
473      splitRows[i] = ROWS[(i + 1) * ROWSIZE / numRegions];
474    }
475
476    Table table = TEST_UTIL.createTable(tablename, FAMILYNAME, splitRows);
477    LOG.info("Created " + table.getName());
478    if (replication > 1) {
479      HBaseTestingUtility.setReplicas(ADMIN, tablename, replication);
480      LOG.info("Set replication of " + replication + " on " + table.getName());
481    }
482    loadData(table);
483    LOG.info("Loaded " + table.getName());
484    verifyRowCount(table, ROWSIZE);
485    LOG.info("Verified " + table.getName());
486
487    List<Pair<RegionInfo, ServerName>> tableRegions;
488    TEST_UTIL.waitUntilAllRegionsAssigned(tablename);
489    LOG.info("All regions assigned for table - " + table.getName());
490    tableRegions = MetaTableAccessor.getTableRegionsAndLocations(
491        TEST_UTIL.getConnection(), tablename);
492    assertEquals("Wrong number of regions in table " + tablename,
493        numRegions * replication, tableRegions.size());
494    LOG.info(tableRegions.size() + "Regions after load: " + Joiner.on(',').join(tableRegions));
495    assertEquals(numRegions * replication, tableRegions.size());
496    return table;
497  }
498
499  private static byte[][] makeN(byte[] base, int n) {
500    byte[][] ret = new byte[n][];
501    for (int i = 0; i < n; i++) {
502      ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%04d", i)));
503    }
504    return ret;
505  }
506
507  private void loadData(Table table) throws IOException {
508    for (int i = 0; i < ROWSIZE; i++) {
509      Put put = new Put(ROWS[i]);
510      put.addColumn(FAMILYNAME, QUALIFIER, Bytes.toBytes(i));
511      table.put(put);
512    }
513  }
514
515  private void verifyRowCount(Table table, int expectedRegionNum)
516      throws IOException {
517    ResultScanner scanner = table.getScanner(new Scan());
518    int rowCount = 0;
519    while (scanner.next() != null) {
520      rowCount++;
521    }
522    assertEquals(expectedRegionNum, rowCount);
523    scanner.close();
524  }
525
526  // Make it public so that JVMClusterUtil can access it.
527  public static class MyMaster extends HMaster {
528    public MyMaster(Configuration conf) throws IOException, KeeperException, InterruptedException {
529      super(conf);
530    }
531
532    @Override
533    protected RSRpcServices createRpcServices() throws IOException {
534      return new MyMasterRpcServices(this);
535    }
536  }
537
538  static class MyMasterRpcServices extends MasterRpcServices {
539    static AtomicBoolean enabled = new AtomicBoolean(false);
540
541    private HMaster myMaster;
542    public MyMasterRpcServices(HMaster master) throws IOException {
543      super(master);
544      myMaster = master;
545    }
546
547    @Override
548    public ReportRegionStateTransitionResponse reportRegionStateTransition(RpcController c,
549        ReportRegionStateTransitionRequest req) throws ServiceException {
550      ReportRegionStateTransitionResponse resp = super.reportRegionStateTransition(c, req);
551      if (enabled.get() && req.getTransition(0).getTransitionCode()
552          == TransitionCode.READY_TO_MERGE && !resp.hasErrorMessage()) {
553        RegionStates regionStates = myMaster.getAssignmentManager().getRegionStates();
554        for (RegionState regionState: regionStates.getRegionsStateInTransition()) {
555          // Find the merging_new region and remove it
556          if (regionState.isMergingNew()) {
557            regionStates.deleteRegion(regionState.getRegion());
558          }
559        }
560      }
561      return resp;
562    }
563  }
564}