/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

/**
 * Exercises replication through test-only subclasses of
 * {@link HBaseInterClusterReplicationEndpoint} that count the batches and entries they ship.
 * <p>
 * Each test writes {@link #NUM_ROWS} rows with 8K values while the RPC request size limit is
 * lowered to 10K, so (as the inline comments note) every row is expected to be shipped in a batch
 * of its own.
 */
@Category(MediumTests.class)
public class TestReplicator extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicator.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
  static final int NUM_ROWS = 10;

  /**
   * Lowers the RPC request size limit and then delegates to the base class set-up.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
    CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
    TestReplicationBase.setUpBeforeClass();
  }

  /**
   * Verifies that {@link #NUM_ROWS} large rows are shipped in exactly {@link #NUM_ROWS} batches
   * and that all of them arrive in the sink table.
   */
  @Test
  public void testReplicatorBatching() throws Exception {
    // Clear the tables
    truncateTable(UTIL1, tableName);
    truncateTable(UTIL2, tableName);

    // Replace the peer set up for us by the base class with a wrapper for this test
    admin.addPeer("testReplicatorBatching",
      new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
      null);

    ReplicationEndpointForTest.setBatchCount(0);
    ReplicationEndpointForTest.setEntriesCount(0);
    try {
      ReplicationEndpointForTest.pause();
      try {
        putLargeRows();
      } finally {
        ReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete.
      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("Count={}", ReplicationEndpointForTest.getBatchCount());
          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
        ReplicationEndpointForTest.getBatchCount());
      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
    } finally {
      admin.removePeer("testReplicatorBatching");
    }
  }

  /**
   * Verifies that all {@link #NUM_ROWS} rows still arrive in the sink table when the endpoint
   * injects a failure on every other replicator call.
   */
  @Test
  public void testReplicatorWithErrors() throws Exception {
    // Clear the tables
    truncateTable(UTIL1, tableName);
    truncateTable(UTIL2, tableName);

    // Replace the peer set up for us by the base class with a wrapper for this test
    admin.addPeer("testReplicatorWithErrors",
      new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
      null);

    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
    try {
      FailureInjectingReplicationEndpointForTest.pause();
      try {
        putLargeRows();
      } finally {
        FailureInjectingReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete.
      // We can expect 10 batches
      Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
    } finally {
      admin.removePeer("testReplicatorWithErrors");
    }
  }

  /**
   * Delegates to the base class tear-down.
   */
  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  /**
   * Writes {@link #NUM_ROWS} rows ({@code row0}, {@code row1}, ...) with an 8K value each into
   * the source table.
   */
  private void putLargeRows() throws IOException {
    // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
    // have to be replicated separately.
    final byte[] valueBytes = new byte[8 * 1024];
    for (int i = 0; i < NUM_ROWS; i++) {
      htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
        valueBytes));
    }
  }

  /**
   * Disables and then truncates the given table without preserving splits.
   * @param util      testing utility of the cluster that owns the table
   * @param tablename table to truncate
   */
  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
    // Named hbaseAdmin so it does not shadow the inherited 'admin' field used by the tests.
    HBaseAdmin hbaseAdmin = util.getHBaseAdmin();
    // Disable the table that was passed in, not the inherited 'tableName' field.
    hbaseAdmin.disableTable(tablename);
    hbaseAdmin.truncateTable(tablename, false);
  }

  /**
   * Replication endpoint that counts shipped batches and entries in static counters and can be
   * paused so that edits accumulate before anything is shipped.
   * <p>
   * All state is static, so it is shared by every instance, including instances of subclasses.
   */
  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {

    protected static AtomicInteger batchCount = new AtomicInteger(0);
    // Kept as an int field so code that reads or assigns it directly keeps compiling; volatile so
    // the polling test thread sees updates. Compound updates should go through addEntries(int).
    protected static volatile int entriesCount;
    private static final Object entriesLock = new Object();
    private static final Object latch = new Object();
    private static final AtomicBoolean useLatch = new AtomicBoolean(false);

    /**
     * Clears the pause flag and wakes every thread blocked in {@link #await()}.
     */
    public static void resume() {
      // The flag is flipped under the monitor so a waiter cannot test it and then miss the
      // notification.
      synchronized (latch) {
        useLatch.set(false);
        latch.notifyAll();
      }
    }

    /**
     * Sets the pause flag; subsequent {@link #await()} calls block until {@link #resume()}.
     */
    public static void pause() {
      useLatch.set(true);
    }

    /**
     * Blocks while the endpoint is paused and returns immediately otherwise.
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static void await() throws InterruptedException {
      boolean waited = false;
      synchronized (latch) {
        // Re-test the flag after every wake-up: guards against spurious wake-ups and against a
        // resume() that happened before this thread started waiting.
        while (useLatch.get()) {
          if (!waited) {
            LOG.info("Waiting on latch");
            waited = true;
          }
          latch.wait();
        }
      }
      if (waited) {
        LOG.info("Waited on latch, now proceeding");
      }
    }

    /** Returns the number of batches counted so far. */
    public static int getBatchCount() {
      return batchCount.get();
    }

    /** Sets the batch counter to {@code i}, logging the new and the previous value. */
    public static void setBatchCount(int i) {
      LOG.info("SetBatchCount={}, old={}", i, getBatchCount());
      batchCount.set(i);
    }

    /** Returns the number of entries counted so far. */
    public static int getEntriesCount() {
      return entriesCount;
    }

    /** Sets the entry counter to {@code i}. */
    public static void setEntriesCount(int i) {
      LOG.info("SetEntriesCount={}", i);
      synchronized (entriesLock) {
        entriesCount = i;
      }
    }

    /**
     * Adds {@code delta} to the entry counter as one indivisible update.
     */
    protected static void addEntries(int delta) {
      synchronized (entriesLock) {
        entriesCount += delta;
      }
    }

    /**
     * Waits until the endpoint is no longer paused, then delegates to the superclass.
     */
    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        await();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted waiting for latch", e);
        // Restore the interrupt status so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
      }
      return super.replicate(replicateContext);
    }

    /**
     * Returns a replicator that ships the batch and then updates both counters.
     */
    @Override
    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
      return () -> replicateAndCount(entries, ordinal, timeout);
    }

    /**
     * Ships {@code entries} via {@code replicateEntries} and, if that returns normally, adds the
     * batch to the entry and batch counters.
     * @return the value returned by {@code replicateEntries}
     */
    protected int replicateAndCount(List<Entry> entries, int ordinal, int timeout)
      throws Exception {
      int batchIndex = replicateEntries(entries, ordinal, timeout);
      addEntries(entries.size());
      int count = batchCount.incrementAndGet();
      LOG.info("Completed replicating batch {} count={}", System.identityHashCode(entries),
        count);
      return batchIndex;
    }
  }

  /**
   * Endpoint whose replicators alternate between shipping a batch and throwing an injected
   * {@link ServiceException}; the first call on an instance ships.
   */
  public static class FailureInjectingReplicationEndpointForTest
    extends ReplicationEndpointForTest {
    // false: the next call ships its batch; true: the next call throws.
    private final AtomicBoolean failNext = new AtomicBoolean(false);

    /**
     * Returns a replicator that either ships and counts the batch or throws an injected failure,
     * depending on the current state of the toggle.
     */
    @Override
    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
      return () -> {
        // Retry until one of the two transitions succeeds. If both CAS attempts lose a race with
        // another replicator, falling through would report a batch as shipped that never was.
        while (true) {
          if (failNext.compareAndSet(false, true)) {
            return replicateAndCount(entries, ordinal, timeout);
          }
          if (failNext.compareAndSet(true, false)) {
            throw new ServiceException("Injected failure");
          }
        }
      };
    }
  }
}