001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.mockito.Mockito.mock;
022
023import java.io.IOException;
024import java.util.List;
025import org.apache.hadoop.conf.Configuration;
026import org.apache.hadoop.hbase.HBaseClassTestRule;
027import org.apache.hadoop.hbase.HBaseTestingUtil;
028import org.apache.hadoop.hbase.ServerName;
029import org.apache.hadoop.hbase.client.AsyncClusterConnection;
030import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
031import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
032import org.apache.hadoop.hbase.testclassification.ReplicationTests;
033import org.apache.hadoop.hbase.testclassification.SmallTests;
034import org.junit.Before;
035import org.junit.ClassRule;
036import org.junit.Test;
037import org.junit.experimental.categories.Category;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
042
043@Category({ ReplicationTests.class, SmallTests.class })
044public class TestHBaseReplicationEndpoint {
045
046  @ClassRule
047  public static final HBaseClassTestRule CLASS_RULE =
048    HBaseClassTestRule.forClass(TestHBaseReplicationEndpoint.class);
049
050  private static final Logger LOG = LoggerFactory.getLogger(TestHBaseReplicationEndpoint.class);
051
052  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
053
054  private HBaseReplicationEndpoint endpoint;
055
056  @Before
057  public void setUp() throws Exception {
058    try {
059      ReplicationEndpoint.Context context = new ReplicationEndpoint.Context(null,
060        UTIL.getConfiguration(), UTIL.getConfiguration(), null, null, null, null, null, null, null);
061      endpoint = new DummyHBaseReplicationEndpoint();
062      endpoint.init(context);
063    } catch (Exception e) {
064      LOG.info("Failed", e);
065    }
066  }
067
068  @Test
069  public void testChooseSinks() {
070    List<ServerName> serverNames = Lists.newArrayList();
071    int totalServers = 20;
072    for (int i = 0; i < totalServers; i++) {
073      serverNames.add(mock(ServerName.class));
074    }
075    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
076    endpoint.chooseSinks();
077    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
078    assertEquals(expected, endpoint.getNumSinks());
079  }
080
081  @Test
082  public void testChooseSinksLessThanRatioAvailable() {
083    List<ServerName> serverNames =
084      Lists.newArrayList(mock(ServerName.class), mock(ServerName.class));
085    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
086    endpoint.chooseSinks();
087    assertEquals(1, endpoint.getNumSinks());
088  }
089
090  @Test
091  public void testReportBadSink() {
092    ServerName serverNameA = mock(ServerName.class);
093    ServerName serverNameB = mock(ServerName.class);
094    ((DummyHBaseReplicationEndpoint) endpoint)
095      .setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
096    endpoint.chooseSinks();
097    // Sanity check
098    assertEquals(1, endpoint.getNumSinks());
099
100    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
101    endpoint.reportBadSink(sinkPeer);
102    // Just reporting a bad sink once shouldn't have an effect
103    assertEquals(1, endpoint.getNumSinks());
104  }
105
106  /**
107   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not be
108   * replicated to anymore.
109   */
110  @Test
111  public void testReportBadSinkPastThreshold() {
112    List<ServerName> serverNames = Lists.newArrayList();
113    int totalServers = 30;
114    for (int i = 0; i < totalServers; i++) {
115      serverNames.add(mock(ServerName.class));
116    }
117    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
118    endpoint.chooseSinks();
119    // Sanity check
120    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
121    assertEquals(expected, endpoint.getNumSinks());
122
123    ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
124    SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
125    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
126      endpoint.reportBadSink(sinkPeer);
127    }
128    // Reporting a bad sink more than the threshold count should remove it
129    // from the list of potential sinks
130    assertEquals(expected - 1, endpoint.getNumSinks());
131
132    // now try a sink that has some successes
133    ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
134    sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
135    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
136      endpoint.reportBadSink(sinkPeer);
137    }
138    endpoint.reportSinkSuccess(sinkPeer); // one success
139    endpoint.reportBadSink(sinkPeer);
140    // did not remove the sink, since we had one successful try
141    assertEquals(expected - 1, endpoint.getNumSinks());
142
143    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) {
144      endpoint.reportBadSink(sinkPeer);
145    }
146    // still not remove, since the success reset the counter
147    assertEquals(expected - 1, endpoint.getNumSinks());
148    endpoint.reportBadSink(sinkPeer);
149    // but we exhausted the tries
150    assertEquals(expected - 2, endpoint.getNumSinks());
151  }
152
153  @Test
154  public void testReportBadSinkDownToZeroSinks() {
155    List<ServerName> serverNames = Lists.newArrayList();
156    int totalServers = 4;
157    for (int i = 0; i < totalServers; i++) {
158      serverNames.add(mock(ServerName.class));
159    }
160    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
161    endpoint.chooseSinks();
162    // Sanity check
163    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
164    assertEquals(expected, endpoint.getNumSinks());
165
166    ServerName serverNameA = endpoint.getSinkServers().get(0);
167    ServerName serverNameB = endpoint.getSinkServers().get(1);
168
169    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
170    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
171
172    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
173      endpoint.reportBadSink(sinkPeerA);
174      endpoint.reportBadSink(sinkPeerB);
175    }
176
177    // We've gone down to 0 good sinks, so the replication sinks
178    // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
179    // reported as bad.
180    expected =
181      (int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
182    assertEquals(expected, endpoint.getNumSinks());
183  }
184
185  private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
186
187    List<ServerName> regionServers;
188
189    public void setRegionServers(List<ServerName> regionServers) {
190      this.regionServers = regionServers;
191    }
192
193    @Override
194    public List<ServerName> fetchSlavesAddresses() {
195      return regionServers;
196    }
197
198    @Override
199    public boolean replicate(ReplicateContext replicateContext) {
200      return false;
201    }
202
203    @Override
204    public AsyncClusterConnection createConnection(Configuration conf) throws IOException {
205      return null;
206    }
207  }
208}