001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.regionserver; 019 020import static org.junit.Assert.assertEquals; 021import static org.mockito.Mockito.mock; 022import static org.mockito.Mockito.when; 023 024import java.util.List; 025import org.apache.hadoop.conf.Configuration; 026import org.apache.hadoop.hbase.HBaseClassTestRule; 027import org.apache.hadoop.hbase.ServerName; 028import org.apache.hadoop.hbase.client.ClusterConnection; 029import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; 030import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; 031import org.apache.hadoop.hbase.testclassification.ReplicationTests; 032import org.apache.hadoop.hbase.testclassification.SmallTests; 033import org.junit.Before; 034import org.junit.ClassRule; 035import org.junit.Test; 036import org.junit.experimental.categories.Category; 037 038import org.apache.hbase.thirdparty.com.google.common.collect.Lists; 039 040import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 041 042@Category({ReplicationTests.class, SmallTests.class}) 043public class TestReplicationSinkManager { 044 045 @ClassRule 046 public static final HBaseClassTestRule CLASS_RULE = 047 HBaseClassTestRule.forClass(TestReplicationSinkManager.class); 048 049 private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID"; 050 051 private HBaseReplicationEndpoint replicationEndpoint; 052 private ReplicationSinkManager sinkManager; 053 054 @Before 055 public void setUp() { 056 replicationEndpoint = mock(HBaseReplicationEndpoint.class); 057 sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class), 058 PEER_CLUSTER_ID, replicationEndpoint, new Configuration()); 059 } 060 061 @Test 062 public void testChooseSinks() { 063 List<ServerName> serverNames = Lists.newArrayList(); 064 int totalServers = 20; 065 for (int i = 0; i < totalServers; i++) { 066 serverNames.add(mock(ServerName.class)); 067 } 068 069 when(replicationEndpoint.getRegionServers()) 070 .thenReturn(serverNames); 071 072 sinkManager.chooseSinks(); 073 074 int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 075 assertEquals(expected, sinkManager.getNumSinks()); 076 077 } 078 079 @Test 080 public void testChooseSinks_LessThanRatioAvailable() { 081 List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class), 082 mock(ServerName.class)); 083 084 when(replicationEndpoint.getRegionServers()) 085 .thenReturn(serverNames); 086 087 sinkManager.chooseSinks(); 088 089 assertEquals(1, sinkManager.getNumSinks()); 090 } 091 092 @Test 093 public void testReportBadSink() { 094 ServerName serverNameA = mock(ServerName.class); 095 ServerName serverNameB = mock(ServerName.class); 096 when(replicationEndpoint.getRegionServers()) 097 .thenReturn(Lists.newArrayList(serverNameA, serverNameB)); 098 099 sinkManager.chooseSinks(); 100 // Sanity check 101 assertEquals(1, sinkManager.getNumSinks()); 102 103 SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); 104 105 sinkManager.reportBadSink(sinkPeer); 106 107 // Just reporting a bad sink once shouldn't have an effect 108 assertEquals(1, sinkManager.getNumSinks()); 109 110 } 111 112 /** 113 * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not 114 * be replicated to anymore. 115 */ 116 @Test 117 public void testReportBadSink_PastThreshold() { 118 List<ServerName> serverNames = Lists.newArrayList(); 119 int totalServers = 30; 120 for (int i = 0; i < totalServers; i++) { 121 serverNames.add(mock(ServerName.class)); 122 } 123 when(replicationEndpoint.getRegionServers()) 124 .thenReturn(serverNames); 125 126 127 sinkManager.chooseSinks(); 128 // Sanity check 129 int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 130 assertEquals(expected, sinkManager.getNumSinks()); 131 132 ServerName serverName = sinkManager.getSinksForTesting().get(0); 133 134 SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); 135 136 sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative 137 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { 138 sinkManager.reportBadSink(sinkPeer); 139 } 140 141 // Reporting a bad sink more than the threshold count should remove it 142 // from the list of potential sinks 143 assertEquals(expected - 1, sinkManager.getNumSinks()); 144 145 // 146 // now try a sink that has some successes 147 // 148 serverName = sinkManager.getSinksForTesting().get(0); 149 150 sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class)); 151 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) { 152 sinkManager.reportBadSink(sinkPeer); 153 } 154 sinkManager.reportSinkSuccess(sinkPeer); // one success 155 sinkManager.reportBadSink(sinkPeer); 156 157 // did not remove the sink, since we had one successful try 158 assertEquals(expected - 1, sinkManager.getNumSinks()); 159 160 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) { 161 sinkManager.reportBadSink(sinkPeer); 162 } 163 // still not remove, since the success reset the counter 164 assertEquals(expected - 1, sinkManager.getNumSinks()); 165 166 sinkManager.reportBadSink(sinkPeer); 167 // but we exhausted the tries 168 assertEquals(expected - 2, sinkManager.getNumSinks()); 169 } 170 171 @Test 172 public void testReportBadSink_DownToZeroSinks() { 173 List<ServerName> serverNames = Lists.newArrayList(); 174 int totalServers = 4; 175 for (int i = 0; i < totalServers; i++) { 176 serverNames.add(mock(ServerName.class)); 177 } 178 when(replicationEndpoint.getRegionServers()) 179 .thenReturn(serverNames); 180 181 182 sinkManager.chooseSinks(); 183 // Sanity check 184 List<ServerName> sinkList = sinkManager.getSinksForTesting(); 185 int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 186 assertEquals(expected, sinkList.size()); 187 188 ServerName serverNameA = sinkList.get(0); 189 ServerName serverNameB = sinkList.get(1); 190 191 SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class)); 192 SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class)); 193 194 for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) { 195 sinkManager.reportBadSink(sinkPeerA); 196 sinkManager.reportBadSink(sinkPeerB); 197 } 198 199 // We've gone down to 0 good sinks, so the replication sinks 200 // should have been refreshed now, so out of 4 servers, 2 are not considered as they are 201 // reported as bad. 202 expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO); 203 assertEquals(expected, sinkManager.getNumSinks()); 204 } 205 206}