001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.CompletableFuture;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtil;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.Waiter;
031import org.apache.hadoop.hbase.client.Admin;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.ipc.RpcServer;
034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
035import org.apache.hadoop.hbase.replication.TestReplicationBase;
036import org.apache.hadoop.hbase.testclassification.MediumTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.wal.WAL.Entry;
039import org.junit.AfterClass;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
048
049@Category(MediumTests.class)
050public class TestReplicator extends TestReplicationBase {
051
  // Class-level JUnit rule created through HBaseClassTestRule for this test class.
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicator.class);

  // Logger used by the tests and by the nested replication endpoint classes of this file.
  static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
  // Number of rows each test writes to the source table and expects to count on the sink table.
  static final int NUM_ROWS = 10;
058
059  @BeforeClass
060  public static void setUpBeforeClass() throws Exception {
061    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
062    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
063    TestReplicationBase.setUpBeforeClass();
064  }
065
066  @Test
067  public void testReplicatorBatching() throws Exception {
068    // Clear the tables
069    truncateTable(UTIL1, tableName);
070    truncateTable(UTIL2, tableName);
071
072    // Replace the peer set up for us by the base class with a wrapper for this test
073    hbaseAdmin.addReplicationPeer("testReplicatorBatching",
074      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
075        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build());
076
077    ReplicationEndpointForTest.setBatchCount(0);
078    ReplicationEndpointForTest.setEntriesCount(0);
079    try {
080      ReplicationEndpointForTest.pause();
081      try {
082        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
083        // have to be replicated separately.
084        final byte[] valueBytes = new byte[8 * 1024];
085        for (int i = 0; i < NUM_ROWS; i++) {
086          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
087            valueBytes));
088        }
089      } finally {
090        ReplicationEndpointForTest.resume();
091      }
092
093      // Wait for replication to complete.
094      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
095        @Override
096        public boolean evaluate() throws Exception {
097          LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
098          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
099        }
100
101        @Override
102        public String explainFailure() throws Exception {
103          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
104        }
105      });
106
107      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
108        ReplicationEndpointForTest.getBatchCount());
109      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
110    } finally {
111      hbaseAdmin.removeReplicationPeer("testReplicatorBatching");
112    }
113  }
114
115  @Test
116  public void testReplicatorWithErrors() throws Exception {
117    // Clear the tables
118    truncateTable(UTIL1, tableName);
119    truncateTable(UTIL2, tableName);
120
121    // Replace the peer set up for us by the base class with a wrapper for this test
122    hbaseAdmin.addReplicationPeer("testReplicatorWithErrors",
123      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey())
124        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName())
125        .build());
126
127    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
128    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
129    try {
130      FailureInjectingReplicationEndpointForTest.pause();
131      try {
132        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
133        // have to be replicated separately.
134        final byte[] valueBytes = new byte[8 * 1024];
135        for (int i = 0; i < NUM_ROWS; i++) {
136          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
137            valueBytes));
138        }
139      } finally {
140        FailureInjectingReplicationEndpointForTest.resume();
141      }
142
143      // Wait for replication to complete.
144      // We can expect 10 batches
145      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
146        @Override
147        public boolean evaluate() throws Exception {
148          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
149        }
150
151        @Override
152        public String explainFailure() throws Exception {
153          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
154        }
155      });
156
157      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
158    } finally {
159      hbaseAdmin.removeReplicationPeer("testReplicatorWithErrors");
160    }
161  }
162
163  @AfterClass
164  public static void tearDownAfterClass() throws Exception {
165    TestReplicationBase.tearDownAfterClass();
166  }
167
168  private void truncateTable(HBaseTestingUtil util, TableName tablename) throws IOException {
169    Admin admin = util.getAdmin();
170    admin.disableTable(tableName);
171    admin.truncateTable(tablename, false);
172  }
173
174  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
175
176    protected static AtomicInteger batchCount = new AtomicInteger(0);
177    protected static int entriesCount;
178    private static final Object latch = new Object();
179    private static AtomicBoolean useLatch = new AtomicBoolean(false);
180
181    public static void resume() {
182      useLatch.set(false);
183      synchronized (latch) {
184        latch.notifyAll();
185      }
186    }
187
188    public static void pause() {
189      useLatch.set(true);
190    }
191
192    public static void await() throws InterruptedException {
193      if (useLatch.get()) {
194        LOG.info("Waiting on latch");
195        synchronized (latch) {
196          latch.wait();
197        }
198        LOG.info("Waited on latch, now proceeding");
199      }
200    }
201
202    public static int getBatchCount() {
203      return batchCount.get();
204    }
205
206    public static void setBatchCount(int i) {
207      LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount());
208      batchCount.set(i);
209    }
210
211    public static int getEntriesCount() {
212      return entriesCount;
213    }
214
215    public static void setEntriesCount(int i) {
216      LOG.info("SetEntriesCount=" + i);
217      entriesCount = i;
218    }
219
220    @Override
221    public boolean replicate(ReplicateContext replicateContext) {
222      try {
223        await();
224      } catch (InterruptedException e) {
225        LOG.warn("Interrupted waiting for latch", e);
226      }
227      return super.replicate(replicateContext);
228    }
229
230    @Override
231    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
232      int timeout) {
233      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
234        entriesCount += entries.size();
235        int count = batchCount.incrementAndGet();
236        LOG.info(
237          "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
238      });
239
240    }
241  }
242
243  public static class FailureInjectingReplicationEndpointForTest
244    extends ReplicationEndpointForTest {
245    private final AtomicBoolean failNext = new AtomicBoolean(false);
246
247    @Override
248    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
249      int timeout) {
250
251      if (failNext.compareAndSet(false, true)) {
252        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
253          entriesCount += entries.size();
254          int count = batchCount.incrementAndGet();
255          LOG.info(
256            "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
257        });
258      } else if (failNext.compareAndSet(true, false)) {
259        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
260        future.completeExceptionally(new ServiceException("Injected failure"));
261        return future;
262      }
263      return CompletableFuture.completedFuture(ordinal);
264
265    }
266  }
267}