001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import com.google.protobuf.ServiceException;
021import java.io.ByteArrayInputStream;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableSet;
031import java.util.OptionalLong;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035
036import org.apache.hadoop.conf.Configuration;
037import org.apache.hadoop.hbase.Coprocessor;
038import org.apache.hadoop.hbase.DoNotRetryIOException;
039import org.apache.hadoop.hbase.NamespaceDescriptor;
040import org.apache.hadoop.hbase.ServerName;
041import org.apache.hadoop.hbase.TableName;
042import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
043import org.apache.hadoop.hbase.client.Connection;
044import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
045import org.apache.hadoop.hbase.client.Delete;
046import org.apache.hadoop.hbase.client.Get;
047import org.apache.hadoop.hbase.client.Mutation;
048import org.apache.hadoop.hbase.client.Put;
049import org.apache.hadoop.hbase.client.Result;
050import org.apache.hadoop.hbase.client.ResultScanner;
051import org.apache.hadoop.hbase.client.Scan;
052import org.apache.hadoop.hbase.client.Table;
053import org.apache.hadoop.hbase.client.TableDescriptor;
054import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
055import org.apache.hadoop.hbase.constraint.ConstraintException;
056import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
057import org.apache.hadoop.hbase.exceptions.DeserializationException;
058import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
059import org.apache.hadoop.hbase.master.ClusterSchema;
060import org.apache.hadoop.hbase.master.MasterServices;
061import org.apache.hadoop.hbase.master.ServerListener;
062import org.apache.hadoop.hbase.master.TableStateManager;
063import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
064import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
065import org.apache.hadoop.hbase.net.Address;
066import org.apache.hadoop.hbase.procedure2.Procedure;
067import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
068import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
069import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
070import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
071import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
072import org.apache.hadoop.hbase.util.Bytes;
073import org.apache.hadoop.hbase.util.Threads;
074import org.apache.hadoop.hbase.zookeeper.ZKUtil;
075import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
076import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
077import org.apache.hadoop.util.Shell;
078import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
079import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
080import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
081import org.apache.yetus.audience.InterfaceAudience;
082import org.apache.zookeeper.KeeperException;
083import org.slf4j.Logger;
084import org.slf4j.LoggerFactory;
085
086/**
087 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
088 * persistence store for the group information. It also makes use of zookeeper to store group
089 * information needed for bootstrapping during offline mode.
090 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
091 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
092 * too (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
093 * zk) on each modification.
094 * <p>
095 * Mutations on state are synchronized but reads can continue without having to wait on an instance
096 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
097 * state are read-only, just-in-case (see flushConfig).
098 * <p>
099 * Reads must not block else there is a danger we'll deadlock.
100 * <p>
101 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
102 * on the results of the query modifying cache in zookeeper without another thread making
103 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
104 * access concurrently. Reads must be able to continue concurrently.
105 */
106@InterfaceAudience.Private
107final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
108  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
109
110  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
111  private static final TableDescriptor RSGROUP_TABLE_DESC;
112  static {
113    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
114      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
115      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
116    try {
117      builder.setCoprocessor(
118        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
119          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
120    } catch (IOException ex) {
121      throw new Error(ex);
122    }
123    RSGROUP_TABLE_DESC = builder.build();
124  }
125
  // These two Maps are immutable and wholesale replaced on each modification,
  // so they are safe to access concurrently. See class comment.
128  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
129  private volatile Map<TableName, String> tableMap = Collections.emptyMap();
130
131  private final MasterServices masterServices;
132  private final Connection conn;
133  private final ZKWatcher watcher;
134  private final RSGroupStartupWorker rsGroupStartupWorker;
135  // contains list of groups that were last flushed to persistent store
136  private Set<String> prevRSGroups = new HashSet<>();
137  private final ServerEventsListenerThread serverEventsListenerThread =
138    new ServerEventsListenerThread();
139
140  /** Get rsgroup table mapping script */
141  RSGroupMappingScript script;
142
143  // Package visibility for testing
144  static class RSGroupMappingScript {
145
146    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
147    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
148      "hbase.rsgroup.table.mapping.script.timeout";
149
150    private Shell.ShellCommandExecutor rsgroupMappingScript;
151
152    RSGroupMappingScript(Configuration conf) {
153      String script = conf.get(RS_GROUP_MAPPING_SCRIPT);
154      if (script == null || script.isEmpty()) {
155        return;
156      }
157
158      rsgroupMappingScript = new Shell.ShellCommandExecutor(
159        new String[] { script, "", "" }, null, null,
160        conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds
161      );
162    }
163
164    String getRSGroup(String namespace, String tablename) {
165      if (rsgroupMappingScript == null) {
166        return null;
167      }
168      String[] exec = rsgroupMappingScript.getExecString();
169      exec[1] = namespace;
170      exec[2] = tablename;
171      try {
172        rsgroupMappingScript.execute();
173      } catch (IOException e) {
174        LOG.error("Failed to get RSGroup from script for table {}:{}", namespace, tablename, e);
175        return null;
176      }
177      return rsgroupMappingScript.getOutput().trim();
178    }
179  }
180
181  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
182    this.masterServices = masterServices;
183    this.watcher = masterServices.getZooKeeper();
184    this.conn = masterServices.getConnection();
185    this.rsGroupStartupWorker = new RSGroupStartupWorker();
186    script = new RSGroupMappingScript(masterServices.getConfiguration());
187  }
188
  /**
   * Loads the group state via {@link #refresh()}, then starts the server-events listener thread
   * and registers it as a listener with the master's server manager.
   * @throws IOException if the refresh fails
   */
  private synchronized void init() throws IOException {
    refresh();
    serverEventsListenerThread.start();
    masterServices.getServerManager().registerListener(serverEventsListenerThread);
  }
