001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.region; 019 020import static org.junit.jupiter.api.Assertions.assertEquals; 021import static org.junit.jupiter.api.Assertions.assertFalse; 022import static org.junit.jupiter.api.Assertions.assertTrue; 023 024import java.io.IOException; 025import java.net.InetAddress; 026import java.security.cert.X509Certificate; 027import java.util.HashSet; 028import java.util.Map; 029import java.util.Optional; 030import java.util.Set; 031import org.apache.hadoop.hbase.ExtendedCellScanner; 032import org.apache.hadoop.hbase.client.Get; 033import org.apache.hadoop.hbase.ipc.RpcCall; 034import org.apache.hadoop.hbase.ipc.RpcCallback; 035import org.apache.hadoop.hbase.ipc.RpcServer; 036import org.apache.hadoop.hbase.procedure2.Procedure; 037import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; 038import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; 039import org.apache.hadoop.hbase.security.User; 040import org.apache.hadoop.hbase.testclassification.MasterTests; 041import org.apache.hadoop.hbase.testclassification.SmallTests; 042import org.apache.hadoop.hbase.util.Bytes; 043import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 044import org.junit.jupiter.api.Tag; 045import org.junit.jupiter.api.Test; 046import org.slf4j.Logger; 047import org.slf4j.LoggerFactory; 048 049import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 050import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; 051import org.apache.hbase.thirdparty.com.google.protobuf.Message; 052 053import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; 055 056@Tag(SmallTests.TAG) 057@Tag(MasterTests.TAG) 058public class TestRegionProcedureStore extends RegionProcedureStoreTestBase { 059 060 private static final Logger LOG = LoggerFactory.getLogger(TestRegionProcedureStore.class); 061 062 private void verifyProcIdsOnRestart(final Set<Long> procIds) throws Exception { 063 LOG.debug("expected: " + procIds); 064 LoadCounter loader = new LoadCounter(); 065 ProcedureTestingUtility.storeRestart(store, true, loader); 066 assertEquals(procIds.size(), loader.getLoadedCount()); 067 assertEquals(0, loader.getCorruptedCount()); 068 } 069 070 @Test 071 public void testLoad() throws Exception { 072 Set<Long> procIds = new HashSet<>(); 073 074 // Insert something in the log 075 RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure(); 076 procIds.add(proc1.getProcId()); 077 store.insert(proc1, null); 078 079 RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure(); 080 RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure(); 081 proc3.setParent(proc2); 082 RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure(); 083 proc4.setParent(proc2); 084 085 procIds.add(proc2.getProcId()); 086 procIds.add(proc3.getProcId()); 087 procIds.add(proc4.getProcId()); 088 store.insert(proc2, new Procedure[] { proc3, proc4 }); 089 090 // Verify that everything is there 091 verifyProcIdsOnRestart(procIds); 092 093 // Update and delete something 094 proc1.finish(); 095 store.update(proc1); 096 proc4.finish(); 097 store.update(proc4); 098 store.delete(proc4.getProcId()); 099 procIds.remove(proc4.getProcId()); 100 101 // Verify that everything is there 102 verifyProcIdsOnRestart(procIds); 103 } 104 105 @Test 106 public void testCleanup() throws Exception { 107 RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure(); 108 store.insert(proc1, null); 109 RegionProcedureStoreTestProcedure proc2 = new RegionProcedureStoreTestProcedure(); 110 store.insert(proc2, null); 111 RegionProcedureStoreTestProcedure proc3 = new RegionProcedureStoreTestProcedure(); 112 store.insert(proc3, null); 113 LoadCounter loader = new LoadCounter(); 114 store.load(loader); 115 assertEquals(proc3.getProcId(), loader.getMaxProcId()); 116 assertEquals(3, loader.getRunnableCount()); 117 118 store.delete(proc3.getProcId()); 119 store.delete(proc2.getProcId()); 120 loader = new LoadCounter(); 121 store.load(loader); 122 assertEquals(proc3.getProcId(), loader.getMaxProcId()); 123 assertEquals(1, loader.getRunnableCount()); 124 125 // the row should still be there 126 assertTrue(store.region 127 .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); 128 assertTrue(store.region 129 .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists()); 130 131 // proc2 will be deleted after cleanup, but proc3 should still be there as it holds the max proc 132 // id 133 store.cleanup(); 134 assertTrue(store.region 135 .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); 136 assertFalse(store.region 137 .get(new Get(Bytes.toBytes(proc2.getProcId())).setCheckExistenceOnly(true)).getExists()); 138 139 RegionProcedureStoreTestProcedure proc4 = new RegionProcedureStoreTestProcedure(); 140 store.insert(proc4, null); 141 store.cleanup(); 142 // proc3 should also be deleted as now proc4 holds the max proc id 143 assertFalse(store.region 144 .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); 145 } 146 147 /** 148 * Test for HBASE-23895 149 */ 150 @Test 151 public void testInsertWithRpcCall() throws Exception { 152 RpcServer.setCurrentCall(newRpcCallWithDeadline()); 153 RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure(); 154 store.insert(proc1, null); 155 RpcServer.setCurrentCall(null); 156 } 157 158 @SuppressWarnings("checkstyle:methodlength") 159 private RpcCall newRpcCallWithDeadline() { 160 return new RpcCall() { 161 @Override 162 public long getDeadline() { 163 return EnvironmentEdgeManager.currentTime(); 164 } 165 166 @Override 167 public BlockingService getService() { 168 return null; 169 } 170 171 @Override 172 public Descriptors.MethodDescriptor getMethod() { 173 return null; 174 } 175 176 @Override 177 public Message getParam() { 178 return null; 179 } 180 181 @Override 182 public ExtendedCellScanner getCellScanner() { 183 return null; 184 } 185 186 @Override 187 public long getReceiveTime() { 188 return 0; 189 } 190 191 @Override 192 public long getStartTime() { 193 return 0; 194 } 195 196 @Override 197 public void setStartTime(long startTime) { 198 199 } 200 201 @Override 202 public int getTimeout() { 203 return 0; 204 } 205 206 @Override 207 public int getPriority() { 208 return 0; 209 } 210 211 @Override 212 public long getSize() { 213 return 0; 214 } 215 216 @Override 217 public RPCProtos.RequestHeader getHeader() { 218 return null; 219 } 220 221 @Override 222 public Map<String, byte[]> getConnectionAttributes() { 223 return null; 224 } 225 226 @Override 227 public Map<String, byte[]> getRequestAttributes() { 228 return null; 229 } 230 231 @Override 232 public byte[] getRequestAttribute(String key) { 233 return null; 234 } 235 236 @Override 237 public int getRemotePort() { 238 return 0; 239 } 240 241 @Override 242 public void setResponse(Message param, ExtendedCellScanner cells, Throwable errorThrowable, 243 String error) { 244 } 245 246 @Override 247 public void sendResponseIfReady() throws IOException { 248 } 249 250 @Override 251 public void cleanup() { 252 } 253 254 @Override 255 public String toShortString() { 256 return null; 257 } 258 259 @Override 260 public long disconnectSince() { 261 return 0; 262 } 263 264 @Override 265 public boolean isClientCellBlockSupported() { 266 return false; 267 } 268 269 @Override 270 public Optional<User> getRequestUser() { 271 return Optional.empty(); 272 } 273 274 @Override 275 public Optional<X509Certificate[]> getClientCertificateChain() { 276 return Optional.empty(); 277 } 278 279 @Override 280 public InetAddress getRemoteAddress() { 281 return null; 282 } 283 284 @Override 285 public HBaseProtos.VersionInfo getClientVersionInfo() { 286 return null; 287 } 288 289 @Override 290 public void setCallBack(RpcCallback callback) { 291 } 292 293 @Override 294 public boolean isRetryImmediatelySupported() { 295 return false; 296 } 297 298 @Override 299 public long getResponseCellSize() { 300 return 0; 301 } 302 303 @Override 304 public void incrementResponseCellSize(long cellSize) { 305 } 306 307 @Override 308 public long getBlockBytesScanned() { 309 return 0; 310 } 311 312 @Override 313 public void incrementBlockBytesScanned(long blockSize) { 314 } 315 316 @Override 317 public long getResponseExceptionSize() { 318 return 0; 319 } 320 321 @Override 322 public void incrementResponseExceptionSize(long exceptionSize) { 323 } 324 }; 325 } 326}