001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.ArgumentMatchers.anyInt;
023import static org.mockito.ArgumentMatchers.anyList;
024import static org.mockito.ArgumentMatchers.anyLong;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.never;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.Map;
036import java.util.TreeMap;
037import java.util.concurrent.CompletableFuture;
038import java.util.function.Function;
039import java.util.stream.Collectors;
040import java.util.stream.Stream;
041import org.apache.commons.lang3.mutable.MutableInt;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.HBaseConfiguration;
045import org.apache.hadoop.hbase.TableNameTestExtension;
046import org.apache.hadoop.hbase.client.AsyncClusterConnection;
047import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
048import org.apache.hadoop.hbase.client.RegionInfo;
049import org.apache.hadoop.hbase.client.RegionInfoBuilder;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.ipc.ServerCall;
053import org.apache.hadoop.hbase.testclassification.MediumTests;
054import org.apache.hadoop.hbase.testclassification.RegionServerTests;
055import org.apache.hadoop.hbase.util.Bytes;
056import org.apache.hadoop.hbase.wal.WALEdit;
057import org.apache.hadoop.hbase.wal.WALKeyImpl;
058import org.junit.jupiter.api.AfterEach;
059import org.junit.jupiter.api.BeforeEach;
060import org.junit.jupiter.api.Tag;
061import org.junit.jupiter.api.Test;
062import org.junit.jupiter.api.extension.RegisterExtension;
063
064import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
065import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
066import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
067
068@Tag(RegionServerTests.TAG)
069@Tag(MediumTests.TAG)
070public class TestRegionReplicationSink {
071
072  private Configuration conf;
073
074  private TableDescriptor td;
075
076  private RegionInfo primary;
077
078  private Runnable flushRequester;
079
080  private AsyncClusterConnection conn;
081
082  private RegionReplicationBufferManager manager;
083
084  private RegionReplicationSink sink;
085
086  @RegisterExtension
087  public final TableNameTestExtension name = new TableNameTestExtension();
088
089  @BeforeEach
090  public void setUp() {
091    conf = HBaseConfiguration.create();
092    conf.setLong(RegionReplicationSink.BATCH_COUNT_CAPACITY, 5);
093    conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024 * 1024);
094    td = TableDescriptorBuilder.newBuilder(name.getTableName())
095      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
096    primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
097    flushRequester = mock(Runnable.class);
098    conn = mock(AsyncClusterConnection.class);
099    manager = mock(RegionReplicationBufferManager.class);
100    sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
101  }
102
103  @AfterEach
104  public void tearDown() throws InterruptedException {
105    sink.stop();
106    sink.waitUntilStopped();
107  }
108
109  @Test
110  public void testNormal() {
111    MutableInt next = new MutableInt(0);
112    List<CompletableFuture<Void>> futures =
113      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
114    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
115      .then(i -> futures.get(next.getAndIncrement()));
116    ServerCall<?> rpcCall = mock(ServerCall.class);
117    WALKeyImpl key = mock(WALKeyImpl.class);
118    when(key.estimatedSerializedSizeOf()).thenReturn(100L);
119    WALEdit edit = mock(WALEdit.class);
120    when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
121    when(manager.increase(anyLong())).thenReturn(true);
122
123    sink.add(key, edit, rpcCall);
124    // should call increase on manager
125    verify(manager, times(1)).increase(anyLong());
126    // should have been retained
127    verify(rpcCall, times(1)).retainByWAL();
128    assertEquals(1100, sink.pendingSize());
129
130    futures.get(0).complete(null);
131    // should not call decrease yet
132    verify(manager, never()).decrease(anyLong());
133    // should not call release yet
134    verify(rpcCall, never()).releaseByWAL();
135    assertEquals(1100, sink.pendingSize());
136
137    futures.get(1).complete(null);
138    // should call decrease
139    verify(manager, times(1)).decrease(anyLong());
140    // should call release
141    verify(rpcCall, times(1)).releaseByWAL();
142    assertEquals(0, sink.pendingSize());
143  }
144
145  @Test
146  public void testDropEdits() {
147    MutableInt next = new MutableInt(0);
148    List<CompletableFuture<Void>> futures =
149      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
150    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
151      .then(i -> futures.get(next.getAndIncrement()));
152    ServerCall<?> rpcCall1 = mock(ServerCall.class);
153    WALKeyImpl key1 = mock(WALKeyImpl.class);
154    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
155    WALEdit edit1 = mock(WALEdit.class);
156    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
157    when(manager.increase(anyLong())).thenReturn(true);
158
159    sink.add(key1, edit1, rpcCall1);
160    verify(manager, times(1)).increase(anyLong());
161    verify(manager, never()).decrease(anyLong());
162    verify(rpcCall1, times(1)).retainByWAL();
163    assertEquals(1100, sink.pendingSize());
164
165    ServerCall<?> rpcCall2 = mock(ServerCall.class);
166    WALKeyImpl key2 = mock(WALKeyImpl.class);
167    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
168    WALEdit edit2 = mock(WALEdit.class);
169    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
170
171    sink.add(key2, edit2, rpcCall2);
172    verify(manager, times(2)).increase(anyLong());
173    verify(manager, never()).decrease(anyLong());
174    verify(rpcCall2, times(1)).retainByWAL();
175    assertEquals(3300, sink.pendingSize());
176
177    ServerCall<?> rpcCall3 = mock(ServerCall.class);
178    WALKeyImpl key3 = mock(WALKeyImpl.class);
179    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
180    WALEdit edit3 = mock(WALEdit.class);
181    when(edit3.estimatedSerializedSizeOf()).thenReturn(3000L);
182    when(manager.increase(anyLong())).thenReturn(false);
183
184    // should not buffer this edit
185    sink.add(key3, edit3, rpcCall3);
186    verify(manager, times(3)).increase(anyLong());
187    verify(manager, times(1)).decrease(anyLong());
188    // should retain and then release immediately
189    verify(rpcCall3, times(1)).retainByWAL();
190    verify(rpcCall3, times(1)).releaseByWAL();
191    // should also clear the pending edit
192    verify(rpcCall2, times(1)).releaseByWAL();
193    assertEquals(1100, sink.pendingSize());
194    // should have request flush
195    verify(flushRequester, times(1)).run();
196
197    // finish the replication for first edit, we should decrease the size, release the rpc call,and
198    // the pendingSize should be 0 as there are no pending entries
199    futures.forEach(f -> f.complete(null));
200    verify(manager, times(2)).decrease(anyLong());
201    verify(rpcCall1, times(1)).releaseByWAL();
202    assertEquals(0, sink.pendingSize());
203
204    // should only call replicate 2 times for replicating the first edit, as we have 2 secondary
205    // replicas
206    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
207  }
208
209  @Test
210  public void testNotAddToFailedReplicas() {
211    MutableInt next = new MutableInt(0);
212    List<CompletableFuture<Void>> futures =
213      Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList());
214    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
215      .then(i -> futures.get(next.getAndIncrement()));
216
217    ServerCall<?> rpcCall1 = mock(ServerCall.class);
218    WALKeyImpl key1 = mock(WALKeyImpl.class);
219    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
220    when(key1.getSequenceId()).thenReturn(1L);
221    WALEdit edit1 = mock(WALEdit.class);
222    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
223    when(manager.increase(anyLong())).thenReturn(true);
224    sink.add(key1, edit1, rpcCall1);
225
226    ServerCall<?> rpcCall2 = mock(ServerCall.class);
227    WALKeyImpl key2 = mock(WALKeyImpl.class);
228    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
229    when(key2.getSequenceId()).thenReturn(3L);
230
231    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
232      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
233        throw new IllegalStateException();
234      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
235    FlushDescriptor fd =
236      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
237    WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
238    sink.add(key2, edit2, rpcCall2);
239
240    // fail the call to replica 2
241    futures.get(0).complete(null);
242    futures.get(1).completeExceptionally(new IOException("inject error"));
243
244    // the failure should not cause replica 2 to be added to failedReplicas, as we have already
245    // trigger a flush after it.
246    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
247
248    futures.get(2).complete(null);
249    futures.get(3).complete(null);
250
251    // should have send out all so no pending entries.
252    assertEquals(0, sink.pendingSize());
253  }
254
255  @Test
256  public void testAddToFailedReplica() {
257    MutableInt next = new MutableInt(0);
258    List<CompletableFuture<Void>> futures =
259      Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
260    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
261      .then(i -> futures.get(next.getAndIncrement()));
262
263    ServerCall<?> rpcCall1 = mock(ServerCall.class);
264    WALKeyImpl key1 = mock(WALKeyImpl.class);
265    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
266    when(key1.getSequenceId()).thenReturn(1L);
267    WALEdit edit1 = mock(WALEdit.class);
268    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
269    when(manager.increase(anyLong())).thenReturn(true);
270    sink.add(key1, edit1, rpcCall1);
271
272    ServerCall<?> rpcCall2 = mock(ServerCall.class);
273    WALKeyImpl key2 = mock(WALKeyImpl.class);
274    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
275    when(key2.getSequenceId()).thenReturn(1L);
276    WALEdit edit2 = mock(WALEdit.class);
277    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
278    when(manager.increase(anyLong())).thenReturn(true);
279    sink.add(key2, edit2, rpcCall2);
280
281    // fail the call to replica 2
282    futures.get(0).complete(null);
283    futures.get(1).completeExceptionally(new IOException("inject error"));
284
285    // we should only call replicate once for edit2, since replica 2 is marked as failed
286    verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
287    futures.get(2).complete(null);
288    // should have send out all so no pending entries.
289    assertEquals(0, sink.pendingSize());
290
291    ServerCall<?> rpcCall3 = mock(ServerCall.class);
292    WALKeyImpl key3 = mock(WALKeyImpl.class);
293    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
294    when(key3.getSequenceId()).thenReturn(3L);
295    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
296      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
297        throw new IllegalStateException();
298      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
299    FlushDescriptor fd =
300      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
301    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
302    sink.add(key3, edit3, rpcCall3);
303
304    // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
305    // replicas again
306    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
307    futures.get(3).complete(null);
308    futures.get(4).complete(null);
309
310    // should have send out all so no pending entries.
311    assertEquals(0, sink.pendingSize());
312  }
313
314  /**
315   * This test is for HBASE-29230, when all replicas are failed, resource should be released
316   * completely.
317   */
318  @Test
319  public void testAllReplicaFailed() {
320    MutableInt next = new MutableInt(0);
321    List<CompletableFuture<Void>> futures =
322      Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
323    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
324      .then(i -> futures.get(next.getAndIncrement()));
325
326    ServerCall<?> rpcCall1 = mock(ServerCall.class);
327    WALKeyImpl key1 = mock(WALKeyImpl.class);
328    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
329    when(key1.getSequenceId()).thenReturn(1L);
330    WALEdit edit1 = mock(WALEdit.class);
331    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
332    when(manager.increase(anyLong())).thenReturn(true);
333    sink.add(key1, edit1, rpcCall1);
334    verify(rpcCall1, times(1)).retainByWAL();
335
336    ServerCall<?> rpcCall2 = mock(ServerCall.class);
337    WALKeyImpl key2 = mock(WALKeyImpl.class);
338    when(key2.estimatedSerializedSizeOf()).thenReturn(100L);
339    when(key2.getSequenceId()).thenReturn(2L);
340    WALEdit edit2 = mock(WALEdit.class);
341    when(edit2.estimatedSerializedSizeOf()).thenReturn(1000L);
342    when(manager.increase(anyLong())).thenReturn(true);
343    sink.add(key2, edit2, rpcCall2);
344    verify(rpcCall2, times(1)).retainByWAL();
345
346    // fail all replicas for edit1, so edit2 could not send.
347    futures.get(0).completeExceptionally(new IOException("inject error"));
348    futures.get(1).completeExceptionally(new IOException("inject error"));
349
350    // we should only call replicate for edit1
351    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
352
353    // flush
354    ServerCall<?> rpcCall3 = mock(ServerCall.class);
355    WALKeyImpl key3 = mock(WALKeyImpl.class);
356    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
357    when(key3.getSequenceId()).thenReturn(4L);
358    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
359      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
360        throw new IllegalStateException();
361      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
362    FlushDescriptor fd =
363      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 3L, committedFiles);
364    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
365    sink.add(key3, edit3, rpcCall3);
366    verify(rpcCall3, times(1)).retainByWAL();
367
368    // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
369    // replicas again
370    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
371    futures.get(2).complete(null);
372    futures.get(3).complete(null);
373
374    // all ServerCall should be released and pendingSize should be 0
375    verify(rpcCall1, times(1)).releaseByWAL();
376    verify(rpcCall2, times(1)).releaseByWAL();
377    verify(rpcCall3, times(1)).releaseByWAL();
378    assertEquals(0, sink.pendingSize());
379
380  }
381
382  @Test
383  public void testSizeCapacity() {
384    MutableInt next = new MutableInt(0);
385    List<CompletableFuture<Void>> futures =
386      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
387    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
388      .then(i -> futures.get(next.getAndIncrement()));
389    for (int i = 0; i < 3; i++) {
390      ServerCall<?> rpcCall = mock(ServerCall.class);
391      WALKeyImpl key = mock(WALKeyImpl.class);
392      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
393      when(key.getSequenceId()).thenReturn(i + 1L);
394      WALEdit edit = mock(WALEdit.class);
395      when(edit.estimatedSerializedSizeOf()).thenReturn((i + 1) * 600L * 1024);
396      when(manager.increase(anyLong())).thenReturn(true);
397      sink.add(key, edit, rpcCall);
398    }
399    // the first entry will be send out immediately
400    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
401
402    // complete the first send
403    futures.get(0).complete(null);
404    futures.get(1).complete(null);
405
406    // we should have another batch
407    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
408
409    // complete the second send
410    futures.get(2).complete(null);
411    futures.get(3).complete(null);
412
413    // the size of the second entry is greater than 1024 * 1024, so we will have another batch
414    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
415
416    // complete the third send
417    futures.get(4).complete(null);
418    futures.get(5).complete(null);
419
420    // should have send out all so no pending entries.
421    assertEquals(0, sink.pendingSize());
422  }
423
424  @Test
425  public void testCountCapacity() {
426    MutableInt next = new MutableInt(0);
427    List<CompletableFuture<Void>> futures =
428      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
429    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
430      .then(i -> futures.get(next.getAndIncrement()));
431    for (int i = 0; i < 7; i++) {
432      ServerCall<?> rpcCall = mock(ServerCall.class);
433      WALKeyImpl key = mock(WALKeyImpl.class);
434      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
435      when(key.getSequenceId()).thenReturn(i + 1L);
436      WALEdit edit = mock(WALEdit.class);
437      when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
438      when(manager.increase(anyLong())).thenReturn(true);
439      sink.add(key, edit, rpcCall);
440    }
441    // the first entry will be send out immediately
442    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
443
444    // complete the first send
445    futures.get(0).complete(null);
446    futures.get(1).complete(null);
447
448    // we should have another batch
449    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
450
451    // complete the second send
452    futures.get(2).complete(null);
453    futures.get(3).complete(null);
454
455    // because of the count limit is 5, the above send can not send all the edits, so we will do
456    // another send
457    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
458
459    // complete the third send
460    futures.get(4).complete(null);
461    futures.get(5).complete(null);
462
463    // should have send out all so no pending entries.
464    assertEquals(0, sink.pendingSize());
465  }
466}