001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021
022import org.apache.commons.io.IOUtils;
023import org.apache.hadoop.conf.Configuration;
024import org.apache.hadoop.hbase.HBaseTestingUtility;
025import org.apache.hadoop.hbase.TableName;
026import org.apache.hadoop.hbase.client.Admin;
027import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.client.Table;
031import org.apache.hadoop.hbase.client.TableDescriptor;
032import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
033import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
034import org.apache.hadoop.hbase.util.Bytes;
035import org.apache.hadoop.util.ToolRunner;
036import org.junit.After;
037import org.junit.Before;
038
039import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
040
041public abstract class TestReplicationSyncUpToolBase {
042
043  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
044  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
045
046  protected static final TableName TN1 = TableName.valueOf("t1_syncup");
047  protected static final TableName TN2 = TableName.valueOf("t2_syncup");
048
049  protected static final byte[] FAMILY = Bytes.toBytes("cf1");
050  protected static final byte[] QUALIFIER = Bytes.toBytes("q1");
051
052  protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep");
053
054  protected TableDescriptor t1SyncupSource;
055  protected TableDescriptor t1SyncupTarget;
056  protected TableDescriptor t2SyncupSource;
057  protected TableDescriptor t2SyncupTarget;
058
059  protected Connection conn1;
060  protected Connection conn2;
061
062  protected Table ht1Source;
063  protected Table ht2Source;
064  protected Table ht1TargetAtPeer1;
065  protected Table ht2TargetAtPeer1;
066
067  protected void customizeClusterConf(Configuration conf) {
068  }
069
070  @Before
071  public void setUp() throws Exception {
072    customizeClusterConf(UTIL1.getConfiguration());
073    customizeClusterConf(UTIL2.getConfiguration());
074    TestReplicationBase.configureClusters(UTIL1, UTIL2);
075    UTIL1.startMiniZKCluster();
076    UTIL2.setZkCluster(UTIL1.getZkCluster());
077
078    UTIL1.startMiniCluster(2);
079    UTIL2.startMiniCluster(4);
080
081    t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1)
082      .setColumnFamily(
083        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
084      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
085
086    t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1)
087      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
088      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
089
090    t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2)
091      .setColumnFamily(
092        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
093      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
094
095    t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2)
096      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
097      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
098  }
099
100  @After
101  public void tearDown() throws Exception {
102    Closeables.close(ht1Source, true);
103    Closeables.close(ht2Source, true);
104    Closeables.close(ht1TargetAtPeer1, true);
105    Closeables.close(ht2TargetAtPeer1, true);
106    Closeables.close(conn1, true);
107    Closeables.close(conn2, true);
108    UTIL2.shutdownMiniCluster();
109    UTIL1.shutdownMiniCluster();
110  }
111
112  final void setupReplication() throws Exception {
113    Admin admin1 = UTIL1.getAdmin();
114    admin1.createTable(t1SyncupSource);
115    admin1.createTable(t2SyncupSource);
116
117    Admin admin2 = UTIL2.getAdmin();
118    admin2.createTable(t1SyncupTarget);
119    admin2.createTable(t2SyncupTarget);
120
121    // Get HTable from Master
122    Connection conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
123    ht1Source = conn1.getTable(TN1);
124    ht2Source = conn1.getTable(TN2);
125
126    // Get HTable from Peer1
127    Connection conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
128    ht1TargetAtPeer1 = conn2.getTable(TN1);
129    ht2TargetAtPeer1 = conn2.getTable(TN2);
130
131    /**
132     * set M-S : Master: utility1 Slave1: utility2
133     */
134    ReplicationPeerConfig rpc =
135      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
136    admin1.addReplicationPeer("1", rpc);
137  }
138
139  final void syncUp(HBaseTestingUtility util) throws Exception {
140    ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
141  }
142
143  // Utilities that manager shutdown / restart of source / sink clusters. They take care of
144  // invalidating stale connections after shutdown / restarts.
145  final void shutDownSourceHBaseCluster() throws Exception {
146    IOUtils.closeQuietly(ht1Source, ht2Source);
147    UTIL1.shutdownMiniHBaseCluster();
148  }
149
150  final void shutDownTargetHBaseCluster() throws Exception {
151    IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1);
152    UTIL2.shutdownMiniHBaseCluster();
153  }
154
155  final void restartSourceHBaseCluster(int numServers) throws Exception {
156    IOUtils.closeQuietly(ht1Source, ht2Source);
157    UTIL1.restartHBaseCluster(numServers);
158    ht1Source = UTIL1.getConnection().getTable(TN1);
159    ht2Source = UTIL1.getConnection().getTable(TN2);
160  }
161
162  final void restartTargetHBaseCluster(int numServers) throws Exception {
163    IOUtils.closeQuietly(ht1TargetAtPeer1, ht2TargetAtPeer1);
164    UTIL2.restartHBaseCluster(numServers);
165    ht1TargetAtPeer1 = UTIL2.getConnection().getTable(TN1);
166    ht2TargetAtPeer1 = UTIL2.getConnection().getTable(TN2);
167  }
168}