194
195  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
196    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
197    instance.init();
198    return instance;
199  }
200
  /**
   * Starts the rsgroup startup worker.
   */
  public void start() {
    // create system table of rsgroup
    // NOTE(review): the worker is defined elsewhere in this file; confirm it is what creates
    // the table.
    rsGroupStartupWorker.start();
  }
205
206  @Override
207  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
208    checkGroupName(rsGroupInfo.getName());
209    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
210      rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
211      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
212    }
213    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
214    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
215    flushConfig(newGroupMap);
216  }
217
218  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
219    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
220    if (rsGroupInfo == null) {
221      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
222    }
223    return rsGroupInfo;
224  }
225
226  /**
227   * @param master the master to get online servers for
228   * @return Set of online Servers named for their hostname and port (not ServerName).
229   */
230  private static Set<Address> getOnlineServers(final MasterServices master) {
231    Set<Address> onlineServers = new HashSet<Address>();
232    if (master == null) {
233      return onlineServers;
234    }
235
236    for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
237      onlineServers.add(server.getAddress());
238    }
239    return onlineServers;
240  }
241
242  @Override
243  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
244      String dstGroup) throws IOException {
245    RSGroupInfo src = getRSGroupInfo(srcGroup);
246    RSGroupInfo dst = getRSGroupInfo(dstGroup);
247    Set<Address> movedServers = new HashSet<>();
248    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
249    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
250    // rsgroup of dead servers that are to come back later).
251    Set<Address> onlineServers =
252      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
253        : null;
254    for (Address el : servers) {
255      src.removeServer(el);
256      if (onlineServers != null) {
257        if (!onlineServers.contains(el)) {
258          if (LOG.isDebugEnabled()) {
259            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
260          }
261          continue;
262        }
263      }
264      dst.addServer(el);
265      movedServers.add(el);
266    }
267    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
268    newGroupMap.put(src.getName(), src);
269    newGroupMap.put(dst.getName(), dst);
270    flushConfig(newGroupMap);
271    return movedServers;
272  }
273
274  @Override
275  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
276    for (RSGroupInfo info : rsGroupMap.values()) {
277      if (info.containsServer(serverHostPort)) {
278        return info;
279      }
280    }
281    return null;
282  }
283
  /**
   * Reads a group from the cached group map without taking the instance lock.
   * @param groupName name of the group to fetch
   * @return the cached group, or null if the map has no entry for {@code groupName}
   */
  @Override
  public RSGroupInfo getRSGroup(String groupName) {
    return rsGroupMap.get(groupName);
  }
