001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.util; 019 020import java.io.IOException; 021import java.util.List; 022import java.util.concurrent.Callable; 023import org.apache.hadoop.hbase.HRegionLocation; 024import org.apache.hadoop.hbase.ServerName; 025import org.apache.hadoop.hbase.client.Admin; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.client.RegionInfo; 028import org.apache.hadoop.hbase.client.ResultScanner; 029import org.apache.hadoop.hbase.client.Scan; 030import org.apache.hadoop.hbase.client.Table; 031import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; 032import org.apache.yetus.audience.InterfaceAudience; 033import org.slf4j.Logger; 034import org.slf4j.LoggerFactory; 035 036/** 037 * Move Regions and make sure that they are up on the target server.If a region movement fails we 038 * exit as failure 039 */ 040@InterfaceAudience.Private 041class MoveWithAck implements Callable<Boolean> { 042 043 private static final Logger LOG = LoggerFactory.getLogger(MoveWithAck.class); 044 045 private final RegionInfo region; 046 private final ServerName targetServer; 047 private final List<RegionInfo> movedRegions; 048 private final ServerName sourceServer; 049 private final Connection conn; 050 private final Admin admin; 051 052 MoveWithAck(Connection conn, RegionInfo regionInfo, ServerName sourceServer, 053 ServerName targetServer, List<RegionInfo> movedRegions) throws IOException { 054 this.conn = conn; 055 this.region = regionInfo; 056 this.targetServer = targetServer; 057 this.movedRegions = movedRegions; 058 this.sourceServer = sourceServer; 059 this.admin = conn.getAdmin(); 060 } 061 062 @Override 063 public Boolean call() throws IOException, InterruptedException { 064 boolean moved = false; 065 int count = 0; 066 int retries = admin.getConfiguration().getInt(RegionMover.MOVE_RETRIES_MAX_KEY, 067 RegionMover.DEFAULT_MOVE_RETRIES_MAX); 068 int maxWaitInSeconds = admin.getConfiguration().getInt(RegionMover.MOVE_WAIT_MAX_KEY, 069 RegionMover.DEFAULT_MOVE_WAIT_MAX); 070 long startTime = EnvironmentEdgeManager.currentTime(); 071 boolean sameServer = true; 072 // Assert we can scan the region in its current location 073 isSuccessfulScan(region); 074 LOG.info("Moving region: {} from {} to {}", region.getRegionNameAsString(), sourceServer, 075 targetServer); 076 while (count < retries && sameServer) { 077 if (count > 0) { 078 LOG.debug("Retry {} of maximum {} for region: {}", count, retries, 079 region.getRegionNameAsString()); 080 } 081 count = count + 1; 082 admin.move(region.getEncodedNameAsBytes(), targetServer); 083 long maxWait = startTime + (maxWaitInSeconds * 1000); 084 while (EnvironmentEdgeManager.currentTime() < maxWait) { 085 sameServer = isSameServer(region, sourceServer); 086 if (!sameServer) { 087 break; 088 } 089 Thread.sleep(1000); 090 } 091 } 092 if (sameServer) { 093 LOG.error("Region: {} stuck on {} for {} sec , newServer={}", region.getRegionNameAsString(), 094 this.sourceServer, getTimeDiffInSec(startTime), this.targetServer); 095 } else { 096 isSuccessfulScan(region); 097 LOG.info("Moved Region {} , cost (sec): {}", region.getRegionNameAsString(), 098 getTimeDiffInSec(startTime)); 099 moved = true; 100 movedRegions.add(region); 101 } 102 return moved; 103 } 104 105 private static String getTimeDiffInSec(long startTime) { 106 return String.format("%.3f", (float) (EnvironmentEdgeManager.currentTime() - startTime) / 1000); 107 } 108 109 /** 110 * Tries to scan a row from passed region 111 */ 112 private void isSuccessfulScan(RegionInfo region) throws IOException { 113 Scan scan = new Scan().withStartRow(region.getStartKey()).withStopRow(region.getEndKey(), false) 114 .setRaw(true).setOneRowLimit().setMaxResultSize(1L).setCaching(1) 115 .setFilter(new FirstKeyOnlyFilter()).setCacheBlocks(false); 116 try (Table table = conn.getTable(region.getTable()); 117 ResultScanner scanner = table.getScanner(scan)) { 118 scanner.next(); 119 } catch (IOException e) { 120 LOG.error("Could not scan region: {}", region.getEncodedName(), e); 121 throw e; 122 } 123 } 124 125 /** 126 * Returns true if passed region is still on serverName when we look at hbase:meta. 127 * @return true if region is hosted on serverName otherwise false 128 */ 129 private boolean isSameServer(RegionInfo region, ServerName serverName) throws IOException { 130 ServerName serverForRegion = getServerNameForRegion(region, admin, conn); 131 return serverForRegion != null && serverForRegion.equals(serverName); 132 } 133 134 /** 135 * Get servername that is up in hbase:meta hosting the given region. this is hostname + port + 136 * startcode comma-delimited. Can return null 137 * @return regionServer hosting the given region 138 */ 139 static ServerName getServerNameForRegion(RegionInfo region, Admin admin, Connection conn) 140 throws IOException { 141 if (!admin.tableExists(region.getTable()) || !admin.isTableEnabled(region.getTable())) { 142 return null; 143 } 144 HRegionLocation loc = conn.getRegionLocator(region.getTable()) 145 .getRegionLocation(region.getStartKey(), region.getReplicaId(), true); 146 if (loc != null) { 147 return loc.getServerName(); 148 } else { 149 return null; 150 } 151 } 152 153}