001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.replication.master; 019 020import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE; 021import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT; 022import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE; 023import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT; 024 025import java.io.IOException; 026import org.apache.hadoop.conf.Configuration; 027import org.apache.hadoop.hbase.HBaseConfiguration; 028import org.apache.hadoop.hbase.client.Admin; 029import org.apache.hadoop.hbase.client.Connection; 030import org.apache.hadoop.hbase.client.ConnectionFactory; 031import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; 032import org.apache.hadoop.hbase.replication.ReplicationException; 033import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; 034import org.apache.hadoop.hbase.replication.ReplicationPeerStorage; 035import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; 036import org.apache.hadoop.hbase.zookeeper.ZKUtil; 037import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 038import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 039import org.apache.yetus.audience.InterfaceAudience; 040import org.apache.yetus.audience.InterfaceStability; 041import org.apache.zookeeper.KeeperException; 042import org.slf4j.Logger; 043import org.slf4j.LoggerFactory; 044 045import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 046 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; 048 049/** 050 * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will 051 * be removed in HBase 3.x. See HBASE-11393 052 */ 053@InterfaceAudience.Private 054@InterfaceStability.Unstable 055public class ReplicationPeerConfigUpgrader{ 056 057 private static final String TABLE_CFS_ZNODE = "zookeeper.znode.replication.peers.tableCFs"; 058 private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs"; 059 060 private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class); 061 private final Configuration conf; 062 private final ZKWatcher zookeeper; 063 private final ReplicationPeerStorage peerStorage; 064 065 public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) { 066 this.zookeeper = zookeeper; 067 this.conf = conf; 068 this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf); 069 } 070 071 public void upgrade() throws Exception { 072 try (Connection conn = ConnectionFactory.createConnection(conf)) { 073 Admin admin = conn.getAdmin(); 074 admin.listReplicationPeers().forEach((peerDesc) -> { 075 String peerId = peerDesc.getPeerId(); 076 ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); 077 if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) 078 || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) { 079 peerConfig.setReplicateAllUserTables(false); 080 try { 081 admin.updateReplicationPeerConfig(peerId, peerConfig); 082 } catch (Exception e) { 083 LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e); 084 } 085 } 086 }); 087 } 088 } 089 090 public void copyTableCFs() throws ReplicationException { 091 for (String peerId : peerStorage.listPeerIds()) { 092 if (!copyTableCFs(peerId)) { 093 LOG.error("upgrade tableCFs failed for peerId=" + peerId); 094 } 095 } 096 } 097 098 @VisibleForTesting 099 protected String getTableCFsNode(String peerId) { 100 String replicationZNode = ZNodePaths.joinZNode(zookeeper.getZNodePaths().baseZNode, 101 conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT)); 102 String peersZNode = 103 ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT)); 104 return ZNodePaths.joinZNode(peersZNode, 105 ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT))); 106 } 107 108 public boolean copyTableCFs(String peerId) throws ReplicationException { 109 String tableCFsNode = getTableCFsNode(peerId); 110 try { 111 if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) { 112 ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId); 113 // We only need to copy data from tableCFs node to rpc Node the first time hmaster start. 114 if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) { 115 // we copy TableCFs node into PeerNode 116 LOG.info("Copy table ColumnFamilies into peer=" + peerId); 117 ReplicationProtos.TableCF[] tableCFs = 118 ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode)); 119 if (tableCFs != null && tableCFs.length > 0) { 120 rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs)); 121 peerStorage.updatePeerConfig(peerId, rpc); 122 } 123 } else { 124 LOG.info("No tableCFs in peerNode:" + peerId); 125 } 126 } 127 } catch (KeeperException e) { 128 LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e); 129 return false; 130 } catch (InterruptedException e) { 131 LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e); 132 return false; 133 } catch (IOException e) { 134 LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e); 135 return false; 136 } 137 return true; 138 } 139 140 private static void printUsageAndExit() { 141 System.err.printf( 142 "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader" 143 + " [options]"); 144 System.err.println(" where [options] are:"); 145 System.err.println(" -h|-help Show this help and exit."); 146 System.err.println(" copyTableCFs Copy table-cfs to replication peer config"); 147 System.err.println(" upgrade Upgrade replication peer config to new format"); 148 System.err.println(); 149 System.exit(1); 150 } 151 152 public static void main(String[] args) throws Exception { 153 if (args.length != 1) { 154 printUsageAndExit(); 155 } 156 if (args[0].equals("-help") || args[0].equals("-h")) { 157 printUsageAndExit(); 158 } else if (args[0].equals("copyTableCFs")) { 159 Configuration conf = HBaseConfiguration.create(); 160 try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) { 161 ReplicationPeerConfigUpgrader tableCFsUpdater = 162 new ReplicationPeerConfigUpgrader(zkw, conf); 163 tableCFsUpdater.copyTableCFs(); 164 } 165 } else if (args[0].equals("upgrade")) { 166 Configuration conf = HBaseConfiguration.create(); 167 try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) { 168 ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf); 169 upgrader.upgrade(); 170 } 171 } else { 172 printUsageAndExit(); 173 } 174 } 175}