001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
import com.google.protobuf.ServiceException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
089
090/**
091 * This is an implementation of {@link RSGroupInfoManager} which makes
092 * use of an HBase table as the persistence store for the group information.
093 * It also makes use of zookeeper to store group information needed
094 * for bootstrapping during offline mode.
095 *
096 * <h2>Concurrency</h2>
 * RSGroup state is kept locally in Maps. There is a Map from rsgroup name to the cached
 * RSGroupInfo at {@link #rsGroupMap} and a Map of tables to the name of the
 * rsgroup they belong to (in {@link #tableMap}). These Maps are persisted to the
100 * hbase:rsgroup table (and cached in zk) on each modification.
101 *
102 * <p>Mutations on state are synchronized but reads can continue without having
103 * to wait on an instance monitor, mutations do wholesale replace of the Maps on
104 * update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
105 * (see flushConfig).
106 *
107 * <p>Reads must not block else there is a danger we'll deadlock.
108 *
109 * <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
110 * then act on the results of the query modifying cache in zookeeper without another thread
111 * making intermediate modifications. These clients synchronize on the 'this' instance so
112 * no other has access concurrently. Reads must be able to continue concurrently.
113 */
114@InterfaceAudience.Private
115final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

  /**
   * Table descriptor for <code>hbase:rsgroup</code> catalog table: one column family
   * (META_FAMILY_BYTES), region splitting disabled, and the MultiRowMutationEndpoint
   * coprocessor loaded at system priority (presumably so multiMutate can apply a batch of
   * group rows atomically — confirm against multiMutate, which is outside this view).
   */
  private final static HTableDescriptor RSGROUP_TABLE_DESC;
  static {
    RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
    RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
    RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
    try {
      RSGROUP_TABLE_DESC.addCoprocessor(
        MultiRowMutationEndpoint.class.getName(),
          null, Coprocessor.PRIORITY_SYSTEM, null);
    } catch (IOException ex) {
      // A static initializer cannot throw checked exceptions; surface it as unchecked.
      throw new RuntimeException(ex);
    }
  }

  // These two Maps are wrapped unmodifiable and wholesale replaced on each modification
  // (volatile write), so they are safe to read concurrently without locking. See class comment.
  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  private final MasterServices masterServices;
  // Handle on hbase:rsgroup; opened lazily by refresh() the first time it reads in online mode.
  private Table rsGroupTable;
  private final ClusterConnection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
  // Names of the groups that were last flushed to the persistent store; used to work out which
  // table rows / znodes must be deleted on the next flush. Only touched under 'this' lock.
  private Set<String> prevRSGroups = new HashSet<>();
  private final ServerEventsListenerThread serverEventsListenerThread =
      new ServerEventsListenerThread();
  // Created and started in init().
  private FailedOpenUpdaterThread failedOpenUpdaterThread;
