001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.concurrent.LinkedBlockingQueue; 022import org.apache.hadoop.hbase.PleaseHoldException; 023import org.apache.hadoop.hbase.client.ConnectionUtils; 024import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 025import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 026import org.apache.hadoop.hbase.util.Threads; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; 032import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 033 034import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; 035import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 036 037/** 038 * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure. 039 */ 040@InterfaceAudience.Private 041class RemoteProcedureResultReporter extends Thread { 042 043 private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class); 044 045 private static final int MAX_BATCH = 100; 046 047 private final HRegionServer server; 048 049 private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>(); 050 051 public RemoteProcedureResultReporter(HRegionServer server) { 052 this.server = server; 053 } 054 055 public void complete(long procId, long initiatingMasterActiveTime, Throwable error, 056 byte[] procReturnValue) { 057 RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId) 058 .setInitiatingMasterActiveTime(initiatingMasterActiveTime); 059 if (error != null) { 060 LOG.debug("Failed to complete execution of pid={}", procId, error); 061 builder.setStatus(RemoteProcedureResult.Status.ERROR).setError( 062 ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error)); 063 } else { 064 LOG.debug("Successfully complete execution of pid={}", procId); 065 builder.setStatus(RemoteProcedureResult.Status.SUCCESS); 066 } 067 if (procReturnValue != null) { 068 builder.setProcResultData(ByteString.copyFrom(procReturnValue)); 069 } 070 results.add(builder.build()); 071 } 072 073 @Override 074 public void run() { 075 ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder(); 076 int tries = 0; 077 while (!server.isStopped()) { 078 if (builder.getResultCount() == 0) { 079 try { 080 builder.addResult(results.take()); 081 } catch (InterruptedException e) { 082 Thread.currentThread().interrupt(); 083 continue; 084 } 085 } 086 while (builder.getResultCount() < MAX_BATCH) { 087 RemoteProcedureResult result = results.poll(); 088 if (result == null) { 089 break; 090 } 091 builder.addResult(result); 092 } 093 ReportProcedureDoneRequest request = builder.build(); 094 try { 095 server.reportProcedureDone(builder.build()); 096 builder.clear(); 097 tries = 0; 098 } catch (IOException e) { 099 boolean pause = 100 e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException; 101 long pauseTime; 102 if (pause) { 103 // Do backoff else we flood the Master with requests. 104 pauseTime = ConnectionUtils.getPauseTime(server.getRetryPauseTime(), tries); 105 } else { 106 pauseTime = server.getRetryPauseTime(); // Reset. 107 } 108 LOG.info("Failed procedure report " + TextFormat.shortDebugString(request) + "; retry (#" 109 + tries + ")" 110 + (pause 111 ? " after " + pauseTime + "ms delay (Master is coming online...)." 112 : " immediately."), 113 e); 114 Threads.sleep(pauseTime); 115 tries++; 116 } 117 } 118 } 119}