001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure.flush; 019 020import java.util.Arrays; 021import java.util.Collections; 022import java.util.List; 023import java.util.concurrent.Callable; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028import org.apache.hadoop.hbase.errorhandling.ForeignException; 029import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 030import org.apache.hadoop.hbase.procedure.ProcedureMember; 031import org.apache.hadoop.hbase.procedure.Subprocedure; 032import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; 033import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 034import org.apache.hadoop.hbase.regionserver.HRegion; 035import org.apache.hadoop.hbase.util.Bytes; 036 037/** 038 * This flush region implementation uses the distributed procedure framework to flush 039 * table regions. 040 * Its acquireBarrier stage does nothing. Its insideBarrier stage flushes the regions. 041 */ 042@InterfaceAudience.Private 043public class FlushTableSubprocedure extends Subprocedure { 044 private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); 045 046 private final String table; 047 private final String family; 048 private final List<HRegion> regions; 049 private final FlushTableSubprocedurePool taskManager; 050 051 public FlushTableSubprocedure(ProcedureMember member, 052 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, 053 List<HRegion> regions, String table, String family, 054 FlushTableSubprocedurePool taskManager) { 055 super(member, table, errorListener, wakeFrequency, timeout); 056 this.table = table; 057 this.family = family; 058 this.regions = regions; 059 this.taskManager = taskManager; 060 } 061 062 private static class RegionFlushTask implements Callable<Void> { 063 HRegion region; 064 List<byte[]> families; 065 RegionFlushTask(HRegion region, List<byte[]> families) { 066 this.region = region; 067 this.families = families; 068 } 069 070 @Override 071 public Void call() throws Exception { 072 LOG.debug("Starting region operation on " + region); 073 region.startRegionOperation(); 074 try { 075 LOG.debug("Flush region " + region.toString() + " started..."); 076 if (families == null) { 077 region.flush(true); 078 } else { 079 region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); 080 } 081 // TODO: flush result is not checked? 082 } finally { 083 LOG.debug("Closing region operation on " + region); 084 region.closeRegionOperation(); 085 } 086 return null; 087 } 088 } 089 090 private void flushRegions() throws ForeignException { 091 if (regions.isEmpty()) { 092 // No regions on this RS, we are basically done. 093 return; 094 } 095 096 monitor.rethrowException(); 097 098 // assert that the taskManager is empty. 099 if (taskManager.hasTasks()) { 100 throw new IllegalStateException("Attempting to flush " 101 + table + " but we currently have outstanding tasks"); 102 } 103 List<byte[]> families = null; 104 if (family != null) { 105 LOG.debug("About to flush family {} on all regions for table {}", family, table); 106 families = Collections.singletonList(Bytes.toBytes(family)); 107 } 108 // Add all hfiles already existing in region. 109 for (HRegion region : regions) { 110 // submit one task per region for parallelize by region. 111 taskManager.submitTask(new RegionFlushTask(region, families)); 112 monitor.rethrowException(); 113 } 114 115 // wait for everything to complete. 116 LOG.debug("Flush region tasks submitted for " + regions.size() + " regions"); 117 try { 118 taskManager.waitForOutstandingTasks(); 119 } catch (InterruptedException e) { 120 throw new ForeignException(getMemberName(), e); 121 } 122 } 123 124 /** 125 * Flush the online regions on this rs for the target table. 126 */ 127 @Override 128 public void acquireBarrier() throws ForeignException { 129 flushRegions(); 130 } 131 132 @Override 133 public byte[] insideBarrier() throws ForeignException { 134 // No-Op 135 return new byte[0]; 136 } 137 138 /** 139 * Cancel threads if they haven't finished. 140 */ 141 @Override 142 public void cleanup(Exception e) { 143 LOG.info("Aborting all flush region subprocedure task threads for '" 144 + table + "' due to error", e); 145 try { 146 taskManager.cancelTasks(); 147 } catch (InterruptedException e1) { 148 Thread.currentThread().interrupt(); 149 } 150 } 151 152 public void releaseBarrier() { 153 // NO OP 154 } 155 156}