001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import java.io.IOException;
021import java.util.ArrayList;
022import java.util.Collections;
023import java.util.List;
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.Callable;
026import java.util.concurrent.LinkedBlockingQueue;
027import org.apache.hadoop.conf.Configuration;
028import org.apache.hadoop.hbase.Cell;
029import org.apache.hadoop.hbase.HBaseClassTestRule;
030import org.apache.hadoop.hbase.HBaseTestingUtility;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ServerName;
033import org.apache.hadoop.hbase.TableName;
034import org.apache.hadoop.hbase.Waiter;
035import org.apache.hadoop.hbase.client.Admin;
036import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
037import org.apache.hadoop.hbase.client.Connection;
038import org.apache.hadoop.hbase.client.Put;
039import org.apache.hadoop.hbase.client.Table;
040import org.apache.hadoop.hbase.client.TableDescriptor;
041import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
042import org.apache.hadoop.hbase.ipc.RpcServer;
043import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
044import org.apache.hadoop.hbase.testclassification.MediumTests;
045import org.apache.hadoop.hbase.testclassification.ReplicationTests;
046import org.apache.hadoop.hbase.util.Bytes;
047import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
048import org.apache.hadoop.hbase.wal.WAL.Entry;
049import org.junit.AfterClass;
050import org.junit.Assert;
051import org.junit.BeforeClass;
052import org.junit.ClassRule;
053import org.junit.Test;
054import org.junit.experimental.categories.Category;
055
056import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
057import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
058import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
059
060@Category({ ReplicationTests.class, MediumTests.class })
061public class TestSerialReplicationEndpoint {
062
063  @ClassRule
064  public static final HBaseClassTestRule CLASS_RULE =
065    HBaseClassTestRule.forClass(TestSerialReplicationEndpoint.class);
066
067  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
068  private static Configuration CONF;
069  private static Connection CONN;
070
071  @BeforeClass
072  public static void setUp() throws Exception {
073    UTIL.startMiniCluster();
074    CONF = UTIL.getConfiguration();
075    CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
076    CONN = UTIL.getConnection();
077  }
078
079  @AfterClass
080  public static void tearDown() throws Exception {
081    Closeables.close(CONN, true);
082    UTIL.shutdownMiniCluster();
083  }
084
085  private String getZKClusterKey() {
086    return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
087      CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
088  }
089
090  private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
091    throws IOException {
092    TestEndpoint.reset();
093    int cellNum = 10000;
094
095    TableName tableName = TableName.valueOf(tableNameStr);
096    byte[] family = Bytes.toBytes("f");
097    byte[] qualifier = Bytes.toBytes("q");
098    TableDescriptor td =
099      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
100        .newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
101    UTIL.createTable(td, null);
102
103    try (Admin admin = CONN.getAdmin()) {
104      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
105        .setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
106        .setReplicateAllUserTables(false).setSerial(isSerial)
107        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
108      admin.addReplicationPeer(peerId, peerConfig);
109    }
110
111    try (Table table = CONN.getTable(tableName)) {
112      for (int i = 0; i < cellNum; i++) {
113        Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier,
114          EnvironmentEdgeManager.currentTime(), Bytes.toBytes(i));
115        table.put(put);
116      }
117    }
118    Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);
119
120    int index = 0;
121    Assert.assertEquals(TestEndpoint.getEntries().size(), cellNum);
122    if (!isSerial) {
123      Collections.sort(TestEndpoint.getEntries(), (a, b) -> {
124        long seqA = a.getKey().getSequenceId();
125        long seqB = b.getKey().getSequenceId();
126        return seqA == seqB ? 0 : (seqA < seqB ? -1 : 1);
127      });
128    }
129    for (Entry entry : TestEndpoint.getEntries()) {
130      Assert.assertEquals(entry.getKey().getTableName(), tableName);
131      Assert.assertEquals(entry.getEdit().getCells().size(), 1);
132      Cell cell = entry.getEdit().getCells().get(0);
133      Assert.assertArrayEquals(
134        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()),
135        Bytes.toBytes(index));
136      index++;
137    }
138    Assert.assertEquals(index, cellNum);
139  }
140
141  @Test
142  public void testSerialReplicate() throws Exception {
143    testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
144  }
145
146  @Test
147  public void testParallelReplicate() throws Exception {
148    testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
149  }
150
151  public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {
152
153    private final static BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();
154
155    public static void reset() {
156      entryQueue.clear();
157    }
158
159    public static List<Entry> getEntries() {
160      return new ArrayList<>(entryQueue);
161    }
162
163    @Override
164    public boolean canReplicateToSameCluster() {
165      return true;
166    }
167
168    @Override
169    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
170      return () -> {
171        entryQueue.addAll(entries);
172        return ordinal;
173      };
174    }
175
176    @Override
177    public synchronized List<ServerName> getRegionServers() {
178      // Return multiple server names for endpoint parallel replication.
179      return new ArrayList<>(
180        ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
181          ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
182          ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
183          ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
184          ServerName.valueOf("www.example4.com", 12016, 1525245876026L)));
185    }
186  }
187}