001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import static org.junit.jupiter.api.Assertions.assertFalse; 021import static org.junit.jupiter.api.Assertions.assertNotNull; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023import static org.junit.jupiter.api.Assertions.fail; 024 025import java.io.IOException; 026import java.util.Arrays; 027import java.util.List; 028import java.util.Optional; 029import java.util.concurrent.atomic.AtomicInteger; 030import org.apache.hadoop.conf.Configuration; 031import org.apache.hadoop.fs.FileSystem; 032import org.apache.hadoop.fs.Path; 033import org.apache.hadoop.hbase.Abortable; 034import org.apache.hadoop.hbase.DoNotRetryIOException; 035import org.apache.hadoop.hbase.HBaseTestingUtil; 036import org.apache.hadoop.hbase.HConstants; 037import org.apache.hadoop.hbase.StartTestingClusterOption; 038import org.apache.hadoop.hbase.TableName; 039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 040import org.apache.hadoop.hbase.client.Consistency; 041import org.apache.hadoop.hbase.client.Get; 042import org.apache.hadoop.hbase.client.Mutation; 043import org.apache.hadoop.hbase.client.Put; 044import org.apache.hadoop.hbase.client.RegionInfo; 045import org.apache.hadoop.hbase.client.RegionReplicaUtil; 046import org.apache.hadoop.hbase.client.Result; 047import org.apache.hadoop.hbase.client.Table; 048import org.apache.hadoop.hbase.client.TableDescriptor; 049import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 050import org.apache.hadoop.hbase.coprocessor.ObserverContext; 051import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; 052import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; 053import org.apache.hadoop.hbase.coprocessor.RegionObserver; 054import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; 055import org.apache.hadoop.hbase.regionserver.HRegion; 056import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; 057import org.apache.hadoop.hbase.regionserver.HRegionServer; 058import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; 059import org.apache.hadoop.hbase.regionserver.RegionServerServices; 060import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 061import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; 062import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 063import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 064import org.apache.hadoop.hbase.regionserver.wal.WALSyncTimeoutIOException; 065import org.apache.hadoop.hbase.testclassification.LargeTests; 066import org.apache.hadoop.hbase.testclassification.RegionServerTests; 067import org.apache.hadoop.hbase.util.Bytes; 068import org.apache.hadoop.hbase.util.CommonFSUtils; 069import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 070import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; 071import org.apache.hadoop.hbase.wal.WAL; 072import org.apache.hadoop.hbase.wal.WALFactory; 073import org.apache.hadoop.hbase.wal.WALKeyImpl; 074import org.apache.hadoop.hbase.wal.WALProvider; 075import org.junit.jupiter.api.AfterAll; 076import org.junit.jupiter.api.BeforeAll; 077import org.junit.jupiter.api.Tag; 078import org.junit.jupiter.api.Test; 079import org.mockito.Mockito; 080 081import org.apache.hbase.thirdparty.io.netty.channel.Channel; 082import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; 083 084@Tag(RegionServerTests.TAG) 085@Tag(LargeTests.TAG) 086public class TestRegionReplicationForWriteException { 087 088 private static final byte[] FAMILY = Bytes.toBytes("family_test"); 089 090 private static final byte[] QUAL = Bytes.toBytes("qualifier_test"); 091 092 private static final HBaseTestingUtil HTU = new HBaseTestingUtil(); 093 private static final int NB_SERVERS = 2; 094 095 private static TableName tableName = TableName.valueOf("TestRegionReplicationForWriteException"); 096 private static volatile boolean testWALTimout = false; 097 private static volatile boolean testCP = false; 098 private static final long timeoutMIlliseconds = 3000; 099 private static final String USER_THREAD_NAME = tableName.getNameAsString(); 100 101 @BeforeAll 102 public static void setUp() throws Exception { 103 Configuration conf = HTU.getConfiguration(); 104 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 105 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 106 conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1); 107 conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000); 108 conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 109 conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000); 110 conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000); 111 conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false); 112 conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3); 113 conf.setClass(WALFactory.WAL_PROVIDER, SlowAsyncFSWALProvider.class, WALProvider.class); 114 conf.setLong(AbstractFSWAL.WAL_SYNC_TIMEOUT_MS, timeoutMIlliseconds); 115 HTU.startMiniCluster(StartTestingClusterOption.builder().numRegionServers(NB_SERVERS).build()); 116 117 } 118 119 @AfterAll 120 public static void tearDown() throws Exception { 121 HTU.shutdownMiniCluster(); 122 } 123 124 /** 125 * This test is for HBASE-27303. 126 */ 127 @Test 128 public void testWriteException() throws Exception { 129 final HRegionForTest[] regions = this.createTable(); 130 RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get(); 131 assertTrue(regionReplicationSink != null); 132 final AtomicInteger replicateCounter = new AtomicInteger(0); 133 this.setUpSpiedRegionReplicationSink(regionReplicationSink, regions[0], replicateCounter); 134 135 String oldThreadName = Thread.currentThread().getName(); 136 Thread.currentThread().setName(USER_THREAD_NAME); 137 try { 138 testCP = true; 139 try { 140 byte[] rowKey1 = Bytes.toBytes(1); 141 byte[] value1 = Bytes.toBytes(3); 142 /** 143 * Write first put,{@link WAL#sync} is successful but {@link RegionObserver#postBatchMutate} 144 * throws exception,the rowkey1 is applied to primary and secondary replicas successfully. 145 */ 146 try { 147 regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, value1)); 148 fail(); 149 } catch (DoNotRetryIOException e) { 150 assertTrue(e.getMessage().equals(MyRegionObserver.ERROR_MESSAGE)); 151 } 152 153 try (Table table = HTU.getConnection().getTable(tableName)) { 154 assertTrue(checkReplica(table, FAMILY, QUAL, rowKey1, value1, 0)); 155 HTU.waitFor(30000, () -> checkReplica(table, FAMILY, QUAL, rowKey1, value1, 1)); 156 } 157 } finally { 158 testCP = false; 159 } 160 161 byte[] rowKey2 = Bytes.toBytes(2); 162 byte[] value2 = Bytes.toBytes(6); 163 replicateCounter.set(0); 164 testWALTimout = true; 165 try { 166 /** 167 * Write second put,the {@link WAL#sync} timeout and throws 168 * {@link WALSyncTimeoutIOException},{@link HRegion#put} is failed and rowKey2 is not 169 * applied to primary and secondary replicas. 170 */ 171 try { 172 regions[0].put(new Put(rowKey2).addColumn(FAMILY, QUAL, value2)); 173 fail(); 174 } catch (WALSyncTimeoutIOException e) { 175 assertTrue(e != null); 176 } 177 178 assertTrue(regions[0].getRSServices().isAborted()); 179 assertTrue(replicateCounter.get() == 0); 180 Thread.sleep(2000); 181 try (Table table = HTU.getConnection().getTable(tableName)) { 182 assertFalse(checkReplica(table, FAMILY, QUAL, rowKey2, value2, 1)); 183 } 184 } finally { 185 testWALTimout = false; 186 } 187 } finally { 188 Thread.currentThread().setName(oldThreadName); 189 } 190 } 191 192 private RegionReplicationSink setUpSpiedRegionReplicationSink( 193 final RegionReplicationSink regionReplicationSink, final HRegionForTest primaryRegion, 194 final AtomicInteger counter) { 195 RegionReplicationSink spiedRegionReplicationSink = Mockito.spy(regionReplicationSink); 196 197 Mockito.doAnswer((invocationOnMock) -> { 198 if (!testWALTimout || !USER_THREAD_NAME.equals(Thread.currentThread().getName())) { 199 invocationOnMock.callRealMethod(); 200 return null; 201 } 202 WALKeyImpl walKey = invocationOnMock.getArgument(0); 203 if (!walKey.getTableName().equals(tableName)) { 204 invocationOnMock.callRealMethod(); 205 return null; 206 } 207 counter.incrementAndGet(); 208 invocationOnMock.callRealMethod(); 209 return null; 210 }).when(spiedRegionReplicationSink).add(Mockito.any(), Mockito.any(), Mockito.any()); 211 primaryRegion.setRegionReplicationSink(spiedRegionReplicationSink); 212 return spiedRegionReplicationSink; 213 } 214 215 private static boolean checkReplica(Table table, byte[] fam, byte[] qual, byte[] rowKey, 216 byte[] expectValue, int replicaId) throws IOException { 217 Get get = new Get(rowKey).setConsistency(Consistency.TIMELINE).setReplicaId(replicaId); 218 Result result = table.get(get); 219 byte[] value = result.getValue(fam, qual); 220 return value != null && value.length > 0 && Arrays.equals(expectValue, value); 221 } 222 223 private HRegionForTest[] createTable() throws Exception { 224 TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(tableName) 225 .setRegionReplication(NB_SERVERS).setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 226 .setCoprocessor(MyRegionObserver.class.getName()).build(); 227 HTU.getAdmin().createTable(tableDescriptor); 228 final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS]; 229 for (int i = 0; i < NB_SERVERS; i++) { 230 HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i); 231 List<HRegion> onlineRegions = rs.getRegions(tableName); 232 for (HRegion region : onlineRegions) { 233 int replicaId = region.getRegionInfo().getReplicaId(); 234 assertTrue(regions[replicaId] == null); 235 regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region; 236 } 237 } 238 for (HRegionForTest region : regions) { 239 assertNotNull(region); 240 } 241 return regions; 242 } 243 244 public static final class HRegionForTest extends HRegion { 245 246 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 247 TableDescriptor htd, RegionServerServices rsServices) { 248 super(fs, wal, confParam, htd, rsServices); 249 } 250 251 @SuppressWarnings("deprecation") 252 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 253 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 254 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 255 } 256 257 public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) { 258 this.regionReplicationSink = Optional.of(regionReplicationSink); 259 } 260 261 public RegionServerServices getRSServices() { 262 return this.rsServices; 263 } 264 } 265 266 public static class SlowAsyncFSWAL extends AsyncFSWAL { 267 268 public SlowAsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, 269 String archiveDir, Configuration conf, List<WALActionsListener> listeners, 270 boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, 271 Class<? extends Channel> channelClass, StreamSlowMonitor monitor) 272 throws FailedLogCloseException, IOException { 273 super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, 274 suffix, null, null, eventLoopGroup, channelClass, monitor); 275 } 276 277 @Override 278 protected void atHeadOfRingBufferEventHandlerAppend() { 279 if (testWALTimout) { 280 try { 281 Thread.sleep(timeoutMIlliseconds + 1000); 282 } catch (InterruptedException e) { 283 throw new RuntimeException(e); 284 } 285 } 286 super.atHeadOfRingBufferEventHandlerAppend(); 287 } 288 289 } 290 291 public static class SlowAsyncFSWALProvider extends AsyncFSWALProvider { 292 293 @Override 294 protected AsyncFSWAL createWAL() throws IOException { 295 return new SlowAsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, 296 CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.getFactoryId()), 297 getWALArchiveDirectoryName(conf, factory.getFactoryId()), conf, listeners, true, logPrefix, 298 META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, 299 channelClass, factory.getExcludeDatanodeManager().getStreamSlowMonitor(providerId)); 300 } 301 302 } 303 304 public static class MyRegionObserver implements RegionCoprocessor, RegionObserver { 305 306 private static final String ERROR_MESSAGE = "Inject error!"; 307 308 @Override 309 public Optional<RegionObserver> getRegionObserver() { 310 return Optional.of(this); 311 } 312 313 @Override 314 public void postBatchMutate(ObserverContext<? extends RegionCoprocessorEnvironment> c, 315 MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { 316 if (!testCP || !RegionReplicaUtil.isDefaultReplica(c.getEnvironment().getRegionInfo())) { 317 return; 318 } 319 throw new DoNotRetryIOException(ERROR_MESSAGE); 320 } 321 } 322 323}