/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.streams;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.Utils;
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
import org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;

/**
 * Coordinator-side state of a streams group.
 *
 * <p>The group tracks its members, an index from static instance id to member id, the target
 * assignment per member, indexes from task to the owning process id(s), and the (configured)
 * topology. These are held in timeline data structures created against the supplied
 * {@link SnapshotRegistry}. The metadata refresh deadline, the shutdown-request member id and the
 * endpoint information epoch are plain fields and are not registered with the snapshot registry.
 *
 * <p>The group state is re-derived by {@code maybeUpdateGroupState()} after every mutation that can
 * influence it (members, topology, validated topology epoch, group epoch, target assignment epoch).
 */
public class StreamsGroup implements Group {
    /** Protocol type reported for this group in list-groups responses. */
    private static final String PROTOCOL_TYPE = "streams";

    private final Logger log;
    private final SnapshotRegistry snapshotRegistry;
    private final String groupId;
    private final TimelineObject<StreamsGroupState> state;
    private final TimelineInteger groupEpoch;
    /** Members keyed by member id. */
    private final TimelineHashMap<String, StreamsGroupMember> members;
    /** Static instance id to member id. */
    private final TimelineHashMap<String, String> staticMembers;
    private final TimelineInteger validatedTopologyEpoch;
    protected final TimelineLong metadataHash;
    private final TimelineInteger targetAssignmentEpoch;
    /** Target assignment keyed by member id. */
    private final TimelineHashMap<String, TasksTuple> targetAssignment;
    /** Subtopology id to (task/partition id to process id) for active tasks. */
    private final TimelineHashMap<String, TimelineHashMap<Integer, String>> currentActiveTaskToProcessId;
    /** Subtopology id to (task/partition id to process ids) for standby tasks. */
    private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentStandbyTaskToProcessIds;
    /** Subtopology id to (task/partition id to process ids) for warm-up tasks. */
    private final TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentWarmupTaskToProcessIds;
    private final TimelineObject<Optional<StreamsTopology>> topology;
    private final TimelineObject<Optional<ConfiguredTopology>> configuredTopology;
    private final TimelineHashMap<String, String> lastAssignmentConfigs;
    private DeadlineAndEpoch metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
    private Optional<String> shutdownRequestMemberId = Optional.empty();
    private int endpointInformationEpoch = 0;

