001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.UUID; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicBoolean; 025import org.apache.hadoop.hbase.HBaseTestingUtil; 026import org.apache.hadoop.hbase.HConstants; 027import org.apache.hadoop.hbase.TableName; 028import org.apache.hadoop.hbase.client.Admin; 029import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 030import org.apache.hadoop.hbase.client.Put; 031import org.apache.hadoop.hbase.client.Table; 032import org.apache.hadoop.hbase.client.TableDescriptor; 033import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 034import org.apache.hadoop.hbase.testclassification.MediumTests; 035import org.apache.hadoop.hbase.testclassification.ReplicationTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.util.Threads; 038import org.junit.jupiter.api.AfterAll; 039import org.junit.jupiter.api.BeforeAll; 040import org.junit.jupiter.api.BeforeEach; 041import org.junit.jupiter.api.Tag; 042import org.junit.jupiter.api.Test; 043 044import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; 045 046@Tag(ReplicationTests.TAG) 047@Tag(MediumTests.TAG) 048public class TestNonHBaseReplicationEndpoint { 049 050 private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); 051 052 private static Admin ADMIN; 053 054 private static final TableName tableName = TableName.valueOf("test"); 055 private static final byte[] famName = Bytes.toBytes("f"); 056 057 private static final AtomicBoolean REPLICATED = new AtomicBoolean(); 058 059 @BeforeAll 060 public static void setupBeforeClass() throws Exception { 061 UTIL.startMiniCluster(); 062 ADMIN = UTIL.getAdmin(); 063 } 064 065 @AfterAll 066 public static void teardownAfterClass() throws Exception { 067 UTIL.shutdownMiniCluster(); 068 } 069 070 @BeforeEach 071 public void setup() { 072 REPLICATED.set(false); 073 } 074 075 @Test 076 public void test() throws IOException { 077 TableDescriptor td = 078 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 079 .newBuilder(famName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); 080 Table table = UTIL.createTable(td, HBaseTestingUtil.KEYS_FOR_HBA_CREATE_TABLE); 081 082 ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() 083 .setReplicationEndpointImpl(NonHBaseReplicationEndpoint.class.getName()) 084 .setReplicateAllUserTables(false) 085 .setTableCFsMap(ImmutableMap.of(tableName, new ArrayList<>())).build(); 086 087 ADMIN.addReplicationPeer("1", peerConfig); 088 loadData(table); 089 090 UTIL.waitFor(10000L, () -> REPLICATED.get()); 091 } 092 093 protected static void loadData(Table table) throws IOException { 094 for (int i = 0; i < 100; i++) { 095 Put put = new Put(Bytes.toBytes(Integer.toString(i))); 096 put.addColumn(famName, famName, Bytes.toBytes(i)); 097 table.put(put); 098 } 099 } 100 101 public static class NonHBaseReplicationEndpoint implements ReplicationEndpoint { 102 103 private boolean running = false; 104 105 @Override 106 public void init(Context context) throws IOException { 107 } 108 109 @Override 110 public boolean canReplicateToSameCluster() { 111 return false; 112 } 113 114 @Override 115 public UUID getPeerUUID() { 116 return UUID.randomUUID(); 117 } 118 119 @Override 120 public WALEntryFilter getWALEntryfilter() { 121 return null; 122 } 123 124 @Override 125 public boolean replicate(ReplicateContext replicateContext) { 126 REPLICATED.set(true); 127 return true; 128 } 129 130 @Override 131 public boolean isRunning() { 132 return running; 133 } 134 135 @Override 136 public boolean isStarting() { 137 return false; 138 } 139 140 @Override 141 public void start() { 142 running = true; 143 } 144 145 @Override 146 public void awaitRunning() { 147 long interval = 100L; 148 while (!running) { 149 Threads.sleep(interval); 150 } 151 } 152 153 @Override 154 public void awaitRunning(long timeout, TimeUnit unit) { 155 long start = System.currentTimeMillis(); 156 long end = start + unit.toMillis(timeout); 157 long interval = 100L; 158 while (!running && System.currentTimeMillis() < end) { 159 Threads.sleep(interval); 160 } 161 } 162 163 @Override 164 public void stop() { 165 running = false; 166 } 167 168 @Override 169 public void awaitTerminated() { 170 long interval = 100L; 171 while (running) { 172 Threads.sleep(interval); 173 } 174 } 175 176 @Override 177 public void awaitTerminated(long timeout, TimeUnit unit) { 178 long start = System.currentTimeMillis(); 179 long end = start + unit.toMillis(timeout); 180 long interval = 100L; 181 while (running && System.currentTimeMillis() < end) { 182 Threads.sleep(interval); 183 } 184 } 185 186 @Override 187 public Throwable failureCause() { 188 return null; 189 } 190 191 @Override 192 public void peerConfigUpdated(ReplicationPeerConfig rpc) { 193 } 194 } 195}