001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.Objects;
024import java.util.Set;
025import org.apache.commons.lang3.StringUtils;
026import org.apache.hadoop.hbase.client.AsyncAdmin;
027import org.apache.hadoop.hbase.client.AsyncConnection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
030import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
031import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
032import org.apache.hadoop.hbase.util.Pair;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Overrides commands to make use of coprocessor where possible. Only supports actions taken against
039 * Master and Region Server hosts.
040 */
041@InterfaceAudience.Private
042@SuppressWarnings("unused") // no way to test this without a distributed cluster.
043public class CoprocClusterManager extends HBaseClusterManager {
044  private static final Logger LOG = LoggerFactory.getLogger(CoprocClusterManager.class);
045  private static final Set<ServiceType> supportedServices = buildSupportedServicesSet();
046
047  @Override
048  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
049    throws IOException {
050    if (!supportedServices.contains(service)) {
051      throw unsupportedServiceType(service);
052    }
053
054    // We only support actions vs. Master or Region Server processes. We're issuing those actions
055    // via the coprocessor that's running within those processes. Thus, there's no support for
056    // honoring the configured service user.
057    final String command = StringUtils.join(cmd, " ");
058    LOG.info("Executing remote command: {}, hostname:{}", command, hostname);
059
060    try (final AsyncConnection conn = ConnectionFactory.createAsyncConnection(getConf()).join()) {
061      final AsyncAdmin admin = conn.getAdmin();
062      final ShellExecRequest req =
063        ShellExecRequest.newBuilder().setCommand(command).setAwaitResponse(false).build();
064
065      final ShellExecResponse resp;
066      switch (service) {
067        case HBASE_MASTER:
068          // What happens if the intended action was killing a backup master? Right now we have
069          // no `RestartBackupMasterAction` so it's probably fine.
070          resp = masterExec(admin, req);
071          break;
072        case HBASE_REGIONSERVER:
073          final ServerName targetHost = resolveRegionServerName(admin, hostname);
074          resp = regionServerExec(admin, req, targetHost);
075          break;
076        default:
077          throw new RuntimeException("should not happen");
078      }
079
080      if (LOG.isDebugEnabled()) {
081        LOG.debug("Executed remote command: {}, exit code:{} , output:{}", command,
082          resp.getExitCode(), resp.getStdout());
083      } else {
084        LOG.info("Executed remote command: {}, exit code:{}", command, resp.getExitCode());
085      }
086      return new Pair<>(resp.getExitCode(), resp.getStdout());
087    }
088  }
089
090  private static Set<ServiceType> buildSupportedServicesSet() {
091    final Set<ServiceType> set = new HashSet<>();
092    set.add(ServiceType.HBASE_MASTER);
093    set.add(ServiceType.HBASE_REGIONSERVER);
094    return Collections.unmodifiableSet(set);
095  }
096
097  private static ShellExecResponse masterExec(final AsyncAdmin admin, final ShellExecRequest req) {
098    // TODO: Admin API provides no means of sending exec to a backup master.
099    return admin.<ShellExecService.Stub, ShellExecResponse> coprocessorService(
100      ShellExecService::newStub,
101      (stub, controller, callback) -> stub.shellExec(controller, req, callback)).join();
102  }
103
104  private static ShellExecResponse regionServerExec(final AsyncAdmin admin,
105    final ShellExecRequest req, final ServerName targetHost) {
106    return admin.<ShellExecService.Stub, ShellExecResponse> coprocessorService(
107      ShellExecService::newStub,
108      (stub, controller, callback) -> stub.shellExec(controller, req, callback), targetHost).join();
109  }
110
111  private static ServerName resolveRegionServerName(final AsyncAdmin admin, final String hostname) {
112    return admin.getRegionServers()
113      .thenApply(
114        names -> names.stream().filter(sn -> Objects.equals(sn.getHostname(), hostname)).findAny())
115      .join().orElseThrow(() -> serverNotFound(hostname));
116  }
117
118  private static RuntimeException serverNotFound(final String hostname) {
119    return new RuntimeException(
120      String.format("Did not find %s amongst the servers known to the client.", hostname));
121  }
122
123  private static RuntimeException unsupportedServiceType(final ServiceType serviceType) {
124    return new RuntimeException(
125      String.format("Unable to service request for service=%s", serviceType));
126  }
127}