001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.regionreplication; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.mockito.ArgumentMatchers.any; 022import static org.mockito.ArgumentMatchers.anyInt; 023import static org.mockito.ArgumentMatchers.anyList; 024import static org.mockito.ArgumentMatchers.anyLong; 025import static org.mockito.Mockito.mock; 026import static org.mockito.Mockito.never; 027import static org.mockito.Mockito.times; 028import static org.mockito.Mockito.verify; 029import static org.mockito.Mockito.when; 030 031import java.io.IOException; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.Map; 036import java.util.TreeMap; 037import java.util.concurrent.CompletableFuture; 038import java.util.function.Function; 039import java.util.stream.Collectors; 040import java.util.stream.Stream; 041import org.apache.commons.lang3.mutable.MutableInt; 042import org.apache.hadoop.conf.Configuration; 043import org.apache.hadoop.fs.Path; 044import org.apache.hadoop.hbase.HBaseConfiguration; 045import org.apache.hadoop.hbase.TableNameTestExtension; 046import org.apache.hadoop.hbase.client.AsyncClusterConnection; 047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 048import org.apache.hadoop.hbase.client.RegionInfo; 049import org.apache.hadoop.hbase.client.RegionInfoBuilder; 050import org.apache.hadoop.hbase.client.TableDescriptor; 051import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 052import org.apache.hadoop.hbase.ipc.ServerCall; 053import org.apache.hadoop.hbase.testclassification.MediumTests; 054import org.apache.hadoop.hbase.testclassification.RegionServerTests; 055import org.apache.hadoop.hbase.util.Bytes; 056import org.apache.hadoop.hbase.wal.WALEdit; 057import org.apache.hadoop.hbase.wal.WALKeyImpl; 058import org.junit.jupiter.api.AfterEach; 059import org.junit.jupiter.api.BeforeEach; 060import org.junit.jupiter.api.Tag; 061import org.junit.jupiter.api.Test; 062import org.junit.jupiter.api.extension.RegisterExtension; 063 064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 065import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; 066import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; 067 068@Tag(RegionServerTests.TAG) 069@Tag(MediumTests.TAG) 070public class TestRegionReplicationSink { 071 072 private Configuration conf; 073 074 private TableDescriptor td; 075 076 private RegionInfo primary; 077 078 private Runnable flushRequester; 079 080 private AsyncClusterConnection conn; 081 082 private RegionReplicationBufferManager manager; 083 084 private RegionReplicationSink sink; 085 086 @RegisterExtension 087 public final TableNameTestExtension name = new TableNameTestExtension(); 088 089 @BeforeEach 090 public void setUp() { 091 conf = HBaseConfiguration.create(); 092 conf.setLong(RegionReplicationSink.BATCH_COUNT_CAPACITY, 5); 093 conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024 * 1024); 094 td = TableDescriptorBuilder.newBuilder(name.getTableName()) 095 .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build(); 096 primary = RegionInfoBuilder.newBuilder(name.getTableName()).build(); 097 flushRequester = mock(Runnable.class); 098 conn = mock(AsyncClusterConnection.class); 099 manager = mock(RegionReplicationBufferManager.class); 100 sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn); 101 } 102 103 @AfterEach 104 public void tearDown() throws InterruptedException { 105 sink.stop(); 106 sink.waitUntilStopped(); 107 } 108 109 @Test 110 public void testNormal() { 111 MutableInt next = new MutableInt(0); 112 List<CompletableFuture<Void>> futures = 113 Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>()); 114 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) 115 .then(i -> futures.get(next.getAndIncrement())); 116 ServerCall<?> rpcCall = mock(ServerCall.class); 117 WALKeyImpl key = mock(WALKeyImpl.class); 118 when(key.estimatedSerializedSizeOf()).thenReturn(100L); 119 WALEdit edit = mock(WALEdit.class); 120 when(edit.estimatedSerializedSizeOf()).thenReturn(1000L); 121 when(manager.increase(anyLong())).thenReturn(true); 122 123 sink.add(key, edit, rpcCall); 124 // should call increase on manager 125 verify(manager, times(1)).increase(anyLong()); 126 // should have been retained 127 verify(rpcCall, times(1)).retainByWAL(); 128 assertEquals(1100, sink.pendingSize()); 129 130 futures.get(0).complete(null); 131 // should not call decrease yet 132 verify(manager, never()).decrease(anyLong()); 133 // should not call release yet 134 verify(rpcCall, never()).releaseByWAL(); 135 assertEquals(1100, sink.pendingSize()); 136 137 futures.get(1).complete(null); 138 // should call decrease 139 verify(manager, times(1)).decrease(anyLong()); 140 // should call release 141 verify(rpcCall, times(1)).releaseByWAL(); 142 assertEquals(0, sink.pendingSize()); 143 } 144 145 @Test 146 public void testDropEdits() { 147 MutableInt next = new MutableInt(0); 148 List<CompletableFuture<Void>> futures = 149 Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>()); 150 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) 151 .then(i -> futures.get(next.getAndIncrement())); 152 ServerCall<?> rpcCall1 = mock(ServerCall.class); 153 WALKeyImpl key1 = mock(WALKeyImpl.class); 154 when(key1.estimatedSerializedSizeOf()).thenReturn(100L); 155 WALEdit edit1 = mock(WALEdit.class); 156 when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L); 157 when(manager.increase(anyLong())).thenReturn(true); 158 159 sink.add(key1, edit1, rpcCall1); 160 verify(manager, times(1)).increase(anyLong()); 161 verify(manager, never()).decrease(anyLong()); 162 verify(rpcCall1, times(1)).retainByWAL(); 163 assertEquals(1100, sink.pendingSize()); 164 165 ServerCall<?> rpcCall2 = mock(ServerCall.class); 166 WALKeyImpl key2 = mock(WALKeyImpl.class); 167 when(key2.estimatedSerializedSizeOf()).thenReturn(200L); 168 WALEdit edit2 = mock(WALEdit.class); 169 when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L); 170 171 sink.add(key2, edit2, rpcCall2); 172 verify(manager, times(2)).increase(anyLong()); 173 verify(manager, never()).decrease(anyLong()); 174 verify(rpcCall2, times(1)).retainByWAL(); 175 assertEquals(3300, sink.pendingSize()); 176 177 ServerCall<?> rpcCall3 = mock(ServerCall.class); 178 WALKeyImpl key3 = mock(WALKeyImpl.class); 179 when(key3.estimatedSerializedSizeOf()).thenReturn(200L); 180 WALEdit edit3 = mock(WALEdit.class); 181 when(edit3.estimatedSerializedSizeOf()).thenReturn(3000L); 182 when(manager.increase(anyLong())).thenReturn(false); 183 184 // should not buffer this edit 185 sink.add(key3, edit3, rpcCall3); 186 verify(manager, times(3)).increase(anyLong()); 187 verify(manager, times(1)).decrease(anyLong()); 188 // should retain and then release immediately 189 verify(rpcCall3, times(1)).retainByWAL(); 190 verify(rpcCall3, times(1)).releaseByWAL(); 191 // should also clear the pending edit 192 verify(rpcCall2, times(1)).releaseByWAL(); 193 assertEquals(1100, sink.pendingSize()); 194 // should have request flush 195 verify(flushRequester, times(1)).run(); 196 197 // finish the replication for first edit, we should decrease the size, release the rpc call,and 198 // the pendingSize should be 0 as there are no pending entries 199 futures.forEach(f -> f.complete(null)); 200 verify(manager, times(2)).decrease(anyLong()); 201 verify(rpcCall1, times(1)).releaseByWAL(); 202 assertEquals(0, sink.pendingSize()); 203 204 // should only call replicate 2 times for replicating the first edit, as we have 2 secondary 205 // replicas 206 verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 207 } 208 209 @Test 210 public void testNotAddToFailedReplicas() { 211 MutableInt next = new MutableInt(0); 212 List<CompletableFuture<Void>> futures = 213 Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList()); 214 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) 215 .then(i -> futures.get(next.getAndIncrement())); 216 217 ServerCall<?> rpcCall1 = mock(ServerCall.class); 218 WALKeyImpl key1 = mock(WALKeyImpl.class); 219 when(key1.estimatedSerializedSizeOf()).thenReturn(100L); 220 when(key1.getSequenceId()).thenReturn(1L); 221 WALEdit edit1 = mock(WALEdit.class); 222 when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L); 223 when(manager.increase(anyLong())).thenReturn(true); 224 sink.add(key1, edit1, rpcCall1); 225 226 ServerCall<?> rpcCall2 = mock(ServerCall.class); 227 WALKeyImpl key2 = mock(WALKeyImpl.class); 228 when(key2.estimatedSerializedSizeOf()).thenReturn(200L); 229 when(key2.getSequenceId()).thenReturn(3L); 230 231 Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream() 232 .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> { 233 throw new IllegalStateException(); 234 }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR))); 235 FlushDescriptor fd = 236 ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles); 237 WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd); 238 sink.add(key2, edit2, rpcCall2); 239 240 // fail the call to replica 2 241 futures.get(0).complete(null); 242 futures.get(1).completeExceptionally(new IOException("inject error")); 243 244 // the failure should not cause replica 2 to be added to failedReplicas, as we have already 245 // trigger a flush after it. 246 verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 247 248 futures.get(2).complete(null); 249 futures.get(3).complete(null); 250 251 // should have send out all so no pending entries. 252 assertEquals(0, sink.pendingSize()); 253 } 254 255 @Test 256 public void testAddToFailedReplica() { 257 MutableInt next = new MutableInt(0); 258 List<CompletableFuture<Void>> futures = 259 Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList()); 260 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) 261 .then(i -> futures.get(next.getAndIncrement())); 262 263 ServerCall<?> rpcCall1 = mock(ServerCall.class); 264 WALKeyImpl key1 = mock(WALKeyImpl.class); 265 when(key1.estimatedSerializedSizeOf()).thenReturn(100L); 266 when(key1.getSequenceId()).thenReturn(1L); 267 WALEdit edit1 = mock(WALEdit.class); 268 when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L); 269 when(manager.increase(anyLong())).thenReturn(true); 270 sink.add(key1, edit1, rpcCall1); 271 272 ServerCall<?> rpcCall2 = mock(ServerCall.class); 273 WALKeyImpl key2 = mock(WALKeyImpl.class); 274 when(key2.estimatedSerializedSizeOf()).thenReturn(200L); 275 when(key2.getSequenceId()).thenReturn(1L); 276 WALEdit edit2 = mock(WALEdit.class); 277 when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L); 278 when(manager.increase(anyLong())).thenReturn(true); 279 sink.add(key2, edit2, rpcCall2); 280 281 // fail the call to replica 2 282 futures.get(0).complete(null); 283 futures.get(1).completeExceptionally(new IOException("inject error")); 284 285 // we should only call replicate once for edit2, since replica 2 is marked as failed 286 verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 287 futures.get(2).complete(null); 288 // should have send out all so no pending entries. 289 assertEquals(0, sink.pendingSize()); 290 291 ServerCall<?> rpcCall3 = mock(ServerCall.class); 292 WALKeyImpl key3 = mock(WALKeyImpl.class); 293 when(key3.estimatedSerializedSizeOf()).thenReturn(200L); 294 when(key3.getSequenceId()).thenReturn(3L); 295 Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream() 296 .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> { 297 throw new IllegalStateException(); 298 }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR))); 299 FlushDescriptor fd = 300 ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles); 301 WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd); 302 sink.add(key3, edit3, rpcCall3); 303 304 // the flush marker should have cleared the failedReplicas, so we will send the edit to 2 305 // replicas again 306 verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 307 futures.get(3).complete(null); 308 futures.get(4).complete(null); 309 310 // should have send out all so no pending entries. 311 assertEquals(0, sink.pendingSize()); 312 } 313 314 /** 315 * This test is for HBASE-29230, when all replicas are failed, resource should be released 316 * completely. 317 */ 318 @Test 319 public void testAllReplicaFailed() { 320 MutableInt next = new MutableInt(0); 321 List<CompletableFuture<Void>> futures = 322 Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList()); 323 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) 324 .then(i -> futures.get(next.getAndIncrement())); 325 326 ServerCall<?> rpcCall1 = mock(ServerCall.class); 327 WALKeyImpl key1 = mock(WALKeyImpl.class); 328 when(key1.estimatedSerializedSizeOf()).thenReturn(100L); 329 when(key1.getSequenceId()).thenReturn(1L); 330 WALEdit edit1 = mock(WALEdit.class); 331 when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L); 332 when(manager.increase(anyLong())).thenReturn(true); 333 sink.add(key1, edit1, rpcCall1); 334 verify(rpcCall1, times(1)).retainByWAL(); 335 336 ServerCall<?> rpcCall2 = mock(ServerCall.class); 337 WALKeyImpl key2 = mock(WALKeyImpl.class); 338 when(key2.estimatedSerializedSizeOf()).thenReturn(100L); 339 when(key2.getSequenceId()).thenReturn(2L); 340 WALEdit edit2 = mock(WALEdit.class); 341 when(edit2.estimatedSerializedSizeOf()).thenReturn(1000L); 342 when(manager.increase(anyLong())).thenReturn(true); 343 sink.add(key2, edit2, rpcCall2); 344 verify(rpcCall2, times(1)).retainByWAL(); 345 346 // fail all replicas for edit1, so edit2 could not send. 347 futures.get(0).completeExceptionally(new IOException("inject error")); 348 futures.get(1).completeExceptionally(new IOException("inject error")); 349 350 // we should only call replicate for edit1 351 verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 352 353 // flush 354 ServerCall<?> rpcCall3 = mock(ServerCall.class); 355 WALKeyImpl key3 = mock(WALKeyImpl.class); 356 when(key3.estimatedSerializedSizeOf()).thenReturn(200L); 357 when(key3.getSequenceId()).thenReturn(4L); 358 Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream() 359 .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> { 360 throw new IllegalStateException(); 361 }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR))); 362 FlushDescriptor fd = 363 ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 3L, committedFiles); 364 WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd); 365 sink.add(key3, edit3, rpcCall3); 366 verify(rpcCall3, times(1)).retainByWAL(); 367 368 // the flush marker should have cleared the failedReplicas, so we will send the edit to 2 369 // replicas again 370 verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 371 futures.get(2).complete(null); 372 futures.get(3).complete(null); 373 374 // all ServerCall should be released and pendingSize should be 0 375 verify(rpcCall1, times(1)).releaseByWAL(); 376 verify(rpcCall2, times(1)).releaseByWAL(); 377 verify(rpcCall3, times(1)).releaseByWAL(); 378 assertEquals(0, sink.pendingSize()); 379 380 } 381 382 @Test 383 public void testSizeCapacity() { 384 MutableInt next = new MutableInt(0); 385 List<CompletableFuture<Void>> futures = 386 Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList()); 387 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) 388 .then(i -> futures.get(next.getAndIncrement())); 389 for (int i = 0; i < 3; i++) { 390 ServerCall<?> rpcCall = mock(ServerCall.class); 391 WALKeyImpl key = mock(WALKeyImpl.class); 392 when(key.estimatedSerializedSizeOf()).thenReturn(100L); 393 when(key.getSequenceId()).thenReturn(i + 1L); 394 WALEdit edit = mock(WALEdit.class); 395 when(edit.estimatedSerializedSizeOf()).thenReturn((i + 1) * 600L * 1024); 396 when(manager.increase(anyLong())).thenReturn(true); 397 sink.add(key, edit, rpcCall); 398 } 399 // the first entry will be send out immediately 400 verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 401 402 // complete the first send 403 futures.get(0).complete(null); 404 futures.get(1).complete(null); 405 406 // we should have another batch 407 verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 408 409 // complete the second send 410 futures.get(2).complete(null); 411 futures.get(3).complete(null); 412 413 // the size of the second entry is greater than 1024 * 1024, so we will have another batch 414 verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 415 416 // complete the third send 417 futures.get(4).complete(null); 418 futures.get(5).complete(null); 419 420 // should have send out all so no pending entries. 421 assertEquals(0, sink.pendingSize()); 422 } 423 424 @Test 425 public void testCountCapacity() { 426 MutableInt next = new MutableInt(0); 427 List<CompletableFuture<Void>> futures = 428 Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList()); 429 when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())) 430 .then(i -> futures.get(next.getAndIncrement())); 431 for (int i = 0; i < 7; i++) { 432 ServerCall<?> rpcCall = mock(ServerCall.class); 433 WALKeyImpl key = mock(WALKeyImpl.class); 434 when(key.estimatedSerializedSizeOf()).thenReturn(100L); 435 when(key.getSequenceId()).thenReturn(i + 1L); 436 WALEdit edit = mock(WALEdit.class); 437 when(edit.estimatedSerializedSizeOf()).thenReturn(1000L); 438 when(manager.increase(anyLong())).thenReturn(true); 439 sink.add(key, edit, rpcCall); 440 } 441 // the first entry will be send out immediately 442 verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 443 444 // complete the first send 445 futures.get(0).complete(null); 446 futures.get(1).complete(null); 447 448 // we should have another batch 449 verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 450 451 // complete the second send 452 futures.get(2).complete(null); 453 futures.get(3).complete(null); 454 455 // because of the count limit is 5, the above send can not send all the edits, so we will do 456 // another send 457 verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong()); 458 459 // complete the third send 460 futures.get(4).complete(null); 461 futures.get(5).complete(null); 462 463 // should have send out all so no pending entries. 464 assertEquals(0, sink.pendingSize()); 465 } 466}