001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import java.io.ByteArrayInputStream;
021import java.io.IOException;
022import java.util.List;
023import java.util.Map;
024import java.util.Set;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.hadoop.hbase.TableName;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.client.Result;
030import org.apache.hadoop.hbase.client.Scan;
031import org.apache.hadoop.hbase.client.Table;
032import org.apache.hadoop.hbase.exceptions.DeserializationException;
033import org.apache.hadoop.hbase.net.Address;
034import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
035import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
036import org.apache.hadoop.hbase.zookeeper.ZKUtil;
037import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
038import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
039import org.apache.yetus.audience.InterfaceAudience;
040import org.apache.zookeeper.KeeperException;
041import org.junit.Assert;
042
043import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
044import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
045
046@InterfaceAudience.Private
047public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
048  private Table table;
049  private ZKWatcher zkw;
050  private RSGroupAdmin wrapped;
051
052  public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf)
053      throws IOException {
054    wrapped = RSGroupAdmin;
055    table = ConnectionFactory.createConnection(conf)
056        .getTable(RSGroupInfoManager.RSGROUP_TABLE_NAME);
057    zkw = new ZKWatcher(conf, this.getClass().getSimpleName(), null);
058  }
059
060  @Override
061  public void addRSGroup(String groupName) throws IOException {
062    wrapped.addRSGroup(groupName);
063    verify();
064  }
065
066  @Override
067  public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
068    return wrapped.getRSGroupInfo(groupName);
069  }
070
071  @Override
072  public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
073    return wrapped.getRSGroupInfoOfTable(tableName);
074  }
075
076  @Override
077  public void moveServers(Set<Address> servers, String targetGroup) throws IOException {
078    wrapped.moveServers(servers, targetGroup);
079    verify();
080  }
081
082  @Override
083  public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
084    wrapped.moveTables(tables, targetGroup);
085    verify();
086  }
087
088  @Override
089  public void removeRSGroup(String name) throws IOException {
090    wrapped.removeRSGroup(name);
091    verify();
092  }
093
094  @Override
095  public boolean balanceRSGroup(String groupName) throws IOException {
096    return wrapped.balanceRSGroup(groupName);
097  }
098
099  @Override
100  public List<RSGroupInfo> listRSGroups() throws IOException {
101    return wrapped.listRSGroups();
102  }
103
104  @Override
105  public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException {
106    return wrapped.getRSGroupOfServer(hostPort);
107  }
108
109  @Override
110  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup)
111      throws IOException {
112    wrapped.moveServersAndTables(servers, tables, targetGroup);
113    verify();
114  }
115
116  @Override
117  public void removeServers(Set<Address> servers) throws IOException {
118    wrapped.removeServers(servers);
119    verify();
120  }
121
122  @Override
123  public void renameRSGroup(String oldName, String newName) throws IOException {
124    wrapped.renameRSGroup(oldName, newName);
125    verify();
126  }
127
128  public void verify() throws IOException {
129    Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
130    Set<RSGroupInfo> zList = Sets.newHashSet();
131
132    for (Result result : table.getScanner(new Scan())) {
133      RSGroupProtos.RSGroupInfo proto =
134          RSGroupProtos.RSGroupInfo.parseFrom(
135              result.getValue(
136                  RSGroupInfoManager.META_FAMILY_BYTES,
137                  RSGroupInfoManager.META_QUALIFIER_BYTES));
138      groupMap.put(proto.getName(), RSGroupProtobufUtil.toGroupInfo(proto));
139    }
140    Assert.assertEquals(Sets.newHashSet(groupMap.values()),
141        Sets.newHashSet(wrapped.listRSGroups()));
142    try {
143      String groupBasePath = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup");
144      for(String znode: ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) {
145        byte[] data = ZKUtil.getData(zkw, ZNodePaths.joinZNode(groupBasePath, znode));
146        if(data.length > 0) {
147          ProtobufUtil.expectPBMagicPrefix(data);
148          ByteArrayInputStream bis = new ByteArrayInputStream(
149              data, ProtobufUtil.lengthOfPBMagic(), data.length);
150          zList.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
151        }
152      }
153      Assert.assertEquals(zList.size(), groupMap.size());
154      for(RSGroupInfo RSGroupInfo : zList) {
155        Assert.assertTrue(groupMap.get(RSGroupInfo.getName()).equals(RSGroupInfo));
156      }
157    } catch (KeeperException e) {
158      throw new IOException("ZK verification failed", e);
159    } catch (DeserializationException e) {
160      throw new IOException("ZK verification failed", e);
161    } catch (InterruptedException e) {
162      throw new IOException("ZK verification failed", e);
163    }
164  }
165}