001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import com.google.protobuf.ServiceException;
021import java.io.ByteArrayInputStream;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableSet;
031import java.util.OptionalLong;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import org.apache.hadoop.hbase.Coprocessor;
036import org.apache.hadoop.hbase.DoNotRetryIOException;
037import org.apache.hadoop.hbase.ServerName;
038import org.apache.hadoop.hbase.TableName;
039import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
040import org.apache.hadoop.hbase.client.Connection;
041import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Delete;
043import org.apache.hadoop.hbase.client.Get;
044import org.apache.hadoop.hbase.client.Mutation;
045import org.apache.hadoop.hbase.client.Put;
046import org.apache.hadoop.hbase.client.Result;
047import org.apache.hadoop.hbase.client.ResultScanner;
048import org.apache.hadoop.hbase.client.Scan;
049import org.apache.hadoop.hbase.client.Table;
050import org.apache.hadoop.hbase.client.TableDescriptor;
051import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
052import org.apache.hadoop.hbase.constraint.ConstraintException;
053import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
054import org.apache.hadoop.hbase.exceptions.DeserializationException;
055import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
056import org.apache.hadoop.hbase.master.MasterServices;
057import org.apache.hadoop.hbase.master.ServerListener;
058import org.apache.hadoop.hbase.master.TableStateManager;
059import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
060import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
061import org.apache.hadoop.hbase.net.Address;
062import org.apache.hadoop.hbase.procedure2.Procedure;
063import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
064import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
065import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
066import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
067import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
068import org.apache.hadoop.hbase.util.Bytes;
069import org.apache.hadoop.hbase.util.Threads;
070import org.apache.hadoop.hbase.zookeeper.ZKUtil;
071import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
072import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
073import org.apache.yetus.audience.InterfaceAudience;
074import org.apache.zookeeper.KeeperException;
075import org.slf4j.Logger;
076import org.slf4j.LoggerFactory;
077
078import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
079import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
080import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
081
082/**
083 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
084 * persistence store for the group information. It also makes use of zookeeper to store group
085 * information needed for bootstrapping during offline mode.
 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
 * zk) on each modification.
090 * <p>
091 * Mutations on state are synchronized but reads can continue without having to wait on an instance
092 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
093 * state are read-only, just-in-case (see flushConfig).
094 * <p>
095 * Reads must not block else there is a danger we'll deadlock.
096 * <p>
097 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
098 * on the results of the query modifying cache in zookeeper without another thread making
099 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
100 * access concurrently. Reads must be able to continue concurrently.
101 */
102@InterfaceAudience.Private
103final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
104  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
105
106  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
107  private static final TableDescriptor RSGROUP_TABLE_DESC;
108  static {
109    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
110      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
111      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
112    try {
113      builder.setCoprocessor(
114        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
115          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
116    } catch (IOException ex) {
117      throw new Error(ex);
118    }
119    RSGROUP_TABLE_DESC = builder.build();
120  }
121
122  // There two Maps are immutable and wholesale replaced on each modification
123  // so are safe to access concurrently. See class comment.
124  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
125  private volatile Map<TableName, String> tableMap = Collections.emptyMap();
126
127  private final MasterServices masterServices;
128  private final Connection conn;
129  private final ZKWatcher watcher;
130  private final RSGroupStartupWorker rsGroupStartupWorker;
131  // contains list of groups that were last flushed to persistent store
132  private Set<String> prevRSGroups = new HashSet<>();
133  private final ServerEventsListenerThread serverEventsListenerThread =
134    new ServerEventsListenerThread();
135
136  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
137    this.masterServices = masterServices;
138    this.watcher = masterServices.getZooKeeper();
139    this.conn = masterServices.getConnection();
140    this.rsGroupStartupWorker = new RSGroupStartupWorker();
141  }
142
143
144  private synchronized void init() throws IOException {
145    refresh();
146    serverEventsListenerThread.start();
147    masterServices.getServerManager().registerListener(serverEventsListenerThread);
148  }
149
150  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
151    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
152    instance.init();
153    return instance;
154  }
155
156  public void start() {
157    // create system table of rsgroup
158    rsGroupStartupWorker.start();
159  }
160
161  @Override
162  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
163    checkGroupName(rsGroupInfo.getName());
164    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
165      rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
166      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
167    }
168    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
169    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
170    flushConfig(newGroupMap);
171  }
172
173  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
174    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
175    if (rsGroupInfo == null) {
176      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
177    }
178    return rsGroupInfo;
179  }
180
181  /**
182   * @param master the master to get online servers for
183   * @return Set of online Servers named for their hostname and port (not ServerName).
184   */
185  private static Set<Address> getOnlineServers(final MasterServices master) {
186    Set<Address> onlineServers = new HashSet<Address>();
187    if (master == null) {
188      return onlineServers;
189    }
190
191    for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
192      onlineServers.add(server.getAddress());
193    }
194    return onlineServers;
195  }
196
197  @Override
198  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
199      String dstGroup) throws IOException {
200    RSGroupInfo src = getRSGroupInfo(srcGroup);
201    RSGroupInfo dst = getRSGroupInfo(dstGroup);
202    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
203    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
204    // rsgroup of dead servers that are to come back later).
205    Set<Address> onlineServers =
206      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
207        : null;
208    for (Address el : servers) {
209      src.removeServer(el);
210      if (onlineServers != null) {
211        if (!onlineServers.contains(el)) {
212          if (LOG.isDebugEnabled()) {
213            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
214          }
215          continue;
216        }
217      }
218      dst.addServer(el);
219    }
220    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
221    newGroupMap.put(src.getName(), src);
222    newGroupMap.put(dst.getName(), dst);
223    flushConfig(newGroupMap);
224    return dst.getServers();
225  }
226
227  @Override
228  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
229    for (RSGroupInfo info : rsGroupMap.values()) {
230      if (info.containsServer(serverHostPort)) {
231        return info;
232      }
233    }
234    return null;
235  }
236
237  @Override
238  public RSGroupInfo getRSGroup(String groupName) {
239    return rsGroupMap.get(groupName);
240  }
241
242  @Override
243  public String getRSGroupOfTable(TableName tableName) {
244    return tableMap.get(tableName);
245  }
246
247  @Override
248  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
249      throws IOException {
250    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
251      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
252    }
253
254    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
255    for (TableName tableName : tableNames) {
256      if (tableMap.containsKey(tableName)) {
257        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
258        src.removeTable(tableName);
259        newGroupMap.put(src.getName(), src);
260      }
261      if (groupName != null) {
262        RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
263        dst.addTable(tableName);
264        newGroupMap.put(dst.getName(), dst);
265      }
266    }
267    flushConfig(newGroupMap);
268  }
269
270  @Override
271  public synchronized void removeRSGroup(String groupName) throws IOException {
272    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
273      throw new DoNotRetryIOException(
274        "Group " + groupName + " does not exist or is a reserved " + "group");
275    }
276    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
277    newGroupMap.remove(groupName);
278    flushConfig(newGroupMap);
279  }
280
281  @Override
282  public List<RSGroupInfo> listRSGroups() {
283    return Lists.newLinkedList(rsGroupMap.values());
284  }
285
286  @Override
287  public boolean isOnline() {
288    return rsGroupStartupWorker.isOnline();
289  }
290
291  @Override
292  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
293      String dstGroup) throws IOException {
294    // get server's group
295    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
296    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
297
298    // move servers
299    for (Address el : servers) {
300      srcGroupInfo.removeServer(el);
301      dstGroupInfo.addServer(el);
302    }
303    // move tables
304    for (TableName tableName : tables) {
305      srcGroupInfo.removeTable(tableName);
306      dstGroupInfo.addTable(tableName);
307    }
308
309    // flush changed groupinfo
310    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
311    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
312    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
313    flushConfig(newGroupMap);
314  }
315
316  @Override
317  public synchronized void removeServers(Set<Address> servers) throws IOException {
318    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
319    for (Address el : servers) {
320      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
321      if (rsGroupInfo != null) {
322        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
323        if (newRsGroupInfo == null) {
324          rsGroupInfo.removeServer(el);
325          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
326        } else {
327          newRsGroupInfo.removeServer(el);
328          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
329        }
330      } else {
331        LOG.warn("Server " + el + " does not belong to any rsgroup.");
332      }
333    }
334
335    if (rsGroupInfos.size() > 0) {
336      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
337      newGroupMap.putAll(rsGroupInfos);
338      flushConfig(newGroupMap);
339    }
340  }
341
342  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
343    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
344    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
345        ResultScanner scanner = table.getScanner(new Scan())) {
346      for (Result result;;) {
347        result = scanner.next();
348        if (result == null) {
349          break;
350        }
351        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
352          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
353        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
354      }
355    }
356    return rsGroupInfoList;
357  }
358
359  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
360    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
361    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
362    // Overwrite any info stored by table, this takes precedence
363    try {
364      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
365        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
366        if (children == null) {
367          return RSGroupInfoList;
368        }
369        for (String znode : children) {
370          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
371          if (data.length > 0) {
372            ProtobufUtil.expectPBMagicPrefix(data);
373            ByteArrayInputStream bis =
374              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
375            RSGroupInfoList
376              .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
377          }
378        }
379        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
380      }
381    } catch (KeeperException | DeserializationException | InterruptedException e) {
382      throw new IOException("Failed to read rsGroupZNode", e);
383    }
384    return RSGroupInfoList;
385  }
386
387  @Override
388  public void refresh() throws IOException {
389    refresh(false);
390  }
391
392  /**
393   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
394   * startup of the manager.
395   */
396  private synchronized void refresh(boolean forceOnline) throws IOException {
397    List<RSGroupInfo> groupList = new LinkedList<>();
398
399    // Overwrite anything read from zk, group table is source of truth
400    // if online read from GROUP table
401    if (forceOnline || isOnline()) {
402      LOG.debug("Refreshing in Online mode.");
403      groupList.addAll(retrieveGroupListFromGroupTable());
404    } else {
405      LOG.debug("Refreshing in Offline mode.");
406      groupList.addAll(retrieveGroupListFromZookeeper());
407    }
408
409    // refresh default group, prune
410    NavigableSet<TableName> orphanTables = new TreeSet<>();
411    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
412      orphanTables.add(TableName.valueOf(entry));
413    }
414    for (RSGroupInfo group : groupList) {
415      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
416        orphanTables.removeAll(group.getTables());
417      }
418    }
419
420    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
421    // from region group table or zk
422    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));
423
424    // populate the data
425    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
426    HashMap<TableName, String> newTableMap = Maps.newHashMap();
427    for (RSGroupInfo group : groupList) {
428      newGroupMap.put(group.getName(), group);
429      for (TableName table : group.getTables()) {
430        newTableMap.put(table, group.getName());
431      }
432    }
433    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
434    updateCacheOfRSGroups(rsGroupMap.keySet());
435  }
436
437  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
438      throws IOException {
439    Map<TableName, String> newTableMap = Maps.newHashMap();
440    List<Mutation> mutations = Lists.newArrayList();
441
442    // populate deletes
443    for (String groupName : prevRSGroups) {
444      if (!groupMap.containsKey(groupName)) {
445        Delete d = new Delete(Bytes.toBytes(groupName));
446        mutations.add(d);
447      }
448    }
449
450    // populate puts
451    for (RSGroupInfo RSGroupInfo : groupMap.values()) {
452      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
453      Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
454      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
455      mutations.add(p);
456      for (TableName entry : RSGroupInfo.getTables()) {
457        newTableMap.put(entry, RSGroupInfo.getName());
458      }
459    }
460
461    if (mutations.size() > 0) {
462      multiMutate(mutations);
463    }
464    return newTableMap;
465  }
466
467  private synchronized void flushConfig() throws IOException {
468    flushConfig(this.rsGroupMap);
469  }
470
471  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
472    Map<TableName, String> newTableMap;
473
474    // For offline mode persistence is still unavailable
475    // We're refreshing in-memory state but only for servers in default group
476    if (!isOnline()) {
477      if (newGroupMap == this.rsGroupMap) {
478        // When newGroupMap is this.rsGroupMap itself,
479        // do not need to check default group and other groups as followed
480        return;
481      }
482
483      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
484      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
485      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
486      if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
487          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
488          /* compare tables in default group */) {
489        throw new IOException("Only servers in default group can be updated during offline mode");
490      }
491
492      // Restore newGroupMap by putting its default group back
493      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
494
495      // Refresh rsGroupMap
496      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
497      rsGroupMap = newGroupMap;
498
499      // Do not need to update tableMap
500      // because only the update on servers in default group is allowed above,
501      // or IOException will be thrown
502      return;
503    }
504
505    /* For online mode, persist to Zookeeper */
506    newTableMap = flushConfigTable(newGroupMap);
507
508    // Make changes visible after having been persisted to the source of truth
509    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
510
511    try {
512      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
513      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
514
515      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
516      for (String groupName : prevRSGroups) {
517        if (!newGroupMap.containsKey(groupName)) {
518          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
519          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
520        }
521      }
522
523      for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
524        String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
525        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
526        LOG.debug("Updating znode: " + znode);
527        ZKUtil.createAndFailSilent(watcher, znode);
528        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
529        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
530          ProtobufUtil.prependPBMagic(proto.toByteArray())));
531      }
532      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
533
534      ZKUtil.multiOrSequential(watcher, zkOps, false);
535    } catch (KeeperException e) {
536      LOG.error("Failed to write to rsGroupZNode", e);
537      masterServices.abort("Failed to write to rsGroupZNode", e);
538      throw new IOException("Failed to write to rsGroupZNode", e);
539    }
540    updateCacheOfRSGroups(newGroupMap.keySet());
541  }
542
543  /**
544   * Make changes visible. Caller must be synchronized on 'this'.
545   */
546  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
547      Map<TableName, String> newTableMap) {
548    // Make maps Immutable.
549    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
550    this.tableMap = Collections.unmodifiableMap(newTableMap);
551  }
552
553  /**
554   * Update cache of rsgroups. Caller must be synchronized on 'this'.
555   * @param currentGroups Current list of Groups.
556   */
557  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
558    this.prevRSGroups.clear();
559    this.prevRSGroups.addAll(currentGroups);
560  }
561
562  // Called by getDefaultServers. Presume it has lock in place.
563  private List<ServerName> getOnlineRS() throws IOException {
564    if (masterServices != null) {
565      return masterServices.getServerManager().getOnlineServersList();
566    }
567    LOG.debug("Reading online RS from zookeeper");
568    List<ServerName> servers = new LinkedList<>();
569    try {
570      for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
571        servers.add(ServerName.parseServerName(el));
572      }
573    } catch (KeeperException e) {
574      throw new IOException("Failed to retrieve server list from zookeeper", e);
575    }
576    return servers;
577  }
578
579  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
580  private SortedSet<Address> getDefaultServers() throws IOException {
581    // Build a list of servers in other groups than default group, from rsGroupMap
582    Set<Address> serversInOtherGroup = new HashSet<>();
583    for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
584      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
585        serversInOtherGroup.addAll(group.getServers());
586      }
587    }
588
589    // Get all online servers from Zookeeper and find out servers in default group
590    SortedSet<Address> defaultServers = Sets.newTreeSet();
591    for (ServerName serverName : getOnlineRS()) {
592      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
593      if (!serversInOtherGroup.contains(server)) { // not in other groups
594        defaultServers.add(server);
595      }
596    }
597    return defaultServers;
598  }
599
600  // Called by ServerEventsListenerThread. Synchronize on this because redoing
601  // the rsGroupMap then writing it out.
602  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
603    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
604    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
605    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
606    newGroupMap.put(newInfo.getName(), newInfo);
607    flushConfig(newGroupMap);
608  }
609
610  /**
611   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
612   * servers. Notifications about server changes are received by registering {@link ServerListener}.
613   * As a listener, we need to return immediately, so the real work of updating the servers is done
614   * asynchronously in this thread.
615   */
616  private class ServerEventsListenerThread extends Thread implements ServerListener {
617    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
618    private boolean changed = false;
619
620    ServerEventsListenerThread() {
621      setDaemon(true);
622    }
623
624    @Override
625    public void serverAdded(ServerName serverName) {
626      serverChanged();
627    }
628
629    @Override
630    public void serverRemoved(ServerName serverName) {
631      serverChanged();
632    }
633
634    private synchronized void serverChanged() {
635      changed = true;
636      this.notify();
637    }
638
639    @Override
640    public void run() {
641      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
642      SortedSet<Address> prevDefaultServers = new TreeSet<>();
643      while (isMasterRunning(masterServices)) {
644        try {
645          LOG.info("Updating default servers.");
646          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
647          if (!servers.equals(prevDefaultServers)) {
648            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
649            prevDefaultServers = servers;
650            LOG.info("Updated with servers: " + servers.size());
651          }
652          try {
653            synchronized (this) {
654              while (!changed) {
655                wait();
656              }
657              changed = false;
658            }
659          } catch (InterruptedException e) {
660            LOG.warn("Interrupted", e);
661          }
662        } catch (IOException e) {
663          LOG.warn("Failed to update default servers", e);
664        }
665      }
666    }
667  }
668
669  private class RSGroupStartupWorker extends Thread {
670    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
671    private volatile boolean online = false;
672
673    RSGroupStartupWorker() {
674      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
675      setDaemon(true);
676    }
677
678    @Override
679    public void run() {
680      if (waitForGroupTableOnline()) {
681        LOG.info("GroupBasedLoadBalancer is now online");
682      } else {
683        LOG.warn("Quit without making region group table online");
684      }
685    }
686
687    private boolean waitForGroupTableOnline() {
688      while (isMasterRunning(masterServices)) {
689        try {
690          TableStateManager tsm = masterServices.getTableStateManager();
691          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
692            createRSGroupTable();
693          }
694          // try reading from the table
695          try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
696            table.get(new Get(ROW_KEY));
697          }
698          LOG.info(
699            "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
700          RSGroupInfoManagerImpl.this.refresh(true);
701          online = true;
702          // flush any inconsistencies between ZK and HTable
703          RSGroupInfoManagerImpl.this.flushConfig();
704          return true;
705        } catch (Exception e) {
706          LOG.warn("Failed to perform check", e);
707          // 100ms is short so let's just ignore the interrupt
708          Threads.sleepWithoutInterrupt(100);
709        }
710      }
711      return false;
712    }
713
714    private void createRSGroupTable() throws IOException {
715      OptionalLong optProcId = masterServices.getProcedures().stream()
716        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
717        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
718        .findFirst();
719      long procId;
720      if (optProcId.isPresent()) {
721        procId = optProcId.getAsLong();
722      } else {
723        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
724      }
725      // wait for region to be online
726      int tries = 600;
727      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
728        masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
729        try {
730          Thread.sleep(100);
731        } catch (InterruptedException e) {
732          throw new IOException("Wait interrupted ", e);
733        }
734        tries--;
735      }
736      if (tries <= 0) {
737        throw new IOException("Failed to create group table in a given time.");
738      } else {
739        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
740        if (result != null && result.isFailed()) {
741          throw new IOException(
742            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
743        }
744      }
745    }
746
747    public boolean isOnline() {
748      return online;
749    }
750  }
751
752  private static boolean isMasterRunning(MasterServices masterServices) {
753    return !masterServices.isAborted() && !masterServices.isStopped();
754  }
755
756  private void multiMutate(List<Mutation> mutations) throws IOException {
757    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
758      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
759      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
760        MultiRowMutationProtos.MutateRowsRequest.newBuilder();
761      for (Mutation mutation : mutations) {
762        if (mutation instanceof Put) {
763          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
764            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
765            mutation));
766        } else if (mutation instanceof Delete) {
767          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
768            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
769            mutation));
770        } else {
771          throw new DoNotRetryIOException(
772            "multiMutate doesn't support " + mutation.getClass().getName());
773        }
774      }
775
776      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
777        MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
778      try {
779        service.mutateRows(null, mmrBuilder.build());
780      } catch (ServiceException ex) {
781        ProtobufUtil.toIOException(ex);
782      }
783    }
784  }
785
786  private void checkGroupName(String groupName) throws ConstraintException {
787    if (!groupName.matches("[a-zA-Z0-9_]+")) {
788      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
789    }
790  }
791}