001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver; 019 020import java.io.IOException; 021import java.util.concurrent.LinkedBlockingQueue; 022import org.apache.hadoop.hbase.PleaseHoldException; 023import org.apache.hadoop.hbase.client.ConnectionUtils; 024import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; 025import org.apache.hadoop.hbase.util.ForeignExceptionUtil; 026import org.apache.hadoop.hbase.util.Threads; 027import org.apache.yetus.audience.InterfaceAudience; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; 032 033import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult; 034import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; 035 036/** 037 * A thread which calls {@code reportProcedureDone} to tell master the result of a remote procedure. 038 */ 039@InterfaceAudience.Private 040class RemoteProcedureResultReporter extends Thread { 041 042 private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureResultReporter.class); 043 044 private static final int MAX_BATCH = 100; 045 046 private final HRegionServer server; 047 048 private final LinkedBlockingQueue<RemoteProcedureResult> results = new LinkedBlockingQueue<>(); 049 050 public RemoteProcedureResultReporter(HRegionServer server) { 051 this.server = server; 052 } 053 054 public void complete(long procId, long initiatingMasterActiveTime, Throwable error) { 055 RemoteProcedureResult.Builder builder = RemoteProcedureResult.newBuilder().setProcId(procId) 056 .setInitiatingMasterActiveTime(initiatingMasterActiveTime); 057 if (error != null) { 058 LOG.debug("Failed to complete execution of pid={}", procId, error); 059 builder.setStatus(RemoteProcedureResult.Status.ERROR).setError( 060 ForeignExceptionUtil.toProtoForeignException(server.getServerName().toString(), error)); 061 } else { 062 LOG.debug("Successfully complete execution of pid={}", procId); 063 builder.setStatus(RemoteProcedureResult.Status.SUCCESS); 064 } 065 results.add(builder.build()); 066 } 067 068 @Override 069 public void run() { 070 ReportProcedureDoneRequest.Builder builder = ReportProcedureDoneRequest.newBuilder(); 071 int tries = 0; 072 while (!server.isStopped()) { 073 if (builder.getResultCount() == 0) { 074 try { 075 builder.addResult(results.take()); 076 } catch (InterruptedException e) { 077 Thread.currentThread().interrupt(); 078 continue; 079 } 080 } 081 while (builder.getResultCount() < MAX_BATCH) { 082 RemoteProcedureResult result = results.poll(); 083 if (result == null) { 084 break; 085 } 086 builder.addResult(result); 087 } 088 ReportProcedureDoneRequest request = builder.build(); 089 try { 090 server.reportProcedureDone(builder.build()); 091 builder.clear(); 092 tries = 0; 093 } catch (IOException e) { 094 boolean pause = 095 e instanceof ServerNotRunningYetException || e instanceof PleaseHoldException; 096 long pauseTime; 097 if (pause) { 098 // Do backoff else we flood the Master with requests. 099 pauseTime = ConnectionUtils.getPauseTime(server.getRetryPauseTime(), tries); 100 } else { 101 pauseTime = server.getRetryPauseTime(); // Reset. 102 } 103 LOG.info("Failed procedure report " + TextFormat.shortDebugString(request) + "; retry (#" 104 + tries + ")" 105 + (pause 106 ? " after " + pauseTime + "ms delay (Master is coming online...)." 107 : " immediately."), 108 e); 109 Threads.sleep(pauseTime); 110 tries++; 111 } 112 } 113 } 114}