001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import java.io.IOException; 021import java.util.Collections; 022import java.util.HashSet; 023import java.util.Objects; 024import java.util.Set; 025import org.apache.commons.lang3.StringUtils; 026import org.apache.hadoop.hbase.client.AsyncAdmin; 027import org.apache.hadoop.hbase.client.AsyncConnection; 028import org.apache.hadoop.hbase.client.ConnectionFactory; 029import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest; 030import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse; 031import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecService; 032import org.apache.hadoop.hbase.util.Pair; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Overrides commands to make use of coprocessor where possible. Only supports actions taken 039 * against Master and Region Server hosts. 040 */ 041@InterfaceAudience.Private 042@SuppressWarnings("unused") // no way to test this without a distributed cluster. 043public class CoprocClusterManager extends HBaseClusterManager { 044 private static final Logger LOG = LoggerFactory.getLogger(CoprocClusterManager.class); 045 private static final Set<ServiceType> supportedServices = buildSupportedServicesSet(); 046 047 @Override 048 protected Pair<Integer, String> exec(String hostname, ServiceType service, String... cmd) 049 throws IOException { 050 if (!supportedServices.contains(service)) { 051 throw unsupportedServiceType(service); 052 } 053 054 // We only support actions vs. Master or Region Server processes. We're issuing those actions 055 // via the coprocessor that's running within those processes. Thus, there's no support for 056 // honoring the configured service user. 057 final String command = StringUtils.join(cmd, " "); 058 LOG.info("Executing remote command: {}, hostname:{}", command, hostname); 059 060 try (final AsyncConnection conn = ConnectionFactory.createAsyncConnection(getConf()).join()) { 061 final AsyncAdmin admin = conn.getAdmin(); 062 final ShellExecRequest req = ShellExecRequest.newBuilder() 063 .setCommand(command) 064 .setAwaitResponse(false) 065 .build(); 066 067 final ShellExecResponse resp; 068 switch(service) { 069 case HBASE_MASTER: 070 // What happens if the intended action was killing a backup master? Right now we have 071 // no `RestartBackupMasterAction` so it's probably fine. 072 resp = masterExec(admin, req); 073 break; 074 case HBASE_REGIONSERVER: 075 final ServerName targetHost = resolveRegionServerName(admin, hostname); 076 resp = regionServerExec(admin, req, targetHost); 077 break; 078 default: 079 throw new RuntimeException("should not happen"); 080 } 081 082 if (LOG.isDebugEnabled()) { 083 LOG.debug("Executed remote command: {}, exit code:{} , output:{}", command, resp.getExitCode(), 084 resp.getStdout()); 085 } else { 086 LOG.info("Executed remote command: {}, exit code:{}", command, resp.getExitCode()); 087 } 088 return new Pair<>(resp.getExitCode(), resp.getStdout()); 089 } 090 } 091 092 private static Set<ServiceType> buildSupportedServicesSet() { 093 final Set<ServiceType> set = new HashSet<>(); 094 set.add(ServiceType.HBASE_MASTER); 095 set.add(ServiceType.HBASE_REGIONSERVER); 096 return Collections.unmodifiableSet(set); 097 } 098 099 private static ShellExecResponse masterExec(final AsyncAdmin admin, 100 final ShellExecRequest req) { 101 // TODO: Admin API provides no means of sending exec to a backup master. 102 return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService( 103 ShellExecService::newStub, 104 (stub, controller, callback) -> stub.shellExec(controller, req, callback)) 105 .join(); 106 } 107 108 private static ShellExecResponse regionServerExec(final AsyncAdmin admin, 109 final ShellExecRequest req, final ServerName targetHost) { 110 return admin.<ShellExecService.Stub, ShellExecResponse>coprocessorService( 111 ShellExecService::newStub, 112 (stub, controller, callback) -> stub.shellExec(controller, req, callback), 113 targetHost) 114 .join(); 115 } 116 117 private static ServerName resolveRegionServerName(final AsyncAdmin admin, 118 final String hostname) { 119 return admin.getRegionServers() 120 .thenApply(names -> names.stream() 121 .filter(sn -> Objects.equals(sn.getHostname(), hostname)) 122 .findAny()) 123 .join() 124 .orElseThrow(() -> serverNotFound(hostname)); 125 } 126 127 private static RuntimeException serverNotFound(final String hostname) { 128 return new RuntimeException( 129 String.format("Did not find %s amongst the servers known to the client.", hostname)); 130 } 131 132 private static RuntimeException unsupportedServiceType(final ServiceType serviceType) { 133 return new RuntimeException( 134 String.format("Unable to service request for service=%s", serviceType)); 135 } 136}