/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.rsgroup;

import com.google.protobuf.ServiceException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.ClusterSchema;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.hadoop.util.Shell;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
 * persistence store for the group information. It also makes use of zookeeper to store group
 * information needed for bootstrapping during offline mode.
 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
 * zk) on each modification.
 * <p>
 * Mutations on state are synchronized but reads can continue without having to wait on an instance
 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
 * state are read-only, just-in-case (see flushConfig). Mutators therefore work on <em>copies</em>
 * of the cached {@link RSGroupInfo} instances and never modify a cached instance in place.
 * <p>
 * Reads must not block else there is a danger we'll deadlock.
 * <p>
 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
 * on the results of the query modifying cache in zookeeper without another thread making
 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
 * access concurrently. Reads must be able to continue concurrently.
 */
@InterfaceAudience.Private
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
  private static final TableDescriptor RSGROUP_TABLE_DESC;
  static {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
      .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
    try {
      builder.setCoprocessor(
        CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
          .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
    } catch (IOException ex) {
      throw new Error(ex);
    }
    RSGROUP_TABLE_DESC = builder.build();
  }

  // These two Maps are immutable and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  private final MasterServices masterServices;
  private final Connection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // contains list of groups that were last flushed to persistent store
  private Set<String> prevRSGroups = new HashSet<>();
  private final ServerEventsListenerThread serverEventsListenerThread =
    new ServerEventsListenerThread();

  /** Get rsgroup table mapping script */
  @VisibleForTesting
  RSGroupMappingScript script;

  /**
   * Wrapper around an optional, administrator supplied shell script that maps a
   * (namespace, table) pair to an rsgroup name. Package visibility for testing.
   */
  static class RSGroupMappingScript {

    static final String RS_GROUP_MAPPING_SCRIPT = "hbase.rsgroup.table.mapping.script";
    static final String RS_GROUP_MAPPING_SCRIPT_TIMEOUT =
      "hbase.rsgroup.table.mapping.script.timeout";

    /** Executor for the configured script; stays {@code null} when no script is configured. */
    private Shell.ShellCommandExecutor rsgroupMappingScript;

    /**
     * @param conf configuration read for {@link #RS_GROUP_MAPPING_SCRIPT} and
     *          {@link #RS_GROUP_MAPPING_SCRIPT_TIMEOUT}
     */
    RSGroupMappingScript(Configuration conf) {
      String script = conf.get(RS_GROUP_MAPPING_SCRIPT);
      if (script == null || script.isEmpty()) {
        return;
      }

      // The two empty strings are placeholders for namespace and table name; they are
      // filled in by getRSGroup() before each execution.
      rsgroupMappingScript = new Shell.ShellCommandExecutor(
        new String[] { script, "", "" }, null, null,
        conf.getLong(RS_GROUP_MAPPING_SCRIPT_TIMEOUT, 5000) // 5 seconds
      );
    }

    /**
     * Runs the mapping script for the given table.
     * @param namespace namespace of the table, passed as first script argument
     * @param tablename qualifier of the table, passed as second script argument
     * @return the trimmed script output, or {@code null} when no script is configured or the
     *         execution failed with an {@link IOException}
     */
    String getRSGroup(String namespace, String tablename) {
      if (rsgroupMappingScript == null) {
        return null;
      }
      String[] exec = rsgroupMappingScript.getExecString();
      exec[1] = namespace;
      exec[2] = tablename;
      try {
        rsgroupMappingScript.execute();
      } catch (IOException e) {
        LOG.error("Failed to get RSGroup from script for table {}:{}", namespace, tablename, e);
        return null;
      }
      return rsgroupMappingScript.getOutput().trim();
    }
  }

  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
    this.masterServices = masterServices;
    this.watcher = masterServices.getZooKeeper();
    this.conn = masterServices.getConnection();
    this.rsGroupStartupWorker = new RSGroupStartupWorker();
    script = new RSGroupMappingScript(masterServices.getConfiguration());
  }

  /**
   * Loads the initial state, starts the server events listener thread and registers it with the
   * server manager.
   */
  private synchronized void init() throws IOException {
    refresh();
    serverEventsListenerThread.start();
    masterServices.getServerManager().registerListener(serverEventsListenerThread);
  }

  /**
   * Creates and initializes a manager for the given master.
   * @param master the master services the manager is bound to
   * @return an initialized manager
   * @throws IOException if the initial refresh fails
   */
  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
    instance.init();
    return instance;
  }

  /** Starts the background worker that creates (if needed) and waits for the rsgroup table. */
  public void start() {
    // create system table of rsgroup
    rsGroupStartupWorker.start();
  }

  @Override
  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
    checkGroupName(rsGroupInfo.getName());
    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
      rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Like {@link #getRSGroup(String)} but fails instead of returning {@code null}.
   * @throws DoNotRetryIOException if no group with the given name is cached
   */
  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
    if (rsGroupInfo == null) {
      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
    }
    return rsGroupInfo;
  }

  /**
   * @param master the master to get online servers for
   * @return Set of online Servers named for their hostname and port (not ServerName).
   */
  private static Set<Address> getOnlineServers(final MasterServices master) {
    Set<Address> onlineServers = new HashSet<>();
    if (master == null) {
      return onlineServers;
    }

    for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
      onlineServers.add(server.getAddress());
    }
    return onlineServers;
  }

  /**
   * Moves servers from {@code srcGroup} to {@code dstGroup}. Works on copies of the cached
   * group infos so that concurrent readers never observe a half-applied move and the cache is
   * left untouched when the flush fails.
   * @return the servers of the destination group after the move
   * @throws IOException if either group does not exist or the new state cannot be flushed
   */
  @Override
  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
    String dstGroup) throws IOException {
    // Copy-on-write: never mutate the cached instances, see class comment.
    RSGroupInfo src = new RSGroupInfo(getRSGroupInfo(srcGroup));
    RSGroupInfo dst = new RSGroupInfo(getRSGroupInfo(dstGroup));
    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
    // rsgroup of dead servers that are to come back later).
    Set<Address> onlineServers =
      dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
        : null;
    for (Address el : servers) {
      src.removeServer(el);
      if (onlineServers != null && !onlineServers.contains(el)) {
        LOG.debug("Dropping {} during move-to-default rsgroup because not online", el);
        continue;
      }
      dst.addServer(el);
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(src.getName(), src);
    newGroupMap.put(dst.getName(), dst);
    flushConfig(newGroupMap);
    return dst.getServers();
  }

  @Override
  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
    for (RSGroupInfo info : rsGroupMap.values()) {
      if (info.containsServer(serverHostPort)) {
        return info;
      }
    }
    return null;
  }

  @Override
  public RSGroupInfo getRSGroup(String groupName) {
    return rsGroupMap.get(groupName);
  }

  @Override
  public String getRSGroupOfTable(TableName tableName) {
    return tableMap.get(tableName);
  }

  @Override
  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
    throws IOException {
    // Check if rsGroup contains the destination rsgroup
    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
    }

    // Make a copy of rsGroupMap to update
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);

    // Remove tables from their original rsgroups
    // and update the copy of rsGroupMap
    for (TableName tableName : tableNames) {
      if (tableMap.containsKey(tableName)) {
        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
        src.removeTable(tableName);
        newGroupMap.put(src.getName(), src);
      }
    }

    // Add tables to the destination rsgroup
    // and update the copy of rsGroupMap
    if (groupName != null) {
      RSGroupInfo dstGroup = new RSGroupInfo(newGroupMap.get(groupName));
      dstGroup.addAllTables(tableNames);
      newGroupMap.put(dstGroup.getName(), dstGroup);
    }

    // Flush according to the updated copy of rsGroupMap
    flushConfig(newGroupMap);
  }

  @Override
  public synchronized void removeRSGroup(String groupName) throws IOException {
    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException(
        "Group " + groupName + " does not exist or is a reserved " + "group");
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.remove(groupName);
    flushConfig(newGroupMap);
  }

  @Override
  public List<RSGroupInfo> listRSGroups() {
    return Lists.newLinkedList(rsGroupMap.values());
  }

  @Override
  public boolean isOnline() {
    return rsGroupStartupWorker.isOnline();
  }

  /**
   * Moves both servers and tables from {@code srcGroup} to {@code dstGroup} in one flush.
   * Synchronized like every other mutator because it is a read-modify-write of
   * {@link #rsGroupMap}; works on copies of the cached group infos (copy-on-write).
   * @throws IOException if either group does not exist or the new state cannot be flushed
   */
  @Override
  public synchronized void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
    String srcGroup, String dstGroup) throws IOException {
    // get server's group; copy so the cached instances stay untouched until the flush succeeds
    RSGroupInfo srcGroupInfo = new RSGroupInfo(getRSGroupInfo(srcGroup));
    RSGroupInfo dstGroupInfo = new RSGroupInfo(getRSGroupInfo(dstGroup));

    // move servers
    for (Address el : servers) {
      srcGroupInfo.removeServer(el);
      dstGroupInfo.addServer(el);
    }
    // move tables
    for (TableName tableName : tables) {
      srcGroupInfo.removeTable(tableName);
      dstGroupInfo.addTable(tableName);
    }

    // flush changed groupinfo
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Removes the given servers from whatever rsgroup currently contains them. Servers that do
   * not belong to any group are logged and skipped. Nothing is flushed when no group changed.
   */
  @Override
  public synchronized void removeServers(Set<Address> servers) throws IOException {
    // group name -> modified copy of that group (copy-on-write, cached instances stay intact)
    Map<String, RSGroupInfo> updatedGroups = new HashMap<>();
    for (Address el : servers) {
      RSGroupInfo cached = getRSGroupOfServer(el);
      if (cached == null) {
        LOG.warn("Server {} does not belong to any rsgroup.", el);
        continue;
      }
      updatedGroups.computeIfAbsent(cached.getName(), name -> new RSGroupInfo(cached))
        .removeServer(el);
    }

    if (!updatedGroups.isEmpty()) {
      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
      newGroupMap.putAll(updatedGroups);
      flushConfig(newGroupMap);
    }
  }

  /**
   * Renames an rsgroup, keeping its servers and tables.
   * @param oldName current name of the group; must exist and must not be the default group
   * @param newName new name of the group; must not be in use already
   * @throws ConstraintException if a name is malformed, {@code oldName} is the default group or
   *           does not exist, or {@code newName} is already taken
   * @throws IOException if the new state cannot be flushed
   */
  @Override
  public synchronized void renameRSGroup(String oldName, String newName) throws IOException {
    checkGroupName(oldName);
    checkGroupName(newName);
    if (oldName.equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new ConstraintException("Can't rename default rsgroup");
    }

    RSGroupInfo oldGroup = getRSGroup(oldName);
    if (oldGroup == null) {
      throw new ConstraintException("RSGroup " + oldName + " does not exist");
    }
    // Without this check the put below would silently replace (and thereby drop) an existing group.
    if (rsGroupMap.containsKey(newName)) {
      throw new ConstraintException("Group already exists: " + newName);
    }

    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.remove(oldName);
    RSGroupInfo newGroup = new RSGroupInfo(newName,
      (SortedSet<Address>) oldGroup.getServers(), oldGroup.getTables());
    newGroupMap.put(newName, newGroup);
    flushConfig(newGroupMap);
  }

  /**
   * Will try to get the rsgroup from {@code tableMap} first
   * then try to get the rsgroup from {@code script}
   * try to get the rsgroup from the {@link NamespaceDescriptor} lastly.
   * If still not present, return default group.
   */
  @Override
  public RSGroupInfo determineRSGroupInfoForTable(TableName tableName)
    throws IOException {
    RSGroupInfo groupFromOldRSGroupInfo = getRSGroup(getRSGroupOfTable(tableName));
    if (groupFromOldRSGroupInfo != null) {
      return groupFromOldRSGroupInfo;
    }
    // RSGroup information determined by administrator.
    RSGroupInfo groupDeterminedByAdmin = getRSGroup(
      script.getRSGroup(tableName.getNamespaceAsString(), tableName.getQualifierAsString()));
    if (groupDeterminedByAdmin != null) {
      return groupDeterminedByAdmin;
    }
    // Finally, we will try to fall back to namespace as rsgroup if exists
    ClusterSchema clusterSchema = masterServices.getClusterSchema();
    if (clusterSchema == null) {
      if (TableName.isMetaTableName(tableName)) {
        LOG.info("Can not get the namespace rs group config for meta table, since the" +
          " meta table is not online yet, will use default group to assign meta first");
      } else {
        LOG.warn("ClusterSchema is null, can only use default rsgroup, should not happen?");
      }
    } else {
      NamespaceDescriptor nd = clusterSchema.getNamespace(tableName.getNamespaceAsString());
      RSGroupInfo groupNameOfNs =
        getRSGroup(nd.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP));
      if (groupNameOfNs != null) {
        return groupNameOfNs;
      }
    }
    return getRSGroup(RSGroupInfo.DEFAULT_GROUP);
  }

  /**
   * Scans the whole rsgroup table and deserializes one {@link RSGroupInfo} per row.
   * @return the groups stored in the rsgroup table
   */
  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
      ResultScanner scanner = table.getScanner(new Scan())) {
      Result result;
      while ((result = scanner.next()) != null) {
        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
          .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
      }
    }
    return rsGroupInfoList;
  }

  /**
   * Reads the groups cached in zookeeper below the rsgroup znode.
   * @return the groups found in zookeeper; empty if the base znode does not exist or has no
   *         children
   * @throws IOException if zookeeper cannot be read or a znode cannot be deserialized
   */
  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    // Overwrite any info stored by table, this takes precedence
    try {
      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
        if (children == null) {
          return rsGroupInfoList;
        }
        for (String znode : children) {
          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
          // Guard against a null payload (presumably returned when the child znode vanished
          // between listing and reading -- TODO confirm against ZKUtil.getData).
          if (data != null && data.length > 0) {
            ProtobufUtil.expectPBMagicPrefix(data);
            int magicLength = ProtobufUtil.lengthOfPBMagic();
            // Third constructor argument is a length, not an end offset.
            ByteArrayInputStream bis =
              new ByteArrayInputStream(data, magicLength, data.length - magicLength);
            rsGroupInfoList
              .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
          }
        }
        LOG.debug("Read ZK GroupInfo count:" + rsGroupInfoList.size());
      }
    } catch (KeeperException | DeserializationException | InterruptedException e) {
      throw new IOException("Failed to read rsGroupZNode", e);
    }
    return rsGroupInfoList;
  }

  @Override
  public void refresh() throws IOException {
    refresh(false);
  }

  /**
   * Read rsgroup info from the source of truth, the hbase:rsgroup table, when online (or when
   * {@code forceOnline} is set); otherwise read it from the zk cache. Rebuilds the default group
   * and replaces the in-memory Maps. Called on startup of the manager.
   * @param forceOnline read from the rsgroup table even if the manager is not online yet
   */
  private synchronized void refresh(boolean forceOnline) throws IOException {
    List<RSGroupInfo> groupList = new LinkedList<>();

    // Overwrite anything read from zk, group table is source of truth
    // if online read from GROUP table
    if (forceOnline || isOnline()) {
      LOG.debug("Refreshing in Online mode.");
      groupList.addAll(retrieveGroupListFromGroupTable());
    } else {
      LOG.debug("Refreshing in Offline mode.");
      groupList.addAll(retrieveGroupListFromZookeeper());
    }

    // refresh default group, prune
    NavigableSet<TableName> orphanTables = new TreeSet<>();
    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
      orphanTables.add(TableName.valueOf(entry));
    }
    for (RSGroupInfo group : groupList) {
      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
        orphanTables.removeAll(group.getTables());
      }
    }

    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
    // from region group table or zk
    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));

    // populate the data
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
    HashMap<TableName, String> newTableMap = Maps.newHashMap();
    for (RSGroupInfo group : groupList) {
      newGroupMap.put(group.getName(), group);
      for (TableName table : group.getTables()) {
        newTableMap.put(table, group.getName());
      }
    }
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
    updateCacheOfRSGroups(rsGroupMap.keySet());
  }

  /**
   * Writes the given groups to the rsgroup table in a single multi-row mutation: a Delete for
   * every previously flushed group that is missing from {@code groupMap} and a Put for every
   * group in {@code groupMap}.
   * @return the table to group-name mapping derived from {@code groupMap}
   */
  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
    throws IOException {
    Map<TableName, String> newTableMap = Maps.newHashMap();
    List<Mutation> mutations = Lists.newArrayList();

    // populate deletes
    for (String groupName : prevRSGroups) {
      if (!groupMap.containsKey(groupName)) {
        Delete d = new Delete(Bytes.toBytes(groupName));
        mutations.add(d);
      }
    }

    // populate puts
    for (RSGroupInfo rsGroupInfo : groupMap.values()) {
      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(rsGroupInfo);
      Put p = new Put(Bytes.toBytes(rsGroupInfo.getName()));
      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
      mutations.add(p);
      for (TableName entry : rsGroupInfo.getTables()) {
        newTableMap.put(entry, rsGroupInfo.getName());
      }
    }

    if (mutations.size() > 0) {
      multiMutate(mutations);
    }
    return newTableMap;
  }

  private synchronized void flushConfig() throws IOException {
    flushConfig(this.rsGroupMap);
  }

  /**
   * Persists {@code newGroupMap} and makes it the visible state.
   * <p>
   * Offline: nothing is persisted; only a change of the servers in the default group is accepted
   * and applied in memory, anything else fails. Online: writes to the rsgroup table, swaps the
   * in-memory Maps, then mirrors the groups to zookeeper. A zookeeper failure aborts the master.
   * @param newGroupMap the complete new group state; must be mutable unless it is
   *          {@link #rsGroupMap} itself (the default group is temporarily removed in offline mode)
   * @throws IOException on a disallowed offline change or a persistence failure
   */
  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
    Map<TableName, String> newTableMap;

    // For offline mode persistence is still unavailable
    // We're refreshing in-memory state but only for servers in default group
    if (!isOnline()) {
      if (newGroupMap == this.rsGroupMap) {
        // When newGroupMap is this.rsGroupMap itself,
        // do not need to check default group and other groups as followed
        return;
      }

      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
        !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
      /* compare tables in default group */) {
        throw new IOException("Only servers in default group can be updated during offline mode");
      }

      // Restore newGroupMap by putting its default group back
      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);

      // Refresh rsGroupMap
      // according to the inputted newGroupMap (an updated copy of rsGroupMap)
      rsGroupMap = newGroupMap;

      // Do not need to update tableMap
      // because only the update on servers in default group is allowed above,
      // or IOException will be thrown
      return;
    }

    /* For online mode, persist to the rsgroup table first, then to Zookeeper */
    newTableMap = flushConfigTable(newGroupMap);

    // Make changes visible after having been persisted to the source of truth
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);

    try {
      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
      for (String groupName : prevRSGroups) {
        if (!newGroupMap.containsKey(groupName)) {
          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        }
      }

      for (RSGroupInfo rsGroupInfo : newGroupMap.values()) {
        String znode = ZNodePaths.joinZNode(groupBasePath, rsGroupInfo.getName());
        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(rsGroupInfo);
        LOG.debug("Updating znode: " + znode);
        ZKUtil.createAndFailSilent(watcher, znode);
        // delete + create replaces the znode content
        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
          ProtobufUtil.prependPBMagic(proto.toByteArray())));
      }
      LOG.debug("Writing ZK GroupInfo count: " + zkOps.size());

      ZKUtil.multiOrSequential(watcher, zkOps, false);
    } catch (KeeperException e) {
      LOG.error("Failed to write to rsGroupZNode", e);
      masterServices.abort("Failed to write to rsGroupZNode", e);
      throw new IOException("Failed to write to rsGroupZNode", e);
    }
    updateCacheOfRSGroups(newGroupMap.keySet());
  }

  /**
   * Make changes visible. Caller must be synchronized on 'this'.
   */
  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
    Map<TableName, String> newTableMap) {
    // Make maps Immutable.
    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
    this.tableMap = Collections.unmodifiableMap(newTableMap);
  }

  /**
   * Update cache of rsgroups. Caller must be synchronized on 'this'.
   * @param currentGroups Current list of Groups.
   */
  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
    this.prevRSGroups.clear();
    this.prevRSGroups.addAll(currentGroups);
  }

  /**
   * Called by getDefaultServers. Presume it has lock in place.
   * @return the online servers as reported by the master's server manager, or, when there is no
   *         master, the servers registered below the rs znode in zookeeper
   */
  private List<ServerName> getOnlineRS() throws IOException {
    if (masterServices != null) {
      return masterServices.getServerManager().getOnlineServersList();
    }
    LOG.debug("Reading online RS from zookeeper");
    List<ServerName> servers = new LinkedList<>();
    try {
      for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
        servers.add(ServerName.parseServerName(el));
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to retrieve server list from zookeeper", e);
    }
    return servers;
  }

  /**
   * Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
   * @return all online servers that are not a member of any non-default rsgroup
   */
  private SortedSet<Address> getDefaultServers() throws IOException {
    // Build a list of servers in other groups than default group, from rsGroupMap
    Set<Address> serversInOtherGroup = new HashSet<>();
    for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
        serversInOtherGroup.addAll(group.getServers());
      }
    }

    // Get all online servers and find out servers in default group
    SortedSet<Address> defaultServers = Sets.newTreeSet();
    for (ServerName serverName : getOnlineRS()) {
      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
      if (!serversInOtherGroup.contains(server)) { // not in other groups
        defaultServers.add(server);
      }
    }
    return defaultServers;
  }

  // Called by ServerEventsListenerThread. Synchronize on this because redoing
  // the rsGroupMap then writing it out.
  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(newInfo.getName(), newInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
   * servers. Notifications about server changes are received by registering {@link ServerListener}.
   * As a listener, we need to return immediately, so the real work of updating the servers is done
   * asynchronously in this thread.
   */
  private class ServerEventsListenerThread extends Thread implements ServerListener {
    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
    // Guarded by this thread object's monitor; set by the listener callbacks, cleared by run().
    private boolean changed = false;

    ServerEventsListenerThread() {
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    @Override
    public void serverRemoved(ServerName serverName) {
      serverChanged();
    }

    /** Records that the server list changed and wakes up {@link #run()}. */
    private synchronized void serverChanged() {
      changed = true;
      this.notify();
    }

    @Override
    public void run() {
      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
      SortedSet<Address> prevDefaultServers = new TreeSet<>();
      while (isMasterRunning(masterServices)) {
        try {
          LOG.info("Updating default servers.");
          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
          if (!servers.equals(prevDefaultServers)) {
            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
            prevDefaultServers = servers;
            LOG.info("Updated with servers: " + servers.size());
          }
          try {
            synchronized (this) {
              // Loop guards against spurious wakeups.
              while (!changed) {
                wait();
              }
              changed = false;
            }
          } catch (InterruptedException e) {
            // Deliberately not re-interrupting: the loop must keep serving until the master
            // stops, and a set interrupt flag would turn wait() into a busy loop.
            LOG.warn("Interrupted", e);
          }
        } catch (IOException e) {
          LOG.warn("Failed to update default servers", e);
        }
      }
    }
  }

  /**
   * Background thread that makes sure the rsgroup table exists and is readable, then reloads the
   * state from it and switches the manager to online mode.
   */
  private class RSGroupStartupWorker extends Thread {
    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
    private volatile boolean online = false;

    RSGroupStartupWorker() {
      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
      setDaemon(true);
    }

    @Override
    public void run() {
      if (waitForGroupTableOnline()) {
        LOG.info("GroupBasedLoadBalancer is now online");
      } else {
        LOG.warn("Quit without making region group table online");
      }
    }

    /**
     * Retries every 100ms, for as long as the master is running, to create the rsgroup table
     * (if absent), read from it, refresh the cached state and flush it.
     * @return {@code true} once the manager is online, {@code false} if the master stopped first
     */
    private boolean waitForGroupTableOnline() {
      while (isMasterRunning(masterServices)) {
        try {
          TableStateManager tsm = masterServices.getTableStateManager();
          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
            createRSGroupTable();
          }
          // try reading from the table
          try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
            table.get(new Get(ROW_KEY));
          }
          LOG.info(
            "RSGroup table=" + RSGROUP_TABLE_NAME + " is online, refreshing cached information");
          RSGroupInfoManagerImpl.this.refresh(true);
          online = true;
          // flush any inconsistencies between ZK and HTable
          RSGroupInfoManagerImpl.this.flushConfig();
          return true;
        } catch (Exception e) {
          LOG.warn("Failed to perform check", e);
          // 100ms is short so let's just ignore the interrupt
          Threads.sleepWithoutInterrupt(100);
        }
      }
      return false;
    }

    /**
     * Creates the rsgroup system table, or attaches to an already submitted
     * {@link CreateTableProcedure} for it, and waits up to 600 * 100ms for the procedure to
     * finish.
     * @throws IOException if the wait is interrupted, times out, or the procedure failed
     */
    private void createRSGroupTable() throws IOException {
      OptionalLong optProcId = masterServices.getProcedures().stream()
        .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
        .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
        .findFirst();
      long procId;
      if (optProcId.isPresent()) {
        procId = optProcId.getAsLong();
      } else {
        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
      }
      // wait for region to be online
      int tries = 600;
      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
        masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          throw new IOException("Wait interrupted ", e);
        }
        tries--;
      }
      if (tries <= 0) {
        throw new IOException("Failed to create group table in a given time.");
      } else {
        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
        if (result != null && result.isFailed()) {
          throw new IOException(
            "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
        }
      }
    }

    public boolean isOnline() {
      return online;
    }
  }

  private static boolean isMasterRunning(MasterServices masterServices) {
    return !masterServices.isAborted() && !masterServices.isStopped();
  }

  /**
   * Applies the given Puts and Deletes to the rsgroup table through the
   * MultiRowMutation coprocessor service in one request.
   * @throws DoNotRetryIOException if a mutation is neither a {@link Put} nor a {@link Delete}
   */
  private void multiMutate(List<Mutation> mutations) throws IOException {
    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
        MultiRowMutationProtos.MutateRowsRequest.newBuilder();
      for (Mutation mutation : mutations) {
        if (mutation instanceof Put) {
          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
            mutation));
        } else if (mutation instanceof Delete) {
          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
            mutation));
        } else {
          throw new DoNotRetryIOException(
            "multiMutate doesn't support " + mutation.getClass().getName());
        }
      }

      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
        MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
      try {
        service.mutateRows(null, mmrBuilder.build());
      } catch (ServiceException ex) {
        // NOTE(review): presumably toIOException always throws -- confirm; otherwise the
        // failure would be swallowed here.
        ProtobufUtil.toIOException(ex);
      }
    }
  }

  /**
   * @throws ConstraintException if the name contains anything but letters, digits and
   *           underscores, or is empty
   */
  private void checkGroupName(String groupName) throws ConstraintException {
    if (!groupName.matches("[a-zA-Z0-9_]+")) {
      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
    }
  }
}