/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.rsgroup;

import com.google.protobuf.ServiceException;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.CoprocessorDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;

/**
 * This is an implementation of {@link RSGroupInfoManager} which makes use of an HBase table as the
 * persistence store for the group information. It also makes use of zookeeper to store group
 * information needed for bootstrapping during offline mode.
 * <h2>Concurrency</h2> RSGroup state is kept locally in Maps. There is a rsgroup name to cached
 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the rsgroup they belong
 * to (in {@link #tableMap}). These Maps are persisted to the hbase:rsgroup table (and cached in
 * zk) on each modification.
 * <p>
 * Mutations on state are synchronized but reads can continue without having to wait on an instance
 * monitor, mutations do wholesale replace of the Maps on update -- Copy-On-Write; the local Maps of
 * state are read-only, just-in-case (see flushConfig). Mutators therefore never modify an
 * RSGroupInfo held in the current Maps; they modify copies and publish a new Map.
 * <p>
 * Reads must not block else there is a danger we'll deadlock.
 * <p>
 * Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and then act
 * on the results of the query modifying cache in zookeeper without another thread making
 * intermediate modifications. These clients synchronize on the 'this' instance so no other has
 * access concurrently. Reads must be able to continue concurrently.
 */
@InterfaceAudience.Private
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

  /** Valid rsgroup names: one or more ASCII letters, digits or underscores. */
  private static final Pattern GROUP_NAME_PATTERN = Pattern.compile("[a-zA-Z0-9_]+");

  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
  private static final TableDescriptor RSGROUP_TABLE_DESC;
  static {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(RSGROUP_TABLE_NAME)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(META_FAMILY_BYTES))
        .setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
    try {
      builder.setCoprocessor(
          CoprocessorDescriptorBuilder.newBuilder(MultiRowMutationEndpoint.class.getName())
              .setPriority(Coprocessor.PRIORITY_SYSTEM).build());
    } catch (IOException ex) {
      throw new Error(ex);
    }
    RSGROUP_TABLE_DESC = builder.build();
  }

  // These two Maps are immutable and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  private final MasterServices masterServices;
  private final Connection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker;
  // contains list of groups that were last flushed to persistent store
  private final Set<String> prevRSGroups = new HashSet<>();
  private final ServerEventsListenerThread serverEventsListenerThread =
      new ServerEventsListenerThread();

  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
    this.masterServices = masterServices;
    this.watcher = masterServices.getZooKeeper();
    this.conn = masterServices.getConnection();
    this.rsGroupStartupWorker = new RSGroupStartupWorker();
  }

  /**
   * Loads the initial state (via {@link #refresh()}), starts the server-events thread and
   * registers it with the master's ServerManager.
   */
  private synchronized void init() throws IOException {
    refresh();
    serverEventsListenerThread.start();
    masterServices.getServerManager().registerListener(serverEventsListenerThread);
  }

  /**
   * Creates and initializes a manager for the given master.
   * @param master the master services to use
   * @return an initialized manager
   * @throws IOException if the initial refresh fails
   */
  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
    instance.init();
    return instance;
  }

  /** Starts the background worker that creates/waits for the rsgroup system table. */
  public void start() {
    // create system table of rsgroup
    rsGroupStartupWorker.start();
  }

  @Override
  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
    checkGroupName(rsGroupInfo.getName());
    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
        rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Looks up a group in the current cache.
   * @param groupName name of the group
   * @return the cached (shared, not to be modified) group info
   * @throws DoNotRetryIOException if no such group exists
   */
  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
    if (rsGroupInfo == null) {
      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
    }
    return rsGroupInfo;
  }

  /**
   * @param master the master to get online servers for
   * @return Set of online Servers named for their hostname and port (not ServerName).
   */
  private static Set<Address> getOnlineServers(final MasterServices master) {
    Set<Address> onlineServers = new HashSet<>();
    if (master == null) {
      return onlineServers;
    }

    for (ServerName server : master.getServerManager().getOnlineServers().keySet()) {
      onlineServers.add(server.getAddress());
    }
    return onlineServers;
  }

  @Override
  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
      String dstGroup) throws IOException {
    // Work on copies: the cached RSGroupInfo objects are shared with concurrent readers and must
    // not change unless flushConfig succeeds in publishing the new map. Mutating them in place
    // would also defeat flushConfig's offline-mode comparison of old and new maps.
    RSGroupInfo src = new RSGroupInfo(getRSGroupInfo(srcGroup));
    // When source and destination are the same group, use one copy so the remove-then-add
    // sequence below behaves exactly as it would on a single object.
    RSGroupInfo dst =
        srcGroup.equals(dstGroup) ? src : new RSGroupInfo(getRSGroupInfo(dstGroup));
    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
    // rsgroup of dead servers that are to come back later).
    Set<Address> onlineServers =
        dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ? getOnlineServers(this.masterServices)
            : null;
    for (Address el : servers) {
      src.removeServer(el);
      if (onlineServers != null && !onlineServers.contains(el)) {
        LOG.debug("Dropping {} during move-to-default rsgroup because not online", el);
        continue;
      }
      dst.addServer(el);
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(src.getName(), src);
    newGroupMap.put(dst.getName(), dst);
    flushConfig(newGroupMap);
    return dst.getServers();
  }

  @Override
  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
    for (RSGroupInfo info : rsGroupMap.values()) {
      if (info.containsServer(serverHostPort)) {
        return info;
      }
    }
    return null;
  }

  @Override
  public RSGroupInfo getRSGroup(String groupName) {
    return rsGroupMap.get(groupName);
  }

  @Override
  public String getRSGroupOfTable(TableName tableName) {
    return tableMap.get(tableName);
  }

  @Override
  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
      throws IOException {
    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
    }

    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    for (TableName tableName : tableNames) {
      if (tableMap.containsKey(tableName)) {
        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
        src.removeTable(tableName);
        newGroupMap.put(src.getName(), src);
      }
      if (groupName != null) {
        RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
        dst.addTable(tableName);
        newGroupMap.put(dst.getName(), dst);
      }
    }
    flushConfig(newGroupMap);
  }

  @Override
  public synchronized void removeRSGroup(String groupName) throws IOException {
    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException(
          "Group " + groupName + " does not exist or is a reserved " + "group");
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.remove(groupName);
    flushConfig(newGroupMap);
  }

  @Override
  public List<RSGroupInfo> listRSGroups() {
    return Lists.newLinkedList(rsGroupMap.values());
  }

  @Override
  public boolean isOnline() {
    return rsGroupStartupWorker.isOnline();
  }

  /**
   * Moves the given servers and tables from {@code srcGroup} to {@code dstGroup} and persists both
   * groups in a single flush. Synchronized like the other mutators so the read-copy-write of
   * {@link #rsGroupMap} cannot interleave with another update.
   */
  @Override
  public synchronized void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
      String srcGroup, String dstGroup) throws IOException {
    // Work on copies of the cached groups (see moveServers for why).
    RSGroupInfo srcGroupInfo = new RSGroupInfo(getRSGroupInfo(srcGroup));
    RSGroupInfo dstGroupInfo =
        srcGroup.equals(dstGroup) ? srcGroupInfo : new RSGroupInfo(getRSGroupInfo(dstGroup));

    // move servers
    for (Address el : servers) {
      srcGroupInfo.removeServer(el);
      dstGroupInfo.addServer(el);
    }
    // move tables
    for (TableName tableName : tables) {
      srcGroupInfo.removeTable(tableName);
      dstGroupInfo.addTable(tableName);
    }

    // flush changed groupinfo
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
    flushConfig(newGroupMap);
  }

  @Override
  public synchronized void removeServers(Set<Address> servers) throws IOException {
    // Modified copies of the affected groups, keyed by group name.
    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<>();
    for (Address el : servers) {
      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
      if (rsGroupInfo == null) {
        LOG.warn("Server {} does not belong to any rsgroup.", el);
        continue;
      }
      // Copy a cached group the first time one of its servers is removed; further servers of the
      // same group are removed from that same copy. The cached object itself is never modified.
      rsGroupInfos.computeIfAbsent(rsGroupInfo.getName(), name -> new RSGroupInfo(rsGroupInfo))
          .removeServer(el);
    }

    if (!rsGroupInfos.isEmpty()) {
      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
      newGroupMap.putAll(rsGroupInfos);
      flushConfig(newGroupMap);
    }
  }

  /**
   * Reads every row of the rsgroup table and decodes it into an RSGroupInfo.
   * @return the groups stored in the table
   * @throws IOException on table access or protobuf decoding failure
   */
  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    try (Table table = conn.getTable(RSGROUP_TABLE_NAME);
        ResultScanner scanner = table.getScanner(new Scan())) {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo
            .parseFrom(result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
      }
    }
    return rsGroupInfoList;
  }

  /**
   * Reads the groups cached in zookeeper under the rsgroup znode. Used while the rsgroup table is
   * not yet online.
   * @return the groups found in zookeeper; empty if the base znode does not exist
   * @throws IOException wrapping any zookeeper, interruption or deserialization failure
   */
  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    // Overwrite any info stored by table, this takes precedence
    try {
      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
        if (children == null) {
          return rsGroupInfoList;
        }
        for (String znode : children) {
          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
          // getData yields null if the znode vanished between listing and reading it.
          if (data != null && data.length > 0) {
            ProtobufUtil.expectPBMagicPrefix(data);
            int magicLength = ProtobufUtil.lengthOfPBMagic();
            // The third argument is a length, not an end offset.
            ByteArrayInputStream bis =
                new ByteArrayInputStream(data, magicLength, data.length - magicLength);
            rsGroupInfoList
                .add(RSGroupProtobufUtil.toGroupInfo(RSGroupProtos.RSGroupInfo.parseFrom(bis)));
          }
        }
        LOG.debug("Read ZK GroupInfo count:{}", rsGroupInfoList.size());
      }
    } catch (KeeperException | DeserializationException | InterruptedException e) {
      throw new IOException("Failed to read rsGroupZNode", e);
    }
    return rsGroupInfoList;
  }

  @Override
  public void refresh() throws IOException {
    refresh(false);
  }

  /**
   * Read rsgroup info from the source of truth, the hbase:rsgroup table (or from zookeeper while
   * offline), rebuild the 'default' group and replace the cached Maps. Called on startup of the
   * manager.
   * @param forceOnline read from the table even if the startup worker has not yet marked the
   *          manager online
   */
  private synchronized void refresh(boolean forceOnline) throws IOException {
    List<RSGroupInfo> groupList = new LinkedList<>();

    // Overwrite anything read from zk, group table is source of truth
    // if online read from GROUP table
    if (forceOnline || isOnline()) {
      LOG.debug("Refreshing in Online mode.");
      groupList.addAll(retrieveGroupListFromGroupTable());
    } else {
      LOG.debug("Refreshing in Offline mode.");
      groupList.addAll(retrieveGroupListFromZookeeper());
    }

    // refresh default group, prune: every table not claimed by a non-default group is an orphan
    // and belongs to 'default'.
    NavigableSet<TableName> orphanTables = new TreeSet<>();
    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
      orphanTables.add(TableName.valueOf(entry));
    }
    for (RSGroupInfo group : groupList) {
      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
        orphanTables.removeAll(group.getTables());
      }
    }

    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
    // from region group table or zk
    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(), orphanTables));

    // populate the data
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
    HashMap<TableName, String> newTableMap = Maps.newHashMap();
    for (RSGroupInfo group : groupList) {
      newGroupMap.put(group.getName(), group);
      for (TableName table : group.getTables()) {
        newTableMap.put(table, group.getName());
      }
    }
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
    updateCacheOfRSGroups(rsGroupMap.keySet());
  }

  /**
   * Writes {@code groupMap} to the rsgroup table in one multi-row mutation: a Put per group plus a
   * Delete for every previously flushed group that is no longer present.
   * @param groupMap the complete new set of groups
   * @return the table-to-group-name map derived from {@code groupMap}
   */
  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
      throws IOException {
    Map<TableName, String> newTableMap = Maps.newHashMap();
    List<Mutation> mutations = Lists.newArrayList();

    // populate deletes
    for (String groupName : prevRSGroups) {
      if (!groupMap.containsKey(groupName)) {
        mutations.add(new Delete(Bytes.toBytes(groupName)));
      }
    }

    // populate puts
    for (RSGroupInfo groupInfo : groupMap.values()) {
      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(groupInfo);
      Put p = new Put(Bytes.toBytes(groupInfo.getName()));
      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
      mutations.add(p);
      for (TableName entry : groupInfo.getTables()) {
        newTableMap.put(entry, groupInfo.getName());
      }
    }

    if (!mutations.isEmpty()) {
      multiMutate(mutations);
    }
    return newTableMap;
  }

  /** Re-flushes the current state, e.g. to repair inconsistencies between zk and the table. */
  private synchronized void flushConfig() throws IOException {
    flushConfig(this.rsGroupMap);
  }

  /**
   * Persists and publishes a new group map.
   * <p>
   * Offline (rsgroup table not yet available): nothing is persisted, and only the servers of the
   * default group may differ from the current state; any other change is rejected. Online: the map
   * is written to the rsgroup table, published locally, then mirrored to zookeeper.
   * @param newGroupMap the complete new set of groups; not modified by this method
   * @throws IOException if the offline restriction is violated or persisting fails
   */
  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
    // For offline mode persistence is still unavailable
    // We're refreshing in-memory state but only for servers in default group
    if (!isOnline()) {
      if (newGroupMap == this.rsGroupMap) {
        // When newGroupMap is this.rsGroupMap itself,
        // do not need to check default group and other groups as followed
        return;
      }

      // Compare on private copies so the caller's map is left untouched.
      Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
      Map<String, RSGroupInfo> proposedGroupMap = Maps.newHashMap(newGroupMap);
      RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      RSGroupInfo newDefaultGroup = proposedGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      if (!oldGroupMap.equals(proposedGroupMap) /* compare both tables and servers in other groups */ ||
          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())
          /* compare tables in default group */) {
        throw new IOException("Only servers in default group can be updated during offline mode");
      }

      // Refresh rsGroupMap according to the inputted newGroupMap (an updated copy of rsGroupMap),
      // keeping it read-only like resetRSGroupAndTableMaps does.
      rsGroupMap = Collections.unmodifiableMap(newGroupMap);

      // Do not need to update tableMap
      // because only the update on servers in default group is allowed above,
      // or IOException will be thrown
      return;
    }

    // Online mode: persist to the rsgroup table (source of truth) first ...
    Map<TableName, String> newTableMap = flushConfigTable(newGroupMap);

    // ... make changes visible after having been persisted to the source of truth ...
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);

    // ... then mirror the groups to zookeeper.
    try {
      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
      for (String groupName : prevRSGroups) {
        if (!newGroupMap.containsKey(groupName)) {
          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        }
      }

      for (RSGroupInfo groupInfo : newGroupMap.values()) {
        String znode = ZNodePaths.joinZNode(groupBasePath, groupInfo.getName());
        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(groupInfo);
        LOG.debug("Updating znode: {}", znode);
        ZKUtil.createAndFailSilent(watcher, znode);
        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
            ProtobufUtil.prependPBMagic(proto.toByteArray())));
      }
      LOG.debug("Writing ZK GroupInfo count: {}", zkOps.size());

      ZKUtil.multiOrSequential(watcher, zkOps, false);
    } catch (KeeperException e) {
      LOG.error("Failed to write to rsGroupZNode", e);
      masterServices.abort("Failed to write to rsGroupZNode", e);
      throw new IOException("Failed to write to rsGroupZNode", e);
    }
    updateCacheOfRSGroups(newGroupMap.keySet());
  }

  /**
   * Make changes visible. Caller must be synchronized on 'this'.
   */
  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
      Map<TableName, String> newTableMap) {
    // Make maps Immutable.
    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
    this.tableMap = Collections.unmodifiableMap(newTableMap);
  }

  /**
   * Update cache of rsgroups. Caller must be synchronized on 'this'.
   * @param currentGroups Current list of Groups.
   */
  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
    this.prevRSGroups.clear();
    this.prevRSGroups.addAll(currentGroups);
  }

  /**
   * Lists online region servers, from the ServerManager when masterServices is available and
   * otherwise from the region server znodes. Called by getDefaultServers.
   */
  private List<ServerName> getOnlineRS() throws IOException {
    if (masterServices != null) {
      return masterServices.getServerManager().getOnlineServersList();
    }
    LOG.debug("Reading online RS from zookeeper");
    List<ServerName> servers = new LinkedList<>();
    try {
      for (String el : ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode)) {
        servers.add(ServerName.parseServerName(el));
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to retrieve server list from zookeeper", e);
    }
    return servers;
  }

  /**
   * Computes the servers of the default group: every online server not assigned to another group.
   * Called by refresh() (which holds the lock on this manager) and by ServerEventsListenerThread
   * (which does not); it only reads the volatile rsGroupMap snapshot.
   */
  private SortedSet<Address> getDefaultServers() throws IOException {
    // Build a list of servers in other groups than default group, from rsGroupMap
    Set<Address> serversInOtherGroup = new HashSet<>();
    for (RSGroupInfo group : listRSGroups() /* get from rsGroupMap */) {
      if (!RSGroupInfo.DEFAULT_GROUP.equals(group.getName())) { // not default group
        serversInOtherGroup.addAll(group.getServers());
      }
    }

    // Get all online servers and find out servers in default group
    SortedSet<Address> defaultServers = Sets.newTreeSet();
    for (ServerName serverName : getOnlineRS()) {
      Address server = Address.fromParts(serverName.getHostname(), serverName.getPort());
      if (!serversInOtherGroup.contains(server)) { // not in other groups
        defaultServers.add(server);
      }
    }
    return defaultServers;
  }

  // Called by ServerEventsListenerThread. Synchronize on this because redoing
  // the rsGroupMap then writing it out.
  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(newInfo.getName(), newInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
   * servers. Notifications about server changes are received by registering {@link ServerListener}.
   * As a listener, we need to return immediately, so the real work of updating the servers is done
   * asynchronously in this thread.
   */
  private class ServerEventsListenerThread extends Thread implements ServerListener {
    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
    // Guarded by this thread object's monitor.
    private boolean changed = false;

    ServerEventsListenerThread() {
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    @Override
    public void serverRemoved(ServerName serverName) {
      serverChanged();
    }

    private synchronized void serverChanged() {
      changed = true;
      this.notify();
    }

    @Override
    public void run() {
      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
      SortedSet<Address> prevDefaultServers = new TreeSet<>();
      while (isMasterRunning(masterServices)) {
        try {
          LOG.info("Updating default servers.");
          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
          if (!servers.equals(prevDefaultServers)) {
            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
            prevDefaultServers = servers;
            LOG.info("Updated with servers: {}", servers.size());
          }
          try {
            synchronized (this) {
              while (!changed) {
                wait();
              }
              changed = false;
            }
          } catch (InterruptedException e) {
            LOG.warn("Interrupted", e);
          }
        } catch (IOException e) {
          LOG.warn("Failed to update default servers", e);
        }
      }
    }
  }

  /**
   * Background worker that creates the rsgroup table if needed, waits until it is readable, then
   * refreshes from it and marks the manager online.
   */
  private class RSGroupStartupWorker extends Thread {
    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
    private volatile boolean online = false;

    RSGroupStartupWorker() {
      super(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
      setDaemon(true);
    }

    @Override
    public void run() {
      if (waitForGroupTableOnline()) {
        LOG.info("GroupBasedLoadBalancer is now online");
      } else {
        LOG.warn("Quit without making region group table online");
      }
    }

    /**
     * Retries until the rsgroup table is usable or the master stops.
     * @return true once online; false if the master stopped first
     */
    private boolean waitForGroupTableOnline() {
      while (isMasterRunning(masterServices)) {
        try {
          TableStateManager tsm = masterServices.getTableStateManager();
          if (!tsm.isTablePresent(RSGROUP_TABLE_NAME)) {
            createRSGroupTable();
          }
          // try reading from the table
          try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
            table.get(new Get(ROW_KEY));
          }
          LOG.info("RSGroup table={} is online, refreshing cached information",
              RSGROUP_TABLE_NAME);
          RSGroupInfoManagerImpl.this.refresh(true);
          online = true;
          // flush any inconsistencies between ZK and HTable
          RSGroupInfoManagerImpl.this.flushConfig();
          return true;
        } catch (Exception e) {
          LOG.warn("Failed to perform check", e);
          // 100ms is short so let's just ignore the interrupt
          Threads.sleepWithoutInterrupt(100);
        }
      }
      return false;
    }

    /**
     * Submits (or reuses an already submitted) CreateTableProcedure for the rsgroup table and
     * polls up to 600 times, 100ms apart, for it to finish.
     */
    private void createRSGroupTable() throws IOException {
      OptionalLong optProcId = masterServices.getProcedures().stream()
          .filter(p -> p instanceof CreateTableProcedure).map(p -> (CreateTableProcedure) p)
          .filter(p -> p.getTableName().equals(RSGROUP_TABLE_NAME)).mapToLong(Procedure::getProcId)
          .findFirst();
      long procId;
      if (optProcId.isPresent()) {
        procId = optProcId.getAsLong();
      } else {
        procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
      }
      // wait for region to be online
      int tries = 600;
      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId)) &&
          masterServices.getMasterProcedureExecutor().isRunning() && tries > 0) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          throw new IOException("Wait interrupted ", e);
        }
        tries--;
      }
      if (tries <= 0) {
        throw new IOException("Failed to create group table in a given time.");
      } else {
        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
        if (result != null && result.isFailed()) {
          throw new IOException(
              "Failed to create group table. " + MasterProcedureUtil.unwrapRemoteIOException(result));
        }
      }
    }

    public boolean isOnline() {
      return online;
    }
  }

  private static boolean isMasterRunning(MasterServices masterServices) {
    return !masterServices.isAborted() && !masterServices.isStopped();
  }

  /**
   * Applies the Puts/Deletes atomically through the MultiRowMutationEndpoint coprocessor of the
   * rsgroup table.
   * @throws DoNotRetryIOException for any mutation that is neither a Put nor a Delete
   */
  private void multiMutate(List<Mutation> mutations) throws IOException {
    try (Table table = conn.getTable(RSGROUP_TABLE_NAME)) {
      CoprocessorRpcChannel channel = table.coprocessorService(ROW_KEY);
      MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder =
          MultiRowMutationProtos.MutateRowsRequest.newBuilder();
      for (Mutation mutation : mutations) {
        if (mutation instanceof Put) {
          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
              mutation));
        } else if (mutation instanceof Delete) {
          mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
              org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.DELETE,
              mutation));
        } else {
          throw new DoNotRetryIOException(
              "multiMutate doesn't support " + mutation.getClass().getName());
        }
      }

      MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
          MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
      try {
        service.mutateRows(null, mmrBuilder.build());
      } catch (ServiceException ex) {
        // NOTE(review): the result of this call is not used, so this relies on
        // ProtobufUtil.toIOException throwing the unwrapped IOException itself — confirm against
        // the ProtobufUtil version in use, otherwise failed mutations are silently ignored.
        ProtobufUtil.toIOException(ex);
      }
    }
  }

  /**
   * Validates an rsgroup name.
   * @throws ConstraintException if the name is not made solely of ASCII letters, digits or '_'
   */
  private void checkGroupName(String groupName) throws ConstraintException {
    if (!GROUP_NAME_PATTERN.matcher(groupName).matches()) {
      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
    }
  }
}