288
  /**
   * Reads the group name recorded for a table in the cached table map.
   * @param tableName table to look up
   * @return the name of the group the table is mapped to, or null if the table has no entry
   */
  @Override
  public String getRSGroupOfTable(TableName tableName) {
    return tableMap.get(tableName);
  }
293
294  @Override
295  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
296      throws IOException {
297    // Check if rsGroup contains the destination rsgroup
298    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
299      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
300    }
301
302    // Make a copy of rsGroupMap to update
303    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
304
305    // Remove tables from their original rsgroups
306    // and update the copy of rsGroupMap
307    for (TableName tableName : tableNames) {
308      if (tableMap.containsKey(tableName)) {
309        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
310        src.removeTable(tableName);
311        newGroupMap.put(src.getName(), src);
312      }
313    }
314
315    // Add tables to the destination rsgroup
316    // and update the copy of rsGroupMap
317    if (groupName != null) {
318      RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName));
319      dstGroup.addAllTables(tableNames);
320      newGroupMap.put(dstGroup.getName(), dstGroup);
321    }
322
323    // Flush according to the updated copy of rsGroupMap
324    flushConfig(newGroupMap);
325  }
326
327  @Override
328  public synchronized void removeRSGroup(String groupName) throws IOException {
329    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
330      throw new DoNotRetryIOException(
331        "Group " + groupName + " does not exist or is a reserved " + "group");
332    }
333    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
334    newGroupMap.remove(groupName);
335    flushConfig(newGroupMap);
336  }
337
338  @Override
339  public List<RSGroupInfo> listRSGroups() {
340    return Lists.newLinkedList(rsGroupMap.values());
341  }
342
  /**
   * @return whatever the startup worker reports as its online state
   */
  @Override
  public boolean isOnline() {
    return rsGroupStartupWorker.isOnline();
  }
