001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.CompletableFuture;
026import java.util.concurrent.LinkedBlockingQueue;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtil;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.TableName;
033import org.apache.hadoop.hbase.Waiter;
034import org.apache.hadoop.hbase.client.Admin;
035import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
036import org.apache.hadoop.hbase.client.Connection;
037import org.apache.hadoop.hbase.client.Put;
038import org.apache.hadoop.hbase.client.Table;
039import org.apache.hadoop.hbase.client.TableDescriptor;
040import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
041import org.apache.hadoop.hbase.ipc.RpcServer;
042import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
043import org.apache.hadoop.hbase.testclassification.MediumTests;
044import org.apache.hadoop.hbase.testclassification.ReplicationTests;
045import org.apache.hadoop.hbase.util.Bytes;
046import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
047import org.apache.hadoop.hbase.wal.WAL.Entry;
048import org.junit.AfterClass;
049import org.junit.Assert;
050import org.junit.BeforeClass;
051import org.junit.ClassRule;
052import org.junit.Test;
053import org.junit.experimental.categories.Category;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
056import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
057import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
058
059@Category({ ReplicationTests.class, MediumTests.class })
060public class TestSerialReplicationEndpoint {
061
062  @ClassRule
063  public static final HBaseClassTestRule CLASS_RULE =
064    HBaseClassTestRule.forClass(TestSerialReplicationEndpoint.class);
065
066  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
067  private static Configuration CONF;
068  private static Connection CONN;
069
070  @BeforeClass
071  public static void setUp() throws Exception {
072    UTIL.startMiniCluster();
073    CONF = UTIL.getConfiguration();
074    CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
075    CONN = UTIL.getConnection();
076  }
077
078  @AfterClass
079  public static void tearDown() throws Exception {
080    Closeables.close(CONN, true);
081    UTIL.shutdownMiniCluster();
082  }
083
084  private String getZKClusterKey() {
085    return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
086      CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
087  }
088
089  private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
090    throws IOException {
091    TestEndpoint.reset();
092    int cellNum = 10000;
093
094    TableName tableName = TableName.valueOf(tableNameStr);
095    byte[] family = Bytes.toBytes("f");
096    byte[] qualifier = Bytes.toBytes("q");
097    TableDescriptor td =
098      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
099        .newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
100    UTIL.createTable(td, null);
101
102    try (Admin admin = CONN.getAdmin()) {
103      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
104        .setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
105        .setReplicateAllUserTables(false).setSerial(isSerial)
106        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
107      admin.addReplicationPeer(peerId, peerConfig);
108    }
109
110    try (Table table = CONN.getTable(tableName)) {
111      for (int i = 0; i < cellNum; i++) {
112        Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier,
113          EnvironmentEdgeManager.currentTime(), Bytes.toBytes(i));
114        table.put(put);
115      }
116    }
117    Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);
118
119    int index = 0;
120    Assert.assertEquals(TestEndpoint.getEntries().size(), cellNum);
121    if (!isSerial) {
122      Collections.sort(TestEndpoint.getEntries(), (a, b) -> {
123        long seqA = a.getKey().getSequenceId();
124        long seqB = b.getKey().getSequenceId();
125        return seqA == seqB ? 0 : (seqA < seqB ? -1 : 1);
126      });
127    }
128    for (Entry entry : TestEndpoint.getEntries()) {
129      Assert.assertEquals(entry.getKey().getTableName(), tableName);
130      Assert.assertEquals(entry.getEdit().getCells().size(), 1);
131      Cell cell = entry.getEdit().getCells().get(0);
132      Assert.assertArrayEquals(
133        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()),
134        Bytes.toBytes(index));
135      index++;
136    }
137    Assert.assertEquals(index, cellNum);
138  }
139
140  @Test
141  public void testSerialReplicate() throws Exception {
142    testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
143  }
144
145  @Test
146  public void testParallelReplicate() throws Exception {
147    testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
148  }
149
150  public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {
151
152    private final static BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();
153
154    public static void reset() {
155      entryQueue.clear();
156    }
157
158    public static List<Entry> getEntries() {
159      return new ArrayList<>(entryQueue);
160    }
161
162    @Override
163    public boolean canReplicateToSameCluster() {
164      return true;
165    }
166
167    @Override
168    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
169      int timeout) {
170      entryQueue.addAll(entries);
171      return CompletableFuture.completedFuture(ordinal);
172    }
173
174    @Override
175    public synchronized int getNumSinks() {
176      // Return multiple server names for endpoint parallel replication.
177      return 10;
178    }
179  }
180}