001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase; 019 020import com.google.protobuf.RpcCallback; 021import com.google.protobuf.RpcController; 022import com.google.protobuf.Service; 023import java.io.IOException; 024import java.util.Collections; 025import java.util.concurrent.ExecutorService; 026import java.util.concurrent.Executors; 027import org.apache.commons.lang3.StringUtils; 028import org.apache.hadoop.conf.Configuration; 029import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey; 030import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor; 031import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessor; 032import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint; 033import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecRequest; 034import org.apache.hadoop.hbase.coprocessor.protobuf.generated.ShellExecEndpoint.ShellExecResponse; 035import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 036import org.apache.hadoop.util.Shell; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 041 042/** 043 * Receives shell commands from the client and executes them blindly. Intended only for use 044 * by {@link ChaosMonkey} via {@link CoprocClusterManager} 045 */ 046@InterfaceAudience.Private 047public class ShellExecEndpointCoprocessor extends ShellExecEndpoint.ShellExecService implements 048 MasterCoprocessor, RegionServerCoprocessor { 049 private static final Logger LOG = LoggerFactory.getLogger(ShellExecEndpointCoprocessor.class); 050 051 public static final String BACKGROUND_DELAY_MS_KEY = "hbase.it.shellexeccoproc.async.delay.ms"; 052 public static final long DEFAULT_BACKGROUND_DELAY_MS = 1_000; 053 054 private final ExecutorService backgroundExecutor; 055 private Configuration conf; 056 057 public ShellExecEndpointCoprocessor() { 058 backgroundExecutor = Executors.newSingleThreadExecutor( 059 new ThreadFactoryBuilder() 060 .setNameFormat(ShellExecEndpointCoprocessor.class.getSimpleName() + "-{}") 061 .setDaemon(true) 062 .setUncaughtExceptionHandler((t, e) -> LOG.warn("Thread {} threw", t, e)) 063 .build()); 064 } 065 066 @Override 067 public Iterable<Service> getServices() { 068 return Collections.singletonList(this); 069 } 070 071 @Override 072 public void start(CoprocessorEnvironment env) { 073 conf = env.getConfiguration(); 074 } 075 076 @Override 077 public void shellExec( 078 final RpcController controller, 079 final ShellExecRequest request, 080 final RpcCallback<ShellExecResponse> done 081 ) { 082 final String command = request.getCommand(); 083 if (StringUtils.isBlank(command)) { 084 throw new RuntimeException("Request contained an empty command."); 085 } 086 final boolean awaitResponse = !request.hasAwaitResponse() || request.getAwaitResponse(); 087 final String[] subShellCmd = new String[] { "/usr/bin/env", "bash", "-c", command }; 088 final Shell.ShellCommandExecutor shell = new Shell.ShellCommandExecutor(subShellCmd); 089 090 final String msgFmt = "Executing command" 091 + (!awaitResponse ? " on a background thread" : "") + ": {}"; 092 LOG.info(msgFmt, command); 093 094 if (awaitResponse) { 095 runForegroundTask(shell, controller, done); 096 } else { 097 runBackgroundTask(shell, done); 098 } 099 } 100 101 private void runForegroundTask( 102 final Shell.ShellCommandExecutor shell, 103 final RpcController controller, 104 final RpcCallback<ShellExecResponse> done 105 ) { 106 ShellExecResponse.Builder builder = ShellExecResponse.newBuilder(); 107 try { 108 doExec(shell, builder); 109 } catch (IOException e) { 110 LOG.error("Failure launching process", e); 111 CoprocessorRpcUtils.setControllerException(controller, e); 112 } 113 done.run(builder.build()); 114 } 115 116 private void runBackgroundTask( 117 final Shell.ShellCommandExecutor shell, 118 final RpcCallback<ShellExecResponse> done 119 ) { 120 final long sleepDuration = conf.getLong(BACKGROUND_DELAY_MS_KEY, DEFAULT_BACKGROUND_DELAY_MS); 121 backgroundExecutor.submit(() -> { 122 try { 123 // sleep first so that the RPC can ACK. race condition here as we have no means of blocking 124 // until the IPC response has been acknowledged by the client. 125 Thread.sleep(sleepDuration); 126 doExec(shell, ShellExecResponse.newBuilder()); 127 } catch (InterruptedException e) { 128 LOG.warn("Interrupted before launching process.", e); 129 } catch (IOException e) { 130 LOG.error("Failure launching process", e); 131 } 132 }); 133 done.run(ShellExecResponse.newBuilder().build()); 134 } 135 136 /** 137 * Execute {@code shell} and collect results into {@code builder} as side-effects. 138 */ 139 private void doExec( 140 final Shell.ShellCommandExecutor shell, 141 final ShellExecResponse.Builder builder 142 ) throws IOException { 143 try { 144 shell.execute(); 145 builder 146 .setExitCode(shell.getExitCode()) 147 .setStdout(shell.getOutput()); 148 } catch (Shell.ExitCodeException e) { 149 LOG.warn("Launched process failed", e); 150 builder 151 .setExitCode(e.getExitCode()) 152 .setStdout(shell.getOutput()) 153 .setStderr(e.getMessage()); 154 } 155 } 156}