347
348  @Override
349  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
350      String dstGroup) throws IOException {
351    // get server's group
352    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
353    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
354
355    // move servers
356    for (Address el : servers) {
357      srcGroupInfo.removeServer(el);
358      dstGroupInfo.addServer(el);
359    }
360    // move tables
361    for (TableName tableName : tables) {
362      srcGroupInfo.removeTable(tableName);
363      dstGroupInfo.addTable(tableName);
364    }
365
366    // flush changed groupinfo
367    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
368    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
369    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
370    flushConfig(newGroupMap);
371  }
372
373  @Override
374  public synchronized void removeServers(Set<Address> servers) throws IOException {
375    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
376    for (Address el : servers) {
377      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
378      if (rsGroupInfo != null) {
379        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
380        if (newRsGroupInfo == null) {
381          rsGroupInfo.removeServer(el);
382          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
383        } else {
384          newRsGroupInfo.removeServer(el);
385          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
386        }
387      } else {
388        LOG.warn("Server " + el + " does not belong to any rsgroup.");
389      }
390    }
391
392    if (rsGroupInfos.size() > 0) {
393      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
394      newGroupMap.putAll(rsGroupInfos);
395      flushConfig(newGroupMap);
396    }
397  }
398
399  @Override
400  public void renameRSGroup(String oldName, String newName) throws IOException {
401    checkGroupName(oldName);
402    checkGroupName(newName);
403    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
404      throw new ConstraintException("Can't rename default rsgroup");
405    }
406    RSGroupInfo oldGroup = getRSGroup(oldName);
407    if (oldGroup == null) {
408      throw new ConstraintException("RSGroup " + oldName + " does not exist");
409    }
410    if (rsGroupMap.containsKey(newName)) {
411      throw new ConstraintException("Group already exists: " + newName);
412    }
413
414    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
415    newGroupMap.remove(oldName);
416    RSGroupInfo newGroup = new RSGroupInfo(newName,
417      (SortedSet<Address>) oldGroup.getServers(), oldGroup.getTables());
418    newGroupMap.put(newName, newGroup);
419    flushConfig(newGroupMap);
420  }
421
422  /**
423   * Will try to get the rsgroup from {@code tableMap} first
424   * then try to get the rsgroup from {@code script}
425   * try to get the rsgroup from the {@link NamespaceDescriptor} lastly.
426   * If still not present, return default group.
427   */
428  @Override
429  public RSGroupInfo determineRSGroupInfoForTable(TableName tableName)
430    throws IOException {
431    RSGroupInfo groupFromOldRSGroupInfo = getRSGroup(getRSGroupOfTable(tableName));
432    if (groupFromOldRSGroupInfo != null) {
433      return groupFromOldRSGroupInfo;
434    }
435    // RSGroup information determined by administrator.
436    RSGroupInfo groupDeterminedByAdmin = getRSGroup(
437      script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()));
438    if (groupDeterminedByAdmin != null) {
439      return groupDeterminedByAdmin;
440    }
441    // Finally, we will try to fall back to namespace as rsgroup if exists
442    ClusterSchema clusterSchema = masterServices.getClusterSchema();
443    if (clusterSchema == null) {
444      if (TableName.isMetaTableName(tableName)) {
445        LOG.info("Can not get the namespace rs group config for meta table, since the" +
446          " meta table is not online yet, will use default group to assign meta first");
447      } else {
448        LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?");
449      }
450    } else {
451      NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString());
452      RSGroupInfo groupNameOfNs =
453        getRSGroup(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP));
454      if (groupNameOfNs != null) {
455        return groupNameOfNs;
456      }
457    }
458    return getRSGroup(RSGroupInfo.DEFAULT_GROUP);
459  }
460
461  @Override
462  public void updateRSGroupConfig(String groupName, Map<String, String> configuration)
463      throws IOException {
464    if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) {
465      // We do not persist anything of default group, therefore, it is not supported to update
466      // default group's configuration which lost once master down.
467      throw new ConstraintException("configuration of " + RSGroupInfo.DEFAULT_GROUP
468          + " can't be stored persistently");
469    }
470    RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName);
471    new HashSet<>(rsGroupInfo.getConfiguration().keySet())
472        .forEach(rsGroupInfo::removeConfiguration);
473    configuration.forEach(rsGroupInfo::setConfiguration);
474    flushConfig();
475  }
476
477  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
478    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
479    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
480        ResultScanner scanner = table.getScanner(new Scan())) {
481      for (Result result;;) {
482        result = scanner.next();
483        if (result == null) {
484          break;
485        }
486        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
487          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
488        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
489      }
490    }
491    return rsGroupInfoList;
492  }
493
494  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
495    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
496    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
497    // Overwrite any info stored by table, this takes precedence
498    try {
499      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
500        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
501        if (children == null) {
502          return RSGroupInfoList;
503        }
504        for (String znode : children) {
505          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
506          if (data != null && data.length > 0) {
507            ProtobufUtil.expectPBMagicPrefix(data);
508            ByteArrayInputStream bis =
509              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
510            RSGroupInfoList
511              .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
512          }
513        }
514        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
515      }
516    } catch (KeeperException | DeserializationException | InterruptedException e) {
517      throw new IOException("Failed to read rsGroupZNode", e);
518    }
519    return RSGroupInfoList;
520  }
521
  /**
   * Reloads the group state without forcing online mode; see {@code refresh(boolean)}.
   * @throws IOException if the reload fails
   */
  @Override
  public void refresh() throws IOException {
    refresh(false);
  }
