Bug #5066

When using MongoDB to store MUC messages, messages can't be added to history: HistoryProvider is null

Added by 来恩 周 about 2 years ago. Updated about 2 years ago.

Status: Closed
Priority: Normal
Assignee:
Target version:
Start date: 2017-03-14
Due date: 2017-03-15
% Done: 100%
Estimated time:
Database: MongoDB
Applicable version: 7.2.0
Source Code Disclaimer:

Description

2017-03-14 12:11:24.870 [in_7-muc] GroupchatMessageModule.addMessageToHistory() WARNING: Can't add message to history!
java.lang.NullPointerException
    at tigase.muc.history.HistoryProviderMDBean.addMessage(HistoryProviderMDBean.java:88)
    at tigase.muc.modules.GroupchatMessageModule.addMessageToHistory(GroupchatMessageModule.java:87)
    at tigase.muc.modules.GroupchatMessageModule.process(GroupchatMessageModule.java:286)
    at tigase.component.modules.StanzaProcessor.process(StanzaProcessor.java:56)
    at tigase.component.modules.StanzaProcessor.processPacket(StanzaProcessor.java:77)
    at tigase.component.AbstractKernelBasedComponent.processPacket(AbstractKernelBasedComponent.java:102)
    at tigase.muc.MUCComponent.processPacket(MUCComponent.java:125)
    at tigase.server.AbstractMessageReceiver$QueueListener.run(AbstractMessageReceiver.java:1513)

###################################

/*
 * MongoHistoryProvider.java
 *
 * Tigase Jabber/XMPP Server - MongoDB support
 * Copyright (C) 2004-2016 "Tigase, Inc." office@tigase.com
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License,
 * or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. Look for COPYING file in the top folder.
 * If not, see http://www.gnu.org/licenses/.
 *
 */

package tigase.mongodb.muc;

import com.mongodb.MongoNamespace;

import com.mongodb.client.FindIterable;

import com.mongodb.client.MongoCollection;

import com.mongodb.client.MongoDatabase;

import com.mongodb.client.model.Filters;

import org.bson.Document;

import org.bson.conversions.Bson;

import org.bson.types.Binary;

import tigase.component.PacketWriter;

import tigase.component.exceptions.ComponentException;

import tigase.db.Repository;

import tigase.db.TigaseDBException;

import tigase.kernel.beans.config.ConfigField;

import tigase.mongodb.MongoDataSource;

import tigase.mongodb.RepositoryVersionAware;

import tigase.muc.Affiliation;

import tigase.muc.Room;

import tigase.muc.RoomConfig;

import tigase.muc.history.AbstractHistoryProvider;

import tigase.server.Packet;

import tigase.util.TigaseStringprepException;

import tigase.xml.Element;

import tigase.xmpp.Authorization;

import tigase.xmpp.BareJID;

import tigase.xmpp.JID;

import tigase.xmpp.mam.MAMRepository;

import tigase.xmpp.mam.Query;

import tigase.xmpp.mam.QueryImpl;

import java.nio.charset.Charset;

import java.security.MessageDigest;

import java.security.NoSuchAlgorithmException;

import java.util.*;

import java.util.logging.Level;

import static com.mongodb.client.model.Accumulators.first;

import static com.mongodb.client.model.Aggregates.group;

import static tigase.mongodb.Helper.collectionExists;

/**
 *
 * @author andrzej
 */

@Repository.Meta( supportedUris = { "mongodb:.*" } )

public class MongoHistoryProvider

    extends AbstractHistoryProvider<MongoDataSource>

    implements RepositoryVersionAware, MAMRepository {

private static final int DEF_BATCH_SIZE = 100;

private static final String HASH_ALG = "SHA-256";

private static final String HISTORY_COLLECTION = "tig_muc_room_history";

private static final String HISTORY_COLLECTION_OLD = "muc_history";

private static final Charset UTF8 = Charset.forName("UTF-8");


private MongoDatabase db;

protected MongoCollection<Document> historyCollection;


@ConfigField(desc = "Batch size", alias = "batch-size")

private int batchSize = DEF_BATCH_SIZE;


protected byte[] generateId(BareJID user) throws TigaseDBException {

    return calculateHash(user.toString().toLowerCase());

}


protected byte[] calculateHash(String user) throws TigaseDBException {

    try {

        MessageDigest md = MessageDigest.getInstance(HASH_ALG);

        return md.digest(user.getBytes(UTF8));

    } catch (NoSuchAlgorithmException ex) {

        throw new TigaseDBException("Should not happen!!", ex);

    }

}


@Override

public void addJoinEvent(Room room, Date date, JID senderJID, String nickName) {

}


@Override

public void addLeaveEvent(Room room, Date date, JID senderJID, String nickName) {

}


@Override

public void addMessage(Room room, Element message, String body, JID senderJid, String senderNickname, Date time) {

    try {

        byte[] rid = generateId(room.getRoomJID());

        Document dto = new Document("room_jid_id", rid).append("room_jid", room.getRoomJID().toString())

                .append("event_type", 1)

                .append("sender_jid", senderJid.toString()).append("sender_nickname", senderNickname)

                .append("body", body).append("public_event", room.getConfig().isLoggingEnabled());

        if (time != null) {

            dto.append("timestamp", time);

        }

        if (message != null) {

            dto.append("msg", message.toString());

        }

        historyCollection.insertOne(dto);

    } catch (Exception ex) {

        log.log(Level.WARNING, "Can't add MUC message to database", ex);

        throw new RuntimeException(ex);

    }

}


@Override

public void addSubjectChange(Room room, Element message, String subject, JID senderJid, String senderNickname, Date time) {

}


@Override

public void destroy() {

}


@Override

public void getHistoryMessages(Room room, JID senderJID, Integer maxchars, Integer maxstanzas, Integer seconds, Date since, PacketWriter writer) {

    Affiliation recipientAffiliation = room.getAffiliation(senderJID.getBareJID());

    boolean addRealJids = room.getConfig().getRoomAnonymity() == RoomConfig.Anonymity.nonanonymous

            || room.getConfig().getRoomAnonymity() == RoomConfig.Anonymity.semianonymous

            && (recipientAffiliation == Affiliation.owner || recipientAffiliation == Affiliation.admin);


    try {

        byte[] rid = generateId(room.getRoomJID());

        int maxMessages = room.getConfig().getMaxHistory();

        int limit = maxstanzas != null ? Math.min(maxMessages, maxstanzas) : maxMessages;

        if (since == null && seconds != null && maxstanzas == null) {

            // 1000L forces long arithmetic so a large 'seconds' value cannot overflow int
            since = new Date(new Date().getTime() - seconds * 1000L);

        }


        Document crit = new Document("room_jid_id", rid);

        if (since != null) {

            crit.append("timestamp", new Document("$gte", since));

            Document order = new Document("timestamp", 1);

            FindIterable<Document> cursor = historyCollection.find(crit).batchSize(batchSize).limit(limit).sort(order);

            for (Document dto : cursor) {

                Packet packet = createMessage(room.getRoomJID(), senderJID, dto, addRealJids);

                writer.write(packet);

            }

        } else {

            Document order = new Document("timestamp", -1);

            FindIterable<Document> cursor = historyCollection.find(crit).batchSize(batchSize).limit(limit).sort(order);

            List<Packet> results = new ArrayList<Packet>();

            for (Document dto : cursor) {

                Packet packet = createMessage(room.getRoomJID(), senderJID, dto, addRealJids);

                results.add(packet);

            }

            Collections.reverse(results);

            writer.write(results);

        }

    } catch (Exception ex) {

        if (log.isLoggable(Level.SEVERE))

            log.log(Level.SEVERE, "Can't get history", ex);

        throw new RuntimeException(ex);

    }

}


@Override

public boolean isPersistent(Room room) {

    return true;

}


@Override

public void removeHistory(Room room) {

    try {

        byte[] rid = generateId(room.getRoomJID());

        Document crit = new Document("room_jid_id", rid);

        db.getCollection(HISTORY_COLLECTION).deleteMany(crit);

    } catch (Exception ex) {

        if (log.isLoggable(Level.SEVERE))

            log.log(Level.SEVERE, "Can't remove history", ex);

        throw new RuntimeException(ex);

    }       

}


@Override

public void setDataSource(MongoDataSource dataSource) {

    db = dataSource.getDatabase();


    if (!collectionExists(db, HISTORY_COLLECTION)) {

        if (collectionExists(db, HISTORY_COLLECTION_OLD)) {

            db.getCollection(HISTORY_COLLECTION_OLD).renameCollection(new MongoNamespace(db.getName(), HISTORY_COLLECTION));

        } else {

            db.createCollection(HISTORY_COLLECTION);

        }

    }

    historyCollection = db.getCollection(HISTORY_COLLECTION);


    historyCollection.createIndex(new Document("room_jid_id", 1));

    historyCollection.createIndex(new Document("room_jid_id", 1).append("timestamp", 1));

}


@Override

public void updateSchema() throws TigaseDBException {

    List<Bson> aggregationQuery = Arrays.asList(group("$room_jid_id", first("room_jid", "$room_jid")));

    for (Document doc : historyCollection.aggregate(aggregationQuery).batchSize(100)) {

        String roomJid = (String) doc.get("room_jid");


        byte[] oldRoomJidId = ((Binary) doc.get("_id")).getData();

        byte[] newRoomJidId = calculateHash(roomJid.toString().toLowerCase());


        if (Arrays.equals(oldRoomJidId, newRoomJidId)) {

            continue;

        }


        historyCollection.updateMany(new Document("room_jid_id", oldRoomJidId),

                                     new Document("$set", new Document("room_jid_id", newRoomJidId)));

    }

}


private Long getItemPosition(String msgId, Bson filter) throws ComponentException {

    if (msgId == null) {

        return null;

    }

    try {

        Date ts = new Date(Long.parseLong(msgId));


        return historyCollection.count(Filters.and(filter, Filters.lt("timestamp", ts)));

    } catch (NumberFormatException ex) {

        throw new ComponentException(Authorization.ITEM_NOT_FOUND, "Not found message with id = " + msgId);

    }

}


@Override

public void queryItems(Query query, ItemHandler itemHandler) throws TigaseDBException, ComponentException {

    try {

        byte[] rid = generateId(query.getComponentJID().getBareJID());


        List<Bson> filters = new ArrayList<>();

        filters.add(Filters.eq("room_jid_id", rid));


        if (query.getStart() != null) {

            filters.add(Filters.gte("timestamp", query.getStart()));

        }

        if (query.getEnd() != null) {

            filters.add(Filters.lte("timestamp", query.getEnd()));

        }

        if (query.getWith() != null) {

            filters.add(Filters.eq("sender_nickname", query.getWith().toString()));

        }


        Bson filter = Filters.and(filters);

        long count = historyCollection.count(filter);


        Long after = getItemPosition(query.getRsm().getAfter(), filter);

        Long before = getItemPosition(query.getRsm().getBefore(), filter);


        AbstractHistoryProvider.calculateOffsetAndPosition(query, (int) count, before == null ? null : before.intValue(), after == null ? null : after.intValue());


        Document order = new Document("timestamp", 1);

        FindIterable<Document> cursor = historyCollection.find(filter).sort(order).skip(query.getRsm().getIndex()).limit(query.getRsm().getMax());

        for (Document dto : cursor) {

            String sender_nickname = (String) dto.get("sender_nickname");

            String msg = (String) dto.get("msg");

            String body = (String) dto.get("body");

            Date timestamp = (Date) dto.get("timestamp");


            Element msgEl = createMessageElement(query.getComponentJID().getBareJID(), query.getQuestionerJID(), sender_nickname, msg, body);

            Item item = new Item() {

                @Override

                public String getId() {

                    return String.valueOf(timestamp.getTime());

                }


                @Override

                public Element getMessage() {

                    return msgEl;

                }


                @Override

                public Date getTimestamp() {

                    return timestamp;

                }

            };

            itemHandler.itemFound(query, item);

        }

    } catch (Exception ex) {

        if (log.isLoggable(Level.SEVERE))

            log.log(Level.SEVERE, "Can't get history", ex);

        throw new RuntimeException(ex);

    }


}


@Override

public Query newQuery() {

    return new QueryImpl();

}



private Packet createMessage(BareJID roomJid, JID senderJID, Document dto, boolean addRealJids) throws TigaseStringprepException {

    String sender_nickname = (String) dto.get("sender_nickname");

    String msg = (String) dto.get("msg");

    String body = (String) dto.get("body");

    String sender_jid = (String) dto.get("sender_jid");

    Date timestamp = (Date) dto.get("timestamp");


    return createMessage(roomJid, senderJID, sender_nickname, msg, body, sender_jid, addRealJids, timestamp);

}

}
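
For readers following the room_jid_id field used in the queries above: generateId() lower-cases the room's bare JID and hashes it with SHA-256, so the same room maps to the same id regardless of the case in which its JID arrives. The following is a minimal standalone sketch of that step only. The class name RoomIdSketch and the example JIDs are made up for illustration and are not part of the Tigase code base.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

// Hypothetical helper class, written only to illustrate generateId()/calculateHash().
public class RoomIdSketch {

    // Lower-case the bare JID, then hash its UTF-8 bytes with SHA-256,
    // mirroring what the provider above does before querying room_jid_id.
    static byte[] roomId(String bareJid) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return md.digest(bareJid.toLowerCase().getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException ex) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        // Example JIDs are placeholders, not taken from the issue.
        byte[] mixedCase = roomId("Room@muc.example.com");
        byte[] lowerCase = roomId("room@muc.example.com");
        System.out.println("digest length: " + mixedCase.length);
        System.out.println("same id: " + Arrays.equals(mixedCase, lowerCase));
    }
}
```

This also suggests why updateSchema() recomputes the hash from the stored room_jid: rows whose id was derived from a JID that had not been lower-cased would otherwise never match a lookup.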

Associated revisions

Revision 99d8b0a1 (diff)
Added by Andrzej Wójcik IoT 1 CloudTigaseTeam about 2 years ago

#5066: fixed issue with user creation and initialization of repositories when MongoDB was used

Revision 8b83a61e (diff)
Added by Andrzej Wójcik IoT 1 CloudTigaseTeam about 2 years ago

#5066: fixed issue with automatic selection of proper history provider implementation for MongoDB

Revision 6bfd3a44 (diff)
Added by Andrzej Wójcik IoT 1 CloudTigaseTeam about 2 years ago

#5066: fixed issue with initialization of Tigase XMPP Server 7.2.0-SNAPSHOT when MongoDB was used

History

#1 Updated by 来恩 周 about 2 years ago

I have already worked around the problem by configuring the init.properties file like this:

muc (class: tigase.muc.MUCComponent) {
    admin (class: tigase.muc.modules.ModeratorModule3) {}
    disco (class: tigase.muc.modules.DiscoveryModule3) {}
    groupchat (class: tigase.muc.modules.GroupchatMessageModule3) {}
    presences (class: tigase.muc.modules.PresenceModuleImpl3) {}
    historyProviderPool (class: tigase.muc.history.HistoryProviderMDBean) {
        default (class: tigase.muc.history.HistoryProviderMDBean.HistoryProviderConfigBean) {
            name = 'mucHistoryProvider'
            'repo-class' = 'tigase.mongodb.muc.MongoHistoryProvider'
            instance (class: tigase.mongodb.muc.MongoHistoryProvider) {}
        }
    }
}

#2 Updated by Andrzej Wójcik IoT 1 CloudTigaseTeam about 2 years ago

  • Status changed from New to In QA
  • Assignee changed from Andrzej Wójcik to 来恩 周
  • % Done changed from 0 to 100
  • Estimated time deleted (2.00 h)

I've found and fixed the root cause of this issue. I also fixed a few other issues found while testing Tigase XMPP Server 7.2.0-SNAPSHOT with MongoDB.

This issue was caused by a failure of the automatic discovery of the history provider implementation for MongoDB. Manual configuration (as you described in the comment above) works around it.

The next snapshot build will contain my fix for this issue.
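
To picture what "automatic discovery" means here, the sketch below matches a data source URI against per-provider URI patterns, in the spirit of the @Repository.Meta(supportedUris = { "mongodb:.*" }) annotation on MongoHistoryProvider. It is an illustration under assumptions, not Tigase's actual lookup code: the registry map, the findProvider method and the JDBC entry are hypothetical, and only the "mongodb:.*" pattern and the MongoHistoryProvider class name come from this issue.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical illustration of URI-pattern based provider selection.
public class ProviderDiscoverySketch {

    // Hypothetical registry: supported-URI regex -> provider class name.
    static final Map<String, String> PROVIDERS = new LinkedHashMap<>();
    static {
        PROVIDERS.put("jdbc:.*", "example.JdbcHistoryProvider"); // made-up entry
        PROVIDERS.put("mongodb:.*", "tigase.mongodb.muc.MongoHistoryProvider");
    }

    // Returns the first provider whose pattern matches the whole URI,
    // or an empty Optional when nothing matches.
    static Optional<String> findProvider(String dataSourceUri) {
        for (Map.Entry<String, String> entry : PROVIDERS.entrySet()) {
            if (Pattern.matches(entry.getKey(), dataSourceUri)) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Example URIs are placeholders.
        System.out.println(findProvider("mongodb://localhost/tigase"));
        System.out.println(findProvider("unknown://host/db"));
    }
}
```

If a lookup of this kind comes back empty and the caller stores null without checking, the first addMessage() call would fail with a NullPointerException like the one in the description. Naming the class explicitly through 'repo-class' sidesteps the lookup altogether.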

#3 Updated by 来恩 周 about 2 years ago

Thank you.

#4 Updated by Wojciech Kapcia TigaseTeam about 2 years ago

  • Status changed from In QA to Closed

来恩 周 wrote:

Thank you.

Assuming resolved.
