001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.hbase.rsgroup;
020
021import com.google.protobuf.RpcCallback;
022import com.google.protobuf.RpcController;
023import com.google.protobuf.Service;
024
025import java.io.IOException;
026import java.util.Collections;
027import java.util.HashSet;
028import java.util.List;
029import java.util.Optional;
030import java.util.Set;
031import java.util.stream.Collectors;
032
033import org.apache.hadoop.hbase.CoprocessorEnvironment;
034import org.apache.hadoop.hbase.HBaseIOException;
035import org.apache.hadoop.hbase.HConstants;
036import org.apache.hadoop.hbase.MasterNotRunningException;
037import org.apache.hadoop.hbase.NamespaceDescriptor;
038import org.apache.hadoop.hbase.PleaseHoldException;
039import org.apache.hadoop.hbase.ServerName;
040import org.apache.hadoop.hbase.TableName;
041import org.apache.hadoop.hbase.client.RegionInfo;
042import org.apache.hadoop.hbase.client.SnapshotDescription;
043import org.apache.hadoop.hbase.client.TableDescriptor;
044import org.apache.hadoop.hbase.constraint.ConstraintException;
045import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
046import org.apache.hadoop.hbase.coprocessor.HasMasterServices;
047import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
048import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
049import org.apache.hadoop.hbase.coprocessor.MasterObserver;
050import org.apache.hadoop.hbase.coprocessor.ObserverContext;
051import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
052import org.apache.hadoop.hbase.ipc.RpcServer;
053import org.apache.hadoop.hbase.master.MasterServices;
054import org.apache.hadoop.hbase.net.Address;
055import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
056import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
057import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos;
058import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupRequest;
059import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.AddRSGroupResponse;
060import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupRequest;
061import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.BalanceRSGroupResponse;
062import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerRequest;
063import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfServerResponse;
064import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableRequest;
065import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoOfTableResponse;
066import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoRequest;
067import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.GetRSGroupInfoResponse;
068import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosRequest;
069import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.ListRSGroupInfosResponse;
070import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesRequest;
071import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersAndTablesResponse;
072import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersRequest;
073import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveServersResponse;
074import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesRequest;
075import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.MoveTablesResponse;
076import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RSGroupAdminService;
077import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupRequest;
078import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveRSGroupResponse;
079import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersRequest;
080import org.apache.hadoop.hbase.protobuf.generated.RSGroupAdminProtos.RemoveServersResponse;
081import org.apache.hadoop.hbase.protobuf.generated.TableProtos;
082import org.apache.hadoop.hbase.security.User;
083import org.apache.hadoop.hbase.security.UserProvider;
084import org.apache.hadoop.hbase.security.access.AccessChecker;
085import org.apache.hadoop.hbase.security.access.Permission.Action;
086import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
087import org.apache.yetus.audience.InterfaceAudience;
088import org.slf4j.Logger;
089import org.slf4j.LoggerFactory;
090import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
091
092// TODO: Encapsulate MasterObserver functions into separate subclass.
093@CoreCoprocessor
094@InterfaceAudience.Private
095public class RSGroupAdminEndpoint implements MasterCoprocessor, MasterObserver {
096  private static final Logger LOG = LoggerFactory.getLogger(RSGroupAdminEndpoint.class);
097
098  private MasterServices master = null;
099  // Only instance of RSGroupInfoManager. RSGroup aware load balancers ask for this instance on
100  // their setup.
101  private RSGroupInfoManager groupInfoManager;
102  private RSGroupAdminServer groupAdminServer;
103  private final RSGroupAdminService groupAdminService = new RSGroupAdminServiceImpl();
104  private AccessChecker accessChecker;
105
106  /** Provider for mapping principal names to Users */
107  private UserProvider userProvider;
108
109  @Override
110  public void start(CoprocessorEnvironment env) throws IOException {
111    if (!(env instanceof HasMasterServices)) {
112      throw new IOException("Does not implement HMasterServices");
113    }
114
115    master = ((HasMasterServices)env).getMasterServices();
116    groupInfoManager = RSGroupInfoManagerImpl.getInstance(master);
117    groupAdminServer = new RSGroupAdminServer(master, groupInfoManager);
118    Class<?> clazz =
119        master.getConfiguration().getClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, null);
120    if (!RSGroupableBalancer.class.isAssignableFrom(clazz)) {
121      throw new IOException("Configured balancer does not support RegionServer groups.");
122    }
123    ZKWatcher zk = ((HasMasterServices)env).getMasterServices().getZooKeeper();
124    accessChecker = new AccessChecker(env.getConfiguration(), zk);
125
126    // set the user-provider.
127    this.userProvider = UserProvider.instantiate(env.getConfiguration());
128  }
129
130  @Override
131  public void stop(CoprocessorEnvironment env) {
132    accessChecker.stop();
133  }
134
135  @Override
136  public Iterable<Service> getServices() {
137    return Collections.singleton(groupAdminService);
138  }
139
140  @Override
141  public Optional<MasterObserver> getMasterObserver() {
142    return Optional.of(this);
143  }
144
145  RSGroupInfoManager getGroupInfoManager() {
146    return groupInfoManager;
147  }
148
149  /**
150   * Implementation of RSGroupAdminService defined in RSGroupAdmin.proto.
151   * This class calls {@link RSGroupAdminServer} for actual work, converts result to protocol
152   * buffer response, handles exceptions if any occurred and then calls the {@code RpcCallback} with
153   * the response.
154   */
155  private class RSGroupAdminServiceImpl extends RSGroupAdminProtos.RSGroupAdminService {
156    @Override
157    public void getRSGroupInfo(RpcController controller,
158        GetRSGroupInfoRequest request, RpcCallback<GetRSGroupInfoResponse> done) {
159      GetRSGroupInfoResponse.Builder builder = GetRSGroupInfoResponse.newBuilder();
160      String groupName = request.getRSGroupName();
161      LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, group="
162              + groupName);
163      try {
164        checkPermission("getRSGroupInfo");
165        RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
166        if (rsGroupInfo != null) {
167          builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(rsGroupInfo));
168        }
169      } catch (IOException e) {
170        CoprocessorRpcUtils.setControllerException(controller, e);
171      }
172      done.run(builder.build());
173    }
174
175    @Override
176    public void getRSGroupInfoOfTable(RpcController controller,
177        GetRSGroupInfoOfTableRequest request, RpcCallback<GetRSGroupInfoOfTableResponse> done) {
178      GetRSGroupInfoOfTableResponse.Builder builder = GetRSGroupInfoOfTableResponse.newBuilder();
179      TableName tableName = ProtobufUtil.toTableName(request.getTableName());
180      LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, table="
181          + tableName);
182      try {
183        checkPermission("getRSGroupInfoOfTable");
184        RSGroupInfo RSGroupInfo = groupAdminServer.getRSGroupInfoOfTable(tableName);
185        if (RSGroupInfo != null) {
186          builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
187        }
188      } catch (IOException e) {
189        CoprocessorRpcUtils.setControllerException(controller, e);
190      }
191      done.run(builder.build());
192    }
193
194    @Override
195    public void moveServers(RpcController controller, MoveServersRequest request,
196        RpcCallback<MoveServersResponse> done) {
197      MoveServersResponse.Builder builder = MoveServersResponse.newBuilder();
198      Set<Address> hostPorts = Sets.newHashSet();
199      for (HBaseProtos.ServerName el : request.getServersList()) {
200        hostPorts.add(Address.fromParts(el.getHostName(), el.getPort()));
201      }
202      LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts +" to rsgroup "
203          + request.getTargetGroup());
204      try {
205        checkPermission("moveServers");
206        groupAdminServer.moveServers(hostPorts, request.getTargetGroup());
207      } catch (IOException e) {
208        CoprocessorRpcUtils.setControllerException(controller, e);
209      }
210      done.run(builder.build());
211    }
212
213    @Override
214    public void moveTables(RpcController controller, MoveTablesRequest request,
215        RpcCallback<MoveTablesResponse> done) {
216      MoveTablesResponse.Builder builder = MoveTablesResponse.newBuilder();
217      Set<TableName> tables = new HashSet<>(request.getTableNameList().size());
218      for (TableProtos.TableName tableName : request.getTableNameList()) {
219        tables.add(ProtobufUtil.toTableName(tableName));
220      }
221      LOG.info(master.getClientIdAuditPrefix() + " move tables " + tables +" to rsgroup "
222          + request.getTargetGroup());
223      try {
224        checkPermission("moveTables");
225        groupAdminServer.moveTables(tables, request.getTargetGroup());
226      } catch (IOException e) {
227        CoprocessorRpcUtils.setControllerException(controller, e);
228      }
229      done.run(builder.build());
230    }
231
232    @Override
233    public void addRSGroup(RpcController controller, AddRSGroupRequest request,
234        RpcCallback<AddRSGroupResponse> done) {
235      AddRSGroupResponse.Builder builder = AddRSGroupResponse.newBuilder();
236      LOG.info(master.getClientIdAuditPrefix() + " add rsgroup " + request.getRSGroupName());
237      try {
238        checkPermission("addRSGroup");
239        groupAdminServer.addRSGroup(request.getRSGroupName());
240      } catch (IOException e) {
241        CoprocessorRpcUtils.setControllerException(controller, e);
242      }
243      done.run(builder.build());
244    }
245
246    @Override
247    public void removeRSGroup(RpcController controller,
248        RemoveRSGroupRequest request, RpcCallback<RemoveRSGroupResponse> done) {
249      RemoveRSGroupResponse.Builder builder =
250          RemoveRSGroupResponse.newBuilder();
251      LOG.info(master.getClientIdAuditPrefix() + " remove rsgroup " + request.getRSGroupName());
252      try {
253        checkPermission("removeRSGroup");
254        groupAdminServer.removeRSGroup(request.getRSGroupName());
255      } catch (IOException e) {
256        CoprocessorRpcUtils.setControllerException(controller, e);
257      }
258      done.run(builder.build());
259    }
260
261    @Override
262    public void balanceRSGroup(RpcController controller,
263        BalanceRSGroupRequest request, RpcCallback<BalanceRSGroupResponse> done) {
264      BalanceRSGroupResponse.Builder builder = BalanceRSGroupResponse.newBuilder();
265      LOG.info(master.getClientIdAuditPrefix() + " balance rsgroup, group="
266          + request.getRSGroupName());
267      try {
268        checkPermission("balanceRSGroup");
269        builder.setBalanceRan(groupAdminServer.balanceRSGroup(request.getRSGroupName()));
270      } catch (IOException e) {
271        CoprocessorRpcUtils.setControllerException(controller, e);
272        builder.setBalanceRan(false);
273      }
274      done.run(builder.build());
275    }
276
277    @Override
278    public void listRSGroupInfos(RpcController controller,
279        ListRSGroupInfosRequest request, RpcCallback<ListRSGroupInfosResponse> done) {
280      ListRSGroupInfosResponse.Builder builder = ListRSGroupInfosResponse.newBuilder();
281      LOG.info(master.getClientIdAuditPrefix() + " list rsgroup");
282      try {
283        checkPermission("listRSGroup");
284        for (RSGroupInfo RSGroupInfo : groupAdminServer.listRSGroups()) {
285          builder.addRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(RSGroupInfo));
286        }
287      } catch (IOException e) {
288        CoprocessorRpcUtils.setControllerException(controller, e);
289      }
290      done.run(builder.build());
291    }
292
293    @Override
294    public void getRSGroupInfoOfServer(RpcController controller,
295        GetRSGroupInfoOfServerRequest request, RpcCallback<GetRSGroupInfoOfServerResponse> done) {
296      GetRSGroupInfoOfServerResponse.Builder builder = GetRSGroupInfoOfServerResponse.newBuilder();
297      Address hp = Address.fromParts(request.getServer().getHostName(),
298          request.getServer().getPort());
299      LOG.info(master.getClientIdAuditPrefix() + " initiates rsgroup info retrieval, server="
300          + hp);
301      try {
302        checkPermission("getRSGroupInfoOfServer");
303        RSGroupInfo info = groupAdminServer.getRSGroupOfServer(hp);
304        if (info != null) {
305          builder.setRSGroupInfo(RSGroupProtobufUtil.toProtoGroupInfo(info));
306        }
307      } catch (IOException e) {
308        CoprocessorRpcUtils.setControllerException(controller, e);
309      }
310      done.run(builder.build());
311    }
312
313    @Override
314    public void moveServersAndTables(RpcController controller,
315        MoveServersAndTablesRequest request, RpcCallback<MoveServersAndTablesResponse> done) {
316      MoveServersAndTablesResponse.Builder builder = MoveServersAndTablesResponse.newBuilder();
317      Set<Address> hostPorts = Sets.newHashSet();
318      for (HBaseProtos.ServerName el : request.getServersList()) {
319        hostPorts.add(Address.fromParts(el.getHostName(), el.getPort()));
320      }
321      Set<TableName> tables = new HashSet<>(request.getTableNameList().size());
322      for (TableProtos.TableName tableName : request.getTableNameList()) {
323        tables.add(ProtobufUtil.toTableName(tableName));
324      }
325      LOG.info(master.getClientIdAuditPrefix() + " move servers " + hostPorts
326          + " and tables " + tables + " to rsgroup" + request.getTargetGroup());
327      try {
328        checkPermission("moveServersAndTables");
329        groupAdminServer.moveServersAndTables(hostPorts, tables, request.getTargetGroup());
330      } catch (IOException e) {
331        CoprocessorRpcUtils.setControllerException(controller, e);
332      }
333      done.run(builder.build());
334    }
335
336    @Override
337    public void removeServers(RpcController controller,
338        RemoveServersRequest request,
339        RpcCallback<RemoveServersResponse> done) {
340      RemoveServersResponse.Builder builder =
341          RemoveServersResponse.newBuilder();
342      Set<Address> servers = Sets.newHashSet();
343      for (HBaseProtos.ServerName el : request.getServersList()) {
344        servers.add(Address.fromParts(el.getHostName(), el.getPort()));
345      }
346      LOG.info(master.getClientIdAuditPrefix()
347          + " remove decommissioned servers from rsgroup: " + servers);
348      try {
349        checkPermission("removeServers");
350        groupAdminServer.removeServers(servers);
351      } catch (IOException e) {
352        CoprocessorRpcUtils.setControllerException(controller, e);
353      }
354      done.run(builder.build());
355    }
356  }
357
358  boolean rsgroupHasServersOnline(TableDescriptor desc) throws IOException {
359    String groupName;
360    try {
361      groupName =
362        master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString())
363        .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
364      if (groupName == null) {
365        groupName = RSGroupInfo.DEFAULT_GROUP;
366      }
367    } catch (MasterNotRunningException | PleaseHoldException e) {
368      LOG.info("Master has not initialized yet; temporarily using default RSGroup '" +
369          RSGroupInfo.DEFAULT_GROUP + "' for deploy of system table");
370      groupName = RSGroupInfo.DEFAULT_GROUP;
371    }
372
373    RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
374    if (rsGroupInfo == null) {
375      throw new ConstraintException(
376          "Default RSGroup (" + groupName + ") for this table's " + "namespace does not exist.");
377    }
378
379    for (ServerName onlineServer : master.getServerManager().createDestinationServersList()) {
380      if (rsGroupInfo.getServers().contains(onlineServer.getAddress())) {
381        return true;
382      }
383    }
384    return false;
385  }
386
387  void assignTableToGroup(TableDescriptor desc) throws IOException {
388    String groupName =
389        master.getClusterSchema().getNamespace(desc.getTableName().getNamespaceAsString())
390                .getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
391    if (groupName == null) {
392      groupName = RSGroupInfo.DEFAULT_GROUP;
393    }
394    RSGroupInfo rsGroupInfo = groupAdminServer.getRSGroupInfo(groupName);
395    if (rsGroupInfo == null) {
396      throw new ConstraintException("Default RSGroup (" + groupName + ") for this table's "
397          + "namespace does not exist.");
398    }
399    if (!rsGroupInfo.containsTable(desc.getTableName())) {
400      LOG.debug("Pre-moving table " + desc.getTableName() + " to RSGroup " + groupName);
401      groupAdminServer.moveTables(Sets.newHashSet(desc.getTableName()), groupName);
402    }
403  }
404
405  /////////////////////////////////////////////////////////////////////////////
406  // MasterObserver overrides
407  /////////////////////////////////////////////////////////////////////////////
408
409  @Override
410  public void preCreateTableAction(
411      final ObserverContext<MasterCoprocessorEnvironment> ctx,
412      final TableDescriptor desc,
413      final RegionInfo[] regions) throws IOException {
414    if (!desc.getTableName().isSystemTable() && !rsgroupHasServersOnline(desc)) {
415      throw new HBaseIOException("No online servers in the rsgroup, which table " +
416          desc.getTableName().getNameAsString() + " belongs to");
417    }
418  }
419
420  // Assign table to default RSGroup.
421  @Override
422  public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
423      TableDescriptor desc, RegionInfo[] regions) throws IOException {
424    assignTableToGroup(desc);
425  }
426
427  // Remove table from its RSGroup.
428  @Override
429  public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
430                              TableName tableName) throws IOException {
431    try {
432      RSGroupInfo group = groupAdminServer.getRSGroupInfoOfTable(tableName);
433      if (group != null) {
434        LOG.debug(String.format("Removing deleted table '%s' from rsgroup '%s'", tableName,
435            group.getName()));
436        groupAdminServer.moveTables(Sets.newHashSet(tableName), null);
437      }
438    } catch (IOException ex) {
439      LOG.debug("Failed to perform RSGroup information cleanup for table: " + tableName, ex);
440    }
441  }
442
443  @Override
444  public void preCreateNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
445                                 NamespaceDescriptor ns) throws IOException {
446    String group = ns.getConfigurationValue(RSGroupInfo.NAMESPACE_DESC_PROP_GROUP);
447    if(group != null && groupAdminServer.getRSGroupInfo(group) == null) {
448      throw new ConstraintException("Region server group "+group+" does not exit");
449    }
450  }
451
452  @Override
453  public void preModifyNamespace(ObserverContext<MasterCoprocessorEnvironment> ctx,
454      NamespaceDescriptor ns) throws IOException {
455    preCreateNamespace(ctx, ns);
456  }
457
458  @Override
459  public void preCloneSnapshot(ObserverContext<MasterCoprocessorEnvironment> ctx,
460      SnapshotDescription snapshot, TableDescriptor desc) throws IOException {
461    assignTableToGroup(desc);
462  }
463
464  @Override
465  public void postClearDeadServers(ObserverContext<MasterCoprocessorEnvironment> ctx,
466      List<ServerName> servers, List<ServerName> notClearedServers)
467      throws IOException {
468    Set<Address> clearedServer = servers.stream().
469        filter(server -> !notClearedServers.contains(server)).
470        map(ServerName::getAddress).
471        collect(Collectors.toSet());
472    groupAdminServer.removeServers(clearedServer);
473  }
474
475  public void checkPermission(String request) throws IOException {
476    accessChecker.requirePermission(getActiveUser(), request, Action.ADMIN);
477  }
478
479  /**
480   * Returns the active user to which authorization checks should be applied.
481   * If we are in the context of an RPC call, the remote user is used,
482   * otherwise the currently logged in user is used.
483   */
484  private User getActiveUser() throws IOException {
485    // for non-rpc handling, fallback to system user
486    Optional<User> optionalUser = RpcServer.getRequestUser();
487    if (optionalUser.isPresent()) {
488      return optionalUser.get();
489    }
490    return userProvider.getCurrent();
491  }
492}