001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019package org.apache.hadoop.hbase.replication.master;
020
021import java.io.IOException;
022import java.util.List;
023
024import org.apache.hadoop.conf.Configuration;
025import org.apache.hadoop.hbase.Abortable;
026import org.apache.hadoop.hbase.HBaseConfiguration;
027import org.apache.hadoop.hbase.client.Admin;
028import org.apache.hadoop.hbase.client.Connection;
029import org.apache.hadoop.hbase.client.ConnectionFactory;
030import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
031import org.apache.hadoop.hbase.exceptions.DeserializationException;
032import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
033import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
034import org.apache.hadoop.hbase.zookeeper.ZKUtil;
035import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.apache.yetus.audience.InterfaceStability;
038import org.apache.zookeeper.KeeperException;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
042
043/**
044 * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x.
045 * It will be removed in HBase 3.x. See HBASE-11393
046 */
047@InterfaceAudience.Private
048@InterfaceStability.Unstable
049public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
050
051  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class);
052
053  public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper,
054                         Configuration conf, Abortable abortable) {
055    super(zookeeper, conf, abortable);
056  }
057
058  public void upgrade() throws Exception {
059    try (Connection conn = ConnectionFactory.createConnection(conf)) {
060      Admin admin = conn.getAdmin();
061      admin.listReplicationPeers().forEach(
062        (peerDesc) -> {
063          String peerId = peerDesc.getPeerId();
064          ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
065          if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
066              || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
067            peerConfig.setReplicateAllUserTables(false);
068            try {
069              admin.updateReplicationPeerConfig(peerId, peerConfig);
070            } catch (Exception e) {
071              LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
072            }
073          }
074        });
075    }
076  }
077
078  public void copyTableCFs() {
079    List<String> znodes = null;
080    try {
081      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
082    } catch (KeeperException e) {
083      LOG.error("Failed to get peers znode", e);
084    }
085    if (znodes != null) {
086      for (String peerId : znodes) {
087        if (!copyTableCFs(peerId)) {
088          LOG.error("upgrade tableCFs failed for peerId=" + peerId);
089        }
090      }
091    }
092  }
093
094  public boolean copyTableCFs(String peerId) {
095    String tableCFsNode = getTableCFsNode(peerId);
096    try {
097      if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
098        String peerNode = getPeerNode(peerId);
099        ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
100        // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
101        if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
102          // we copy TableCFs node into PeerNode
103          LOG.info("Copy table ColumnFamilies into peer=" + peerId);
104          ReplicationProtos.TableCF[] tableCFs =
105                  ReplicationPeerConfigUtil.parseTableCFs(
106                          ZKUtil.getData(this.zookeeper, tableCFsNode));
107          if (tableCFs != null && tableCFs.length > 0) {
108            rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
109            ZKUtil.setData(this.zookeeper, peerNode,
110              ReplicationPeerConfigUtil.toByteArray(rpc));
111          }
112        } else {
113          LOG.info("No tableCFs in peerNode:" + peerId);
114        }
115      }
116    } catch (KeeperException e) {
117      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
118      return false;
119    } catch (InterruptedException e) {
120      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
121      return false;
122    } catch (IOException e) {
123      LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
124      return false;
125    }
126    return true;
127  }
128
129  private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
130          throws KeeperException, InterruptedException {
131    byte[] data = null;
132    data = ZKUtil.getData(this.zookeeper, peerNode);
133    if (data == null) {
134      LOG.error("Could not get configuration for " +
135              "peer because it doesn't exist. peer=" + peerNode);
136      return null;
137    }
138    try {
139      return ReplicationPeerConfigUtil.parsePeerFrom(data);
140    } catch (DeserializationException e) {
141      LOG.warn("Failed to parse cluster key from peer=" + peerNode);
142      return null;
143    }
144  }
145
146  private static void printUsageAndExit() {
147    System.err.printf(
148      "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
149          + " [options]");
150    System.err.println(" where [options] are:");
151    System.err.println("  -h|-help      Show this help and exit.");
152    System.err.println("  copyTableCFs  Copy table-cfs to replication peer config");
153    System.err.println("  upgrade           Upgrade replication peer config to new format");
154    System.err.println();
155    System.exit(1);
156  }
157
158  public static void main(String[] args) throws Exception {
159    if (args.length != 1) {
160      printUsageAndExit();
161    }
162    if (args[0].equals("-help") || args[0].equals("-h")) {
163      printUsageAndExit();
164    } else if (args[0].equals("copyTableCFs")) {
165      Configuration conf = HBaseConfiguration.create();
166      ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
167      try {
168        ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw,
169            conf, null);
170        tableCFsUpdater.copyTableCFs();
171      } finally {
172        zkw.close();
173      }
174    } else if (args[0].equals("upgrade")) {
175      Configuration conf = HBaseConfiguration.create();
176      ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
177      ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null);
178      upgrader.upgrade();
179    } else {
180      printUsageAndExit();
181    }
182  }
183}