001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication.regionserver;
019
020import static org.junit.jupiter.api.Assertions.assertArrayEquals;
021import static org.junit.jupiter.api.Assertions.assertEquals;
022
023import java.io.IOException;
024import java.util.ArrayList;
025import java.util.Collections;
026import java.util.List;
027import java.util.concurrent.BlockingQueue;
028import java.util.concurrent.CompletableFuture;
029import java.util.concurrent.LinkedBlockingQueue;
030import org.apache.hadoop.conf.Configuration;
031import org.apache.hadoop.hbase.Cell;
032import org.apache.hadoop.hbase.HBaseTestingUtil;
033import org.apache.hadoop.hbase.HConstants;
034import org.apache.hadoop.hbase.TableName;
035import org.apache.hadoop.hbase.Waiter;
036import org.apache.hadoop.hbase.client.Admin;
037import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
038import org.apache.hadoop.hbase.client.Connection;
039import org.apache.hadoop.hbase.client.Put;
040import org.apache.hadoop.hbase.client.Table;
041import org.apache.hadoop.hbase.client.TableDescriptor;
042import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
043import org.apache.hadoop.hbase.ipc.RpcServer;
044import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
045import org.apache.hadoop.hbase.testclassification.MediumTests;
046import org.apache.hadoop.hbase.testclassification.ReplicationTests;
047import org.apache.hadoop.hbase.util.Bytes;
048import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
049import org.apache.hadoop.hbase.wal.WAL.Entry;
050import org.junit.jupiter.api.AfterAll;
051import org.junit.jupiter.api.BeforeAll;
052import org.junit.jupiter.api.Tag;
053import org.junit.jupiter.api.Test;
054
055import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
056import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
057import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
058
059@Tag(ReplicationTests.TAG)
060@Tag(MediumTests.TAG)
061public class TestSerialReplicationEndpoint {
062
063  private static HBaseTestingUtil UTIL = new HBaseTestingUtil();
064  private static Configuration CONF;
065  private static Connection CONN;
066
067  @BeforeAll
068  public static void setUp() throws Exception {
069    UTIL.startMiniCluster();
070    CONF = UTIL.getConfiguration();
071    CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
072    CONN = UTIL.getConnection();
073  }
074
075  @AfterAll
076  public static void tearDown() throws Exception {
077    Closeables.close(CONN, true);
078    UTIL.shutdownMiniCluster();
079  }
080
081  private String getZKClusterKey() {
082    return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
083      CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
084  }
085
086  private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
087    throws IOException {
088    TestEndpoint.reset();
089    int cellNum = 10000;
090
091    TableName tableName = TableName.valueOf(tableNameStr);
092    byte[] family = Bytes.toBytes("f");
093    byte[] qualifier = Bytes.toBytes("q");
094    TableDescriptor td =
095      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
096        .newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
097    UTIL.createTable(td, null);
098
099    try (Admin admin = CONN.getAdmin()) {
100      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
101        .setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
102        .setReplicateAllUserTables(false).setSerial(isSerial)
103        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
104      admin.addReplicationPeer(peerId, peerConfig);
105    }
106
107    try (Table table = CONN.getTable(tableName)) {
108      for (int i = 0; i < cellNum; i++) {
109        Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier,
110          EnvironmentEdgeManager.currentTime(), Bytes.toBytes(i));
111        table.put(put);
112      }
113    }
114    Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);
115
116    int index = 0;
117    assertEquals(cellNum, TestEndpoint.getEntries().size());
118    if (!isSerial) {
119      Collections.sort(TestEndpoint.getEntries(), (a, b) -> {
120        long seqA = a.getKey().getSequenceId();
121        long seqB = b.getKey().getSequenceId();
122        return seqA == seqB ? 0 : (seqA < seqB ? -1 : 1);
123      });
124    }
125    for (Entry entry : TestEndpoint.getEntries()) {
126      assertEquals(tableName, entry.getKey().getTableName());
127      assertEquals(1, entry.getEdit().getCells().size());
128      Cell cell = entry.getEdit().getCells().get(0);
129      assertArrayEquals(Bytes.toBytes(index),
130        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
131      index++;
132    }
133    assertEquals(cellNum, index);
134  }
135
136  @Test
137  public void testSerialReplicate() throws Exception {
138    testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
139  }
140
141  @Test
142  public void testParallelReplicate() throws Exception {
143    testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
144  }
145
146  public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {
147
148    private final static BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();
149
150    public static void reset() {
151      entryQueue.clear();
152    }
153
154    public static List<Entry> getEntries() {
155      return new ArrayList<>(entryQueue);
156    }
157
158    @Override
159    public boolean canReplicateToSameCluster() {
160      return true;
161    }
162
163    @Override
164    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
165      int timeout) {
166      entryQueue.addAll(entries);
167      return CompletableFuture.completedFuture(ordinal);
168    }
169
170    @Override
171    public synchronized int getNumSinks() {
172      // Return multiple server names for endpoint parallel replication.
173      return 10;
174    }
175  }
176}