001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import java.io.IOException; 021import java.util.ArrayList; 022import java.util.HashMap; 023import java.util.List; 024import java.util.UUID; 025import java.util.concurrent.TimeUnit; 026import java.util.concurrent.atomic.AtomicBoolean; 027import org.apache.hadoop.hbase.HBaseClassTestRule; 028import org.apache.hadoop.hbase.HBaseTestingUtility; 029import org.apache.hadoop.hbase.HConstants; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.Admin; 032import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 033import org.apache.hadoop.hbase.client.Put; 034import org.apache.hadoop.hbase.client.Table; 035import org.apache.hadoop.hbase.client.TableDescriptor; 036import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 037import org.apache.hadoop.hbase.testclassification.MediumTests; 038import org.apache.hadoop.hbase.testclassification.ReplicationTests; 039import org.apache.hadoop.hbase.util.Bytes; 040import org.apache.hadoop.hbase.util.Threads; 041import org.junit.AfterClass; 042import org.junit.Before; 043import org.junit.BeforeClass; 044import org.junit.ClassRule; 045import org.junit.Test; 046import org.junit.experimental.categories.Category; 047 048@Category({ MediumTests.class, ReplicationTests.class }) 049public class TestNonHBaseReplicationEndpoint { 050 051 @ClassRule 052 public static final HBaseClassTestRule CLASS_RULE = 053 HBaseClassTestRule.forClass(TestNonHBaseReplicationEndpoint.class); 054 055 protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); 056 057 private static Admin ADMIN; 058 059 private static final TableName tableName = TableName.valueOf("test"); 060 private static final byte[] famName = Bytes.toBytes("f"); 061 062 private static final AtomicBoolean REPLICATED = new AtomicBoolean(); 063 064 @BeforeClass 065 public static void setupBeforeClass() throws Exception { 066 UTIL.startMiniCluster(); 067 ADMIN = UTIL.getAdmin(); 068 } 069 070 @AfterClass 071 public static void teardownAfterClass() throws Exception { 072 UTIL.shutdownMiniCluster(); 073 } 074 075 @Before 076 public void setup() { 077 REPLICATED.set(false); 078 } 079 080 @Test 081 public void test() throws IOException { 082 TableDescriptor td = 083 TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder 084 .newBuilder(famName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build(); 085 Table table = UTIL.createTable(td, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); 086 087 ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() 088 .setReplicationEndpointImpl(NonHBaseReplicationEndpoint.class.getName()) 089 .setReplicateAllUserTables(false).setTableCFsMap(new HashMap<TableName, List<String>>() { 090 { 091 put(tableName, new ArrayList<>()); 092 } 093 }).build(); 094 095 ADMIN.addReplicationPeer("1", peerConfig); 096 loadData(table); 097 098 UTIL.waitFor(10000L, () -> REPLICATED.get()); 099 } 100 101 protected static void loadData(Table table) throws IOException { 102 for (int i = 0; i < 100; i++) { 103 Put put = new Put(Bytes.toBytes(Integer.toString(i))); 104 put.addColumn(famName, famName, Bytes.toBytes(i)); 105 table.put(put); 106 } 107 } 108 109 public static class NonHBaseReplicationEndpoint implements ReplicationEndpoint { 110 111 private boolean running = false; 112 113 @Override 114 public void init(Context context) throws IOException { 115 } 116 117 @Override 118 public boolean canReplicateToSameCluster() { 119 return false; 120 } 121 122 @Override 123 public UUID getPeerUUID() { 124 return UUID.randomUUID(); 125 } 126 127 @Override 128 public WALEntryFilter getWALEntryfilter() { 129 return null; 130 } 131 132 @Override 133 public boolean replicate(ReplicateContext replicateContext) { 134 REPLICATED.set(true); 135 return true; 136 } 137 138 @Override 139 public boolean isRunning() { 140 return running; 141 } 142 143 @Override 144 public boolean isStarting() { 145 return false; 146 } 147 148 @Override 149 public void start() { 150 running = true; 151 } 152 153 @Override 154 public void awaitRunning() { 155 long interval = 100L; 156 while (!running) { 157 Threads.sleep(interval); 158 } 159 } 160 161 @Override 162 public void awaitRunning(long timeout, TimeUnit unit) { 163 long start = System.currentTimeMillis(); 164 long end = start + unit.toMillis(timeout); 165 long interval = 100L; 166 while (!running && System.currentTimeMillis() < end) { 167 Threads.sleep(interval); 168 } 169 } 170 171 @Override 172 public void stop() { 173 running = false; 174 } 175 176 @Override 177 public void awaitTerminated() { 178 long interval = 100L; 179 while (running) { 180 Threads.sleep(interval); 181 } 182 } 183 184 @Override 185 public void awaitTerminated(long timeout, TimeUnit unit) { 186 long start = System.currentTimeMillis(); 187 long end = start + unit.toMillis(timeout); 188 long interval = 100L; 189 while (running && System.currentTimeMillis() < end) { 190 Threads.sleep(interval); 191 } 192 } 193 194 @Override 195 public Throwable failureCause() { 196 return null; 197 } 198 199 @Override 200 public void peerConfigUpdated(ReplicationPeerConfig rpc) { 201 } 202 } 203}