本文主要研究一下flink的DualKeyMap
/**
 * Checks that {@code keySetA()} and {@code keySetB()} report exactly the A keys and B keys
 * that were inserted through {@code put}.
 */
@Test
public void testKeySets() {
    final int capacity = 10;
    final Random rnd = new Random();

    // Generate the random (A key, B key) pairs that will be stored in the map.
    final Set<Tuple2<Integer, Integer>> keyPairs = new HashSet<>(capacity);
    for (int count = 0; count < capacity; count++) {
        final int first = rnd.nextInt();
        final int second = rnd.nextInt();
        keyPairs.add(Tuple2.of(first, second));
    }

    // Fill the map and collect the expected key sets in the same pass.
    final DualKeyMap<Integer, Integer, String> map = new DualKeyMap<>(capacity);
    final Set<Integer> expectedKeysA = new HashSet<>();
    final Set<Integer> expectedKeysB = new HashSet<>();
    for (Tuple2<Integer, Integer> pair : keyPairs) {
        map.put(pair.f0, pair.f1, "foobar");
        expectedKeysA.add(pair.f0);
        expectedKeysB.add(pair.f1);
    }

    assertThat(map.keySetA(), Matchers.equalTo(expectedKeysA));
    assertThat(map.keySetB(), Matchers.equalTo(expectedKeysB));
}
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DualKeyMap.java
public class DualKeyMap<A, B, V> { private final HashMap<A, Tuple2<B, V>> aMap; private final HashMap<B, A> bMap; private transient Collection<V> values; public DualKeyMap(int initialCapacity) { this.aMap = new HashMap<>(initialCapacity); this.bMap = new HashMap<>(initialCapacity); } public int size() { return aMap.size(); } public V getKeyA(A aKey) { final Tuple2<B, V> value = aMap.get(aKey); if (value != null) { return value.f1; } else { return null; } } public V getKeyB(B bKey) { final A aKey = bMap.get(bKey); if (aKey != null) { return aMap.get(aKey).f1; } else { return null; } } public V put(A aKey, B bKey, V value) { Tuple2<B, V> aValue = aMap.put(aKey, Tuple2.of(bKey, value)); bMap.put(bKey, aKey); if (aValue != null) { return aValue.f1; } else { return null; } } public boolean containsKeyA(A aKey) { return aMap.containsKey(aKey); } public boolean containsKeyB(B bKey) { return bMap.containsKey(bKey); } public V removeKeyA(A aKey) { Tuple2<B, V> aValue = aMap.remove(aKey); if (aValue != null) { bMap.remove(aValue.f0); return aValue.f1; } else { return null; } } public V removeKeyB(B bKey) { A aKey = bMap.remove(bKey); if (aKey != null) { Tuple2<B, V> aValue = aMap.remove(aKey); if (aValue != null) { return aValue.f1; } else { return null; } } else { return null; } } public Collection<V> values() { Collection<V> vs = values; if (vs == null) { vs = new Values(); values = vs; } return vs; } public Set<A> keySetA() { return aMap.keySet(); } public Set<B> keySetB() { return bMap.keySet(); } public void clear() { aMap.clear(); bMap.clear(); } // ----------------------------------------------------------------------- // Inner classes // ----------------------------------------------------------------------- /** * Collection which contains the values of the dual key map. 
*/ private final class Values extends AbstractCollection<V> { @Override public Iterator<V> iterator() { return new ValueIterator(); } @Override public int size() { return aMap.size(); } } /** * Iterator which iterates over the values of the dual key map. */ private final class ValueIterator implements Iterator<V> { private final Iterator<Tuple2<B, V>> iterator = aMap.values().iterator(); @Override public boolean hasNext() { return iterator.hasNext(); } @Override public V next() { Tuple2<B, V> value = iterator.next(); return value.f1; } } }