001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication; 019 020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; 021 022import org.apache.commons.io.IOUtils; 023import org.apache.hadoop.conf.Configuration; 024import org.apache.hadoop.hbase.HBaseTestingUtility; 025import org.apache.hadoop.hbase.TableName; 026import org.apache.hadoop.hbase.client.Admin; 027import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 028import org.apache.hadoop.hbase.client.Connection; 029import org.apache.hadoop.hbase.client.ConnectionFactory; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.client.TableDescriptor; 032import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 033import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.util.ToolRunner; 036import org.junit.After; 037import org.junit.Before; 038 039import org.apache.hbase.thirdparty.com.google.common.io.Closeables; 040 041public abstract class TestReplicationSyncUpToolBase { 042 043 protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); 044 protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); 045 046 protected static final TableName TN1 = TableName.valueOf("t1_syncup"); 047 protected static final TableName TN2 = TableName.valueOf("t2_syncup"); 048 049 protected static final byte[] FAMILY = Bytes.toBytes("cf1"); 050 protected static final byte[] QUALIFIER = Bytes.toBytes("q1"); 051 052 protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep"); 053 054 protected TableDescriptor t1SyncupSource; 055 protected TableDescriptor t1SyncupTarget; 056 protected TableDescriptor t2SyncupSource; 057 protected TableDescriptor t2SyncupTarget; 058 059 protected Connection conn1; 060 protected Connection conn2; 061 062 protected Table ht1Source; 063 protected Table ht2Source; 064 protected Table ht1TargetAtPeer1; 065 protected Table ht2TargetAtPeer1; 066 067 protected void customizeClusterConf(Configuration conf) { 068 } 069 070 @Before 071 public void setUp() throws Exception { 072 customizeClusterConf(UTIL1.getConfiguration()); 073 customizeClusterConf(UTIL2.getConfiguration()); 074 TestReplicationBase.configureClusters(UTIL1, UTIL2); 075 UTIL1.startMiniZKCluster(); 076 UTIL2.setZkCluster(UTIL1.getZkCluster()); 077 078 UTIL1.startMiniCluster(2); 079 UTIL2.startMiniCluster(4); 080 081 t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1) 082 .setColumnFamily( 083 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build()) 084 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 085 086 t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1) 087 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 088 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 089 090 t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2) 091 .setColumnFamily( 092 ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build()) 093 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 094 095 t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2) 096 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)) 097 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build(); 098 } 099 100 @After 101 public void tearDown() throws Exception { 102 Closeables.close(ht1Source, true); 103 Closeables.close(ht2Source, true); 104 Closeables.close(ht1TargetAtPeer1, true); 105 Closeables.close(ht2TargetAtPeer1, true); 106 Closeables.close(conn1, true); 107 Closeables.close(conn2, true); 108 UTIL2.shutdownMiniCluster(); 109 UTIL1.shutdownMiniCluster(); 110 } 111 112 final void setupReplication() throws Exception { 113 Admin admin1 = UTIL1.getAdmin(); 114 admin1.createTable(t1SyncupSource); 115 admin1.createTable(t2SyncupSource); 116 117 Admin admin2 = UTIL2.getAdmin(); 118 admin2.createTable(t1SyncupTarget); 119 admin2.createTable(t2SyncupTarget); 120 121 // Get HTable from Master 122 Connection conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration()); 123 ht1Source = conn1.getTable(TN1); 124 ht2Source = conn1.getTable(TN2); 125 126 // Get HTable from Peer1 127 Connection conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration()); 128 ht1TargetAtPeer1 = conn2.getTable(TN1); 129 ht2TargetAtPeer1 = conn2.getTable(TN2); 130 131 /** 132 * set M-S : Master: utility1 Slave1: utility2 133 */ 134 ReplicationPeerConfig rpc = 135 ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build(); 136 admin1.addReplicationPeer("1", rpc); 137 } 138 139 final void syncUp(HBaseTestingUtility util) throws Exception { 140 ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]); 141 } 142 143 // Utilities that manager shutdown / restart of source / sink clusters. They take care of 144 // invalidating stale connections after shutdown / restarts. 145 final void shutDownSourceHBaseCluster() throws Exception { 146 IOUtils.closeQuietly(ht1Source, ht2Source); 147 UTIL1.shutdownMiniHBaseCluster(); 148 } 149 150 final void shutDownTargetHBaseCluster() throws Exception { 151 IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1); 152 UTIL2.shutdownMiniHBaseCluster(); 153 } 154 155 final void restartSourceHBaseCluster(int numServers) throws Exception { 156 IOUtils.closeQuietly(ht1Source, ht2Source); 157 UTIL1.restartHBaseCluster(numServers); 158 ht1Source = UTIL1.getConnection().getTable(TN1); 159 ht2Source = UTIL1.getConnection().getTable(TN2); 160 } 161 162 final void restartTargetHBaseCluster(int numServers) throws Exception { 163 IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1); 164 UTIL2.restartHBaseCluster(numServers); 165 ht1TargetAtPeer1 = UTIL2.getConnection().getTable(TN1); 166 ht2TargetAtPeer1 = UTIL2.getConnection().getTable(TN2); 167 } 168}