001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.regionserver.handler; 019 020import java.io.IOException; 021import java.util.concurrent.atomic.AtomicInteger; 022import org.apache.hadoop.hbase.Server; 023import org.apache.hadoop.hbase.ServerName; 024import org.apache.hadoop.hbase.SplitLogCounters; 025import org.apache.hadoop.hbase.SplitLogTask; 026import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; 027import org.apache.hadoop.hbase.executor.EventHandler; 028import org.apache.hadoop.hbase.executor.EventType; 029import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor; 030import org.apache.hadoop.hbase.regionserver.SplitLogWorker.TaskExecutor.Status; 031import org.apache.hadoop.hbase.util.CancelableProgressable; 032import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 033import org.apache.yetus.audience.InterfaceAudience; 034import org.slf4j.Logger; 035import org.slf4j.LoggerFactory; 036 037/** 038 * Handles log splitting a wal Used by the zk-based distributed log splitting. Created by 039 * ZKSplitLogWorkerCoordination. 040 * @deprecated since 2.4.0 and in 3.0.0, to be removed in 4.0.0, replaced by procedure-based 041 * distributed WAL splitter, see SplitWALManager 042 */ 043@Deprecated 044@InterfaceAudience.Private 045public class WALSplitterHandler extends EventHandler { 046 private static final Logger LOG = LoggerFactory.getLogger(WALSplitterHandler.class); 047 private final ServerName serverName; 048 private final CancelableProgressable reporter; 049 private final AtomicInteger inProgressTasks; 050 private final TaskExecutor splitTaskExecutor; 051 private final SplitLogWorkerCoordination.SplitTaskDetails splitTaskDetails; 052 private final SplitLogWorkerCoordination coordination; 053 054 public WALSplitterHandler(final Server server, SplitLogWorkerCoordination coordination, 055 SplitLogWorkerCoordination.SplitTaskDetails splitDetails, CancelableProgressable reporter, 056 AtomicInteger inProgressTasks, TaskExecutor splitTaskExecutor) { 057 super(server, EventType.RS_LOG_REPLAY); 058 this.splitTaskDetails = splitDetails; 059 this.coordination = coordination; 060 this.reporter = reporter; 061 this.inProgressTasks = inProgressTasks; 062 this.inProgressTasks.incrementAndGet(); 063 this.serverName = server.getServerName(); 064 this.splitTaskExecutor = splitTaskExecutor; 065 } 066 067 @Override 068 public void process() throws IOException { 069 long startTime = EnvironmentEdgeManager.currentTime(); 070 Status status = null; 071 try { 072 status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), reporter); 073 switch (status) { 074 case DONE: 075 coordination.endTask(new SplitLogTask.Done(this.serverName), 076 SplitLogCounters.tot_wkr_task_done, splitTaskDetails); 077 break; 078 case PREEMPTED: 079 SplitLogCounters.tot_wkr_preempt_task.increment(); 080 LOG.warn("task execution preempted " + splitTaskDetails.getWALFile()); 081 break; 082 case ERR: 083 if (server != null && !server.isStopped()) { 084 coordination.endTask(new SplitLogTask.Err(this.serverName), 085 SplitLogCounters.tot_wkr_task_err, splitTaskDetails); 086 break; 087 } 088 // if the RS is exiting then there is probably a tons of stuff 089 // that can go wrong. Resign instead of signaling error. 090 // $FALL-THROUGH$ 091 case RESIGNED: 092 if (server != null && server.isStopped()) { 093 LOG.info("task execution interrupted because worker is exiting " 094 + splitTaskDetails.toString()); 095 } 096 coordination.endTask(new SplitLogTask.Resigned(this.serverName), 097 SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails); 098 break; 099 } 100 } finally { 101 LOG.info("Worker " + serverName + " done with task " + splitTaskDetails.toString() + " in " 102 + (EnvironmentEdgeManager.currentTime() - startTime) + "ms. Status = " + status); 103 this.inProgressTasks.decrementAndGet(); 104 } 105 } 106}