    /**
     * Creates an empty group.
     *
     * @param logContext       context used to create the logger
     * @param snapshotRegistry registry backing all timeline data structures; must not be null
     * @param groupId          the group id; must not be null
     */
    public StreamsGroup(LogContext logContext, SnapshotRegistry snapshotRegistry, String groupId) {
        this.log = logContext.logger(StreamsGroup.class);
        this.snapshotRegistry = Objects.requireNonNull(snapshotRegistry);
        this.groupId = Objects.requireNonNull(groupId);
        this.state = new TimelineObject<>(snapshotRegistry, StreamsGroupState.EMPTY);
        this.groupEpoch = new TimelineInteger(snapshotRegistry);
        this.members = new TimelineHashMap<>(snapshotRegistry, 0);
        this.staticMembers = new TimelineHashMap<>(snapshotRegistry, 0);
        this.validatedTopologyEpoch = new TimelineInteger(snapshotRegistry);
        this.metadataHash = new TimelineLong(snapshotRegistry);
        this.targetAssignmentEpoch = new TimelineInteger(snapshotRegistry);
        this.targetAssignment = new TimelineHashMap<>(snapshotRegistry, 0);
        this.currentActiveTaskToProcessId = new TimelineHashMap<>(snapshotRegistry, 0);
        this.currentStandbyTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
        this.currentWarmupTaskToProcessIds = new TimelineHashMap<>(snapshotRegistry, 0);
        this.topology = new TimelineObject<>(snapshotRegistry, Optional.empty());
        this.configuredTopology = new TimelineObject<>(snapshotRegistry, Optional.empty());
        this.lastAssignmentConfigs = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /** @return {@link Group.GroupType#STREAMS}. */
    @Override
    public Group.GroupType type() {
        return Group.GroupType.STREAMS;
    }

    /** @return the display name of the current state. */
    @Override
    public String stateAsString() {
        return state.get().toString();
    }

    /** @return the display name of the state as of {@code committedOffset}. */
    @Override
    public String stateAsString(long committedOffset) {
        return state.get(committedOffset).toString();
    }

    /**
     * Builds the list-groups entry for this group using the state as of {@code committedOffset}.
     */
    @Override
    public ListGroupsResponseData.ListedGroup asListedGroup(long committedOffset) {
        return new ListGroupsResponseData.ListedGroup()
            .setGroupId(groupId)
            .setProtocolType(PROTOCOL_TYPE)
            .setGroupState(state.get(committedOffset).toString())
            .setGroupType(type().toString());
    }

    /** @return the configured topology, or empty if none has been set. */
    public Optional<ConfiguredTopology> configuredTopology() {
        return configuredTopology.get();
    }

    /** @return the topology, or empty if none has been set. */
    public Optional<StreamsTopology> topology() {
        return topology.get();
    }

    /**
     * Sets (or clears, when {@code topology} is null) the topology and re-derives the group state.
     */
    public void setTopology(StreamsTopology topology) {
        this.topology.set(Optional.ofNullable(topology));
        maybeUpdateGroupState();
    }

    /** Sets (or clears, when {@code configuredTopology} is null) the configured topology. */
    public void setConfiguredTopology(ConfiguredTopology configuredTopology) {
        this.configuredTopology.set(Optional.ofNullable(configuredTopology));
    }

    @Override
    public String groupId() {
        return groupId;
    }

    /** @return the current state. */
    public StreamsGroupState state() {
        return state.get();
    }

    /** @return the current group epoch. */
    public int groupEpoch() {
        return groupEpoch.get();
    }

    /** Sets the group epoch and re-derives the group state. */
    public void setGroupEpoch(int groupEpoch) {
        this.groupEpoch.set(groupEpoch);
        maybeUpdateGroupState();
    }

    /** @return the target assignment epoch. */
    public int assignmentEpoch() {
        return targetAssignmentEpoch.get();
    }

    /** Sets the target assignment epoch and re-derives the group state. */
    public void setTargetAssignmentEpoch(int targetAssignmentEpoch) {
        this.targetAssignmentEpoch.set(targetAssignmentEpoch);
        maybeUpdateGroupState();
    }

    /** @return the member id registered for {@code groupInstanceId}, or null if there is none. */
    public String staticMemberId(String groupInstanceId) {
        return staticMembers.get(groupInstanceId);
    }

    /**
     * @return the member with the given id
     * @throws UnknownMemberIdException if the group has no such member
     */
    public StreamsGroupMember getMemberOrThrow(String memberId) throws UnknownMemberIdException {
        StreamsGroupMember member = members.get(memberId);
        if (member == null) {
            throw new UnknownMemberIdException(
                String.format("Member %s is not a member of group %s.", memberId, groupId));
        }
        return member;
    }

    /**
     * @return the existing member, or a new member built from a bare builder. The new member is
     *         not added to the group.
     */
    public StreamsGroupMember getOrCreateUninitializedMember(String memberId) throws UnknownMemberIdException {
        StreamsGroupMember member = members.get(memberId);
        return member != null ? member : new StreamsGroupMember.Builder(memberId).build();
    }

    /**
     * @return the existing member, or a new member built via {@code Builder.withDefaults}. The new
     *         member is not added to the group.
     */
    public StreamsGroupMember getOrCreateDefaultMember(String memberId) throws UnknownMemberIdException {
        StreamsGroupMember member = members.get(memberId);
        return member != null ? member : StreamsGroupMember.Builder.withDefaults(memberId).build();
    }

    /**
     * @return the member registered under the static {@code instanceId}, or null when no member id
     *         is registered for it
     * @throws UnknownMemberIdException if an id is registered but the member is not in the group
     */
    public StreamsGroupMember staticMember(String instanceId) {
        String existingMemberId = staticMemberId(instanceId);
        return existingMemberId == null ? null : getMemberOrThrow(existingMemberId);
    }

    /**
     * Adds or replaces a member, then refreshes the task/process-id indexes, the static member
     * index and the group state.
     *
     * @throws IllegalArgumentException if {@code newMember} is null
     */
    public void updateMember(StreamsGroupMember newMember) {
        if (newMember == null) {
            throw new IllegalArgumentException("newMember cannot be null.");
        }
        StreamsGroupMember oldMember = members.put(newMember.memberId(), newMember);
        maybeUpdateTaskProcessId(oldMember, newMember);
        updateStaticMember(newMember);
        maybeUpdateGroupState();
    }

    /** Registers the member under its instance id, if it has one. */
    private void updateStaticMember(StreamsGroupMember newMember) {
        Optional<String> instanceId = newMember.instanceId();
        if (instanceId != null && instanceId.isPresent()) {
            staticMembers.put(instanceId.get(), newMember.memberId());
        }
    }

    /**
     * Removes a member together with its task/process-id entries and static member entry, then
     * re-derives the group state. Removing an id that is not in the group only re-derives the state.
     */
    public void removeMember(String memberId) {
        StreamsGroupMember oldMember = members.remove(memberId);
        maybeRemoveTaskProcessId(oldMember);
        removeStaticMember(oldMember);
        maybeUpdateGroupState();
    }

    /** Drops the static member entry of {@code oldMember}; a null member is ignored. */
    private void removeStaticMember(StreamsGroupMember oldMember) {
        // members.remove() yields null for an unknown id; there is nothing to unregister then.
        if (oldMember == null) {
            return;
        }
        Optional<String> instanceId = oldMember.instanceId();
        if (instanceId != null && instanceId.isPresent()) {
            staticMembers.remove(instanceId.get());
        }
    }

    @Override
    public boolean hasMember(String memberId) {
        return members.containsKey(memberId);
    }

    @Override
    public int numMembers() {
        return members.size();
    }

    /** @return an unmodifiable view of the members keyed by member id. */
    public Map<String, StreamsGroupMember> members() {
        return Collections.unmodifiableMap(members);
    }

    /** @return an unmodifiable view of the instance id to member id index. */
    public Map<String, String> staticMembers() {
        return Collections.unmodifiableMap(staticMembers);
    }

    /** @return the target assignment of the member, or {@link TasksTuple#EMPTY} if there is none. */
    public TasksTuple targetAssignment(String memberId) {
        return targetAssignment.getOrDefault(memberId, TasksTuple.EMPTY);
    }

    /** Stores the target assignment of a member. */
    public void updateTargetAssignment(String memberId, TasksTuple newTargetAssignment) {
        targetAssignment.put(memberId, newTargetAssignment);
    }

    /** Removes the target assignment of a member. */
    public void removeTargetAssignment(String memberId) {
        targetAssignment.remove(memberId);
    }

    /** @return an unmodifiable view of the target assignments keyed by member id. */
    public Map<String, TasksTuple> targetAssignment() {
        return Collections.unmodifiableMap(targetAssignment);
    }

    /** @return the process id recorded for the active task, or null if none is recorded. */
    public String currentActiveTaskProcessId(String subtopologyId, int taskId) {
        Map<Integer, String> tasks = currentActiveTaskToProcessId.get(subtopologyId);
        return tasks == null ? null : tasks.get(taskId);
    }

    /** @return the process ids recorded for the standby task, or an empty set if none are recorded. */
    public Set<String> currentStandbyTaskProcessIds(String subtopologyId, int taskId) {
        Map<Integer, Set<String>> tasks = currentStandbyTaskToProcessIds.get(subtopologyId);
        return tasks == null ? Set.of() : tasks.getOrDefault(taskId, Set.of());
    }

    /** @return the process ids recorded for the warm-up task, or an empty set if none are recorded. */
    public Set<String> currentWarmupTaskProcessIds(String subtopologyId, int taskId) {
        Map<Integer, Set<String>> tasks = currentWarmupTaskToProcessIds.get(subtopologyId);
        return tasks == null ? Set.of() : tasks.getOrDefault(taskId, Set.of());
    }

    /** @return the stored metadata hash. */
    public long metadataHash() {
        return metadataHash.get();
    }

    /** Stores the metadata hash. */
    public void setMetadataHash(long metadataHash) {
        this.metadataHash.set(metadataHash);
    }

    /** @return the validated topology epoch. */
    public int validatedTopologyEpoch() {
        return validatedTopologyEpoch.get();
    }

    /** Sets the validated topology epoch and re-derives the group state. */
    public void setValidatedTopologyEpoch(int validatedTopologyEpoch) {
        this.validatedTopologyEpoch.set(validatedTopologyEpoch);
        maybeUpdateGroupState();
    }

    /**
     * Computes the group hash over the topics required by {@code topology}.
     *
     * <p>Only topics for which {@code metadataImage} has metadata contribute. Per-topic hashes are
     * taken from {@code topicHashCache} and computed (and cached) on a miss.
     *
     * @param metadataImage  metadata image used to look up topics
     * @param topicHashCache cache of per-topic hashes; may be updated by this call
     * @param topology       topology whose required topics are hashed
     * @return the hash returned by {@code Utils.computeGroupHash}
     */
    public long computeMetadataHash(CoordinatorMetadataImage metadataImage, Map<String, Long> topicHashCache, StreamsTopology topology) {
        Set<String> requiredTopicNames = topology.requiredTopics();
        Map<String, Long> topicHash = new HashMap<>(requiredTopicNames.size());
        requiredTopicNames.forEach(topicName ->
            metadataImage.topicMetadata(topicName).ifPresent(ignored ->
                topicHash.put(
                    topicName,
                    topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage)))));
        return Utils.computeGroupHash(topicHash);
    }

    /** Records the deadline for the next metadata refresh together with the given group epoch. */
    public void setMetadataRefreshDeadline(long deadlineMs, int groupEpoch) {
        this.metadataRefreshDeadline = new DeadlineAndEpoch(deadlineMs, groupEpoch);
    }

    /** Resets the refresh deadline to {@link DeadlineAndEpoch#EMPTY}. */
    @Override
    public void requestMetadataRefresh() {
        this.metadataRefreshDeadline = DeadlineAndEpoch.EMPTY;
    }

    /**
     * @return true when {@code currentTimeMs} has reached the recorded deadline, or when the
     *         current group epoch is lower than the epoch recorded with the deadline
     */
    public boolean hasMetadataExpired(long currentTimeMs) {
        return currentTimeMs >= metadataRefreshDeadline.deadlineMs
            || groupEpoch() < metadataRefreshDeadline.epoch;
    }

    /** @return the recorded refresh deadline and epoch. */
    public DeadlineAndEpoch metadataRefreshDeadline() {
        return metadataRefreshDeadline;
    }

    /**
     * Validates an offset commit request.
     *
     * <p>The commit is accepted without per-partition checks when the member epoch is negative and
     * the group has no members, when it is a transactional commit with epoch -1, an empty member
     * id and no instance id, or when the member epoch equals the member's current epoch. A commit
     * with an older epoch is validated per partition against the epoch at which the task was
     * assigned.
     *
     * @return the validator to apply to each committed partition
     * @throws UnknownMemberIdException    if the member is not part of the group
     * @throws UnsupportedVersionException if a non-transactional commit uses an API version below 9
     * @throws StaleMemberEpochException   if the received epoch is newer than the member's epoch,
     *                                     or no topology is available for per-partition validation
     */
    @Override
    public CommitPartitionValidator validateOffsetCommit(String memberId, String groupInstanceId, int memberEpoch, boolean isTransactional, int apiVersion) throws UnknownMemberIdException, StaleMemberEpochException {
        if (memberEpoch < 0 && members().isEmpty()) {
            return CommitPartitionValidator.NO_OP;
        }
        if (isTransactional && memberEpoch == -1 && memberId.isEmpty() && groupInstanceId == null) {
            return CommitPartitionValidator.NO_OP;
        }
        StreamsGroupMember member = getMemberOrThrow(memberId);
        if (!isTransactional && apiVersion < 9) {
            throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used by members using the streams group protocol");
        }
        int currentMemberEpoch = member.memberEpoch();
        if (memberEpoch == currentMemberEpoch) {
            return CommitPartitionValidator.NO_OP;
        }
        if (memberEpoch > currentMemberEpoch) {
            throw new StaleMemberEpochException(String.format(
                "Received member epoch %d is newer than current member epoch %d.", memberEpoch, currentMemberEpoch));
        }
        return createAssignmentEpochValidator(member, memberEpoch);
    }

    /**
     * Validates an offset fetch request against the members as of {@code lastCommittedOffset}.
     * A request with a null member id and a negative epoch is accepted without further checks.
     *
     * @throws UnknownMemberIdException  if the member is not part of the group
     * @throws StaleMemberEpochException if the received epoch differs from the member's epoch
     */
    @Override
    public void validateOffsetFetch(String memberId, int memberEpoch, long lastCommittedOffset) throws UnknownMemberIdException, StaleMemberEpochException {
        if (memberId == null && memberEpoch < 0) {
            return;
        }
        StreamsGroupMember member = members.get(memberId, lastCommittedOffset);
        if (member == null) {
            throw new UnknownMemberIdException(
                String.format("Member %s is not a member of group %s.", memberId, groupId));
        }
        validateMemberEpoch(memberEpoch, member.memberEpoch());
    }

    /** Offset deletion needs no validation for this group type. */
    @Override
    public void validateOffsetDelete() {
    }

    /**
     * @throws ApiException the {@code NON_EMPTY_GROUP} error if the group is not in the EMPTY state
     */
    @Override
    public void validateDeleteGroup() throws ApiException {
        if (state() != StreamsGroupState.EMPTY) {
            throw Errors.NON_EMPTY_GROUP.exception();
        }
    }

    /**
     * @return true if the group is neither EMPTY nor DEAD, has a topology, and {@code topic} is a
     *         key of the topology's source topic map
     */
    @Override
    public boolean isSubscribedToTopic(String topic) {
        StreamsGroupState currentState = state.get();
        if (currentState == StreamsGroupState.EMPTY || currentState == StreamsGroupState.DEAD) {
            return false;
        }
        return topology.get()
            .map(streamsTopology -> streamsTopology.sourceTopicMap().containsKey(topic))
            .orElse(false);
    }

    /**
     * Appends the tombstone records for this group to {@code records}, in this order: current
     * assignments, target assignments, target assignment epoch, members, group epoch, topology.
     */
    @Override
    public void createGroupTombstoneRecords(List<CoordinatorRecord> records) {
        members().forEach((memberId, member) ->
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId(), memberId)));
        members().forEach((memberId, member) ->
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId(), memberId)));
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(groupId()));
        members().forEach((memberId, member) ->
            records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId(), memberId)));
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(groupId()));
        records.add(StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(groupId()));
    }

    /** @return the timer key of the initial rebalance timeout for {@code groupId}. */
    public static String initialRebalanceTimeoutKey(String groupId) {
        return "initial-rebalance-timeout-" + groupId;
    }

    /** Cancels the initial rebalance timeout timer of this group. */
    @Override
    public void cancelTimers(CoordinatorTimer<Void, CoordinatorRecord> timer) {
        timer.cancel(initialRebalanceTimeoutKey(groupId));
    }

    @Override
    public boolean isEmpty() {
        return state() == StreamsGroupState.EMPTY;
    }

    /** @return an expiration condition based on the commit timestamp of the offset. */
    @Override
    public Optional<OffsetExpirationCondition> offsetExpirationCondition() {
        return Optional.of(new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs));
    }

    /**
     * @return true if {@code statesFilter} contains the lower-case name of the state as of
     *         {@code committedOffset}
     */
    @Override
    public boolean isInStates(Set<String> statesFilter, long committedOffset) {
        return statesFilter.contains(state.get(committedOffset).toLowerCaseString());
    }

    /** Throws if the received member epoch differs from the expected one. */
    private void validateMemberEpoch(int receivedMemberEpoch, int expectedMemberEpoch) throws StaleMemberEpochException {
        if (receivedMemberEpoch != expectedMemberEpoch) {
            throw new StaleMemberEpochException(String.format(
                "The received member epoch %d does not match the expected member epoch %d.",
                receivedMemberEpoch, expectedMemberEpoch));
        }
    }

    /**
     * Re-derives the group state:
     * EMPTY when there are no members (this also clears a pending shutdown request);
     * NOT_READY when there is no topology whose epoch equals the validated topology epoch;
     * ASSIGNING when the group epoch is ahead of the target assignment epoch;
     * RECONCILING when at least one member is not reconciled to the target assignment epoch;
     * STABLE otherwise.
     */
    private void maybeUpdateGroupState() {
        StreamsGroupState newState = StreamsGroupState.STABLE;
        if (members.isEmpty()) {
            newState = StreamsGroupState.EMPTY;
            clearShutdownRequestMemberId();
        } else if (topology().filter(t -> t.topologyEpoch() == validatedTopologyEpoch.get()).isEmpty()) {
            newState = StreamsGroupState.NOT_READY;
        } else if (groupEpoch.get() > targetAssignmentEpoch.get()) {
            newState = StreamsGroupState.ASSIGNING;
        } else {
            for (StreamsGroupMember member : members.values()) {
                if (!member.isReconciledTo(targetAssignmentEpoch.get())) {
                    newState = StreamsGroupState.RECONCILING;
                    break;
                }
            }
        }
        state.set(newState);
    }

    /** Replaces the task/process-id entries of {@code oldMember} (may be null) with those of {@code newMember}. */
    private void maybeUpdateTaskProcessId(StreamsGroupMember oldMember, StreamsGroupMember newMember) {
        maybeRemoveTaskProcessId(oldMember);
        addTaskProcessId(newMember.assignedTasks(), newMember.processId());
        addTaskProcessId(newMember.tasksPendingRevocation(), newMember.processId());
    }

    /** Removes the task/process-id entries of {@code oldMember}; a null member is ignored. */
    private void maybeRemoveTaskProcessId(StreamsGroupMember oldMember) {
        if (oldMember != null) {
            removeTaskProcessIds(oldMember.assignedTasks(), oldMember.processId());
            removeTaskProcessIds(oldMember.tasksPendingRevocation(), oldMember.processId());
        }
    }

    /** Removes {@code processId} from the active, standby and warm-up indexes for the given tasks. */
    void removeTaskProcessIds(TasksTupleWithEpochs tasks, String processId) {
        if (tasks != null) {
            removeTaskProcessIds(tasks.activeTasksWithEpochs(), currentActiveTaskToProcessId, processId);
            removeTaskProcessIdsFromSet(tasks.standbyTasks(), currentStandbyTaskToProcessIds, processId);
            removeTaskProcessIdsFromSet(tasks.warmupTasks(), currentWarmupTaskToProcessIds, processId);
        }
    }

    /**
     * Removes the active-task entries that still point at {@code expectedProcessId}. Entries owned
     * by a different process id are left alone (logged at debug level). A subtopology entry that
     * ends up empty is removed.
     */
    private void removeTaskProcessIds(Map<String, Map<Integer, Integer>> assignment, TimelineHashMap<String, TimelineHashMap<Integer, String>> currentTasksProcessId, String expectedProcessId) {
        assignment.forEach((subtopologyId, assignedPartitions) ->
            currentTasksProcessId.compute(subtopologyId, (ignored, partitionsOrNull) -> {
                if (partitionsOrNull == null) {
                    log.debug("[GroupId {}] Cannot remove the process ID {} from {} because it does not have any processId",
                        groupId, expectedProcessId, subtopologyId);
                    return null;
                }
                assignedPartitions.keySet().forEach(partitionId -> {
                    String prevValue = partitionsOrNull.get(partitionId);
                    if (Objects.equals(prevValue, expectedProcessId)) {
                        partitionsOrNull.remove(partitionId);
                    } else {
                        log.debug("[GroupId {}] Cannot remove the process ID {} from task {}_{} because the partition is still owned at a different process ID {}",
                            groupId, expectedProcessId, subtopologyId, partitionId, prevValue);
                    }
                });
                // Returning null from compute() drops the subtopology entry.
                return partitionsOrNull.isEmpty() ? null : partitionsOrNull;
            }));
    }

    /**
     * Removes {@code processIdToRemove} from the process-id sets of the given tasks. A task whose
     * set becomes empty is removed, and a subtopology entry that ends up empty is removed as well.
     */
    private void removeTaskProcessIdsFromSet(Map<String, Set<Integer>> assignment, TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentTasksProcessId, String processIdToRemove) {
        assignment.forEach((subtopologyId, assignedPartitions) ->
            currentTasksProcessId.compute(subtopologyId, (ignored, partitionsOrNull) -> {
                if (partitionsOrNull == null) {
                    log.debug("[GroupId {}] Cannot remove the process ID {} from {} because it does not have any process ID",
                        groupId, processIdToRemove, subtopologyId);
                    return null;
                }
                assignedPartitions.forEach(partitionId -> {
                    Set<String> processIds = partitionsOrNull.get(partitionId);
                    if (processIds == null || !processIds.remove(processIdToRemove)) {
                        log.debug("[GroupId {}] Cannot remove the process ID {} from task {}_{} because the task is not owned by this process ID",
                            groupId, processIdToRemove, subtopologyId, partitionId);
                    } else if (processIds.isEmpty()) {
                        // Drop the emptied set; otherwise the subtopology entry could never
                        // become empty and would never be pruned below.
                        partitionsOrNull.remove(partitionId);
                    }
                });
                // Returning null from compute() drops the subtopology entry.
                return partitionsOrNull.isEmpty() ? null : partitionsOrNull;
            }));
    }

    /**
     * Records {@code processId} in the active, standby and warm-up indexes for the given tasks.
     * Does nothing when either argument is null.
     */
    void addTaskProcessId(TasksTupleWithEpochs tasks, String processId) {
        if (tasks != null && processId != null) {
            addTaskProcessIdFromActiveTasksWithEpochs(tasks.activeTasksWithEpochs(), processId, currentActiveTaskToProcessId);
            addTaskProcessIdToSet(tasks.standbyTasks(), processId, currentStandbyTaskToProcessIds);
            addTaskProcessIdToSet(tasks.warmupTasks(), processId, currentWarmupTaskToProcessIds);
        }
    }

    /**
     * Points every given active task at {@code processId}, overwriting (and logging at debug
     * level) any previously recorded process id.
     */
    private void addTaskProcessIdFromActiveTasksWithEpochs(Map<String, Map<Integer, Integer>> tasksWithEpochs, String processId, TimelineHashMap<String, TimelineHashMap<Integer, String>> currentTaskProcessId) {
        tasksWithEpochs.forEach((subtopologyId, assignedTaskPartitionsWithEpochs) ->
            currentTaskProcessId.compute(subtopologyId, (ignored, partitionsOrNull) -> {
                TimelineHashMap<Integer, String> partitions = partitionsOrNull;
                if (partitions == null) {
                    partitions = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitionsWithEpochs.size());
                }
                for (Integer partitionId : assignedTaskPartitionsWithEpochs.keySet()) {
                    String prevValue = partitions.put(partitionId, processId);
                    if (prevValue != null) {
                        log.debug("[GroupId {}] Setting the process ID of {}-{} to {} even though the partition is still owned by process ID {}",
                            groupId, subtopologyId, partitionId, processId, prevValue);
                    }
                }
                return partitions;
            }));
    }

    /** Adds {@code processId} to the process-id set of every given task. */
    private void addTaskProcessIdToSet(Map<String, Set<Integer>> tasks, String processId, TimelineHashMap<String, TimelineHashMap<Integer, Set<String>>> currentTaskProcessId) {
        tasks.forEach((subtopologyId, assignedTaskPartitions) ->
            currentTaskProcessId.compute(subtopologyId, (ignored, partitionsOrNull) -> {
                TimelineHashMap<Integer, Set<String>> partitions = partitionsOrNull;
                if (partitions == null) {
                    partitions = new TimelineHashMap<>(snapshotRegistry, assignedTaskPartitions.size());
                }
                for (Integer partitionId : assignedTaskPartitions) {
                    partitions.computeIfAbsent(partitionId, key -> new HashSet<>()).add(processId);
                }
                return partitions;
            }));
    }

    /**
     * Builds the describe-group response entry from the state as of {@code committedOffset}.
     *
     * <p>The described topology comes from the configured topology when it is present and ready,
     * and from the plain topology otherwise. The fallback is evaluated only when it is needed.
     *
     * @throws IllegalStateException if the fallback is needed and no topology is set
     */
    public StreamsGroupDescribeResponseData.DescribedGroup asDescribedGroup(long committedOffset) {
        StreamsGroupDescribeResponseData.DescribedGroup describedGroup = new StreamsGroupDescribeResponseData.DescribedGroup()
            .setGroupId(groupId)
            .setGroupEpoch(groupEpoch.get(committedOffset))
            .setGroupState(state.get(committedOffset).toString())
            .setAssignmentEpoch(targetAssignmentEpoch.get(committedOffset))
            .setTopology(configuredTopology.get(committedOffset)
                .filter(ConfiguredTopology::isReady)
                .map(ConfiguredTopology::asStreamsGroupDescribeTopology)
                .orElseGet(() -> topology.get(committedOffset)
                    .map(StreamsTopology::asStreamsGroupDescribeTopology)
                    .orElseThrow(() -> new IllegalStateException("There should always be a topology for a streams group."))));
        members.entrySet(committedOffset).forEach(entry -> {
            StreamsGroupMember member = entry.getValue();
            describedGroup.members().add(
                member.asStreamsGroupDescribeMember(targetAssignment.get(member.memberId(), committedOffset)));
        });
        return describedGroup;
    }

    /**
     * Records the member that requested an application shutdown. Only the first request is kept;
     * later calls are ignored until the request is cleared.
     */
    public void setShutdownRequestMemberId(String memberId) {
        if (shutdownRequestMemberId.isEmpty()) {
            log.info("[GroupId {}][MemberId {}] Shutdown requested for the streams application.", groupId, memberId);
            shutdownRequestMemberId = Optional.of(memberId);
        }
    }

    /** @return the member that requested a shutdown, or empty if no request is pending. */
    public Optional<String> getShutdownRequestMemberId() {
        return shutdownRequestMemberId;
    }

    /** Clears a pending shutdown request, if any. */
    private void clearShutdownRequestMemberId() {
        if (shutdownRequestMemberId.isPresent()) {
            log.info("[GroupId {}] Clearing shutdown requested for the streams application.", groupId);
            shutdownRequestMemberId = Optional.empty();
        }
    }

    /** @return the endpoint information epoch. */
    public int endpointInformationEpoch() {
        return endpointInformationEpoch;
    }

    /** Sets the endpoint information epoch. */
    public void setEndpointInformationEpoch(int endpointInformationEpoch) {
        this.endpointInformationEpoch = endpointInformationEpoch;
    }

    /** @return an unmodifiable view of the last assignment configs. */
    public Map<String, String> lastAssignmentConfigs() {
        return Collections.unmodifiableMap(lastAssignmentConfigs);
    }

    /** Replaces the last assignment configs; a null argument leaves them empty. */
    public void setLastAssignmentConfigs(Map<String, String> lastAssignmentConfigs) {
        this.lastAssignmentConfigs.clear();
        if (lastAssignmentConfigs != null) {
            this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
        }
    }

    /**
     * Creates the per-partition validator for a commit whose member epoch is older than the
     * member's current epoch.
     *
     * <p>The validator maps the topic to its subtopology via the topology's source topic map and
     * looks up the epoch of the task in the member's assigned tasks, then in its tasks pending
     * revocation. It rejects the commit if the topic is not in the topology, if the task is in
     * neither set, or if the received epoch is older than the task's epoch.
     *
     * @throws StaleMemberEpochException if the group has no topology
     */
    private CommitPartitionValidator createAssignmentEpochValidator(StreamsGroupMember member, int receivedMemberEpoch) {
        StreamsTopology streamsTopology = topology.get().orElseThrow(
            () -> new StaleMemberEpochException("Topology is not available for offset commit validation."));
        TasksTupleWithEpochs assignedTasks = member.assignedTasks();
        TasksTupleWithEpochs tasksPendingRevocation = member.tasksPendingRevocation();
        return (topicName, topicId, partitionId) -> {
            StreamsGroupTopologyValue.Subtopology subtopology = streamsTopology.sourceTopicMap().get(topicName);
            if (subtopology == null) {
                throw new StaleMemberEpochException("Topic " + topicName + " is not in the topology.");
            }
            String subtopologyId = subtopology.subtopologyId();
            Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
                .getOrDefault(subtopologyId, Collections.emptyMap())
                .get(partitionId);
            if (assignmentEpoch == null) {
                assignmentEpoch = tasksPendingRevocation.activeTasksWithEpochs()
                    .getOrDefault(subtopologyId, Collections.emptyMap())
                    .get(partitionId);
            }
            if (assignmentEpoch == null) {
                throw new StaleMemberEpochException(String.format(
                    "Task %s-%d is not assigned or pending revocation for member.", subtopologyId, partitionId));
            }
            if (receivedMemberEpoch < assignmentEpoch) {
                throw new StaleMemberEpochException(String.format(
                    "Received member epoch %d is older than assignment epoch %d for task %s-%d.",
                    receivedMemberEpoch, assignmentEpoch, subtopologyId, partitionId));
            }
        };
    }

    /** Immutable pair of a deadline (in milliseconds) and the epoch it was recorded with. */
    public static class DeadlineAndEpoch {
        /** Deadline 0 and epoch 0. */
        static final DeadlineAndEpoch EMPTY = new DeadlineAndEpoch(0L, 0);
        public final long deadlineMs;
        public final int epoch;

        DeadlineAndEpoch(long deadlineMs, int epoch) {
            this.deadlineMs = deadlineMs;
            this.epoch = epoch;
        }
    }

    /** States of a streams group, each with a display name and a lower-case name. */
    public enum StreamsGroupState {
        EMPTY("Empty"),
        NOT_READY("NotReady"),
        ASSIGNING("Assigning"),
        RECONCILING("Reconciling"),
        STABLE("Stable"),
        DEAD("Dead");

        private final String name;
        private final String lowerCaseName;

        StreamsGroupState(String name) {
            this.name = name;
            // "NotReady" is special-cased to snake case; all other names are simply lower-cased.
            this.lowerCaseName = Objects.equals(name, "NotReady") ? "not_ready" : name.toLowerCase(Locale.ROOT);
        }

        /** @return the display name, e.g. {@code "NotReady"}. */
        @Override
        public String toString() {
            return name;
        }

        /** @return the lower-case name, e.g. {@code "not_ready"}. */
        public String toLowerCaseString() {
            return lowerCaseName;
        }
    }
}

