001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.ClusterConnection;
029import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
030import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
031import org.apache.hadoop.hbase.testclassification.ReplicationTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.junit.Before;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
041import org.mockito.Mockito;
042
043@Category({ReplicationTests.class, SmallTests.class})
044public class TestReplicationSinkManager {
045
046  @ClassRule
047  public static final HBaseClassTestRule CLASS_RULE =
048      HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
049
050  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
051
052  private ReplicationSinkManager sinkManager;
053  private HBaseReplicationEndpoint replicationEndpoint;
054
055  /**
056   * Manage the 'getRegionServers' for the tests below. Override the base class handling
057   * of Regionservers. We used to use a mock for this but updated guava/errorprone disallows
058   * mocking of classes that implement Service.
059   */
060  private static class SetServersHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
061    List<ServerName> regionServers;
062
063    @Override
064    public boolean replicate(ReplicateContext replicateContext) {
065      return false;
066    }
067
068    @Override
069    public synchronized void setRegionServers(List<ServerName> regionServers) {
070      this.regionServers = regionServers;
071    }
072
073    @Override
074    public List<ServerName> getRegionServers() {
075      return this.regionServers;
076    }
077  }
078
079  @Before
080  public void setUp() {
081    this.replicationEndpoint = new SetServersHBaseReplicationEndpoint();
082    sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class), PEER_CLUSTER_ID,
083      replicationEndpoint, new Configuration());
084  }
085
086  @Test
087  public void testChooseSinks() {
088    List<ServerName> serverNames = Lists.newArrayList();
089    int totalServers = 20;
090    for (int i = 0; i < totalServers; i++) {
091      serverNames.add(mock(ServerName.class));
092    }
093    replicationEndpoint.setRegionServers(serverNames);
094    sinkManager.chooseSinks();
095    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
096    assertEquals(expected, sinkManager.getNumSinks());
097
098  }
099
100  @Test
101  public void testChooseSinks_LessThanRatioAvailable() {
102    List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
103      mock(ServerName.class));
104    replicationEndpoint.setRegionServers(serverNames);
105    sinkManager.chooseSinks();
106    assertEquals(1, sinkManager.getNumSinks());
107  }
108
109  @Test
110  public void testReportBadSink() {
111    ServerName serverNameA = mock(ServerName.class);
112    ServerName serverNameB = mock(ServerName.class);
113    replicationEndpoint.setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
114    sinkManager.chooseSinks();
115    // Sanity check
116    assertEquals(1, sinkManager.getNumSinks());
117
118    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
119
120    sinkManager.reportBadSink(sinkPeer);
121
122    // Just reporting a bad sink once shouldn't have an effect
123    assertEquals(1, sinkManager.getNumSinks());
124
125  }
126
127  /**
128   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
129   * be replicated to anymore.
130   */
131  @Test
132  public void testReportBadSink_PastThreshold() {
133    List<ServerName> serverNames = Lists.newArrayList();
134    int totalServers = 30;
135    for (int i = 0; i < totalServers; i++) {
136      serverNames.add(mock(ServerName.class));
137    }
138    replicationEndpoint.setRegionServers(serverNames);
139    sinkManager.chooseSinks();
140    // Sanity check
141    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
142    assertEquals(expected, sinkManager.getNumSinks());
143
144    ServerName serverName = sinkManager.getSinksForTesting().get(0);
145
146    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
147
148    sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
149    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
150      sinkManager.reportBadSink(sinkPeer);
151    }
152
153    // Reporting a bad sink more than the threshold count should remove it
154    // from the list of potential sinks
155    assertEquals(expected - 1, sinkManager.getNumSinks());
156
157    //
158    // now try a sink that has some successes
159    //
160    serverName = sinkManager.getSinksForTesting().get(0);
161
162    sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
163    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
164      sinkManager.reportBadSink(sinkPeer);
165    }
166    sinkManager.reportSinkSuccess(sinkPeer); // one success
167    sinkManager.reportBadSink(sinkPeer);
168
169    // did not remove the sink, since we had one successful try
170    assertEquals(expected - 1, sinkManager.getNumSinks());
171
172    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
173      sinkManager.reportBadSink(sinkPeer);
174    }
175    // still not remove, since the success reset the counter
176    assertEquals(expected - 1, sinkManager.getNumSinks());
177
178    sinkManager.reportBadSink(sinkPeer);
179    // but we exhausted the tries
180    assertEquals(expected - 2, sinkManager.getNumSinks());
181  }
182
183  @Test
184  public void testReportBadSink_DownToZeroSinks() {
185    List<ServerName> serverNames = Lists.newArrayList();
186    int totalServers = 4;
187    for (int i = 0; i < totalServers; i++) {
188      serverNames.add(mock(ServerName.class));
189    }
190    replicationEndpoint.setRegionServers(serverNames);
191    sinkManager.chooseSinks();
192    // Sanity check
193    List<ServerName> sinkList = sinkManager.getSinksForTesting();
194    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
195    assertEquals(expected, sinkList.size());
196
197    ServerName serverNameA = sinkList.get(0);
198    ServerName serverNameB = sinkList.get(1);
199
200    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
201    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
202
203    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
204      sinkManager.reportBadSink(sinkPeerA);
205      sinkManager.reportBadSink(sinkPeerB);
206    }
207
208    // We've gone down to 0 good sinks, so the replication sinks
209    // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
210    // reported as bad.
211    expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
212    assertEquals(expected, sinkManager.getNumSinks());
213  }
214
215}