001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
import com.google.protobuf.ServiceException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
089
090/**
091 * This is an implementation of {@link RSGroupInfoManager} which makes
092 * use of an HBase table as the persistence store for the group information.
093 * It also makes use of zookeeper to store group information needed
094 * for bootstrapping during offline mode.
095 *
096 * <h2>Concurrency</h2>
 * RSGroup state is kept locally in Maps: a Map from rsgroup name to cached
 * RSGroupInfo at {@link #rsGroupMap}, and a Map from table to the name of the
 * rsgroup it belongs to (in {@link #tableMap}). These Maps are persisted to the
 * hbase:rsgroup table (and cached in zk) on each modification.
101 *
 * <p>Mutations on state are synchronized, but reads can proceed without waiting
 * on the instance monitor: mutations replace the Maps wholesale on update
 * (copy-on-write), and the local Maps of state are wrapped read-only, just in
 * case (see flushConfig).
106 *
107 * <p>Reads must not block else there is a danger we'll deadlock.
108 *
109 * <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
110 * then act on the results of the query modifying cache in zookeeper without another thread
111 * making intermediate modifications. These clients synchronize on the 'this' instance so
112 * no other has access concurrently. Reads must be able to continue concurrently.
113 */
114@InterfaceAudience.Private
115final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
116  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
117
118  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
119  private final static HTableDescriptor RSGROUP_TABLE_DESC;
120  static {
121    RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
122    RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
123    RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
124    try {
125      RSGROUP_TABLE_DESC.addCoprocessor(
126        MultiRowMutationEndpoint.class.getName(),
127          null, Coprocessor.PRIORITY_SYSTEM, null);
128    } catch (IOException ex) {
129      throw new RuntimeException(ex);
130    }
131  }
132
133  // There two Maps are immutable and wholesale replaced on each modification
134  // so are safe to access concurrently. See class comment.
135  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
136  private volatile Map<TableName, String> tableMap = Collections.emptyMap();
137
138  private final MasterServices masterServices;
139  private Table rsGroupTable;
140  private final ClusterConnection conn;
141  private final ZKWatcher watcher;
142  private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
143  // contains list of groups that were last flushed to persistent store
144  private Set<String> prevRSGroups = new HashSet<>();
145  private final ServerEventsListenerThread serverEventsListenerThread =
146      new ServerEventsListenerThread();
147  private FailedOpenUpdaterThread failedOpenUpdaterThread;
148
149  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
150    this.masterServices = masterServices;
151    this.watcher = masterServices.getZooKeeper();
152    this.conn = masterServices.getClusterConnection();
153  }
154
155  private synchronized void init() throws IOException{
156    refresh();
157    serverEventsListenerThread.start();
158    masterServices.getServerManager().registerListener(serverEventsListenerThread);
159    failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
160    failedOpenUpdaterThread.start();
161    masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
162  }
163
164  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
165    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
166    instance.init();
167    return instance;
168  }
169
170  public void start(){
171    // create system table of rsgroup
172    rsGroupStartupWorker.start();
173  }
174
175  @Override
176  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
177    checkGroupName(rsGroupInfo.getName());
178    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
179        rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
180      throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
181    }
182    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
183    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
184    flushConfig(newGroupMap);
185  }
186
187  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
188    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
189    if (rsGroupInfo == null) {
190      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
191    }
192    return rsGroupInfo;
193  }
194
195  @Override
196  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
197      String dstGroup) throws IOException {
198    RSGroupInfo src = getRSGroupInfo(srcGroup);
199    RSGroupInfo dst = getRSGroupInfo(dstGroup);
200    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
201    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
202    // rsgroup of dead servers that are to come back later).
203    Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
204        Utility.getOnlineServers(this.masterServices): null;
205    for (Address el: servers) {
206      src.removeServer(el);
207      if (onlineServers != null) {
208        if (!onlineServers.contains(el)) {
209          if (LOG.isDebugEnabled()) {
210            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
211          }
212          continue;
213        }
214      }
215      dst.addServer(el);
216    }
217    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
218    newGroupMap.put(src.getName(), src);
219    newGroupMap.put(dst.getName(), dst);
220    flushConfig(newGroupMap);
221    return dst.getServers();
222  }
223
224  @Override
225  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
226    for (RSGroupInfo info: rsGroupMap.values()) {
227      if (info.containsServer(serverHostPort)) {
228        return info;
229      }
230    }
231    return null;
232  }
233
234  @Override
235  public RSGroupInfo getRSGroup(String groupName) {
236    return rsGroupMap.get(groupName);
237  }
238
239  @Override
240  public String getRSGroupOfTable(TableName tableName) {
241    return tableMap.get(tableName);
242  }
243
244  @Override
245  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
246      throws IOException {
247    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
248      throw new DoNotRetryIOException("Group "+groupName+" does not exist");
249    }
250
251    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
252    for(TableName tableName: tableNames) {
253      if (tableMap.containsKey(tableName)) {
254        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
255        src.removeTable(tableName);
256        newGroupMap.put(src.getName(), src);
257      }
258      if(groupName != null) {
259        RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
260        dst.addTable(tableName);
261        newGroupMap.put(dst.getName(), dst);
262      }
263    }
264    flushConfig(newGroupMap);
265  }
266
267  @Override
268  public synchronized void removeRSGroup(String groupName) throws IOException {
269    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
270      throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
271          + "group");
272    }
273    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
274    newGroupMap.remove(groupName);
275    flushConfig(newGroupMap);
276  }
277
278  @Override
279  public List<RSGroupInfo> listRSGroups() {
280    return Lists.newLinkedList(rsGroupMap.values());
281  }
282
283  @Override
284  public boolean isOnline() {
285    return rsGroupStartupWorker.isOnline();
286  }
287
288  @Override
289  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
290                                   String srcGroup, String dstGroup) throws IOException {
291    //get server's group
292    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
293    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
294
295    //move servers
296    for (Address el: servers) {
297      srcGroupInfo.removeServer(el);
298      dstGroupInfo.addServer(el);
299    }
300    //move tables
301    for(TableName tableName: tables) {
302      srcGroupInfo.removeTable(tableName);
303      dstGroupInfo.addTable(tableName);
304    }
305
306    //flush changed groupinfo
307    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
308    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
309    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
310    flushConfig(newGroupMap);
311  }
312
313  @Override
314  public synchronized void removeServers(Set<Address> servers) throws IOException {
315    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
316    for (Address el: servers) {
317      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
318      if (rsGroupInfo != null) {
319        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
320        if (newRsGroupInfo == null) {
321          rsGroupInfo.removeServer(el);
322          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
323        } else {
324          newRsGroupInfo.removeServer(el);
325          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
326        }
327      }else {
328        LOG.warn("Server " + el + " does not belong to any rsgroup.");
329      }
330    }
331
332    if (rsGroupInfos.size() > 0) {
333      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
334      newGroupMap.putAll(rsGroupInfos);
335      flushConfig(newGroupMap);
336    }
337  }
338
339  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
340    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
341    for (Result result : rsGroupTable.getScanner(new Scan())) {
342      RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
343              result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
344      rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
345    }
346    return rsGroupInfoList;
347  }
348
349  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
350    String groupBasePath = ZNodePaths.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
351    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
352    //Overwrite any info stored by table, this takes precedence
353    try {
354      if(ZKUtil.checkExists(watcher, groupBasePath) != -1) {
355        for(String znode: ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath)) {
356          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
357          if(data.length > 0) {
358            ProtobufUtil.expectPBMagicPrefix(data);
359            ByteArrayInputStream bis = new ByteArrayInputStream(
360                data, ProtobufUtil.lengthOfPBMagic(), data.length);
361            RSGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
362                RSGroupProtos.RSGroupInfo.parseFrom(bis)));
363          }
364        }
365        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
366      }
367    } catch (KeeperException|DeserializationException|InterruptedException e) {
368      throw new IOException("Failed to read rsGroupZNode",e);
369    }
370    return RSGroupInfoList;
371  }
372
373  @Override
374  public void refresh() throws IOException {
375    refresh(false);
376  }
377
378  /**
379   * Read rsgroup info from the source of truth, the hbase:rsgroup table.
380   * Update zk cache. Called on startup of the manager.
381   */
382  private synchronized void refresh(boolean forceOnline) throws IOException {
383    List<RSGroupInfo> groupList = new LinkedList<>();
384
385    // Overwrite anything read from zk, group table is source of truth
386    // if online read from GROUP table
387    if (forceOnline || isOnline()) {
388      LOG.debug("Refreshing in Online mode.");
389      if (rsGroupTable == null) {
390        rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
391      }
392      groupList.addAll(retrieveGroupListFromGroupTable());
393    } else {
394      LOG.debug("Refreshing in Offline mode.");
395      groupList.addAll(retrieveGroupListFromZookeeper());
396    }
397
398    // refresh default group, prune
399    NavigableSet<TableName> orphanTables = new TreeSet<>();
400    for(String entry: masterServices.getTableDescriptors().getAll().keySet()) {
401      orphanTables.add(TableName.valueOf(entry));
402    }
403    for (RSGroupInfo group: groupList) {
404      if(!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
405        orphanTables.removeAll(group.getTables());
406      }
407    }
408
409    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
410    // from region group table or zk
411    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
412        orphanTables));
413
414    // populate the data
415    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
416    HashMap<TableName, String> newTableMap = Maps.newHashMap();
417    for (RSGroupInfo group : groupList) {
418      newGroupMap.put(group.getName(), group);
419      for(TableName table: group.getTables()) {
420        newTableMap.put(table, group.getName());
421      }
422    }
423    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
424    updateCacheOfRSGroups(rsGroupMap.keySet());
425  }
426
427  private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> groupMap)
428      throws IOException {
429    Map<TableName,String> newTableMap = Maps.newHashMap();
430    List<Mutation> mutations = Lists.newArrayList();
431
432    // populate deletes
433    for(String groupName : prevRSGroups) {
434      if(!groupMap.containsKey(groupName)) {
435        Delete d = new Delete(Bytes.toBytes(groupName));
436        mutations.add(d);
437      }
438    }
439
440    // populate puts
441    for(RSGroupInfo RSGroupInfo : groupMap.values()) {
442      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
443      Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
444      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
445      mutations.add(p);
446      for(TableName entry: RSGroupInfo.getTables()) {
447        newTableMap.put(entry, RSGroupInfo.getName());
448      }
449    }
450
451    if(mutations.size() > 0) {
452      multiMutate(mutations);
453    }
454    return newTableMap;
455  }
456
457  private synchronized void flushConfig()
458  throws IOException {
459    flushConfig(this.rsGroupMap);
460  }
461
462  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap)
463  throws IOException {
464    Map<TableName, String> newTableMap;
465
466    // For offline mode persistence is still unavailable
467    // We're refreshing in-memory state but only for default servers
468    if (!isOnline()) {
469      Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap);
470      RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
471      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
472      if (!m.equals(newGroupMap) ||
473          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
474        throw new IOException("Only default servers can be updated during offline mode");
475      }
476      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
477      rsGroupMap = newGroupMap;
478      return;
479    }
480
481    newTableMap = flushConfigTable(newGroupMap);
482
483    // Make changes visible after having been persisted to the source of truth
484    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
485
486    try {
487      String groupBasePath = ZNodePaths.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
488      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
489
490      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
491      for(String groupName : prevRSGroups) {
492        if(!newGroupMap.containsKey(groupName)) {
493          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
494          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
495        }
496      }
497
498
499      for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
500        String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
501        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
502        LOG.debug("Updating znode: "+znode);
503        ZKUtil.createAndFailSilent(watcher, znode);
504        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
505        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
506            ProtobufUtil.prependPBMagic(proto.toByteArray())));
507      }
508      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
509
510      ZKUtil.multiOrSequential(watcher, zkOps, false);
511    } catch (KeeperException e) {
512      LOG.error("Failed to write to rsGroupZNode", e);
513      masterServices.abort("Failed to write to rsGroupZNode", e);
514      throw new IOException("Failed to write to rsGroupZNode",e);
515    }
516    updateCacheOfRSGroups(newGroupMap.keySet());
517  }
518
519  /**
520   * Make changes visible.
521   * Caller must be synchronized on 'this'.
522   */
523  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
524      Map<TableName, String> newTableMap) {
525    // Make maps Immutable.
526    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
527    this.tableMap = Collections.unmodifiableMap(newTableMap);
528  }
529
530  /**
531   * Update cache of rsgroups.
532   * Caller must be synchronized on 'this'.
533   * @param currentGroups Current list of Groups.
534   */
535  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
536    this.prevRSGroups.clear();
537    this.prevRSGroups.addAll(currentGroups);
538  }
539
540  // Called by getDefaultServers. Presume it has lock in place.
541  private List<ServerName> getOnlineRS() throws IOException {
542    if (masterServices != null) {
543      return masterServices.getServerManager().getOnlineServersList();
544    }
545    LOG.debug("Reading online RS from zookeeper");
546    List<ServerName> servers = new LinkedList<>();
547    try {
548      for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.rsZNode)) {
549        servers.add(ServerName.parseServerName(el));
550      }
551    } catch (KeeperException e) {
552      throw new IOException("Failed to retrieve server list from zookeeper", e);
553    }
554    return servers;
555  }
556
557  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
558  private SortedSet<Address> getDefaultServers() throws IOException {
559    SortedSet<Address> defaultServers = Sets.newTreeSet();
560    for (ServerName serverName : getOnlineRS()) {
561      Address server =
562          Address.fromParts(serverName.getHostname(), serverName.getPort());
563      boolean found = false;
564      for(RSGroupInfo rsgi: listRSGroups()) {
565        if(!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
566            rsgi.containsServer(server)) {
567          found = true;
568          break;
569        }
570      }
571      if (!found) {
572        defaultServers.add(server);
573      }
574    }
575    return defaultServers;
576  }
577
578  // Called by ServerEventsListenerThread. Synchronize on this because redoing
579  // the rsGroupMap then writing it out.
580  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
581    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
582    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
583    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
584    newGroupMap.put(newInfo.getName(), newInfo);
585    flushConfig(newGroupMap);
586  }
587
588  // Called by FailedOpenUpdaterThread
589  private void updateFailedAssignments() {
590    // Kick all regions in FAILED_OPEN state
591    List<RegionInfo> stuckAssignments = Lists.newArrayList();
592    for (RegionStateNode state:
593        masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
594      if (state.isStuck()) {
595        stuckAssignments.add(state.getRegionInfo());
596      }
597    }
598    for (RegionInfo region: stuckAssignments) {
599      LOG.info("Retrying assignment of " + region);
600      try {
601        masterServices.getAssignmentManager().unassign(region);
602      } catch (IOException e) {
603        LOG.warn("Unable to reassign " + region, e);
604      }
605    }
606  }
607
608  /**
609   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
610   * servers. Notifications about server changes are received by registering {@link ServerListener}.
611   * As a listener, we need to return immediately, so the real work of updating the servers is
612   * done asynchronously in this thread.
613   */
614  private class ServerEventsListenerThread extends Thread implements ServerListener {
615    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
616    private boolean changed = false;
617
618    ServerEventsListenerThread() {
619      setDaemon(true);
620    }
621
622    @Override
623    public void serverAdded(ServerName serverName) {
624      serverChanged();
625    }
626
627    @Override
628    public void serverRemoved(ServerName serverName) {
629      serverChanged();
630    }
631
632    private synchronized void serverChanged() {
633      changed = true;
634      this.notify();
635    }
636
637    @Override
638    public void run() {
639      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
640      SortedSet<Address> prevDefaultServers = new TreeSet<>();
641      while(isMasterRunning(masterServices)) {
642        try {
643          LOG.info("Updating default servers.");
644          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
645          if (!servers.equals(prevDefaultServers)) {
646            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
647            prevDefaultServers = servers;
648            LOG.info("Updated with servers: "+servers.size());
649          }
650          try {
651            synchronized (this) {
652              while (!changed) {
653                wait();
654              }
655              changed = false;
656            }
657          } catch (InterruptedException e) {
658            LOG.warn("Interrupted", e);
659          }
660        } catch (IOException e) {
661          LOG.warn("Failed to update default servers", e);
662        }
663      }
664    }
665  }
666
667  private class FailedOpenUpdaterThread extends Thread implements ServerListener {
668    private final long waitInterval;
669    private volatile boolean hasChanged = false;
670
671    public FailedOpenUpdaterThread(Configuration conf) {
672      this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
673        DEFAULT_REASSIGN_WAIT_INTERVAL);
674      setDaemon(true);
675    }
676
677    @Override
678    public void serverAdded(ServerName serverName) {
679      serverChanged();
680    }
681
682    @Override
683    public void serverRemoved(ServerName serverName) {
684    }
685
686    @Override
687    public void run() {
688      while (isMasterRunning(masterServices)) {
689        boolean interrupted = false;
690        try {
691          synchronized (this) {
692            while (!hasChanged) {
693              wait();
694            }
695            hasChanged = false;
696          }
697        } catch (InterruptedException e) {
698          LOG.warn("Interrupted", e);
699          interrupted = true;
700        }
701        if (!isMasterRunning(masterServices) || interrupted) {
702          continue;
703        }
704
705        // First, wait a while in case more servers are about to rejoin the cluster
706        try {
707          Thread.sleep(waitInterval);
708        } catch (InterruptedException e) {
709          LOG.warn("Interrupted", e);
710        }
711        if (!isMasterRunning(masterServices)) {
712          continue;
713        }
714
715        // Kick all regions in FAILED_OPEN state
716        updateFailedAssignments();
717      }
718    }
719
720    public void serverChanged() {
721      synchronized (this) {
722        hasChanged = true;
723        this.notify();
724      }
725    }
726  }
727
728  private class RSGroupStartupWorker extends Thread {
729    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
730    private volatile boolean online = false;
731
732    RSGroupStartupWorker() {
733      setDaemon(true);
734    }
735
736    @Override
737    public void run() {
738      setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
739      if (waitForGroupTableOnline()) {
740        LOG.info("GroupBasedLoadBalancer is now online");
741      }
742    }
743
744    private boolean waitForGroupTableOnline() {
745      final List<RegionInfo> foundRegions = new LinkedList<>();
746      final List<RegionInfo> assignedRegions = new LinkedList<>();
747      final AtomicBoolean found = new AtomicBoolean(false);
748      final TableStateManager tsm = masterServices.getTableStateManager();
749      boolean createSent = false;
750      while (!found.get() && isMasterRunning(masterServices)) {
751        foundRegions.clear();
752        assignedRegions.clear();
753        found.set(true);
754        try {
755          conn.getTable(TableName.NAMESPACE_TABLE_NAME);
756          conn.getTable(RSGROUP_TABLE_NAME);
757          boolean rootMetaFound =
758              masterServices.getMetaTableLocator().verifyMetaRegionLocation(
759                  conn, masterServices.getZooKeeper(), 1);
760          final AtomicBoolean nsFound = new AtomicBoolean(false);
761          if (rootMetaFound) {
762            MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
763              @Override
764              public boolean visitInternal(Result row) throws IOException {
765                RegionInfo info = MetaTableAccessor.getRegionInfo(row);
766                if (info != null) {
767                  Cell serverCell =
768                      row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
769                          HConstants.SERVER_QUALIFIER);
770                  if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
771                    ServerName sn =
772                        ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
773                    if (sn == null) {
774                      found.set(false);
775                    } else if (tsm.isTableState(RSGROUP_TABLE_NAME, TableState.State.ENABLED)) {
776                      try {
777                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
778                        ClientProtos.GetRequest request =
779                            RequestConverter.buildGetRequest(info.getRegionName(),
780                                new Get(ROW_KEY));
781                        rs.get(null, request);
782                        assignedRegions.add(info);
783                      } catch(Exception ex) {
784                        LOG.debug("Caught exception while verifying group region", ex);
785                      }
786                    }
787                    foundRegions.add(info);
788                  }
789                  if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
790                    Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
791                        HConstants.SERVER_QUALIFIER);
792                    ServerName sn = null;
793                    if(cell != null) {
794                      sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
795                    }
796                    if (sn == null) {
797                      nsFound.set(false);
798                    } else if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
799                        TableState.State.ENABLED)) {
800                      try {
801                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
802                        ClientProtos.GetRequest request =
803                            RequestConverter.buildGetRequest(info.getRegionName(),
804                                new Get(ROW_KEY));
805                        rs.get(null, request);
806                        nsFound.set(true);
807                      } catch(Exception ex) {
808                        LOG.debug("Caught exception while verifying group region", ex);
809                      }
810                    }
811                  }
812                }
813                return true;
814              }
815            };
816            MetaTableAccessor.fullScanRegions(conn, visitor);
817            // if no regions in meta then we have to create the table
818            if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
819              createRSGroupTable();
820              createSent = true;
821            }
822            LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get()
823                + ", regionCount=" + foundRegions.size() + ", assignCount="
824                + assignedRegions.size() + ", rootMetaFound=" + rootMetaFound);
825            found.set(found.get() && assignedRegions.size() == foundRegions.size()
826                && foundRegions.size() > 0);
827          } else {
828            LOG.info("Waiting for catalog tables to come online");
829            found.set(false);
830          }
831          if (found.get()) {
832            LOG.debug("With group table online, refreshing cached information.");
833            RSGroupInfoManagerImpl.this.refresh(true);
834            online = true;
835            //flush any inconsistencies between ZK and HTable
836            RSGroupInfoManagerImpl.this.flushConfig();
837          }
838        } catch (RuntimeException e) {
839          throw e;
840        } catch(Exception e) {
841          found.set(false);
842          LOG.warn("Failed to perform check", e);
843        }
844        try {
845          Thread.sleep(100);
846        } catch (InterruptedException e) {
847          LOG.info("Sleep interrupted", e);
848        }
849      }
850      return found.get();
851    }
852
853    private void createRSGroupTable() throws IOException {
854      Long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
855      // wait for region to be online
856      int tries = 600;
857      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId))
858          && masterServices.getMasterProcedureExecutor().isRunning()
859          && tries > 0) {
860        try {
861          Thread.sleep(100);
862        } catch (InterruptedException e) {
863          throw new IOException("Wait interrupted ", e);
864        }
865        tries--;
866      }
867      if(tries <= 0) {
868        throw new IOException("Failed to create group table in a given time.");
869      } else {
870        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
871        if (result != null && result.isFailed()) {
872          throw new IOException("Failed to create group table. " +
873              MasterProcedureUtil.unwrapRemoteIOException(result));
874        }
875      }
876    }
877
878    public boolean isOnline() {
879      return online;
880    }
881  }
882
883  private static boolean isMasterRunning(MasterServices masterServices) {
884    return !masterServices.isAborted() && !masterServices.isStopped();
885  }
886
887  private void multiMutate(List<Mutation> mutations) throws IOException {
888    CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
889    MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
890      = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
891    for (Mutation mutation : mutations) {
892      if (mutation instanceof Put) {
893        mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
894            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
895            mutation));
896      } else if (mutation instanceof Delete) {
897        mmrBuilder.addMutationRequest(
898            org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
899                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.
900                  MutationType.DELETE, mutation));
901      } else {
902        throw new DoNotRetryIOException("multiMutate doesn't support "
903          + mutation.getClass().getName());
904      }
905    }
906
907    MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
908      MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
909    try {
910      service.mutateRows(null, mmrBuilder.build());
911    } catch (ServiceException ex) {
912      ProtobufUtil.toIOException(ex);
913    }
914  }
915
916  private void checkGroupName(String groupName) throws ConstraintException {
917    if (!groupName.matches("[a-zA-Z0-9_]+")) {
918      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
919    }
920  }
921}