001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.rsgroup; 019 020import com.google.protobuf.ServiceException; 021import java.io.ByteArrayInputStream; 022import java.io.IOException; 023import java.util.ArrayList; 024import java.util.Collections; 025import java.util.HashMap; 026import java.util.HashSet; 027import java.util.LinkedList; 028import java.util.List; 029import java.util.Map; 030import java.util.NavigableSet; 031import java.util.OptionalLong; 032import java.util.Set; 033import java.util.SortedSet; 034import java.util.TreeSet; 035import org.apache.hadoop.conf.Configuration; 036import org.apache.hadoop.hbase.Coprocessor; 037import org.apache.hadoop.hbase.DoNotRetryIOException; 038import org.apache.hadoop.hbase.NamespaceDescriptor; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; 042import org.apache.hadoop.hbase.client.Connection; 043import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder; 044import org.apache.hadoop.hbase.client.Delete; 045import org.apache.hadoop.hbase.client.Get; 046import 
org.apache.hadoop.hbase.client.Mutation; 047import org.apache.hadoop.hbase.client.Put; 048import org.apache.hadoop.hbase.client.Result; 049import org.apache.hadoop.hbase.client.ResultScanner; 050import org.apache.hadoop.hbase.client.Scan; 051import org.apache.hadoop.hbase.client.Table; 052import org.apache.hadoop.hbase.client.TableDescriptor; 053import org.apache.hadoop.hbase.client.TableDescriptorBuilder; 054import org.apache.hadoop.hbase.constraint.ConstraintException; 055import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; 056import org.apache.hadoop.hbase.exceptions.DeserializationException; 057import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; 058import org.apache.hadoop.hbase.master.ClusterSchema; 059import org.apache.hadoop.hbase.master.MasterServices; 060import org.apache.hadoop.hbase.master.ServerListener; 061import org.apache.hadoop.hbase.master.TableStateManager; 062import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure; 063import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; 064import org.apache.hadoop.hbase.net.Address; 065import org.apache.hadoop.hbase.procedure2.Procedure; 066import org.apache.hadoop.hbase.protobuf.ProtobufMagic; 067import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 068import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos; 069import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos; 070import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy; 071import org.apache.hadoop.hbase.util.Bytes; 072import org.apache.hadoop.hbase.util.Threads; 073import org.apache.hadoop.hbase.zookeeper.ZKUtil; 074import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 075import org.apache.hadoop.hbase.zookeeper.ZNodePaths; 076import org.apache.hadoop.util.Shell; 077import org.apache.yetus.audience.InterfaceAudience; 078import org.apache.zookeeper.KeeperException; 079import org.slf4j.Logger; 080import org.slf4j.LoggerFactory; 081 082import 
org.apache.hbase.thirdparty.com.google.common.collect.Lists; 083import org.apache.hbase.thirdparty.com.google.common.collect.Maps; 084import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 085 086/** 087 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the 088 * persistence store for the group information. It also makes use of zookeeper to store group 089 * information needed for bootstrapping during offline mode. 090 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached 091 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong 092 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in 093 * zk) on each modification. 094 * <p> 095 * Mutations on state are synchronized but reads can continue without having to wait on an instance 096 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of 097 * state are read-only, just-in-case (see flushConfig). 098 * <p> 099 * Reads must not block else there is a danger we'll deadlock. 100 * <p> 101 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act 102 * on the results of the query modifying cache in zookeeper without another thread making 103 * intermediate modifications. These clients synchronize on the 'this' instance so no other has 104 * access concurrently. Reads must be able to continue concurrently. 
105 */ 106@InterfaceAudience.Private 107final class RSGroupInfoManagerImpl implements RSGroupInfoManager { 108 private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class); 109 110 /** Table descriptor for <code>hbase:rsgroup</code> catalog table */ 111 private static final TableDescriptor RSGROUP_TABLE_DESC; 112 static { 113 TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME) 114 .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES)) 115 .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName()); 116 try { 117 builder.setCoprocessor( 118 CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName()) 119 .setPriority(Coprocessor.PRIORITY_SYSTEM).build()); 120 } catch (IOException ex) { 121 throw new Error(ex); 122 } 123 RSGROUP_TABLE_DESC = builder.build(); 124 } 125 126 // There two Maps are immutable and wholesale replaced on each modification 127 // so are safe to access concurrently. See class comment. 
128 private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap(); 129 private volatile Map<TableName, String> tableMap = Collections.emptyMap(); 130 131 private final MasterServices masterServices; 132 private final Connection conn; 133 private final ZKWatcher watcher; 134 private final RSGroupStartupWorker rsGroupStartupWorker; 135 // contains list of groups that were last flushed to persistent store 136 private Set<String> prevRSGroups = new HashSet<>(); 137 private final ServerEventsListenerThread serverEventsListenerThread = 138 new ServerEventsListenerThread(); 139 140 /** Get rsgroup table mapping script */ 141 RSGroupMappingScript script; 142 143 // Package visibility for testing 144 static class RSGroupMappingScript { 145 146 static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script"; 147 static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT = 148 "hbase.rsgroup.table.mapping.script.timeout"; 149 150 private Shell.ShellCommandExecutor rsgroupMappingScript; 151 152 RSGroupMappingScript(Configuration conf) { 153 String script = conf.get(RS_GROUP_MAPPING_SCRIPT); 154 if (script == null || script.isEmpty()) { 155 return; 156 } 157 158 rsgroupMappingScript = new Shell.ShellCommandExecutor(new String[] { script, "", "" }, null, 159 null, conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds 160 ); 161 } 162 163 String getRSGroup(String namespace, String tablename) { 164 if (rsgroupMappingScript == null) { 165 return null; 166 } 167 String[] exec = rsgroupMappingScript.getExecString(); 168 exec[1] = namespace; 169 exec[2] = tablename; 170 try { 171 rsgroupMappingScript.execute(); 172 } catch (IOException e) { 173 LOG.error("Failed to get RSGroup from script for table {}:{}", namespace, tablename, e); 174 return null; 175 } 176 return rsgroupMappingScript.getOutput().trim(); 177 } 178 } 179 180 private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException { 181 this.masterServices = 
masterServices; 182 this.watcher = masterServices.getZooKeeper(); 183 this.conn = masterServices.getConnection(); 184 this.rsGroupStartupWorker = new RSGroupStartupWorker(); 185 script = new RSGroupMappingScript(masterServices.getConfiguration()); 186 } 187 188 private synchronized void init() throws IOException { 189 refresh(); 190 serverEventsListenerThread.start(); 191 masterServices.getServerManager().registerListener(serverEventsListenerThread); 192 } 193 194 static RSGroupInfoManager getInstance(MasterServices master) throws IOException { 195 RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master); 196 instance.init(); 197 return instance; 198 } 199 200 public void start() { 201 // create system table of rsgroup 202 rsGroupStartupWorker.start(); 203 } 204 205 @Override 206 public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException { 207 checkGroupName(rsGroupInfo.getName()); 208 if ( 209 rsGroupMap.get(rsGroupInfo.getName()) != null 210 || rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP) 211 ) { 212 throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName()); 213 } 214 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 215 newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo); 216 flushConfig(newGroupMap); 217 } 218 219 private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException { 220 RSGroupInfo rsGroupInfo = getRSGroup(groupName); 221 if (rsGroupInfo == null) { 222 throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist"); 223 } 224 return rsGroupInfo; 225 } 226 227 /** 228 * @param master the master to get online servers for 229 * @return Set of online Servers named for their hostname and port (not ServerName). 
230 */ 231 private static Set<Address> getOnlineServers(final MasterServices master) { 232 Set<Address> onlineServers = new HashSet<Address>(); 233 if (master == null) { 234 return onlineServers; 235 } 236 237 for (ServerName server : master.getServerManager().getOnlineServers().keySet()) { 238 onlineServers.add(server.getAddress()); 239 } 240 return onlineServers; 241 } 242 243 @Override 244 public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup, 245 String dstGroup) throws IOException { 246 RSGroupInfo src = getRSGroupInfo(srcGroup); 247 RSGroupInfo dst = getRSGroupInfo(dstGroup); 248 Set<Address> movedServers = new HashSet<>(); 249 // If destination is 'default' rsgroup, only add servers that are online. If not online, drop 250 // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a 251 // rsgroup of dead servers that are to come back later). 252 Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) 253 ? 
getOnlineServers(this.masterServices) 254 : null; 255 for (Address el : servers) { 256 src.removeServer(el); 257 if (onlineServers != null) { 258 if (!onlineServers.contains(el)) { 259 if (LOG.isDebugEnabled()) { 260 LOG.debug("Dropping " + el + " during move-to-default rsgroup because not online"); 261 } 262 continue; 263 } 264 } 265 dst.addServer(el); 266 movedServers.add(el); 267 } 268 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 269 newGroupMap.put(src.getName(), src); 270 newGroupMap.put(dst.getName(), dst); 271 flushConfig(newGroupMap); 272 return movedServers; 273 } 274 275 @Override 276 public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException { 277 for (RSGroupInfo info : rsGroupMap.values()) { 278 if (info.containsServer(serverHostPort)) { 279 return info; 280 } 281 } 282 return null; 283 } 284 285 @Override 286 public RSGroupInfo getRSGroup(String groupName) { 287 return rsGroupMap.get(groupName); 288 } 289 290 @Override 291 public String getRSGroupOfTable(TableName tableName) { 292 return tableMap.get(tableName); 293 } 294 295 @Override 296 public synchronized void moveTables(Set<TableName> tableNames, String groupName) 297 throws IOException { 298 // Check if rsGroup contains the destination rsgroup 299 if (groupName != null && !rsGroupMap.containsKey(groupName)) { 300 throw new DoNotRetryIOException("Group " + groupName + " does not exist"); 301 } 302 303 // Make a copy of rsGroupMap to update 304 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 305 306 // Remove tables from their original rsgroups 307 // and update the copy of rsGroupMap 308 for (TableName tableName : tableNames) { 309 if (tableMap.containsKey(tableName)) { 310 RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName))); 311 src.removeTable(tableName); 312 newGroupMap.put(src.getName(), src); 313 } 314 } 315 316 // Add tables to the destination rsgroup 317 // and update the copy of rsGroupMap 318 if 
(groupName != null) { 319 RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName)); 320 dstGroup.addAllTables(tableNames); 321 newGroupMap.put(dstGroup.getName(), dstGroup); 322 } 323 324 // Flush according to the updated copy of rsGroupMap 325 flushConfig(newGroupMap); 326 } 327 328 @Override 329 public synchronized void removeRSGroup(String groupName) throws IOException { 330 if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) { 331 throw new DoNotRetryIOException( 332 "Group " + groupName + " does not exist or is a reserved " + "group"); 333 } 334 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 335 newGroupMap.remove(groupName); 336 flushConfig(newGroupMap); 337 } 338 339 @Override 340 public List<RSGroupInfo> listRSGroups() { 341 return Lists.newLinkedList(rsGroupMap.values()); 342 } 343 344 @Override 345 public boolean isOnline() { 346 return rsGroupStartupWorker.isOnline(); 347 } 348 349 @Override 350 public void moveServersAndTables(Set<Address> servers, Set<TableName> tables, String srcGroup, 351 String dstGroup) throws IOException { 352 // get server's group 353 RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup); 354 RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup); 355 356 // move servers 357 for (Address el : servers) { 358 srcGroupInfo.removeServer(el); 359 dstGroupInfo.addServer(el); 360 } 361 // move tables 362 for (TableName tableName : tables) { 363 srcGroupInfo.removeTable(tableName); 364 dstGroupInfo.addTable(tableName); 365 } 366 367 // flush changed groupinfo 368 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 369 newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo); 370 newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo); 371 flushConfig(newGroupMap); 372 } 373 374 @Override 375 public synchronized void removeServers(Set<Address> servers) throws IOException { 376 Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>(); 377 for (Address 
el : servers) { 378 RSGroupInfo rsGroupInfo = getRSGroupOfServer(el); 379 if (rsGroupInfo != null) { 380 RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName()); 381 if (newRsGroupInfo == null) { 382 rsGroupInfo.removeServer(el); 383 rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo); 384 } else { 385 newRsGroupInfo.removeServer(el); 386 rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo); 387 } 388 } else { 389 LOG.warn("Server " + el + " does not belong to any rsgroup."); 390 } 391 } 392 393 if (rsGroupInfos.size() > 0) { 394 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 395 newGroupMap.putAll(rsGroupInfos); 396 flushConfig(newGroupMap); 397 } 398 } 399 400 @Override 401 public void renameRSGroup(String oldName, String newName) throws IOException { 402 checkGroupName(oldName); 403 checkGroupName(newName); 404 if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) { 405 throw new ConstraintException("Can't rename default rsgroup"); 406 } 407 RSGroupInfo oldGroup = getRSGroup(oldName); 408 if (oldGroup == null) { 409 throw new ConstraintException("RSGroup " + oldName + " does not exist"); 410 } 411 if (rsGroupMap.containsKey(newName)) { 412 throw new ConstraintException("Group already exists: " + newName); 413 } 414 415 Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 416 newGroupMap.remove(oldName); 417 RSGroupInfo newGroup = 418 new RSGroupInfo(newName, (SortedSet<Address>) oldGroup.getServers(), oldGroup.getTables()); 419 newGroupMap.put(newName, newGroup); 420 flushConfig(newGroupMap); 421 } 422 423 /** 424 * Will try to get the rsgroup from {@code tableMap} first then try to get the rsgroup from 425 * {@code script} try to get the rsgroup from the {@link NamespaceDescriptor} lastly. If still not 426 * present, return default group. 
427 */ 428 @Override 429 public RSGroupInfo determineRSGroupInfoForTable(TableName tableName) throws IOException { 430 RSGroupInfo groupFromOldRSGroupInfo = getRSGroup(getRSGroupOfTable(tableName)); 431 if (groupFromOldRSGroupInfo != null) { 432 return groupFromOldRSGroupInfo; 433 } 434 // RSGroup information determined by administrator. 435 RSGroupInfo groupDeterminedByAdmin = getRSGroup( 436 script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString())); 437 if (groupDeterminedByAdmin != null) { 438 return groupDeterminedByAdmin; 439 } 440 // Finally, we will try to fall back to namespace as rsgroup if exists 441 ClusterSchema clusterSchema = masterServices.getClusterSchema(); 442 if (clusterSchema == null) { 443 if (TableName.isMetaTableName(tableName)) { 444 LOG.info("Can not get the namespace rs group config for meta table, since the" 445 + " meta table is not online yet, will use default group to assign meta first"); 446 } else { 447 LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?"); 448 } 449 } else { 450 NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString()); 451 RSGroupInfo groupNameOfNs = 452 getRSGroup(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP)); 453 if (groupNameOfNs != null) { 454 return groupNameOfNs; 455 } 456 } 457 return getRSGroup(RSGroupInfo.DEFAULT_GROUP); 458 } 459 460 @Override 461 public void updateRSGroupConfig(String groupName, Map<String, String> configuration) 462 throws IOException { 463 if (RSGroupInfo.DEFAULT_GROUP.equals(groupName)) { 464 // We do not persist anything of default group, therefore, it is not supported to update 465 // default group's configuration which lost once master down. 
466 throw new ConstraintException( 467 "configuration of " + RSGroupInfo.DEFAULT_GROUP + " can't be stored persistently"); 468 } 469 RSGroupInfo rsGroupInfo = getRSGroupInfo(groupName); 470 new HashSet<>(rsGroupInfo.getConfiguration().keySet()) 471 .forEach(rsGroupInfo::removeConfiguration); 472 configuration.forEach(rsGroupInfo::setConfiguration); 473 flushConfig(); 474 } 475 476 List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException { 477 List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList(); 478 try (Table table = conn.getTable(RSGROUP_TABLE_NAME); 479 ResultScanner scanner = table.getScanner(new Scan())) { 480 for (Result result;;) { 481 result = scanner.next(); 482 if (result == null) { 483 break; 484 } 485 RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo 486 .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES)); 487 rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto)); 488 } 489 } 490 return rsGroupInfoList; 491 } 492 493 List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException { 494 String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode); 495 List<RSGroupInfo> RSGroupInfoList = Lists.newArrayList(); 496 // Overwrite any info stored by table, this takes precedence 497 try { 498 if (ZKUtil.checkExists(watcher, groupBasePath) != -1) { 499 List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath); 500 if (children == null) { 501 return RSGroupInfoList; 502 } 503 for (String znode : children) { 504 byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode)); 505 if (data != null && data.length > 0) { 506 ProtobufUtil.expectPBMagicPrefix(data); 507 ByteArrayInputStream bis = 508 new ByteArrayInputStream(data, ProtobufUtil.lengthOfPBMagic(), data.length); 509 RSGroupInfoList 510 .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis))); 511 } 512 } 513 LOG.debug("Read ZK GroupInfo count:" + 
RSGroupInfoList.size()); 514 } 515 } catch (KeeperException | DeserializationException | InterruptedException e) { 516 throw new IOException("Failed to read rsGroupZNode", e); 517 } 518 return RSGroupInfoList; 519 } 520 521 @Override 522 public void refresh() throws IOException { 523 refresh(false); 524 } 525 526 /** 527 * Read rsgroup info from the source of truth, the hbase:rsgroup table. Update zk cache. Called on 528 * startup of the manager. 529 */ 530 private synchronized void refresh(boolean forceOnline) throws IOException { 531 List<RSGroupInfo> groupList = new LinkedList<>(); 532 533 // Overwrite anything read from zk, group table is source of truth 534 // if online read from GROUP table 535 if (forceOnline || isOnline()) { 536 LOG.debug("Refreshing in Online mode."); 537 groupList.addAll(retrieveGroupListFromGroupTable()); 538 } else { 539 LOG.debug("Refreshing in Offline mode."); 540 groupList.addAll(retrieveGroupListFromZookeeper()); 541 } 542 543 // refresh default group, prune 544 NavigableSet<TableName> orphanTables = new TreeSet<>(); 545 for (String entry : masterServices.getTableDescriptors().getAll().keySet()) { 546 orphanTables.add(TableName.valueOf(entry)); 547 } 548 for (RSGroupInfo group : groupList) { 549 if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) { 550 orphanTables.removeAll(group.getTables()); 551 } 552 } 553 554 // This is added to the last of the list so it overwrites the 'default' rsgroup loaded 555 // from region group table or zk 556 groupList 557 .add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(groupList), orphanTables)); 558 559 // populate the data 560 HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(); 561 HashMap<TableName, String> newTableMap = Maps.newHashMap(); 562 for (RSGroupInfo group : groupList) { 563 newGroupMap.put(group.getName(), group); 564 for (TableName table : group.getTables()) { 565 newTableMap.put(table, group.getName()); 566 } 567 } 568 
resetRSGroupAndTableMaps(newGroupMap, newTableMap); 569 updateCacheOfRSGroups(rsGroupMap.keySet()); 570 } 571 572 private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap) 573 throws IOException { 574 Map<TableName, String> newTableMap = Maps.newHashMap(); 575 List<Mutation> mutations = Lists.newArrayList(); 576 577 // populate deletes 578 for (String groupName : prevRSGroups) { 579 if (!groupMap.containsKey(groupName)) { 580 Delete d = new Delete(Bytes.toBytes(groupName)); 581 mutations.add(d); 582 } 583 } 584 585 // populate puts 586 for (RSGroupInfo RSGroupInfo : groupMap.values()) { 587 RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo); 588 Put p = new Put(Bytes.toBytes(RSGroupInfo.getName())); 589 p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray()); 590 mutations.add(p); 591 for (TableName entry : RSGroupInfo.getTables()) { 592 newTableMap.put(entry, RSGroupInfo.getName()); 593 } 594 } 595 596 if (mutations.size() > 0) { 597 multiMutate(mutations); 598 } 599 return newTableMap; 600 } 601 602 private synchronized void flushConfig() throws IOException { 603 flushConfig(this.rsGroupMap); 604 } 605 606 private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException { 607 Map<TableName, String> newTableMap; 608 609 // For offline mode persistence is still unavailable 610 // We're refreshing in-memory state but only for servers in default group 611 if (!isOnline()) { 612 if (newGroupMap == this.rsGroupMap) { 613 // When newGroupMap is this.rsGroupMap itself, 614 // do not need to check default group and other groups as followed 615 return; 616 } 617 618 Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap); 619 RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); 620 RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP); 621 if ( 622 !oldGroupMap.equals(newGroupMap) 623 /* compare both 
tables and servers in other groups */ || !oldDefaultGroup.getTables() 624 .equals(newDefaultGroup.getTables()) 625 /* compare tables in default group */ 626 ) { 627 throw new IOException("Only servers in default group can be updated during offline mode"); 628 } 629 630 // Restore newGroupMap by putting its default group back 631 newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup); 632 633 // Refresh rsGroupMap 634 // according to the inputted newGroupMap (an updated copy of rsGroupMap) 635 rsGroupMap = newGroupMap; 636 637 // Do not need to update tableMap 638 // because only the update on servers in default group is allowed above, 639 // or IOException will be thrown 640 return; 641 } 642 643 /* For online mode, persist to Zookeeper */ 644 newTableMap = flushConfigTable(newGroupMap); 645 646 // Make changes visible after having been persisted to the source of truth 647 resetRSGroupAndTableMaps(newGroupMap, newTableMap); 648 649 try { 650 String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode); 651 ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC); 652 653 List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size()); 654 for (String groupName : prevRSGroups) { 655 if (!newGroupMap.containsKey(groupName)) { 656 String znode = ZNodePaths.joinZNode(groupBasePath, groupName); 657 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); 658 } 659 } 660 661 for (RSGroupInfo RSGroupInfo : newGroupMap.values()) { 662 String znode = ZNodePaths.joinZNode(groupBasePath, RSGroupInfo.getName()); 663 RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo); 664 LOG.debug("Updating znode: " + znode); 665 ZKUtil.createAndFailSilent(watcher, znode); 666 zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode)); 667 zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode, 668 ProtobufUtil.prependPBMagic(proto.toByteArray()))); 669 } 670 LOG.debug("Writing ZK GroupInfo count: " + 
zkOps.size()); 671 672 ZKUtil.multiOrSequential(watcher, zkOps, false); 673 } catch (KeeperException e) { 674 LOG.error("Failed to write to rsGroupZNode", e); 675 masterServices.abort("Failed to write to rsGroupZNode", e); 676 throw new IOException("Failed to write to rsGroupZNode", e); 677 } 678 updateCacheOfRSGroups(newGroupMap.keySet()); 679 } 680 681 /** 682 * Make changes visible. Caller must be synchronized on 'this'. 683 */ 684 private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap, 685 Map<TableName, String> newTableMap) { 686 // Make maps Immutable. 687 this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap); 688 this.tableMap = Collections.unmodifiableMap(newTableMap); 689 } 690 691 /** 692 * Update cache of rsgroups. Caller must be synchronized on 'this'. 693 * @param currentGroups Current list of Groups. 694 */ 695 private void updateCacheOfRSGroups(final Set<String> currentGroups) { 696 this.prevRSGroups.clear(); 697 this.prevRSGroups.addAll(currentGroups); 698 } 699 700 // Called by getDefaultServers. Presume it has lock in place. 701 private List<ServerName> getOnlineRS() throws IOException { 702 if (masterServices != null) { 703 return masterServices.getServerManager().getOnlineServersList(); 704 } 705 LOG.debug("Reading online RS from zookeeper"); 706 List<ServerName> servers = new LinkedList<>(); 707 try { 708 for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) { 709 servers.add(ServerName.parseServerName(el)); 710 } 711 } catch (KeeperException e) { 712 throw new IOException("Failed to retrieve server list from zookeeper", e); 713 } 714 return servers; 715 } 716 717 // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. 718 private SortedSet<Address> getDefaultServers() throws IOException { 719 return getDefaultServers(listRSGroups()); 720 } 721 722 // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs. 
723 private SortedSet<Address> getDefaultServers(List<RSGroupInfo> rsGroupInfoList) 724 throws IOException { 725 // Build a list of servers in other groups than default group, from rsGroupMap 726 Set<Address> serversInOtherGroup = new HashSet<>(); 727 for (RSGroupInfo group : rsGroupInfoList) { 728 if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group 729 serversInOtherGroup.addAll(group.getServers()); 730 } 731 } 732 733 // Get all online servers from Zookeeper and find out servers in default group 734 SortedSet<Address> defaultServers = Sets.newTreeSet(); 735 for (ServerName serverName : getOnlineRS()) { 736 Address server = Address.fromParts(serverName.getHostname(), serverName.getPort()); 737 if (!serversInOtherGroup.contains(server)) { // not in other groups 738 defaultServers.add(server); 739 } 740 } 741 return defaultServers; 742 } 743 744 // Called by ServerEventsListenerThread. Synchronize on this because redoing 745 // the rsGroupMap then writing it out. 746 private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException { 747 RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP); 748 RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables()); 749 HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap); 750 newGroupMap.put(newInfo.getName(), newInfo); 751 flushConfig(newGroupMap); 752 } 753 754 /** 755 * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known 756 * servers. Notifications about server changes are received by registering {@link ServerListener}. 757 * As a listener, we need to return immediately, so the real work of updating the servers is done 758 * asynchronously in this thread. 
759 */ 760 private class ServerEventsListenerThread extends Thread implements ServerListener { 761 private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class); 762 private boolean changed = false; 763 764 ServerEventsListenerThread() { 765 setDaemon(true); 766 } 767 768 @Override 769 public void serverAdded(ServerName serverName) { 770 serverChanged(); 771 } 772 773 @Override 774 public void serverRemoved(ServerName serverName) { 775 serverChanged(); 776 } 777 778 private synchronized void serverChanged() { 779 changed = true; 780 this.notify(); 781 } 782 783 @Override 784 public void run() { 785 setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName()); 786 SortedSet<Address> prevDefaultServers = new TreeSet<>(); 787 while (isMasterRunning(masterServices)) { 788 try { 789 LOG.info("Updating default servers."); 790 SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers(); 791 if (!servers.equals(prevDefaultServers)) { 792 RSGroupInfoManagerImpl.this.updateDefaultServers(servers); 793 prevDefaultServers = servers; 794 LOG.info("Updated with servers: " + servers.size()); 795 } 796 try { 797 synchronized (this) { 798 while (!changed) { 799 wait(); 800 } 801 changed = false; 802 } 803 } catch (InterruptedException e) { 804 LOG.warn("Interrupted", e); 805 } 806 } catch (IOException e) { 807 LOG.warn("Failed to update default servers", e); 808 } 809 } 810 } 811 } 812 813 private class RSGroupStartupWorker extends Thread { 814 private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class); 815 private volatile boolean online = false; 816 817 RSGroupStartupWorker() { 818 super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName()); 819 setDaemon(true); 820 } 821 822 @Override 823 public void run() { 824 if (waitForGroupTableOnline()) { 825 LOG.info("GroupBasedLoadBalancer is now online"); 826 } else { 827 LOG.warn("Quit without making region group table 
online"); 828 } 829 } 830 831 private boolean waitForGroupTableOnline() { 832 while (isMasterRunning(masterServices)) { 833 try { 834 TableStateManager tsm = masterServices.getTableStateManager(); 835 if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) { 836 createRSGroupTable(); 837 } 838 // try reading from the table 839 try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) { 840 table.get(new Get(ROW_KEY)); 841 } 842 LOG.info( 843 "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information"); 844 RSGroupInfoManagerImpl.this.refresh(true); 845 online = true; 846 // flush any inconsistencies between ZK and HTable 847 RSGroupInfoManagerImpl.this.flushConfig(); 848 return true; 849 } catch (Exception e) { 850 LOG.warn("Failed to perform check", e); 851 // 100ms is short so let's just ignore the interrupt 852 Threads.sleepWithoutInterrupt(100); 853 } 854 } 855 return false; 856 } 857 858 private void createRSGroupTable() throws IOException { 859 OptionalLong optProcId = masterServices.getProcedures().stream() 860 .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p) 861 .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId) 862 .findFirst(); 863 long procId; 864 if (optProcId.isPresent()) { 865 procId = optProcId.getAsLong(); 866 } else { 867 procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC); 868 } 869 // wait for region to be online 870 int tries = 600; 871 while ( 872 !(masterServices.getMasterProcedureExecutor().isFinished(procId)) 873 && masterServices.getMasterProcedureExecutor().isRunning() && tries > 0 874 ) { 875 try { 876 Thread.sleep(100); 877 } catch (InterruptedException e) { 878 throw new IOException("Wait interrupted ", e); 879 } 880 tries--; 881 } 882 if (tries <= 0) { 883 throw new IOException("Failed to create group table in a given time."); 884 } else { 885 Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId); 886 if (result != 
null && result.isFailed()) { 887 throw new IOException( 888 "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result)); 889 } 890 } 891 } 892 893 public boolean isOnline() { 894 return online; 895 } 896 } 897 898 private static boolean isMasterRunning(MasterServices masterServices) { 899 return !masterServices.isAborted() && !masterServices.isStopped(); 900 } 901 902 private void multiMutate(List<Mutation> mutations) throws IOException { 903 try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) { 904 CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY); 905 MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder = 906 MultiRowMutationProtos.MutateRowsRequest.newBuilder(); 907 for (Mutation mutation : mutations) { 908 if (mutation instanceof Put) { 909 mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 910 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT, 911 mutation)); 912 } else if (mutation instanceof Delete) { 913 mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation( 914 org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE, 915 mutation)); 916 } else { 917 throw new DoNotRetryIOException( 918 "multiMutate doesn't support " + mutation.getClass().getName()); 919 } 920 } 921 922 MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service = 923 MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel); 924 try { 925 service.mutateRows(null, mmrBuilder.build()); 926 } catch (ServiceException ex) { 927 ProtobufUtil.toIOException(ex); 928 } 929 } 930 } 931 932 private void checkGroupName(String groupName) throws ConstraintException { 933 if (!groupName.matches("[a-zA-Z0-9_]+")) { 934 throw new ConstraintException("RSGroup name should only contain alphanumeric characters"); 935 } 936 } 937}