001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021
022import java.io.IOException;
023import java.util.List;
024import java.util.concurrent.Callable;
025import java.util.concurrent.atomic.AtomicBoolean;
026import java.util.concurrent.atomic.AtomicInteger;
027import org.apache.hadoop.hbase.HBaseClassTestRule;
028import org.apache.hadoop.hbase.HBaseTestingUtility;
029import org.apache.hadoop.hbase.TableName;
030import org.apache.hadoop.hbase.Waiter;
031import org.apache.hadoop.hbase.client.HBaseAdmin;
032import org.apache.hadoop.hbase.client.Put;
033import org.apache.hadoop.hbase.ipc.RpcServer;
034import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
035import org.apache.hadoop.hbase.replication.TestReplicationBase;
036import org.apache.hadoop.hbase.testclassification.MediumTests;
037import org.apache.hadoop.hbase.util.Bytes;
038import org.apache.hadoop.hbase.wal.WAL.Entry;
039import org.junit.AfterClass;
040import org.junit.BeforeClass;
041import org.junit.ClassRule;
042import org.junit.Test;
043import org.junit.experimental.categories.Category;
044import org.slf4j.Logger;
045import org.slf4j.LoggerFactory;
046
047import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
048
049@Category(MediumTests.class)
050public class TestReplicator extends TestReplicationBase {
051
052  @ClassRule
053  public static final HBaseClassTestRule CLASS_RULE =
054    HBaseClassTestRule.forClass(TestReplicator.class);
055
056  static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
057  static final int NUM_ROWS = 10;
058
059  @BeforeClass
060  public static void setUpBeforeClass() throws Exception {
061    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
062    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
063    TestReplicationBase.setUpBeforeClass();
064  }
065
066  @Test
067  public void testReplicatorBatching() throws Exception {
068    // Clear the tables
069    truncateTable(UTIL1, tableName);
070    truncateTable(UTIL2, tableName);
071
072    // Replace the peer set up for us by the base class with a wrapper for this test
073    admin.addPeer("testReplicatorBatching",
074      new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
075        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
076      null);
077
078    ReplicationEndpointForTest.setBatchCount(0);
079    ReplicationEndpointForTest.setEntriesCount(0);
080    try {
081      ReplicationEndpointForTest.pause();
082      try {
083        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
084        // have to be replicated separately.
085        final byte[] valueBytes = new byte[8 * 1024];
086        for (int i = 0; i < NUM_ROWS; i++) {
087          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
088            valueBytes));
089        }
090      } finally {
091        ReplicationEndpointForTest.resume();
092      }
093
094      // Wait for replication to complete.
095      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
096        @Override
097        public boolean evaluate() throws Exception {
098          LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
099          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
100        }
101
102        @Override
103        public String explainFailure() throws Exception {
104          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
105        }
106      });
107
108      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
109        ReplicationEndpointForTest.getBatchCount());
110      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
111    } finally {
112      admin.removePeer("testReplicatorBatching");
113    }
114  }
115
116  @Test
117  public void testReplicatorWithErrors() throws Exception {
118    // Clear the tables
119    truncateTable(UTIL1, tableName);
120    truncateTable(UTIL2, tableName);
121
122    // Replace the peer set up for us by the base class with a wrapper for this test
123    admin.addPeer(
124      "testReplicatorWithErrors", new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
125        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
126      null);
127
128    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
129    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
130    try {
131      FailureInjectingReplicationEndpointForTest.pause();
132      try {
133        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
134        // have to be replicated separately.
135        final byte[] valueBytes = new byte[8 * 1024];
136        for (int i = 0; i < NUM_ROWS; i++) {
137          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
138            valueBytes));
139        }
140      } finally {
141        FailureInjectingReplicationEndpointForTest.resume();
142      }
143
144      // Wait for replication to complete.
145      // We can expect 10 batches
146      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
147        @Override
148        public boolean evaluate() throws Exception {
149          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
150        }
151
152        @Override
153        public String explainFailure() throws Exception {
154          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
155        }
156      });
157
158      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
159    } finally {
160      admin.removePeer("testReplicatorWithErrors");
161    }
162  }
163
164  @AfterClass
165  public static void tearDownAfterClass() throws Exception {
166    TestReplicationBase.tearDownAfterClass();
167  }
168
169  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
170    HBaseAdmin admin = util.getHBaseAdmin();
171    admin.disableTable(tableName);
172    admin.truncateTable(tablename, false);
173  }
174
175  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
176
177    protected static AtomicInteger batchCount = new AtomicInteger(0);
178    protected static int entriesCount;
179    private static final Object latch = new Object();
180    private static AtomicBoolean useLatch = new AtomicBoolean(false);
181
182    public static void resume() {
183      useLatch.set(false);
184      synchronized (latch) {
185        latch.notifyAll();
186      }
187    }
188
189    public static void pause() {
190      useLatch.set(true);
191    }
192
193    public static void await() throws InterruptedException {
194      if (useLatch.get()) {
195        LOG.info("Waiting on latch");
196        synchronized (latch) {
197          latch.wait();
198        }
199        LOG.info("Waited on latch, now proceeding");
200      }
201    }
202
203    public static int getBatchCount() {
204      return batchCount.get();
205    }
206
207    public static void setBatchCount(int i) {
208      LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount());
209      batchCount.set(i);
210    }
211
212    public static int getEntriesCount() {
213      return entriesCount;
214    }
215
216    public static void setEntriesCount(int i) {
217      LOG.info("SetEntriesCount=" + i);
218      entriesCount = i;
219    }
220
221    @Override
222    public boolean replicate(ReplicateContext replicateContext) {
223      try {
224        await();
225      } catch (InterruptedException e) {
226        LOG.warn("Interrupted waiting for latch", e);
227      }
228      return super.replicate(replicateContext);
229    }
230
231    @Override
232    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
233      return () -> {
234        int batchIndex = replicateEntries(entries, ordinal, timeout);
235        entriesCount += entries.size();
236        int count = batchCount.incrementAndGet();
237        LOG.info(
238          "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
239        return batchIndex;
240      };
241    }
242  }
243
244  public static class FailureInjectingReplicationEndpointForTest
245    extends ReplicationEndpointForTest {
246    private final AtomicBoolean failNext = new AtomicBoolean(false);
247
248    @Override
249    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
250      return () -> {
251        if (failNext.compareAndSet(false, true)) {
252          int batchIndex = replicateEntries(entries, ordinal, timeout);
253          entriesCount += entries.size();
254          int count = batchCount.incrementAndGet();
255          LOG.info(
256            "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
257          return batchIndex;
258        } else if (failNext.compareAndSet(true, false)) {
259          throw new ServiceException("Injected failure");
260        }
261        return ordinal;
262      };
263    }
264  }
265}