001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.Assert.assertEquals;
021import static org.junit.Assert.assertNotNull;
022import static org.junit.Assert.assertNull;
023
024import java.io.IOException;
025import java.util.OptionalLong;
026import java.util.concurrent.ExecutorService;
027import java.util.concurrent.Executors;
028import java.util.concurrent.Future;
029import java.util.concurrent.PriorityBlockingQueue;
030import java.util.concurrent.atomic.AtomicLong;
031import org.apache.hadoop.conf.Configuration;
032import org.apache.hadoop.fs.FileSystem;
033import org.apache.hadoop.fs.Path;
034import org.apache.hadoop.hbase.HBaseClassTestRule;
035import org.apache.hadoop.hbase.HBaseConfiguration;
036import org.apache.hadoop.hbase.HBaseTestingUtility;
037import org.apache.hadoop.hbase.HConstants;
038import org.apache.hadoop.hbase.KeyValue;
039import org.apache.hadoop.hbase.MiniHBaseCluster;
040import org.apache.hadoop.hbase.Server;
041import org.apache.hadoop.hbase.ServerName;
042import org.apache.hadoop.hbase.TableName;
043import org.apache.hadoop.hbase.Waiter;
044import org.apache.hadoop.hbase.Waiter.Predicate;
045import org.apache.hadoop.hbase.client.Admin;
046import org.apache.hadoop.hbase.regionserver.HRegionServer;
047import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
048import org.apache.hadoop.hbase.replication.ReplicationPeer;
049import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
050import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
051import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
052import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSource;
053import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSourceShipper;
054import org.apache.hadoop.hbase.replication.regionserver.Replication;
055import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
056import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
057import org.apache.hadoop.hbase.testclassification.MediumTests;
058import org.apache.hadoop.hbase.testclassification.ReplicationTests;
059import org.apache.hadoop.hbase.util.Bytes;
060import org.apache.hadoop.hbase.wal.WAL;
061import org.apache.hadoop.hbase.wal.WALEdit;
062import org.apache.hadoop.hbase.wal.WALFactory;
063import org.apache.hadoop.hbase.wal.WALKeyImpl;
064import org.apache.hadoop.hbase.wal.WALProvider;
065import org.junit.AfterClass;
066import org.junit.Assert;
067import org.junit.BeforeClass;
068import org.junit.ClassRule;
069import org.junit.Test;
070import org.junit.experimental.categories.Category;
071import org.mockito.Mockito;
072import org.slf4j.Logger;
073import org.slf4j.LoggerFactory;
074
075@Category({ReplicationTests.class, MediumTests.class})
076public class TestReplicationSource {
077
078  @ClassRule
079  public static final HBaseClassTestRule CLASS_RULE =
080      HBaseClassTestRule.forClass(TestReplicationSource.class);
081
082  private static final Logger LOG =
083      LoggerFactory.getLogger(TestReplicationSource.class);
084  private final static HBaseTestingUtility TEST_UTIL =
085      new HBaseTestingUtility();
086  private final static HBaseTestingUtility TEST_UTIL_PEER =
087      new HBaseTestingUtility();
088  private static FileSystem FS;
089  private static Path oldLogDir;
090  private static Path logDir;
091  private static Configuration conf = TEST_UTIL.getConfiguration();
092
093  @BeforeClass
094  public static void setUpBeforeClass() throws Exception {
095    TEST_UTIL.startMiniDFSCluster(1);
096    FS = TEST_UTIL.getDFSCluster().getFileSystem();
097    Path rootDir = TEST_UTIL.createRootDir();
098    oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
099    if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
100    logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
101    if (FS.exists(logDir)) FS.delete(logDir, true);
102  }
103
104  @AfterClass
105  public static void tearDownAfterClass() throws Exception {
106    TEST_UTIL_PEER.shutdownMiniHBaseCluster();
107    TEST_UTIL.shutdownMiniHBaseCluster();
108    TEST_UTIL.shutdownMiniDFSCluster();
109  }
110
111  /**
112   * Sanity check that we can move logs around while we are reading
113   * from them. Should this test fail, ReplicationSource would have a hard
114   * time reading logs that are being archived.
115   */
116  @Test
117  public void testLogMoving() throws Exception{
118    Path logPath = new Path(logDir, "log");
119    if (!FS.exists(logDir)) FS.mkdirs(logDir);
120    if (!FS.exists(oldLogDir)) FS.mkdirs(oldLogDir);
121    WALProvider.Writer writer = WALFactory.createWALWriter(FS, logPath,
122        TEST_UTIL.getConfiguration());
123    for(int i = 0; i < 3; i++) {
124      byte[] b = Bytes.toBytes(Integer.toString(i));
125      KeyValue kv = new KeyValue(b,b,b);
126      WALEdit edit = new WALEdit();
127      edit.add(kv);
128      WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
129          HConstants.DEFAULT_CLUSTER_ID);
130      writer.append(new WAL.Entry(key, edit));
131      writer.sync(false);
132    }
133    writer.close();
134
135    WAL.Reader reader = WALFactory.createReader(FS, logPath, TEST_UTIL.getConfiguration());
136    WAL.Entry entry = reader.next();
137    assertNotNull(entry);
138
139    Path oldLogPath = new Path(oldLogDir, "log");
140    FS.rename(logPath, oldLogPath);
141
142    entry = reader.next();
143    assertNotNull(entry);
144
145    entry = reader.next();
146    entry = reader.next();
147
148    assertNull(entry);
149    reader.close();
150  }
151
152  /**
153   * Tests that {@link ReplicationSource#terminate(String)} will timeout properly
154   */
155  @Test
156  public void testTerminateTimeout() throws Exception {
157    ReplicationSource source = new ReplicationSource();
158    ReplicationEndpoint replicationEndpoint = new HBaseInterClusterReplicationEndpoint() {
159      @Override
160      protected void doStart() {
161        notifyStarted();
162      }
163
164      @Override
165      protected void doStop() {
166        // not calling notifyStopped() here causes the caller of stop() to get a Future that never
167        // completes
168      }
169    };
170    replicationEndpoint.start();
171    ReplicationPeer mockPeer = Mockito.mock(ReplicationPeer.class);
172    Mockito.when(mockPeer.getPeerBandwidth()).thenReturn(0L);
173    Configuration testConf = HBaseConfiguration.create();
174    testConf.setInt("replication.source.maxretriesmultiplier", 1);
175    ReplicationSourceManager manager = Mockito.mock(ReplicationSourceManager.class);
176    Mockito.when(manager.getTotalBufferUsed()).thenReturn(new AtomicLong());
177    source.init(testConf, null, manager, null, mockPeer, null, "testPeer", null,
178      p -> OptionalLong.empty(), null);
179    ExecutorService executor = Executors.newSingleThreadExecutor();
180    Future<?> future = executor.submit(new Runnable() {
181
182      @Override
183      public void run() {
184        source.terminate("testing source termination");
185      }
186    });
187    long sleepForRetries = testConf.getLong("replication.source.sleepforretries", 1000);
188    Waiter.waitFor(testConf, sleepForRetries * 2, new Predicate<Exception>() {
189
190      @Override
191      public boolean evaluate() throws Exception {
192        return future.isDone();
193      }
194    });
195  }
196
197  /**
198   * Tests that recovered queues are preserved on a regionserver shutdown.
199   * See HBASE-18192
200   */
201  @Test
202  public void testServerShutdownRecoveredQueue() throws Exception {
203    try {
204      // Ensure single-threaded WAL
205      conf.set("hbase.wal.provider", "defaultProvider");
206      conf.setInt("replication.sleep.before.failover", 2000);
207      // Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
208      conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
209      MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
210      TEST_UTIL_PEER.startMiniCluster(1);
211
212      HRegionServer serverA = cluster.getRegionServer(0);
213      final ReplicationSourceManager managerA =
214          ((Replication) serverA.getReplicationSourceService()).getReplicationManager();
215      HRegionServer serverB = cluster.getRegionServer(1);
216      final ReplicationSourceManager managerB =
217          ((Replication) serverB.getReplicationSourceService()).getReplicationManager();
218      final Admin admin = TEST_UTIL.getAdmin();
219
220      final String peerId = "TestPeer";
221      admin.addReplicationPeer(peerId,
222        ReplicationPeerConfig.newBuilder().setClusterKey(TEST_UTIL_PEER.getClusterKey()).build());
223      // Wait for replication sources to come up
224      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
225        @Override public boolean evaluate() throws Exception {
226          return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
227        }
228      });
229      // Disabling peer makes sure there is at least one log to claim when the server dies
230      // The recovered queue will also stay there until the peer is disabled even if the
231      // WALs it contains have no data.
232      admin.disableReplicationPeer(peerId);
233
234      // Stopping serverA
235      // It's queues should be claimed by the only other alive server i.e. serverB
236      cluster.stopRegionServer(serverA.getServerName());
237      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
238        @Override public boolean evaluate() throws Exception {
239          return managerB.getOldSources().size() == 1;
240        }
241      });
242
243      final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
244      serverC.waitForServerOnline();
245      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
246        @Override public boolean evaluate() throws Exception {
247          return serverC.getReplicationSourceService() != null;
248        }
249      });
250      final ReplicationSourceManager managerC =
251          ((Replication) serverC.getReplicationSourceService()).getReplicationManager();
252      // Sanity check
253      assertEquals(0, managerC.getOldSources().size());
254
255      // Stopping serverB
256      // Now serverC should have two recovered queues:
257      // 1. The serverB's normal queue
258      // 2. serverA's recovered queue on serverB
259      cluster.stopRegionServer(serverB.getServerName());
260      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
261        @Override public boolean evaluate() throws Exception {
262          return managerC.getOldSources().size() == 2;
263        }
264      });
265      admin.enableReplicationPeer(peerId);
266      Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
267        @Override public boolean evaluate() throws Exception {
268          return managerC.getOldSources().size() == 0;
269        }
270      });
271    } finally {
272      conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
273    }
274  }
275
276  /**
277   * Regionserver implementation that adds a delay on the graceful shutdown.
278   */
279  public static class ShutdownDelayRegionServer extends HRegionServer {
280    public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
281      super(conf);
282    }
283
284    @Override
285    protected void stopServiceThreads() {
286      // Add a delay before service threads are shutdown.
287      // This will keep the zookeeper connection alive for the duration of the delay.
288      LOG.info("Adding a delay to the regionserver shutdown");
289      try {
290        Thread.sleep(2000);
291      } catch (InterruptedException ex) {
292        LOG.error("Interrupted while sleeping");
293      }
294      super.stopServiceThreads();
295    }
296  }
297
298  // Test HBASE-20497
299  @Test
300  public void testRecoveredReplicationSourceShipperGetPosition() throws Exception {
301    String walGroupId = "fake-wal-group-id";
302    ServerName serverName = ServerName.valueOf("www.example.com", 12006, 1524679704418L);
303    ServerName deadServer = ServerName.valueOf("www.deadServer.com", 12006, 1524679704419L);
304    PriorityBlockingQueue<Path> queue = new PriorityBlockingQueue<>();
305    queue.put(new Path("/www/html/test"));
306    RecoveredReplicationSource source = Mockito.mock(RecoveredReplicationSource.class);
307    Server server = Mockito.mock(Server.class);
308    Mockito.when(server.getServerName()).thenReturn(serverName);
309    Mockito.when(source.getServer()).thenReturn(server);
310    Mockito.when(source.getServerWALsBelongTo()).thenReturn(deadServer);
311    ReplicationQueueStorage storage = Mockito.mock(ReplicationQueueStorage.class);
312    Mockito.when(storage.getWALPosition(Mockito.eq(serverName), Mockito.any(), Mockito.any()))
313        .thenReturn(1001L);
314    Mockito.when(storage.getWALPosition(Mockito.eq(deadServer), Mockito.any(), Mockito.any()))
315        .thenReturn(-1L);
316    conf.setInt("replication.source.maxretriesmultiplier", -1);
317    RecoveredReplicationSourceShipper shipper =
318        new RecoveredReplicationSourceShipper(conf, walGroupId, queue, source, storage);
319    Assert.assertEquals(1001L, shipper.getStartPosition());
320    conf.unset("replication.source.maxretriesmultiplier");
321  }
322}
323