001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.replication.regionserver;
020
021import java.io.IOException;
022import java.util.ArrayList;
023import java.util.Collections;
024import java.util.List;
025import java.util.concurrent.BlockingQueue;
026import java.util.concurrent.Callable;
027import java.util.concurrent.LinkedBlockingQueue;
028
029import org.apache.commons.io.IOUtils;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseClassTestRule;
033import org.apache.hadoop.hbase.HBaseTestingUtility;
034import org.apache.hadoop.hbase.HConstants;
035import org.apache.hadoop.hbase.ServerName;
036import org.apache.hadoop.hbase.TableName;
037import org.apache.hadoop.hbase.Waiter;
038import org.apache.hadoop.hbase.client.Admin;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.Put;
042import org.apache.hadoop.hbase.client.Table;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
045import org.apache.hadoop.hbase.ipc.RpcServer;
046import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
047import org.apache.hadoop.hbase.testclassification.MediumTests;
048import org.apache.hadoop.hbase.testclassification.ReplicationTests;
049import org.apache.hadoop.hbase.util.Bytes;
050import org.apache.hadoop.hbase.wal.WAL.Entry;
051import org.junit.AfterClass;
052import org.junit.Assert;
053import org.junit.BeforeClass;
054import org.junit.ClassRule;
055import org.junit.Test;
056import org.junit.experimental.categories.Category;
057
058import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
059import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
060
061@Category({ ReplicationTests.class, MediumTests.class })
062public class TestSerialReplicationEndpoint {
063
064  @ClassRule
065  public static final HBaseClassTestRule CLASS_RULE =
066      HBaseClassTestRule.forClass(TestSerialReplicationEndpoint.class);
067
068  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
069  private static Configuration CONF;
070  private static Connection CONN;
071
072  @BeforeClass
073  public static void setUp() throws Exception {
074    UTIL.startMiniCluster();
075    CONF = UTIL.getConfiguration();
076    CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
077    CONN = UTIL.getConnection();
078  }
079
080  @AfterClass
081  public static void tearDown() throws Exception {
082    IOUtils.closeQuietly(CONN);
083    UTIL.shutdownMiniCluster();
084  }
085
086  private String getZKClusterKey() {
087    return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
088      CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
089  }
090
091  private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
092      throws IOException {
093    TestEndpoint.reset();
094    int cellNum = 10000;
095
096    TableName tableName = TableName.valueOf(tableNameStr);
097    byte[] family = Bytes.toBytes("f");
098    byte[] qualifier = Bytes.toBytes("q");
099    TableDescriptor td =
100        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
101            .newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
102    UTIL.createTable(td, null);
103
104    try (Admin admin = CONN.getAdmin()) {
105      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
106          .setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
107          .setReplicateAllUserTables(false).setSerial(isSerial)
108          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
109      admin.addReplicationPeer(peerId, peerConfig);
110    }
111
112    try (Table table = CONN.getTable(tableName)) {
113      for (int i = 0; i < cellNum; i++) {
114        Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier, System.currentTimeMillis(),
115          Bytes.toBytes(i));
116        table.put(put);
117      }
118    }
119    Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);
120
121    int index = 0;
122    Assert.assertEquals(TestEndpoint.getEntries().size(), cellNum);
123    if (!isSerial) {
124      Collections.sort(TestEndpoint.getEntries(), (a, b) -> {
125        long seqA = a.getKey().getSequenceId();
126        long seqB = b.getKey().getSequenceId();
127        return seqA == seqB ? 0 : (seqA < seqB ? -1 : 1);
128      });
129    }
130    for (Entry entry : TestEndpoint.getEntries()) {
131      Assert.assertEquals(entry.getKey().getTableName(), tableName);
132      Assert.assertEquals(entry.getEdit().getCells().size(), 1);
133      Cell cell = entry.getEdit().getCells().get(0);
134      Assert.assertArrayEquals(
135        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()),
136        Bytes.toBytes(index));
137      index++;
138    }
139    Assert.assertEquals(index, cellNum);
140  }
141
142  @Test
143  public void testSerialReplicate() throws Exception {
144    testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
145  }
146
147  @Test
148  public void testParallelReplicate() throws Exception {
149    testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
150  }
151
152  public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {
153
154    private final static BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();
155
156    public static void reset() {
157      entryQueue.clear();
158    }
159
160    public static List<Entry> getEntries() {
161      return new ArrayList<>(entryQueue);
162    }
163
164    @Override
165    public boolean canReplicateToSameCluster() {
166      return true;
167    }
168
169    @Override
170    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
171      return () -> {
172        entryQueue.addAll(entries);
173        return ordinal;
174      };
175    }
176
177    @Override
178    public synchronized List<ServerName> getRegionServers() {
179      // Return multiple server names for endpoint parallel replication.
180      return new ArrayList<>(
181          ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
182            ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
183            ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
184            ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
185            ServerName.valueOf("www.example4.com", 12016, 1525245876026L)));
186    }
187  }
188}