001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase;
019
020import java.io.IOException;
021import java.util.Collections;
022import java.util.HashSet;
023import java.util.Objects;
024import java.util.Set;
025import org.apache.commons.lang3.StringUtils;
026import org.apache.hadoop.hbase.client.AsyncAdmin;
027import org.apache.hadoop.hbase.client.AsyncConnection;
028import org.apache.hadoop.hbase.client.ConnectionFactory;
029import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
030import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
031import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
032import org.apache.hadoop.hbase.util.Pair;
033import org.apache.yetus.audience.InterfaceAudience;
034import org.slf4j.Logger;
035import org.slf4j.LoggerFactory;
036
037/**
038 * Overrides commands to make use of coprocessor where possible. Only supports actions taken
039 * against Master and Region Server hosts.
040 */
041@InterfaceAudience.Private
042@SuppressWarnings("unused") // no way to test this without a distributed cluster.
043public class CoprocClusterManager extends HBaseClusterManager {
044  private static final Logger LOG = LoggerFactory.getLogger(CoprocClusterManager.class);
045  private static final Set<ServiceType> supportedServices = buildSupportedServicesSet();
046
047  @Override
048  protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd)
049    throws IOException {
050    if (!supportedServices.contains(service)) {
051      throw unsupportedServiceType(service);
052    }
053
054    // We only support actions vs. Master or Region Server processes. We're issuing those actions
055    // via the coprocessor that's running within those processes. Thus, there's no support for
056    // honoring the configured service user.
057    final String command = StringUtils.join(cmd, " ");
058    LOG.info("Executing remote command: {}, hostname:{}", command, hostname);
059
060    try (final AsyncConnection conn = ConnectionFactory.createAsyncConnection(getConf()).join()) {
061      final AsyncAdmin admin = conn.getAdmin();
062      final ShellExecRequest req = ShellExecRequest.newBuilder()
063        .setCommand(command)
064        .setAwaitResponse(false)
065        .build();
066
067      final ShellExecResponse resp;
068      switch(service) {
069        case HBASE_MASTER:
070          // What happens if the intended action was killing a backup master? Right now we have
071          // no `RestartBackupMasterAction` so it's probably fine.
072          resp = masterExec(admin, req);
073          break;
074        case HBASE_REGIONSERVER:
075          final ServerName targetHost = resolveRegionServerName(admin, hostname);
076          resp = regionServerExec(admin, req, targetHost);
077          break;
078        default:
079          throw new RuntimeException("should not happen");
080      }
081
082      if (LOG.isDebugEnabled()) {
083        LOG.debug("Executed remote command: {}, exit code:{} , output:{}", command, resp.getExitCode(),
084          resp.getStdout());
085      } else {
086        LOG.info("Executed remote command: {}, exit code:{}", command, resp.getExitCode());
087      }
088      return new Pair<>(resp.getExitCode(), resp.getStdout());
089    }
090  }
091
092  private static Set<ServiceType> buildSupportedServicesSet() {
093    final Set<ServiceType> set = new HashSet<>();
094    set.add(ServiceType.HBASE_MASTER);
095    set.add(ServiceType.HBASE_REGIONSERVER);
096    return Collections.unmodifiableSet(set);
097  }
098
099  private static ShellExecResponse masterExec(final AsyncAdmin admin,
100    final ShellExecRequest req) {
101    // TODO: Admin API provides no means of sending exec to a backup master.
102    return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
103      ShellExecService::newStub,
104      (stub, controller, callback) -> stub.shellExec(controller, req, callback))
105      .join();
106  }
107
108  private static ShellExecResponse regionServerExec(final AsyncAdmin admin,
109    final ShellExecRequest req, final ServerName targetHost) {
110    return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService(
111      ShellExecService::newStub,
112      (stub, controller, callback) -> stub.shellExec(controller, req, callback),
113      targetHost)
114      .join();
115  }
116
117  private static ServerName resolveRegionServerName(final AsyncAdmin admin,
118    final String hostname) {
119    return admin.getRegionServers()
120      .thenApply(names -> names.stream()
121        .filter(sn -> Objects.equals(sn.getHostname(), hostname))
122        .findAny())
123      .join()
124      .orElseThrow(() -> serverNotFound(hostname));
125  }
126
127  private static RuntimeException serverNotFound(final String hostname) {
128    return new RuntimeException(
129      String.format("Did not find %s amongst the servers known to the client.", hostname));
130  }
131
132  private static RuntimeException unsupportedServiceType(final ServiceType serviceType) {
133    return new RuntimeException(
134      String.format("Unable to service request for service=%s", serviceType));
135  }
136}