001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
030
031import java.io.IOException;
032import java.util.ArrayDeque;
033import java.util.ArrayList;
034import java.util.Arrays;
035import java.util.Collection;
036import java.util.List;
037import java.util.Queue;
038import java.util.UUID;
039import java.util.concurrent.CompletableFuture;
040import org.apache.hadoop.conf.Configuration;
041import org.apache.hadoop.fs.FileSystem;
042import org.apache.hadoop.fs.Path;
043import org.apache.hadoop.hbase.ExtendedCellScanner;
044import org.apache.hadoop.hbase.HBaseTestingUtil;
045import org.apache.hadoop.hbase.HConstants;
046import org.apache.hadoop.hbase.ServerName;
047import org.apache.hadoop.hbase.TableName;
048import org.apache.hadoop.hbase.client.AsyncClusterConnection;
049import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
050import org.apache.hadoop.hbase.client.Get;
051import org.apache.hadoop.hbase.client.Put;
052import org.apache.hadoop.hbase.client.RegionInfo;
053import org.apache.hadoop.hbase.client.RegionInfoBuilder;
054import org.apache.hadoop.hbase.client.RegionReplicaUtil;
055import org.apache.hadoop.hbase.client.TableDescriptor;
056import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
057import org.apache.hadoop.hbase.executor.ExecutorService;
058import org.apache.hadoop.hbase.executor.ExecutorType;
059import org.apache.hadoop.hbase.monitoring.MonitoredTask;
060import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
061import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
062import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
063import org.apache.hadoop.hbase.testclassification.MediumTests;
064import org.apache.hadoop.hbase.testclassification.RegionServerTests;
065import org.apache.hadoop.hbase.util.Bytes;
066import org.apache.hadoop.hbase.util.Pair;
067import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
068import org.apache.hadoop.hbase.wal.WAL;
069import org.apache.hadoop.hbase.wal.WALFactory;
070import org.junit.jupiter.api.AfterAll;
071import org.junit.jupiter.api.AfterEach;
072import org.junit.jupiter.api.BeforeAll;
073import org.junit.jupiter.api.BeforeEach;
074import org.junit.jupiter.api.Tag;
075import org.junit.jupiter.api.Test;
076import org.junit.jupiter.api.TestInfo;
077
078import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
079import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
080
081@Tag(RegionServerTests.TAG)
082@Tag(MediumTests.TAG)
083public class TestReplicateToReplica {
084
085  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
086
087  private static byte[] FAMILY = Bytes.toBytes("family");
088
089  private static byte[] QUAL = Bytes.toBytes("qualifier");
090
091  private static ExecutorService EXEC;
092
093  private TableName tableName;
094
095  private Path testDir;
096
097  private TableDescriptor td;
098
099  private RegionServerServices rss;
100
101  private AsyncClusterConnection conn;
102
103  private RegionReplicationBufferManager manager;
104
105  private FlushRequester flushRequester;
106
107  private HRegion primary;
108
109  private HRegion secondary;
110
111  private WALFactory walFactory;
112
113  private boolean queueReqAndResps;
114
115  private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps;
116
117  private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH;
118
119  public static final class HRegionForTest extends HRegion {
120
121    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
122      TableDescriptor htd, RegionServerServices rsServices) {
123      super(fs, wal, confParam, htd, rsServices);
124    }
125
126    @SuppressWarnings("deprecation")
127    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
128      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
129      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
130    }
131
132    @Override
133    protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
134      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
135      FlushLifeCycleTracker tracker) throws IOException {
136      PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
137        status, writeFlushWalMarker, tracker);
138      for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) {
139        put(put);
140      }
141      TO_ADD_AFTER_PREPARE_FLUSH.clear();
142      return result;
143    }
144
145  }
146
147  @BeforeAll
148  public static void setUpBeforeClass() {
149    Configuration conf = UTIL.getConfiguration();
150    conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
151    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
152    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
153    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
154    EXEC = new ExecutorService("test");
155    EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1)
156      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
157    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
158      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
159  }
160
161  @AfterAll
162  public static void tearDownAfterClass() {
163    EXEC.shutdown();
164    UTIL.cleanupTestDir();
165  }
166
167  @BeforeEach
168  public void setUp(TestInfo testInfo) throws IOException {
169    TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>();
170    tableName = TableName.valueOf(testInfo.getTestMethod().get().getName());
171    testDir = UTIL.getDataTestDir(tableName.getNameAsString());
172    Configuration conf = UTIL.getConfiguration();
173    conf.set(HConstants.HBASE_DIR, testDir.toString());
174
175    td = TableDescriptorBuilder.newBuilder(tableName)
176      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2)
177      .setRegionMemStoreReplication(true).build();
178
179    reqAndResps = new ArrayDeque<>();
180    queueReqAndResps = true;
181    conn = mock(AsyncClusterConnection.class);
182    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> {
183      if (queueReqAndResps) {
184        @SuppressWarnings("unchecked")
185        List<WAL.Entry> entries = i.getArgument(1, List.class);
186        CompletableFuture<Void> future = new CompletableFuture<>();
187        reqAndResps.add(Pair.newPair(entries, future));
188        return future;
189      } else {
190        return CompletableFuture.completedFuture(null);
191      }
192    });
193
194    flushRequester = mock(FlushRequester.class);
195
196    rss = mock(RegionServerServices.class);
197    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
198    when(rss.getConfiguration()).thenReturn(conf);
199    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf));
200    when(rss.getExecutorService()).thenReturn(EXEC);
201    when(rss.getAsyncClusterConnection()).thenReturn(conn);
202    when(rss.getFlushRequester()).thenReturn(flushRequester);
203
204    manager = new RegionReplicationBufferManager(rss);
205    when(rss.getRegionReplicationBufferManager()).thenReturn(manager);
206
207    RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
208    RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1);
209
210    walFactory = new WALFactory(conf, UUID.randomUUID().toString());
211    WAL wal = walFactory.getWAL(primaryHri);
212    primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal);
213    primary.close();
214
215    primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null);
216    secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null);
217
218    when(rss.getRegions()).then(i -> {
219      return Arrays.asList(primary, secondary);
220    });
221
222    // process the open events
223    replicateAll();
224  }
225
226  @AfterEach
227  public void tearDown() throws IOException {
228    // close region will issue a flush, which will enqueue an edit into the replication sink so we
229    // need to complete it otherwise the test will hang.
230    queueReqAndResps = false;
231    failAll();
232    HBaseTestingUtil.closeRegionAndWAL(primary);
233    HBaseTestingUtil.closeRegionAndWAL(secondary);
234    if (walFactory != null) {
235      walFactory.close();
236    }
237  }
238
239  private FlushResult flushPrimary() throws IOException {
240    return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
241  }
242
243  private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException {
244    Pair<ReplicateWALEntryRequest,
245      ExtendedCellScanner> params = ReplicationProtobufUtil.buildReplicateWALEntryRequest(
246        pair.getFirst().toArray(new WAL.Entry[0]),
247        secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
248    for (WALEntry entry : params.getFirst().getEntryList()) {
249      secondary.replayWALEntry(entry, params.getSecond());
250    }
251    pair.getSecond().complete(null);
252  }
253
254  private void replicateOne() throws IOException {
255    replicate(reqAndResps.remove());
256  }
257
258  private void replicateAll() throws IOException {
259    for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
260      pair = reqAndResps.poll();
261      if (pair == null) {
262        break;
263      }
264      replicate(pair);
265    }
266  }
267
268  private void failOne() {
269    reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error"));
270  }
271
272  private void failAll() {
273    for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
274      pair = reqAndResps.poll();
275      if (pair == null) {
276        break;
277      }
278      pair.getSecond().completeExceptionally(new IOException("Inject error"));
279    }
280  }
281
282  @Test
283  public void testNormalReplicate() throws IOException {
284    byte[] row = Bytes.toBytes(0);
285    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
286    replicateOne();
287    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
288  }
289
290  @Test
291  public void testNormalFlush() throws IOException {
292    byte[] row = Bytes.toBytes(0);
293    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
294    TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
295    flushPrimary();
296    replicateAll();
297    assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
298
299    // we should have the same memstore size, i.e, the secondary should have also dropped the
300    // snapshot
301    assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
302  }
303
304  @Test
305  public void testErrorBeforeFlushStart() throws IOException {
306    byte[] row = Bytes.toBytes(0);
307    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
308    failOne();
309    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
310    TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
311    flushPrimary();
312    // this also tests start flush with empty memstore at secondary replica side
313    replicateAll();
314    assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
315    assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
316  }
317
318  @Test
319  public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException {
320    primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
321    replicateAll();
322    TO_ADD_AFTER_PREPARE_FLUSH
323      .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
324    flushPrimary();
325    // replicate the start flush edit
326    replicateOne();
327    // fail the remaining edits, the put and the commit flush edit
328    failOne();
329    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
330    primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3)));
331    flushPrimary();
332    replicateAll();
333    for (int i = 0; i < 3; i++) {
334      assertEquals(i + 1,
335        Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL)));
336    }
337    // should have nothing in memstore
338    assertEquals(0, secondary.getMemStoreDataSize());
339  }
340
341  @Test
342  public void testCatchUpWithCannotFlush() throws IOException, InterruptedException {
343    byte[] row = Bytes.toBytes(0);
344    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
345    failOne();
346    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
347    flushPrimary();
348    failAll();
349    Thread.sleep(2000);
350    // we will request flush the second time
351    verify(flushRequester, times(2)).requestFlush(any(), anyList(), any());
352    // we can not flush because no content in memstore
353    FlushResult result = flushPrimary();
354    assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult());
355    // the secondary replica does not have this row yet
356    assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
357    // replicate the can not flush edit
358    replicateOne();
359    // we should have the row now
360    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
361  }
362
363  @Test
364  public void testCatchUpWithReopen() throws IOException {
365    byte[] row = Bytes.toBytes(0);
366    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
367    failOne();
368    primary.close();
369    // the secondary replica does not have this row yet, although the above close has flushed the
370    // data out
371    assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
372
373    // reopen
374    primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(),
375      UTIL.getConfiguration(), rss, null);
376    replicateAll();
377    // we should have the row now
378    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
379  }
380}