001/** 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver.handler; 020 021import java.io.IOException; 022import java.util.concurrent.atomic.AtomicInteger; 023 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027import org.apache.hadoop.hbase.Server; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.SplitLogCounters; 030import org.apache.hadoop.hbase.SplitLogTask; 031import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 032import org.apache.hadoop.hbase.executor.EventHandler; 033import org.apache.hadoop.hbase.executor.EventType; 034import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; 035import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; 036import org.apache.hadoop.hbase.util.CancelableProgressable; 037 038/** 039 * Handles log splitting a wal 040 */ 041@InterfaceAudience.Private 042public class WALSplitterHandler extends EventHandler { 043 private static final Logger LOG = LoggerFactory.getLogger(WALSplitterHandler.class); 044 private final ServerName serverName; 045 private final CancelableProgressable reporter; 046 private final AtomicInteger inProgressTasks; 047 private final TaskExecutor splitTaskExecutor; 048 private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails; 049 private final SplitLogWorkerCoordination coordination; 050 051 052 public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination, 053 SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter, 054 AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) { 055 super(server, EventType.RS_LOG_REPLAY); 056 this.splitTaskDetails = splitDetails; 057 this.coordination = coordination; 058 this.reporter = reporter; 059 this.inProgressTasks = inProgressTasks; 060 this.inProgressTasks.incrementAndGet(); 061 this.serverName = server.getServerName(); 062 this.splitTaskExecutor = splitTaskExecutor; 063 } 064 065 066 @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SF_SWITCH_FALLTHROUGH", 067 justification="Intentional") 068 @Override 069 public void process() throws IOException { 070 long startTime = System.currentTimeMillis(); 071 Status status = null; 072 try { 073 status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), reporter); 074 boolean wasCounterIncremented = false; 075 switch (status) { 076 case DONE: 077 coordination.endTask(new SplitLogTask.Done(this.serverName), 078 SplitLogCounters.tot_wkr_task_done, splitTaskDetails); 079 break; 080 case PREEMPTED: 081 SplitLogCounters.tot_wkr_preempt_task.increment(); 082 wasCounterIncremented = true; 083 // Preempted state can currently be returned either when task is preempted, or when 084 // there's a particular kind of error (e.g. some ZK/HDFS errors, in my observation). 085 // In the latter case, master-side split task will get stuck if we don't update the 086 // status. Treat preemption as error to be on the safe side. 087 LOG.warn("task execution preempted; treating as error " + splitTaskDetails.getWALFile()); 088 //$FALL-THROUGH$ 089 case ERR: 090 if (server != null && !server.isStopped()) { 091 coordination.endTask(new SplitLogTask.Err(this.serverName), wasCounterIncremented 092 ? null : SplitLogCounters.tot_wkr_task_err, splitTaskDetails); 093 break; 094 } 095 // if the RS is exiting then there is probably a tons of stuff 096 // that can go wrong. Resign instead of signaling error. 097 //$FALL-THROUGH$ 098 case RESIGNED: 099 if (server != null && server.isStopped()) { 100 LOG.info("task execution interrupted because worker is exiting " 101 + splitTaskDetails.toString()); 102 } 103 coordination.endTask(new SplitLogTask.Resigned(this.serverName), wasCounterIncremented 104 ? null : SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); 105 break; 106 } 107 } finally { 108 LOG.info("Worker " + serverName + " done with task " + splitTaskDetails.toString() + " in " 109 + (System.currentTimeMillis() - startTime) + "ms. Status = " + status); 110 this.inProgressTasks.decrementAndGet(); 111 } 112 } 113}