001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.ServerName;
028import org.apache.hadoop.hbase.client.ClusterConnection;
029import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
030import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
031import org.apache.hadoop.hbase.testclassification.ReplicationTests;
032import org.apache.hadoop.hbase.testclassification.SmallTests;
033import org.junit.Before;
034import org.junit.ClassRule;
035import org.junit.Test;
036import org.junit.experimental.categories.Category;
037
038import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
039
040import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
041
042@Category({ReplicationTests.class, SmallTests.class})
043public class TestReplicationSinkManager {
044
045  @ClassRule
046  public static final HBaseClassTestRule CLASS_RULE =
047      HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
048
049  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
050
051  private HBaseReplicationEndpoint replicationEndpoint;
052  private ReplicationSinkManager sinkManager;
053
054  @Before
055  public void setUp() {
056    replicationEndpoint = mock(HBaseReplicationEndpoint.class);
057    sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
058                      PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
059  }
060
061  @Test
062  public void testChooseSinks() {
063    List<ServerName> serverNames = Lists.newArrayList();
064    int totalServers = 20;
065    for (int i = 0; i < totalServers; i++) {
066      serverNames.add(mock(ServerName.class));
067    }
068
069    when(replicationEndpoint.getRegionServers())
070          .thenReturn(serverNames);
071
072    sinkManager.chooseSinks();
073
074    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
075    assertEquals(expected, sinkManager.getNumSinks());
076
077  }
078
079  @Test
080  public void testChooseSinks_LessThanRatioAvailable() {
081    List<ServerName> serverNames = Lists.newArrayList(mock(ServerName.class),
082      mock(ServerName.class));
083
084    when(replicationEndpoint.getRegionServers())
085          .thenReturn(serverNames);
086
087    sinkManager.chooseSinks();
088
089    assertEquals(1, sinkManager.getNumSinks());
090  }
091
092  @Test
093  public void testReportBadSink() {
094    ServerName serverNameA = mock(ServerName.class);
095    ServerName serverNameB = mock(ServerName.class);
096    when(replicationEndpoint.getRegionServers())
097      .thenReturn(Lists.newArrayList(serverNameA, serverNameB));
098
099    sinkManager.chooseSinks();
100    // Sanity check
101    assertEquals(1, sinkManager.getNumSinks());
102
103    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
104
105    sinkManager.reportBadSink(sinkPeer);
106
107    // Just reporting a bad sink once shouldn't have an effect
108    assertEquals(1, sinkManager.getNumSinks());
109
110  }
111
112  /**
113   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
114   * be replicated to anymore.
115   */
116  @Test
117  public void testReportBadSink_PastThreshold() {
118    List<ServerName> serverNames = Lists.newArrayList();
119    int totalServers = 30;
120    for (int i = 0; i < totalServers; i++) {
121      serverNames.add(mock(ServerName.class));
122    }
123    when(replicationEndpoint.getRegionServers())
124          .thenReturn(serverNames);
125
126
127    sinkManager.chooseSinks();
128    // Sanity check
129    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
130    assertEquals(expected, sinkManager.getNumSinks());
131
132    ServerName serverName = sinkManager.getSinksForTesting().get(0);
133
134    SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
135
136    sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
137    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
138      sinkManager.reportBadSink(sinkPeer);
139    }
140
141    // Reporting a bad sink more than the threshold count should remove it
142    // from the list of potential sinks
143    assertEquals(expected - 1, sinkManager.getNumSinks());
144
145    //
146    // now try a sink that has some successes
147    //
148    serverName = sinkManager.getSinksForTesting().get(0);
149
150    sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
151    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
152      sinkManager.reportBadSink(sinkPeer);
153    }
154    sinkManager.reportSinkSuccess(sinkPeer); // one success
155    sinkManager.reportBadSink(sinkPeer);
156
157    // did not remove the sink, since we had one successful try
158    assertEquals(expected - 1, sinkManager.getNumSinks());
159
160    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-2; i++) {
161      sinkManager.reportBadSink(sinkPeer);
162    }
163    // still not remove, since the success reset the counter
164    assertEquals(expected - 1, sinkManager.getNumSinks());
165
166    sinkManager.reportBadSink(sinkPeer);
167    // but we exhausted the tries
168    assertEquals(expected - 2, sinkManager.getNumSinks());
169  }
170
171  @Test
172  public void testReportBadSink_DownToZeroSinks() {
173    List<ServerName> serverNames = Lists.newArrayList();
174    int totalServers = 4;
175    for (int i = 0; i < totalServers; i++) {
176      serverNames.add(mock(ServerName.class));
177    }
178    when(replicationEndpoint.getRegionServers())
179          .thenReturn(serverNames);
180
181
182    sinkManager.chooseSinks();
183    // Sanity check
184    List<ServerName> sinkList = sinkManager.getSinksForTesting();
185    int expected = (int) (totalServers * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
186    assertEquals(expected, sinkList.size());
187
188    ServerName serverNameA = sinkList.get(0);
189    ServerName serverNameB = sinkList.get(1);
190
191    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
192    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
193
194    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
195      sinkManager.reportBadSink(sinkPeerA);
196      sinkManager.reportBadSink(sinkPeerB);
197    }
198
199    // We've gone down to 0 good sinks, so the replication sinks
200    // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
201    // reported as bad.
202    expected = (int) ((totalServers - 2) * ReplicationSinkManager.DEFAULT_REPLICATION_SOURCE_RATIO);
203    assertEquals(expected, sinkManager.getNumSinks());
204  }
205
206}