001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.jupiter.api.Assertions.assertEquals;
021import static org.mockito.Mockito.mock;
022import static org.mockito.Mockito.when;
023
024import java.io.IOException;
025import java.net.URI;
026import java.util.Collection;
027import java.util.List;
028import org.apache.hadoop.conf.Configuration;
029import org.apache.hadoop.hbase.HBaseTestingUtil;
030import org.apache.hadoop.hbase.ServerName;
031import org.apache.hadoop.hbase.client.AsyncClusterConnection;
032import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
033import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint.SinkPeer;
034import org.apache.hadoop.hbase.testclassification.ReplicationTests;
035import org.apache.hadoop.hbase.testclassification.SmallTests;
036import org.junit.jupiter.api.BeforeEach;
037import org.junit.jupiter.api.Tag;
038import org.junit.jupiter.api.Test;
039
040import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
041
042@Tag(ReplicationTests.TAG)
043@Tag(SmallTests.TAG)
044public class TestHBaseReplicationEndpoint {
045
046  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
047
048  private HBaseReplicationEndpoint endpoint;
049
050  @BeforeEach
051  public void setUp() throws Exception {
052    ReplicationPeer replicationPeer = mock(ReplicationPeer.class);
053    ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
054    when(replicationPeer.getPeerConfig()).thenReturn(peerConfig);
055    when(peerConfig.getClusterKey()).thenReturn("hbase+zk://server1:2181/hbase");
056    ReplicationEndpoint.Context context =
057      new ReplicationEndpoint.Context(null, UTIL.getConfiguration(), UTIL.getConfiguration(), null,
058        null, null, replicationPeer, null, null, null);
059    endpoint = new DummyHBaseReplicationEndpoint();
060    endpoint.init(context);
061  }
062
063  @Test
064  public void testChooseSinks() {
065    List<ServerName> serverNames = Lists.newArrayList();
066    int totalServers = 20;
067    for (int i = 0; i < totalServers; i++) {
068      serverNames.add(mock(ServerName.class));
069    }
070    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
071    endpoint.chooseSinks();
072    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
073    assertEquals(expected, endpoint.getNumSinks());
074  }
075
076  @Test
077  public void testChooseSinksLessThanRatioAvailable() {
078    List<ServerName> serverNames =
079      Lists.newArrayList(mock(ServerName.class), mock(ServerName.class));
080    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
081    endpoint.chooseSinks();
082    assertEquals(1, endpoint.getNumSinks());
083  }
084
085  @Test
086  public void testReportBadSink() {
087    ServerName serverNameA = mock(ServerName.class);
088    ServerName serverNameB = mock(ServerName.class);
089    ((DummyHBaseReplicationEndpoint) endpoint)
090      .setRegionServers(Lists.newArrayList(serverNameA, serverNameB));
091    endpoint.chooseSinks();
092    // Sanity check
093    assertEquals(1, endpoint.getNumSinks());
094
095    SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
096    endpoint.reportBadSink(sinkPeer);
097    // Just reporting a bad sink once shouldn't have an effect
098    assertEquals(1, endpoint.getNumSinks());
099  }
100
101  /**
102   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not be
103   * replicated to anymore.
104   */
105  @Test
106  public void testReportBadSinkPastThreshold() {
107    List<ServerName> serverNames = Lists.newArrayList();
108    int totalServers = 30;
109    for (int i = 0; i < totalServers; i++) {
110      serverNames.add(mock(ServerName.class));
111    }
112    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
113    endpoint.chooseSinks();
114    // Sanity check
115    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
116    assertEquals(expected, endpoint.getNumSinks());
117
118    ServerName badSinkServer0 = endpoint.getSinkServers().get(0);
119    SinkPeer sinkPeer = new SinkPeer(badSinkServer0, mock(AsyncRegionServerAdmin.class));
120    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
121      endpoint.reportBadSink(sinkPeer);
122    }
123    // Reporting a bad sink more than the threshold count should remove it
124    // from the list of potential sinks
125    assertEquals(expected - 1, endpoint.getNumSinks());
126
127    // now try a sink that has some successes
128    ServerName badSinkServer1 = endpoint.getSinkServers().get(0);
129    sinkPeer = new SinkPeer(badSinkServer1, mock(AsyncRegionServerAdmin.class));
130    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
131      endpoint.reportBadSink(sinkPeer);
132    }
133    endpoint.reportSinkSuccess(sinkPeer); // one success
134    endpoint.reportBadSink(sinkPeer);
135    // did not remove the sink, since we had one successful try
136    assertEquals(expected - 1, endpoint.getNumSinks());
137
138    for (int i = 0; i < HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD - 1; i++) {
139      endpoint.reportBadSink(sinkPeer);
140    }
141    // still not remove, since the success reset the counter
142    assertEquals(expected - 1, endpoint.getNumSinks());
143    endpoint.reportBadSink(sinkPeer);
144    // but we exhausted the tries
145    assertEquals(expected - 2, endpoint.getNumSinks());
146  }
147
148  @Test
149  public void testReportBadSinkDownToZeroSinks() {
150    List<ServerName> serverNames = Lists.newArrayList();
151    int totalServers = 4;
152    for (int i = 0; i < totalServers; i++) {
153      serverNames.add(mock(ServerName.class));
154    }
155    ((DummyHBaseReplicationEndpoint) endpoint).setRegionServers(serverNames);
156    endpoint.chooseSinks();
157    // Sanity check
158    int expected = (int) (totalServers * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
159    assertEquals(expected, endpoint.getNumSinks());
160
161    ServerName serverNameA = endpoint.getSinkServers().get(0);
162    ServerName serverNameB = endpoint.getSinkServers().get(1);
163
164    serverNames.remove(serverNameA);
165    serverNames.remove(serverNameB);
166
167    SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
168    SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
169
170    for (int i = 0; i <= HBaseReplicationEndpoint.DEFAULT_BAD_SINK_THRESHOLD; i++) {
171      endpoint.reportBadSink(sinkPeerA);
172      endpoint.reportBadSink(sinkPeerB);
173    }
174
175    // We've gone down to 0 good sinks, so the replication sinks
176    // should have been refreshed now, so out of 4 servers, 2 are not considered as they are
177    // reported as bad.
178    expected =
179      (int) ((totalServers - 2) * HBaseReplicationEndpoint.DEFAULT_REPLICATION_SOURCE_RATIO);
180    assertEquals(expected, endpoint.getNumSinks());
181  }
182
183  private static class DummyHBaseReplicationEndpoint extends HBaseReplicationEndpoint {
184
185    List<ServerName> regionServers;
186
187    public void setRegionServers(List<ServerName> regionServers) {
188      this.regionServers = regionServers;
189    }
190
191    @Override
192    protected Collection<ServerName> fetchPeerAddresses() {
193      return regionServers;
194    }
195
196    @Override
197    public boolean replicate(ReplicateContext replicateContext) {
198      return false;
199    }
200
201    @Override
202    public AsyncClusterConnection createConnection(URI clusterURI, Configuration conf)
203      throws IOException {
204      return null;
205    }
206  }
207}