001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import static org.junit.Assert.assertEquals; 021import static org.junit.Assert.assertNull; 022import static org.junit.Assert.assertTrue; 023 024import java.util.List; 025import java.util.Map; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.Abortable; 028import org.apache.hadoop.hbase.HBaseClassTestRule; 029import org.apache.hadoop.hbase.HBaseTestingUtility; 030import org.apache.hadoop.hbase.TableName; 031import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 032import org.apache.hadoop.hbase.exceptions.DeserializationException; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.hadoop.hbase.testclassification.ReplicationTests; 035import org.apache.hadoop.hbase.testclassification.SmallTests; 036import org.apache.hadoop.hbase.util.Bytes; 037import org.apache.hadoop.hbase.zookeeper.ZKUtil; 038import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 039import org.apache.zookeeper.KeeperException; 040import org.junit.AfterClass; 041import org.junit.BeforeClass; 042import org.junit.ClassRule; 043import org.junit.Rule; 044import org.junit.Test; 045import org.junit.experimental.categories.Category; 046import org.junit.rules.TestName; 047import org.slf4j.Logger; 048import org.slf4j.LoggerFactory; 049 050@Category({ReplicationTests.class, SmallTests.class}) 051public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader { 052 053 @ClassRule 054 public static final HBaseClassTestRule CLASS_RULE = 055 HBaseClassTestRule.forClass(TestTableCFsUpdater.class); 056 057 private static final Logger LOG = LoggerFactory.getLogger(TestTableCFsUpdater.class); 058 private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); 059 060 private static ZKWatcher zkw = null; 061 private static Abortable abortable = null; 062 063 @Rule 064 public TestName name = new TestName(); 065 066 public TestTableCFsUpdater() { 067 super(zkw, TEST_UTIL.getConfiguration(), abortable); 068 } 069 070 @BeforeClass 071 public static void setUpBeforeClass() throws Exception { 072 TEST_UTIL.startMiniZKCluster(); 073 Configuration conf = TEST_UTIL.getConfiguration(); 074 abortable = new Abortable() { 075 @Override 076 public void abort(String why, Throwable e) { 077 LOG.info(why, e); 078 } 079 080 @Override 081 public boolean isAborted() { 082 return false; 083 } 084 }; 085 zkw = new ZKWatcher(conf, "TableCFs", abortable, true); 086 } 087 088 @AfterClass 089 public static void tearDownAfterClass() throws Exception { 090 TEST_UTIL.shutdownMiniZKCluster(); 091 } 092 093 @Test 094 public void testUpgrade() throws KeeperException, InterruptedException, 095 DeserializationException { 096 String peerId = "1"; 097 final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); 098 final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); 099 final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3"); 100 101 ReplicationPeerConfig rpc = new ReplicationPeerConfig(); 102 rpc.setClusterKey(zkw.getQuorum()); 103 String peerNode = getPeerNode(peerId); 104 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 105 106 String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3; 107 String tableCFsNode = getTableCFsNode(peerId); 108 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 109 ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); 110 111 ReplicationPeerConfig actualRpc = 112 ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 113 String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 114 115 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 116 assertNull(actualRpc.getTableCFsMap()); 117 assertEquals(tableCFs, actualTableCfs); 118 119 peerId = "2"; 120 rpc = new ReplicationPeerConfig(); 121 rpc.setClusterKey(zkw.getQuorum()); 122 peerNode = getPeerNode(peerId); 123 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 124 125 tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2"; 126 tableCFsNode = getTableCFsNode(peerId); 127 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 128 ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); 129 130 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 131 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 132 133 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 134 assertNull(actualRpc.getTableCFsMap()); 135 assertEquals(tableCFs, actualTableCfs); 136 137 peerId = "3"; 138 rpc = new ReplicationPeerConfig(); 139 rpc.setClusterKey(zkw.getQuorum()); 140 peerNode = getPeerNode(peerId); 141 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 142 143 tableCFs = ""; 144 tableCFsNode = getTableCFsNode(peerId); 145 LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); 146 ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); 147 148 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 149 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 150 151 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 152 assertNull(actualRpc.getTableCFsMap()); 153 assertEquals(tableCFs, actualTableCfs); 154 155 peerId = "4"; 156 rpc = new ReplicationPeerConfig(); 157 rpc.setClusterKey(zkw.getQuorum()); 158 peerNode = getPeerNode(peerId); 159 ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); 160 161 tableCFsNode = getTableCFsNode(peerId); 162 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 163 actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); 164 165 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 166 assertNull(actualRpc.getTableCFsMap()); 167 assertNull(actualTableCfs); 168 169 copyTableCFs(); 170 171 peerId = "1"; 172 peerNode = getPeerNode(peerId); 173 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 174 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 175 Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap(); 176 assertEquals(3, tableNameListMap.size()); 177 assertTrue(tableNameListMap.containsKey(tableName1)); 178 assertTrue(tableNameListMap.containsKey(tableName2)); 179 assertTrue(tableNameListMap.containsKey(tableName3)); 180 assertEquals(2, tableNameListMap.get(tableName1).size()); 181 assertEquals("cf1", tableNameListMap.get(tableName1).get(0)); 182 assertEquals("cf2", tableNameListMap.get(tableName1).get(1)); 183 assertEquals(1, tableNameListMap.get(tableName2).size()); 184 assertEquals("cf3", tableNameListMap.get(tableName2).get(0)); 185 assertNull(tableNameListMap.get(tableName3)); 186 187 188 peerId = "2"; 189 peerNode = getPeerNode(peerId); 190 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 191 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 192 tableNameListMap = actualRpc.getTableCFsMap(); 193 assertEquals(2, tableNameListMap.size()); 194 assertTrue(tableNameListMap.containsKey(tableName1)); 195 assertTrue(tableNameListMap.containsKey(tableName2)); 196 assertEquals(2, tableNameListMap.get(tableName1).size()); 197 assertEquals("cf1", tableNameListMap.get(tableName1).get(0)); 198 assertEquals("cf3", tableNameListMap.get(tableName1).get(1)); 199 assertEquals(1, tableNameListMap.get(tableName2).size()); 200 assertEquals("cf2", tableNameListMap.get(tableName2).get(0)); 201 202 peerId = "3"; 203 peerNode = getPeerNode(peerId); 204 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 205 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 206 tableNameListMap = actualRpc.getTableCFsMap(); 207 assertNull(tableNameListMap); 208 209 peerId = "4"; 210 peerNode = getPeerNode(peerId); 211 actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); 212 assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); 213 tableNameListMap = actualRpc.getTableCFsMap(); 214 assertNull(tableNameListMap); 215 } 216 217 218}