148
149  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
150    this.masterServices = masterServices;
151    this.watcher = masterServices.getZooKeeper();
152    this.conn = masterServices.getClusterConnection();
153  }
154
155  private synchronized void init() throws IOException{
156    refresh();
157    serverEventsListenerThread.start();
158    masterServices.getServerManager().registerListener(serverEventsListenerThread);
159    failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
160    failedOpenUpdaterThread.start();
161    masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
162  }
163
164  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
165    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
166    instance.init();
167    return instance;
168  }
169
170  public void start(){
171    // create system table of rsgroup
172    rsGroupStartupWorker.start();
173  }
174
175  @Override
176  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
177    checkGroupName(rsGroupInfo.getName());
178    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
179        rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
180      throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
181    }
182    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
183    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
184    flushConfig(newGroupMap);
185  }
186
187  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
188    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
189    if (rsGroupInfo == null) {
190      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
191    }
192    return rsGroupInfo;
193  }
194
195  @Override
196  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
197      String dstGroup) throws IOException {
198    RSGroupInfo src = getRSGroupInfo(srcGroup);
199    RSGroupInfo dst = getRSGroupInfo(dstGroup);
200    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
201    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
202    // rsgroup of dead servers that are to come back later).
203    Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
204        Utility.getOnlineServers(this.masterServices): null;
205    for (Address el: servers) {
206      src.removeServer(el);
207      if (onlineServers != null) {
208        if (!onlineServers.contains(el)) {
209          if (LOG.isDebugEnabled()) {
210            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
211          }
212          continue;
213        }
214      }
215      dst.addServer(el);
216    }
217    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
218    newGroupMap.put(src.getName(), src);
219    newGroupMap.put(dst.getName(), dst);
220    flushConfig(newGroupMap);
221    return dst.getServers();
222  }
223
224  @Override
225  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
226    for (RSGroupInfo info: rsGroupMap.values()) {
227      if (info.containsServer(serverHostPort)) {
228        return info;
229      }
230    }
231    return null;
232  }
233
234  @Override
235  public RSGroupInfo getRSGroup(String groupName) {
236    return rsGroupMap.get(groupName);
237  }
238
239  @Override
240  public String getRSGroupOfTable(TableName tableName) {
241    return tableMap.get(tableName);
242  }
243
244  @Override
245  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
246      throws IOException {
247    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
248      throw new DoNotRetryIOException("Group "+groupName+" does not exist");
249    }
250
251    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
252    for(TableName tableName: tableNames) {
253      if (tableMap.containsKey(tableName)) {
254        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
255        src.removeTable(tableName);
256        newGroupMap.put(src.getName(), src);
257      }
258      if(groupName != null) {
259        RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
260        dst.addTable(tableName);
261        newGroupMap.put(dst.getName(), dst);
262      }
263    }
264    flushConfig(newGroupMap);
265  }
266
267  @Override
268  public synchronized void removeRSGroup(String groupName) throws IOException {
269    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
270      throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
271          + "group");
272    }
273    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
274    newGroupMap.remove(groupName);
275    flushConfig(newGroupMap);
276  }
277
278  @Override
279  public List<RSGroupInfo> listRSGroups() {
280    return Lists.newLinkedList(rsGroupMap.values());
281  }
282
283  @Override
284  public boolean isOnline() {
285    return rsGroupStartupWorker.isOnline();
286  }
287
288  @Override
289  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
290                                   String srcGroup, String dstGroup) throws IOException {
291    //get server's group
292    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
293    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
294
295    //move servers
296    for (Address el: servers) {
297      srcGroupInfo.removeServer(el);
298      dstGroupInfo.addServer(el);
299    }
300    //move tables
301    for(TableName tableName: tables) {
302      srcGroupInfo.removeTable(tableName);
303      dstGroupInfo.addTable(tableName);
304    }
305
306    //flush changed groupinfo
307    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
308    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
309    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
310    flushConfig(newGroupMap);
311  }
312
313  @Override
314  public synchronized void removeServers(Set<Address> servers) throws IOException {
315    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
316    for (Address el: servers) {
317      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
318      if (rsGroupInfo != null) {
319        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
320        if (newRsGroupInfo == null) {
321          rsGroupInfo.removeServer(el);
322          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
323        } else {
324          newRsGroupInfo.removeServer(el);
325          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
326        }
327      }else {
328        LOG.warn("Server " + el + " does not belong to any rsgroup.");
329      }
330    }
331
332    if (rsGroupInfos.size() > 0) {
333      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
334      newGroupMap.putAll(rsGroupInfos);
335      flushConfig(newGroupMap);
336    }
337  }
338
339  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
340    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
341    for (Result result : rsGroupTable.getScanner(new Scan())) {
342      RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
343              result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
344      rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
345    }
346    return rsGroupInfoList;
347  }
348
349  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
350    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
351    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
352    //Overwrite any info stored by table, this takes precedence
353    try {
354      if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
355        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
356        if (children == null) {
357          return RSGroupInfoList;
358        }
359        for(String znode: children) {
360          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
361          if(data.length > 0) {
362            ProtobufUtil.expectPBMagicPrefix(data);
363            ByteArrayInputStream bis = new ByteArrayInputStream(
364                data, ProtobufUtil.lengthOfPBMagic(), data.length);
365            RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
366                RSGroupProtos.RSGroupInfo.parseFrom(bis)));
367          }
368        }
369        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
370      }
371    } catch (KeeperException|DeserializationException|InterruptedException e) {
372      throw new IOException("Failed to read rsGroupZNode",e);
373    }
374    return RSGroupInfoList;
375  }
376
377  @Override
378  public void refresh() throws IOException {
379    refresh(false);
380  }
381
382  /**
383   * Read rsgroup info from the source of truth, the hbase:rsgroup table.
384   * Update zk cache. Called on startup of the manager.
385   */
386  private synchronized void refresh(boolean forceOnline) throws IOException {
387    List<RSGroupInfo> groupList = new LinkedList<>();
388
389    // Overwrite anything read from zk, group table is source of truth
390    // if online read from GROUP table
391    if (forceOnline || isOnline()) {
392      LOG.debug("Refreshing in Online mode.");
393      if (rsGroupTable == null) {
394        rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
395      }
396      groupList.addAll(retrieveGroupListFromGroupTable());
397    } else {
398      LOG.debug("Refreshing in Offline mode.");
399      groupList.addAll(retrieveGroupListFromZookeeper());
400    }
401
402    // refresh default group, prune
403    NavigableSet<TableName> orphanTables = new TreeSet<>();
404    for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
405      orphanTables.add(TableName.valueOf(entry));
406    }
407    for (RSGroupInfo group: groupList) {
408      if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
409        orphanTables.removeAll(group.getTables());
410      }
411    }
412
413    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
414    // from region group table or zk
415    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
416        orphanTables));
417
418    // populate the data
419    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
420    HashMap<TableName, String> newTableMap = Maps.newHashMap();
421    for (RSGroupInfo group : groupList) {
422      newGroupMap.put(group.getName(), group);
423      for(TableName table: group.getTables()) {
424        newTableMap.put(table, group.getName());
425      }
426    }
427    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
428    updateCacheOfRSGroups(rsGroupMap.keySet());
429  }
430
431  private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> groupMap)
432      throws IOException {
433    Map<TableName,String> newTableMap = Maps.newHashMap();
434    List<Mutation> mutations = Lists.newArrayList();
435
436    // populate deletes
437    for(String groupName : prevRSGroups) {
438      if(!groupMap.containsKey(groupName)) {
439        Delete d = new Delete(Bytes.toBytes(groupName));
440        mutations.add(d);
441      }
442    }
443
444    // populate puts
445    for(RSGroupInfo RSGroupInfo : groupMap.values()) {
446      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
447      Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
448      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
449      mutations.add(p);
450      for(TableName entry: RSGroupInfo.getTables()) {
451        newTableMap.put(entry, RSGroupInfo.getName());
452      }
453    }
454
455    if(mutations.size() > 0) {
456      multiMutate(mutations);
457    }
458    return newTableMap;
459  }
460
461  private synchronized void flushConfig()
462  throws IOException {
463    flushConfig(this.rsGroupMap);
464  }
465
466  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap)
467  throws IOException {
468    Map<TableName, String> newTableMap;
469
470    // For offline mode persistence is still unavailable
471    // We're refreshing in-memory state but only for default servers
472    if (!isOnline()) {
473      Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap);
474      RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
475      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
476      if (!m.equals(newGroupMap) ||
477          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
478        throw new IOException("Only default servers can be updated during offline mode");
479      }
480      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
481      rsGroupMap = newGroupMap;
482      return;
483    }
484
485    newTableMap = flushConfigTable(newGroupMap);
486
487    // Make changes visible after having been persisted to the source of truth
488    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
489
490    try {
491      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
492      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
493
494      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
495      for(String groupName : prevRSGroups) {
496        if(!newGroupMap.containsKey(groupName)) {
497          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
498          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
499        }
500      }
501
502
503      for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
504        String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
505        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
506        LOG.debug("Updating znode: "+znode);
507        ZKUtil.createAndFailSilent(watcher, znode);
508        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
509        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
510            ProtobufUtil.prependPBMagic(proto.toByteArray())));
511      }
512      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
513
514      ZKUtil.multiOrSequential(watcher, zkOps, false);
515    } catch (KeeperException e) {
516      LOG.error("Failed to write to rsGroupZNode", e);
517      masterServices.abort("Failed to write to rsGroupZNode", e);
518      throw new IOException("Failed to write to rsGroupZNode",e);
519    }
520    updateCacheOfRSGroups(newGroupMap.keySet());
521  }
522
523  /**
524   * Make changes visible.
525   * Caller must be synchronized on 'this'.
526   */
527  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
528      Map<TableName, String> newTableMap) {
529    // Make maps Immutable.
530    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
531    this.tableMap = Collections.unmodifiableMap(newTableMap);
532  }
533
534  /**
535   * Update cache of rsgroups.
536   * Caller must be synchronized on 'this'.
537   * @param currentGroups Current list of Groups.
538   */
539  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
540    this.prevRSGroups.clear();
541    this.prevRSGroups.addAll(currentGroups);
542  }
543
544  // Called by getDefaultServers. Presume it has lock in place.
545  private List<ServerName> getOnlineRS() throws IOException {
546    if (masterServices != null) {
547      return masterServices.getServerManager().getOnlineServersList();
548    }
549    LOG.debug("Reading online RS from zookeeper");
550    List<ServerName> servers = new LinkedList<>();
551    try {
552      for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
553        servers.add(ServerName.parseServerName(el));
554      }
555    } catch (KeeperException e) {
556      throw new IOException("Failed to retrieve server list from zookeeper", e);
557    }
558    return servers;
559  }
560
561  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
562  private SortedSet<Address> getDefaultServers() throws IOException {
563    SortedSet<Address> defaultServers = Sets.newTreeSet();
564    for (ServerName serverName : getOnlineRS()) {
565      Address server =
566          Address.fromParts(serverName.getHostname(), serverName.getPort());
567      boolean found = false;
568      for(RSGroupInfo rsgi: listRSGroups()) {
569        if(!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
570            rsgi.containsServer(server)) {
571          found = true;
572          break;
573        }
574      }
575      if (!found) {
576        defaultServers.add(server);
577      }
578    }
579    return defaultServers;
580  }
581
582  // Called by ServerEventsListenerThread. Synchronize on this because redoing
583  // the rsGroupMap then writing it out.
584  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
585    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
586    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
587    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
588    newGroupMap.put(newInfo.getName(), newInfo);
589    flushConfig(newGroupMap);
590  }
591
592  // Called by FailedOpenUpdaterThread
593  private void updateFailedAssignments() {
594    // Kick all regions in FAILED_OPEN state
595    List<RegionInfo> stuckAssignments = Lists.newArrayList();
596    for (RegionStateNode state:
597        masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
598      if (state.isStuck()) {
599        stuckAssignments.add(state.getRegionInfo());
600      }
601    }
602    for (RegionInfo region: stuckAssignments) {
603      LOG.info("Retrying assignment of " + region);
604      try {
605        masterServices.getAssignmentManager().unassign(region);
606      } catch (IOException e) {
607        LOG.warn("Unable to reassign " + region, e);
608      }
609    }
610  }
611
  /**
   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update the list of
   * known servers. Notifications about server changes are received by registering as a
   * {@link ServerListener}. As a listener, we need to return immediately, so the real work of
   * updating the servers is done asynchronously in this thread.
   */
  private class ServerEventsListenerThread extends Thread implements ServerListener {
    // Shadows the outer LOG so messages are attributed to this class.
    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
    // Set by serverChanged() and consumed by run(); both accesses hold this object's monitor.
    private boolean changed = false;

    ServerEventsListenerThread() {
      // Daemon thread: does not keep the JVM alive on its own.
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    @Override
    public void serverRemoved(ServerName serverName) {
      serverChanged();
    }

    /** Records a change and wakes run() if it is waiting. */
    private synchronized void serverChanged() {
      changed = true;
      this.notify();
    }

    /**
     * Recomputes the default servers once at start and again after every change notification,
     * flushing only when the set differs from the last one applied. Loops while
     * isMasterRunning(masterServices) (defined outside this view) returns true.
     */
    @Override
    public void run() {
      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
      // Last server set successfully written by updateDefaultServers.
      SortedSet<Address> prevDefaultServers = new TreeSet<>();
      while(isMasterRunning(masterServices)) {
        try {
          LOG.info("Updating default servers.");
          // NOTE(review): called without synchronizing on the manager.
          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
          if (!servers.equals(prevDefaultServers)) {
            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
            prevDefaultServers = servers;
            LOG.info("Updated with servers: "+servers.size());
          }
          // Block until the next server add/remove notification.
          try {
            synchronized (this) {
              while (!changed) {
                wait();
              }
              changed = false;
            }
          } catch (InterruptedException e) {
            // NOTE(review): the interrupt is logged and swallowed; the loop keeps running.
            LOG.warn("Interrupted", e);
          }
        } catch (IOException e) {
          // NOTE(review): on IOException the wait above is skipped, so the loop retries
          // immediately; a persistent failure spins without any backoff.
          LOG.warn("Failed to update default servers", e);
        }
      }
    }
  }
670
  /**
   * Background thread that, after a region server joins, waits {@code waitInterval}
   * milliseconds (passed to Thread.sleep) and then calls updateFailedAssignments() to retry
   * regions that are stuck in transition. Server removals do not trigger it.
   */
  private class FailedOpenUpdaterThread extends Thread implements ServerListener {
    // Read from REASSIGN_WAIT_INTERVAL_KEY (default DEFAULT_REASSIGN_WAIT_INTERVAL); both
    // constants are defined outside this view.
    private final long waitInterval;
    // Set by serverChanged(), consumed by run(); updated under this object's monitor.
    private volatile boolean hasChanged = false;

    public FailedOpenUpdaterThread(Configuration conf) {
      this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
        DEFAULT_REASSIGN_WAIT_INTERVAL);
      // Daemon thread: does not keep the JVM alive on its own.
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    // Intentionally a no-op: only server additions trigger a retry.
    @Override
    public void serverRemoved(ServerName serverName) {
    }

    @Override
    public void run() {
      while (isMasterRunning(masterServices)) {
        boolean interrupted = false;
        // Wait for a server-added notification.
        try {
          synchronized (this) {
            while (!hasChanged) {
              wait();
            }
            hasChanged = false;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted", e);
          interrupted = true;
        }
        if (!isMasterRunning(masterServices) || interrupted) {
          continue;
        }

        // First, wait a while in case more servers are about to rejoin the cluster
        try {
          Thread.sleep(waitInterval);
        } catch (InterruptedException e) {
          // NOTE(review): an interrupted sleep is logged and the retry below still runs.
          LOG.warn("Interrupted", e);
        }
        if (!isMasterRunning(masterServices)) {
          continue;
        }

        // Kick all regions in FAILED_OPEN state (updateFailedAssignments selects "stuck" RITs)
        updateFailedAssignments();
      }
    }

    /** Records that a server was added and wakes run() if it is waiting. */
    public void serverChanged() {
      synchronized (this) {
        hasChanged = true;
        this.notify();
      }
    }
  }
731
732  private class RSGroupStartupWorker extends Thread {
733    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
734    private volatile boolean online = false;
735
736    RSGroupStartupWorker() {
737      setDaemon(true);
738    }
739
740    @Override
741    public void run() {
742      setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
743      if (waitForGroupTableOnline()) {
744        LOG.info("GroupBasedLoadBalancer is now online");
745      }
746    }
747
    /**
     * Polls until the rsgroup table is online, then refreshes this manager's cached group
     * information from it.
     *
     * <p>Each pass of the loop:
     * <ol>
     *   <li>verifies the meta region location; if that fails, the pass fails;</li>
     *   <li>scans region rows with {@code MetaTableAccessor.fullScanRegions}, recording every
     *       rsgroup-table region that has a server column and, when the rsgroup table is
     *       ENABLED, probing its server with a {@code Get} of {@code ROW_KEY}; namespace-table
     *       regions are probed the same way to decide whether the namespace table is usable;</li>
     *   <li>submits creation of the rsgroup table if no rsgroup region was recorded, the
     *       namespace table answered its probe, and no earlier pass already created it without
     *       an exception;</li>
     *   <li>treats the table as online only when at least one region was recorded, every
     *       recorded region answered the probe, and no recorded region's server name parsed to
     *       {@code null}.</li>
     * </ol>
     * Once online it calls {@code refresh(true)}, sets {@code online}, and calls
     * {@code flushConfig()}. Every pass, including the successful one, is followed by a
     * 100 ms sleep.
     *
     * @return {@code true} if the table was found online; {@code false} if the loop ended
     *     because the master was aborted or stopped first
     * @throws RuntimeException any runtime exception raised during a pass is rethrown; other
     *     exceptions are logged and the pass is retried
     */
    private boolean waitForGroupTableOnline() {
      final List<RegionInfo> foundRegions = new LinkedList<>();
      final List<RegionInfo> assignedRegions = new LinkedList<>();
      // AtomicBooleans so the anonymous visitor below can update them.
      final AtomicBoolean found = new AtomicBoolean(false);
      final TableStateManager tsm = masterServices.getTableStateManager();
      boolean createSent = false;
      while (!found.get() && isMasterRunning(masterServices)) {
        // Reset per-pass state; found starts optimistic and is cleared by any failed check.
        foundRegions.clear();
        assignedRegions.clear();
        found.set(true);
        try {
          // NOTE(review): the Table handles returned here are discarded without being closed;
          // confirm whether these calls are needed at all, and close them if they are.
          conn.getTable(TableName.NAMESPACE_TABLE_NAME);
          conn.getTable(RSGROUP_TABLE_NAME);
          // NOTE(review): the trailing argument (1) is presumably a timeout; confirm against
          // MetaTableLocator.verifyMetaRegionLocation.
          boolean rootMetaFound =
              masterServices.getMetaTableLocator().verifyMetaRegionLocation(
                  conn, masterServices.getZooKeeper(), 1);
          final AtomicBoolean nsFound = new AtomicBoolean(false);
          if (rootMetaFound) {
            MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
              @Override
              public boolean visitInternal(Result row) throws IOException {
                RegionInfo info = MetaTableAccessor.getRegionInfo(row);
                if (info != null) {
                  Cell serverCell =
                      row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
                          HConstants.SERVER_QUALIFIER);
                  // Only rsgroup regions that have a server column are recorded; a region
                  // without one is not added to foundRegions at all.
                  if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
                    ServerName sn =
                        ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
                    if (sn == null) {
                      found.set(false);
                    } else if (tsm.isTableState(RSGROUP_TABLE_NAME, TableState.State.ENABLED)) {
                      // A Get that completes without an exception counts the region as
                      // assigned; failures are only logged and leave it unassigned.
                      try {
                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
                        ClientProtos.GetRequest request =
                            RequestConverter.buildGetRequest(info.getRegionName(),
                                new Get(ROW_KEY));
                        rs.get(null, request);
                        assignedRegions.add(info);
                      } catch(Exception ex) {
                        LOG.debug("Caught exception while verifying group region", ex);
                      }
                    }
                    foundRegions.add(info);
                  }
                  if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
                    // Probe the namespace region the same way: a successful Get sets nsFound,
                    // a missing or unparseable server name clears it.
                    Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
                        HConstants.SERVER_QUALIFIER);
                    ServerName sn = null;
                    if(cell != null) {
                      sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
                    }
                    if (sn == null) {
                      nsFound.set(false);
                    } else if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
                        TableState.State.ENABLED)) {
                      try {
                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
                        ClientProtos.GetRequest request =
                            RequestConverter.buildGetRequest(info.getRegionName(),
                                new Get(ROW_KEY));
                        rs.get(null, request);
                        nsFound.set(true);
                      } catch(Exception ex) {
                        // NOTE(review): message says "group region" although this probes a
                        // namespace region.
                        LOG.debug("Caught exception while verifying group region", ex);
                      }
                    }
                  }
                }
                // Keep scanning: every region row is inspected.
                return true;
              }
            };
            MetaTableAccessor.fullScanRegions(conn, visitor);
            // No rsgroup region recorded: create the table once the namespace table is usable.
            // createSent is only set after createRSGroupTable() returns normally, so a failed
            // attempt is retried on a later pass.
            if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
              createRSGroupTable();
              createSent = true;
            }
            LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get()
                + ", regionCount=" + foundRegions.size() + ", assignCount="
                + assignedRegions.size() + ", rootMetaFound=" + rootMetaFound);
            // Online only if no check failed, every recorded region answered its probe, and
            // at least one region was recorded.
            found.set(found.get() && assignedRegions.size() == foundRegions.size()
                && foundRegions.size() > 0);
          } else {
            LOG.info("Waiting for catalog tables to come online");
            found.set(false);
          }
          if (found.get()) {
            LOG.debug("With group table online, refreshing cached information.");
            RSGroupInfoManagerImpl.this.refresh(true);
            online = true;
            //flush any inconsistencies between ZK and HTable
            RSGroupInfoManagerImpl.this.flushConfig();
          }
        } catch (RuntimeException e) {
          // Runtime exceptions propagate out of this method (and out of run()).
          throw e;
        } catch(Exception e) {
          // Any checked failure (including from refresh/flushConfig or table creation) marks
          // this pass as failed; the loop retries.
          found.set(false);
          LOG.warn("Failed to perform check", e);
        }
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          // NOTE(review): the interrupt is logged and otherwise ignored, so interrupting this
          // thread does not end the loop; only the master aborting/stopping (or success) does.
          LOG.info("Sleep interrupted", e);
        }
      }
      return found.get();
    }
