001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.procedure2.store.wal; 019 020import java.util.Collection; 021import java.util.Collections; 022import java.util.HashMap; 023import java.util.Map; 024import org.apache.yetus.audience.InterfaceAudience; 025import org.slf4j.Logger; 026import org.slf4j.LoggerFactory; 027 028import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; 029 030/** 031 * This class is used to track the active procedures when loading procedures from proc wal file. 032 * <p/> 033 * We will read proc wal files from new to old, but when reading a proc wal file, we will still read 034 * from top to bottom, so there are two groups of methods for this class. 035 * <p/> 036 * The first group is {@link #add(ProcedureProtos.Procedure)} and {@link #remove(long)}. It is used 037 * when reading a proc wal file. In these methods, for the same procedure, typically the one comes 038 * later should win, please see the comment for 039 * {@link #isIncreasing(ProcedureProtos.Procedure, ProcedureProtos.Procedure)} to see the 040 * exceptions. 041 * <p/> 042 * The second group is {@link #merge(WALProcedureMap)}. We will have a global 043 * {@link WALProcedureMap} to hold global the active procedures, and a local {@link WALProcedureMap} 044 * to hold the active procedures for the current proc wal file. And when we finish reading a proc 045 * wal file, we will merge the local one into the global one, by calling the 046 * {@link #merge(WALProcedureMap)} method of the global one and pass the local one in. In this 047 * method, for the same procedure, the one comes earlier will win, as we read the proc wal files 048 * from new to old(the reverse order). 049 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we 050 * use the new region based procedure store. 051 */ 052@Deprecated 053@InterfaceAudience.Private 054class WALProcedureMap { 055 056 private static final Logger LOG = LoggerFactory.getLogger(WALProcedureMap.class); 057 058 private final Map<Long, ProcedureProtos.Procedure> procMap = new HashMap<>(); 059 060 private long minModifiedProcId = Long.MAX_VALUE; 061 062 private long maxModifiedProcId = Long.MIN_VALUE; 063 064 private void trackProcId(long procId) { 065 minModifiedProcId = Math.min(minModifiedProcId, procId); 066 maxModifiedProcId = Math.max(maxModifiedProcId, procId); 067 } 068 069 /** 070 * @return True if this new procedure is 'richer' than the current one else false and we log this 071 * incidence where it appears that the WAL has older entries appended after newer ones. 072 * See HBASE-18152. 073 */ 074 private static boolean isIncreasing(ProcedureProtos.Procedure current, 075 ProcedureProtos.Procedure candidate) { 076 // Check that the procedures we see are 'increasing'. We used to compare 077 // procedure id first and then update time but it can legitimately go backwards if the 078 // procedure is failed or rolled back so that was unreliable. Was going to compare 079 // state but lets see if comparing update time enough (unfortunately this issue only 080 // seen under load...) 081 boolean increasing = current.getLastUpdate() <= candidate.getLastUpdate(); 082 if (!increasing) { 083 LOG.warn("NOT INCREASING! current=" + current + ", candidate=" + candidate); 084 } 085 return increasing; 086 } 087 088 public void add(ProcedureProtos.Procedure proc) { 089 procMap.compute(proc.getProcId(), (procId, existingProc) -> { 090 if (existingProc == null || isIncreasing(existingProc, proc)) { 091 return proc; 092 } else { 093 return existingProc; 094 } 095 }); 096 trackProcId(proc.getProcId()); 097 } 098 099 public void remove(long procId) { 100 procMap.remove(procId); 101 } 102 103 public boolean isEmpty() { 104 return procMap.isEmpty(); 105 } 106 107 public boolean contains(long procId) { 108 return procMap.containsKey(procId); 109 } 110 111 /** 112 * Merge the given {@link WALProcedureMap} into this one. The {@link WALProcedureMap} passed in 113 * will be cleared after merging. 114 */ 115 public void merge(WALProcedureMap other) { 116 other.procMap.forEach(procMap::putIfAbsent); 117 maxModifiedProcId = Math.max(maxModifiedProcId, other.maxModifiedProcId); 118 minModifiedProcId = Math.max(minModifiedProcId, other.minModifiedProcId); 119 other.procMap.clear(); 120 other.maxModifiedProcId = Long.MIN_VALUE; 121 other.minModifiedProcId = Long.MAX_VALUE; 122 } 123 124 public Collection<ProcedureProtos.Procedure> getProcedures() { 125 return Collections.unmodifiableCollection(procMap.values()); 126 } 127 128 public long getMinModifiedProcId() { 129 return minModifiedProcId; 130 } 131 132 public long getMaxModifiedProcId() { 133 return maxModifiedProcId; 134 } 135}