001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.replication;
019
020import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
021
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.HBaseTestingUtility;
024import org.apache.hadoop.hbase.TableName;
025import org.apache.hadoop.hbase.client.Admin;
026import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
027import org.apache.hadoop.hbase.client.Connection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.client.Table;
030import org.apache.hadoop.hbase.client.TableDescriptor;
031import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
032import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
033import org.apache.hadoop.hbase.util.Bytes;
034import org.apache.hadoop.util.ToolRunner;
035import org.junit.After;
036import org.junit.Before;
037
038import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
039
040public abstract class TestReplicationSyncUpToolBase {
041
042  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
043  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
044
045  protected static final TableName TN1 = TableName.valueOf("t1_syncup");
046  protected static final TableName TN2 = TableName.valueOf("t2_syncup");
047
048  protected static final byte[] FAMILY = Bytes.toBytes("cf1");
049  protected static final byte[] QUALIFIER = Bytes.toBytes("q1");
050
051  protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep");
052
053  protected TableDescriptor t1SyncupSource;
054  protected TableDescriptor t1SyncupTarget;
055  protected TableDescriptor t2SyncupSource;
056  protected TableDescriptor t2SyncupTarget;
057
058  protected Connection conn1;
059  protected Connection conn2;
060
061  protected Table ht1Source;
062  protected Table ht2Source;
063  protected Table ht1TargetAtPeer1;
064  protected Table ht2TargetAtPeer1;
065
066  protected void customizeClusterConf(Configuration conf) {
067  }
068
069  @Before
070  public void setUp() throws Exception {
071    customizeClusterConf(UTIL1.getConfiguration());
072    customizeClusterConf(UTIL2.getConfiguration());
073    TestReplicationBase.configureClusters(UTIL1, UTIL2);
074    UTIL1.startMiniZKCluster();
075    UTIL2.setZkCluster(UTIL1.getZkCluster());
076
077    UTIL1.startMiniCluster(2);
078    UTIL2.startMiniCluster(4);
079
080    t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1)
081      .setColumnFamily(
082        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
083      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
084
085    t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1)
086      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
087      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
088
089    t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2)
090      .setColumnFamily(
091        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
092      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
093
094    t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2)
095      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
096      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
097  }
098
099  @After
100  public void tearDown() throws Exception {
101    Closeables.close(ht1Source, true);
102    Closeables.close(ht2Source, true);
103    Closeables.close(ht1TargetAtPeer1, true);
104    Closeables.close(ht2TargetAtPeer1, true);
105    Closeables.close(conn1, true);
106    Closeables.close(conn2, true);
107    UTIL2.shutdownMiniCluster();
108    UTIL1.shutdownMiniCluster();
109  }
110
111  final void setupReplication() throws Exception {
112    Admin admin1 = UTIL1.getAdmin();
113    admin1.createTable(t1SyncupSource);
114    admin1.createTable(t2SyncupSource);
115
116    Admin admin2 = UTIL2.getAdmin();
117    admin2.createTable(t1SyncupTarget);
118    admin2.createTable(t2SyncupTarget);
119
120    // Get HTable from Master
121    Connection conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
122    ht1Source = conn1.getTable(TN1);
123    ht2Source = conn1.getTable(TN2);
124
125    // Get HTable from Peer1
126    Connection conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
127    ht1TargetAtPeer1 = conn2.getTable(TN1);
128    ht2TargetAtPeer1 = conn2.getTable(TN2);
129
130    /**
131     * set M-S : Master: utility1 Slave1: utility2
132     */
133    ReplicationPeerConfig rpc =
134      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
135    admin1.addReplicationPeer("1", rpc);
136  }
137
138  final void syncUp(HBaseTestingUtility util) throws Exception {
139    ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
140  }
141
142  // Utilities that manager shutdown / restart of source / sink clusters. They take care of
143  // invalidating stale connections after shutdown / restarts.
144  final void shutDownSourceHBaseCluster() throws Exception {
145    Closeables.close(ht1Source, true);
146    Closeables.close(ht2Source, true);
147    UTIL1.shutdownMiniHBaseCluster();
148  }
149
150  final void shutDownTargetHBaseCluster() throws Exception {
151    Closeables.close(ht1TargetAtPeer1, true);
152    Closeables.close(ht2TargetAtPeer1, true);
153    UTIL2.shutdownMiniHBaseCluster();
154  }
155
156  final void restartSourceHBaseCluster(int numServers) throws Exception {
157    Closeables.close(ht1Source, true);
158    Closeables.close(ht2Source, true);
159    UTIL1.restartHBaseCluster(numServers);
160    ht1Source = UTIL1.getConnection().getTable(TN1);
161    ht2Source = UTIL1.getConnection().getTable(TN2);
162  }
163
164  final void restartTargetHBaseCluster(int numServers) throws Exception {
165    Closeables.close(ht1TargetAtPeer1, true);
166    Closeables.close(ht2TargetAtPeer1, true);
167    UTIL2.restartHBaseCluster(numServers);
168    ht1TargetAtPeer1 = UTIL2.getConnection().getTable(TN1);
169    ht2TargetAtPeer1 = UTIL2.getConnection().getTable(TN2);
170  }
171}