001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure.flush; 019 020import java.util.List; 021import java.util.concurrent.Callable; 022import java.util.stream.Collectors; 023import org.apache.hadoop.hbase.errorhandling.ForeignException; 024import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 025import org.apache.hadoop.hbase.procedure.ProcedureMember; 026import org.apache.hadoop.hbase.procedure.Subprocedure; 027import org.apache.hadoop.hbase.procedure.flush.RegionServerFlushTableProcedureManager.FlushTableSubprocedurePool; 028import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; 029import org.apache.hadoop.hbase.regionserver.HRegion; 030import org.apache.hadoop.hbase.util.Bytes; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035/** 036 * This flush region implementation uses the distributed procedure framework to flush table regions. 037 * Its acquireBarrier stage does nothing. Its insideBarrier stage flushes the regions. 038 */ 039@InterfaceAudience.Private 040public class FlushTableSubprocedure extends Subprocedure { 041 private static final Logger LOG = LoggerFactory.getLogger(FlushTableSubprocedure.class); 042 043 private final String table; 044 private final List<String> families; 045 private final List<HRegion> regions; 046 private final FlushTableSubprocedurePool taskManager; 047 048 public FlushTableSubprocedure(ProcedureMember member, ForeignExceptionDispatcher errorListener, 049 long wakeFrequency, long timeout, List<HRegion> regions, String table, List<String> families, 050 FlushTableSubprocedurePool taskManager) { 051 super(member, table, errorListener, wakeFrequency, timeout); 052 this.table = table; 053 this.families = families; 054 this.regions = regions; 055 this.taskManager = taskManager; 056 } 057 058 private static class RegionFlushTask implements Callable<Void> { 059 HRegion region; 060 List<byte[]> families; 061 062 RegionFlushTask(HRegion region, List<byte[]> families) { 063 this.region = region; 064 this.families = families; 065 } 066 067 @Override 068 public Void call() throws Exception { 069 LOG.debug("Starting region operation on " + region); 070 region.startRegionOperation(); 071 try { 072 LOG.debug("Flush region " + region.toString() + " started..."); 073 if (families == null || families.isEmpty()) { 074 region.flush(true); 075 } else { 076 region.flushcache(families, false, FlushLifeCycleTracker.DUMMY); 077 } 078 // TODO: flush result is not checked? 079 } finally { 080 LOG.debug("Closing region operation on " + region); 081 region.closeRegionOperation(); 082 } 083 return null; 084 } 085 } 086 087 private void flushRegions() throws ForeignException { 088 if (regions.isEmpty()) { 089 // No regions on this RS, we are basically done. 090 return; 091 } 092 093 monitor.rethrowException(); 094 095 // assert that the taskManager is empty. 096 if (taskManager.hasTasks()) { 097 throw new IllegalStateException( 098 "Attempting to flush " + table + " but we currently have outstanding tasks"); 099 } 100 101 List<byte[]> familiesToFlush = null; 102 if (families != null && !families.isEmpty()) { 103 LOG.debug("About to flush family {} on all regions for table {}", families, table); 104 familiesToFlush = families.stream().map(Bytes::toBytes).collect(Collectors.toList()); 105 } 106 // Add all hfiles already existing in region. 107 for (HRegion region : regions) { 108 // submit one task per region for parallelize by region. 109 taskManager.submitTask(new RegionFlushTask(region, familiesToFlush)); 110 monitor.rethrowException(); 111 } 112 113 // wait for everything to complete. 114 LOG.debug("Flush region tasks submitted for " + regions.size() + " regions"); 115 try { 116 taskManager.waitForOutstandingTasks(); 117 } catch (InterruptedException e) { 118 throw new ForeignException(getMemberName(), e); 119 } 120 } 121 122 /** 123 * Flush the online regions on this rs for the target table. 124 */ 125 @Override 126 public void acquireBarrier() throws ForeignException { 127 flushRegions(); 128 } 129 130 @Override 131 public byte[] insideBarrier() throws ForeignException { 132 // No-Op 133 return new byte[0]; 134 } 135 136 /** 137 * Cancel threads if they haven't finished. 138 */ 139 @Override 140 public void cleanup(Exception e) { 141 LOG.info("Aborting all flush region subprocedure task threads for '" + table + "' due to error", 142 e); 143 try { 144 taskManager.cancelTasks(); 145 } catch (InterruptedException e1) { 146 Thread.currentThread().interrupt(); 147 } 148 } 149 150 public void releaseBarrier() { 151 // NO OP 152 } 153 154}