/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.security.authenticator;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestAndSize;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator;
import org.apache.kafka.common.security.kerberos.KerberosError;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Utils;
import org.ietf.jgss.GSSCredential;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SaslServerAuthenticator
implements Authenticator {
    /** 524288 bytes (512 KiB): the size value handed to {@code NetworkReceive} when {@code authenticate()} starts reading a client packet. */
    static final int MAX_RECEIVE_SIZE = 524288;
    private static final Logger LOG = LoggerFactory.getLogger(SaslServerAuthenticator.class);
    /** Shared zero-length buffer used as the response token when the SASL server returns no token for a SaslAuthenticate request. */
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    /** Security protocol passed into every request context and into the principal-building context. */
    private final SecurityProtocol securityProtocol;
    /** Listener name passed into every request context and into the principal-building context. */
    private final ListenerName listenerName;
    /** Identifier of the connection; used when creating receives, sends and request contexts. */
    private final String connectionId;
    /** Subjects keyed by SASL mechanism name; the constructor requires an entry for every enabled mechanism. */
    private final Map<String, Subject> subjects;
    /** Transport the authenticator reads client packets from and writes responses to. */
    private final TransportLayer transportLayer;
    /** Mechanism names taken from the "sasl.enabled.mechanisms" config entry. */
    private final Set<String> enabledMechanisms;
    /** Configuration map; also handed to {@code Sasl.createSaslServer} as the SASL properties. */
    private final Map<String, ?> configs;
    /** Builds the {@link KafkaPrincipal} returned by {@code principal()}; closed in {@code close()} if it is {@link Closeable}. */
    private final KafkaPrincipalBuilder principalBuilder;
    /** Callback handlers keyed by SASL mechanism name; the constructor requires an entry for every enabled mechanism. */
    private final Map<String, AuthenticateCallbackHandler> callbackHandlers;
    /** Current state of the authentication exchange. */
    private SaslState saslState = SaslState.INITIAL_REQUEST;
    /** State to switch to once the outstanding response in {@code netOutBuffer} has been fully written; null when nothing is deferred. */
    private SaslState pendingSaslState = null;
    /** Exception to throw together with the deferred state change; null when nothing is deferred. */
    private AuthenticationException pendingException = null;
    /** SASL server for the negotiated mechanism; null until a mechanism has been selected. */
    private SaslServer saslServer;
    /** Name of the mechanism the SASL server was created for. */
    private String saslMechanism;
    /** Receive currently being read from the transport layer; reset to null once a whole packet has been consumed. */
    private NetworkReceive netInBuffer;
    /** Most recent response being written to the client; may be only partially flushed. */
    private Send netOutBuffer;
    /** When true, SASL tokens are expected wrapped in SaslAuthenticate requests instead of being sent raw. */
    private boolean enableKafkaSaslAuthenticateHeaders;

    /**
     * Creates a server-side SASL authenticator for a single client connection.
     *
     * <p>The list of enabled mechanisms is read from the {@code "sasl.enabled.mechanisms"} entry of
     * {@code configs}. Every enabled mechanism must have both a callback handler and a subject
     * registered under its name.
     *
     * @param configs            configuration map; must contain a non-empty list under {@code "sasl.enabled.mechanisms"}
     * @param callbackHandlers   callback handlers keyed by SASL mechanism name
     * @param connectionId       identifier of the connection being authenticated
     * @param subjects           subjects keyed by SASL mechanism name
     * @param kerberosNameParser short-namer handed to the principal builder factory
     * @param listenerName       listener the connection arrived on
     * @param securityProtocol   security protocol of that listener
     * @param transportLayer     transport used to exchange packets with the client
     * @throws IllegalArgumentException if no mechanism is enabled, or an enabled mechanism lacks a
     *                                  callback handler or a subject
     * @throws IOException declared by the signature; no statement in this constructor raises it directly
     */
    public SaslServerAuthenticator(Map<String, ?> configs, Map<String, AuthenticateCallbackHandler> callbackHandlers, String connectionId, Map<String, Subject> subjects, KerberosShortNamer kerberosNameParser, ListenerName listenerName, SecurityProtocol securityProtocol, TransportLayer transportLayer) throws IOException {
        this.callbackHandlers = callbackHandlers;
        this.connectionId = connectionId;
        this.subjects = subjects;
        this.listenerName = listenerName;
        this.securityProtocol = securityProtocol;
        this.enableKafkaSaslAuthenticateHeaders = false;
        this.transportLayer = transportLayer;
        this.configs = configs;
        // The config map is untyped, so the cast cannot be checked; the entry is expected to hold
        // a list of mechanism names. The suppression is limited to this one declaration.
        @SuppressWarnings("unchecked")
        List<String> enabledMechanisms = (List<String>) this.configs.get("sasl.enabled.mechanisms");
        if (enabledMechanisms == null || enabledMechanisms.isEmpty()) {
            throw new IllegalArgumentException("No SASL mechanisms are enabled");
        }
        this.enabledMechanisms = new HashSet<String>(enabledMechanisms);
        for (String mechanism : enabledMechanisms) {
            if (!callbackHandlers.containsKey(mechanism)) {
                throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + mechanism);
            }
            if (!subjects.containsKey(mechanism)) {
                throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + mechanism);
            }
        }
        this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser);
    }

    /**
     * Records {@code mechanism} as the negotiated mechanism and creates the matching
     * {@link SaslServer}, storing it in {@code saslServer}.
     *
     * <p>GSSAPI is delegated to {@code createSaslKerberosServer}. Every other mechanism is created
     * through {@link Sasl#createSaslServer} while running as the subject registered for the
     * mechanism, with protocol {@code "kafka"} and the local host name as server name.
     *
     * @param mechanism SASL mechanism name chosen for this connection
     * @throws IOException if the SASL server cannot be created (a {@link SaslException} wrapping
     *                     the cause of the privileged-action failure)
     */
    private void createSaslServer(String mechanism) throws IOException {
        this.saslMechanism = mechanism;
        Subject mechanismSubject = this.subjects.get(mechanism);
        final AuthenticateCallbackHandler handler = this.callbackHandlers.get(mechanism);

        if (mechanism.equals("GSSAPI")) {
            this.saslServer = createSaslKerberosServer(handler, this.configs, mechanismSubject);
            return;
        }

        PrivilegedExceptionAction<SaslServer> serverFactory = new PrivilegedExceptionAction<SaslServer>() {
            @Override
            public SaslServer run() throws SaslException {
                return Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), configs, handler);
            }
        };
        try {
            this.saslServer = Subject.doAs(mechanismSubject, serverFactory);
        } catch (PrivilegedActionException failure) {
            throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", failure.getCause());
        }
    }

    /**
     * Creates a GSSAPI (Kerberos) {@link SaslServer} running as {@code subject}.
     *
     * <p>The service name and host name are taken from the first principal of the subject. When
     * the system property {@code sun.security.jgss.native} is {@code true}, an accept-only
     * Kerberos v5 credential for {@code service@host} is added to the subject's private
     * credentials first; a failure to do so is logged and otherwise ignored.
     *
     * @param saslServerCallbackHandler callback handler handed to the SASL server
     * @param configs                   properties handed to {@link Sasl#createSaslServer}
     * @param subject                   subject to run as; its first principal names the service
     * @return the newly created SASL server
     * @throws KafkaException if the subject's principal cannot be parsed as a Kerberos name
     * @throws IOException    if the SASL server cannot be created (a {@link SaslException} wrapping
     *                        the cause of the privileged-action failure)
     */
    private SaslServer createSaslKerberosServer(final AuthenticateCallbackHandler saslServerCallbackHandler, final Map<String, ?> configs, Subject subject) throws IOException {
        KerberosName kerberosName;
        String servicePrincipal = SaslClientAuthenticator.firstPrincipal(subject);
        try {
            kerberosName = KerberosName.parse(servicePrincipal);
        } catch (IllegalArgumentException e) {
            // Keep the parser's exception as the cause so the reason for the rejection is not lost.
            throw new KafkaException("Principal has name with unexpected format " + servicePrincipal, e);
        }
        final String servicePrincipalName = kerberosName.serviceName();
        final String serviceHostname = kerberosName.hostName();
        LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, this.saslMechanism);
        boolean usingNativeJgss = Boolean.getBoolean("sun.security.jgss.native");
        if (usingNativeJgss) {
            try {
                GSSManager manager = GSSManager.getInstance();
                // OID of the Kerberos v5 GSS-API mechanism.
                Oid krb5Mechanism = new Oid("1.2.840.113554.1.2.2");
                GSSName gssName = manager.createName(servicePrincipalName + "@" + serviceHostname, GSSName.NT_HOSTBASED_SERVICE);
                GSSCredential cred = manager.createCredential(gssName, GSSCredential.INDEFINITE_LIFETIME, krb5Mechanism, GSSCredential.ACCEPT_ONLY);
                subject.getPrivateCredentials().add(cred);
            } catch (GSSException ex) {
                // Best effort: server creation is still attempted without the extra credential.
                LOG.warn("Cannot add private credential to subject; clients authentication may fail", ex);
            }
        }
        try {
            return Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
                @Override
                public SaslServer run() throws SaslException {
                    return Sasl.createSaslServer(SaslServerAuthenticator.this.saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler);
                }
            });
        } catch (PrivilegedActionException e) {
            throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
        }
    }

    /**
     * Advances the server side of the SASL exchange by at most one client packet.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>If a response is still being written, try to flush it and return if it is not yet
     *       fully written.</li>
     *   <li>If the SASL server already reports completion, move to {@code COMPLETE} and return.</li>
     *   <li>Read from the transport layer; return if the packet is not complete yet.</li>
     *   <li>Dispatch the packet according to the current state.</li>
     * </ol>
     *
     * <p>An {@link AuthenticationException} moves the state to {@code FAILED} through
     * {@code setSaslState}, which defers the exception while a response is still being written.
     * Any other exception sets {@code FAILED} immediately and is rethrown.
     *
     * @throws IOException if reading, writing or token evaluation fails
     */
    @Override
    public void authenticate() throws IOException {
        if (this.netOutBuffer != null && !this.flushNetOutBufferAndUpdateInterestOps()) {
            return;
        }
        if (this.saslServer != null && this.saslServer.isComplete()) {
            this.setSaslState(SaslState.COMPLETE);
            return;
        }
        if (this.netInBuffer == null) {
            this.netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, this.connectionId);
        }
        this.netInBuffer.readFrom(this.transportLayer);
        if (!this.netInBuffer.complete()) {
            return;
        }

        // Copy the whole payload out, then drop the receive so the next call starts a fresh one.
        ByteBuffer payload = this.netInBuffer.payload();
        payload.rewind();
        byte[] clientToken = new byte[payload.remaining()];
        payload.get(clientToken, 0, clientToken.length);
        this.netInBuffer = null;

        try {
            switch (this.saslState) {
                case HANDSHAKE_OR_VERSIONS_REQUEST:
                case HANDSHAKE_REQUEST: {
                    this.handleKafkaRequest(clientToken);
                    break;
                }
                case INITIAL_REQUEST: {
                    if (this.handleKafkaRequest(clientToken)) {
                        break;
                    }
                    // Intentional fall-through: the first packet was not a Kafka request, so it is
                    // evaluated as the first SASL token of the mechanism just selected.
                }
                case AUTHENTICATE: {
                    this.handleSaslToken(clientToken);
                    if (this.saslServer.isComplete()) {
                        this.setSaslState(SaslState.COMPLETE);
                    }
                    break;
                }
                default: {
                    // COMPLETE and FAILED: nothing to process.
                    break;
                }
            }
        } catch (AuthenticationException e) {
            this.setSaslState(SaslState.FAILED, e);
        } catch (Exception e) {
            this.saslState = SaslState.FAILED;
            throw e;
        }
    }

    /**
     * Builds the principal for this connection from the SASL server, security protocol, client
     * address and listener name.
     *
     * <p>For SCRAM mechanisms, the principal is additionally flagged as token-authenticated when
     * the negotiated {@code "tokenauth"} property parses as {@code true}.
     *
     * @return the principal produced by the configured principal builder
     */
    @Override
    public KafkaPrincipal principal() {
        KafkaPrincipal built = this.principalBuilder.build(
                new SaslAuthenticationContext(this.saslServer, this.securityProtocol, this.clientAddress(), this.listenerName.value()));
        if (ScramMechanism.isScram(this.saslMechanism)) {
            String tokenAuth = (String) this.saslServer.getNegotiatedProperty("tokenauth");
            if (Boolean.parseBoolean(tokenAuth)) {
                built.tokenAuthenticated(true);
            }
        }
        return built;
    }

    /**
     * Reports whether authentication has finished successfully.
     *
     * @return {@code true} only when the current state is {@code COMPLETE}
     */
    @Override
    public boolean complete() {
        return this.saslState == SaslState.COMPLETE;
    }

    /**
     * Releases the resources held by this authenticator: closes the principal builder quietly if
     * it is {@link Closeable}, then disposes the SASL server if one was created.
     *
     * @throws IOException if disposing the SASL server fails
     */
    @Override
    public void close() throws IOException {
        if (this.principalBuilder instanceof Closeable) {
            Closeable closeableBuilder = (Closeable) this.principalBuilder;
            Utils.closeQuietly(closeableBuilder, "principal builder");
        }
        SaslServer server = this.saslServer;
        if (server != null) {
            server.dispose();
        }
    }

    /**
     * Switches to {@code saslState} with no associated exception.
     *
     * @param saslState state to move to
     * @throws IOException declared for parity with the two-argument overload
     */
    private void setSaslState(SaslState saslState) throws IOException {
        setSaslState(saslState, null);
    }

    /**
     * Switches to {@code saslState}, or defers the switch while a response is still being written.
     *
     * <p>If {@code netOutBuffer} exists and is not completed, the state and exception are only
     * remembered as pending. Otherwise the state is applied, the pending values are cleared, and
     * {@code exception} (if any) is thrown.
     *
     * @param saslState state to move to
     * @param exception exception to throw once the state has been applied; may be null
     * @throws IOException declared by the signature
     * @throws AuthenticationException {@code exception} itself, when non-null and not deferred
     */
    private void setSaslState(SaslState saslState, AuthenticationException exception) throws IOException {
        boolean responseStillPending = this.netOutBuffer != null && !this.netOutBuffer.completed();
        if (responseStillPending) {
            this.pendingSaslState = saslState;
            this.pendingException = exception;
            return;
        }

        this.saslState = saslState;
        LOG.debug("Set SASL server state to {}", saslState);
        this.pendingSaslState = null;
        this.pendingException = null;
        if (exception != null) {
            throw exception;
        }
    }

    /**
     * Tries to finish writing the outstanding response and updates write interest on the transport
     * layer to match.
     *
     * <p>When the response is fully written, write interest is removed and any deferred state
     * change is applied (which may throw the deferred exception). Otherwise write interest is
     * added so the write can be resumed later.
     *
     * @return {@code true} if the outstanding response has been written completely
     * @throws IOException if writing to the transport layer fails
     */
    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
        boolean flushedCompletely = this.flushNetOutBuffer();
        if (flushedCompletely) {
            // Named constant instead of the bare literal 4; fully qualified so the file's import
            // list does not have to change.
            this.transportLayer.removeInterestOps(java.nio.channels.SelectionKey.OP_WRITE);
            if (this.pendingSaslState != null) {
                this.setSaslState(this.pendingSaslState, this.pendingException);
            }
        } else {
            this.transportLayer.addInterestOps(java.nio.channels.SelectionKey.OP_WRITE);
        }
        return flushedCompletely;
    }

    /**
     * Writes the outstanding response to the transport layer unless it is already completed.
     *
     * @return {@code true} if the response is completed after this attempt
     * @throws IOException if the write fails
     */
    private boolean flushNetOutBuffer() throws IOException {
        Send outgoing = this.netOutBuffer;
        if (outgoing.completed()) {
            return outgoing.completed();
        }
        outgoing.writeTo(this.transportLayer);
        return outgoing.completed();
    }

    /**
     * Returns the local address of the socket behind the transport layer's socket channel.
     *
     * @return the result of {@code getLocalAddress()} on that socket
     */
    private InetAddress serverAddress() {
        return this.transportLayer.socketChannel().socket().getLocalAddress();
    }

    /**
     * Returns the remote address of the socket behind the transport layer's socket channel.
     *
     * @return the result of {@code getInetAddress()} on that socket
     */
    private InetAddress clientAddress() {
        return this.transportLayer.socketChannel().socket().getInetAddress();
    }

    /**
     * Feeds one client packet to the SASL server and sends back the server's answer.
     *
     * <p>Without SaslAuthenticate headers the packet is the raw token; a non-null server response
     * is sent back raw. With headers enabled the packet is parsed as a Kafka request, which must
     * be a SaslAuthenticate request of a supported version; its token is evaluated and a
     * SaslAuthenticate response is always sent, using an empty buffer when the server returns no
     * token.
     *
     * @param clientToken bytes of the packet received from the client
     * @throws IllegalSaslStateException   if the request is not a SaslAuthenticate request (an
     *                                     error response is sent first)
     * @throws UnsupportedVersionException if the request version is not supported (no response is sent)
     * @throws SaslAuthenticationException if token evaluation fails for a non-retriable reason (an
     *                                     error response is sent first)
     * @throws IOException                 for I/O failures, and for {@link SaslException}s that map
     *                                     to a retriable Kerberos error
     */
    private void handleSaslToken(byte[] clientToken) throws IOException {
        if (!this.enableKafkaSaslAuthenticateHeaders) {
            // Raw-token mode: no Kafka framing around SASL tokens.
            byte[] serverToken = this.saslServer.evaluateResponse(clientToken);
            if (serverToken != null) {
                this.netOutBuffer = new NetworkSend(this.connectionId, ByteBuffer.wrap(serverToken));
                this.flushNetOutBufferAndUpdateInterestOps();
            }
            return;
        }

        ByteBuffer buffer = ByteBuffer.wrap(clientToken);
        RequestHeader header = RequestHeader.parse(buffer);
        ApiKeys apiKey = header.apiKey();
        short version = header.apiVersion();
        RequestContext context = new RequestContext(header, this.connectionId, this.clientAddress(), KafkaPrincipal.ANONYMOUS, this.listenerName, this.securityProtocol);
        RequestAndSize parsed = context.parseRequest(buffer);

        if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
            IllegalSaslStateException unexpected = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication.");
            this.sendKafkaResponse(context, parsed.request.getErrorResponse(unexpected));
            throw unexpected;
        }
        if (!apiKey.isVersionSupported(version)) {
            throw new UnsupportedVersionException("Version " + version + " is not supported for apiKey " + apiKey);
        }

        SaslAuthenticateRequest authenticateRequest = (SaslAuthenticateRequest) parsed.request;
        try {
            byte[] serverToken = this.saslServer.evaluateResponse(Utils.readBytes(authenticateRequest.saslAuthBytes()));
            ByteBuffer tokenBuffer;
            if (serverToken == null) {
                tokenBuffer = EMPTY_BUFFER;
            } else {
                tokenBuffer = ByteBuffer.wrap(serverToken);
            }
            this.sendKafkaResponse(context, new SaslAuthenticateResponse(Errors.NONE, null, tokenBuffer));
        } catch (SaslAuthenticationException authFailure) {
            this.sendKafkaResponse(context, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, authFailure.getMessage()));
            throw authFailure;
        } catch (SaslException saslFailure) {
            KerberosError kerberosError = KerberosError.fromException(saslFailure);
            boolean retriable = kerberosError != null && kerberosError.retriable();
            if (retriable) {
                // Propagated as-is, without sending an authentication-failed response.
                throw saslFailure;
            } else {
                String errorMessage = "Authentication failed due to invalid credentials with SASL mechanism " + this.saslMechanism;
                this.sendKafkaResponse(context, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, errorMessage));
                throw new SaslAuthenticationException(errorMessage, saslFailure);
            }
        }
    }

    /**
     * Handles a client packet that is expected to be an ApiVersions or SaslHandshake request.
     *
     * <p>Once a valid request header has been parsed while in {@code INITIAL_REQUEST}, the state
     * moves to {@code HANDSHAKE_OR_VERSIONS_REQUEST}. A handshake request selects the client's
     * mechanism. When a mechanism has been selected, the SASL server is created and the state
     * moves to {@code AUTHENTICATE}.
     *
     * <p>If the packet cannot be parsed as a Kafka request while still in {@code INITIAL_REQUEST},
     * it is taken to be a raw GSSAPI token: GSSAPI is selected when it is enabled, and the method
     * returns {@code false} so the caller can evaluate the same bytes as a SASL token. The
     * fallback fails with {@link UnsupportedSaslMechanismException} only when GSSAPI is not
     * enabled.
     *
     * @param requestBytes bytes of the packet received from the client
     * @return {@code true} if the packet carried a valid Kafka request header, {@code false} if it
     *         was accepted as a raw GSSAPI token
     * @throws IllegalSaslStateException         if the request is neither ApiVersions nor SaslHandshake
     * @throws UnsupportedSaslMechanismException if the requested mechanism is not enabled, or the
     *                                           first packet is not a Kafka request and GSSAPI is
     *                                           not enabled
     * @throws InvalidRequestException           if parsing fails after the initial state
     * @throws IOException                       if sending a response or creating the SASL server fails
     * @throws AuthenticationException           for the authentication failures listed above
     */
    private boolean handleKafkaRequest(byte[] requestBytes) throws IOException, AuthenticationException {
        boolean isKafkaRequest = false;
        String clientMechanism = null;
        try {
            ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
            RequestHeader header = RequestHeader.parse(requestBuffer);
            ApiKeys apiKey = header.apiKey();
            // A valid header was parsed, so this is not a raw token: leave the initial state.
            if (this.saslState == SaslState.INITIAL_REQUEST) {
                this.setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
            }
            isKafkaRequest = true;
            // Reject other API keys before parsing the request body.
            if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE) {
                throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
            }
            LOG.debug("Handling Kafka request {}", apiKey);
            RequestContext requestContext = new RequestContext(header, this.connectionId, this.clientAddress(), KafkaPrincipal.ANONYMOUS, this.listenerName, this.securityProtocol);
            RequestAndSize requestAndSize = requestContext.parseRequest(requestBuffer);
            if (apiKey == ApiKeys.API_VERSIONS) {
                this.handleApiVersionsRequest(requestContext, (ApiVersionsRequest) requestAndSize.request);
            } else {
                clientMechanism = this.handleHandshakeRequest(requestContext, (SaslHandshakeRequest) requestAndSize.request);
            }
        } catch (InvalidRequestException e) {
            // Only the very first packet may be something other than a Kafka request.
            if (this.saslState != SaslState.INITIAL_REQUEST) {
                throw e;
            }
            if (LOG.isDebugEnabled()) {
                // Log at most the first 10 bytes (20 hex digits) of the packet.
                StringBuilder tokenBuilder = new StringBuilder();
                for (byte b : requestBytes) {
                    tokenBuilder.append(String.format("%02x", b));
                    if (tokenBuilder.length() >= 20) {
                        break;
                    }
                }
                LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", requestBytes.length, tokenBuilder);
            }
            if (!this.enabledMechanisms.contains("GSSAPI")) {
                throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
            }
            // Fix: previously the exception above was thrown unconditionally, so this fallback
            // could never take effect even when GSSAPI was enabled.
            LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
            clientMechanism = "GSSAPI";
        }
        if (clientMechanism != null) {
            this.createSaslServer(clientMechanism);
            this.setSaslState(SaslState.AUTHENTICATE);
        }
        return isKafkaRequest;
    }

    /**
     * Answers a SaslHandshake request and returns the mechanism the client asked for.
     *
     * <p>A request of version 1 or higher switches on SaslAuthenticate headers for the rest of the
     * exchange. The response always lists the enabled mechanisms; its error code is
     * {@code NONE} when the requested mechanism is enabled and
     * {@code UNSUPPORTED_SASL_MECHANISM} otherwise.
     *
     * @param context          request context used to build the response
     * @param handshakeRequest the parsed handshake request
     * @return the mechanism requested by the client, which is guaranteed to be enabled
     * @throws UnsupportedSaslMechanismException if the requested mechanism is not enabled (the
     *                                           error response is sent first)
     * @throws IOException                       if sending the response fails
     */
    private String handleHandshakeRequest(RequestContext context, SaslHandshakeRequest handshakeRequest) throws IOException, UnsupportedSaslMechanismException {
        String requestedMechanism = handshakeRequest.mechanism();
        short requestVersion = context.header.apiVersion();
        if (requestVersion >= 1) {
            this.enableKafkaSaslAuthenticateHeaders(true);
        }

        if (!this.enabledMechanisms.contains(requestedMechanism)) {
            LOG.debug("SASL mechanism '{}' requested by client is not supported", requestedMechanism);
            this.sendKafkaResponse(context, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, this.enabledMechanisms));
            throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + requestedMechanism);
        }

        LOG.debug("Using SASL mechanism '{}' provided by client", requestedMechanism);
        this.sendKafkaResponse(context, new SaslHandshakeResponse(Errors.NONE, this.enabledMechanisms));
        return requestedMechanism;
    }

    /**
     * Supplies the response sent for a supported ApiVersions request. Being {@code protected}, it
     * can be overridden by subclasses.
     *
     * @return the default ApiVersions response
     */
    protected ApiVersionsResponse apiVersionsResponse() {
        return ApiVersionsResponse.defaultApiVersionsResponse();
    }

    /**
     * Sets whether SASL tokens are expected wrapped in SaslAuthenticate requests. Being
     * {@code protected}, it can be overridden by subclasses.
     *
     * @param flag {@code true} to expect SaslAuthenticate requests, {@code false} for raw tokens
     */
    protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
        this.enableKafkaSaslAuthenticateHeaders = flag;
    }

    /**
     * Answers an ApiVersions request received before the handshake.
     *
     * <p>A request with an unsupported version gets an {@code UNSUPPORTED_VERSION} error response
     * and leaves the state unchanged. Otherwise the ApiVersions response is sent and the state
     * moves to {@code HANDSHAKE_REQUEST}.
     *
     * @param context            request context used to build the response
     * @param apiVersionsRequest the parsed ApiVersions request
     * @throws IllegalStateException if the current state is not {@code HANDSHAKE_OR_VERSIONS_REQUEST}
     * @throws IOException           if sending the response fails
     */
    private void handleApiVersionsRequest(RequestContext context, ApiVersionsRequest apiVersionsRequest) throws IOException {
        if (this.saslState != SaslState.HANDSHAKE_OR_VERSIONS_REQUEST) {
            throw new IllegalStateException("Unexpected ApiVersions request received during SASL authentication state " + this.saslState);
        }

        if (apiVersionsRequest.hasUnsupportedRequestVersion()) {
            AbstractResponse errorResponse = apiVersionsRequest.getErrorResponse(0, Errors.UNSUPPORTED_VERSION.exception());
            this.sendKafkaResponse(context, errorResponse);
            return;
        }

        this.sendKafkaResponse(context, this.apiVersionsResponse());
        this.setSaslState(SaslState.HANDSHAKE_REQUEST);
    }

    /**
     * Builds the wire form of {@code response} for the given request context and starts sending it.
     *
     * @param context  context of the request being answered
     * @param response response to send
     * @throws IOException if writing to the transport layer fails
     */
    private void sendKafkaResponse(RequestContext context, AbstractResponse response) throws IOException {
        Send responseSend = context.buildResponse(response);
        sendKafkaResponse(responseSend);
    }

    /**
     * Makes {@code send} the outstanding response and writes as much of it as possible now.
     *
     * @param send response to write to the client
     * @throws IOException if writing to the transport layer fails
     */
    private void sendKafkaResponse(Send send) throws IOException {
        netOutBuffer = send;
        flushNetOutBufferAndUpdateInterestOps();
    }

    private static enum SaslState {
        INITIAL_REQUEST,
        HANDSHAKE_OR_VERSIONS_REQUEST,
        HANDSHAKE_REQUEST,
        AUTHENTICATE,
        COMPLETE,
        FAILED;

    }
}

