/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
 * Tests for the sink selection and bad-sink bookkeeping exposed by
 * {@link HBaseReplicationEndpoint}. Every test drives a stub endpoint whose peer region server
 * list is supplied directly by the test instead of being fetched from a peer cluster.
 */
@Tag(ReplicationTests.TAG)
@Tag(SmallTests.TAG)
public class TestHBaseReplicationEndpoint {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();

  // Typed as the stub so tests can inject region servers without downcasting.
  private DummyHBaseReplicationEndpoint endpoint;

  /**
   * Builds a fresh stub endpoint and initializes it with a context whose mocked peer reports a
   * fixed cluster key.
   */
  @BeforeEach
  public void setUp() throws Exception {
    ReplicationPeer replicationPeer = mock(ReplicationPeer.class);
    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
    when(replicationPeer.getPeerConfig()).thenReturn(peerConfig);
    when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase");
    ReplicationEndpoint.Context context =
      new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null,
        null, null, replicationPeer, null, null, null);
    endpoint = new DummyHBaseReplicationEndpoint();
    endpoint.init(context);
  }

  /**
   * With 20 available servers, the number of chosen sinks should equal the server count scaled by
   * the default source ratio.
   */
  @Test
  public void testChooseSinks() {
    int totalServers = 20;
    endpoint.setRegionServers(mockServers(totalServers));
    endpoint.chooseSinks();
    assertEquals(expectedSinkCount(totalServers), endpoint.getNumSinks());
  }

  /** With only two available servers, exactly one sink is expected to be chosen. */
  @Test
  public void testChooseSinksLessThanRatioAvailable() {
    endpoint.setRegionServers(mockServers(2));
    endpoint.chooseSinks();
    assertEquals(1, endpoint.getNumSinks());
  }

  /** A single bad-sink report must not change the number of sinks. */
  @Test
  public void testReportBadSink() {
    ServerName serverNameA = mock(ServerName.class);
    ServerName serverNameB = mock(ServerName.class);
    endpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
    endpoint.chooseSinks();
    // Sanity check
    assertEquals(1, endpoint.getNumSinks());

    endpoint.reportBadSink(sinkPeerFor(serverNameA));
    // Just reporting a bad sink once shouldn't have an effect
    assertEquals(1, endpoint.getNumSinks());
  }

  /**
   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not be
   * replicated to anymore.
   */
  @Test
  public void testReportBadSinkPastThreshold() {
    int totalServers = 30;
    int threshold = HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD;
    endpoint.setRegionServers(mockServers(totalServers));
    endpoint.chooseSinks();
    // Sanity check
    int expected = expectedSinkCount(totalServers);
    assertEquals(expected, endpoint.getNumSinks());

    ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
    SinkPeer sinkPeer = sinkPeerFor(badSinkServer0);
    reportBadSink(sinkPeer, threshold + 1);
    // Reporting a bad sink more than the threshold count should remove it
    // from the list of potential sinks
    assertEquals(expected - 1, endpoint.getNumSinks());

    // now try a sink that has some successes
    ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
    sinkPeer = sinkPeerFor(badSinkServer1);
    reportBadSink(sinkPeer, threshold);
    endpoint.reportSinkSuccess(sinkPeer); // one success
    endpoint.reportBadSink(sinkPeer);
    // did not remove the sink, since we had one successful try
    assertEquals(expected - 1, endpoint.getNumSinks());

    reportBadSink(sinkPeer, threshold - 1);
    // still not removed, since the success reset the counter
    assertEquals(expected - 1, endpoint.getNumSinks());
    endpoint.reportBadSink(sinkPeer);
    // but we exhausted the tries
    assertEquals(expected - 2, endpoint.getNumSinks());
  }

  /**
   * Reports every chosen sink as bad past the threshold and checks the sink count that results
   * once no good sink is left.
   */
  @Test
  public void testReportBadSinkDownToZeroSinks() {
    int totalServers = 4;
    // The endpoint keeps a reference to this very list, so the removals below are visible to
    // the next fetchPeerAddresses() call.
    List<ServerName> serverNames = mockServers(totalServers);
    endpoint.setRegionServers(serverNames);
    endpoint.chooseSinks();
    // Sanity check
    assertEquals(expectedSinkCount(totalServers), endpoint.getNumSinks());

    ServerName serverNameA = endpoint.getSinkServers().get(0);
    ServerName serverNameB = endpoint.getSinkServers().get(1);

    serverNames.remove(serverNameA);
    serverNames.remove(serverNameB);

    SinkPeer sinkPeerA = sinkPeerFor(serverNameA);
    SinkPeer sinkPeerB = sinkPeerFor(serverNameB);

    // Keep the reports interleaved so both sinks cross the threshold in the same iteration.
    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
      endpoint.reportBadSink(sinkPeerA);
      endpoint.reportBadSink(sinkPeerB);
    }

    // We've gone down to 0 good sinks, so the replication sinks should have been refreshed now.
    // The two bad servers were removed from the list the stub hands out, so only the remaining
    // 2 of the 4 servers are candidates for the refreshed sink set.
    assertEquals(expectedSinkCount(totalServers - 2), endpoint.getNumSinks());
  }

  /**
   * Creates a mutable list holding {@code count} distinct {@link ServerName} mocks.
   * @param count number of mocked servers to create
   * @return a new, mutable list of mocks
   */
  private static List<ServerName> mockServers(int count) {
    List<ServerName> servers = Lists.newArrayList();
    for (int i = 0; i < count; i++) {
      servers.add(mock(ServerName.class));
    }
    return servers;
  }

  /**
   * Computes the sink count the tests expect for a given number of available servers: the count
   * multiplied by the default source ratio, truncated to an int.
   * @param availableServers number of servers returned by the stub endpoint
   * @return expected value of {@code getNumSinks()}
   */
  private static int expectedSinkCount(int availableServers) {
    return (int) (availableServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
  }

  /**
   * Wraps a server in a {@link SinkPeer} backed by a mocked region server admin.
   * @param server the server the peer should point at
   * @return a new sink peer for {@code server}
   */
  private static SinkPeer sinkPeerFor(ServerName server) {
    return new SinkPeer(server, mock(AsyncRegionServerAdmin.class));
  }

  /**
   * Reports the given peer as a bad sink {@code times} times in a row.
   * @param peer  the sink peer to report
   * @param times how many consecutive reports to issue
   */
  private void reportBadSink(SinkPeer peer, int times) {
    for (int i = 0; i < times; i++) {
      endpoint.reportBadSink(peer);
    }
  }

  /**
   * Stub endpoint that serves a test-supplied region server list as the peer addresses, never
   * replicates anything, and creates no cluster connection.
   */
  private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint {

    private List<ServerName> regionServers;

    /**
     * Sets the list returned by {@link #fetchPeerAddresses()}. The list is stored by reference,
     * not copied, so later changes made by the caller are visible to the endpoint.
     */
    public void setRegionServers(List<ServerName> regionServers) {
      this.regionServers = regionServers;
    }

    @Override
    protected Collection<ServerName> fetchPeerAddresses() {
      return regionServers;
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      return false;
    }

    @Override
    public AsyncClusterConnection createConnection(URI clusterURI, Configuration conf)
      throws IOException {
      // No connection is created by this stub.
      return null;
    }
  }
}