001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import static org.junit.jupiter.api.Assertions.assertNotNull; 021import static org.junit.jupiter.api.Assertions.assertTrue; 022 023import java.io.IOException; 024import java.util.Collection; 025import java.util.List; 026import java.util.Optional; 027import java.util.concurrent.BrokenBarrierException; 028import java.util.concurrent.CyclicBarrier; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.DoNotRetryIOException; 034import org.apache.hadoop.hbase.HBaseTestingUtil; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.NotServingRegionException; 037import org.apache.hadoop.hbase.SingleProcessHBaseCluster; 038import org.apache.hadoop.hbase.StartTestingClusterOption; 039import org.apache.hadoop.hbase.TableName; 040import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 041import org.apache.hadoop.hbase.client.Put; 042import org.apache.hadoop.hbase.client.RegionInfo; 043import org.apache.hadoop.hbase.client.RegionReplicaUtil; 044import org.apache.hadoop.hbase.client.TableDescriptor; 045import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 046import org.apache.hadoop.hbase.monitoring.MonitoredTask; 047import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 048import org.apache.hadoop.hbase.regionserver.HRegion; 049import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 050import org.apache.hadoop.hbase.regionserver.HRegionServer; 051import org.apache.hadoop.hbase.regionserver.HStore; 052import org.apache.hadoop.hbase.regionserver.MemStoreFlusher; 053import org.apache.hadoop.hbase.regionserver.RSRpcServices; 054import org.apache.hadoop.hbase.regionserver.Region; 055import org.apache.hadoop.hbase.regionserver.RegionServerServices; 056import org.apache.hadoop.hbase.testclassification.LargeTests; 057import org.apache.hadoop.hbase.testclassification.RegionServerTests; 058import org.apache.hadoop.hbase.util.Bytes; 059import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 060import org.apache.hadoop.hbase.wal.WAL; 061import org.junit.jupiter.api.AfterAll; 062import org.junit.jupiter.api.BeforeAll; 063import org.junit.jupiter.api.Tag; 064import org.junit.jupiter.api.Test; 065 066import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 067import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 068import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 069import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; 070 071import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 072import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; 073import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 074import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 075 076@Tag(RegionServerTests.TAG) 077@Tag(LargeTests.TAG) 078public class TestRegionReplicationForFlushMarker { 079 080 private static final byte[] FAMILY = Bytes.toBytes("family_test"); 081 082 private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); 083 084 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 085 private static final int NB_SERVERS = 2; 086 087 private static TableName tableName = TableName.valueOf("TestRegionReplicationForFlushMarker"); 088 private static volatile boolean startTest = false; 089 090 @BeforeAll 091 public static void setUp() throws Exception { 092 Configuration conf = HTU.getConfiguration(); 093 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 094 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 095 conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1); 096 conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); 097 conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 098 conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); 099 conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 100 conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 101 conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3); 102 HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class) 103 .numRegionServers(NB_SERVERS).build()); 104 105 } 106 107 @AfterAll 108 public static void tearDown() throws Exception { 109 HTU.shutdownMiniCluster(); 110 } 111 112 /** 113 * This test is for HBASE-26960, before HBASE-26960, {@link MemStoreFlusher} does not write the 114 * {@link FlushAction#CANNOT_FLUSH} marker to the WAL when the memstore is empty,so if the 115 * {@link RegionReplicationSink} request a flush when the memstore is empty, it could not receive 116 * the {@link FlushAction#CANNOT_FLUSH} and the replication may be hanged. After HBASE-26768,when 117 * the {@link RegionReplicationSink} request a flush when the memstore is empty,even it does not 118 * writes the {@link FlushAction#CANNOT_FLUSH} marker to the WAL,we also replicate the 119 * {@link FlushAction#CANNOT_FLUSH} marker to the secondary region replica. 120 */ 121 @Test 122 public void testCannotFlushMarker() throws Exception { 123 final HRegionForTest[] regions = this.createTable(); 124 RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); 125 assertTrue(regionReplicationSink != null); 126 127 String oldThreadName = Thread.currentThread().getName(); 128 Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME); 129 try { 130 131 byte[] rowKey1 = Bytes.toBytes(1); 132 startTest = true; 133 /** 134 * Write First cell,replicating to secondary replica is error,and then 135 * {@link RegionReplicationSink} request flush,after {@link RegionReplicationSink} receiving 136 * the {@link FlushAction#START_FLUSH},the {@link RegionReplicationSink#failedReplicas} is 137 * cleared,but replicating {@link FlushAction#START_FLUSH} is failed again,so 138 * {@link RegionReplicationSink} request flush once more, but now memstore is empty,so the 139 * {@link MemStoreFlusher} just write a {@link FlushAction#CANNOT_FLUSH} marker to the WAL. 140 */ 141 regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 142 /** 143 * Wait for the {@link FlushAction#CANNOT_FLUSH} is written and initiating replication 144 */ 145 regions[0].cyclicBarrier.await(); 146 assertTrue(regions[0].prepareFlushCounter.get() == 2); 147 /** 148 * The {@link RegionReplicationSink#failedReplicas} is cleared by the 149 * {@link FlushAction#CANNOT_FLUSH} marker. 150 */ 151 assertTrue(regionReplicationSink.getFailedReplicas().isEmpty()); 152 } finally { 153 startTest = false; 154 Thread.currentThread().setName(oldThreadName); 155 } 156 } 157 158 private HRegionForTest[] createTable() throws Exception { 159 TableDescriptor tableDescriptor = 160 TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS) 161 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build(); 162 HTU.getAdmin().createTable(tableDescriptor); 163 final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; 164 for (int i = 0; i < NB_SERVERS; i++) { 165 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 166 List<HRegion> onlineRegions = rs.getRegions(tableName); 167 for (HRegion region : onlineRegions) { 168 int replicaId = region.getRegionInfo().getReplicaId(); 169 assertTrue(regions[replicaId] == null); 170 regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; 171 } 172 } 173 for (Region region : regions) { 174 assertNotNull(region); 175 } 176 return regions; 177 } 178 179 public static final class HRegionForTest extends HRegion { 180 static final String USER_THREAD_NAME = "TestRegionReplicationForFlushMarker"; 181 final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); 182 final AtomicInteger prepareFlushCounter = new AtomicInteger(0); 183 184 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 185 TableDescriptor htd, RegionServerServices rsServices) { 186 super(fs, wal, confParam, htd, rsServices); 187 } 188 189 @SuppressWarnings("deprecation") 190 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 191 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 192 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 193 } 194 195 public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { 196 this.regionReplicationSink = Optional.of(regionReplicationSink); 197 } 198 199 @Override 200 protected void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException { 201 // not write the region open marker to interrupt the test. 202 } 203 204 @Override 205 protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, 206 Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, 207 FlushLifeCycleTracker tracker) throws IOException { 208 if (!startTest) { 209 return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 210 writeFlushWalMarker, tracker); 211 } 212 213 if (this.getRegionInfo().getReplicaId() != 0) { 214 return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status, 215 writeFlushWalMarker, tracker); 216 } 217 218 try { 219 PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, 220 status, writeFlushWalMarker, tracker); 221 this.prepareFlushCounter.incrementAndGet(); 222 /** 223 * First flush is {@link FlushAction#START_FLUSH} marker and the second flush is 224 * {@link FlushAction#CANNOT_FLUSH} marker because the memstore is empty. 225 */ 226 if ( 227 this.prepareFlushCounter.get() == 2 && result.getResult() != null 228 && result.getResult().getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY 229 ) { 230 231 cyclicBarrier.await(); 232 } 233 return result; 234 } catch (BrokenBarrierException | InterruptedException e) { 235 throw new RuntimeException(e); 236 } 237 238 } 239 } 240 241 public static final class ErrorReplayRSRpcServices extends RSRpcServices { 242 private static final AtomicInteger callCounter = new AtomicInteger(0); 243 244 public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException { 245 super(rs); 246 } 247 248 @Override 249 public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController, 250 ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException { 251 252 if (!startTest) { 253 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 254 } 255 256 List<WALEntry> entries = replicateWALEntryRequest.getEntryList(); 257 if (CollectionUtils.isEmpty(entries)) { 258 return ReplicateWALEntryResponse.getDefaultInstance(); 259 } 260 ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); 261 262 HRegion region; 263 try { 264 region = server.getRegionByEncodedName(regionName.toStringUtf8()); 265 } catch (NotServingRegionException e) { 266 throw new ServiceException(e); 267 } 268 269 if ( 270 !region.getRegionInfo().getTable().equals(tableName) 271 || region.getRegionInfo().getReplicaId() != 1 272 ) { 273 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 274 } 275 276 /** 277 * Simulate the first cell write and {@link FlushAction#START_FLUSH} marker replicating error. 278 */ 279 int count = callCounter.incrementAndGet(); 280 if (count > 2) { 281 return super.replicateToReplica(rpcController, replicateWALEntryRequest); 282 } 283 throw new ServiceException(new DoNotRetryIOException("Inject error!")); 284 } 285 } 286 287 public static final class RSForTest 288 extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer { 289 290 public RSForTest(Configuration conf) throws IOException, InterruptedException { 291 super(conf); 292 } 293 294 @Override 295 protected RSRpcServices createRpcServices() throws IOException { 296 return new ErrorReplayRSRpcServices(this); 297 } 298 } 299 300}