001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;

import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
045
046@InterfaceAudience.Private
047public class VerifyingRSGroupAdminClient implements RSGroupAdmin {
048  private Table table;
049  private ZKWatcher zkw;
050  private RSGroupAdmin wrapped;
051
052  public VerifyingRSGroupAdminClient(RSGroupAdmin RSGroupAdmin, Configuration conf)
053      throws IOException {
054    wrapped = RSGroupAdmin;
055    table = ConnectionFactory.createConnection(conf)
056        .getTable(RSGroupInfoManager.RSGROUP_TABLE_NAME);
057    zkw = new ZKWatcher(conf, this.getClass().getSimpleName(), null);
058  }
059
060  @Override
061  public void addRSGroup(String groupName) throws IOException {
062    wrapped.addRSGroup(groupName);
063    verify();
064  }
065
066  @Override
067  public RSGroupInfo getRSGroupInfo(String groupName) throws IOException {
068    return wrapped.getRSGroupInfo(groupName);
069  }
070
071  @Override
072  public RSGroupInfo getRSGroupInfoOfTable(TableName tableName) throws IOException {
073    return wrapped.getRSGroupInfoOfTable(tableName);
074  }
075
076  @Override
077  public void moveServers(Set<Address> servers, String targetGroup) throws IOException {
078    wrapped.moveServers(servers, targetGroup);
079    verify();
080  }
081
082  @Override
083  public void moveTables(Set<TableName> tables, String targetGroup) throws IOException {
084    wrapped.moveTables(tables, targetGroup);
085    verify();
086  }
087
088  @Override
089  public void removeRSGroup(String name) throws IOException {
090    wrapped.removeRSGroup(name);
091    verify();
092  }
093
094  @Override
095  public boolean balanceRSGroup(String groupName) throws IOException {
096    return wrapped.balanceRSGroup(groupName);
097  }
098
099  @Override
100  public List<RSGroupInfo> listRSGroups() throws IOException {
101    return wrapped.listRSGroups();
102  }
103
104  @Override
105  public RSGroupInfo getRSGroupOfServer(Address hostPort) throws IOException {
106    return wrapped.getRSGroupOfServer(hostPort);
107  }
108
109  @Override
110  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String targetGroup)
111      throws IOException {
112    wrapped.moveServersAndTables(servers, tables, targetGroup);
113    verify();
114  }
115
116  @Override
117  public void removeServers(Set<Address> servers) throws IOException {
118    wrapped.removeServers(servers);
119    verify();
120  }
121
122  @Override
123  public void renameRSGroup(String oldName, String newName) throws IOException {
124    wrapped.renameRSGroup(oldName, newName);
125    verify();
126  }
127
128  @Override
129  public void updateRSGroupConfig(String groupName, Map<String, String> configuration)
130      throws IOException {
131    wrapped.updateRSGroupConfig(groupName, configuration);
132    verify();
133  }
134
135  public void verify() throws IOException {
136    Map<String, RSGroupInfo> groupMap = Maps.newHashMap();
137    Set<RSGroupInfo> zList = Sets.newHashSet();
138
139    for (Result result : table.getScanner(new Scan())) {
140      RSGroupProtos.RSGroupInfo proto =
141          RSGroupProtos.RSGroupInfo.parseFrom(
142              result.getValue(
143                  RSGroupInfoManager.META_FAMILY_BYTES,
144                  RSGroupInfoManager.META_QUALIFIER_BYTES));
145      groupMap.put(proto.getName(), RSGroupProtobufUtil.toGroupInfo(proto));
146    }
147    Assert.assertEquals(Sets.newHashSet(groupMap.values()),
148        Sets.newHashSet(wrapped.listRSGroups()));
149    try {
150      String groupBasePath = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode, "rsgroup");
151      for(String znode: ZKUtil.listChildrenNoWatch(zkw, groupBasePath)) {
152        byte[] data = ZKUtil.getData(zkw, ZNodePaths.joinZNode(groupBasePath, znode));
153        if(data.length > 0) {
154          ProtobufUtil.expectPBMagicPrefix(data);
155          ByteArrayInputStream bis = new ByteArrayInputStream(
156              data, ProtobufUtil.lengthOfPBMagic(), data.length);
157          zList.add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
158        }
159      }
160      Assert.assertEquals(zList.size(), groupMap.size());
161      for(RSGroupInfo RSGroupInfo : zList) {
162        Assert.assertTrue(groupMap.get(RSGroupInfo.getName()).equals(RSGroupInfo));
163      }
164    } catch (KeeperException e) {
165      throw new IOException("ZK verification failed", e);
166    } catch (DeserializationException e) {
167      throw new IOException("ZK verification failed", e);
168    } catch (InterruptedException e) {
169      throw new IOException("ZK verification failed", e);
170    }
171  }
172}