001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021 022import java.io.IOException; 023import java.util.List; 024import java.util.concurrent.CompletableFuture; 025import java.util.concurrent.atomic.AtomicBoolean; 026import java.util.concurrent.atomic.AtomicInteger; 027import org.apache.hadoop.hbase.HBaseTestingUtil; 028import org.apache.hadoop.hbase.TableName; 029import org.apache.hadoop.hbase.Waiter; 030import org.apache.hadoop.hbase.client.Admin; 031import org.apache.hadoop.hbase.client.Put; 032import org.apache.hadoop.hbase.ipc.RpcServer; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.hadoop.hbase.replication.TestReplicationBaseNoBeforeAll; 035import org.apache.hadoop.hbase.testclassification.MediumTests; 036import org.apache.hadoop.hbase.testclassification.ReplicationTests; 037import org.apache.hadoop.hbase.util.Bytes; 038import org.apache.hadoop.hbase.wal.WAL.Entry; 039import org.junit.jupiter.api.BeforeAll; 040import org.junit.jupiter.api.Tag; 041import org.junit.jupiter.api.Test; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 046 047@Tag(ReplicationTests.TAG) 048@Tag(MediumTests.TAG) 049public class TestReplicator extends TestReplicationBaseNoBeforeAll { 050 051 static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class); 052 static final int NUM_ROWS = 10; 053 054 @BeforeAll 055 public static void setUpBeforeClass() throws Exception { 056 configureClusters(UTIL1, UTIL2); 057 // Set RPC size limit to 10kb (will be applied to both source and sink clusters) 058 CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10); 059 startClusters(); 060 } 061 062 @Test 063 public void testReplicatorBatching() throws Exception { 064 // Clear the tables 065 truncateTable(UTIL1, tableName); 066 truncateTable(UTIL2, tableName); 067 068 // Replace the peer set up for us by the base class with a wrapper for this test 069 hbaseAdmin.addReplicationPeer("testReplicatorBatching", 070 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) 071 .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()).build()); 072 073 ReplicationEndpointForTest.setBatchCount(0); 074 ReplicationEndpointForTest.setEntriesCount(0); 075 try { 076 ReplicationEndpointForTest.pause(); 077 try { 078 // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all 079 // have to be replicated separately. 080 final byte[] valueBytes = new byte[8 * 1024]; 081 for (int i = 0; i < NUM_ROWS; i++) { 082 htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null, 083 valueBytes)); 084 } 085 } finally { 086 ReplicationEndpointForTest.resume(); 087 } 088 089 // Wait for replication to complete. 090 Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() { 091 @Override 092 public boolean evaluate() throws Exception { 093 LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount()); 094 return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS; 095 } 096 097 @Override 098 public String explainFailure() throws Exception { 099 return "We waited too long for expected replication of " + NUM_ROWS + " entries"; 100 } 101 }); 102 103 assertEquals(NUM_ROWS, ReplicationEndpointForTest.getBatchCount(), 104 "We sent an incorrect number of batches"); 105 assertEquals(NUM_ROWS, HBaseTestingUtil.countRows(htable2), 106 "We did not replicate enough rows"); 107 } finally { 108 hbaseAdmin.removeReplicationPeer("testReplicatorBatching"); 109 } 110 } 111 112 @Test 113 public void testReplicatorWithErrors() throws Exception { 114 // Clear the tables 115 truncateTable(UTIL1, tableName); 116 truncateTable(UTIL2, tableName); 117 118 // Replace the peer set up for us by the base class with a wrapper for this test 119 hbaseAdmin.addReplicationPeer("testReplicatorWithErrors", 120 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getRpcConnnectionURI()) 121 .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()) 122 .build()); 123 124 FailureInjectingReplicationEndpointForTest.setBatchCount(0); 125 FailureInjectingReplicationEndpointForTest.setEntriesCount(0); 126 try { 127 FailureInjectingReplicationEndpointForTest.pause(); 128 try { 129 // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all 130 // have to be replicated separately. 131 final byte[] valueBytes = new byte[8 * 1024]; 132 for (int i = 0; i < NUM_ROWS; i++) { 133 htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null, 134 valueBytes)); 135 } 136 } finally { 137 FailureInjectingReplicationEndpointForTest.resume(); 138 } 139 140 // Wait for replication to complete. 141 // We can expect 10 batches 142 Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() { 143 @Override 144 public boolean evaluate() throws Exception { 145 return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS; 146 } 147 148 @Override 149 public String explainFailure() throws Exception { 150 return "We waited too long for expected replication of " + NUM_ROWS + " entries"; 151 } 152 }); 153 154 assertEquals(NUM_ROWS, HBaseTestingUtil.countRows(htable2), 155 "We did not replicate enough rows"); 156 } finally { 157 hbaseAdmin.removeReplicationPeer("testReplicatorWithErrors"); 158 } 159 } 160 161 private void truncateTable(HBaseTestingUtil util, TableName tablename) throws IOException { 162 Admin admin = util.getAdmin(); 163 admin.disableTable(tableName); 164 admin.truncateTable(tablename, false); 165 } 166 167 public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint { 168 169 protected static AtomicInteger batchCount = new AtomicInteger(0); 170 protected static int entriesCount; 171 private static final Object latch = new Object(); 172 private static AtomicBoolean useLatch = new AtomicBoolean(false); 173 174 public static void resume() { 175 useLatch.set(false); 176 synchronized (latch) { 177 latch.notifyAll(); 178 } 179 } 180 181 public static void pause() { 182 useLatch.set(true); 183 } 184 185 public static void await() throws InterruptedException { 186 if (useLatch.get()) { 187 LOG.info("Waiting on latch"); 188 synchronized (latch) { 189 latch.wait(); 190 } 191 LOG.info("Waited on latch, now proceeding"); 192 } 193 } 194 195 public static int getBatchCount() { 196 return batchCount.get(); 197 } 198 199 public static void setBatchCount(int i) { 200 LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount()); 201 batchCount.set(i); 202 } 203 204 public static int getEntriesCount() { 205 return entriesCount; 206 } 207 208 public static void setEntriesCount(int i) { 209 LOG.info("SetEntriesCount=" + i); 210 entriesCount = i; 211 } 212 213 @Override 214 public boolean replicate(ReplicateContext replicateContext) { 215 try { 216 await(); 217 } catch (InterruptedException e) { 218 LOG.warn("Interrupted waiting for latch", e); 219 } 220 return super.replicate(replicateContext); 221 } 222 223 @Override 224 protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, 225 int timeout) { 226 return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> { 227 entriesCount += entries.size(); 228 int count = batchCount.incrementAndGet(); 229 LOG.info( 230 "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count); 231 }); 232 233 } 234 } 235 236 public static class FailureInjectingReplicationEndpointForTest 237 extends ReplicationEndpointForTest { 238 private final AtomicBoolean failNext = new AtomicBoolean(false); 239 240 @Override 241 protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal, 242 int timeout) { 243 244 if (failNext.compareAndSet(false, true)) { 245 return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> { 246 entriesCount += entries.size(); 247 int count = batchCount.incrementAndGet(); 248 LOG.info( 249 "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count); 250 }); 251 } else if (failNext.compareAndSet(true, false)) { 252 CompletableFuture<Integer> future = new CompletableFuture<Integer>(); 253 future.completeExceptionally(new ServiceException("Injected failure")); 254 return future; 255 } 256 return CompletableFuture.completedFuture(ordinal); 257 258 } 259 } 260}