001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023
024import java.io.IOException;
025import java.util.OptionalLong;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.atomic.AtomicLong;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.fs.FileSystem;
032import org.apache.hadoop.fs.Path;
033import org.apache.hadoop.hbase.HBaseClassTestRule;
034import org.apache.hadoop.hbase.HBaseConfiguration;
035import org.apache.hadoop.hbase.HBaseTestingUtility;
036import org.apache.hadoop.hbase.HConstants;
037import org.apache.hadoop.hbase.KeyValue;
038import org.apache.hadoop.hbase.MiniHBaseCluster;
039import org.apache.hadoop.hbase.TableName;
040import org.apache.hadoop.hbase.Waiter;
041import org.apache.hadoop.hbase.Waiter.Predicate;
042import org.apache.hadoop.hbase.client.Admin;
043import org.apache.hadoop.hbase.regionserver.HRegionServer;
044import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
045import org.apache.hadoop.hbase.replication.regionserver.Replication;
046import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
047import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
048import org.apache.hadoop.hbase.testclassification.MediumTests;
049import org.apache.hadoop.hbase.testclassification.ReplicationTests;
050import org.apache.hadoop.hbase.util.Bytes;
051import org.apache.hadoop.hbase.wal.WAL;
052import org.apache.hadoop.hbase.wal.WALEdit;
053import org.apache.hadoop.hbase.wal.WALFactory;
054import org.apache.hadoop.hbase.wal.WALKeyImpl;
055import org.apache.hadoop.hbase.wal.WALProvider;
056import org.junit.AfterClass;
057import org.junit.BeforeClass;
058import org.junit.ClassRule;
059import org.junit.Test;
060import org.junit.experimental.categories.Category;
061import org.mockito.Mockito;
062import org.slf4j.Logger;
063import org.slf4j.LoggerFactory;
064
065@Category({ReplicationTests.class, MediumTests.class})
066public class TestReplicationSource {
067
068  @ClassRule
069  public static final HBaseClassTestRule CLASS_RULE =
070      HBaseClassTestRule.forClass(TestReplicationSource.class);
071
072  private static final Logger LOG =
073      LoggerFactory.getLogger(TestReplicationSource.class);
074  private final static HBaseTestingUtility TEST_UTIL =
075      new HBaseTestingUtility();
076  private final static HBaseTestingUtility TEST_UTIL_PEER =
077      new HBaseTestingUtility();
078  private static FileSystem FS;
079  private static Path oldLogDir;
080  private static Path logDir;
081  private static Configuration conf = TEST_UTIL.getConfiguration();
082
083  /**
084   * @throws java.lang.Exception
085   */
086  @BeforeClass
087  public static void setUpBeforeClass() throws Exception {
088    TEST_UTIL.startMiniDFSCluster(1);
089    FS = TEST_UTIL.getDFSCluster().getFileSystem();
090    Path rootDir = TEST_UTIL.createRootDir();
091    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
092    if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
093    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
094    if (FS.exists(logDir)) FS.delete(logDir, true);
095  }
096
097  @AfterClass
098  public static void tearDownAfterClass() throws Exception {
099    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
100    TEST_UTIL.shutdownMiniHBaseCluster();
101    TEST_UTIL.shutdownMiniDFSCluster();
102  }
103
104  /**
105   * Sanity check that we can move logs around while we are reading
106   * from them. Should this test fail, ReplicationSource would have a hard
107   * time reading logs that are being archived.
108   * @throws Exception
109   */
110  @Test
111  public void testLogMoving() throws Exception{
112    Path logPath = new Path(logDir, "log");
113    if (!FS.exists(logDir)) FS.mkdirs(logDir);
114    if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
115    WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
116        TEST_UTIL.getConfiguration());
117    for(int i = 0; i < 3; i++) {
118      byte[] b = Bytes.toBytes(Integer.toString(i));
119      KeyValue kv = new KeyValue(b,b,b);
120      WALEdit edit = new WALEdit();
121      edit.add(kv);
122      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
123          HConstants.DEFAULT_CLUSTER_ID);
124      writer.append(new WAL.Entry(key, edit));
125      writer.sync();
126    }
127    writer.close();
128
129    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
130    WAL.Entry entry = reader.next();
131    assertNotNull(entry);
132
133    Path oldLogPath = new Path(oldLogDir, "log");
134    FS.rename(logPath, oldLogPath);
135
136    entry = reader.next();
137    assertNotNull(entry);
138
139    entry = reader.next();
140    entry = reader.next();
141
142    assertNull(entry);
143    reader.close();
144  }
145
146  /**
147   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
148   */
149  @Test
150  public void testTerminateTimeout() throws Exception {
151    ReplicationSource source = new ReplicationSource();
152    ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
153      @Override
154      protected void doStart() {
155        notifyStarted();
156      }
157
158      @Override
159      protected void doStop() {
160        // not calling notifyStopped() here causes the caller of stop() to get a Future that never
161        // completes
162      }
163    };
164    replicationEndpoint.start();
165    ReplicationPeers mockPeers = Mockito.mock(ReplicationPeers.class);
166    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
167    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
168    Configuration testConf = HBaseConfiguration.create();
169    testConf.setInt("replication.source.maxretriesmultiplier", 1);
170    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
171    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
172    source.init(testConf, null, manager, null, mockPeers, null, "testPeer", null,
173      replicationEndpoint, p -> OptionalLong.empty(), null);
174    ExecutorService executor = Executors.newSingleThreadExecutor();
175    Future<?> future = executor.submit(new Runnable() {
176
177      @Override
178      public void run() {
179        source.terminate("testing source termination");
180      }
181    });
182    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
183    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
184
185      @Override
186      public boolean evaluate() throws Exception {
187        return future.isDone();
188      }
189
190    });
191
192  }
193
194  /**
195   * Tests that recovered queues are preserved on a regionserver shutdown.
196   * See HBASE-18192
197   * @throws Exception
198   */
199  @Test
200  public void testServerShutdownRecoveredQueue() throws Exception {
201    try {
202      // Ensure single-threaded WAL
203      conf.set("hbase.wal.provider", "defaultProvider");
204      conf.setInt("replication.sleep.before.failover", 2000);
205      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
206      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
207      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
208      TEST_UTIL_PEER.startMiniCluster(1);
209
210      HRegionServer serverA = cluster.getRegionServer(0);
211      final ReplicationSourceManager managerA =
212          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
213      HRegionServer serverB = cluster.getRegionServer(1);
214      final ReplicationSourceManager managerB =
215          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
216      final Admin admin = TEST_UTIL.getAdmin();
217
218      final String peerId = "TestPeer";
219      admin.addReplicationPeer(peerId,
220          new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
221      // Wait for replication sources to come up
222      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
223        @Override public boolean evaluate() throws Exception {
224          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
225        }
226      });
227      // Disabling peer makes sure there is at least one log to claim when the server dies
228      // The recovered queue will also stay there until the peer is disabled even if the
229      // WALs it contains have no data.
230      admin.disableReplicationPeer(peerId);
231
232      // Stopping serverA
233      // It's queues should be claimed by the only other alive server i.e. serverB
234      cluster.stopRegionServer(serverA.getServerName());
235      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
236        @Override public boolean evaluate() throws Exception {
237          return managerB.getOldSources().size() == 1;
238        }
239      });
240
241      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
242      serverC.waitForServerOnline();
243      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
244        @Override public boolean evaluate() throws Exception {
245          return serverC.getReplicationSourceService() != null;
246        }
247      });
248      final ReplicationSourceManager managerC =
249          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
250      // Sanity check
251      assertEquals(0, managerC.getOldSources().size());
252
253      // Stopping serverB
254      // Now serverC should have two recovered queues:
255      // 1. The serverB's normal queue
256      // 2. serverA's recovered queue on serverB
257      cluster.stopRegionServer(serverB.getServerName());
258      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
259        @Override public boolean evaluate() throws Exception {
260          return managerC.getOldSources().size() == 2;
261        }
262      });
263      admin.enableReplicationPeer(peerId);
264      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
265        @Override public boolean evaluate() throws Exception {
266          return managerC.getOldSources().size() == 0;
267        }
268      });
269    } finally {
270      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
271    }
272  }
273
274  /**
275   * Regionserver implementation that adds a delay on the graceful shutdown.
276   */
277  public static class ShutdownDelayRegionServer extends HRegionServer {
278    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
279      super(conf);
280    }
281
282    @Override
283    protected void stopServiceThreads() {
284      // Add a delay before service threads are shutdown.
285      // This will keep the zookeeper connection alive for the duration of the delay.
286      LOG.info("Adding a delay to the regionserver shutdown");
287      try {
288        Thread.sleep(2000);
289      } catch (InterruptedException ex) {
290        LOG.error("Interrupted while sleeping");
291      }
292      super.stopServiceThreads();
293    }
294  }
295
296}
297