526
527  /**
528   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
529   * startup of the manager.
530   */
531  private synchronized void refresh(boolean forceOnline) throws IOException {
532    List<RSGroupInfo> groupList = new LinkedList<>();
533
534    // Overwrite anything read from zk, group table is source of truth
535    // if online read from GROUP table
536    if (forceOnline || isOnline()) {
537      LOG.debug("Refreshing in Online mode.");
538      groupList.addAll(retrieveGroupListFromGroupTable());
539    } else {
540      LOG.debug("Refreshing in Offline mode.");
541      groupList.addAll(retrieveGroupListFromZookeeper());
542    }
543
544    // refresh default group, prune
545    NavigableSet<TableName> orphanTables = new TreeSet<>();
546    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
547      orphanTables.add(TableName.valueOf(entry));
548    }
549    for (RSGroupInfo group : groupList) {
550      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
551        orphanTables.removeAll(group.getTables());
552      }
553    }
554
555    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
556    // from region group table or zk
557    groupList.add(
558        new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList), orphanTables));
559
560    // populate the data
561    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
562    HashMap<TableName, String> newTableMap = Maps.newHashMap();
563    for (RSGroupInfo group : groupList) {
564      newGroupMap.put(group.getName(), group);
565      for (TableName table : group.getTables()) {
566        newTableMap.put(table, group.getName());
567      }
568    }
569    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
570    updateCacheOfRSGroups(rsGroupMap.keySet());
571  }
572
573  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
574      throws IOException {
575    Map<TableName, String> newTableMap = Maps.newHashMap();
576    List<Mutation> mutations = Lists.newArrayList();
577
578    // populate deletes
579    for (String groupName : prevRSGroups) {
580      if (!groupMap.containsKey(groupName)) {
581        Delete d = new Delete(Bytes.toBytes(groupName));
582        mutations.add(d);
583      }
584    }
585
586    // populate puts
587    for (RSGroupInfo RSGroupInfo : groupMap.values()) {
588      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
589      Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
590      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
591      mutations.add(p);
592      for (TableName entry : RSGroupInfo.getTables()) {
593        newTableMap.put(entry, RSGroupInfo.getName());
594      }
595    }
596
597    if (mutations.size() > 0) {
598      multiMutate(mutations);
599    }
600    return newTableMap;
601  }
602
  /**
   * Flushes the currently published group map as-is. Because the very same map instance is
   * passed on, {@code flushConfig(Map)} returns immediately when the manager is not online.
   */
  private synchronized void flushConfig() throws IOException {
    flushConfig(this.rsGroupMap);
  }
606
  /**
   * Publishes a new group map.
   * <p>
   * When the manager is not online, nothing is persisted: the call is a no-op if the argument is
   * the published map itself; otherwise only a change to the servers of the default group is
   * accepted, and the in-memory group map is replaced. Any other difference is rejected.
   * <p>
   * When online, the map is first written to the rsgroup table, then the in-memory maps are
   * replaced, then the zookeeper copy is rewritten, and finally the set of last-flushed group
   * names is updated. A zookeeper failure aborts the master.
   * @param newGroupMap the complete new group map; in offline mode its default entry is
   *          temporarily removed and put back
   * @throws IOException if an unsupported change is attempted in offline mode, or persisting to
   *           the table or to zookeeper fails
   */
  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
    Map<TableName, String> newTableMap;

    // For offline mode persistence is still unavailable
    // We're refreshing in-memory state but only for servers in default group
    if (!isOnline()) {
      if (newGroupMap == this.rsGroupMap) {
        // When newGroupMap is this.rsGroupMap itself,
        // do not need to check default group and other groups as followed
        return;
      }

      // Compare old and new with the default group taken out of both sides.
      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
          /* compare tables in default group */) {
        throw new IOException("Only servers in default group can be updated during offline mode");
      }

      // Restore newGroupMap by putting its default group back
      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);

      // Refresh rsGroupMap
      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
      rsGroupMap = newGroupMap;

      // Do not need to update tableMap
      // because only the update on servers in default group is allowed above,
      // or IOException will be thrown
      return;
    }

    /* For online mode, persist to the rsgroup table first */
    newTableMap = flushConfigTable(newGroupMap);

    // Make changes visible after having been persisted to the source of truth
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);

    // Then mirror the new state into zookeeper.
    try {
      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
      // Delete znodes of groups that were flushed last time but are gone now.
      for (String groupName : prevRSGroups) {
        if (!newGroupMap.containsKey(groupName)) {
          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        }
      }

      // Rewrite every current group's znode as a delete followed by a create with fresh data.
      for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
        String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
        LOG.debug("Updating znode: " + znode);
        ZKUtil.createAndFailSilent(watcher, znode);
        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
          ProtobufUtil.prependPBMagic(proto.toByteArray())));
      }
      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());

      ZKUtil.multiOrSequential(watcher, zkOps, false);
    } catch (KeeperException e) {
      LOG.error("Failed to write to rsGroupZNode", e);
      masterServices.abort("Failed to write to rsGroupZNode", e);
      throw new IOException("Failed to write to rsGroupZNode", e);
    }
    // Remember what was flushed so the next flush can compute deletions.
    updateCacheOfRSGroups(newGroupMap.keySet());
  }
