001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.CompletableFuture;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.TableName;
029import org.apache.hadoop.hbase.Waiter;
030import org.apache.hadoop.hbase.client.Admin;
031import org.apache.hadoop.hbase.client.Put;
032import org.apache.hadoop.hbase.ipc.RpcServer;
033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
034import org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll;
035import org.apache.hadoop.hbase.testclassification.MediumTests;
036import org.apache.hadoop.hbase.testclassification.ReplicationTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.wal.WAL.Entry;
039import org.junit.jupiter.api.BeforeAll;
040import org.junit.jupiter.api.Tag;
041import org.junit.jupiter.api.Test;
042import org.slf4j.Logger;
043import org.slf4j.LoggerFactory;
044
045import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
046
047@Tag(ReplicationTests.TAG)
048@Tag(MediumTests.TAG)
049public class TestReplicator extends TestReplicationBaseNoBeforeAll {
050
  /** Logger used by the test methods and by the nested replication endpoint classes. */
  static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
  /** Number of rows each test writes to the source table and expects to find on the sink. */
  static final int NUM_ROWS = 10;
053
054  @BeforeAll
055  public static void setUpBeforeClass() throws Exception {
056    configureClusters(UTIL1, UTIL2);
057    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
058    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
059    startClusters();
060  }
061
062  @Test
063  public void testReplicatorBatching() throws Exception {
064    // Clear the tables
065    truncateTable(UTIL1, tableName);
066    truncateTable(UTIL2, tableName);
067
068    // Replace the peer set up for us by the base class with a wrapper for this test
069    hbaseAdmin.addReplicationPeer("testReplicatorBatching",
070      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
071        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build());
072
073    ReplicationEndpointForTest.setBatchCount(0);
074    ReplicationEndpointForTest.setEntriesCount(0);
075    try {
076      ReplicationEndpointForTest.pause();
077      try {
078        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
079        // have to be replicated separately.
080        final byte[] valueBytes = new byte[8 * 1024];
081        for (int i = 0; i < NUM_ROWS; i++) {
082          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
083            valueBytes));
084        }
085      } finally {
086        ReplicationEndpointForTest.resume();
087      }
088
089      // Wait for replication to complete.
090      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
091        @Override
092        public boolean evaluate() throws Exception {
093          LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
094          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
095        }
096
097        @Override
098        public String explainFailure() throws Exception {
099          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
100        }
101      });
102
103      assertEquals(NUM_ROWS, ReplicationEndpointForTest.getBatchCount(),
104        "We sent an incorrect number of batches");
105      assertEquals(NUM_ROWS, HBaseTestingUtil.countRows(htable2),
106        "We did not replicate enough rows");
107    } finally {
108      hbaseAdmin.removeReplicationPeer("testReplicatorBatching");
109    }
110  }
111
112  @Test
113  public void testReplicatorWithErrors() throws Exception {
114    // Clear the tables
115    truncateTable(UTIL1, tableName);
116    truncateTable(UTIL2, tableName);
117
118    // Replace the peer set up for us by the base class with a wrapper for this test
119    hbaseAdmin.addReplicationPeer("testReplicatorWithErrors",
120      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI())
121        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName())
122        .build());
123
124    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
125    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
126    try {
127      FailureInjectingReplicationEndpointForTest.pause();
128      try {
129        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
130        // have to be replicated separately.
131        final byte[] valueBytes = new byte[8 * 1024];
132        for (int i = 0; i < NUM_ROWS; i++) {
133          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
134            valueBytes));
135        }
136      } finally {
137        FailureInjectingReplicationEndpointForTest.resume();
138      }
139
140      // Wait for replication to complete.
141      // We can expect 10 batches
142      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
143        @Override
144        public boolean evaluate() throws Exception {
145          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
146        }
147
148        @Override
149        public String explainFailure() throws Exception {
150          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
151        }
152      });
153
154      assertEquals(NUM_ROWS, HBaseTestingUtil.countRows(htable2),
155        "We did not replicate enough rows");
156    } finally {
157      hbaseAdmin.removeReplicationPeer("testReplicatorWithErrors");
158    }
159  }
160
161  private void truncateTable(HBaseTestingUtil util, TableName tablename) throws IOException {
162    Admin admin = util.getAdmin();
163    admin.disableTable(tableName);
164    admin.truncateTable(tablename, false);
165  }
166
167  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
168
169    protected static AtomicInteger batchCount = new AtomicInteger(0);
170    protected static int entriesCount;
171    private static final Object latch = new Object();
172    private static AtomicBoolean useLatch = new AtomicBoolean(false);
173
174    public static void resume() {
175      useLatch.set(false);
176      synchronized (latch) {
177        latch.notifyAll();
178      }
179    }
180
181    public static void pause() {
182      useLatch.set(true);
183    }
184
185    public static void await() throws InterruptedException {
186      if (useLatch.get()) {
187        LOG.info("Waiting on latch");
188        synchronized (latch) {
189          latch.wait();
190        }
191        LOG.info("Waited on latch, now proceeding");
192      }
193    }
194
195    public static int getBatchCount() {
196      return batchCount.get();
197    }
198
199    public static void setBatchCount(int i) {
200      LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount());
201      batchCount.set(i);
202    }
203
204    public static int getEntriesCount() {
205      return entriesCount;
206    }
207
208    public static void setEntriesCount(int i) {
209      LOG.info("SetEntriesCount=" + i);
210      entriesCount = i;
211    }
212
213    @Override
214    public boolean replicate(ReplicateContext replicateContext) {
215      try {
216        await();
217      } catch (InterruptedException e) {
218        LOG.warn("Interrupted waiting for latch", e);
219      }
220      return super.replicate(replicateContext);
221    }
222
223    @Override
224    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
225      int timeout) {
226      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
227        entriesCount += entries.size();
228        int count = batchCount.incrementAndGet();
229        LOG.info(
230          "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
231      });
232
233    }
234  }
235
236  public static class FailureInjectingReplicationEndpointForTest
237    extends ReplicationEndpointForTest {
238    private final AtomicBoolean failNext = new AtomicBoolean(false);
239
240    @Override
241    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
242      int timeout) {
243
244      if (failNext.compareAndSet(false, true)) {
245        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
246          entriesCount += entries.size();
247          int count = batchCount.incrementAndGet();
248          LOG.info(
249            "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
250        });
251      } else if (failNext.compareAndSet(true, false)) {
252        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
253        future.completeExceptionally(new ServiceException("Injected failure"));
254        return future;
255      }
256      return CompletableFuture.completedFuture(ordinal);
257
258    }
259  }
260}