001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import com.google.protobuf.ServiceException;
021import java.io.ByteArrayInputStream;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableSet;
031import java.util.OptionalLong;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Coprocessor;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Mutation;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.Table;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.constraint.ConstraintException;
055import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
056import org.apache.hadoop.hbase.exceptions.DeserializationException;
057import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
058import org.apache.hadoop.hbase.master.ClusterSchema;
059import org.apache.hadoop.hbase.master.MasterServices;
060import org.apache.hadoop.hbase.master.ServerListener;
061import org.apache.hadoop.hbase.master.TableStateManager;
062import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
063import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
064import org.apache.hadoop.hbase.net.Address;
065import org.apache.hadoop.hbase.procedure2.Procedure;
066import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
067import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
068import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
069import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
070import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.hbase.zookeeper.ZKUtil;
074import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
075import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
076import org.apache.hadoop.util.Shell;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.apache.zookeeper.KeeperException;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
083import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
084import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
085
086/**
087 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
088 * persistence store for the group information. It also makes use of zookeeper to store group
089 * information needed for bootstrapping during offline mode.
090 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
093 * zk) on each modification.
094 * <p>
095 * Mutations on state are synchronized but reads can continue without having to wait on an instance
096 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
097 * state are read-only, just-in-case (see flushConfig).
098 * <p>
099 * Reads must not block else there is a danger we'll deadlock.
100 * <p>
101 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
102 * on the results of the query modifying cache in zookeeper without another thread making
103 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
104 * access concurrently. Reads must be able to continue concurrently.
105 */
106@InterfaceAudience.Private
107final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
108  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
109
110  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
111  private static final TableDescriptor RSGROUP_TABLE_DESC;
112  static {
113    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
114      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
115      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
116    try {
117      builder.setCoprocessor(
118        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
119          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
120    } catch (IOException ex) {
121      throw new Error(ex);
122    }
123    RSGROUP_TABLE_DESC = builder.build();
124  }
125
  // These two Maps are immutable and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
  // rsGroupMap: rsgroup name -> RSGroupInfo. tableMap: table -> name of the rsgroup that lists it.
  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  // Master-side collaborators captured in the constructor.
  private final MasterServices masterServices;
  private final Connection conn;
  private final ZKWatcher watcher;
  // Worker started by start(); isOnline() delegates to it.
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // contains list of groups that were last flushed to persistent store
  // (compared against the new group names on the next flush to work out which rows/znodes to
  // delete)
  private Set<String> prevRSGroups = new HashSet<>();
  // Started and registered as a ServerListener by init().
  private final ServerEventsListenerThread serverEventsListenerThread =
    new ServerEventsListenerThread();

  /** Get rsgroup table mapping script */
  RSGroupMappingScript script;
142
143  // Package visibility for testing
144  static class RSGroupMappingScript {
145
146    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
147    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
148      "hbase.rsgroup.table.mapping.script.timeout";
149
150    private Shell.ShellCommandExecutor rsgroupMappingScript;
151
152    RSGroupMappingScript(Configuration conf) {
153      String script = conf.get(RS_GROUP_MAPPING_SCRIPT);
154      if (script == null || script.isEmpty()) {
155        return;
156      }
157
158      rsgroupMappingScript = new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null,
159        null, conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds
160      );
161    }
162
163    String getRSGroup(String namespace, String tablename) {
164      if (rsgroupMappingScript == null) {
165        return null;
166      }
167      String[] exec = rsgroupMappingScript.getExecString();
168      exec[1] = namespace;
169      exec[2] = tablename;
170      try {
171        rsgroupMappingScript.execute();
172      } catch (IOException e) {
173        LOG.error("Failed to get RSGroup from script for table {}:{}", namespace, tablename, e);
174        return null;
175      }
176      return rsgroupMappingScript.getOutput().trim();
177    }
178  }
179
180  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
181    this.masterServices = masterServices;
182    this.watcher = masterServices.getZooKeeper();
183    this.conn = masterServices.getConnection();
184    this.rsGroupStartupWorker = new RSGroupStartupWorker();
185    script = new RSGroupMappingScript(masterServices.getConfiguration());
186  }
187
188  private synchronized void init() throws IOException {
189    refresh();
190    serverEventsListenerThread.start();
191    masterServices.getServerManager().registerListener(serverEventsListenerThread);
192  }
193
194  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
195    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
196    instance.init();
197    return instance;
198  }
199
200  public void start() {
201    // create system table of rsgroup
202    rsGroupStartupWorker.start();
203  }
204
205  @Override
206  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
207    checkGroupName(rsGroupInfo.getName());
208    if (
209      rsGroupMap.get(rsGroupInfo.getName()) != null
210        || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)
211    ) {
212      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
213    }
214    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
215    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
216    flushConfig(newGroupMap);
217  }
218
219  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
220    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
221    if (rsGroupInfo == null) {
222      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
223    }
224    return rsGroupInfo;
225  }
226
227  /**
228   * @param master the master to get online servers for
229   * @return Set of online Servers named for their hostname and port (not ServerName).
230   */
231  private static Set<Address> getOnlineServers(final MasterServices master) {
232    Set<Address> onlineServers = new HashSet<Address>();
233    if (master == null) {
234      return onlineServers;
235    }
236
237    for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
238      onlineServers.add(server.getAddress());
239    }
240    return onlineServers;
241  }
242
243  @Override
244  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
245    String dstGroup) throws IOException {
246    RSGroupInfo src = getRSGroupInfo(srcGroup);
247    RSGroupInfo dst = getRSGroupInfo(dstGroup);
248    Set<Address> movedServers = new HashSet<>();
249    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
250    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
251    // rsgroup of dead servers that are to come back later).
252    Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)
253      ? getOnlineServers(this.masterServices)
254      : null;
255    for (Address el : servers) {
256      src.removeServer(el);
257      if (onlineServers != null) {
258        if (!onlineServers.contains(el)) {
259          if (LOG.isDebugEnabled()) {
260            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
261          }
262          continue;
263        }
264      }
265      dst.addServer(el);
266      movedServers.add(el);
267    }
268    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
269    newGroupMap.put(src.getName(), src);
270    newGroupMap.put(dst.getName(), dst);
271    flushConfig(newGroupMap);
272    return movedServers;
273  }
274
275  @Override
276  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
277    for (RSGroupInfo info : rsGroupMap.values()) {
278      if (info.containsServer(serverHostPort)) {
279        return info;
280      }
281    }
282    return null;
283  }
284
285  @Override
286  public RSGroupInfo getRSGroup(String groupName) {
287    return rsGroupMap.get(groupName);
288  }
289
290  @Override
291  public String getRSGroupOfTable(TableName tableName) {
292    return tableMap.get(tableName);
293  }
294
295  @Override
296  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
297    throws IOException {
298    // Check if rsGroup contains the destination rsgroup
299    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
300      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
301    }
302
303    // Make a copy of rsGroupMap to update
304    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
305
306    // Remove tables from their original rsgroups
307    // and update the copy of rsGroupMap
308    for (TableName tableName : tableNames) {
309      if (tableMap.containsKey(tableName)) {
310        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
311        src.removeTable(tableName);
312        newGroupMap.put(src.getName(), src);
313      }
314    }
315
316    // Add tables to the destination rsgroup
317    // and update the copy of rsGroupMap
318    if (groupName != null) {
319      RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName));
320      dstGroup.addAllTables(tableNames);
321      newGroupMap.put(dstGroup.getName(), dstGroup);
322    }
323
324    // Flush according to the updated copy of rsGroupMap
325    flushConfig(newGroupMap);
326  }
327
328  @Override
329  public synchronized void removeRSGroup(String groupName) throws IOException {
330    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
331      throw new DoNotRetryIOException(
332        "Group " + groupName + " does not exist or is a reserved " + "group");
333    }
334    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
335    newGroupMap.remove(groupName);
336    flushConfig(newGroupMap);
337  }
338
339  @Override
340  public List<RSGroupInfo> listRSGroups() {
341    return Lists.newLinkedList(rsGroupMap.values());
342  }
343
344  @Override
345  public boolean isOnline() {
346    return rsGroupStartupWorker.isOnline();
347  }
348
349  @Override
350  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
351    String dstGroup) throws IOException {
352    // get server's group
353    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
354    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
355
356    // move servers
357    for (Address el : servers) {
358      srcGroupInfo.removeServer(el);
359      dstGroupInfo.addServer(el);
360    }
361    // move tables
362    for (TableName tableName : tables) {
363      srcGroupInfo.removeTable(tableName);
364      dstGroupInfo.addTable(tableName);
365    }
366
367    // flush changed groupinfo
368    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
369    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
370    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
371    flushConfig(newGroupMap);
372  }
373
374  @Override
375  public synchronized void removeServers(Set<Address> servers) throws IOException {
376    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
377    for (Address el : servers) {
378      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
379      if (rsGroupInfo != null) {
380        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
381        if (newRsGroupInfo == null) {
382          rsGroupInfo.removeServer(el);
383          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
384        } else {
385          newRsGroupInfo.removeServer(el);
386          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
387        }
388      } else {
389        LOG.warn("Server " + el + " does not belong to any rsgroup.");
390      }
391    }
392
393    if (rsGroupInfos.size() > 0) {
394      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
395      newGroupMap.putAll(rsGroupInfos);
396      flushConfig(newGroupMap);
397    }
398  }
399
400  @Override
401  public void renameRSGroup(String oldName, String newName) throws IOException {
402    checkGroupName(oldName);
403    checkGroupName(newName);
404    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
405      throw new ConstraintException("Can't rename default rsgroup");
406    }
407    RSGroupInfo oldGroup = getRSGroup(oldName);
408    if (oldGroup == null) {
409      throw new ConstraintException("RSGroup " + oldName + " does not exist");
410    }
411    if (rsGroupMap.containsKey(newName)) {
412      throw new ConstraintException("Group already exists: " + newName);
413    }
414
415    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
416    newGroupMap.remove(oldName);
417    RSGroupInfo newGroup =
418      new RSGroupInfo(newName, (SortedSet<Address>) oldGroup.getServers(), oldGroup.getTables());
419    newGroupMap.put(newName, newGroup);
420    flushConfig(newGroupMap);
421  }
422
423  /**
424   * Will try to get the rsgroup from {@code tableMap} first then try to get the rsgroup from
425   * {@code script} try to get the rsgroup from the {@link NamespaceDescriptor} lastly. If still not
426   * present, return default group.
427   */
428  @Override
429  public RSGroupInfo determineRSGroupInfoForTable(TableName tableName) throws IOException {
430    RSGroupInfo groupFromOldRSGroupInfo = getRSGroup(getRSGroupOfTable(tableName));
431    if (groupFromOldRSGroupInfo != null) {
432      return groupFromOldRSGroupInfo;
433    }
434    // RSGroup information determined by administrator.
435    RSGroupInfo groupDeterminedByAdmin = getRSGroup(
436      script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()));
437    if (groupDeterminedByAdmin != null) {
438      return groupDeterminedByAdmin;
439    }
440    // Finally, we will try to fall back to namespace as rsgroup if exists
441    ClusterSchema clusterSchema = masterServices.getClusterSchema();
442    if (clusterSchema == null) {
443      if (TableName.isMetaTableName(tableName)) {
444        LOG.info("Can not get the namespace rs group config for meta table, since the"
445          + " meta table is not online yet, will use default group to assign meta first");
446      } else {
447        LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?");
448      }
449    } else {
450      NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString());
451      RSGroupInfo groupNameOfNs =
452        getRSGroup(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP));
453      if (groupNameOfNs != null) {
454        return groupNameOfNs;
455      }
456    }
457    return getRSGroup(RSGroupInfo.DEFAULT_GROUP);
458  }
459
460  @Override
461  public void updateRSGroupConfig(String groupName, Map<String, String> configuration)
462    throws IOException {
463    if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
464      // We do not persist anything of default group, therefore, it is not supported to update
465      // default group's configuration which lost once master down.
466      throw new ConstraintException(
467        "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently");
468    }
469    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
470    new HashSet<>(rsGroupInfo.getConfiguration().keySet())
471      .forEach(rsGroupInfo::removeConfiguration);
472    configuration.forEach(rsGroupInfo::setConfiguration);
473    flushConfig();
474  }
475
476  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
477    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
478    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
479      ResultScanner scanner = table.getScanner(new Scan())) {
480      for (Result result;;) {
481        result = scanner.next();
482        if (result == null) {
483          break;
484        }
485        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
486          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
487        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
488      }
489    }
490    return rsGroupInfoList;
491  }
492
493  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
494    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
495    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
496    // Overwrite any info stored by table, this takes precedence
497    try {
498      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
499        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
500        if (children == null) {
501          return RSGroupInfoList;
502        }
503        for (String znode : children) {
504          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
505          if (data != null && data.length > 0) {
506            ProtobufUtil.expectPBMagicPrefix(data);
507            ByteArrayInputStream bis =
508              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
509            RSGroupInfoList
510              .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
511          }
512        }
513        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
514      }
515    } catch (KeeperException | DeserializationException | InterruptedException e) {
516      throw new IOException("Failed to read rsGroupZNode", e);
517    }
518    return RSGroupInfoList;
519  }
520
521  @Override
522  public void refresh() throws IOException {
523    refresh(false);
524  }
525
526  /**
527   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
528   * startup of the manager.
529   */
530  private synchronized void refresh(boolean forceOnline) throws IOException {
531    List<RSGroupInfo> groupList = new LinkedList<>();
532
533    // Overwrite anything read from zk, group table is source of truth
534    // if online read from GROUP table
535    if (forceOnline || isOnline()) {
536      LOG.debug("Refreshing in Online mode.");
537      groupList.addAll(retrieveGroupListFromGroupTable());
538    } else {
539      LOG.debug("Refreshing in Offline mode.");
540      groupList.addAll(retrieveGroupListFromZookeeper());
541    }
542
543    // refresh default group, prune
544    NavigableSet<TableName> orphanTables = new TreeSet<>();
545    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
546      orphanTables.add(TableName.valueOf(entry));
547    }
548    for (RSGroupInfo group : groupList) {
549      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
550        orphanTables.removeAll(group.getTables());
551      }
552    }
553
554    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
555    // from region group table or zk
556    groupList
557      .add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList), orphanTables));
558
559    // populate the data
560    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
561    HashMap<TableName, String> newTableMap = Maps.newHashMap();
562    for (RSGroupInfo group : groupList) {
563      newGroupMap.put(group.getName(), group);
564      for (TableName table : group.getTables()) {
565        newTableMap.put(table, group.getName());
566      }
567    }
568    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
569    updateCacheOfRSGroups(rsGroupMap.keySet());
570  }
571
572  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
573    throws IOException {
574    Map<TableName, String> newTableMap = Maps.newHashMap();
575    List<Mutation> mutations = Lists.newArrayList();
576
577    // populate deletes
578    for (String groupName : prevRSGroups) {
579      if (!groupMap.containsKey(groupName)) {
580        Delete d = new Delete(Bytes.toBytes(groupName));
581        mutations.add(d);
582      }
583    }
584
585    // populate puts
586    for (RSGroupInfo RSGroupInfo : groupMap.values()) {
587      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
588      Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
589      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
590      mutations.add(p);
591      for (TableName entry : RSGroupInfo.getTables()) {
592        newTableMap.put(entry, RSGroupInfo.getName());
593      }
594    }
595
596    if (mutations.size() > 0) {
597      multiMutate(mutations);
598    }
599    return newTableMap;
600  }
601
602  private synchronized void flushConfig() throws IOException {
603    flushConfig(this.rsGroupMap);
604  }
605
606  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
607    Map<TableName, String> newTableMap;
608
609    // For offline mode persistence is still unavailable
610    // We're refreshing in-memory state but only for servers in default group
611    if (!isOnline()) {
612      if (newGroupMap == this.rsGroupMap) {
613        // When newGroupMap is this.rsGroupMap itself,
614        // do not need to check default group and other groups as followed
615        return;
616      }
617
618      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
619      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
620      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
621      if (
622        !oldGroupMap.equals(newGroupMap)
623          /* compare both tables and servers in other groups */ || !oldDefaultGroup.getTables()
624            .equals(newDefaultGroup.getTables())
625        /* compare tables in default group */
626      ) {
627        throw new IOException("Only servers in default group can be updated during offline mode");
628      }
629
630      // Restore newGroupMap by putting its default group back
631      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
632
633      // Refresh rsGroupMap
634      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
635      rsGroupMap = newGroupMap;
636
637      // Do not need to update tableMap
638      // because only the update on servers in default group is allowed above,
639      // or IOException will be thrown
640      return;
641    }
642
643    /* For online mode, persist to Zookeeper */
644    newTableMap = flushConfigTable(newGroupMap);
645
646    // Make changes visible after having been persisted to the source of truth
647    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
648
649    try {
650      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
651      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
652
653      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
654      for (String groupName : prevRSGroups) {
655        if (!newGroupMap.containsKey(groupName)) {
656          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
657          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
658        }
659      }
660
661      for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
662        String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
663        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
664        LOG.debug("Updating znode: " + znode);
665        ZKUtil.createAndFailSilent(watcher, znode);
666        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
667        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
668          ProtobufUtil.prependPBMagic(proto.toByteArray())));
669      }
670      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
671
672      ZKUtil.multiOrSequential(watcher, zkOps, false);
673    } catch (KeeperException e) {
674      LOG.error("Failed to write to rsGroupZNode", e);
675      masterServices.abort("Failed to write to rsGroupZNode", e);
676      throw new IOException("Failed to write to rsGroupZNode", e);
677    }
678    updateCacheOfRSGroups(newGroupMap.keySet());
679  }
680
681  /**
682   * Make changes visible. Caller must be synchronized on 'this'.
683   */
684  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
685    Map<TableName, String> newTableMap) {
686    // Make maps Immutable.
687    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
688    this.tableMap = Collections.unmodifiableMap(newTableMap);
689  }
690
691  /**
692   * Update cache of rsgroups. Caller must be synchronized on 'this'.
693   * @param currentGroups Current list of Groups.
694   */
695  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
696    this.prevRSGroups.clear();
697    this.prevRSGroups.addAll(currentGroups);
698  }
699
700  // Called by getDefaultServers. Presume it has lock in place.
701  private List<ServerName> getOnlineRS() throws IOException {
702    if (masterServices != null) {
703      return masterServices.getServerManager().getOnlineServersList();
704    }
705    LOG.debug("Reading online RS from zookeeper");
706    List<ServerName> servers = new LinkedList<>();
707    try {
708      for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
709        servers.add(ServerName.parseServerName(el));
710      }
711    } catch (KeeperException e) {
712      throw new IOException("Failed to retrieve server list from zookeeper", e);
713    }
714    return servers;
715  }
716
717  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
718  private SortedSet<Address> getDefaultServers() throws IOException {
719    return getDefaultServers(listRSGroups());
720  }
721
722  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
723  private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList)
724    throws IOException {
725    // Build a list of servers in other groups than default group, from rsGroupMap
726    Set<Address> serversInOtherGroup = new HashSet<>();
727    for (RSGroupInfo group : rsGroupInfoList) {
728      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
729        serversInOtherGroup.addAll(group.getServers());
730      }
731    }
732
733    // Get all online servers from Zookeeper and find out servers in default group
734    SortedSet<Address> defaultServers = Sets.newTreeSet();
735    for (ServerName serverName : getOnlineRS()) {
736      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
737      if (!serversInOtherGroup.contains(server)) { // not in other groups
738        defaultServers.add(server);
739      }
740    }
741    return defaultServers;
742  }
743
744  // Called by ServerEventsListenerThread. Synchronize on this because redoing
745  // the rsGroupMap then writing it out.
746  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
747    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
748    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
749    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
750    newGroupMap.put(newInfo.getName(), newInfo);
751    flushConfig(newGroupMap);
752  }
753
754  /**
755   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
756   * servers. Notifications about server changes are received by registering {@link ServerListener}.
757   * As a listener, we need to return immediately, so the real work of updating the servers is done
758   * asynchronously in this thread.
759   */
760  private class ServerEventsListenerThread extends Thread implements ServerListener {
761    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
762    private boolean changed = false;
763
764    ServerEventsListenerThread() {
765      setDaemon(true);
766    }
767
768    @Override
769    public void serverAdded(ServerName serverName) {
770      serverChanged();
771    }
772
773    @Override
774    public void serverRemoved(ServerName serverName) {
775      serverChanged();
776    }
777
778    private synchronized void serverChanged() {
779      changed = true;
780      this.notify();
781    }
782
783    @Override
784    public void run() {
785      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
786      SortedSet<Address> prevDefaultServers = new TreeSet<>();
787      while (isMasterRunning(masterServices)) {
788        try {
789          LOG.info("Updating default servers.");
790          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
791          if (!servers.equals(prevDefaultServers)) {
792            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
793            prevDefaultServers = servers;
794            LOG.info("Updated with servers: " + servers.size());
795          }
796          try {
797            synchronized (this) {
798              while (!changed) {
799                wait();
800              }
801              changed = false;
802            }
803          } catch (InterruptedException e) {
804            LOG.warn("Interrupted", e);
805          }
806        } catch (IOException e) {
807          LOG.warn("Failed to update default servers", e);
808        }
809      }
810    }
811  }
812
813  private class RSGroupStartupWorker extends Thread {
814    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
815    private volatile boolean online = false;
816
817    RSGroupStartupWorker() {
818      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
819      setDaemon(true);
820    }
821
822    @Override
823    public void run() {
824      if (waitForGroupTableOnline()) {
825        LOG.info("GroupBasedLoadBalancer is now online");
826      } else {
827        LOG.warn("Quit without making region group table online");
828      }
829    }
830
831    private boolean waitForGroupTableOnline() {
832      while (isMasterRunning(masterServices)) {
833        try {
834          TableStateManager tsm = masterServices.getTableStateManager();
835          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
836            createRSGroupTable();
837          }
838          // try reading from the table
839          try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
840            table.get(new Get(ROW_KEY));
841          }
842          LOG.info(
843            "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
844          RSGroupInfoManagerImpl.this.refresh(true);
845          online = true;
846          // flush any inconsistencies between ZK and HTable
847          RSGroupInfoManagerImpl.this.flushConfig();
848          return true;
849        } catch (Exception e) {
850          LOG.warn("Failed to perform check", e);
851          // 100ms is short so let's just ignore the interrupt
852          Threads.sleepWithoutInterrupt(100);
853        }
854      }
855      return false;
856    }
857
858    private void createRSGroupTable() throws IOException {
859      OptionalLong optProcId = masterServices.getProcedures().stream()
860        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
861        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
862        .findFirst();
863      long procId;
864      if (optProcId.isPresent()) {
865        procId = optProcId.getAsLong();
866      } else {
867        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
868      }
869      // wait for region to be online
870      int tries = 600;
871      while (
872        !(masterServices.getMasterProcedureExecutor().isFinished(procId))
873          && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0
874      ) {
875        try {
876          Thread.sleep(100);
877        } catch (InterruptedException e) {
878          throw new IOException("Wait interrupted ", e);
879        }
880        tries--;
881      }
882      if (tries <= 0) {
883        throw new IOException("Failed to create group table in a given time.");
884      } else {
885        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
886        if (result != null && result.isFailed()) {
887          throw new IOException(
888            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
889        }
890      }
891    }
892
893    public boolean isOnline() {
894      return online;
895    }
896  }
897
898  private static boolean isMasterRunning(MasterServices masterServices) {
899    return !masterServices.isAborted() && !masterServices.isStopped();
900  }
901
902  private void multiMutate(List<Mutation> mutations) throws IOException {
903    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
904      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
905      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
906        MultiRowMutationProtos.MutateRowsRequest.newBuilder();
907      for (Mutation mutation : mutations) {
908        if (mutation instanceof Put) {
909          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
910            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
911            mutation));
912        } else if (mutation instanceof Delete) {
913          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
914            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
915            mutation));
916        } else {
917          throw new DoNotRetryIOException(
918            "multiMutate doesn't support " + mutation.getClass().getName());
919        }
920      }
921
922      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
923        MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
924      try {
925        service.mutateRows(null, mmrBuilder.build());
926      } catch (ServiceException ex) {
927        ProtobufUtil.toIOException(ex);
928      }
929    }
930  }
931
932  private void checkGroupName(String groupName) throws ConstraintException {
933    if (!groupName.matches("[a-zA-Z0-9_]+")) {
934      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
935    }
936  }
937}