678
  /**
   * Make changes visible. Caller must be synchronized on 'this'.
   * <p>
   * Both maps are wrapped as unmodifiable views before being assigned to the volatile fields.
   * @param newRSGroupMap group name to group, becomes the new {@code rsGroupMap}
   * @param newTableMap table to group name, becomes the new {@code tableMap}
   */
  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
      Map<TableName, String> newTableMap) {
    // Make maps Immutable.
    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
    this.tableMap = Collections.unmodifiableMap(newTableMap);
  }
688
  /**
   * Update cache of rsgroups. Caller must be synchronized on 'this'.
   * <p>
   * Replaces the contents of {@code prevRSGroups}, the set of group names that were last flushed,
   * with the given names.
   * @param currentGroups Current list of Groups.
   */
  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
    this.prevRSGroups.clear();
    this.prevRSGroups.addAll(currentGroups);
  }
697
698  // Called by getDefaultServers. Presume it has lock in place.
699  private List<ServerName> getOnlineRS() throws IOException {
700    if (masterServices != null) {
701      return masterServices.getServerManager().getOnlineServersList();
702    }
703    LOG.debug("Reading online RS from zookeeper");
704    List<ServerName> servers = new LinkedList<>();
705    try {
706      for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
707        servers.add(ServerName.parseServerName(el));
708      }
709    } catch (KeeperException e) {
710      throw new IOException("Failed to retrieve server list from zookeeper", e);
711    }
712    return servers;
713  }
714
  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
  /**
   * Computes the default group's servers from the currently cached groups.
   * @return online servers that are in no non-default group
   */
  private SortedSet<Address> getDefaultServers() throws IOException {
    return getDefaultServers(listRSGroups());
  }