856
857    private void createRSGroupTable() throws IOException {
858      Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
859      // wait for region to be online
860      int tries = 600;
861      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId))
862          && masterServices.getMasterProcedureExecutor().isRunning()
863          && tries > 0) {
864        try {
865          Thread.sleep(100);
866        } catch (InterruptedException e) {
867          throw new IOException("Wait interrupted ", e);
868        }
869        tries--;
870      }
871      if(tries <= 0) {
872        throw new IOException("Failed to create group table in a given time.");
873      } else {
874        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
875        if (result != null && result.isFailed()) {
876          throw new IOException("Failed to create group table. " +
877              MasterProcedureUtil.unwrapRemoteIOException(result));
878        }
879      }
880    }
881
882    public boolean isOnline() {
883      return online;
884    }
885  }
886
887  private static boolean isMasterRunning(MasterServices masterServices) {
888    return !masterServices.isAborted() && !masterServices.isStopped();
889  }
890
891  private void multiMutate(List<Mutation> mutations) throws IOException {
892    CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
893    MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
894      = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
895    for (Mutation mutation : mutations) {
896      if (mutation instanceof Put) {
897        mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
898            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
899            mutation));
900      } else if (mutation instanceof Delete) {
901        mmrBuilder.addMutationRequest(
902            org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
903                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.
904                  MutationType.DELETE, mutation));
905      } else {
906        throw new DoNotRetryIOException("multiMutate doesn't support "
907          + mutation.getClass().getName());
908      }
909    }
910
911    MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
912      MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
913    try {
914      service.mutateRows(null, mmrBuilder.build());
915    } catch (ServiceException ex) {
916      ProtobufUtil.toIOException(ex);
917    }
918  }
919
920  private void checkGroupName(String groupName) throws ConstraintException {
921    if (!groupName.matches("[a-zA-Z0-9_]+")) {
922      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
923    }
924  }
925}