package org.eclipse.moquette.spi.impl;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.mchange.v2.c3p0.subst.C3P0Substitutions;
import java.io.File;
import java.text.ParseException;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
import org.eclipse.moquette.commons.Constants;
import org.eclipse.moquette.proto.messages.AbstractMessage;
import org.eclipse.moquette.server.ServerChannel;
import org.eclipse.moquette.spi.IMessagesStore;
import org.eclipse.moquette.spi.IMessaging;
import org.eclipse.moquette.spi.ISessionsStore;
import org.eclipse.moquette.spi.impl.events.LostConnectionEvent;
import org.eclipse.moquette.spi.impl.events.MessagingEvent;
import org.eclipse.moquette.spi.impl.events.ProtocolEvent;
import org.eclipse.moquette.spi.impl.events.StopEvent;
import org.eclipse.moquette.spi.impl.security.ACLFileParser;
import org.eclipse.moquette.spi.impl.security.AcceptAllAuthenticator;
import org.eclipse.moquette.spi.impl.security.DenyAllAuthorizator;
import org.eclipse.moquette.spi.impl.security.FileAuthenticator;
import org.eclipse.moquette.spi.impl.security.IAuthenticator;
import org.eclipse.moquette.spi.impl.security.IAuthorizator;
import org.eclipse.moquette.spi.impl.security.PermitAllAuthorizator;
import org.eclipse.moquette.spi.impl.subscriptions.SubscriptionsStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/moquette/spi/impl/SimpleMessaging.class */
public class SimpleMessaging implements IMessaging, EventHandler<ValueEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleMessaging.class);
    private SubscriptionsStore subscriptions;
    private RingBuffer<ValueEvent> m_ringBuffer;
    private IMessagesStore m_storageService;
    private ISessionsStore m_sessionsStore;
    private ExecutorService m_executor;
    private Disruptor<ValueEvent> m_disruptor;
    private static SimpleMessaging INSTANCE;
    CountDownLatch m_stopLatch;
    private final ProtocolProcessor m_processor = new ProtocolProcessor();
    private final AnnotationSupport annotationSupport = new AnnotationSupport();
    private boolean benchmarkEnabled = false;
    Histogram histogram = new Histogram(5);

    private SimpleMessaging() {
    }

    public static SimpleMessaging getInstance() {
        if (INSTANCE == null) {
            INSTANCE = new SimpleMessaging();
        }
        return INSTANCE;
    }

    public void init(Properties properties) {
        this.subscriptions = new SubscriptionsStore();
        this.m_executor = Executors.newFixedThreadPool(1);
        this.m_disruptor = new Disruptor<>(ValueEvent.EVENT_FACTORY, 32768, this.m_executor);
        this.m_disruptor.handleEventsWith(this);
        this.m_disruptor.start();
        this.m_ringBuffer = this.m_disruptor.getRingBuffer();
        this.annotationSupport.processAnnotations(this.m_processor);
        processInit(properties);
    }

    private void disruptorPublish(MessagingEvent messagingEvent) {
        LOG.debug("disruptorPublish publishing event {}", messagingEvent);
        long next = this.m_ringBuffer.next();
        this.m_ringBuffer.get(next).setEvent(messagingEvent);
        this.m_ringBuffer.publish(next);
    }

    @Override // org.eclipse.moquette.spi.IMessaging
    public void lostConnection(String str) {
        disruptorPublish(new LostConnectionEvent(str));
    }

    @Override // org.eclipse.moquette.spi.IMessaging
    public void handleProtocolMessage(ServerChannel serverChannel, AbstractMessage abstractMessage) {
        disruptorPublish(new ProtocolEvent(serverChannel, abstractMessage));
    }

    @Override // org.eclipse.moquette.spi.IMessaging
    public void stop() {
        this.m_stopLatch = new CountDownLatch(1);
        disruptorPublish(new StopEvent());
        try {
            LOG.debug("waiting 10 sec to m_stopLatch");
            boolean z = !this.m_stopLatch.await(10L, TimeUnit.SECONDS);
            LOG.debug("after m_stopLatch");
            this.m_executor.shutdown();
            this.m_disruptor.shutdown();
            if (z) {
                LOG.error("Can't stop the server in 10 seconds");
            }
        } catch (InterruptedException e) {
            LOG.error((String) null, (Throwable) e);
        }
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(ValueEvent valueEvent, long j, boolean z) throws Exception {
        MessagingEvent event = valueEvent.getEvent();
        valueEvent.setEvent(null);
        LOG.info("onEvent processing messaging event from input ringbuffer {}", event);
        if (event instanceof StopEvent) {
            processStop();
            return;
        }
        if (event instanceof LostConnectionEvent) {
            this.m_processor.processConnectionLost((LostConnectionEvent) event);
            return;
        }
        if (event instanceof ProtocolEvent) {
            ServerChannel session = ((ProtocolEvent) event).getSession();
            AbstractMessage message = ((ProtocolEvent) event).getMessage();
            try {
                long nanoTime = System.nanoTime();
                this.annotationSupport.dispatch(session, message);
                if (this.benchmarkEnabled) {
                    this.histogram.recordValue(System.nanoTime() - nanoTime);
                }
            } catch (Throwable th) {
                LOG.error("Serious error processing the message {} for {}", message, session, th);
            }
        }
    }

    private void processInit(Properties properties) {
        IAuthorizator permitAllAuthorizator;
        this.benchmarkEnabled = Boolean.parseBoolean(System.getProperty("moquette.processor.benchmark", "false"));
        MemoryStorageService memoryStorageService = new MemoryStorageService();
        this.m_storageService = memoryStorageService;
        this.m_sessionsStore = memoryStorageService;
        this.m_storageService.initStore();
        this.subscriptions.init(this.m_sessionsStore);
        String property = properties.getProperty(Constants.PASSWORD_FILE_PROPERTY_NAME, "");
        String property2 = System.getProperty("moquette.path", null);
        final String property3 = properties.getProperty("b4x_user", "");
        final String property4 = properties.getProperty("b4x_password", "");
        IAuthenticator acceptAllAuthenticator = property4.length() > 0 ? new IAuthenticator() { // from class: org.eclipse.moquette.spi.impl.SimpleMessaging.1
            @Override // org.eclipse.moquette.spi.impl.security.IAuthenticator
            public boolean checkValid(String str, String str2) {
                return property3.equals(str) && property4.equals(str2);
            }
        } : property.isEmpty() ? new AcceptAllAuthenticator() : new FileAuthenticator(property2, property);
        String property5 = properties.getProperty(Constants.ACL_FILE_PROPERTY_NAME, "");
        if (property5 == null || property5.isEmpty()) {
            permitAllAuthorizator = new PermitAllAuthorizator();
            LOG.info("Starting without ACL definition");
        } else {
            permitAllAuthorizator = new DenyAllAuthorizator();
            File file = new File(property2, property5);
            try {
                permitAllAuthorizator = ACLFileParser.parse(file);
            } catch (ParseException e) {
                LOG.error(String.format("Format error in parsing acl file %s", file), (Throwable) e);
            }
            LOG.info("Using acl file defined at path {}", property5);
        }
        this.m_processor.init(this.subscriptions, this.m_storageService, this.m_sessionsStore, acceptAllAuthenticator, Boolean.parseBoolean(properties.getProperty(Constants.ALLOW_ANONYMOUS_PROPERTY_NAME, C3P0Substitutions.DEBUG)), permitAllAuthorizator);
    }

    private void processStop() {
        LOG.debug("processStop invoked");
        this.m_storageService.close();
        LOG.debug("subscription tree {}", this.subscriptions.dumpTree());
        this.subscriptions = null;
        this.m_stopLatch.countDown();
        if (this.benchmarkEnabled) {
            this.histogram.outputPercentileDistribution(System.out, Double.valueOf(1000.0d));
        }
    }
}
