001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.mockito.Mockito.mock; 022 023import java.util.List; 024import org.apache.hadoop.conf.Configuration; 025import org.apache.hadoop.hbase.HBaseClassTestRule; 026import org.apache.hadoop.hbase.ServerName; 027import org.apache.hadoop.hbase.client.ClusterConnection; 028import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 029import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; 030import org.apache.hadoop.hbase.testclassification.ReplicationTests; 031import org.apache.hadoop.hbase.testclassification.SmallTests; 032import org.junit.Before; 033import org.junit.ClassRule; 034import org.junit.Test; 035import org.junit.experimental.categories.Category; 036 037import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 038 039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 040 041@Category({ ReplicationTests.class, SmallTests.class }) 042public class TestReplicationSinkManager { 043 044 @ClassRule 045 public static final HBaseClassTestRule CLASS_RULE = 046 HBaseClassTestRule.forClass(TestReplicationSinkManager.class); 047 048 private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID"; 049 050 private ReplicationSinkManager sinkManager; 051 private HBaseReplicationEndpoint replicationEndpoint; 052 053 /** 054 * Manage the 'getRegionServers' for the tests below. Override the base class handling of 055 * Regionservers. We used to use a mock for this but updated guava/errorprone disallows mocking of 056 * classes that implement Service. 057 */ 058 private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint { 059 List<ServerName> regionServers; 060 061 @Override 062 public boolean replicate(ReplicateContext replicateContext) { 063 return false; 064 } 065 066 @Override 067 public synchronized void setRegionServers(List<ServerName> regionServers) { 068 this.regionServers = regionServers; 069 } 070 071 @Override 072 public List<ServerName> getRegionServers() { 073 return this.regionServers; 074 } 075 } 076 077 @Before 078 public void setUp() { 079 this.replicationEndpoint = new SetServersHBaseReplicationEndpoint(); 080 sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class), PEER_CLUSTER_ID, 081 replicationEndpoint, new Configuration()); 082 } 083 084 @Test 085 public void testChooseSinks() { 086 List<ServerName> serverNames = Lists.newArrayList(); 087 int totalServers = 20; 088 for (int i = 0; i < totalServers; i++) { 089 serverNames.add(mock(ServerName.class)); 090 } 091 replicationEndpoint.setRegionServers(serverNames); 092 sinkManager.chooseSinks(); 093 int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 094 assertEquals(expected, sinkManager.getNumSinks()); 095 096 } 097 098 @Test 099 public void testChooseSinks_LessThanRatioAvailable() { 100 List<ServerName> serverNames = 101 Lists.newArrayList(mock(ServerName.class), mock(ServerName.class)); 102 replicationEndpoint.setRegionServers(serverNames); 103 sinkManager.chooseSinks(); 104 assertEquals(1, sinkManager.getNumSinks()); 105 } 106 107 @Test 108 public void testReportBadSink() { 109 ServerName serverNameA = mock(ServerName.class); 110 ServerName serverNameB = mock(ServerName.class); 111 replicationEndpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB)); 112 sinkManager.chooseSinks(); 113 // Sanity check 114 assertEquals(1, sinkManager.getNumSinks()); 115 116 SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); 117 118 sinkManager.reportBadSink(sinkPeer); 119 120 // Just reporting a bad sink once shouldn't have an effect 121 assertEquals(1, sinkManager.getNumSinks()); 122 123 } 124 125 /** 126 * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not be 127 * replicated to anymore. 128 */ 129 @Test 130 public void testReportBadSink_PastThreshold() { 131 List<ServerName> serverNames = Lists.newArrayList(); 132 int totalServers = 30; 133 for (int i = 0; i < totalServers; i++) { 134 serverNames.add(mock(ServerName.class)); 135 } 136 replicationEndpoint.setRegionServers(serverNames); 137 sinkManager.chooseSinks(); 138 // Sanity check 139 int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 140 assertEquals(expected, sinkManager.getNumSinks()); 141 142 ServerName serverName = sinkManager.getSinksForTesting().get(0); 143 144 SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); 145 146 sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative 147 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { 148 sinkManager.reportBadSink(sinkPeer); 149 } 150 151 // Reporting a bad sink more than the threshold count should remove it 152 // from the list of potential sinks 153 assertEquals(expected - 1, sinkManager.getNumSinks()); 154 155 // 156 // now try a sink that has some successes 157 // 158 serverName = sinkManager.getSinksForTesting().get(0); 159 160 sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); 161 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) { 162 sinkManager.reportBadSink(sinkPeer); 163 } 164 sinkManager.reportSinkSuccess(sinkPeer); // one success 165 sinkManager.reportBadSink(sinkPeer); 166 167 // did not remove the sink, since we had one successful try 168 assertEquals(expected - 1, sinkManager.getNumSinks()); 169 170 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD - 2; i++) { 171 sinkManager.reportBadSink(sinkPeer); 172 } 173 // still not remove, since the success reset the counter 174 assertEquals(expected - 1, sinkManager.getNumSinks()); 175 176 sinkManager.reportBadSink(sinkPeer); 177 // but we exhausted the tries 178 assertEquals(expected - 2, sinkManager.getNumSinks()); 179 } 180 181 @Test 182 public void testReportBadSink_DownToZeroSinks() { 183 List<ServerName> serverNames = Lists.newArrayList(); 184 int totalServers = 4; 185 for (int i = 0; i < totalServers; i++) { 186 serverNames.add(mock(ServerName.class)); 187 } 188 replicationEndpoint.setRegionServers(serverNames); 189 sinkManager.chooseSinks(); 190 // Sanity check 191 List<ServerName> sinkList = sinkManager.getSinksForTesting(); 192 int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 193 assertEquals(expected, sinkList.size()); 194 195 ServerName serverNameA = sinkList.get(0); 196 ServerName serverNameB = sinkList.get(1); 197 198 SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); 199 SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class)); 200 201 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { 202 sinkManager.reportBadSink(sinkPeerA); 203 sinkManager.reportBadSink(sinkPeerB); 204 } 205 206 // We've gone down to 0 good sinks, so the replication sinks 207 // should have been refreshed now, so out of 4 servers, 2 are not considered as they are 208 // reported as bad. 209 expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 210 assertEquals(expected, sinkManager.getNumSinks()); 211 } 212 213}