001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.Mockito.mock;
022
023import java.util.List;
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.HBaseClassTestRule;
026import org.apache.hadoop.hbase.ServerName;
027import org.apache.hadoop.hbase.client.ClusterConnection;
028import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
029import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
030import org.apache.hadoop.hbase.testclassification.ReplicationTests;
031import org.apache.hadoop.hbase.testclassification.SmallTests;
032import org.junit.Before;
033import org.junit.ClassRule;
034import org.junit.Test;
035import org.junit.experimental.categories.Category;
036
037import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
038
039import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
040
041@Category({ ReplicationTests.class, SmallTests.class })
042public class TestReplicationSinkManager {
043
044  @ClassRule
045  public static final HBaseClassTestRule CLASS_RULE =
046    HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
047
048  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
049
050  private ReplicationSinkManager sinkManager;
051  private HBaseReplicationEndpoint replicationEndpoint;
052
053  /**
054   * Manage the 'getRegionServers' for the tests below. Override the base class handling of
055   * Regionservers. We used to use a mock for this but updated guava/errorprone disallows mocking of
056   * classes that implement Service.
057   */
058  private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
059    List<ServerName> regionServers;
060
061    @Override
062    public boolean replicate(ReplicateContext replicateContext) {
063      return false;
064    }
065
066    @Override
067    public synchronized void setRegionServers(List<ServerName> regionServers) {
068      this.regionServers = regionServers;
069    }
070
071    @Override
072    public List<ServerName> getRegionServers() {
073      return this.regionServers;
074    }
075  }
076
077  @Before
078  public void setUp() {
079    this.replicationEndpoint = new SetServersHBaseReplicationEndpoint();
080    sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class), PEER_CLUSTER_ID,
081      replicationEndpoint, new Configuration());
082  }
083
084  @Test
085  public void testChooseSinks() {
086    List<ServerName> serverNames = Lists.newArrayList();
087    int totalServers = 20;
088    for (int i = 0; i < totalServers; i++) {
089      serverNames.add(mock(ServerName.class));
090    }
091    replicationEndpoint.setRegionServers(serverNames);
092    sinkManager.chooseSinks();
093    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
094    assertEquals(expected, sinkManager.getNumSinks());
095
096  }
097
098  @Test
099  public void testChooseSinks_LessThanRatioAvailable() {
100    List<ServerName> serverNames =
101      Lists.newArrayList(mock(ServerName.class), mock(ServerName.class));
102    replicationEndpoint.setRegionServers(serverNames);
103    sinkManager.chooseSinks();
104    assertEquals(1, sinkManager.getNumSinks());
105  }
106
107  @Test
108  public void testReportBadSink() {
109    ServerName serverNameA = mock(ServerName.class);
110    ServerName serverNameB = mock(ServerName.class);
111    replicationEndpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
112    sinkManager.chooseSinks();
113    // Sanity check
114    assertEquals(1, sinkManager.getNumSinks());
115
116    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
117
118    sinkManager.reportBadSink(sinkPeer);
119
120    // Just reporting a bad sink once shouldn't have an effect
121    assertEquals(1, sinkManager.getNumSinks());
122
123  }
124
125  /**
126   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not be
127   * replicated to anymore.
128   */
129  @Test
130  public void testReportBadSink_PastThreshold() {
131    List<ServerName> serverNames = Lists.newArrayList();
132    int totalServers = 30;
133    for (int i = 0; i < totalServers; i++) {
134      serverNames.add(mock(ServerName.class));
135    }
136    replicationEndpoint.setRegionServers(serverNames);
137    sinkManager.chooseSinks();
138    // Sanity check
139    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
140    assertEquals(expected, sinkManager.getNumSinks());
141
142    ServerName serverName = sinkManager.getSinksForTesting().get(0);
143
144    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
145
146    sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
147    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
148      sinkManager.reportBadSink(sinkPeer);
149    }
150
151    // Reporting a bad sink more than the threshold count should remove it
152    // from the list of potential sinks
153    assertEquals(expected - 1, sinkManager.getNumSinks());
154
155    //
156    // now try a sink that has some successes
157    //
158    serverName = sinkManager.getSinksForTesting().get(0);
159
160    sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
161    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) {
162      sinkManager.reportBadSink(sinkPeer);
163    }
164    sinkManager.reportSinkSuccess(sinkPeer); // one success
165    sinkManager.reportBadSink(sinkPeer);
166
167    // did not remove the sink, since we had one successful try
168    assertEquals(expected - 1, sinkManager.getNumSinks());
169
170    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD - 2; i++) {
171      sinkManager.reportBadSink(sinkPeer);
172    }
173    // still not remove, since the success reset the counter
174    assertEquals(expected - 1, sinkManager.getNumSinks());
175
176    sinkManager.reportBadSink(sinkPeer);
177    // but we exhausted the tries
178    assertEquals(expected - 2, sinkManager.getNumSinks());
179  }
180
181  @Test
182  public void testReportBadSink_DownToZeroSinks() {
183    List<ServerName> serverNames = Lists.newArrayList();
184    int totalServers = 4;
185    for (int i = 0; i < totalServers; i++) {
186      serverNames.add(mock(ServerName.class));
187    }
188    replicationEndpoint.setRegionServers(serverNames);
189    sinkManager.chooseSinks();
190    // Sanity check
191    List<ServerName> sinkList = sinkManager.getSinksForTesting();
192    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
193    assertEquals(expected, sinkList.size());
194
195    ServerName serverNameA = sinkList.get(0);
196    ServerName serverNameB = sinkList.get(1);
197
198    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
199    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
200
201    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
202      sinkManager.reportBadSink(sinkPeerA);
203      sinkManager.reportBadSink(sinkPeerB);
204    }
205
206    // We've gone down to 0 good sinks, so the replication sinks
207    // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
208    // reported as bad.
209    expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
210    assertEquals(expected, sinkManager.getNumSinks());
211  }
212
213}