001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.rsgroup;
019
020import com.google.protobuf.ServiceException;
021import java.io.ByteArrayInputStream;
022import java.io.IOException;
023import java.util.ArrayList;
024import java.util.Collections;
025import java.util.HashMap;
026import java.util.HashSet;
027import java.util.LinkedList;
028import java.util.List;
029import java.util.Map;
030import java.util.NavigableSet;
031import java.util.OptionalLong;
032import java.util.Set;
033import java.util.SortedSet;
034import java.util.TreeSet;
035import org.apache.hadoop.conf.Configuration;
036import org.apache.hadoop.hbase.Coprocessor;
037import org.apache.hadoop.hbase.DoNotRetryIOException;
038import org.apache.hadoop.hbase.NamespaceDescriptor;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
042import org.apache.hadoop.hbase.client.Connection;
043import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
044import org.apache.hadoop.hbase.client.Delete;
045import org.apache.hadoop.hbase.client.Get;
046import org.apache.hadoop.hbase.client.Mutation;
047import org.apache.hadoop.hbase.client.Put;
048import org.apache.hadoop.hbase.client.Result;
049import org.apache.hadoop.hbase.client.ResultScanner;
050import org.apache.hadoop.hbase.client.Scan;
051import org.apache.hadoop.hbase.client.Table;
052import org.apache.hadoop.hbase.client.TableDescriptor;
053import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
054import org.apache.hadoop.hbase.constraint.ConstraintException;
055import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
056import org.apache.hadoop.hbase.exceptions.DeserializationException;
057import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
058import org.apache.hadoop.hbase.master.ClusterSchema;
059import org.apache.hadoop.hbase.master.MasterServices;
060import org.apache.hadoop.hbase.master.ServerListener;
061import org.apache.hadoop.hbase.master.TableStateManager;
062import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
063import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
064import org.apache.hadoop.hbase.net.Address;
065import org.apache.hadoop.hbase.procedure2.Procedure;
066import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
067import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
068import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
069import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
070import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
071import org.apache.hadoop.hbase.util.Bytes;
072import org.apache.hadoop.hbase.util.Threads;
073import org.apache.hadoop.hbase.zookeeper.ZKUtil;
074import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
075import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
076import org.apache.hadoop.util.Shell;
077import org.apache.yetus.audience.InterfaceAudience;
078import org.apache.zookeeper.KeeperException;
079import org.slf4j.Logger;
080import org.slf4j.LoggerFactory;
081
082import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
083import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
084import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
085import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
086
087/**
088 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
089 * persistence store for the group information. It also makes use of zookeeper to store group
090 * information needed for bootstrapping during offline mode.
091 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
094 * zk) on each modification.
095 * <p>
096 * Mutations on state are synchronized but reads can continue without having to wait on an instance
097 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
098 * state are read-only, just-in-case (see flushConfig).
099 * <p>
100 * Reads must not block else there is a danger we'll deadlock.
101 * <p>
102 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
103 * on the results of the query modifying cache in zookeeper without another thread making
104 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
105 * access concurrently. Reads must be able to continue concurrently.
106 */
107@InterfaceAudience.Private
108final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
109  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);
110
111  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
112  private static final TableDescriptor RSGROUP_TABLE_DESC;
113  static {
114    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
115      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
116      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
117    try {
118      builder.setCoprocessor(
119        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
120          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
121    } catch (IOException ex) {
122      throw new Error(ex);
123    }
124    RSGROUP_TABLE_DESC = builder.build();
125  }
126
  // These two Maps are immutable and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
129  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
130  private volatile Map<TableName, String> tableMap = Collections.emptyMap();
131
132  private final MasterServices masterServices;
133  private final Connection conn;
134  private final ZKWatcher watcher;
135  private final RSGroupStartupWorker rsGroupStartupWorker;
136  // contains list of groups that were last flushed to persistent store
137  private Set<String> prevRSGroups = new HashSet<>();
138  private final ServerEventsListenerThread serverEventsListenerThread =
139    new ServerEventsListenerThread();
140
141  /** Get rsgroup table mapping script */
142  @VisibleForTesting
143  RSGroupMappingScript script;
144
145  // Package visibility for testing
146  static class RSGroupMappingScript {
147
148    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
149    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
150      "hbase.rsgroup.table.mapping.script.timeout";
151
152    private Shell.ShellCommandExecutor rsgroupMappingScript;
153
154    RSGroupMappingScript(Configuration conf) {
155      String script = conf.get(RS_GROUP_MAPPING_SCRIPT);
156      if (script == null || script.isEmpty()) {
157        return;
158      }
159
160      rsgroupMappingScript = new Shell.ShellCommandExecutor(
161        new String[] { script, "", "" }, null, null,
162        conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds
163      );
164    }
165
166    String getRSGroup(String namespace, String tablename) {
167      if (rsgroupMappingScript == null) {
168        return null;
169      }
170      String[] exec = rsgroupMappingScript.getExecString();
171      exec[1] = namespace;
172      exec[2] = tablename;
173      try {
174        rsgroupMappingScript.execute();
175      } catch (IOException e) {
176        LOG.error("Failed to get RSGroup from script for table {}:{}", namespace, tablename, e);
177        return null;
178      }
179      return rsgroupMappingScript.getOutput().trim();
180    }
181  }
182
183  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
184    this.masterServices = masterServices;
185    this.watcher = masterServices.getZooKeeper();
186    this.conn = masterServices.getConnection();
187    this.rsGroupStartupWorker = new RSGroupStartupWorker();
188    script = new RSGroupMappingScript(masterServices.getConfiguration());
189  }
190
191  private synchronized void init() throws IOException {
192    refresh();
193    serverEventsListenerThread.start();
194    masterServices.getServerManager().registerListener(serverEventsListenerThread);
195  }
196
197  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
198    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
199    instance.init();
200    return instance;
201  }
202
203  public void start() {
204    // create system table of rsgroup
205    rsGroupStartupWorker.start();
206  }
207
208  @Override
209  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
210    checkGroupName(rsGroupInfo.getName());
211    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
212      rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
213      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
214    }
215    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
216    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
217    flushConfig(newGroupMap);
218  }
219
220  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
221    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
222    if (rsGroupInfo == null) {
223      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
224    }
225    return rsGroupInfo;
226  }
227
228  /**
229   * @param master the master to get online servers for
230   * @return Set of online Servers named for their hostname and port (not ServerName).
231   */
232  private static Set<Address> getOnlineServers(final MasterServices master) {
233    Set<Address> onlineServers = new HashSet<Address>();
234    if (master == null) {
235      return onlineServers;
236    }
237
238    for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
239      onlineServers.add(server.getAddress());
240    }
241    return onlineServers;
242  }
243
244  @Override
245  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
246      String dstGroup) throws IOException {
247    RSGroupInfo src = getRSGroupInfo(srcGroup);
248    RSGroupInfo dst = getRSGroupInfo(dstGroup);
249    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
250    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
251    // rsgroup of dead servers that are to come back later).
252    Set<Address> onlineServers =
253      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
254        : null;
255    for (Address el : servers) {
256      src.removeServer(el);
257      if (onlineServers != null) {
258        if (!onlineServers.contains(el)) {
259          if (LOG.isDebugEnabled()) {
260            LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online");
261          }
262          continue;
263        }
264      }
265      dst.addServer(el);
266    }
267    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
268    newGroupMap.put(src.getName(), src);
269    newGroupMap.put(dst.getName(), dst);
270    flushConfig(newGroupMap);
271    return dst.getServers();
272  }
273
274  @Override
275  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
276    for (RSGroupInfo info : rsGroupMap.values()) {
277      if (info.containsServer(serverHostPort)) {
278        return info;
279      }
280    }
281    return null;
282  }
283
284  @Override
285  public RSGroupInfo getRSGroup(String groupName) {
286    return rsGroupMap.get(groupName);
287  }
288
289  @Override
290  public String getRSGroupOfTable(TableName tableName) {
291    return tableMap.get(tableName);
292  }
293
294  @Override
295  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
296      throws IOException {
297    // Check if rsGroup contains the destination rsgroup
298    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
299      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
300    }
301
302    // Make a copy of rsGroupMap to update
303    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
304
305    // Remove tables from their original rsgroups
306    // and update the copy of rsGroupMap
307    for (TableName tableName : tableNames) {
308      if (tableMap.containsKey(tableName)) {
309        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
310        src.removeTable(tableName);
311        newGroupMap.put(src.getName(), src);
312      }
313    }
314
315    // Add tables to the destination rsgroup
316    // and update the copy of rsGroupMap
317    if (groupName != null) {
318      RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName));
319      dstGroup.addAllTables(tableNames);
320      newGroupMap.put(dstGroup.getName(), dstGroup);
321    }
322
323    // Flush according to the updated copy of rsGroupMap
324    flushConfig(newGroupMap);
325  }
326
327  @Override
328  public synchronized void removeRSGroup(String groupName) throws IOException {
329    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
330      throw new DoNotRetryIOException(
331        "Group " + groupName + " does not exist or is a reserved " + "group");
332    }
333    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
334    newGroupMap.remove(groupName);
335    flushConfig(newGroupMap);
336  }
337
338  @Override
339  public List<RSGroupInfo> listRSGroups() {
340    return Lists.newLinkedList(rsGroupMap.values());
341  }
342
343  @Override
344  public boolean isOnline() {
345    return rsGroupStartupWorker.isOnline();
346  }
347
348  @Override
349  public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup,
350      String dstGroup) throws IOException {
351    // get server's group
352    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
353    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);
354
355    // move servers
356    for (Address el : servers) {
357      srcGroupInfo.removeServer(el);
358      dstGroupInfo.addServer(el);
359    }
360    // move tables
361    for (TableName tableName : tables) {
362      srcGroupInfo.removeTable(tableName);
363      dstGroupInfo.addTable(tableName);
364    }
365
366    // flush changed groupinfo
367    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
368    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
369    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
370    flushConfig(newGroupMap);
371  }
372
373  @Override
374  public synchronized void removeServers(Set<Address> servers) throws IOException {
375    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
376    for (Address el : servers) {
377      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
378      if (rsGroupInfo != null) {
379        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
380        if (newRsGroupInfo == null) {
381          rsGroupInfo.removeServer(el);
382          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
383        } else {
384          newRsGroupInfo.removeServer(el);
385          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
386        }
387      } else {
388        LOG.warn("Server " + el + " does not belong to any rsgroup.");
389      }
390    }
391
392    if (rsGroupInfos.size() > 0) {
393      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
394      newGroupMap.putAll(rsGroupInfos);
395      flushConfig(newGroupMap);
396    }
397  }
398
399  @Override
400  public void renameRSGroup(String oldName, String newName) throws IOException {
401    checkGroupName(oldName);
402    checkGroupName(newName);
403    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
404      throw new ConstraintException("Can't rename default rsgroup");
405    }
406
407    RSGroupInfo oldGroup = getRSGroup(oldName);
408    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
409    newGroupMap.remove(oldName);
410    RSGroupInfo newGroup = new RSGroupInfo(newName,
411      (SortedSet<Address>) oldGroup.getServers(), oldGroup.getTables());
412    newGroupMap.put(newName, newGroup);
413    flushConfig(newGroupMap);
414  }
415
416  /**
417   * Will try to get the rsgroup from {@code tableMap} first
418   * then try to get the rsgroup from {@code script}
419   * try to get the rsgroup from the {@link NamespaceDescriptor} lastly.
420   * If still not present, return default group.
421   */
422  @Override
423  public RSGroupInfo determineRSGroupInfoForTable(TableName tableName)
424    throws IOException {
425    RSGroupInfo groupFromOldRSGroupInfo = getRSGroup(getRSGroupOfTable(tableName));
426    if (groupFromOldRSGroupInfo != null) {
427      return groupFromOldRSGroupInfo;
428    }
429    // RSGroup information determined by administrator.
430    RSGroupInfo groupDeterminedByAdmin = getRSGroup(
431      script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()));
432    if (groupDeterminedByAdmin != null) {
433      return groupDeterminedByAdmin;
434    }
435    // Finally, we will try to fall back to namespace as rsgroup if exists
436    ClusterSchema clusterSchema = masterServices.getClusterSchema();
437    if (clusterSchema == null) {
438      if (TableName.isMetaTableName(tableName)) {
439        LOG.info("Can not get the namespace rs group config for meta table, since the" +
440          " meta table is not online yet, will use default group to assign meta first");
441      } else {
442        LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?");
443      }
444    } else {
445      NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString());
446      RSGroupInfo groupNameOfNs =
447        getRSGroup(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP));
448      if (groupNameOfNs != null) {
449        return groupNameOfNs;
450      }
451    }
452    return getRSGroup(RSGroupInfo.DEFAULT_GROUP);
453  }
454
455  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
456    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
457    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
458        ResultScanner scanner = table.getScanner(new Scan())) {
459      for (Result result;;) {
460        result = scanner.next();
461        if (result == null) {
462          break;
463        }
464        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
465          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
466        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
467      }
468    }
469    return rsGroupInfoList;
470  }
471
472  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
473    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
474    List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList();
475    // Overwrite any info stored by table, this takes precedence
476    try {
477      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
478        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
479        if (children == null) {
480          return RSGroupInfoList;
481        }
482        for (String znode : children) {
483          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
484          if (data.length > 0) {
485            ProtobufUtil.expectPBMagicPrefix(data);
486            ByteArrayInputStream bis =
487              new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length);
488            RSGroupInfoList
489              .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
490          }
491        }
492        LOG.debug("Read ZK GroupInfo count:" + RSGroupInfoList.size());
493      }
494    } catch (KeeperException | DeserializationException | InterruptedException e) {
495      throw new IOException("Failed to read rsGroupZNode", e);
496    }
497    return RSGroupInfoList;
498  }
499
500  @Override
501  public void refresh() throws IOException {
502    refresh(false);
503  }
504
505  /**
506   * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on
507   * startup of the manager.
508   */
509  private synchronized void refresh(boolean forceOnline) throws IOException {
510    List<RSGroupInfo> groupList = new LinkedList<>();
511
512    // Overwrite anything read from zk, group table is source of truth
513    // if online read from GROUP table
514    if (forceOnline || isOnline()) {
515      LOG.debug("Refreshing in Online mode.");
516      groupList.addAll(retrieveGroupListFromGroupTable());
517    } else {
518      LOG.debug("Refreshing in Offline mode.");
519      groupList.addAll(retrieveGroupListFromZookeeper());
520    }
521
522    // refresh default group, prune
523    NavigableSet<TableName> orphanTables = new TreeSet<>();
524    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
525      orphanTables.add(TableName.valueOf(entry));
526    }
527    for (RSGroupInfo group : groupList) {
528      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
529        orphanTables.removeAll(group.getTables());
530      }
531    }
532
533    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
534    // from region group table or zk
535    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));
536
537    // populate the data
538    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
539    HashMap<TableName, String> newTableMap = Maps.newHashMap();
540    for (RSGroupInfo group : groupList) {
541      newGroupMap.put(group.getName(), group);
542      for (TableName table : group.getTables()) {
543        newTableMap.put(table, group.getName());
544      }
545    }
546    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
547    updateCacheOfRSGroups(rsGroupMap.keySet());
548  }
549
550  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
551      throws IOException {
552    Map<TableName, String> newTableMap = Maps.newHashMap();
553    List<Mutation> mutations = Lists.newArrayList();
554
555    // populate deletes
556    for (String groupName : prevRSGroups) {
557      if (!groupMap.containsKey(groupName)) {
558        Delete d = new Delete(Bytes.toBytes(groupName));
559        mutations.add(d);
560      }
561    }
562
563    // populate puts
564    for (RSGroupInfo RSGroupInfo : groupMap.values()) {
565      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
566      Put p = new Put(Bytes.toBytes(RSGroupInfo.getName()));
567      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
568      mutations.add(p);
569      for (TableName entry : RSGroupInfo.getTables()) {
570        newTableMap.put(entry, RSGroupInfo.getName());
571      }
572    }
573
574    if (mutations.size() > 0) {
575      multiMutate(mutations);
576    }
577    return newTableMap;
578  }
579
580  private synchronized void flushConfig() throws IOException {
581    flushConfig(this.rsGroupMap);
582  }
583
584  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
585    Map<TableName, String> newTableMap;
586
587    // For offline mode persistence is still unavailable
588    // We're refreshing in-memory state but only for servers in default group
589    if (!isOnline()) {
590      if (newGroupMap == this.rsGroupMap) {
591        // When newGroupMap is this.rsGroupMap itself,
592        // do not need to check default group and other groups as followed
593        return;
594      }
595
596      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
597      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
598      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
599      if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
600          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
601          /* compare tables in default group */) {
602        throw new IOException("Only servers in default group can be updated during offline mode");
603      }
604
605      // Restore newGroupMap by putting its default group back
606      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
607
608      // Refresh rsGroupMap
609      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
610      rsGroupMap = newGroupMap;
611
612      // Do not need to update tableMap
613      // because only the update on servers in default group is allowed above,
614      // or IOException will be thrown
615      return;
616    }
617
618    /* For online mode, persist to Zookeeper */
619    newTableMap = flushConfigTable(newGroupMap);
620
621    // Make changes visible after having been persisted to the source of truth
622    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
623
624    try {
625      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
626      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);
627
628      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
629      for (String groupName : prevRSGroups) {
630        if (!newGroupMap.containsKey(groupName)) {
631          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
632          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
633        }
634      }
635
636      for (RSGroupInfo RSGroupInfo : newGroupMap.values()) {
637        String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName());
638        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo);
639        LOG.debug("Updating znode: " + znode);
640        ZKUtil.createAndFailSilent(watcher, znode);
641        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
642        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
643          ProtobufUtil.prependPBMagic(proto.toByteArray())));
644      }
645      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());
646
647      ZKUtil.multiOrSequential(watcher, zkOps, false);
648    } catch (KeeperException e) {
649      LOG.error("Failed to write to rsGroupZNode", e);
650      masterServices.abort("Failed to write to rsGroupZNode", e);
651      throw new IOException("Failed to write to rsGroupZNode", e);
652    }
653    updateCacheOfRSGroups(newGroupMap.keySet());
654  }
655
656  /**
657   * Make changes visible. Caller must be synchronized on 'this'.
658   */
659  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
660      Map<TableName, String> newTableMap) {
661    // Make maps Immutable.
662    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
663    this.tableMap = Collections.unmodifiableMap(newTableMap);
664  }
665
666  /**
667   * Update cache of rsgroups. Caller must be synchronized on 'this'.
668   * @param currentGroups Current list of Groups.
669   */
670  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
671    this.prevRSGroups.clear();
672    this.prevRSGroups.addAll(currentGroups);
673  }
674
675  // Called by getDefaultServers. Presume it has lock in place.
676  private List<ServerName> getOnlineRS() throws IOException {
677    if (masterServices != null) {
678      return masterServices.getServerManager().getOnlineServersList();
679    }
680    LOG.debug("Reading online RS from zookeeper");
681    List<ServerName> servers = new LinkedList<>();
682    try {
683      for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
684        servers.add(ServerName.parseServerName(el));
685      }
686    } catch (KeeperException e) {
687      throw new IOException("Failed to retrieve server list from zookeeper", e);
688    }
689    return servers;
690  }
691
692  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
693  private SortedSet<Address> getDefaultServers() throws IOException {
694    // Build a list of servers in other groups than default group, from rsGroupMap
695    Set<Address> serversInOtherGroup = new HashSet<>();
696    for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
697      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
698        serversInOtherGroup.addAll(group.getServers());
699      }
700    }
701
702    // Get all online servers from Zookeeper and find out servers in default group
703    SortedSet<Address> defaultServers = Sets.newTreeSet();
704    for (ServerName serverName : getOnlineRS()) {
705      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
706      if (!serversInOtherGroup.contains(server)) { // not in other groups
707        defaultServers.add(server);
708      }
709    }
710    return defaultServers;
711  }
712
713  // Called by ServerEventsListenerThread. Synchronize on this because redoing
714  // the rsGroupMap then writing it out.
715  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
716    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
717    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
718    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
719    newGroupMap.put(newInfo.getName(), newInfo);
720    flushConfig(newGroupMap);
721  }
722
723  /**
724   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
725   * servers. Notifications about server changes are received by registering {@link ServerListener}.
726   * As a listener, we need to return immediately, so the real work of updating the servers is done
727   * asynchronously in this thread.
728   */
729  private class ServerEventsListenerThread extends Thread implements ServerListener {
730    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
731    private boolean changed = false;
732
733    ServerEventsListenerThread() {
734      setDaemon(true);
735    }
736
737    @Override
738    public void serverAdded(ServerName serverName) {
739      serverChanged();
740    }
741
742    @Override
743    public void serverRemoved(ServerName serverName) {
744      serverChanged();
745    }
746
747    private synchronized void serverChanged() {
748      changed = true;
749      this.notify();
750    }
751
752    @Override
753    public void run() {
754      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
755      SortedSet<Address> prevDefaultServers = new TreeSet<>();
756      while (isMasterRunning(masterServices)) {
757        try {
758          LOG.info("Updating default servers.");
759          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
760          if (!servers.equals(prevDefaultServers)) {
761            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
762            prevDefaultServers = servers;
763            LOG.info("Updated with servers: " + servers.size());
764          }
765          try {
766            synchronized (this) {
767              while (!changed) {
768                wait();
769              }
770              changed = false;
771            }
772          } catch (InterruptedException e) {
773            LOG.warn("Interrupted", e);
774          }
775        } catch (IOException e) {
776          LOG.warn("Failed to update default servers", e);
777        }
778      }
779    }
780  }
781
782  private class RSGroupStartupWorker extends Thread {
783    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
784    private volatile boolean online = false;
785
786    RSGroupStartupWorker() {
787      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
788      setDaemon(true);
789    }
790
791    @Override
792    public void run() {
793      if (waitForGroupTableOnline()) {
794        LOG.info("GroupBasedLoadBalancer is now online");
795      } else {
796        LOG.warn("Quit without making region group table online");
797      }
798    }
799
800    private boolean waitForGroupTableOnline() {
801      while (isMasterRunning(masterServices)) {
802        try {
803          TableStateManager tsm = masterServices.getTableStateManager();
804          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
805            createRSGroupTable();
806          }
807          // try reading from the table
808          try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
809            table.get(new Get(ROW_KEY));
810          }
811          LOG.info(
812            "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
813          RSGroupInfoManagerImpl.this.refresh(true);
814          online = true;
815          // flush any inconsistencies between ZK and HTable
816          RSGroupInfoManagerImpl.this.flushConfig();
817          return true;
818        } catch (Exception e) {
819          LOG.warn("Failed to perform check", e);
820          // 100ms is short so let's just ignore the interrupt
821          Threads.sleepWithoutInterrupt(100);
822        }
823      }
824      return false;
825    }
826
827    private void createRSGroupTable() throws IOException {
828      OptionalLong optProcId = masterServices.getProcedures().stream()
829        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
830        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
831        .findFirst();
832      long procId;
833      if (optProcId.isPresent()) {
834        procId = optProcId.getAsLong();
835      } else {
836        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
837      }
838      // wait for region to be online
839      int tries = 600;
840      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
841        masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
842        try {
843          Thread.sleep(100);
844        } catch (InterruptedException e) {
845          throw new IOException("Wait interrupted ", e);
846        }
847        tries--;
848      }
849      if (tries <= 0) {
850        throw new IOException("Failed to create group table in a given time.");
851      } else {
852        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
853        if (result != null && result.isFailed()) {
854          throw new IOException(
855            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
856        }
857      }
858    }
859
860    public boolean isOnline() {
861      return online;
862    }
863  }
864
865  private static boolean isMasterRunning(MasterServices masterServices) {
866    return !masterServices.isAborted() && !masterServices.isStopped();
867  }
868
869  private void multiMutate(List<Mutation> mutations) throws IOException {
870    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
871      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
872      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
873        MultiRowMutationProtos.MutateRowsRequest.newBuilder();
874      for (Mutation mutation : mutations) {
875        if (mutation instanceof Put) {
876          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
877            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
878            mutation));
879        } else if (mutation instanceof Delete) {
880          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
881            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
882            mutation));
883        } else {
884          throw new DoNotRetryIOException(
885            "multiMutate doesn't support " + mutation.getClass().getName());
886        }
887      }
888
889      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
890        MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
891      try {
892        service.mutateRows(null, mmrBuilder.build());
893      } catch (ServiceException ex) {
894        ProtobufUtil.toIOException(ex);
895      }
896    }
897  }
898
899  private void checkGroupName(String groupName) throws ConstraintException {
900    if (!groupName.matches("[a-zA-Z0-9_]+")) {
901      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
902    }
903  }
904}