001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.mockito.ArgumentMatchers.any; 023import static org.mockito.ArgumentMatchers.anyInt; 024import static org.mockito.ArgumentMatchers.anyList; 025import static org.mockito.ArgumentMatchers.anyLong; 026import static org.mockito.Mockito.mock; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.util.ArrayDeque; 033import java.util.ArrayList; 034import java.util.Arrays; 035import java.util.Collection; 036import java.util.List; 037import java.util.Queue; 038import java.util.UUID; 039import java.util.concurrent.CompletableFuture; 040import org.apache.hadoop.conf.Configuration; 041import org.apache.hadoop.fs.FileSystem; 042import org.apache.hadoop.fs.Path; 043import org.apache.hadoop.hbase.ExtendedCellScanner; 044import org.apache.hadoop.hbase.HBaseTestingUtil; 045import org.apache.hadoop.hbase.HConstants; 046import org.apache.hadoop.hbase.ServerName; 047import org.apache.hadoop.hbase.TableName; 048import org.apache.hadoop.hbase.client.AsyncClusterConnection; 049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 050import org.apache.hadoop.hbase.client.Get; 051import org.apache.hadoop.hbase.client.Put; 052import org.apache.hadoop.hbase.client.RegionInfo; 053import org.apache.hadoop.hbase.client.RegionInfoBuilder; 054import org.apache.hadoop.hbase.client.RegionReplicaUtil; 055import org.apache.hadoop.hbase.client.TableDescriptor; 056import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 057import org.apache.hadoop.hbase.executor.ExecutorService; 058import org.apache.hadoop.hbase.executor.ExecutorType; 059import org.apache.hadoop.hbase.monitoring.MonitoredTask; 060import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; 061import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; 062import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; 063import org.apache.hadoop.hbase.testclassification.MediumTests; 064import org.apache.hadoop.hbase.testclassification.RegionServerTests; 065import org.apache.hadoop.hbase.util.Bytes; 066import org.apache.hadoop.hbase.util.Pair; 067import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; 068import org.apache.hadoop.hbase.wal.WAL; 069import org.apache.hadoop.hbase.wal.WALFactory; 070import org.junit.jupiter.api.AfterAll; 071import org.junit.jupiter.api.AfterEach; 072import org.junit.jupiter.api.BeforeAll; 073import org.junit.jupiter.api.BeforeEach; 074import org.junit.jupiter.api.Tag; 075import org.junit.jupiter.api.Test; 076import org.junit.jupiter.api.TestInfo; 077 078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; 079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; 080 081@Tag(RegionServerTests.TAG) 082@Tag(MediumTests.TAG) 083public class TestReplicateToReplica { 084 085 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 086 087 private static byte[] FAMILY = Bytes.toBytes("family"); 088 089 private static byte[] QUAL = Bytes.toBytes("qualifier"); 090 091 private static ExecutorService EXEC; 092 093 private TableName tableName; 094 095 private Path testDir; 096 097 private TableDescriptor td; 098 099 private RegionServerServices rss; 100 101 private AsyncClusterConnection conn; 102 103 private RegionReplicationBufferManager manager; 104 105 private FlushRequester flushRequester; 106 107 private HRegion primary; 108 109 private HRegion secondary; 110 111 private WALFactory walFactory; 112 113 private boolean queueReqAndResps; 114 115 private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps; 116 117 private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH; 118 119 public static final class HRegionForTest extends HRegion { 120 121 public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, 122 TableDescriptor htd, RegionServerServices rsServices) { 123 super(fs, wal, confParam, htd, rsServices); 124 } 125 126 @SuppressWarnings("deprecation") 127 public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, 128 RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { 129 super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); 130 } 131 132 @Override 133 protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, 134 Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, 135 FlushLifeCycleTracker tracker) throws IOException { 136 PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, 137 status, writeFlushWalMarker, tracker); 138 for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) { 139 put(put); 140 } 141 TO_ADD_AFTER_PREPARE_FLUSH.clear(); 142 return result; 143 } 144 145 } 146 147 @BeforeAll 148 public static void setUpBeforeClass() { 149 Configuration conf = UTIL.getConfiguration(); 150 conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1); 151 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); 152 conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true); 153 conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); 154 EXEC = new ExecutorService("test"); 155 EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1) 156 .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); 157 ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, 158 MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); 159 } 160 161 @AfterAll 162 public static void tearDownAfterClass() { 163 EXEC.shutdown(); 164 UTIL.cleanupTestDir(); 165 } 166 167 @BeforeEach 168 public void setUp(TestInfo testInfo) throws IOException { 169 TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>(); 170 tableName = TableName.valueOf(testInfo.getTestMethod().get().getName()); 171 testDir = UTIL.getDataTestDir(tableName.getNameAsString()); 172 Configuration conf = UTIL.getConfiguration(); 173 conf.set(HConstants.HBASE_DIR, testDir.toString()); 174 175 td = TableDescriptorBuilder.newBuilder(tableName) 176 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2) 177 .setRegionMemStoreReplication(true).build(); 178 179 reqAndResps = new ArrayDeque<>(); 180 queueReqAndResps = true; 181 conn = mock(AsyncClusterConnection.class); 182 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> { 183 if (queueReqAndResps) { 184 @SuppressWarnings("unchecked") 185 List<WAL.Entry> entries = i.getArgument(1, List.class); 186 CompletableFuture<Void> future = new CompletableFuture<>(); 187 reqAndResps.add(Pair.newPair(entries, future)); 188 return future; 189 } else { 190 return CompletableFuture.completedFuture(null); 191 } 192 }); 193 194 flushRequester = mock(FlushRequester.class); 195 196 rss = mock(RegionServerServices.class); 197 when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); 198 when(rss.getConfiguration()).thenReturn(conf); 199 when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf)); 200 when(rss.getExecutorService()).thenReturn(EXEC); 201 when(rss.getAsyncClusterConnection()).thenReturn(conn); 202 when(rss.getFlushRequester()).thenReturn(flushRequester); 203 204 manager = new RegionReplicationBufferManager(rss); 205 when(rss.getRegionReplicationBufferManager()).thenReturn(manager); 206 207 RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); 208 RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1); 209 210 walFactory = new WALFactory(conf, UUID.randomUUID().toString()); 211 WAL wal = walFactory.getWAL(primaryHri); 212 primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal); 213 primary.close(); 214 215 primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null); 216 secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null); 217 218 when(rss.getRegions()).then(i -> { 219 return Arrays.asList(primary, secondary); 220 }); 221 222 // process the open events 223 replicateAll(); 224 } 225 226 @AfterEach 227 public void tearDown() throws IOException { 228 // close region will issue a flush, which will enqueue an edit into the replication sink so we 229 // need to complete it otherwise the test will hang. 230 queueReqAndResps = false; 231 failAll(); 232 HBaseTestingUtil.closeRegionAndWAL(primary); 233 HBaseTestingUtil.closeRegionAndWAL(secondary); 234 if (walFactory != null) { 235 walFactory.close(); 236 } 237 } 238 239 private FlushResult flushPrimary() throws IOException { 240 return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY); 241 } 242 243 private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException { 244 Pair<ReplicateWALEntryRequest, 245 ExtendedCellScanner> params = ReplicationProtobufUtil.buildReplicateWALEntryRequest( 246 pair.getFirst().toArray(new WAL.Entry[0]), 247 secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null); 248 for (WALEntry entry : params.getFirst().getEntryList()) { 249 secondary.replayWALEntry(entry, params.getSecond()); 250 } 251 pair.getSecond().complete(null); 252 } 253 254 private void replicateOne() throws IOException { 255 replicate(reqAndResps.remove()); 256 } 257 258 private void replicateAll() throws IOException { 259 for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) { 260 pair = reqAndResps.poll(); 261 if (pair == null) { 262 break; 263 } 264 replicate(pair); 265 } 266 } 267 268 private void failOne() { 269 reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error")); 270 } 271 272 private void failAll() { 273 for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) { 274 pair = reqAndResps.poll(); 275 if (pair == null) { 276 break; 277 } 278 pair.getSecond().completeExceptionally(new IOException("Inject error")); 279 } 280 } 281 282 @Test 283 public void testNormalReplicate() throws IOException { 284 byte[] row = Bytes.toBytes(0); 285 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 286 replicateOne(); 287 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 288 } 289 290 @Test 291 public void testNormalFlush() throws IOException { 292 byte[] row = Bytes.toBytes(0); 293 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 294 TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); 295 flushPrimary(); 296 replicateAll(); 297 assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 298 299 // we should have the same memstore size, i.e, the secondary should have also dropped the 300 // snapshot 301 assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize()); 302 } 303 304 @Test 305 public void testErrorBeforeFlushStart() throws IOException { 306 byte[] row = Bytes.toBytes(0); 307 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 308 failOne(); 309 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); 310 TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); 311 flushPrimary(); 312 // this also tests start flush with empty memstore at secondary replica side 313 replicateAll(); 314 assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 315 assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize()); 316 } 317 318 @Test 319 public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException { 320 primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 321 replicateAll(); 322 TO_ADD_AFTER_PREPARE_FLUSH 323 .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); 324 flushPrimary(); 325 // replicate the start flush edit 326 replicateOne(); 327 // fail the remaining edits, the put and the commit flush edit 328 failOne(); 329 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); 330 primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3))); 331 flushPrimary(); 332 replicateAll(); 333 for (int i = 0; i < 3; i++) { 334 assertEquals(i + 1, 335 Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL))); 336 } 337 // should have nothing in memstore 338 assertEquals(0, secondary.getMemStoreDataSize()); 339 } 340 341 @Test 342 public void testCatchUpWithCannotFlush() throws IOException, InterruptedException { 343 byte[] row = Bytes.toBytes(0); 344 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 345 failOne(); 346 verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); 347 flushPrimary(); 348 failAll(); 349 Thread.sleep(2000); 350 // we will request flush the second time 351 verify(flushRequester, times(2)).requestFlush(any(), anyList(), any()); 352 // we can not flush because no content in memstore 353 FlushResult result = flushPrimary(); 354 assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult()); 355 // the secondary replica does not have this row yet 356 assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue()); 357 // replicate the can not flush edit 358 replicateOne(); 359 // we should have the row now 360 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 361 } 362 363 @Test 364 public void testCatchUpWithReopen() throws IOException { 365 byte[] row = Bytes.toBytes(0); 366 primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); 367 failOne(); 368 primary.close(); 369 // the secondary replica does not have this row yet, although the above close has flushed the 370 // data out 371 assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue()); 372 373 // reopen 374 primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(), 375 UTIL.getConfiguration(), rss, null); 376 replicateAll(); 377 // we should have the row now 378 assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); 379 } 380}