/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;

/**
 * Tests batching in {@link HBaseInterClusterReplicationEndpoint} by replicating rows whose
 * cells exceed the sink cluster's RPC request size limit, with and without injected RPC failures.
 */
@Category(MediumTests.class)
@Ignore("Flaky, needs to be rewritten, see HBASE-19125")
public class TestReplicator extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicator.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
  static final int NUM_ROWS = 10;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Set the RPC size limit to 10 KB (applied to both source and sink clusters)
    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
    TestReplicationBase.setUpBeforeClass();
    admin.removePeer("2"); // Remove the peer set up for us by the base class
  }

  @Test
  public void testReplicatorBatching() throws Exception {
    // Clear the tables
    truncateTable(utility1, tableName);
    truncateTable(utility2, tableName);

    // Replace the peer set up for us by the base class with a wrapper for this test
    admin.addPeer("testReplicatorBatching",
      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);

    ReplicationEndpointForTest.setBatchCount(0);
    ReplicationEndpointForTest.setEntriesCount(0);
    try {
      ReplicationEndpointForTest.pause();
      try {
        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
        // have to be replicated separately.
        final byte[] valueBytes = new byte[8 * 1024];
        for (int i = 0; i < NUM_ROWS; i++) {
          htable1.put(new Put(Bytes.toBytes("row" + i))
              .addColumn(famName, null, valueBytes));
        }
      } finally {
        ReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete.
      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
        ReplicationEndpointForTest.getBatchCount());
      assertEquals("We did not replicate enough rows", NUM_ROWS,
        utility2.countRows(htable2));
    } finally {
      admin.removePeer("testReplicatorBatching");
    }
  }

  @Test
  public void testReplicatorWithErrors() throws Exception {
    // Clear the tables
    truncateTable(utility1, tableName);
    truncateTable(utility2, tableName);

    // Replace the peer set up for us by the base class with a wrapper for this test
    admin.addPeer("testReplicatorWithErrors",
      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
      null);

    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
    try {
      FailureInjectingReplicationEndpointForTest.pause();
      try {
        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
        // have to be replicated separately.
        final byte[] valueBytes = new byte[8 * 1024];
        for (int i = 0; i < NUM_ROWS; i++) {
          htable1.put(new Put(Bytes.toBytes("row" + i))
              .addColumn(famName, null, valueBytes));
        }
      } finally {
        FailureInjectingReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete.
      // We can expect 10 batches
      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We did not replicate enough rows", NUM_ROWS,
        utility2.countRows(htable2));
    } finally {
      admin.removePeer("testReplicatorWithErrors");
    }
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
    HBaseAdmin admin = util.getHBaseAdmin();
    admin.disableTable(tablename);
    admin.truncateTable(tablename, false);
  }

  /**
   * Replication endpoint that counts batches and entries as they are shipped, and that can be
   * paused so puts accumulate on the source before replication is allowed to proceed.
   */
  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {

    private static AtomicInteger batchCount = new AtomicInteger(0);
    private static int entriesCount;
    private static final Object latch = new Object();
    private static AtomicBoolean useLatch = new AtomicBoolean(false);

    public static void resume() {
      useLatch.set(false);
      synchronized (latch) {
        latch.notifyAll();
      }
    }

    public static void pause() {
      useLatch.set(true);
    }

    public static void await() throws InterruptedException {
      if (useLatch.get()) {
        LOG.info("Waiting on latch");
        synchronized (latch) {
          latch.wait();
        }
        LOG.info("Waited on latch, now proceeding");
      }
    }

    public static int getBatchCount() {
      return batchCount.get();
    }

    public static void setBatchCount(int i) {
      LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount());
      batchCount.set(i);
    }

    public static int getEntriesCount() {
      return entriesCount;
    }

    public static void setEntriesCount(int i) {
      LOG.info("SetEntriesCount=" + i);
      entriesCount = i;
    }

    public class ReplicatorForTest extends Replicator {

      public ReplicatorForTest(List<Entry> entries, int ordinal) {
        super(entries, ordinal);
      }

      @Override
      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
          throws IOException {
        try {
          long size = 0;
          for (Entry e : entries) {
            size += e.getKey().estimatedSerializedSizeOf();
            size += e.getEdit().estimatedSerializedSizeOf();
          }
          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
            entries.size() + " entries with total size " + size + " bytes to " +
            replicationClusterId);
          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
            hfileArchiveDir);
          entriesCount += entries.size();
          int count = batchCount.incrementAndGet();
          LOG.info("Completed replicating batch " + System.identityHashCode(entries) +
            " count=" + count);
        } catch (IOException e) {
          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
          throw e;
        }
      }
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        await();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted waiting for latch", e);
      }
      return super.replicate(replicateContext);
    }

    @Override
    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
      return new ReplicatorForTest(entries, ordinal);
    }
  }

  /**
   * Endpoint whose sink interface fails every other replicateWALEntry call with an injected
   * ServiceException, exercising the replicator's retry path.
   */
  public static class FailureInjectingReplicationEndpointForTest
      extends ReplicationEndpointForTest {

    static class FailureInjectingBlockingInterface implements BlockingInterface {

      private final BlockingInterface delegate;
      private volatile boolean failNext;

      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
        this.delegate = delegate;
      }

      @Override
      public GetRegionInfoResponse getRegionInfo(RpcController controller,
          GetRegionInfoRequest request) throws ServiceException {
        return delegate.getRegionInfo(controller, request);
      }

      @Override
      public GetStoreFileResponse getStoreFile(RpcController controller,
          GetStoreFileRequest request) throws ServiceException {
        return delegate.getStoreFile(controller, request);
      }

      @Override
      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
          GetOnlineRegionRequest request) throws ServiceException {
        return delegate.getOnlineRegion(controller, request);
      }

      @Override
      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
          throws ServiceException {
        return delegate.openRegion(controller, request);
      }

      @Override
      public WarmupRegionResponse warmupRegion(RpcController controller,
          WarmupRegionRequest request) throws ServiceException {
        return delegate.warmupRegion(controller, request);
      }

      @Override
      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
          throws ServiceException {
        return delegate.closeRegion(controller, request);
      }

      @Override
      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
          throws ServiceException {
        return delegate.flushRegion(controller, request);
      }

      @Override
      public CompactRegionResponse compactRegion(RpcController controller,
          CompactRegionRequest request) throws ServiceException {
        return delegate.compactRegion(controller, request);
      }

      @Override
      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
          ReplicateWALEntryRequest request) throws ServiceException {
        // Alternate between success and an injected failure to simulate a flaky sink.
        if (!failNext) {
          failNext = true;
          return delegate.replicateWALEntry(controller, request);
        } else {
          failNext = false;
          throw new ServiceException("Injected failure");
        }
      }

      @Override
      public ReplicateWALEntryResponse replay(RpcController controller,
          ReplicateWALEntryRequest request) throws ServiceException {
        return delegate.replay(controller, request);
      }

      @Override
      public RollWALWriterResponse rollWALWriter(RpcController controller,
          RollWALWriterRequest request) throws ServiceException {
        return delegate.rollWALWriter(controller, request);
      }

      @Override
      public GetServerInfoResponse getServerInfo(RpcController controller,
          GetServerInfoRequest request) throws ServiceException {
        return delegate.getServerInfo(controller, request);
      }

      @Override
      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
          throws ServiceException {
        return delegate.stopServer(controller, request);
      }

      @Override
      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
          UpdateFavoredNodesRequest request) throws ServiceException {
        return delegate.updateFavoredNodes(controller, request);
      }

      @Override
      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
          UpdateConfigurationRequest request) throws ServiceException {
        return delegate.updateConfiguration(controller, request);
      }

      @Override
      public GetRegionLoadResponse getRegionLoad(RpcController controller,
          GetRegionLoadRequest request) throws ServiceException {
        return delegate.getRegionLoad(controller, request);
      }

      @Override
      public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
          ClearCompactionQueuesRequest request) throws ServiceException {
        return delegate.clearCompactionQueues(controller, request);
      }

      @Override
      public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
          GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
        return delegate.getSpaceQuotaSnapshots(controller, request);
      }

      @Override
      public ExecuteProceduresResponse executeProcedures(RpcController controller,
          ExecuteProceduresRequest request) throws ServiceException {
        return delegate.executeProcedures(controller, request);
      }

      @Override
      public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
          ClearRegionBlockCacheRequest request) throws ServiceException {
        return delegate.clearRegionBlockCache(controller, request);
      }
    }

    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {

      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
        super(entries, ordinal);
      }

      @Override
      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
          throws IOException {
        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      }
    }

    @Override
    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
      return new FailureInjectingReplicatorForTest(entries, ordinal);
    }
  }
}