/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.procedure2.store.region;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;

/**
 * Tests for the region based procedure store: loading procedures after a restart, cleaning up
 * rows of deleted procedures, and inserting while an {@link RpcCall} is set as the current call.
 * The {@code store} under test is provided by {@link RegionProcedureStoreTestBase}.
 */
@Category({ MasterTests.class, SmallTests.class })
public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestRegionProcedureStore.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class);

  /**
   * Restarts the store and asserts that the number of loaded procedures equals the number of
   * expected ids and that nothing was reported as corrupted.
   * @param procIds ids of the procedures expected to be present after the restart; only the size
   *                of the set is compared against the loader
   */
  private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception {
    LOG.debug("expected: {}", procIds);
    LoadCounter loader = new LoadCounter();
    ProcedureTestingUtility.storeRestart(store, true, loader);
    assertEquals(procIds.size(), loader.getLoadedCount());
    assertEquals(0, loader.getCorruptedCount());
  }

  /**
   * Returns whether the store's region currently holds a row keyed by the given procedure id.
   * Uses an existence-only {@link Get}, so no cell data is fetched.
   * @param procId the procedure id whose row is probed
   */
  private boolean rowExists(long procId) throws IOException {
    return store.region.get(new Get(Bytes.toBytes(procId)).setCheckExistenceOnly(true))
      .getExists();
  }

  /**
   * Inserts a standalone procedure and a parent with two children, then verifies the loaded
   * count across store restarts, both before and after updating and deleting procedures.
   */
  @Test
  public void testLoad() throws Exception {
    Set<Long> procIds = new HashSet<>();

    // Insert something in the log
    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
    procIds.add(proc1.getProcId());
    store.insert(proc1, null);

    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
    proc3.setParent(proc2);
    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
    proc4.setParent(proc2);

    procIds.add(proc2.getProcId());
    procIds.add(proc3.getProcId());
    procIds.add(proc4.getProcId());
    store.insert(proc2, new Procedure[] { proc3, proc4 });

    // Verify that everything is there
    verifyProcIdsOnRestart(procIds);

    // Update and delete something
    proc1.finish();
    store.update(proc1);
    proc4.finish();
    store.update(proc4);
    store.delete(proc4.getProcId());
    procIds.remove(proc4.getProcId());

    // Verify that only the procedures which were not deleted are there
    verifyProcIdsOnRestart(procIds);
  }

  /**
   * Verifies that deleting a procedure keeps its row in the region until {@code cleanup()} runs,
   * and that cleanup keeps the row holding the max proc id until a newer procedure is inserted.
   */
  @Test
  public void testCleanup() throws Exception {
    RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
    store.insert(proc1, null);
    RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure();
    store.insert(proc2, null);
    RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure();
    store.insert(proc3, null);
    LoadCounter loader = new LoadCounter();
    store.load(loader);
    assertEquals(proc3.getProcId(), loader.getMaxProcId());
    assertEquals(3, loader.getRunnableCount());

    store.delete(proc3.getProcId());
    store.delete(proc2.getProcId());
    loader = new LoadCounter();
    store.load(loader);
    // the max proc id is still reported as proc3's even though proc3 has been deleted
    assertEquals(proc3.getProcId(), loader.getMaxProcId());
    assertEquals(1, loader.getRunnableCount());

    // the row should still be there
    assertTrue(rowExists(proc3.getProcId()));
    assertTrue(rowExists(proc2.getProcId()));

    // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max
    // proc id
    store.cleanup();
    assertTrue(rowExists(proc3.getProcId()));
    assertFalse(rowExists(proc2.getProcId()));

    RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure();
    store.insert(proc4, null);
    store.cleanup();
    // proc3 should also be deleted as now proc4 holds the max proc id
    assertFalse(rowExists(proc3.getProcId()));
  }

  /**
   * Test for HBASE-23895. Inserts a procedure while a stub {@link RpcCall}, whose deadline is the
   * current time, is set as the current call; the test passes if the insert does not throw.
   */
  @Test
  public void testInsertWithRpcCall() throws Exception {
    RpcServer.setCurrentCall(newRpcCallWithDeadline());
    try {
      RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
      store.insert(proc1, null);
    } finally {
      // Always clear the current call, even when the insert fails, so that the stub call is not
      // left installed for whatever runs next on this thread.
      RpcServer.setCurrentCall(null);
    }
  }

  /**
   * Creates a stub {@link RpcCall}. {@code getDeadline()} returns the current time from
   * {@link EnvironmentEdgeManager}; every other method is a no-op or returns {@code null},
   * {@code 0}, {@code false} or an empty {@link Optional}.
   */
  private RpcCall newRpcCallWithDeadline() {
    return new RpcCall() {
      @Override
      public long getDeadline() {
        return EnvironmentEdgeManager.currentTime();
      }

      @Override
      public BlockingService getService() {
        return null;
      }

      @Override
      public Descriptors.MethodDescriptor getMethod() {
        return null;
      }

      @Override
      public Message getParam() {
        return null;
      }

      @Override
      public CellScanner getCellScanner() {
        return null;
      }

      @Override
      public long getReceiveTime() {
        return 0;
      }

      @Override
      public long getStartTime() {
        return 0;
      }

      @Override
      public void setStartTime(long startTime) {
      }

      @Override
      public int getTimeout() {
        return 0;
      }

      @Override
      public int getPriority() {
        return 0;
      }

      @Override
      public long getSize() {
        return 0;
      }

      @Override
      public RPCProtos.RequestHeader getHeader() {
        return null;
      }

      @Override
      public int getRemotePort() {
        return 0;
      }

      @Override
      public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
        String error) {
      }

      @Override
      public void sendResponseIfReady() throws IOException {
      }

      @Override
      public void cleanup() {
      }

      @Override
      public String toShortString() {
        return null;
      }

      @Override
      public long disconnectSince() {
        return 0;
      }

      @Override
      public boolean isClientCellBlockSupported() {
        return false;
      }

      @Override
      public Optional<User> getRequestUser() {
        return Optional.empty();
      }

      @Override
      public InetAddress getRemoteAddress() {
        return null;
      }

      @Override
      public HBaseProtos.VersionInfo getClientVersionInfo() {
        return null;
      }

      @Override
      public void setCallBack(RpcCallback callback) {
      }

      @Override
      public boolean isRetryImmediatelySupported() {
        return false;
      }

      @Override
      public long getResponseCellSize() {
        return 0;
      }

      @Override
      public void incrementResponseCellSize(long cellSize) {
      }

      @Override
      public long getResponseBlockSize() {
        return 0;
      }

      @Override
      public void incrementResponseBlockSize(long blockSize) {
      }

      @Override
      public long getResponseExceptionSize() {
        return 0;
      }

      @Override
      public void incrementResponseExceptionSize(long exceptionSize) {
      }
    };
  }
}