/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;

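/**
 * Tests the Replicator of {@link HBaseInterClusterReplicationEndpoint} by capping the RPC request
 * size at 10 KB and writing 8 KB cells, so that every WAL entry has to be shipped in its own
 * batch, with and without injected RPC failures on the sink side.
 */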
@Category(MediumTests.class)
@Ignore("Flaky, needs to be rewritten, see HBASE-19125")
public class TestReplicator extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestReplicator.class);

  static final Logger LOG = LoggerFactory.getLogger(TestReplicator.class);
  static final int NUM_ROWS = 10;

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // Set the RPC request size limit to 10 KB (will be applied to both source and sink clusters)
    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
    TestReplicationBase.setUpBeforeClass();
    admin.removePeer("2"); // Remove the peer set up for us by the base class
  }
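  /**
   * Writes NUM_ROWS rows with 8 KB values so that, under the 10 KB RPC request size limit, no two
   * WAL entries fit in a single replication RPC, then asserts that exactly NUM_ROWS batches were
   * shipped and that every row reached the sink cluster.
   */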
  @Test
  public void testReplicatorBatching() throws Exception {
    // Clear the tables
    truncateTable(utility1, tableName);
    truncateTable(utility2, tableName);

    // Add a peer in place of the one removed in setUpBeforeClass, using the test endpoint
    admin.addPeer("testReplicatorBatching",
      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);

    ReplicationEndpointForTest.setBatchCount(0);
    ReplicationEndpointForTest.setEntriesCount(0);
    try {
      ReplicationEndpointForTest.pause();
      try {
        // Queue up a bunch of cells of size 8 KB. Because of the RPC size limit, they will all
        // have to be replicated separately.
        final byte[] valueBytes = new byte[8 * 1024];
        for (int i = 0; i < NUM_ROWS; i++) {
          htable1.put(new Put(Bytes.toBytes("row" + i)).addColumn(famName, null, valueBytes));
        }
      } finally {
        ReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete.
      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for the expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
        ReplicationEndpointForTest.getBatchCount());
      assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
    } finally {
      admin.removePeer("testReplicatorBatching");
    }
  }

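  /**
   * Same workload as {@link #testReplicatorBatching()}, but every other replicateWALEntry RPC
   * fails with an injected exception. Verifies that replication keeps retrying until all
   * NUM_ROWS rows reach the sink cluster.
   */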
  @Test
  public void testReplicatorWithErrors() throws Exception {
    // Clear the tables
    truncateTable(utility1, tableName);
    truncateTable(utility2, tableName);

    // Add a peer in place of the one removed in setUpBeforeClass, using the failure-injecting
    // test endpoint
    admin.addPeer("testReplicatorWithErrors",
      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
        .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
      null);

    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
    try {
      FailureInjectingReplicationEndpointForTest.pause();
      try {
        // Queue up a bunch of cells of size 8 KB. Because of the RPC size limit, they will all
        // have to be replicated separately.
        final byte[] valueBytes = new byte[8 * 1024];
        for (int i = 0; i < NUM_ROWS; i++) {
          htable1.put(new Put(Bytes.toBytes("row" + i)).addColumn(famName, null, valueBytes));
        }
      } finally {
        FailureInjectingReplicationEndpointForTest.resume();
      }

      // Wait for replication to complete. We expect NUM_ROWS batches.
      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
        @Override
        public boolean evaluate() throws Exception {
          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
        }

        @Override
        public String explainFailure() throws Exception {
          return "We waited too long for the expected replication of " + NUM_ROWS + " entries";
        }
      });

      assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
    } finally {
      admin.removePeer("testReplicatorWithErrors");
    }
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TestReplicationBase.tearDownAfterClass();
  }

  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
    HBaseAdmin admin = util.getHBaseAdmin();
    // Use the passed-in table name consistently rather than falling back to the base class field
    admin.disableTable(tablename);
    admin.truncateTable(tablename, false);
  }

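  /**
   * Replication endpoint that counts the batches and entries it ships. {@link #pause()} and
   * {@link #resume()} gate {@code replicate()} on a latch so the test can queue up all of its
   * edits before any of them are shipped.
   */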
  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {

    private static AtomicInteger batchCount = new AtomicInteger(0);
    private static AtomicInteger entriesCount = new AtomicInteger(0);
    private static final Object latch = new Object();
    private static AtomicBoolean useLatch = new AtomicBoolean(false);

    public static void resume() {
      useLatch.set(false);
      synchronized (latch) {
        latch.notifyAll();
      }
    }

    public static void pause() {
      useLatch.set(true);
    }

    public static void await() throws InterruptedException {
      if (useLatch.get()) {
        LOG.info("Waiting on latch");
        synchronized (latch) {
          // Re-check under the lock and loop to guard against missed notifications and
          // spurious wakeups
          while (useLatch.get()) {
            latch.wait();
          }
        }
        LOG.info("Waited on latch, now proceeding");
      }
    }

    public static int getBatchCount() {
      return batchCount.get();
    }

    public static void setBatchCount(int i) {
      LOG.info("SetBatchCount=" + i + ", old=" + getBatchCount());
      batchCount.set(i);
    }

    public static int getEntriesCount() {
      return entriesCount.get();
    }

    public static void setEntriesCount(int i) {
      LOG.info("SetEntriesCount=" + i);
      entriesCount.set(i);
    }

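    /**
     * Replicator that logs every batch it ships and updates the static batch and entry counters
     * that the tests assert on.
     */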
    public class ReplicatorForTest extends Replicator {

      public ReplicatorForTest(List<Entry> entries, int ordinal) {
        super(entries, ordinal);
      }

      @Override
      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
          throws IOException {
        try {
          long size = 0;
          for (Entry e : entries) {
            size += e.getKey().estimatedSerializedSizeOf();
            size += e.getEdit().estimatedSerializedSizeOf();
          }
          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
              entries.size() + " entries with total size " + size + " bytes to " +
              replicationClusterId);
          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
            hfileArchiveDir);
          entriesCount.addAndGet(entries.size());
          int count = batchCount.incrementAndGet();
          LOG.info("Completed replicating batch " + System.identityHashCode(entries) +
              " count=" + count);
        } catch (IOException e) {
          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
          throw e;
        }
      }
    }

    @Override
    public boolean replicate(ReplicateContext replicateContext) {
      try {
        await();
      } catch (InterruptedException e) {
        LOG.warn("Interrupted waiting for latch", e);
      }
      return super.replicate(replicateContext);
    }

    @Override
    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
      return new ReplicatorForTest(entries, ordinal);
    }
  }

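  /**
   * Variant of {@link ReplicationEndpointForTest} that wraps the sink's admin stub so that every
   * other replicateWALEntry call fails with an injected exception, forcing the source to retry.
   */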
  public static class FailureInjectingReplicationEndpointForTest
      extends ReplicationEndpointForTest {

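    /**
     * AdminService stub that forwards every call to the wrapped delegate, except that
     * replicateWALEntry alternates between succeeding and throwing a ServiceException.
     */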
    static class FailureInjectingBlockingInterface implements BlockingInterface {

      private final BlockingInterface delegate;
      private volatile boolean failNext;

      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
        this.delegate = delegate;
      }

      @Override
      public GetRegionInfoResponse getRegionInfo(RpcController controller,
          GetRegionInfoRequest request) throws ServiceException {
        return delegate.getRegionInfo(controller, request);
      }

      @Override
      public GetStoreFileResponse getStoreFile(RpcController controller,
          GetStoreFileRequest request) throws ServiceException {
        return delegate.getStoreFile(controller, request);
      }

      @Override
      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
          GetOnlineRegionRequest request) throws ServiceException {
        return delegate.getOnlineRegion(controller, request);
      }

      @Override
      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
          throws ServiceException {
        return delegate.openRegion(controller, request);
      }

      @Override
      public WarmupRegionResponse warmupRegion(RpcController controller,
          WarmupRegionRequest request) throws ServiceException {
        return delegate.warmupRegion(controller, request);
      }

      @Override
      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
          throws ServiceException {
        return delegate.closeRegion(controller, request);
      }

      @Override
      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
          throws ServiceException {
        return delegate.flushRegion(controller, request);
      }

      @Override
      public CompactRegionResponse compactRegion(RpcController controller,
          CompactRegionRequest request) throws ServiceException {
        return delegate.compactRegion(controller, request);
      }

      @Override
      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
          ReplicateWALEntryRequest request) throws ServiceException {
        if (!failNext) {
          failNext = true;
          return delegate.replicateWALEntry(controller, request);
        } else {
          failNext = false;
          throw new ServiceException("Injected failure");
        }
      }

      @Override
      public ReplicateWALEntryResponse replay(RpcController controller,
          ReplicateWALEntryRequest request) throws ServiceException {
        return delegate.replay(controller, request);
      }

      @Override
      public RollWALWriterResponse rollWALWriter(RpcController controller,
          RollWALWriterRequest request) throws ServiceException {
        return delegate.rollWALWriter(controller, request);
      }

      @Override
      public GetServerInfoResponse getServerInfo(RpcController controller,
          GetServerInfoRequest request) throws ServiceException {
        return delegate.getServerInfo(controller, request);
      }

      @Override
      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
          throws ServiceException {
        return delegate.stopServer(controller, request);
      }

      @Override
      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
          UpdateFavoredNodesRequest request) throws ServiceException {
        return delegate.updateFavoredNodes(controller, request);
      }

      @Override
      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
          UpdateConfigurationRequest request) throws ServiceException {
        return delegate.updateConfiguration(controller, request);
      }

      @Override
      public GetRegionLoadResponse getRegionLoad(RpcController controller,
          GetRegionLoadRequest request) throws ServiceException {
        return delegate.getRegionLoad(controller, request);
      }

      @Override
      public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
          ClearCompactionQueuesRequest request) throws ServiceException {
        return delegate.clearCompactionQueues(controller, request);
      }

      @Override
      public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
          GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
        return delegate.getSpaceQuotaSnapshots(controller, request);
      }

      @Override
      public ExecuteProceduresResponse executeProcedures(RpcController controller,
          ExecuteProceduresRequest request) throws ServiceException {
        return delegate.executeProcedures(controller, request);
      }
      @Override
      public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
          ClearRegionBlockCacheRequest request) throws ServiceException {
        return delegate.clearRegionBlockCache(controller, request);
      }
    }

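    /**
     * Replicator that wraps the sink stub in a {@link FailureInjectingBlockingInterface} before
     * shipping entries.
     */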
    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {

      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
        super(entries, ordinal);
      }

      @Override
      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
          throws IOException {
        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      }
    }

    @Override
    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
      return new FailureInjectingReplicatorForTest(entries, ordinal);
    }
  }

}