719
720  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
721  private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList)
722      throws IOException {
723    // Build a list of servers in other groups than default group, from rsGroupMap
724    Set<Address> serversInOtherGroup = new HashSet<>();
725    for (RSGroupInfo group : rsGroupInfoList) {
726      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
727        serversInOtherGroup.addAll(group.getServers());
728      }
729    }
730
731    // Get all online servers from Zookeeper and find out servers in default group
732    SortedSet<Address> defaultServers = Sets.newTreeSet();
733    for (ServerName serverName : getOnlineRS()) {
734      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
735      if (!serversInOtherGroup.contains(server)) { // not in other groups
736        defaultServers.add(server);
737      }
738    }
739    return defaultServers;
740  }
741
742  // Called by ServerEventsListenerThread. Synchronize on this because redoing
743  // the rsGroupMap then writing it out.
744  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
745    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
746    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
747    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
748    newGroupMap.put(newInfo.getName(), newInfo);
749    flushConfig(newGroupMap);
750  }
751
752  /**
753   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
754   * servers. Notifications about server changes are received by registering {@link ServerListener}.
755   * As a listener, we need to return immediately, so the real work of updating the servers is done
756   * asynchronously in this thread.
757   */
758  private class ServerEventsListenerThread extends Thread implements ServerListener {
759    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
760    private boolean changed = false;
761
762    ServerEventsListenerThread() {
763      setDaemon(true);
764    }
765
766    @Override
767    public void serverAdded(ServerName serverName) {
768      serverChanged();
769    }
770
771    @Override
772    public void serverRemoved(ServerName serverName) {
773      serverChanged();
774    }
775
776    private synchronized void serverChanged() {
777      changed = true;
778      this.notify();
779    }
780
781    @Override
782    public void run() {
783      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
784      SortedSet<Address> prevDefaultServers = new TreeSet<>();
785      while (isMasterRunning(masterServices)) {
786        try {
787          LOG.info("Updating default servers.");
788          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
789          if (!servers.equals(prevDefaultServers)) {
790            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
791            prevDefaultServers = servers;
792            LOG.info("Updated with servers: " + servers.size());
793          }
794          try {
795            synchronized (this) {
796              while (!changed) {
797                wait();
798              }
799              changed = false;
800            }
801          } catch (InterruptedException e) {
802            LOG.warn("Interrupted", e);
803          }
804        } catch (IOException e) {
805          LOG.warn("Failed to update default servers", e);
806        }
807      }
808    }
809  }
810
811  private class RSGroupStartupWorker extends Thread {
812    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
813    private volatile boolean online = false;
814
815    RSGroupStartupWorker() {
816      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
817      setDaemon(true);
818    }
819
820    @Override
821    public void run() {
822      if (waitForGroupTableOnline()) {
823        LOG.info("GroupBasedLoadBalancer is now online");
824      } else {
825        LOG.warn("Quit without making region group table online");
826      }
827    }
828
829    private boolean waitForGroupTableOnline() {
830      while (isMasterRunning(masterServices)) {
831        try {
832          TableStateManager tsm = masterServices.getTableStateManager();
833          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
834            createRSGroupTable();
835          }
836          // try reading from the table
837          try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
838            table.get(new Get(ROW_KEY));
839          }
840          LOG.info(
841            "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
842          RSGroupInfoManagerImpl.this.refresh(true);
843          online = true;
844          // flush any inconsistencies between ZK and HTable
845          RSGroupInfoManagerImpl.this.flushConfig();
846          return true;
847        } catch (Exception e) {
848          LOG.warn("Failed to perform check", e);
849          // 100ms is short so let's just ignore the interrupt
850          Threads.sleepWithoutInterrupt(100);
851        }
852      }
853      return false;
854    }
855
856    private void createRSGroupTable() throws IOException {
857      OptionalLong optProcId = masterServices.getProcedures().stream()
858        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
859        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
860        .findFirst();
861      long procId;
862      if (optProcId.isPresent()) {
863        procId = optProcId.getAsLong();
864      } else {
865        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
866      }
867      // wait for region to be online
868      int tries = 600;
869      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
870        masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
871        try {
872          Thread.sleep(100);
873        } catch (InterruptedException e) {
874          throw new IOException("Wait interrupted ", e);
875        }
876        tries--;
877      }
878      if (tries <= 0) {
879        throw new IOException("Failed to create group table in a given time.");
880      } else {
881        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
882        if (result != null && result.isFailed()) {
883          throw new IOException(
884            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
885        }
886      }
887    }
888
889    public boolean isOnline() {
890      return online;
891    }
892  }
893
894  private static boolean isMasterRunning(MasterServices masterServices) {
895    return !masterServices.isAborted() && !masterServices.isStopped();
896  }
897
898  private void multiMutate(List<Mutation> mutations) throws IOException {
899    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
900      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
901      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
902        MultiRowMutationProtos.MutateRowsRequest.newBuilder();
903      for (Mutation mutation : mutations) {
904        if (mutation instanceof Put) {
905          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
906            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
907            mutation));
908        } else if (mutation instanceof Delete) {
909          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
910            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
911            mutation));
912        } else {
913          throw new DoNotRetryIOException(
914            "multiMutate doesn't support " + mutation.getClass().getName());
915        }
916      }
917
918      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
919        MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
920      try {
921        service.mutateRows(null, mmrBuilder.build());
922      } catch (ServiceException ex) {
923        ProtobufUtil.toIOException(ex);
924      }
925    }
926  }
927
928  private void checkGroupName(String groupName) throws ConstraintException {
929    if (!groupName.matches("[a-zA-Z0-9_]+")) {
930      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
931    }
932  }
933}