/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.mockito.Mockito;

/**
 * Tests for {@link ReplicationSinkManager}: how many sinks get chosen from the region servers
 * offered by the replication endpoint, and how reporting sinks as bad or successful changes the
 * number of sinks that remain.
 */
@Category({ReplicationTests.class, SmallTests.class})
public class TestReplicationSinkManager {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicationSinkManager.class);

  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";

  private ReplicationSinkManager sinkManager;
  private HBaseReplicationEndpoint replicationEndpoint;

  /**
   * Endpoint whose region server list is whatever the test last handed to
   * {@link #setRegionServers(List)}, replacing the base class handling of region servers.
   * A hand-written subclass is used instead of a mock because the updated guava/errorprone
   * disallows mocking of classes that implement Service.
   */
  private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
    List<ServerName> regionServers;

    @Override
    public List<ServerName> getRegionServers() {
      return this.regionServers;
    }

    @Override
    public synchronized void setRegionServers(List<ServerName> regionServers) {
      this.regionServers = regionServers;
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      // Not exercised by these tests; always answers false.
      return false;
    }
  }

  /** Creates a fresh endpoint and a sink manager wired to it before every test. */
  @Before
  public void setUp() {
    this.replicationEndpoint = new SetServersHBaseReplicationEndpoint();
    ClusterConnection connection = mock(ClusterConnection.class);
    Configuration conf = new Configuration();
    this.sinkManager =
        new ReplicationSinkManager(connection, PEER_CLUSTER_ID, replicationEndpoint, conf);
  }

  /** Builds a list holding {@code count} mocked {@link ServerName} instances. */
  private static List<ServerName> mockServerNames(int count) {
    List<ServerName> servers = Lists.newArrayList();
    for (int added = 0; added < count; added++) {
      servers.add(mock(ServerName.class));
    }
    return servers;
  }

  /** Hands {@code servers} to the endpoint and asks the sink manager to choose its sinks. */
  private void chooseSinksFrom(List<ServerName> servers) {
    replicationEndpoint.setRegionServers(servers);
    sinkManager.chooseSinks();
  }

  /** Wraps {@code server} in a {@link SinkPeer} backed by a mocked admin interface. */
  private static SinkPeer newSinkPeer(ServerName server) {
    return new SinkPeer(server, mock(AdminService.BlockingInterface.class));
  }

  /** Reports {@code peer} as a bad sink {@code times} times in a row. */
  private void reportBad(SinkPeer peer, int times) {
    for (int attempt = 0; attempt < times; attempt++) {
      sinkManager.reportBadSink(peer);
    }
  }

  /** Sink count expected for {@code serverCount} servers at the default source ratio. */
  private static int expectedSinks(int serverCount) {
    return (int) (serverCount * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
  }

  /** With 20 servers on offer, the default ratio's share of them is chosen as sinks. */
  @Test
  public void testChooseSinks() {
    int serverCount = 20;
    chooseSinksFrom(mockServerNames(serverCount));

    assertEquals(expectedSinks(serverCount), sinkManager.getNumSinks());
  }

  /** With only two servers on offer, exactly one sink is chosen. */
  @Test
  public void testChooseSinks_LessThanRatioAvailable() {
    List<ServerName> twoServers =
        Lists.newArrayList(mock(ServerName.class), mock(ServerName.class));
    chooseSinksFrom(twoServers);

    assertEquals(1, sinkManager.getNumSinks());
  }

  /** A single bad-sink report must leave the number of sinks untouched. */
  @Test
  public void testReportBadSink() {
    ServerName first = mock(ServerName.class);
    ServerName second = mock(ServerName.class);
    chooseSinksFrom(Lists.newArrayList(first, second));
    // Sanity check before reporting anything.
    assertEquals(1, sinkManager.getNumSinks());

    sinkManager.reportBadSink(newSinkPeer(first));

    // One report on its own is not enough to change the sink count.
    assertEquals(1, sinkManager.getNumSinks());
  }

  /**
   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
   * be replicated to anymore. A success report in between resets the count of bad reports.
   */
  @Test
  public void testReportBadSink_PastThreshold() {
    int serverCount = 30;
    int threshold = ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD;
    chooseSinksFrom(mockServerNames(serverCount));
    // Sanity check.
    int initialSinks = expectedSinks(serverCount);
    assertEquals(initialSinks, sinkManager.getNumSinks());

    SinkPeer alwaysFailing = newSinkPeer(sinkManager.getSinksForTesting().get(0));

    // A success before any failure has no effect; the counter does not go negative.
    sinkManager.reportSinkSuccess(alwaysFailing);
    reportBad(alwaysFailing, threshold + 1);

    // More bad reports than the threshold: the sink is dropped from the potential sinks.
    assertEquals(initialSinks - 1, sinkManager.getNumSinks());

    //
    // Now try a sink that has some successes.
    //
    SinkPeer sometimesFailing = newSinkPeer(sinkManager.getSinksForTesting().get(0));
    reportBad(sometimesFailing, threshold);
    sinkManager.reportSinkSuccess(sometimesFailing); // one success
    sinkManager.reportBadSink(sometimesFailing);

    // The sink was not removed, since there was one successful try.
    assertEquals(initialSinks - 1, sinkManager.getNumSinks());

    reportBad(sometimesFailing, threshold - 1);
    // Still not removed, since the success reset the counter.
    assertEquals(initialSinks - 1, sinkManager.getNumSinks());

    sinkManager.reportBadSink(sometimesFailing);
    // This report exhausts the tries, so the sink goes away.
    assertEquals(initialSinks - 2, sinkManager.getNumSinks());
  }

  /**
   * Reports every chosen sink as bad until none is left and checks that the sinks are then
   * chosen again without the servers that were reported as bad.
   */
  @Test
  public void testReportBadSink_DownToZeroSinks() {
    int serverCount = 4;
    chooseSinksFrom(mockServerNames(serverCount));
    // Sanity check.
    List<ServerName> chosen = sinkManager.getSinksForTesting();
    assertEquals(expectedSinks(serverCount), chosen.size());

    ServerName firstServer = chosen.get(0);
    ServerName secondServer = chosen.get(1);
    SinkPeer firstPeer = newSinkPeer(firstServer);
    SinkPeer secondPeer = newSinkPeer(secondServer);

    // The reports stay interleaved, one per peer on every round.
    int rounds = ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD + 1;
    for (int round = 0; round < rounds; round++) {
      sinkManager.reportBadSink(firstPeer);
      sinkManager.reportBadSink(secondPeer);
    }

    // We've gone down to 0 good sinks, so the replication sinks should have been refreshed
    // now; out of 4 servers, 2 are not considered as they are reported as bad.
    assertEquals(expectedSinks(serverCount - 2), sinkManager.getNumSinks());
  }

}