001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import static org.junit.jupiter.api.Assertions.assertNotNull; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.CyclicBarrier; 028import java.util.concurrent.atomic.AtomicBoolean; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.NotServingRegionException; 037import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 038import org.apache.hadoop.hbase.StartTestingClusterOption; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionReplicaUtil; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 046import org.apache.hadoop.hbase.monitoring.MonitoredTask; 047import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 048import org.apache.hadoop.hbase.regionserver.HRegion; 049import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 050import org.apache.hadoop.hbase.regionserver.HRegionServer; 051import org.apache.hadoop.hbase.regionserver.HStore; 052import org.apache.hadoop.hbase.regionserver.RSRpcServices; 053import org.apache.hadoop.hbase.regionserver.Region; 054import org.apache.hadoop.hbase.regionserver.RegionServerServices; 055import org.apache.hadoop.hbase.testclassification.LargeTests; 056import org.apache.hadoop.hbase.testclassification.RegionServerTests; 057import org.apache.hadoop.hbase.util.Bytes; 058import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 059import org.apache.hadoop.hbase.wal.WAL; 060import org.junit.jupiter.api.AfterAll; 061import org.junit.jupiter.api.BeforeAll; 062import org.junit.jupiter.api.Tag; 063import org.junit.jupiter.api.Test; 064import org.mockito.Mockito; 065 066import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 067import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 068import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 069import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 070 071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 074 075@Tag(RegionServerTests.TAG) 076@Tag(LargeTests.TAG) 077public class TestRegionReplicationSinkCallbackAndFlushConcurrently { 078 079 private static final byte[] FAMILY = Bytes.toBytes("family_test"); 080 081 private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); 082 083 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 084 private static final int NB_SERVERS = 2; 085 086 private static TableName tableName = TableName.valueOf("testRegionReplicationSinkSuspend"); 087 private static volatile boolean startTest = false; 088 089 @BeforeAll 090 public static void setUp() throws Exception { 091 Configuration conf = HTU.getConfiguration(); 092 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 093 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 094 conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 15); 095 conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); 096 conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 097 conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); 098 conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 099 conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 100 HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class) 101 .numRegionServers(NB_SERVERS).build()); 102 103 } 104 105 @AfterAll 106 public static void tearDown() throws Exception { 107 HTU.shutdownMiniCluster(); 108 } 109 110 /** 111 * This test is for HBASE-26768,test the case that we have already clear the 112 * {@link RegionReplicationSink#failedReplicas} due to a flush all edit,which may in flusher 113 * thread,and then in the callback of replay, which may in Netty nioEventLoop,we add a replica to 114 * the {@link RegionReplicationSink#failedReplicas} because of a failure of replicating. 115 */ 116 @Test 117 public void test() throws Exception { 118 final HRegionForTest[] regions = this.createTable(); 119 final AtomicBoolean completedRef = new AtomicBoolean(false); 120 RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); 121 assertTrue(regionReplicationSink != null); 122 123 RegionReplicationSink spiedRegionReplicationSink = 124 this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], completedRef); 125 126 String oldThreadName = Thread.currentThread().getName(); 127 Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); 128 try { 129 startTest = true; 130 /** 131 * Write First cell,replicating to secondary replica is error. 132 */ 133 byte[] rowKey1 = Bytes.toBytes(1); 134 135 regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 136 regions[0].flushcache(true, true, FlushLifeCycleTracker.DUMMY); 137 138 HTU.waitFor(120000, () -> completedRef.get()); 139 assertTrue(spiedRegionReplicationSink.getFailedReplicas().isEmpty()); 140 } finally { 141 startTest = false; 142 Thread.currentThread().setName(oldThreadName); 143 } 144 } 145 146 private RegionReplicationSink setUpSpiedRegionReplicationSink( 147 final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion, 148 final AtomicBoolean completedRef) { 149 final AtomicInteger onCompleteCounter = new AtomicInteger(0); 150 final AtomicInteger getStartFlushAllDescriptorCounter = new AtomicInteger(0); 151 RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); 152 153 Mockito.doAnswer((invocationOnMock) -> { 154 if (!startTest) { 155 invocationOnMock.callRealMethod(); 156 return null; 157 } 158 int count = onCompleteCounter.incrementAndGet(); 159 if (count == 1) { 160 primaryRegion.cyclicBarrier.await(); 161 invocationOnMock.callRealMethod(); 162 completedRef.set(true); 163 return null; 164 } 165 invocationOnMock.callRealMethod(); 166 return null; 167 }).when(spiedRegionReplicationSink).onComplete(Mockito.anyList(), Mockito.anyMap()); 168 169 Mockito.doAnswer((invocationOnMock) -> { 170 if (!startTest) { 171 return invocationOnMock.callRealMethod(); 172 } 173 if ( 174 primaryRegion.prepareFlush 175 && Thread.currentThread().getName().equals(HRegionForTest.USER_THREAD_NAME) 176 ) { 177 int count = getStartFlushAllDescriptorCounter.incrementAndGet(); 178 if (count == 1) { 179 // onComplete could execute 180 primaryRegion.cyclicBarrier.await(); 181 return invocationOnMock.callRealMethod(); 182 } 183 } 184 return invocationOnMock.callRealMethod(); 185 }).when(spiedRegionReplicationSink).getStartFlushAllDescriptor(Mockito.any()); 186 187 primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink); 188 return spiedRegionReplicationSink; 189 } 190 191 private HRegionForTest[] createTable() throws Exception { 192 TableDescriptor tableDescriptor = 193 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS) 194 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 195 HTU.getAdmin().createTable(tableDescriptor); 196 final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; 197 for (int i = 0; i < NB_SERVERS; i++) { 198 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 199 List<HRegion> onlineRegions = rs.getRegions(tableName); 200 for (HRegion region : onlineRegions) { 201 int replicaId = region.getRegionInfo().getReplicaId(); 202 assertTrue(regions[replicaId] == null); 203 regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; 204 } 205 } 206 for (Region region : regions) { 207 assertNotNull(region); 208 } 209 return regions; 210 } 211 212 public static final class HRegionForTest extends HRegion { 213 static final String USER_THREAD_NAME = "TestReplicationHang"; 214 final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 215 volatile boolean prepareFlush = false; 216 217 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 218 TableDescriptor htd, RegionServerServices rsServices) { 219 super(fs, wal, confParam, htd, rsServices); 220 } 221 222 @SuppressWarnings("deprecation") 223 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 224 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 225 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 226 } 227 228 public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { 229 this.regionReplicationSink = Optional.of(regionReplicationSink); 230 } 231 232 @Override 233 protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, 234 Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, 235 FlushLifeCycleTracker tracker) throws IOException { 236 if (!startTest) { 237 return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 238 writeFlushWalMarker, tracker); 239 } 240 241 if ( 242 this.getRegionInfo().getReplicaId() == 0 243 && Thread.currentThread().getName().equals(USER_THREAD_NAME) 244 ) { 245 this.prepareFlush = true; 246 } 247 try { 248 PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, 249 status, writeFlushWalMarker, tracker); 250 251 return result; 252 } finally { 253 if ( 254 this.getRegionInfo().getReplicaId() == 0 255 && Thread.currentThread().getName().equals(USER_THREAD_NAME) 256 ) { 257 this.prepareFlush = false; 258 } 259 } 260 261 } 262 } 263 264 public static final class ErrorReplayRSRpcServices extends RSRpcServices { 265 private static final AtomicInteger callCounter = new AtomicInteger(0); 266 267 public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException { 268 super(rs); 269 } 270 271 @Override 272 public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, 273 ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException { 274 275 if (!startTest) { 276 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 277 } 278 279 List<WALEntry> entries = replicateWALEntryRequest.getEntryList(); 280 if (CollectionUtils.isEmpty(entries)) { 281 return ReplicateWALEntryResponse.getDefaultInstance(); 282 } 283 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 284 285 HRegion region; 286 try { 287 region = server.getRegionByEncodedName(regionName.toStringUtf8()); 288 } catch (NotServingRegionException e) { 289 throw new ServiceException(e); 290 } 291 292 if ( 293 !region.getRegionInfo().getTable().equals(tableName) 294 || region.getRegionInfo().getReplicaId() != 1 295 ) { 296 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 297 } 298 299 /** 300 * Simulate the first cell replicating error. 301 */ 302 int count = callCounter.incrementAndGet(); 303 if (count > 1) { 304 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 305 } 306 throw new ServiceException(new DoNotRetryIOException("Inject error!")); 307 } 308 } 309 310 public static final class RSForTest 311 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 312 313 public RSForTest(Configuration conf) throws IOException, InterruptedException { 314 super(conf); 315 } 316 317 @Override 318 protected RSRpcServices createRpcServices() throws IOException { 319 return new ErrorReplayRSRpcServices(this); 320 } 321 } 322}