/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Overrides commands to make use of coprocessor where possible. Only supports actions taken against
 * Master and Region Server hosts.
 */
@InterfaceAudience.Private
@SuppressWarnings("unused") // no way to test this without a distributed cluster.
043public class CoprocClusterManager extends HBaseClusterManager { 044 private static final Logger LOG = LoggerFactory.getLogger(CoprocClusterManager.class); 045 private static final Set<ServiceType> supportedServices = buildSupportedServicesSet(); 046 047 @Override 048 protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd) 049 throws IOException { 050 if (!supportedServices.contains(service)) { 051 throw unsupportedServiceType(service); 052 } 053 054 // We only support actions vs. Master or Region Server processes. We're issuing those actions 055 // via the coprocessor that's running within those processes. Thus, there's no support for 056 // honoring the configured service user. 057 final String command = StringUtils.join(cmd, " "); 058 LOG.info("Executing remote command: {}, hostname:{}", command, hostname); 059 060 try (final AsyncConnection conn = ConnectionFactory.createAsyncConnection(getConf()).join()) { 061 final AsyncAdmin admin = conn.getAdmin(); 062 final ShellExecRequest req = 063 ShellExecRequest.newBuilder().setCommand(command).setAwaitResponse(false).build(); 064 065 final ShellExecResponse resp; 066 switch (service) { 067 case HBASE_MASTER: 068 // What happens if the intended action was killing a backup master? Right now we have 069 // no `RestartBackupMasterAction` so it's probably fine. 
070 resp = masterExec(admin, req); 071 break; 072 case HBASE_REGIONSERVER: 073 final ServerName targetHost = resolveRegionServerName(admin, hostname); 074 resp = regionServerExec(admin, req, targetHost); 075 break; 076 default: 077 throw new RuntimeException("should not happen"); 078 } 079 080 if (LOG.isDebugEnabled()) { 081 LOG.debug("Executed remote command: {}, exit code:{} , output:{}", command, 082 resp.getExitCode(), resp.getStdout()); 083 } else { 084 LOG.info("Executed remote command: {}, exit code:{}", command, resp.getExitCode()); 085 } 086 return new Pair<>(resp.getExitCode(), resp.getStdout()); 087 } 088 } 089 090 private static Set<ServiceType> buildSupportedServicesSet() { 091 final Set<ServiceType> set = new HashSet<>(); 092 set.add(ServiceType.HBASE_MASTER); 093 set.add(ServiceType.HBASE_REGIONSERVER); 094 return Collections.unmodifiableSet(set); 095 } 096 097 private static ShellExecResponse masterExec(final AsyncAdmin admin, final ShellExecRequest req) { 098 // TODO: Admin API provides no means of sending exec to a backup master. 
099 return admin.<ShellExecService.Stub, ShellExecResponse> coprocessorService( 100 ShellExecService::newStub, 101 (stub, controller, callback) -> stub.shellExec(controller, req, callback)).join(); 102 } 103 104 private static ShellExecResponse regionServerExec(final AsyncAdmin admin, 105 final ShellExecRequest req, final ServerName targetHost) { 106 return admin.<ShellExecService.Stub, ShellExecResponse> coprocessorService( 107 ShellExecService::newStub, 108 (stub, controller, callback) -> stub.shellExec(controller, req, callback), targetHost).join(); 109 } 110 111 private static ServerName resolveRegionServerName(final AsyncAdmin admin, final String hostname) { 112 return admin.getRegionServers() 113 .thenApply( 114 names -> names.stream().filter(sn -> Objects.equals(sn.getHostname(), hostname)).findAny()) 115 .join().orElseThrow(() -> serverNotFound(hostname)); 116 } 117 118 private static RuntimeException serverNotFound(final String hostname) { 119 return new RuntimeException( 120 String.format("Did not find %s amongst the servers known to the client.", hostname)); 121 } 122 123 private static RuntimeException unsupportedServiceType(final ServiceType serviceType) { 124 return new RuntimeException( 125 String.format("Unable to service request for service=%s", serviceType)); 126 } 127}