/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;

/**
 * This UT is used to make sure that we will not accidentally change the way to generate online
 * servers. See HBASE-25774 and HBASE-25032 for more details.
 * <p>
 * The test starts an extra region server that is paused right before its first region server
 * report, disables a replication peer while that server is paused, and then verifies that the
 * peer cache on the new server reflects the disabled state.
 */
@Tag(MasterTests.TAG)
@Tag(MediumTests.TAG)
public class TestRefreshPeerWhileRegionServerRestarts extends TestReplicationBase {

  /**
   * Counted down by the new region server when it first reaches
   * {@link RegionServerForTest#tryRegionServerReport(long, long)}. Written by the test thread and
   * cleared by the region server thread, hence volatile.
   */
  private static volatile CountDownLatch ARRIVE;

  /**
   * Released by the test thread to let the paused region server continue with its report.
   */
  private static volatile CountDownLatch RESUME;

  /**
   * A region server that, while {@link #ARRIVE} is set, signals the test and then blocks on
   * {@link #RESUME} before sending its region server report.
   */
  public static final class RegionServerForTest extends HRegionServer {

    public RegionServerForTest(Configuration conf) throws IOException {
      super(conf);
    }

    /**
     * Pauses once, on the first call made while {@link #ARRIVE} is non-null, and then delegates to
     * the parent implementation.
     */
    @Override
    protected void tryRegionServerReport(long reportStartTime, long reportEndTime)
      throws IOException {
      // Read the field once so the null check and the use see the same latch.
      CountDownLatch arrive = ARRIVE;
      if (arrive != null) {
        arrive.countDown();
        // Clear the hook so that later reports are not paused again.
        ARRIVE = null;
        try {
          RESUME.await();
        } catch (InterruptedException e) {
          // Do not lose the interrupt; keep the flag set for code further up the stack.
          Thread.currentThread().interrupt();
        }
      }
      super.tryRegionServerReport(reportStartTime, reportEndTime);
    }
  }

  /**
   * Disables a peer while a newly started region server is paused before its first report, and
   * asserts that the new region server ends up with the peer in {@link PeerState#DISABLED}.
   */
  @Test
  public void testRestart() throws Exception {
    UTIL1.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
      RegionServerForTest.class, HRegionServer.class);
    CountDownLatch arrive = new CountDownLatch(1);
    ARRIVE = arrive;
    RESUME = new CountDownLatch(1);
    // start a new region server, and wait until it finishes initialization and is about to call
    // regionServerReport, so it will load the peer state to peer cache.
    Future<HRegionServer> regionServerFuture = ForkJoinPool.commonPool()
      .submit(() -> UTIL1.getMiniHBaseCluster().startRegionServer().getRegionServer());
    // Wait on the local reference: the region server thread sets the static ARRIVE field to null
    // right after counting down, so dereferencing the field here could throw a
    // NullPointerException.
    arrive.await();
    // change the peer state, wait until it reaches the last state, where we have already got the
    // region server list for refreshing
    Future<Void> future = hbaseAdmin.disableReplicationPeerAsync(PEER_ID2);
    try {
      UTIL1.waitFor(30000, () -> {
        for (Procedure<?> proc : UTIL1.getMiniHBaseCluster().getMaster().getProcedures()) {
          if (proc instanceof DisablePeerProcedure) {
            return ((DisablePeerProcedure) proc).getCurrentStateId()
                == PeerModificationState.POST_PEER_MODIFICATION_VALUE;
          }
        }
        return false;
      });
    } finally {
      // let the new region server go, even if the wait above failed
      RESUME.countDown();
    }
    // wait for the disable peer operation to finish
    future.get();
    // assert that the peer cache on the new region server has also been refreshed
    ReplicationPeer peer = regionServerFuture.get().getReplicationSourceService()
      .getReplicationPeers().getPeer(PEER_ID2);
    assertEquals(PeerState.DISABLED, peer.getPeerState());
  }
}