001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver.regionreplication;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.ArgumentMatchers.any;
022import static org.mockito.ArgumentMatchers.anyInt;
023import static org.mockito.ArgumentMatchers.anyList;
024import static org.mockito.ArgumentMatchers.anyLong;
025import static org.mockito.Mockito.mock;
026import static org.mockito.Mockito.never;
027import static org.mockito.Mockito.times;
028import static org.mockito.Mockito.verify;
029import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.Map;
036import java.util.TreeMap;
037import java.util.concurrent.CompletableFuture;
038import java.util.function.Function;
039import java.util.stream.Collectors;
040import java.util.stream.Stream;
041import org.apache.commons.lang3.mutable.MutableInt;
042import org.apache.hadoop.conf.Configuration;
043import org.apache.hadoop.fs.Path;
044import org.apache.hadoop.hbase.HBaseClassTestRule;
045import org.apache.hadoop.hbase.HBaseConfiguration;
046import org.apache.hadoop.hbase.TableNameTestRule;
047import org.apache.hadoop.hbase.client.AsyncClusterConnection;
048import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
049import org.apache.hadoop.hbase.client.RegionInfo;
050import org.apache.hadoop.hbase.client.RegionInfoBuilder;
051import org.apache.hadoop.hbase.client.TableDescriptor;
052import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
053import org.apache.hadoop.hbase.ipc.ServerCall;
054import org.apache.hadoop.hbase.testclassification.MediumTests;
055import org.apache.hadoop.hbase.testclassification.RegionServerTests;
056import org.apache.hadoop.hbase.util.Bytes;
057import org.apache.hadoop.hbase.wal.WALEdit;
058import org.apache.hadoop.hbase.wal.WALKeyImpl;
059import org.junit.After;
060import org.junit.Before;
061import org.junit.ClassRule;
062import org.junit.Rule;
063import org.junit.Test;
064import org.junit.experimental.categories.Category;
065
066import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
067import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
068import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
069
070@Category({ RegionServerTests.class, MediumTests.class })
071public class TestRegionReplicationSink {
072
073  @ClassRule
074  public static final HBaseClassTestRule CLASS_RULE =
075    HBaseClassTestRule.forClass(TestRegionReplicationSink.class);
076
077  private Configuration conf;
078
079  private TableDescriptor td;
080
081  private RegionInfo primary;
082
083  private Runnable flushRequester;
084
085  private AsyncClusterConnection conn;
086
087  private RegionReplicationBufferManager manager;
088
089  private RegionReplicationSink sink;
090
091  @Rule
092  public final TableNameTestRule name = new TableNameTestRule();
093
094  @Before
095  public void setUp() {
096    conf = HBaseConfiguration.create();
097    conf.setLong(RegionReplicationSink.BATCH_COUNT_CAPACITY, 5);
098    conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024 * 1024);
099    td = TableDescriptorBuilder.newBuilder(name.getTableName())
100      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
101    primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
102    flushRequester = mock(Runnable.class);
103    conn = mock(AsyncClusterConnection.class);
104    manager = mock(RegionReplicationBufferManager.class);
105    sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
106  }
107
108  @After
109  public void tearDown() throws InterruptedException {
110    sink.stop();
111    sink.waitUntilStopped();
112  }
113
114  @Test
115  public void testNormal() {
116    MutableInt next = new MutableInt(0);
117    List<CompletableFuture<Void>> futures =
118      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
119    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
120      .then(i -> futures.get(next.getAndIncrement()));
121    ServerCall<?> rpcCall = mock(ServerCall.class);
122    WALKeyImpl key = mock(WALKeyImpl.class);
123    when(key.estimatedSerializedSizeOf()).thenReturn(100L);
124    WALEdit edit = mock(WALEdit.class);
125    when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
126    when(manager.increase(anyLong())).thenReturn(true);
127
128    sink.add(key, edit, rpcCall);
129    // should call increase on manager
130    verify(manager, times(1)).increase(anyLong());
131    // should have been retained
132    verify(rpcCall, times(1)).retainByWAL();
133    assertEquals(1100, sink.pendingSize());
134
135    futures.get(0).complete(null);
136    // should not call decrease yet
137    verify(manager, never()).decrease(anyLong());
138    // should not call release yet
139    verify(rpcCall, never()).releaseByWAL();
140    assertEquals(1100, sink.pendingSize());
141
142    futures.get(1).complete(null);
143    // should call decrease
144    verify(manager, times(1)).decrease(anyLong());
145    // should call release
146    verify(rpcCall, times(1)).releaseByWAL();
147    assertEquals(0, sink.pendingSize());
148  }
149
150  @Test
151  public void testDropEdits() {
152    MutableInt next = new MutableInt(0);
153    List<CompletableFuture<Void>> futures =
154      Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
155    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
156      .then(i -> futures.get(next.getAndIncrement()));
157    ServerCall<?> rpcCall1 = mock(ServerCall.class);
158    WALKeyImpl key1 = mock(WALKeyImpl.class);
159    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
160    WALEdit edit1 = mock(WALEdit.class);
161    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
162    when(manager.increase(anyLong())).thenReturn(true);
163
164    sink.add(key1, edit1, rpcCall1);
165    verify(manager, times(1)).increase(anyLong());
166    verify(manager, never()).decrease(anyLong());
167    verify(rpcCall1, times(1)).retainByWAL();
168    assertEquals(1100, sink.pendingSize());
169
170    ServerCall<?> rpcCall2 = mock(ServerCall.class);
171    WALKeyImpl key2 = mock(WALKeyImpl.class);
172    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
173    WALEdit edit2 = mock(WALEdit.class);
174    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
175
176    sink.add(key2, edit2, rpcCall2);
177    verify(manager, times(2)).increase(anyLong());
178    verify(manager, never()).decrease(anyLong());
179    verify(rpcCall2, times(1)).retainByWAL();
180    assertEquals(3300, sink.pendingSize());
181
182    ServerCall<?> rpcCall3 = mock(ServerCall.class);
183    WALKeyImpl key3 = mock(WALKeyImpl.class);
184    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
185    WALEdit edit3 = mock(WALEdit.class);
186    when(edit3.estimatedSerializedSizeOf()).thenReturn(3000L);
187    when(manager.increase(anyLong())).thenReturn(false);
188
189    // should not buffer this edit
190    sink.add(key3, edit3, rpcCall3);
191    verify(manager, times(3)).increase(anyLong());
192    verify(manager, times(1)).decrease(anyLong());
193    // should retain and then release immediately
194    verify(rpcCall3, times(1)).retainByWAL();
195    verify(rpcCall3, times(1)).releaseByWAL();
196    // should also clear the pending edit
197    verify(rpcCall2, times(1)).releaseByWAL();
198    assertEquals(1100, sink.pendingSize());
199    // should have request flush
200    verify(flushRequester, times(1)).run();
201
202    // finish the replication for first edit, we should decrease the size, release the rpc call,and
203    // the pendingSize should be 0 as there are no pending entries
204    futures.forEach(f -> f.complete(null));
205    verify(manager, times(2)).decrease(anyLong());
206    verify(rpcCall1, times(1)).releaseByWAL();
207    assertEquals(0, sink.pendingSize());
208
209    // should only call replicate 2 times for replicating the first edit, as we have 2 secondary
210    // replicas
211    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
212  }
213
214  @Test
215  public void testNotAddToFailedReplicas() {
216    MutableInt next = new MutableInt(0);
217    List<CompletableFuture<Void>> futures =
218      Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList());
219    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
220      .then(i -> futures.get(next.getAndIncrement()));
221
222    ServerCall<?> rpcCall1 = mock(ServerCall.class);
223    WALKeyImpl key1 = mock(WALKeyImpl.class);
224    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
225    when(key1.getSequenceId()).thenReturn(1L);
226    WALEdit edit1 = mock(WALEdit.class);
227    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
228    when(manager.increase(anyLong())).thenReturn(true);
229    sink.add(key1, edit1, rpcCall1);
230
231    ServerCall<?> rpcCall2 = mock(ServerCall.class);
232    WALKeyImpl key2 = mock(WALKeyImpl.class);
233    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
234    when(key2.getSequenceId()).thenReturn(3L);
235
236    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
237      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
238        throw new IllegalStateException();
239      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
240    FlushDescriptor fd =
241      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
242    WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
243    sink.add(key2, edit2, rpcCall2);
244
245    // fail the call to replica 2
246    futures.get(0).complete(null);
247    futures.get(1).completeExceptionally(new IOException("inject error"));
248
249    // the failure should not cause replica 2 to be added to failedReplicas, as we have already
250    // trigger a flush after it.
251    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
252
253    futures.get(2).complete(null);
254    futures.get(3).complete(null);
255
256    // should have send out all so no pending entries.
257    assertEquals(0, sink.pendingSize());
258  }
259
260  @Test
261  public void testAddToFailedReplica() {
262    MutableInt next = new MutableInt(0);
263    List<CompletableFuture<Void>> futures =
264      Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
265    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
266      .then(i -> futures.get(next.getAndIncrement()));
267
268    ServerCall<?> rpcCall1 = mock(ServerCall.class);
269    WALKeyImpl key1 = mock(WALKeyImpl.class);
270    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
271    when(key1.getSequenceId()).thenReturn(1L);
272    WALEdit edit1 = mock(WALEdit.class);
273    when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
274    when(manager.increase(anyLong())).thenReturn(true);
275    sink.add(key1, edit1, rpcCall1);
276
277    ServerCall<?> rpcCall2 = mock(ServerCall.class);
278    WALKeyImpl key2 = mock(WALKeyImpl.class);
279    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
280    when(key2.getSequenceId()).thenReturn(1L);
281    WALEdit edit2 = mock(WALEdit.class);
282    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
283    when(manager.increase(anyLong())).thenReturn(true);
284    sink.add(key2, edit2, rpcCall2);
285
286    // fail the call to replica 2
287    futures.get(0).complete(null);
288    futures.get(1).completeExceptionally(new IOException("inject error"));
289
290    // we should only call replicate once for edit2, since replica 2 is marked as failed
291    verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
292    futures.get(2).complete(null);
293    // should have send out all so no pending entries.
294    assertEquals(0, sink.pendingSize());
295
296    ServerCall<?> rpcCall3 = mock(ServerCall.class);
297    WALKeyImpl key3 = mock(WALKeyImpl.class);
298    when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
299    when(key3.getSequenceId()).thenReturn(3L);
300    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
301      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
302        throw new IllegalStateException();
303      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
304    FlushDescriptor fd =
305      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
306    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
307    sink.add(key3, edit3, rpcCall3);
308
309    // the flush marker should have cleared the failedReplicas, so we will send the edit to 2
310    // replicas again
311    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
312    futures.get(3).complete(null);
313    futures.get(4).complete(null);
314
315    // should have send out all so no pending entries.
316    assertEquals(0, sink.pendingSize());
317  }
318
319  @Test
320  public void testSizeCapacity() {
321    MutableInt next = new MutableInt(0);
322    List<CompletableFuture<Void>> futures =
323      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
324    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
325      .then(i -> futures.get(next.getAndIncrement()));
326    for (int i = 0; i < 3; i++) {
327      ServerCall<?> rpcCall = mock(ServerCall.class);
328      WALKeyImpl key = mock(WALKeyImpl.class);
329      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
330      when(key.getSequenceId()).thenReturn(i + 1L);
331      WALEdit edit = mock(WALEdit.class);
332      when(edit.estimatedSerializedSizeOf()).thenReturn((i + 1) * 600L * 1024);
333      when(manager.increase(anyLong())).thenReturn(true);
334      sink.add(key, edit, rpcCall);
335    }
336    // the first entry will be send out immediately
337    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
338
339    // complete the first send
340    futures.get(0).complete(null);
341    futures.get(1).complete(null);
342
343    // we should have another batch
344    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
345
346    // complete the second send
347    futures.get(2).complete(null);
348    futures.get(3).complete(null);
349
350    // the size of the second entry is greater than 1024 * 1024, so we will have another batch
351    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
352
353    // complete the third send
354    futures.get(4).complete(null);
355    futures.get(5).complete(null);
356
357    // should have send out all so no pending entries.
358    assertEquals(0, sink.pendingSize());
359  }
360
361  @Test
362  public void testCountCapacity() {
363    MutableInt next = new MutableInt(0);
364    List<CompletableFuture<Void>> futures =
365      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
366    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
367      .then(i -> futures.get(next.getAndIncrement()));
368    for (int i = 0; i < 7; i++) {
369      ServerCall<?> rpcCall = mock(ServerCall.class);
370      WALKeyImpl key = mock(WALKeyImpl.class);
371      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
372      when(key.getSequenceId()).thenReturn(i + 1L);
373      WALEdit edit = mock(WALEdit.class);
374      when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
375      when(manager.increase(anyLong())).thenReturn(true);
376      sink.add(key, edit, rpcCall);
377    }
378    // the first entry will be send out immediately
379    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
380
381    // complete the first send
382    futures.get(0).complete(null);
383    futures.get(1).complete(null);
384
385    // we should have another batch
386    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
387
388    // complete the second send
389    futures.get(2).complete(null);
390    futures.get(3).complete(null);
391
392    // because of the count limit is 5, the above send can not send all the edits, so we will do
393    // another send
394    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
395
396    // complete the third send
397    futures.get(4).complete(null);
398    futures.get(5).complete(null);
399
400    // should have send out all so no pending entries.
401    assertEquals(0, sink.pendingSize());
402  }
403}