jaxmpp 3.2.0 sends 12000 messages, but Tigase saves only some of them into tig_offline_messages (MongoDB)

w xy
Added about 2 months ago

When no user was online on Tigase, I sent 12000 messages via jaxmpp, but Tigase saved only some of them into tig_offline_messages: sometimes 6000, sometimes 5000, and so on.

Tigase config.tdsl:

'cm-traffic-throttling' = 'xmpp:0:0:disc,bin:0:0:disc'

`@Slf4j
@Component
public class XMPPServer implements ApplicationContextAware {

@Autowired
private XMPPConfig xmppConfig;
@Autowired
private Datastore dsForTigase;

private final Jaxmpp jaxmpp = new Jaxmpp();
private InBandRegistrationModule register;
private XmppModulesManager modulesManager = jaxmpp.getModulesManager();
private BlockingQueue<Message> messageQueue = new PriorityBlockingQueue<>(200000,
        Comparator.comparingInt(o -> o.getPriority().getIndex()));

private void init() throws JaxmppException {
    this.setConfiguration();
    this.register("10005");
    this.jaxmpp.login(false);
    this.jaxmpp.getConnector().keepalive();
}

private void setConfiguration() {
    this.setConfiguration(jaxmpp, xmppConfig.getUsername());
    jaxmpp.getSessionObject().setProperty(InBandRegistrationModule.IN_BAND_REGISTRATION_MODE_KEY, Boolean.TRUE);
    register = modulesManager.register(new InBandRegistrationModule());
}

private void setConfiguration(Jaxmpp jaxmpp, String username) {
    ConnectionConfiguration configuration = jaxmpp.getConnectionConfiguration();
    configuration.setDomain(xmppConfig.getServerName());
    configuration.setPort(xmppConfig.getPort());
    configuration.setServer(xmppConfig.getHost());
    configuration.setUserJID(this.userJid(username));
    configuration.setUserPassword(Md5Util.md5Hex(username));
    configuration.setResource(xmppConfig.getResource());
    configuration.setDisableTLS(true);
    configuration.setConnectionType(ConnectionConfiguration.ConnectionType.socket);
}

private String userJid(String userId) {
    return String.format("%s@%s", userId, xmppConfig.getServerName());
}


public boolean register(String username) {
    ThreadUtil.executeInThread(obj -> {
        DBCollection collection = dsForTigase.getDB().getCollection("tig_users");
        BasicDBObject query = new BasicDBObject("user_id", this.userJid(username));
        if (null != collection.findOne(query)) {

            return;
        }
        try {
            register.register(username, Md5Util.md5Hex(username), null, new AsyncCallback() {
                @Override
                public void onError(Stanza responseStanza, XMPPException.ErrorCondition error) throws JaxmppException {
                    log.error("create fail" + responseStanza.getErrorMessage());
                }

                @Override
                public void onSuccess(Stanza responseStanza) {
                    log.info(responseStanza.toString());
                }

                @Override
                public void onTimeout() {
                    log.error("create timeout");
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    });
    return true;
}

@Override
public void setApplicationContext(@Nullable ApplicationContext applicationContext) throws BeansException {
    try {
        this.init();

        ExecutorService service = Executors.newCachedThreadPool();
        for (int i = 10006; i < (10006 + 10); i++) {
            String username = String.valueOf(i);
            this.register(username);
            Jaxmpp jaxmpp = new Jaxmpp();
            this.setConfiguration(jaxmpp, username);
            service.submit(new MsgConsumer(messageQueue, jaxmpp));
        }


    } catch (JaxmppException e) {
        e.printStackTrace();
    }
}

}`


`
@Slf4j
public class MsgConsumer implements Runnable {
private final BlockingQueue<Message> messageQueue;
private final Jaxmpp jaxmpp;
private MessageModule messageModule;

public MsgConsumer(BlockingQueue<Message> queue, Jaxmpp jaxmpp) throws JaxmppException {
    this.messageQueue = queue;
    this.jaxmpp = jaxmpp;
    this.messageModule = jaxmpp.getModulesManager().register(new MessageModule());
    this.jaxmpp.login(false);
    this.jaxmpp.getConnector().keepalive();
}

@Override
public void run() {
    while (true) {
        if (!messageQueue.isEmpty() && this.jaxmpp.isConnected()) {
            Message message = messageQueue.poll();
            if (message == null || message.getMessage() == null) {
                return;
            }
            try {
                messageModule.sendMessage(message.getMessage());

            } catch (Exception e) {
                e.printStackTrace();
                messageQueue.offer(message);
            }
        }
    }
}

}
`
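One detail of the `run()` loop above is worth a closer look: `isEmpty()` followed by `poll()` is not atomic, so with several consumers sharing the queue `poll()` can return `null`, and the consumer then returns for good. The loop also spins at full speed while the queue is empty. Below is a minimal sketch of a blocking alternative. It assumes a plain `LinkedBlockingQueue<String>` instead of the priority queue, and the names `Sender` and `STOP` are hypothetical stand-ins for `messageModule.sendMessage(...)` and a shutdown signal. Nothing in this thread shows that this race causes the missing offline messages; the sketch only addresses the loop itself.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingConsumerSketch {
    // Hypothetical sentinel that tells one consumer to stop; not part of jaxmpp.
    static final String STOP = "__STOP__";

    // Hypothetical stand-in for messageModule.sendMessage(...).
    interface Sender {
        void send(String body) throws Exception;
    }

    static final class Consumer implements Runnable {
        private final BlockingQueue<String> queue;
        private final Sender sender;

        Consumer(BlockingQueue<String> queue, Sender sender) {
            this.queue = queue;
            this.sender = sender;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    // take() blocks until an element is available and never returns null.
                    String body = queue.take();
                    if (STOP.equals(body)) {
                        return;
                    }
                    try {
                        sender.send(body);
                    } catch (Exception e) {
                        // Re-queue on failure, as the original code does.
                        queue.put(body);
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and let the thread end.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Starts `consumers` workers, feeds them `messages` items, and returns how many were sent.
    static int runDemo(int messages, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger sent = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        try {
            for (int i = 0; i < consumers; i++) {
                pool.submit(new Consumer(queue, body -> sent.incrementAndGet()));
            }
            for (int i = 0; i < messages; i++) {
                queue.put("msg-" + i);
            }
            // One sentinel per consumer, queued after all real messages.
            for (int i = 0; i < consumers; i++) {
                queue.put(STOP);
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running demo", e);
        }
        return sent.get();
    }

    public static void main(String[] args) {
        System.out.println("sent=" + runDemo(12000, 10));
    }
}
```

Because the queue is FIFO and the sentinels are queued last, every message is taken before any consumer stops. With a `PriorityBlockingQueue`, as in the post, the sentinel would need the lowest priority for the same guarantee.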


Replies (4)

Added by Artur Hefczyc TigaseTeam about 2 months ago

I do not know your exact use case and do not understand your requirements, so it is difficult to give you a good suggestion. But maybe your approach is not optimal. Have you considered using PubSub? Instead of sending a single message 12k times from a client, you could send one message to PubSub and let it take care of distributing it to 12k users.

As for the problem you describe, you may just have a DB congestion problem. Maybe your DB configuration cannot handle so many requests?

Added by w xy about 2 months ago

Thanks, I will try. But it works via Smack: all messages were saved into tig_offline_messages, and thousands of offline messages were inserted per second. Via jaxmpp, however, only dozens of offline messages were inserted per second.

With Smack there is an error, though: no response received within reply timeout. The timeout was 15000 ms.

Added by w xy about 2 months ago

    if (!messageQueue.isEmpty() && this.jaxmpp.isConnected()) {
        try {
            Message message = messageQueue.take();
            if (message.getMessage() == null) {
                return;
            }
            try {
                messageModule.sendMessage(message.getMessage());
                log.debug("Sending push message: " + message.toString());
                Thread.sleep(100); // added: pause after each send
            } catch (JaxmppException e) {
                e.printStackTrace();
                messageQueue.put(message);
            }
        } catch (InterruptedException e) {
            // restore the interrupt flag instead of swallowing it
            Thread.currentThread().interrupt();
        }
    }

It works after adding the sleep, but it is a little slow.
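A fixed `Thread.sleep(100)` caps each consumer at 10 messages per second, which is far below the thousands per second reported for Smack earlier in the thread. One way to keep some throttling while making the rate adjustable is to pace sends against a target rate. The sketch below assumes a hypothetical `Sender` interface in place of `messageModule.sendMessage(...)` and a hypothetical `perSecond` setting. Which rate, if any, avoids the message loss is not established in this thread, so the value would have to be found by experiment.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;

public class PacedSenderSketch {
    // Hypothetical stand-in for messageModule.sendMessage(...).
    interface Sender {
        void send(String body) throws Exception;
    }

    // Drains the queue until it is empty, sending at most `perSecond` messages
    // per second. Returns the number of messages sent successfully.
    // Note: a sender that always fails would make this loop forever, because
    // failed messages are re-queued, as in the original code.
    static int drainPaced(BlockingQueue<String> queue, Sender sender, int perSecond) {
        if (perSecond <= 0) {
            throw new IllegalArgumentException("perSecond must be positive");
        }
        long intervalNanos = 1_000_000_000L / perSecond;
        long next = System.nanoTime(); // earliest time the next send may happen
        int sent = 0;
        String body;
        while ((body = queue.poll()) != null) {
            // parkNanos may return early, so loop until the deadline has passed.
            long wait;
            while ((wait = next - System.nanoTime()) > 0) {
                LockSupport.parkNanos(wait);
            }
            try {
                sender.send(body);
                sent++;
            } catch (Exception e) {
                queue.offer(body); // re-queue on failure
            }
            // Schedule the next slot; never schedule in the past, so a slow
            // send does not lead to a burst afterwards.
            next = Math.max(next, System.nanoTime()) + intervalNanos;
        }
        return sent;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 200; i++) {
            queue.add("msg-" + i);
        }
        long start = System.nanoTime();
        int sent = drainPaced(queue, body -> { }, 1000);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("sent=" + sent + " elapsedMs=" + elapsedMs);
    }
}
```

This version stops when the queue is empty, which suits a one-off batch. A long-running consumer would block on `take()` instead of returning.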
