001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.hadoop.hbase.rsgroup; 020 021import com.google.protobuf.RpcCallback; 022import com.google.protobuf.RpcController; 023import com.google.protobuf.Service; 024 025import java.io.IOException; 026import java.util.Collections; 027import java.util.HashSet; 028import java.util.List; 029import java.util.Optional; 030import java.util.Set; 031import java.util.stream.Collectors; 032 033import org.apache.hadoop.hbase.CoprocessorEnvironment; 034import org.apache.hadoop.hbase.HBaseIOException; 035import org.apache.hadoop.hbase.HConstants; 036import org.apache.hadoop.hbase.MasterNotRunningException; 037import org.apache.hadoop.hbase.NamespaceDescriptor; 038import org.apache.hadoop.hbase.PleaseHoldException; 039import org.apache.hadoop.hbase.ServerName; 040import org.apache.hadoop.hbase.TableName; 041import org.apache.hadoop.hbase.client.RegionInfo; 042import org.apache.hadoop.hbase.client.SnapshotDescription; 043import org.apache.hadoop.hbase.client.TableDescriptor; 044import org.apache.hadoop.hbase.constraint.ConstraintException; 045import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor; 046import org.apache.hadoop.hbase.coprocessor.HasMasterServices; 047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 048import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; 049import org.apache.hadoop.hbase.coprocessor.MasterObserver; 050import org.apache.hadoop.hbase.coprocessor.ObserverContext; 051import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 052import org.apache.hadoop.hbase.ipc.RpcServer; 053import org.apache.hadoop.hbase.master.MasterServices; 054import org.apache.hadoop.hbase.net.Address; 055import org.apache.hadoop.hbase.protobuf.ProtobufUtil; 056import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; 057import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos; 058import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest; 059import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse; 060import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest; 061import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse; 062import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest; 063import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse; 064import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest; 065import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse; 066import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest; 067import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse; 068import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest; 069import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse; 070import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesRequest; 071import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesResponse; 072import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersRequest; 073import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersResponse; 074import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest; 075import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesResponse; 076import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService; 077import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest; 078import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse; 079import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest; 080import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse; 081import org.apache.hadoop.hbase.protobuf.generated.TableProtos; 082import org.apache.hadoop.hbase.security.User; 083import org.apache.hadoop.hbase.security.UserProvider; 084import org.apache.hadoop.hbase.security.access.AccessChecker; 085import org.apache.hadoop.hbase.security.access.Permission.Action; 086import org.apache.hadoop.hbase.zookeeper.ZKWatcher; 087import org.apache.yetus.audience.InterfaceAudience; 088import org.slf4j.Logger; 089import org.slf4j.LoggerFactory; 090import org.apache.hbase.thirdparty.com.google.common.collect.Sets; 091 092// TODO: Encapsulate MasterObserver functions into separate subclass. 093@CoreCoprocessor 094@InterfaceAudience.Private 095public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver { 096 private static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminEndpoint.class); 097 098 private MasterServices master = null; 099 // Only instance of RSGroupInfoManager. RSGroup aware load balancers ask for this instance on 100 // their setup. 101 private RSGroupInfoManager groupInfoManager; 102 private RSGroupAdminServer groupAdminServer; 103 private final RSGroupAdminService groupAdminService = new RSGroupAdminServiceImpl(); 104 private AccessChecker accessChecker; 105 106 /** Provider for mapping principal names to Users */ 107 private UserProvider userProvider; 108 109 @Override 110 public void start(CoprocessorEnvironment env) throws IOException { 111 if (!(env instanceof HasMasterServices)) { 112 throw new IOException("Does not implement HMasterServices"); 113 } 114 115 master = ((HasMasterServices)env).getMasterServices(); 116 groupInfoManager = RSGroupInfoManagerImpl.getInstance(master); 117 groupAdminServer = new RSGroupAdminServer(master, groupInfoManager); 118 Class<?> clazz = 119 master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null); 120 if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) { 121 throw new IOException("Configured balancer does not support RegionServer groups."); 122 } 123 ZKWatcher zk = ((HasMasterServices)env).getMasterServices().getZooKeeper(); 124 accessChecker = new AccessChecker(env.getConfiguration(), zk); 125 126 // set the user-provider. 127 this.userProvider = UserProvider.instantiate(env.getConfiguration()); 128 } 129 130 @Override 131 public void stop(CoprocessorEnvironment env) { 132 accessChecker.stop(); 133 } 134 135 @Override 136 public Iterable<Service> getServices() { 137 return Collections.singleton(groupAdminService); 138 } 139 140 @Override 141 public Optional<MasterObserver> getMasterObserver() { 142 return Optional.of(this); 143 } 144 145 RSGroupInfoManager getGroupInfoManager() { 146 return groupInfoManager; 147 } 148 149 /** 150 * Implementation of RSGroupAdminService defined in RSGroupAdmin.proto. 151 * This class calls {@link RSGroupAdminServer} for actual work, converts result to protocol 152 * buffer response, handles exceptions if any occurred and then calls the {@code RpcCallback} with 153 * the response. 154 */ 155 private class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService { 156 @Override 157 public void getRSGroupInfo(RpcController controller, 158 GetRSGroupInfoRequest request, RpcCallback<GetRSGroupInfoResponse> done) { 159 GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder(); 160 String groupName = request.getRSGroupName(); 161 LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, group=" 162 + groupName); 163 try { 164 checkPermission("getRSGroupInfo"); 165 RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); 166 if (rsGroupInfo != null) { 167 builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(rsGroupInfo)); 168 } 169 } catch (IOException e) { 170 CoprocessorRpcUtils.setControllerException(controller, e); 171 } 172 done.run(builder.build()); 173 } 174 175 @Override 176 public void getRSGroupInfoOfTable(RpcController controller, 177 GetRSGroupInfoOfTableRequest request, RpcCallback<GetRSGroupInfoOfTableResponse> done) { 178 GetRSGroupInfoOfTableResponse.Builder builder = GetRSGroupInfoOfTableResponse.newBuilder(); 179 TableName tableName = ProtobufUtil.toTableName(request.getTableName()); 180 LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, table=" 181 + tableName); 182 try { 183 checkPermission("getRSGroupInfoOfTable"); 184 RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName); 185 if (RSGroupInfo != null) { 186 builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo)); 187 } 188 } catch (IOException e) { 189 CoprocessorRpcUtils.setControllerException(controller, e); 190 } 191 done.run(builder.build()); 192 } 193 194 @Override 195 public void moveServers(RpcController controller, MoveServersRequest request, 196 RpcCallback<MoveServersResponse> done) { 197 MoveServersResponse.Builder builder = MoveServersResponse.newBuilder(); 198 Set<Address> hostPorts = Sets.newHashSet(); 199 for (HBaseProtos.ServerName el : request.getServersList()) { 200 hostPorts.add(Address.fromParts(el.getHostName(), el.getPort())); 201 } 202 LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts +" to rsgroup " 203 + request.getTargetGroup()); 204 try { 205 checkPermission("moveServers"); 206 groupAdminServer.moveServers(hostPorts, request.getTargetGroup()); 207 } catch (IOException e) { 208 CoprocessorRpcUtils.setControllerException(controller, e); 209 } 210 done.run(builder.build()); 211 } 212 213 @Override 214 public void moveTables(RpcController controller, MoveTablesRequest request, 215 RpcCallback<MoveTablesResponse> done) { 216 MoveTablesResponse.Builder builder = MoveTablesResponse.newBuilder(); 217 Set<TableName> tables = new HashSet<>(request.getTableNameList().size()); 218 for (TableProtos.TableName tableName : request.getTableNameList()) { 219 tables.add(ProtobufUtil.toTableName(tableName)); 220 } 221 LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables +" to rsgroup " 222 + request.getTargetGroup()); 223 try { 224 checkPermission("moveTables"); 225 groupAdminServer.moveTables(tables, request.getTargetGroup()); 226 } catch (IOException e) { 227 CoprocessorRpcUtils.setControllerException(controller, e); 228 } 229 done.run(builder.build()); 230 } 231 232 @Override 233 public void addRSGroup(RpcController controller, AddRSGroupRequest request, 234 RpcCallback<AddRSGroupResponse> done) { 235 AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder(); 236 LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName()); 237 try { 238 checkPermission("addRSGroup"); 239 groupAdminServer.addRSGroup(request.getRSGroupName()); 240 } catch (IOException e) { 241 CoprocessorRpcUtils.setControllerException(controller, e); 242 } 243 done.run(builder.build()); 244 } 245 246 @Override 247 public void removeRSGroup(RpcController controller, 248 RemoveRSGroupRequest request, RpcCallback<RemoveRSGroupResponse> done) { 249 RemoveRSGroupResponse.Builder builder = 250 RemoveRSGroupResponse.newBuilder(); 251 LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName()); 252 try { 253 checkPermission("removeRSGroup"); 254 groupAdminServer.removeRSGroup(request.getRSGroupName()); 255 } catch (IOException e) { 256 CoprocessorRpcUtils.setControllerException(controller, e); 257 } 258 done.run(builder.build()); 259 } 260 261 @Override 262 public void balanceRSGroup(RpcController controller, 263 BalanceRSGroupRequest request, RpcCallback<BalanceRSGroupResponse> done) { 264 BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder(); 265 LOG.info(master.getClientIdAuditPrefix() + " balance rsgroup, group=" 266 + request.getRSGroupName()); 267 try { 268 checkPermission("balanceRSGroup"); 269 builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName())); 270 } catch (IOException e) { 271 CoprocessorRpcUtils.setControllerException(controller, e); 272 builder.setBalanceRan(false); 273 } 274 done.run(builder.build()); 275 } 276 277 @Override 278 public void listRSGroupInfos(RpcController controller, 279 ListRSGroupInfosRequest request, RpcCallback<ListRSGroupInfosResponse> done) { 280 ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder(); 281 LOG.info(master.getClientIdAuditPrefix() + " list rsgroup"); 282 try { 283 checkPermission("listRSGroup"); 284 for (RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) { 285 builder.addRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo)); 286 } 287 } catch (IOException e) { 288 CoprocessorRpcUtils.setControllerException(controller, e); 289 } 290 done.run(builder.build()); 291 } 292 293 @Override 294 public void getRSGroupInfoOfServer(RpcController controller, 295 GetRSGroupInfoOfServerRequest request, RpcCallback<GetRSGroupInfoOfServerResponse> done) { 296 GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder(); 297 Address hp = Address.fromParts(request.getServer().getHostName(), 298 request.getServer().getPort()); 299 LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, server=" 300 + hp); 301 try { 302 checkPermission("getRSGroupInfoOfServer"); 303 RSGroupInfo info = groupAdminServer.getRSGroupOfServer(hp); 304 if (info != null) { 305 builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(info)); 306 } 307 } catch (IOException e) { 308 CoprocessorRpcUtils.setControllerException(controller, e); 309 } 310 done.run(builder.build()); 311 } 312 313 @Override 314 public void moveServersAndTables(RpcController controller, 315 MoveServersAndTablesRequest request, RpcCallback<MoveServersAndTablesResponse> done) { 316 MoveServersAndTablesResponse.Builder builder = MoveServersAndTablesResponse.newBuilder(); 317 Set<Address> hostPorts = Sets.newHashSet(); 318 for (HBaseProtos.ServerName el : request.getServersList()) { 319 hostPorts.add(Address.fromParts(el.getHostName(), el.getPort())); 320 } 321 Set<TableName> tables = new HashSet<>(request.getTableNameList().size()); 322 for (TableProtos.TableName tableName : request.getTableNameList()) { 323 tables.add(ProtobufUtil.toTableName(tableName)); 324 } 325 LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts 326 + " and tables " + tables + " to rsgroup" + request.getTargetGroup()); 327 try { 328 checkPermission("moveServersAndTables"); 329 groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup()); 330 } catch (IOException e) { 331 CoprocessorRpcUtils.setControllerException(controller, e); 332 } 333 done.run(builder.build()); 334 } 335 336 @Override 337 public void removeServers(RpcController controller, 338 RemoveServersRequest request, 339 RpcCallback<RemoveServersResponse> done) { 340 RemoveServersResponse.Builder builder = 341 RemoveServersResponse.newBuilder(); 342 Set<Address> servers = Sets.newHashSet(); 343 for (HBaseProtos.ServerName el : request.getServersList()) { 344 servers.add(Address.fromParts(el.getHostName(), el.getPort())); 345 } 346 LOG.info(master.getClientIdAuditPrefix() 347 + " remove decommissioned servers from rsgroup: " + servers); 348 try { 349 checkPermission("removeServers"); 350 groupAdminServer.removeServers(servers); 351 } catch (IOException e) { 352 CoprocessorRpcUtils.setControllerException(controller, e); 353 } 354 done.run(builder.build()); 355 } 356 } 357 358 boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException { 359 String groupName; 360 try { 361 groupName = 362 master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) 363 .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); 364 if (groupName == null) { 365 groupName = RSGroupInfo.DEFAULT_GROUP; 366 } 367 } catch (MasterNotRunningException | PleaseHoldException e) { 368 LOG.info("Master has not initialized yet; temporarily using default RSGroup '" + 369 RSGroupInfo.DEFAULT_GROUP + "' for deploy of system table"); 370 groupName = RSGroupInfo.DEFAULT_GROUP; 371 } 372 373 RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); 374 if (rsGroupInfo == null) { 375 throw new ConstraintException( 376 "Default RSGroup (" + groupName + ") for this table's " + "namespace does not exist."); 377 } 378 379 for (ServerName onlineServer : master.getServerManager().createDestinationServersList()) { 380 if (rsGroupInfo.getServers().contains(onlineServer.getAddress())) { 381 return true; 382 } 383 } 384 return false; 385 } 386 387 void assignTableToGroup(TableDescriptor desc) throws IOException { 388 String groupName = 389 master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString()) 390 .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); 391 if (groupName == null) { 392 groupName = RSGroupInfo.DEFAULT_GROUP; 393 } 394 RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName); 395 if (rsGroupInfo == null) { 396 throw new ConstraintException("Default RSGroup (" + groupName + ") for this table's " 397 + "namespace does not exist."); 398 } 399 if (!rsGroupInfo.containsTable(desc.getTableName())) { 400 LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName); 401 groupAdminServer.moveTables(Sets.newHashSet(desc.getTableName()), groupName); 402 } 403 } 404 405 ///////////////////////////////////////////////////////////////////////////// 406 // MasterObserver overrides 407 ///////////////////////////////////////////////////////////////////////////// 408 409 @Override 410 public void preCreateTableAction( 411 final ObserverContext<MasterCoprocessorEnvironment> ctx, 412 final TableDescriptor desc, 413 final RegionInfo[] regions) throws IOException { 414 if (!desc.getTableName().isSystemTable() && !rsgroupHasServersOnline(desc)) { 415 throw new HBaseIOException("No online servers in the rsgroup, which table " + 416 desc.getTableName().getNameAsString() + " belongs to"); 417 } 418 } 419 420 // Assign table to default RSGroup. 421 @Override 422 public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx, 423 TableDescriptor desc, RegionInfo[] regions) throws IOException { 424 assignTableToGroup(desc); 425 } 426 427 // Remove table from its RSGroup. 428 @Override 429 public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx, 430 TableName tableName) throws IOException { 431 try { 432 RSGroupInfo group = groupAdminServer.getRSGroupInfoOfTable(tableName); 433 if (group != null) { 434 LOG.debug(String.format("Removing deleted table '%s' from rsgroup '%s'", tableName, 435 group.getName())); 436 groupAdminServer.moveTables(Sets.newHashSet(tableName), null); 437 } 438 } catch (IOException ex) { 439 LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex); 440 } 441 } 442 443 @Override 444 public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, 445 NamespaceDescriptor ns) throws IOException { 446 String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP); 447 if(group != null && groupAdminServer.getRSGroupInfo(group) == null) { 448 throw new ConstraintException("Region server group "+group+" does not exit"); 449 } 450 } 451 452 @Override 453 public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx, 454 NamespaceDescriptor ns) throws IOException { 455 preCreateNamespace(ctx, ns); 456 } 457 458 @Override 459 public void preCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx, 460 SnapshotDescription snapshot, TableDescriptor desc) throws IOException { 461 assignTableToGroup(desc); 462 } 463 464 @Override 465 public void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx, 466 List<ServerName> servers, List<ServerName> notClearedServers) 467 throws IOException { 468 Set<Address> clearedServer = servers.stream(). 469 filter(server -> !notClearedServers.contains(server)). 470 map(ServerName::getAddress). 471 collect(Collectors.toSet()); 472 groupAdminServer.removeServers(clearedServer); 473 } 474 475 public void checkPermission(String request) throws IOException { 476 accessChecker.requirePermission(getActiveUser(), request, Action.ADMIN); 477 } 478 479 /** 480 * Returns the active user to which authorization checks should be applied. 481 * If we are in the context of an RPC call, the remote user is used, 482 * otherwise the currently logged in user is used. 483 */ 484 private User getActiveUser() throws IOException { 485 // for non-rpc handling, fallback to system user 486 Optional<User> optionalUser = RpcServer.getRequestUser(); 487 if (optionalUser.isPresent()) { 488 return optionalUser.get(); 489 } 490 return userProvider.getCurrent(); 491 } 492}