/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;

/**
 * Writes a run of single-cell puts to a replicated table and checks what a capturing replication
 * endpoint ({@link TestEndpoint}) received, once with a serial peer and once with a non-serial
 * peer.
 */
@Tag(ReplicationTests.TAG)
@Tag(MediumTests.TAG)
public class TestSerialReplicationEndpoint {

  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
  private static Configuration CONF;
  private static Connection CONN;

  /** Starts the mini cluster and caches its configuration and shared connection. */
  @BeforeAll
  public static void setUp() throws Exception {
    UTIL.startMiniCluster();
    CONF = UTIL.getConfiguration();
    // NOTE(review): this is applied after the mini cluster has started; confirm that whatever is
    // meant to observe the limit actually reads it from this Configuration instance.
    CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
    CONN = UTIL.getConnection();
  }

  /** Closes the shared connection (swallowing any IOException) and stops the mini cluster. */
  @AfterAll
  public static void tearDown() throws Exception {
    Closeables.close(CONN, true);
    UTIL.shutdownMiniCluster();
  }

  /**
   * Builds the cluster key of the local mini cluster so that it can be used as its own peer.
   * @return a key of the form {@code 127.0.0.1:<zk client port>:<znode parent>}
   */
  private String getZKClusterKey() {
    return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
      CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
  }

  /**
   * Creates a globally-scoped table, registers a peer backed by {@link TestEndpoint}, writes
   * {@code cellNum} rows whose row key is the loop index, and verifies that the endpoint captured
   * exactly one single-cell entry per row, in row order.
   * <p>
   * For a serial peer the captured order is checked as-is. For a non-serial peer the captured
   * entries are first sorted by WAL sequence id, so only the content is verified, not the order
   * of arrival.
   * @param tableNameStr name of the table to create and replicate
   * @param peerId       id of the replication peer to add
   * @param isSerial     whether the peer is configured as serial
   * @throws IOException if table creation, peer registration, writing or waiting fails
   */
  private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
    throws IOException {
    TestEndpoint.reset();
    int cellNum = 10000;

    TableName tableName = TableName.valueOf(tableNameStr);
    byte[] family = Bytes.toBytes("f");
    byte[] qualifier = Bytes.toBytes("q");
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
    UTIL.createTable(td, null);

    try (Admin admin = CONN.getAdmin()) {
      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
        .setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
        .setReplicateAllUserTables(false).setSerial(isSerial)
        .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
      admin.addReplicationPeer(peerId, peerConfig);
    }

    try (Table table = CONN.getTable(tableName)) {
      for (int i = 0; i < cellNum; i++) {
        Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier,
          EnvironmentEdgeManager.currentTime(), Bytes.toBytes(i));
        table.put(put);
      }
    }
    Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);

    // getEntries() hands back a fresh copy on every call, so take ONE snapshot and use it for
    // the size check, the sort and the verification loop. Sorting a separate copy would be
    // discarded and the loop below would then walk an unsorted list.
    List<Entry> entries = TestEndpoint.getEntries();
    assertEquals(cellNum, entries.size());
    if (!isSerial) {
      Collections.sort(entries,
        (a, b) -> Long.compare(a.getKey().getSequenceId(), b.getKey().getSequenceId()));
    }

    int index = 0;
    for (Entry entry : entries) {
      assertEquals(tableName, entry.getKey().getTableName());
      assertEquals(1, entry.getEdit().getCells().size());
      Cell cell = entry.getEdit().getCells().get(0);
      assertArrayEquals(Bytes.toBytes(index),
        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
      index++;
    }
    assertEquals(cellNum, index);
  }

  @Test
  public void testSerialReplicate() throws Exception {
    testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
  }

  @Test
  public void testParallelReplicate() throws Exception {
    testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
  }

  /**
   * Replication endpoint that ships nothing: every batch handed to
   * {@link #asyncReplicate(List, int, int)} is appended to a static queue that the test inspects.
   */
  public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {

    /** All entries captured since the last {@link #reset()}, in the order they were added. */
    private static final BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();

    /** Discards every captured entry. */
    public static void reset() {
      entryQueue.clear();
    }

    /**
     * Returns the captured entries.
     * @return a new list copied from the queue; changes to it do not affect the queue, and
     *         successive calls return distinct lists
     */
    public static List<Entry> getEntries() {
      return new ArrayList<>(entryQueue);
    }

    @Override
    public boolean canReplicateToSameCluster() {
      return true;
    }

    /**
     * Records the batch instead of sending it anywhere.
     * @return an already-completed future carrying {@code ordinal}
     */
    @Override
    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
      int timeout) {
      entryQueue.addAll(entries);
      return CompletableFuture.completedFuture(ordinal);
    }

    @Override
    public synchronized int getNumSinks() {
      // Return multiple server names for endpoint parallel replication.
      return 10;
    }
  }
}