001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.regionserver;
019
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
083
084@Category({ RegionServerTests.class, MediumTests.class })
085public class TestReplicateToReplica {
086
087  @ClassRule
088  public static final HBaseClassTestRule CLASS_RULE =
089    HBaseClassTestRule.forClass(TestReplicateToReplica.class);
090
091  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
092
093  private static byte[] FAMILY = Bytes.toBytes("family");
094
095  private static byte[] QUAL = Bytes.toBytes("qualifier");
096
097  private static ExecutorService EXEC;
098
099  @Rule
100  public final TableNameTestRule name = new TableNameTestRule();
101
102  private TableName tableName;
103
104  private Path testDir;
105
106  private TableDescriptor td;
107
108  private RegionServerServices rss;
109
110  private AsyncClusterConnection conn;
111
112  private RegionReplicationBufferManager manager;
113
114  private FlushRequester flushRequester;
115
116  private HRegion primary;
117
118  private HRegion secondary;
119
120  private WALFactory walFactory;
121
122  private boolean queueReqAndResps;
123
124  private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps;
125
126  private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH;
127
128  public static final class HRegionForTest extends HRegion {
129
130    public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
131      TableDescriptor htd, RegionServerServices rsServices) {
132      super(fs, wal, confParam, htd, rsServices);
133    }
134
135    @SuppressWarnings("deprecation")
136    public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
137      RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
138      super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
139    }
140
141    @Override
142    protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
143      Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
144      FlushLifeCycleTracker tracker) throws IOException {
145      PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
146        status, writeFlushWalMarker, tracker);
147      for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) {
148        put(put);
149      }
150      TO_ADD_AFTER_PREPARE_FLUSH.clear();
151      return result;
152    }
153
154  }
155
156  @BeforeClass
157  public static void setUpBeforeClass() {
158    Configuration conf = UTIL.getConfiguration();
159    conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
160    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
161    conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
162    conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
163    EXEC = new ExecutorService("test");
164    EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1)
165      .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
166    ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
167      MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
168  }
169
170  @AfterClass
171  public static void tearDownAfterClass() {
172    EXEC.shutdown();
173    UTIL.cleanupTestDir();
174  }
175
176  @Before
177  public void setUp() throws IOException {
178    TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>();
179    tableName = name.getTableName();
180    testDir = UTIL.getDataTestDir(tableName.getNameAsString());
181    Configuration conf = UTIL.getConfiguration();
182    conf.set(HConstants.HBASE_DIR, testDir.toString());
183
184    td = TableDescriptorBuilder.newBuilder(tableName)
185      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2)
186      .setRegionMemStoreReplication(true).build();
187
188    reqAndResps = new ArrayDeque<>();
189    queueReqAndResps = true;
190    conn = mock(AsyncClusterConnection.class);
191    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> {
192      if (queueReqAndResps) {
193        @SuppressWarnings("unchecked")
194        List<WAL.Entry> entries = i.getArgument(1, List.class);
195        CompletableFuture<Void> future = new CompletableFuture<>();
196        reqAndResps.add(Pair.newPair(entries, future));
197        return future;
198      } else {
199        return CompletableFuture.completedFuture(null);
200      }
201    });
202
203    flushRequester = mock(FlushRequester.class);
204
205    rss = mock(RegionServerServices.class);
206    when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
207    when(rss.getConfiguration()).thenReturn(conf);
208    when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf));
209    when(rss.getExecutorService()).thenReturn(EXEC);
210    when(rss.getAsyncClusterConnection()).thenReturn(conn);
211    when(rss.getFlushRequester()).thenReturn(flushRequester);
212
213    manager = new RegionReplicationBufferManager(rss);
214    when(rss.getRegionReplicationBufferManager()).thenReturn(manager);
215
216    RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
217    RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1);
218
219    walFactory = new WALFactory(conf, UUID.randomUUID().toString());
220    WAL wal = walFactory.getWAL(primaryHri);
221    primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal);
222    primary.close();
223
224    primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null);
225    secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null);
226
227    when(rss.getRegions()).then(i -> {
228      return Arrays.asList(primary, secondary);
229    });
230
231    // process the open events
232    replicateAll();
233  }
234
235  @After
236  public void tearDown() throws IOException {
237    // close region will issue a flush, which will enqueue an edit into the replication sink so we
238    // need to complete it otherwise the test will hang.
239    queueReqAndResps = false;
240    failAll();
241    HBaseTestingUtil.closeRegionAndWAL(primary);
242    HBaseTestingUtil.closeRegionAndWAL(secondary);
243    if (walFactory != null) {
244      walFactory.close();
245    }
246  }
247
248  private FlushResult flushPrimary() throws IOException {
249    return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
250  }
251
252  private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException {
253    Pair<ReplicateWALEntryRequest,
254      CellScanner> params = ReplicationProtobufUtil.buildReplicateWALEntryRequest(
255        pair.getFirst().toArray(new WAL.Entry[0]),
256        secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
257    for (WALEntry entry : params.getFirst().getEntryList()) {
258      secondary.replayWALEntry(entry, params.getSecond());
259    }
260    pair.getSecond().complete(null);
261  }
262
263  private void replicateOne() throws IOException {
264    replicate(reqAndResps.remove());
265  }
266
267  private void replicateAll() throws IOException {
268    for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
269      pair = reqAndResps.poll();
270      if (pair == null) {
271        break;
272      }
273      replicate(pair);
274    }
275  }
276
277  private void failOne() {
278    reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error"));
279  }
280
281  private void failAll() {
282    for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
283      pair = reqAndResps.poll();
284      if (pair == null) {
285        break;
286      }
287      pair.getSecond().completeExceptionally(new IOException("Inject error"));
288    }
289  }
290
291  @Test
292  public void testNormalReplicate() throws IOException {
293    byte[] row = Bytes.toBytes(0);
294    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
295    replicateOne();
296    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
297  }
298
299  @Test
300  public void testNormalFlush() throws IOException {
301    byte[] row = Bytes.toBytes(0);
302    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
303    TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
304    flushPrimary();
305    replicateAll();
306    assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
307
308    // we should have the same memstore size, i.e, the secondary should have also dropped the
309    // snapshot
310    assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
311  }
312
313  @Test
314  public void testErrorBeforeFlushStart() throws IOException {
315    byte[] row = Bytes.toBytes(0);
316    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
317    failOne();
318    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
319    TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
320    flushPrimary();
321    // this also tests start flush with empty memstore at secondary replica side
322    replicateAll();
323    assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
324    assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
325  }
326
327  @Test
328  public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException {
329    primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
330    replicateAll();
331    TO_ADD_AFTER_PREPARE_FLUSH
332      .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
333    flushPrimary();
334    // replicate the start flush edit
335    replicateOne();
336    // fail the remaining edits, the put and the commit flush edit
337    failOne();
338    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
339    primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3)));
340    flushPrimary();
341    replicateAll();
342    for (int i = 0; i < 3; i++) {
343      assertEquals(i + 1,
344        Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL)));
345    }
346    // should have nothing in memstore
347    assertEquals(0, secondary.getMemStoreDataSize());
348  }
349
350  @Test
351  public void testCatchUpWithCannotFlush() throws IOException, InterruptedException {
352    byte[] row = Bytes.toBytes(0);
353    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
354    failOne();
355    verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
356    flushPrimary();
357    failAll();
358    Thread.sleep(2000);
359    // we will request flush the second time
360    verify(flushRequester, times(2)).requestFlush(any(), anyList(), any());
361    // we can not flush because no content in memstore
362    FlushResult result = flushPrimary();
363    assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult());
364    // the secondary replica does not have this row yet
365    assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
366    // replicate the can not flush edit
367    replicateOne();
368    // we should have the row now
369    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
370  }
371
372  @Test
373  public void testCatchUpWithReopen() throws IOException {
374    byte[] row = Bytes.toBytes(0);
375    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
376    failOne();
377    primary.close();
378    // the secondary replica does not have this row yet, although the above close has flushed the
379    // data out
380    assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
381
382    // reopen
383    primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(),
384      UTIL.getConfiguration(), rss, null);
385    replicateAll();
386    // we should have the row now
387    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
388  }
389}