/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.rsgroup;

import com.google.protobuf.ServiceException;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MetaTableAccessor.DefaultVisitorBase;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos;
import org.apache.hadoop.hbase.protobuf.generated.RSGroupProtos;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
 * This is an implementation of {@link RSGroupInfoManager} which makes
 * use of an HBase table as the persistence store for the group information.
 * It also makes use of zookeeper to store group information needed
 * for bootstrapping during offline mode.
 *
 * <h2>Concurrency</h2>
 * RSGroup state is kept locally in Maps. There is a rsgroup name to cached
 * RSGroupInfo Map at {@link #rsGroupMap} and a Map of tables to the name of the
 * rsgroup they belong to (in {@link #tableMap}). These Maps are persisted to the
 * hbase:rsgroup table (and cached in zk) on each modification.
 *
 * <p>Mutations on state are synchronized but reads can continue without having
 * to wait on an instance monitor; mutations do a wholesale replace of the Maps on
 * update -- Copy-On-Write; the local Maps of state are read-only, just-in-case
 * (see flushConfig).
 *
 * <p>Reads must not block else there is a danger we'll deadlock.
 *
 * <p>Clients of this class, the {@link RSGroupAdminEndpoint} for example, want to query and
 * then act on the results of the query, modifying the cache in zookeeper without another thread
 * making intermediate modifications. These clients synchronize on the 'this' instance so
 * no other has access concurrently. Reads must be able to continue concurrently.
 */
@InterfaceAudience.Private
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
  private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

  /** Table descriptor for <code>hbase:rsgroup</code> catalog table */
  private static final HTableDescriptor RSGROUP_TABLE_DESC;
  static {
    RSGROUP_TABLE_DESC = new HTableDescriptor(RSGROUP_TABLE_NAME);
    RSGROUP_TABLE_DESC.addFamily(new HColumnDescriptor(META_FAMILY_BYTES));
    RSGROUP_TABLE_DESC.setRegionSplitPolicyClassName(DisabledRegionSplitPolicy.class.getName());
    try {
      // multiMutate() below goes through this endpoint to apply all rows in one call.
      RSGROUP_TABLE_DESC.addCoprocessor(
        MultiRowMutationEndpoint.class.getName(),
          null, Coprocessor.PRIORITY_SYSTEM, null);
    } catch (IOException ex) {
      throw new RuntimeException(ex);
    }
  }

  // These two Maps are read-only and wholesale replaced on each modification
  // so are safe to access concurrently. See class comment.
  private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
  private volatile Map<TableName, String> tableMap = Collections.emptyMap();

  private final MasterServices masterServices;
  // Lazily opened by refresh() the first time it runs in online mode.
  private Table rsGroupTable;
  private final ClusterConnection conn;
  private final ZKWatcher watcher;
  private final RSGroupStartupWorker rsGroupStartupWorker = new RSGroupStartupWorker();
  // contains list of groups that were last flushed to persistent store
  private final Set<String> prevRSGroups = new HashSet<>();
  private final ServerEventsListenerThread serverEventsListenerThread =
      new ServerEventsListenerThread();
  private FailedOpenUpdaterThread failedOpenUpdaterThread;

  private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
    this.masterServices = masterServices;
    this.watcher = masterServices.getZooKeeper();
    this.conn = masterServices.getClusterConnection();
  }

  /**
   * Loads the initial group state and starts the two background listener threads,
   * registering each with the master's server manager.
   */
  private synchronized void init() throws IOException {
    refresh();
    serverEventsListenerThread.start();
    masterServices.getServerManager().registerListener(serverEventsListenerThread);
    failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
    failedOpenUpdaterThread.start();
    masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
  }

  /**
   * Creates and initializes a manager bound to the given master.
   * @param master master services the manager works against
   * @return the initialized manager
   * @throws IOException if the initial refresh fails
   */
  static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
    RSGroupInfoManagerImpl instance = new RSGroupInfoManagerImpl(master);
    instance.init();
    return instance;
  }

  /** Starts the startup worker, which creates the rsgroup system table if needed. */
  public void start() {
    // create system table of rsgroup
    rsGroupStartupWorker.start();
  }

  /**
   * Adds a new rsgroup and persists the resulting group map.
   * @throws DoNotRetryIOException if a group of that name already exists or the name is the
   *   default group's
   */
  @Override
  public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
    checkGroupName(rsGroupInfo.getName());
    if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
        rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException("Group already exists: "+ rsGroupInfo.getName());
    }
    Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(rsGroupInfo.getName(), rsGroupInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Looks up a group by name, failing instead of returning null.
   * @throws DoNotRetryIOException if no group of that name is cached
   */
  private RSGroupInfo getRSGroupInfo(final String groupName) throws DoNotRetryIOException {
    RSGroupInfo rsGroupInfo = getRSGroup(groupName);
    if (rsGroupInfo == null) {
      throw new DoNotRetryIOException("RSGroup " + groupName + " does not exist");
    }
    return rsGroupInfo;
  }

  /**
   * Moves servers from one group to another and persists the change.
   * @return the server set of the destination group after the move
   */
  @Override
  public synchronized Set<Address> moveServers(Set<Address> servers, String srcGroup,
      String dstGroup) throws IOException {
    // NOTE(review): src/dst are the cached instances from rsGroupMap and are mutated in place
    // before flushConfig runs -- confirm this is intended given the copy-on-write contract.
    RSGroupInfo src = getRSGroupInfo(srcGroup);
    RSGroupInfo dst = getRSGroupInfo(dstGroup);
    // If destination is 'default' rsgroup, only add servers that are online. If not online, drop
    // it. If not 'default' group, add server to 'dst' rsgroup EVEN IF IT IS NOT online (could be a
    // rsgroup of dead servers that are to come back later).
    Set<Address> onlineServers = dst.getName().equals(RSGroupInfo.DEFAULT_GROUP)?
        Utility.getOnlineServers(this.masterServices): null;
    for (Address el: servers) {
      src.removeServer(el);
      if (onlineServers != null && !onlineServers.contains(el)) {
        LOG.debug("Dropping {} during move-to-default rsgroup because not online", el);
        continue;
      }
      dst.addServer(el);
    }
    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(src.getName(), src);
    newGroupMap.put(dst.getName(), dst);
    flushConfig(newGroupMap);
    return dst.getServers();
  }

  /**
   * @return the first cached group that contains the given server, or null if none does
   */
  @Override
  public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
    for (RSGroupInfo info: rsGroupMap.values()) {
      if (info.containsServer(serverHostPort)) {
        return info;
      }
    }
    return null;
  }

  /** @return the cached group of the given name, or null if there is none */
  @Override
  public RSGroupInfo getRSGroup(String groupName) {
    return rsGroupMap.get(groupName);
  }

  /** @return the name of the group the table is mapped to, or null if it is not mapped */
  @Override
  public String getRSGroupOfTable(TableName tableName) {
    return tableMap.get(tableName);
  }

  /**
   * Moves tables into the named group, removing them from whichever group currently holds
   * them. A null {@code groupName} only removes the tables from their current group.
   * @throws DoNotRetryIOException if {@code groupName} is non-null and unknown
   */
  @Override
  public synchronized void moveTables(Set<TableName> tableNames, String groupName)
      throws IOException {
    if (groupName != null && !rsGroupMap.containsKey(groupName)) {
      throw new DoNotRetryIOException("Group "+groupName+" does not exist");
    }

    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    for (TableName tableName: tableNames) {
      if (tableMap.containsKey(tableName)) {
        // Work on copies so the cached instances are untouched until the flush succeeds.
        RSGroupInfo src = new RSGroupInfo(newGroupMap.get(tableMap.get(tableName)));
        src.removeTable(tableName);
        newGroupMap.put(src.getName(), src);
      }
      if (groupName != null) {
        RSGroupInfo dst = new RSGroupInfo(newGroupMap.get(groupName));
        dst.addTable(tableName);
        newGroupMap.put(dst.getName(), dst);
      }
    }
    flushConfig(newGroupMap);
  }

  /**
   * Removes a group and persists the change.
   * @throws DoNotRetryIOException if the group is unknown or is the default group
   */
  @Override
  public synchronized void removeRSGroup(String groupName) throws IOException {
    if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
      throw new DoNotRetryIOException("Group " + groupName + " does not exist or is a reserved "
          + "group");
    }
    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.remove(groupName);
    flushConfig(newGroupMap);
  }

  /** @return a new list holding the currently cached groups */
  @Override
  public List<RSGroupInfo> listRSGroups() {
    return Lists.newLinkedList(rsGroupMap.values());
  }

  /** @return whether the startup worker has flagged the manager as online */
  @Override
  public boolean isOnline() {
    return rsGroupStartupWorker.isOnline();
  }

  /**
   * Moves both servers and tables from one group to another and persists the change.
   * Synchronized like every other mutator of this class so that the read-modify-flush
   * sequence cannot interleave with another mutation.
   */
  @Override
  public synchronized void moveServersAndTables(Set<Address> servers, Set<TableName> tables,
      String srcGroup, String dstGroup) throws IOException {
    //get server's group
    // NOTE(review): as in moveServers, these are the cached instances and are edited in place.
    RSGroupInfo srcGroupInfo = getRSGroupInfo(srcGroup);
    RSGroupInfo dstGroupInfo = getRSGroupInfo(dstGroup);

    //move servers
    for (Address el: servers) {
      srcGroupInfo.removeServer(el);
      dstGroupInfo.addServer(el);
    }
    //move tables
    for (TableName tableName: tables) {
      srcGroupInfo.removeTable(tableName);
      dstGroupInfo.addTable(tableName);
    }

    //flush changed groupinfo
    Map<String,RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(srcGroupInfo.getName(), srcGroupInfo);
    newGroupMap.put(dstGroupInfo.getName(), dstGroupInfo);
    flushConfig(newGroupMap);
  }

  /**
   * Removes the given servers from whichever groups contain them and persists the change.
   * Servers that belong to no group are logged and skipped; nothing is flushed if no group
   * was touched.
   */
  @Override
  public synchronized void removeServers(Set<Address> servers) throws IOException {
    // Groups touched so far, keyed by name.
    Map<String, RSGroupInfo> rsGroupInfos = new HashMap<String, RSGroupInfo>();
    for (Address el: servers) {
      RSGroupInfo rsGroupInfo = getRSGroupOfServer(el);
      if (rsGroupInfo != null) {
        RSGroupInfo newRsGroupInfo = rsGroupInfos.get(rsGroupInfo.getName());
        if (newRsGroupInfo == null) {
          // NOTE(review): cached instance is edited in place here as well.
          rsGroupInfo.removeServer(el);
          rsGroupInfos.put(rsGroupInfo.getName(), rsGroupInfo);
        } else {
          newRsGroupInfo.removeServer(el);
          rsGroupInfos.put(newRsGroupInfo.getName(), newRsGroupInfo);
        }
      } else {
        LOG.warn("Server " + el + " does not belong to any rsgroup.");
      }
    }

    if (rsGroupInfos.size() > 0) {
      Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
      newGroupMap.putAll(rsGroupInfos);
      flushConfig(newGroupMap);
    }
  }

  /**
   * Scans the whole rsgroup table and decodes one group per row.
   * The scanner is closed when the scan finishes or fails.
   */
  List<RSGroupInfo> retrieveGroupListFromGroupTable() throws IOException {
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    try (ResultScanner scanner = rsGroupTable.getScanner(new Scan())) {
      for (Result result : scanner) {
        RSGroupProtos.RSGroupInfo proto = RSGroupProtos.RSGroupInfo.parseFrom(
            result.getValue(META_FAMILY_BYTES, META_QUALIFIER_BYTES));
        rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(proto));
      }
    }
    return rsGroupInfoList;
  }

  /**
   * Reads the groups cached under the rsgroup znode.
   * @return the decoded groups; empty if the base znode does not exist
   * @throws IOException wrapping any zookeeper, deserialization or interrupt failure
   */
  List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
    String groupBasePath = ZNodePaths.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
    List<RSGroupInfo> rsGroupInfoList = Lists.newArrayList();
    //Overwrite any info stored by table, this takes precedence
    try {
      if (ZKUtil.checkExists(watcher, groupBasePath) != -1) {
        List<String> children =
            ZKUtil.listChildrenAndWatchForNewChildren(watcher, groupBasePath);
        // Guard against the base znode vanishing between the existence check and the listing;
        // presumably ZKUtil hands back null in that case -- TODO confirm.
        if (children != null) {
          for (String znode: children) {
            byte[] data = ZKUtil.getData(watcher, ZNodePaths.joinZNode(groupBasePath, znode));
            // Same race for an individual child: skip it rather than NPE on data.length.
            if (data != null && data.length > 0) {
              ProtobufUtil.expectPBMagicPrefix(data);
              int magicLen = ProtobufUtil.lengthOfPBMagic();
              // Third argument is a length, not an end index.
              ByteArrayInputStream bis = new ByteArrayInputStream(
                  data, magicLen, data.length - magicLen);
              rsGroupInfoList.add(RSGroupProtobufUtil.toGroupInfo(
                  RSGroupProtos.RSGroupInfo.parseFrom(bis)));
            }
          }
        }
        LOG.debug("Read ZK GroupInfo count:{}", rsGroupInfoList.size());
      }
    } catch (KeeperException|DeserializationException|InterruptedException e) {
      throw new IOException("Failed to read rsGroupZNode",e);
    }
    return rsGroupInfoList;
  }

  @Override
  public void refresh() throws IOException {
    refresh(false);
  }

  /**
   * Read rsgroup info from the source of truth, the hbase:rsgroup table.
   * Update zk cache. Called on startup of the manager.
   * @param forceOnline read from the table even if the manager is not yet flagged online
   */
  private synchronized void refresh(boolean forceOnline) throws IOException {
    List<RSGroupInfo> groupList = new LinkedList<>();

    // Overwrite anything read from zk, group table is source of truth
    // if online read from GROUP table
    if (forceOnline || isOnline()) {
      LOG.debug("Refreshing in Online mode.");
      if (rsGroupTable == null) {
        rsGroupTable = conn.getTable(RSGROUP_TABLE_NAME);
      }
      groupList.addAll(retrieveGroupListFromGroupTable());
    } else {
      LOG.debug("Refreshing in Offline mode.");
      groupList.addAll(retrieveGroupListFromZookeeper());
    }

    // refresh default group, prune
    // Start from every known table and strip those claimed by a non-default group;
    // what is left belongs to 'default'.
    NavigableSet<TableName> orphanTables = new TreeSet<>();
    for (String entry: masterServices.getTableDescriptors().getAll().keySet()) {
      orphanTables.add(TableName.valueOf(entry));
    }
    for (RSGroupInfo group: groupList) {
      if (!group.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
        orphanTables.removeAll(group.getTables());
      }
    }

    // This is added to the last of the list so it overwrites the 'default' rsgroup loaded
    // from region group table or zk
    groupList.add(new RSGroupInfo(RSGroupInfo.DEFAULT_GROUP, getDefaultServers(),
        orphanTables));

    // populate the data
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap();
    HashMap<TableName, String> newTableMap = Maps.newHashMap();
    for (RSGroupInfo group : groupList) {
      newGroupMap.put(group.getName(), group);
      for (TableName table: group.getTables()) {
        newTableMap.put(table, group.getName());
      }
    }
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);
    updateCacheOfRSGroups(rsGroupMap.keySet());
  }

  /**
   * Writes the given group map to the rsgroup table: a Delete for every previously flushed
   * group that is no longer present, and a Put for every group in the map.
   * @return the table-to-group-name map derived from {@code groupMap}
   */
  private synchronized Map<TableName,String> flushConfigTable(Map<String,RSGroupInfo> groupMap)
      throws IOException {
    Map<TableName,String> newTableMap = Maps.newHashMap();
    List<Mutation> mutations = Lists.newArrayList();

    // populate deletes
    for (String groupName : prevRSGroups) {
      if (!groupMap.containsKey(groupName)) {
        Delete d = new Delete(Bytes.toBytes(groupName));
        mutations.add(d);
      }
    }

    // populate puts
    for (RSGroupInfo groupInfo : groupMap.values()) {
      RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(groupInfo);
      Put p = new Put(Bytes.toBytes(groupInfo.getName()));
      p.addColumn(META_FAMILY_BYTES, META_QUALIFIER_BYTES, proto.toByteArray());
      mutations.add(p);
      for (TableName entry: groupInfo.getTables()) {
        newTableMap.put(entry, groupInfo.getName());
      }
    }

    if (mutations.size() > 0) {
      multiMutate(mutations);
    }
    return newTableMap;
  }

  /** Re-flushes the currently cached group map. */
  private synchronized void flushConfig()
      throws IOException {
    flushConfig(this.rsGroupMap);
  }

  /**
   * Persists {@code newGroupMap} to the rsgroup table, publishes it as the in-memory state and
   * mirrors it into zookeeper. While offline only the default group's servers may differ from
   * the current state, and only the in-memory map is updated.
   * @param newGroupMap full replacement group map; must be modifiable when called offline
   *   because the default group entry is temporarily removed from it
   * @throws IOException if offline and anything but default servers changed, or on a
   *   table/zookeeper write failure (a zookeeper failure also aborts the master)
   */
  private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap)
      throws IOException {
    Map<TableName, String> newTableMap;

    // For offline mode persistence is still unavailable
    // We're refreshing in-memory state but only for default servers
    if (!isOnline()) {
      Map<String, RSGroupInfo> m = Maps.newHashMap(rsGroupMap);
      RSGroupInfo oldDefaultGroup = m.remove(RSGroupInfo.DEFAULT_GROUP);
      RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
      if (!m.equals(newGroupMap) ||
          !oldDefaultGroup.getTables().equals(newDefaultGroup.getTables())) {
        throw new IOException("Only default servers can be updated during offline mode");
      }
      newGroupMap.put(RSGroupInfo.DEFAULT_GROUP, newDefaultGroup);
      // Publish read-only, consistent with resetRSGroupAndTableMaps.
      rsGroupMap = Collections.unmodifiableMap(newGroupMap);
      return;
    }

    newTableMap = flushConfigTable(newGroupMap);

    // Make changes visible after having been persisted to the source of truth
    resetRSGroupAndTableMaps(newGroupMap, newTableMap);

    try {
      String groupBasePath = ZNodePaths.joinZNode(watcher.znodePaths.baseZNode, rsGroupZNode);
      ZKUtil.createAndFailSilent(watcher, groupBasePath, ProtobufMagic.PB_MAGIC);

      List<ZKUtil.ZKUtilOp> zkOps = new ArrayList<>(newGroupMap.size());
      for (String groupName : prevRSGroups) {
        if (!newGroupMap.containsKey(groupName)) {
          String znode = ZNodePaths.joinZNode(groupBasePath, groupName);
          zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        }
      }

      for (RSGroupInfo groupInfo : newGroupMap.values()) {
        String znode = ZNodePaths.joinZNode(groupBasePath, groupInfo.getName());
        RSGroupProtos.RSGroupInfo proto = RSGroupProtobufUtil.toProtoGroupInfo(groupInfo);
        LOG.debug("Updating znode: {}", znode);
        ZKUtil.createAndFailSilent(watcher, znode);
        // Replace the node's content by queuing a delete followed by a create.
        zkOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(znode));
        zkOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(znode,
          ProtobufUtil.prependPBMagic(proto.toByteArray())));
      }
      LOG.debug("Writing ZK GroupInfo count: {}", zkOps.size());

      ZKUtil.multiOrSequential(watcher, zkOps, false);
    } catch (KeeperException e) {
      LOG.error("Failed to write to rsGroupZNode", e);
      masterServices.abort("Failed to write to rsGroupZNode", e);
      throw new IOException("Failed to write to rsGroupZNode",e);
    }
    updateCacheOfRSGroups(newGroupMap.keySet());
  }

  /**
   * Make changes visible.
   * Caller must be synchronized on 'this'.
   */
  private void resetRSGroupAndTableMaps(Map<String, RSGroupInfo> newRSGroupMap,
      Map<TableName, String> newTableMap) {
    // Make maps Immutable.
    this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
    this.tableMap = Collections.unmodifiableMap(newTableMap);
  }

  /**
   * Update cache of rsgroups.
   * Caller must be synchronized on 'this'.
   * @param currentGroups Current list of Groups.
   */
  private void updateCacheOfRSGroups(final Set<String> currentGroups) {
    this.prevRSGroups.clear();
    this.prevRSGroups.addAll(currentGroups);
  }

  // Called by getDefaultServers. Presume it has lock in place.
  private List<ServerName> getOnlineRS() throws IOException {
    if (masterServices != null) {
      return masterServices.getServerManager().getOnlineServersList();
    }
    // Fallback for when there is no master: list the region server znodes directly.
    LOG.debug("Reading online RS from zookeeper");
    List<ServerName> servers = new LinkedList<>();
    try {
      for (String el: ZKUtil.listChildrenNoWatch(watcher, watcher.znodePaths.rsZNode)) {
        servers.add(ServerName.parseServerName(el));
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to retrieve server list from zookeeper", e);
    }
    return servers;
  }

  // Called by ServerEventsListenerThread. Presume it has lock on this manager when it runs.
  // Returns every online server that no non-default group lays claim to.
  private SortedSet<Address> getDefaultServers() throws IOException {
    SortedSet<Address> defaultServers = Sets.newTreeSet();
    for (ServerName serverName : getOnlineRS()) {
      Address server =
          Address.fromParts(serverName.getHostname(), serverName.getPort());
      boolean found = false;
      for (RSGroupInfo rsgi: listRSGroups()) {
        if (!RSGroupInfo.DEFAULT_GROUP.equals(rsgi.getName()) &&
            rsgi.containsServer(server)) {
          found = true;
          break;
        }
      }
      if (!found) {
        defaultServers.add(server);
      }
    }
    return defaultServers;
  }

  // Called by ServerEventsListenerThread. Synchronize on this because redoing
  // the rsGroupMap then writing it out.
  private synchronized void updateDefaultServers(SortedSet<Address> servers) throws IOException {
    RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
    RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers, info.getTables());
    HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
    newGroupMap.put(newInfo.getName(), newInfo);
    flushConfig(newGroupMap);
  }

  // Called by FailedOpenUpdaterThread
  private void updateFailedAssignments() {
    // Kick all regions in FAILED_OPEN state
    List<RegionInfo> stuckAssignments = Lists.newArrayList();
    for (RegionStateNode state:
        masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
      if (state.isStuck()) {
        stuckAssignments.add(state.getRegionInfo());
      }
    }
    for (RegionInfo region: stuckAssignments) {
      LOG.info("Retrying assignment of " + region);
      try {
        masterServices.getAssignmentManager().unassign(region);
      } catch (IOException e) {
        LOG.warn("Unable to reassign " + region, e);
      }
    }
  }

  /**
   * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
   * servers. Notifications about server changes are received by registering {@link ServerListener}.
   * As a listener, we need to return immediately, so the real work of updating the servers is
   * done asynchronously in this thread.
   */
  private class ServerEventsListenerThread extends Thread implements ServerListener {
    private final Logger LOG = LoggerFactory.getLogger(ServerEventsListenerThread.class);
    // Guarded by this thread object's monitor.
    private boolean changed = false;

    ServerEventsListenerThread() {
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    @Override
    public void serverRemoved(ServerName serverName) {
      serverChanged();
    }

    /** Flags a pending change and wakes the run loop. */
    private synchronized void serverChanged() {
      changed = true;
      this.notify();
    }

    @Override
    public void run() {
      setName(ServerEventsListenerThread.class.getName() + "-" + masterServices.getServerName());
      SortedSet<Address> prevDefaultServers = new TreeSet<>();
      while (isMasterRunning(masterServices)) {
        try {
          LOG.info("Updating default servers.");
          SortedSet<Address> servers = RSGroupInfoManagerImpl.this.getDefaultServers();
          // Only flush when the computed default set actually differs from the last one.
          if (!servers.equals(prevDefaultServers)) {
            RSGroupInfoManagerImpl.this.updateDefaultServers(servers);
            prevDefaultServers = servers;
            LOG.info("Updated with servers: "+servers.size());
          }
          try {
            synchronized (this) {
              // Loop guards against spurious wakeups.
              while (!changed) {
                wait();
              }
              changed = false;
            }
          } catch (InterruptedException e) {
            LOG.warn("Interrupted", e);
          }
        } catch (IOException e) {
          LOG.warn("Failed to update default servers", e);
        }
      }
    }
  }

  /**
   * Background thread that, after a server joins, waits a configured interval and then
   * retries assignment of regions that are reported stuck.
   */
  private class FailedOpenUpdaterThread extends Thread implements ServerListener {
    private final long waitInterval;
    private volatile boolean hasChanged = false;

    public FailedOpenUpdaterThread(Configuration conf) {
      this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
        DEFAULT_REASSIGN_WAIT_INTERVAL);
      setDaemon(true);
    }

    @Override
    public void serverAdded(ServerName serverName) {
      serverChanged();
    }

    @Override
    public void serverRemoved(ServerName serverName) {
      // Only newly added servers trigger a retry.
    }

    @Override
    public void run() {
      while (isMasterRunning(masterServices)) {
        boolean interrupted = false;
        try {
          synchronized (this) {
            while (!hasChanged) {
              wait();
            }
            hasChanged = false;
          }
        } catch (InterruptedException e) {
          LOG.warn("Interrupted", e);
          interrupted = true;
        }
        if (!isMasterRunning(masterServices) || interrupted) {
          continue;
        }

        // First, wait a while in case more servers are about to rejoin the cluster
        try {
          Thread.sleep(waitInterval);
        } catch (InterruptedException e) {
          LOG.warn("Interrupted", e);
        }
        if (!isMasterRunning(masterServices)) {
          continue;
        }

        // Kick all regions in FAILED_OPEN state
        updateFailedAssignments();
      }
    }

    /** Flags a pending change and wakes the run loop. */
    public void serverChanged() {
      synchronized (this) {
        hasChanged = true;
        this.notify();
      }
    }
  }

  /**
   * Startup thread that polls until the rsgroup table is found and verified (creating it if
   * absent), then refreshes the cache from it and flips the manager to online.
   */
  private class RSGroupStartupWorker extends Thread {
    private final Logger LOG = LoggerFactory.getLogger(RSGroupStartupWorker.class);
    private volatile boolean online = false;

    RSGroupStartupWorker() {
      setDaemon(true);
    }

    @Override
    public void run() {
      setName(RSGroupStartupWorker.class.getName() + "-" + masterServices.getServerName());
      if (waitForGroupTableOnline()) {
        LOG.info("GroupBasedLoadBalancer is now online");
      }
    }

    /**
     * Polls every 100ms while the master is running until every rsgroup table region found in
     * meta has answered a probe Get.
     * @return true once the table was verified and the cache refreshed; false if the master
     *   stopped first
     */
    private boolean waitForGroupTableOnline() {
      final List<RegionInfo> foundRegions = new LinkedList<>();
      final List<RegionInfo> assignedRegions = new LinkedList<>();
      final AtomicBoolean found = new AtomicBoolean(false);
      final TableStateManager tsm = masterServices.getTableStateManager();
      boolean createSent = false;
      while (!found.get() && isMasterRunning(masterServices)) {
        foundRegions.clear();
        assignedRegions.clear();
        found.set(true);
        try {
          conn.getTable(TableName.NAMESPACE_TABLE_NAME);
          conn.getTable(RSGROUP_TABLE_NAME);
          boolean rootMetaFound =
              masterServices.getMetaTableLocator().verifyMetaRegionLocation(
                  conn, masterServices.getZooKeeper(), 1);
          final AtomicBoolean nsFound = new AtomicBoolean(false);
          if (rootMetaFound) {
            MetaTableAccessor.Visitor visitor = new DefaultVisitorBase() {
              @Override
              public boolean visitInternal(Result row) throws IOException {
                RegionInfo info = MetaTableAccessor.getRegionInfo(row);
                if (info != null) {
                  Cell serverCell =
                      row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
                          HConstants.SERVER_QUALIFIER);
                  if (RSGROUP_TABLE_NAME.equals(info.getTable()) && serverCell != null) {
                    ServerName sn =
                        ServerName.parseVersionedServerName(CellUtil.cloneValue(serverCell));
                    if (sn == null) {
                      found.set(false);
                    } else if (tsm.isTableState(RSGROUP_TABLE_NAME, TableState.State.ENABLED)) {
                      try {
                        // Probe the hosting server with a Get; success counts the region
                        // as assigned.
                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
                        ClientProtos.GetRequest request =
                            RequestConverter.buildGetRequest(info.getRegionName(),
                                new Get(ROW_KEY));
                        rs.get(null, request);
                        assignedRegions.add(info);
                      } catch(Exception ex) {
                        LOG.debug("Caught exception while verifying group region", ex);
                      }
                    }
                    foundRegions.add(info);
                  }
                  if (TableName.NAMESPACE_TABLE_NAME.equals(info.getTable())) {
                    Cell cell = row.getColumnLatestCell(HConstants.CATALOG_FAMILY,
                        HConstants.SERVER_QUALIFIER);
                    ServerName sn = null;
                    if (cell != null) {
                      sn = ServerName.parseVersionedServerName(CellUtil.cloneValue(cell));
                    }
                    if (sn == null) {
                      nsFound.set(false);
                    } else if (tsm.isTableState(TableName.NAMESPACE_TABLE_NAME,
                        TableState.State.ENABLED)) {
                      try {
                        ClientProtos.ClientService.BlockingInterface rs = conn.getClient(sn);
                        ClientProtos.GetRequest request =
                            RequestConverter.buildGetRequest(info.getRegionName(),
                                new Get(ROW_KEY));
                        rs.get(null, request);
                        nsFound.set(true);
                      } catch(Exception ex) {
                        LOG.debug("Caught exception while verifying group region", ex);
                      }
                    }
                  }
                }
                return true;
              }
            };
            MetaTableAccessor.fullScanRegions(conn, visitor);
            // if no regions in meta then we have to create the table
            if (foundRegions.size() < 1 && rootMetaFound && !createSent && nsFound.get()) {
              createRSGroupTable();
              createSent = true;
            }
            LOG.info("RSGroup table=" + RSGROUP_TABLE_NAME + " isOnline=" + found.get()
                + ", regionCount=" + foundRegions.size() + ", assignCount="
                + assignedRegions.size() + ", rootMetaFound=" + rootMetaFound);
            found.set(found.get() && assignedRegions.size() == foundRegions.size()
                && foundRegions.size() > 0);
          } else {
            LOG.info("Waiting for catalog tables to come online");
            found.set(false);
          }
          if (found.get()) {
            LOG.debug("With group table online, refreshing cached information.");
            RSGroupInfoManagerImpl.this.refresh(true);
            online = true;
            //flush any inconsistencies between ZK and HTable
            RSGroupInfoManagerImpl.this.flushConfig();
          }
        } catch (RuntimeException e) {
          throw e;
        } catch(Exception e) {
          found.set(false);
          LOG.warn("Failed to perform check", e);
        }
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          LOG.info("Sleep interrupted", e);
        }
      }
      return found.get();
    }

    /**
     * Submits creation of the rsgroup system table and waits (up to 600 polls of 100ms) for
     * the procedure to finish.
     * @throws IOException if the wait is interrupted, times out, or the procedure failed
     */
    private void createRSGroupTable() throws IOException {
      long procId = masterServices.createSystemTable(RSGROUP_TABLE_DESC);
      // wait for region to be online
      int tries = 600;
      while (!(masterServices.getMasterProcedureExecutor().isFinished(procId))
          && masterServices.getMasterProcedureExecutor().isRunning()
          && tries > 0) {
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          throw new IOException("Wait interrupted ", e);
        }
        tries--;
      }
      if (tries <= 0) {
        throw new IOException("Failed to create group table in a given time.");
      } else {
        Procedure<?> result = masterServices.getMasterProcedureExecutor().getResult(procId);
        if (result != null && result.isFailed()) {
          throw new IOException("Failed to create group table. " +
              MasterProcedureUtil.unwrapRemoteIOException(result));
        }
      }
    }

    public boolean isOnline() {
      return online;
    }
  }

  private static boolean isMasterRunning(MasterServices masterServices) {
    return !masterServices.isAborted() && !masterServices.isStopped();
  }

  /**
   * Sends all given Puts and Deletes to the rsgroup table in a single MultiRowMutation
   * coprocessor call.
   * @throws DoNotRetryIOException if a mutation is neither a Put nor a Delete
   */
  private void multiMutate(List<Mutation> mutations) throws IOException {
    CoprocessorRpcChannel channel = rsGroupTable.coprocessorService(ROW_KEY);
    MultiRowMutationProtos.MutateRowsRequest.Builder mmrBuilder
      = MultiRowMutationProtos.MutateRowsRequest.newBuilder();
    for (Mutation mutation : mutations) {
      if (mutation instanceof Put) {
        mmrBuilder.addMutationRequest(org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
            org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType.PUT,
            mutation));
      } else if (mutation instanceof Delete) {
        mmrBuilder.addMutationRequest(
            org.apache.hadoop.hbase.protobuf.ProtobufUtil.toMutation(
                org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.
                  MutationType.DELETE, mutation));
      } else {
        throw new DoNotRetryIOException("multiMutate doesn't support "
          + mutation.getClass().getName());
      }
    }

    MultiRowMutationProtos.MultiRowMutationService.BlockingInterface service =
      MultiRowMutationProtos.MultiRowMutationService.newBlockingStub(channel);
    try {
      service.mutateRows(null, mmrBuilder.build());
    } catch (ServiceException ex) {
      // NOTE(review): presumably toIOException always throws the unwrapped IOException itself
      // (void return) -- confirm; if it returns the exception instead, this swallows failures.
      ProtobufUtil.toIOException(ex);
    }
  }

  /**
   * Validates a group name: one or more ASCII letters, digits or underscores.
   * @throws ConstraintException if the name contains any other character or is empty
   */
  private void checkGroupName(String groupName) throws ConstraintException {
    if (!groupName.matches("[a-zA-Z0-9_]+")) {
      throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
    }
  }
}