/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.rsgroup;

import com.google.protobuf.ServiceException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * This is an implementation of {@link RSGroupInfoManager} which makes
 * use of an HBase table as the persistence store for the group information.
 * It also makes use of zookeeper to store group information needed
 * for bootstrapping during offline mode.
 *
 * <h2>Concurrency</h2>
 * RSGroup state is kept locally in Maps. There is a rsgroup name to cached
 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the
 * rsgroup they belong to (in {@link #tableMap}). These Maps are persisted to the
 * hbase:rsgroup table (and cached in zk) on each modification.
 *
 * <p>Mutations on state are synchronized but reads can continue without having
 * to wait on an instance monitor, mutations do wholesale replace of the Maps on
 * update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
 * (see flushConfig).
 *
 * <p>Reads must not block else there is a danger we'll deadlock.
 *
 * <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
 * then act on the results of the query modifying cache in zookeeper without another thread
 * making intermediate modifications. These clients synchronize on the 'this' instance so
 * no other has access concurrently. Reads must be able to continue concurrently.
 */
@InterfaceAudience.Private
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

  /** Compiled once; a valid rsgroup name consists only of letters, digits and underscores. */
  private static final Pattern GROUP_NAME_PATTERN = Pattern.compile("[a-zA-Z0-9_]+");

  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
  private final static HTableDescriptor RSGROUP_TABLE_DESC;
  static {
    RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
    RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
    RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
    try {
      // multiMutate() below goes through this endpoint to apply a batch of row mutations.
      RSGROUP_TABLE_DESC.addCoprocessor(
        MultiRowMutationEndpoint.class.getName(),
          null, Coprocessor.PRIORITY_SYSTEM, null);
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }
  }

  // These two Maps are immutable and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  private final MasterServices masterServices;
  private Table rsGroupTable;
  private final ClusterConnection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
  // contains list of groups that were last flushed to persistent store
  private Set<String> prevRSGroups = new HashSet<>();
  private final ServerEventsListenerThread serverEventsListenerThread =
      new ServerEventsListenerThread();
  private FailedOpenUpdaterThread failedOpenUpdaterThread;

  /**
   * Captures the master's zookeeper watcher and cluster connection. Instances are obtained
   * through {@link #getInstance(MasterServices)}.
   */
  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
    this.masterServices = masterServices;
    this.watcher = masterServices.getZooKeeper();
    this.conn = masterServices.getClusterConnection();
  }

  /**
   * Loads the initial group state via {@link #refresh()} and then starts the two listener
   * threads, registering each with the master's server manager.
   */
  private synchronized void init() throws IOException {
    refresh();
    serverEventsListenerThread.start();
    masterServices.getServerManager().registerListener(serverEventsListenerThread);
    failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
    failedOpenUpdaterThread.start();
    masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
  }

  /**
   * Creates and initializes a manager for the given master.
   * @param master services of the hosting master
   * @return an initialized manager; {@link #start()} has NOT been called on it
   * @throws IOException if the initial refresh fails
   */
  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
    instance.init();
    return instance;
  }

  /** Starts the background worker that creates (if needed) and waits for the rsgroup table. */
  public void start() {
    // create system table of rsgroup
    rsGroupStartupWorker.start();
  }

  /**
   * Adds a new rsgroup and persists the change.
   * @param rsGroupInfo the group to add
   * @throws ConstraintException (via checkGroupName) if the name has illegal characters
   * @throws DoNotRetryIOException if the group already exists or is the default group
   * @throws IOException if persisting fails
   */
  @Override
  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
    checkGroupName(rsGroupInfo.getName());
    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
        rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Like {@link #getRSGroup(String)} but fails instead of returning null.
   * @throws DoNotRetryIOException if no group with that name is cached
   */
  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
    if (rsGroupInfo == null) {
      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
    }
    return rsGroupInfo;
  }

  /**
   * Moves servers from one rsgroup to another and persists the change.
   * @param servers servers to move
   * @param srcGroup name of the group the servers are removed from
   * @param dstGroup name of the group the servers are added to
   * @return the servers of the destination group after the move
   * @throws DoNotRetryIOException if either group does not exist
   * @throws IOException if persisting fails
   */
  @Override
  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
      String dstGroup) throws IOException {
    // NOTE(review): src and dst are the instances held by the live rsGroupMap and are mutated
    // before flushConfig runs -- confirm this is intended given the copy-on-write class contract.
    RSGroupInfo src = getRSGroupInfo(srcGroup);
    RSGroupInfo dst = getRSGroupInfo(dstGroup);
    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
    // rsgroup of dead servers that are to come back later).
    Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP) ?
        Utility.getOnlineServers(this.masterServices) : null;
    for (Address el : servers) {
      src.removeServer(el);
      if (onlineServers != null && !onlineServers.contains(el)) {
        LOG.debug("Dropping {} during move-to-default rsgroup because not online", el);
        continue;
      }
      dst.addServer(el);
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(src.getName(), src);
    newGroupMap.put(dst.getName(), dst);
    flushConfig(newGroupMap);
    return dst.getServers();
  }

  /**
   * @param serverHostPort address to look up
   * @return the first cached group containing the server, or null if none does
   */
  @Override
  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
    for (RSGroupInfo info : rsGroupMap.values()) {
      if (info.containsServer(serverHostPort)) {
        return info;
      }
    }
    return null;
  }

  /** @return the cached group with the given name, or null if there is none */
  @Override
  public RSGroupInfo getRSGroup(String groupName) {
    return rsGroupMap.get(groupName);
  }

  /** @return the name of the group the table is mapped to, or null if it is not mapped */
  @Override
  public String getRSGroupOfTable(TableName tableName) {
    return tableMap.get(tableName);
  }

  /**
   * Moves tables to the named rsgroup and persists the change. Works on copies of the affected
   * {@link RSGroupInfo}s.
   * @param tableNames tables to move
   * @param groupName destination group; when null the tables are only removed from their
   *          current group
   * @throws DoNotRetryIOException if a non-null destination group does not exist
   * @throws IOException if persisting fails
   */
  @Override
  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
      throws IOException {
    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
      throw new DoNotRetryIOException("Group " + groupName + " does not exist");
    }

    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    for (TableName tableName : tableNames) {
      if (tableMap.containsKey(tableName)) {
        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
        src.removeTable(tableName);
        newGroupMap.put(src.getName(), src);
      }
      if (groupName != null) {
        RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
        dst.addTable(tableName);
        newGroupMap.put(dst.getName(), dst);
      }
    }
    flushConfig(newGroupMap);
  }

  /**
   * Removes the named rsgroup and persists the change.
   * @throws DoNotRetryIOException if the group does not exist or is the default group
   * @throws IOException if persisting fails
   */
  @Override
  public synchronized void removeRSGroup(String groupName) throws IOException {
    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
          + "group");
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.remove(groupName);
    flushConfig(newGroupMap);
  }

  /** @return a new list holding the currently cached groups */
  @Override
  public List<RSGroupInfo> listRSGroups() {
    return Lists.newLinkedList(rsGroupMap.values());
  }

  /** @return whether the startup worker has marked the rsgroup table as online */
  @Override
  public boolean isOnline() {
    return rsGroupStartupWorker.isOnline();
  }

  /**
   * Moves servers and tables from one rsgroup to another in a single persisted update.
   * Synchronized like every other mutator of this class so that the read-modify-flush sequence
   * cannot interleave with a concurrent mutation.
   * @throws DoNotRetryIOException if either group does not exist
   * @throws IOException if persisting fails
   */
  @Override
  public synchronized void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
      String srcGroup, String dstGroup) throws IOException {
    //get server's group
    // NOTE(review): as in moveServers, these are the cached instances and are mutated in place.
    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);

    //move servers
    for (Address el : servers) {
      srcGroupInfo.removeServer(el);
      dstGroupInfo.addServer(el);
    }
    //move tables
    for (TableName tableName : tables) {
      srcGroupInfo.removeTable(tableName);
      dstGroupInfo.addTable(tableName);
    }

    //flush changed groupinfo
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Removes the given servers from whichever rsgroup currently contains them. Servers that
   * belong to no group are logged and skipped; nothing is flushed if no group was touched.
   * @throws IOException if persisting fails
   */
  @Override
  public synchronized void removeServers(Set<Address> servers) throws IOException {
    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
    for (Address el : servers) {
      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
      if (rsGroupInfo != null) {
        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
        if (newRsGroupInfo == null) {
          rsGroupInfo.removeServer(el);
          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
        } else {
          newRsGroupInfo.removeServer(el);
          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
        }
      } else {
        LOG.warn("Server {} does not belong to any rsgroup.", el);
      }
    }

    if (rsGroupInfos.size() > 0) {
      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
      newGroupMap.putAll(rsGroupInfos);
      flushConfig(newGroupMap);
    }
  }

  /**
   * Scans the whole rsgroup table and decodes one {@link RSGroupInfo} per row.
   * The scanner is closed when the scan completes or fails.
   */
  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    try (ResultScanner scanner = rsGroupTable.getScanner(new Scan())) {
      for (Result result : scanner) {
        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
            result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
      }
    }
    return rsGroupInfoList;
  }

  /**
   * Reads the group infos cached under the rsgroup znode. Children with no data are skipped.
   * @return the decoded groups; empty if the base znode does not exist or has no children
   * @throws IOException wrapping any zookeeper or deserialization failure
   */
  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
    String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    //Overwrite any info stored by table, this takes precedence
    try {
      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
        List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
        if (children == null) {
          return rsGroupInfoList;
        }
        for (String znode : children) {
          byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
          if (data != null && data.length > 0) {
            ProtobufUtil.expectPBMagicPrefix(data);
            // Third constructor argument is the number of bytes to read, not an end offset.
            int magicLen = ProtobufUtil.lengthOfPBMagic();
            ByteArrayInputStream bis = new ByteArrayInputStream(
                data, magicLen, data.length - magicLen);
            rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
                RSGroupProtos.RSGroupInfo.parseFrom(bis)));
          }
        }
        LOG.debug("Read ZK GroupInfo count:{}", rsGroupInfoList.size());
      }
    } catch (KeeperException | DeserializationException | InterruptedException e) {
      throw new IOException("Failed to read rsGroupZNode", e);
    }
    return rsGroupInfoList;
  }

  @Override
  public void refresh() throws IOException {
    refresh(false);
  }

  /**
   * Read rsgroup info from the source of truth, the hbase:rsgroup table.
   * Update zk cache. Called on startup of the manager.
   * @param forceOnline read from the rsgroup table even if {@link #isOnline()} is still false;
   *          otherwise the zookeeper copy is used while offline
   */
  private synchronized void refresh(boolean forceOnline) throws IOException {
    List<RSGroupInfo> groupList = new LinkedList<>();

    // Overwrite anything read from zk, group table is source of truth
    // if online read from GROUP table
    if (forceOnline || isOnline()) {
      LOG.debug("Refreshing in Online mode.");
      if (rsGroupTable == null) {
        rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
      }
      groupList.addAll(retrieveGroupListFromGroupTable());
    } else {
      LOG.debug("Refreshing in Offline mode.");
      groupList.addAll(retrieveGroupListFromZookeeper());
    }

    // refresh default group, prune
    NavigableSet<TableName> orphanTables = new TreeSet<>();
    for (String entry : masterServices.getTableDescriptors().getAll().keySet()) {
      orphanTables.add(TableName.valueOf(entry));
    }
    for (RSGroupInfo group : groupList) {
      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
        orphanTables.removeAll(group.getTables());
      }
    }

    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
    // from region group table or zk
    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
        orphanTables));

    // populate the data
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
    HashMap<TableName, String> newTableMap = Maps.newHashMap();
    for (RSGroupInfo group : groupList) {
      newGroupMap.put(group.getName(), group);
      for (TableName table : group.getTables()) {
        newTableMap.put(table, group.getName());
      }
    }
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
    updateCacheOfRSGroups(rsGroupMap.keySet());
  }

  /**
   * Writes the given groups to the rsgroup table: a Delete for every previously flushed group
   * that is no longer present and a Put for every current group.
   * @param groupMap the complete new set of groups
   * @return the table-to-group-name mapping derived from {@code groupMap}
   */
  private synchronized Map<TableName, String> flushConfigTable(Map<String, RSGroupInfo> groupMap)
      throws IOException {
    Map<TableName, String> newTableMap = Maps.newHashMap();
    List<Mutation> mutations = Lists.newArrayList();

    // populate deletes
    for (String groupName : prevRSGroups) {
      if (!groupMap.containsKey(groupName)) {
        Delete d = new Delete(Bytes.toBytes(groupName));
        mutations.add(d);
      }
    }

    // populate puts
    for (RSGroupInfo groupInfo : groupMap.values()) {
      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(groupInfo);
      Put p = new Put(Bytes.toBytes(groupInfo.getName()));
      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
      mutations.add(p);
      for (TableName entry : groupInfo.getTables()) {
        newTableMap.put(entry, groupInfo.getName());
      }
    }

    if (mutations.size() > 0) {
      multiMutate(mutations);
    }
    return newTableMap;
  }

  /** Re-flushes the currently cached groups. */
  private synchronized void flushConfig()
      throws IOException {
    flushConfig(this.rsGroupMap);
  }

  /**
   * Persists {@code newGroupMap} to the rsgroup table, publishes it as the in-memory state and
   * mirrors it to zookeeper. While offline nothing is persisted and only a change of the default
   * group's servers is accepted.
   * @param newGroupMap the complete new set of groups; it is not modified by the offline branch
   * @throws IOException if offline and anything other than the default servers changed, if the
   *           table write fails, or if the zookeeper write fails (the master is aborted as well)
   */
  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap)
      throws IOException {
    Map<TableName, String> newTableMap;

    // For offline mode persistence is still unavailable
    // We're refreshing in-memory state but only for default servers
    if (!isOnline()) {
      // Compare on private copies so neither the caller's map nor the (unmodifiable)
      // cached map is mutated.
      Map<String, RSGroupInfo> oldGroups = Maps.newHashMap(rsGroupMap);
      Map<String, RSGroupInfo> newGroups = Maps.newHashMap(newGroupMap);
      RSGroupInfo oldDefaultGroup = oldGroups.remove(RSGroupInfo.DEFAULT_GROUP);
      RSGroupInfo newDefaultGroup = newGroups.remove(RSGroupInfo.DEFAULT_GROUP);
      if (!oldGroups.equals(newGroups) ||
          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
        throw new IOException("Only default servers can be updated during offline mode");
      }
      newGroups.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
      // Keep the published map read-only, as resetRSGroupAndTableMaps does.
      rsGroupMap = Collections.unmodifiableMap(newGroups);
      return;
    }

    newTableMap = flushConfigTable(newGroupMap);

    // Make changes visible after having been persisted to the source of truth
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);

    try {
      String groupBasePath = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, rsGroupZNode);
      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
      for (String groupName : prevRSGroups) {
        if (!newGroupMap.containsKey(groupName)) {
          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        }
      }

      for (RSGroupInfo groupInfo : newGroupMap.values()) {
        String znode = ZNodePaths.joinZNode(groupBasePath, groupInfo.getName());
        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(groupInfo);
        LOG.debug("Updating znode: {}", znode);
        ZKUtil.createAndFailSilent(watcher, znode);
        // Replace the node's content by queueing a delete followed by a create with data.
        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
            ProtobufUtil.prependPBMagic(proto.toByteArray())));
      }
      LOG.debug("Writing ZK GroupInfo count: {}", zkOps.size());

      ZKUtil.multiOrSequential(watcher, zkOps, false);
    } catch (KeeperException e) {
      LOG.error("Failed to write to rsGroupZNode", e);
      masterServices.abort("Failed to write to rsGroupZNode", e);
      throw new IOException("Failed to write to rsGroupZNode", e);
    }
    updateCacheOfRSGroups(newGroupMap.keySet());
  }

  /**
   * Make changes visible.
   * Caller must be synchronized on 'this'.
   */
  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
      Map<TableName, String> newTableMap) {
    // Make maps Immutable.
    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
    this.tableMap = Collections.unmodifiableMap(newTableMap);
  }

  /**
   * Update cache of rsgroups.
   * Caller must be synchronized on 'this'.
   * @param currentGroups Current list of Groups.
   */
  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
    this.prevRSGroups.clear();
    this.prevRSGroups.addAll(currentGroups);
  }

  // Called by getDefaultServers. Presume it has lock in place.
  private List<ServerName> getOnlineRS() throws IOException {
    if (masterServices != null) {
      return masterServices.getServerManager().getOnlineServersList();
    }
    LOG.debug("Reading online RS from zookeeper");
    List<ServerName> servers = new LinkedList<>();
    try {
      List<String> children =
          ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode);
      // Guard against a null child list so a missing znode yields an empty result, not an NPE.
      if (children != null) {
        for (String el : children) {
          servers.add(ServerName.parseServerName(el));
        }
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to retrieve server list from zookeeper", e);
    }
    return servers;
  }

  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
  // Returns the online servers that are not a member of any non-default rsgroup.
  private SortedSet<Address> getDefaultServers() throws IOException {
    SortedSet<Address> defaultServers = Sets.newTreeSet();
    for (ServerName serverName : getOnlineRS()) {
      Address server =
          Address.fromParts(serverName.getHostname(), serverName.getPort());
      boolean found = false;
      for (RSGroupInfo rsgi : listRSGroups()) {
        if (!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
            rsgi.containsServer(server)) {
          found = true;
          break;
        }
      }
      if (!found) {
        defaultServers.add(server);
      }
    }
    return defaultServers;
  }

  // Called by ServerEventsListenerThread. Synchronize on this because redoing
  // the rsGroupMap then writing it out.
  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(newInfo.getName(), newInfo);
    flushConfig(newGroupMap);
  }

  // Called by FailedOpenUpdaterThread
  private void updateFailedAssignments() {
    // Kick all regions in FAILED_OPEN state
    List<RegionInfo> stuckAssignments = Lists.newArrayList();
    for (RegionStateNode state :
        masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
      if (state.isStuck()) {
        stuckAssignments.add(state.getRegionInfo());
      }
    }
    for (RegionInfo region : stuckAssignments) {
      LOG.info("Retrying assignment of {}", region);
      try {
        masterServices.getAssignmentManager().unassign(region);
      } catch (IOException e) {
        LOG.warn("Unable to reassign {}", region, e);
      }
    }
  }

  /**
   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
   * servers. Notifications about server changes are received by registering {@link ServerListener}.
   * As a listener, we need to return immediately, so the real work of updating the servers is
   * done asynchronously in this thread.
   */
  private class ServerEventsListenerThread extends Thread implements ServerListener {
    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
    // Guarded by this thread object's monitor.
    private boolean changed = false;

    ServerEventsListenerThread() {
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    @Override
    public void serverRemoved(ServerName serverName) {
      serverChanged();
    }

    /** Records that the server set changed and wakes the run loop. */
    private synchronized void serverChanged() {
      changed = true;
      this.notify();
    }

    @Override
    public void run() {
      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
      SortedSet<Address> prevDefaultServers = new TreeSet<>();
      while (isMasterRunning(masterServices)) {
        try {
          LOG.info("Updating default servers.");
          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
          if (!servers.equals(prevDefaultServers)) {
            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
            prevDefaultServers = servers;
            LOG.info("Updated with servers: {}", servers.size());
          }
          try {
            synchronized (this) {
              // Loop on the flag so a spurious wakeup does not trigger an update.
              while (!changed) {
                wait();
              }
              changed = false;
            }
          } catch (InterruptedException e) {
            LOG.warn("Interrupted", e);
          }
        } catch (IOException e) {
          LOG.warn("Failed to update default servers", e);
        }
      }
    }
  }

  /**
   * Listener thread that, after a server is added, waits for the configured interval and then
   * calls {@link RSGroupInfoManagerImpl#updateFailedAssignments()}.
   */
  private class FailedOpenUpdaterThread extends Thread implements ServerListener {
    private final long waitInterval;
    private volatile boolean hasChanged = false;

    public FailedOpenUpdaterThread(Configuration conf) {
      this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
        DEFAULT_REASSIGN_WAIT_INTERVAL);
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    @Override
    public void serverRemoved(ServerName serverName) {
      // Removals are deliberately ignored by this listener.
    }

    @Override
    public void run() {
      while (isMasterRunning(masterServices)) {
        boolean interrupted = false;
        try {
          synchronized (this) {
            while (!hasChanged) {
              wait();
            }
            hasChanged = false;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted", e);
          interrupted = true;
        }
        if (!isMasterRunning(masterServices) || interrupted) {
          continue;
        }

        // First, wait a while in case more servers are about to rejoin the cluster
        try {
          Thread.sleep(waitInterval);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted", e);
        }
        if (!isMasterRunning(masterServices)) {
          continue;
        }

        // Kick all regions in FAILED_OPEN state
        updateFailedAssignments();
      }
    }

    /** Records that a server was added and wakes the run loop. */
    public void serverChanged() {
      synchronized (this) {
        hasChanged = true;
        this.notify();
      }
    }
  }

  /**
   * Background worker that polls until the rsgroup table is found and verified in meta
   * (creating the table once if meta has no regions for it), then refreshes the cache from the
   * table and flips the manager to online.
   */
  private class RSGroupStartupWorker extends Thread {
    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
    private volatile boolean online = false;

    RSGroupStartupWorker() {
      setDaemon(true);
    }

    @Override
    public void run() {
      setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
      if (waitForGroupTableOnline()) {
        LOG.info("GroupBasedLoadBalancer is now online");
      }
    }

    /**
     * Polls every 100ms until the rsgroup table is verified or the master stops.
     * @return true if the table came online, false if the master stopped first
     */
    private boolean waitForGroupTableOnline() {
      final List<RegionInfo> foundRegions = new LinkedList<>();
      final List<RegionInfo> assignedRegions = new LinkedList<>();
      final AtomicBoolean found = new AtomicBoolean(false);
      final TableStateManager tsm = masterServices.getTableStateManager();
      boolean createSent = false;
      while (!found.get() && isMasterRunning(masterServices)) {
        foundRegions.clear();
        assignedRegions.clear();
        found.set(true);
        try {
          conn.getTable(TableName.NAMESPACE_TABLE_NAME);
          conn.getTable(RSGROUP_TABLE_NAME);
          boolean rootMetaFound =
              masterServices.getMetaTableLocator().verifyMetaRegionLocation(
                  conn, masterServices.getZooKeeper(), 1);
          final AtomicBoolean nsFound = new AtomicBoolean(false);
          if (rootMetaFound) {
            MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
              @Override
              public boolean visitInternal(Result row) throws IOException {
                RegionInfo info = MetaTableAccessor.getRegionInfo(row);
                if (info != null) {
                  Cell serverCell =
                      row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
                          HConstants.SERVER_QUALIFIER);
                  if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
                    ServerName sn =
                        ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
                    if (sn == null) {
                      found.set(false);
                    } else if (tsm.isTableState(RSGROUP_TABLE_NAME, TableState.State.ENABLED)) {
                      try {
                        // Probe the hosting server with a Get; success counts the region
                        // as assigned.
                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
                        ClientProtos.GetRequest request =
                            RequestConverter.buildGetRequest(info.getRegionName(),
                                new Get(ROW_KEY));
                        rs.get(null, request);
                        assignedRegions.add(info);
                      } catch (Exception ex) {
                        LOG.debug("Caught exception while verifying group region", ex);
                      }
                    }
                    foundRegions.add(info);
                  }
                  if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
                    Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
                        HConstants.SERVER_QUALIFIER);
                    ServerName sn = null;
                    if (cell != null) {
                      sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
                    }
                    if (sn == null) {
                      nsFound.set(false);
                    } else if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
                        TableState.State.ENABLED)) {
                      try {
                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
                        ClientProtos.GetRequest request =
                            RequestConverter.buildGetRequest(info.getRegionName(),
                                new Get(ROW_KEY));
                        rs.get(null, request);
                        nsFound.set(true);
                      } catch (Exception ex) {
                        LOG.debug("Caught exception while verifying group region", ex);
                      }
                    }
                  }
                }
                return true;
              }
            };
            MetaTableAccessor.fullScanRegions(conn, visitor);
            // if no regions in meta then we have to create the table
            if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
              createRSGroupTable();
              createSent = true;
            }
            LOG.info("RSGroup table={} isOnline={}, regionCount={}, assignCount={}, "
                + "rootMetaFound={}", RSGROUP_TABLE_NAME, found.get(), foundRegions.size(),
                assignedRegions.size(), rootMetaFound);
            found.set(found.get() && assignedRegions.size() == foundRegions.size()
                && foundRegions.size() > 0);
          } else {
            LOG.info("Waiting for catalog tables to come online");
            found.set(false);
          }
          if (found.get()) {
            LOG.debug("With group table online, refreshing cached information.");
            RSGroupInfoManagerImpl.this.refresh(true);
            online = true;
            //flush any inconsistencies between ZK and HTable
            RSGroupInfoManagerImpl.this.flushConfig();
          }
        } catch (RuntimeException e) {
          throw e;
        } catch (Exception e) {
          found.set(false);
          LOG.warn("Failed to perform check", e);
        }
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          LOG.info("Sleep interrupted", e);
        }
      }
      return found.get();
    }

    /**
     * Submits creation of the rsgroup system table and polls (up to 600 times, 100ms apart)
     * for the procedure to finish.
     * @throws IOException if interrupted, if the wait budget is exhausted, or if the
     *           procedure reports failure
     */
    private void createRSGroupTable() throws IOException {
      long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
      // wait for region to be online
      int tries = 600;
      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId))
          && masterServices.getMasterProcedureExecutor().isRunning()
          && tries > 0) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          throw new IOException("Wait interrupted ", e);
        }
        tries--;
      }
      if (tries <= 0) {
        throw new IOException("Failed to create group table in a given time.");
      } else {
        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
        if (result != null && result.isFailed()) {
          throw new IOException("Failed to create group table. " +
              MasterProcedureUtil.unwrapRemoteIOException(result));
        }
      }
    }

    public boolean isOnline() {
      return online;
    }
  }

  /** @return true while the master is neither aborted nor stopped */
  private static boolean isMasterRunning(MasterServices masterServices) {
    return !masterServices.isAborted() && !masterServices.isStopped();
  }

  /**
   * Sends the given Puts and Deletes to the rsgroup table as one request through the
   * MultiRowMutation coprocessor endpoint.
   * @throws DoNotRetryIOException if a mutation is neither a Put nor a Delete
   */
  private void multiMutate(List<Mutation> mutations) throws IOException {
    CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
    MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
      = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
    for (Mutation mutation : mutations) {
      if (mutation instanceof Put) {
        mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
            mutation));
      } else if (mutation instanceof Delete) {
        mmrBuilder.addMutationRequest(
            org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.
                  MutationType.DELETE, mutation));
      } else {
        throw new DoNotRetryIOException("multiMutate doesn't support "
          + mutation.getClass().getName());
      }
    }

    MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
      MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
    try {
      service.mutateRows(null, mmrBuilder.build());
    } catch (ServiceException ex) {
      // NOTE(review): the call's result is not thrown here; presumably toIOException itself
      // throws the unwrapped IOException -- confirm against ProtobufUtil before changing.
      ProtobufUtil.toIOException(ex);
    }
  }

  /**
   * Validates an rsgroup name.
   * @throws ConstraintException if the name contains anything other than letters, digits
   *           and underscores, or is empty
   */
  private void checkGroupName(String groupName) throws ConstraintException {
    if (!GROUP_NAME_PATTERN.matcher(groupName).matches()) {
      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
    }
  }
}