Help in openfire plugin

Hi, I am new to openfire, I wanted to create a openfire plugin which is a bot. There will be a bot user which will be added as a participant in to muc, So whenever the plugin loads it will follow these steps

  1. create a smack connection
  2. login with bot username and password
  3. join the rooms in which it is particpant by fetching room list
  4. If any message is recieved it will push to a broker.
  5. If muc invite(from RestApiplugin) is recieved it’ll join the invited chatroom.

Considering the above use-case, by referring some of the codes from openfire forum and ofchat plugin I have written this bot plugin and it will be deployed in openfire server as a plugin. It is working as expected in single node.

So, I have the below queries/feedback using which I wanted to make the plugin better :

  1. How to make it cluster aware, is the code which I have written is already cluster aware as I have used ofchat plugin code?. How to avoid duplicate message recieving in plugin.
  2. Is using of smack as plugin is a good approach for writing bot plugin?.
  3. How to make Invite which is sent using RestApi plugin should be sent to plugin deployed in all node?

Below is the code:
XmppChatManager:

import java.io.IOException;
import java.security.cert.Certificate;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.dom4j.DocumentHelper;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jivesoftware.openfire.SessionManager;
import org.jivesoftware.openfire.SessionPacketRouter;
import org.jivesoftware.openfire.XMPPServer;
import org.jivesoftware.openfire.auth.AuthFactory;
import org.jivesoftware.openfire.auth.AuthToken;
import org.jivesoftware.openfire.auth.UnauthorizedException;
import org.jivesoftware.openfire.net.VirtualConnection;
import org.jivesoftware.openfire.session.LocalClientSession;
import org.jivesoftware.smack.AbstractXMPPConnection;
import org.jivesoftware.smack.ConnectionConfiguration;
import org.jivesoftware.smack.MessageListener;
import org.jivesoftware.smack.SmackException;
import org.jivesoftware.smack.StanzaListener;
import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smack.chat2.Chat;
import org.jivesoftware.smack.chat2.ChatManager;
import org.jivesoftware.smack.chat2.IncomingChatMessageListener;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.packet.ExtensionElement;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Nonza;
import org.jivesoftware.smack.packet.Presence;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StreamOpen;
import org.jivesoftware.smack.packet.TopLevelStreamElement;
import org.jivesoftware.smack.util.PacketParserUtils;
import org.jivesoftware.smackx.muc.HostedRoom;
import org.jivesoftware.smackx.muc.InvitationListener;
import org.jivesoftware.smackx.muc.MucEnterConfiguration;
import org.jivesoftware.smackx.muc.MultiUserChat;
import org.jivesoftware.smackx.muc.MultiUserChatManager;
import org.jivesoftware.smackx.muc.packet.MUCUser;
import org.jxmpp.jid.DomainBareJid;
import org.jxmpp.jid.EntityBareJid;
import org.jxmpp.jid.EntityFullJid;
import org.jxmpp.jid.EntityJid;
import org.jxmpp.jid.impl.JidCreate;
import org.jxmpp.jid.parts.Resourcepart;
import org.jxmpp.stringprep.XmppStringprepException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmpp.packet.JID;


@SuppressWarnings("rawtypes")
public class XmppChatManager extends AbstractXMPPConnection implements InvitationListener{

	private static Logger Log = LoggerFactory.getLogger( "OpenfireConnection" );

	private static final ConcurrentHashMap<String, XmppChatManager> connections = new ConcurrentHashMap<>();
	private static final ConcurrentHashMap<String, XmppChatManager> users = new ConcurrentHashMap<>();
	private StanzaListener stanzaListener;
	private static final String domain = XMPPServer.getInstance().getServerInfo().getXMPPDomain();
	private static final String hostname = XMPPServer.getInstance().getServerInfo().getHostname();
	private ChatManager chatManager;
	public boolean anonymous = false;
	public ConcurrentHashMap<String, Chat> chats;
	private boolean reconnect = false;
	private LocalClientSession session;
	private SmackConnection smackConnection;
	private ServletHolder sseHolder;
	// private ClientServlet clientServlet;
	private String ssePath;
	public OpenfireConfiguration config;
	public MultiUserChatManager mucManager;
	public ConcurrentHashMap<String, MultiUserChat> groupchats;


	public XmppChatManager(OpenfireConfiguration configuration) {
		super(configuration);
		config = configuration;
		user = getUserJid();
	}


	public static XmppChatManager createConnection(String username, String password, boolean anonymous)
	{
		try {

			if (!anonymous && username != null && password != null && !"".equals(username.trim()) && !"".equals(password.trim()))
			{
				AuthFactory.authenticate( username, password );
			}

		} catch ( Exception e ) {
			return null;
		}

		XmppChatManager connection = users.get(username);

		if (connection == null)
		{
			try {
				OpenfireConfiguration config = OpenfireConfiguration.builder()
						.setUsernameAndPassword(username, password)
						.setXmppDomain(domain)
						.setResource(username + (new Random(new Date().getTime()).nextInt()))
						.setHost(hostname)
						.setPort(0)
						.enableDefaultDebugger()
						.setSendPresence(true)
						.build();
				connection = new XmppChatManager(config);
				connection.anonymous = false;
				connection.connect();
				connection.login();
				Presence presence = new Presence(Presence.Type.available);
				connection.sendStanza(presence);

				
				final XmppChatManager conn=connection;
				connection.stanzaListener = new StanzaListener()
				{
					public void processStanza(Stanza packet) {
						conn.processMessageStanza(packet);
					}
				};

				//connection.addPacketInterceptor((a)-> System.out.println(a), null);
				
				connection.addAsyncStanzaListener(connection.stanzaListener, new PacketTypeFilter(Message.class));
				connections.put(connection.getStreamId(), connection);

				users.put(username, connection);

				ChatManager chatManager = ChatManager.getInstanceFor(connection);
				chatManager.addIncomingListener(new IncomingChatMessageListener() {
					@Override
					public void newIncomingMessage(EntityBareJid from, Message message, Chat chat) {
						Log.info("Message :::"+message.getBody());
					}
				});

				connection.chats = new ConcurrentHashMap<String, Chat>();
				connection.groupchats = new ConcurrentHashMap<String, MultiUserChat>();
				connection.mucManager = MultiUserChatManager.getInstanceFor(connection);
				connection.mucManager.addInvitationListener(connection);
				
				
				
				List<DomainBareJid> mucServiceDomains = connection.mucManager.getMucServiceDomains();

				for (DomainBareJid domainBareJid : mucServiceDomains) {
					List<HostedRoom> hostedRooms = connection.mucManager.getHostedRooms(domainBareJid);
					for (HostedRoom hostedRoom : hostedRooms) {
						Log.info("hostedRooms::: jid{}  name{}",hostedRoom.getJid(),hostedRoom.getName());
						try {
							connection.joinRoom(hostedRoom.getJid().toString(), connection.getUserJid().toString());
						} catch (Exception e) {
							Log.error("Error joinign muc::{}",e);
						}
					}
				}
			}
			catch(Exception e) {
				Log.error("Error occured::",e);
			}
		}
		return connection;
	}




	// -------------------------------------------------------
	//
	// Groupchat/ Chat rooms
	//
	// -------------------------------------------------------

	@SuppressWarnings("deprecation")
	public boolean joinRoom(String mGroupChatName, String mNickName) {
		Log.debug("joinRoom " + mGroupChatName + " " + mNickName);
		if(mGroupChatName!=null) {
			try {
				MultiUserChat mMultiUserChat = groupchats.get(mGroupChatName);

				if (mMultiUserChat == null)
				{
					mMultiUserChat = mucManager.getMultiUserChat(JidCreate.entityBareFrom(mGroupChatName));
					groupchats.put(mGroupChatName, mMultiUserChat);
				}
				MucEnterConfiguration.Builder mec = mMultiUserChat.getEnterConfigurationBuilder(Resourcepart.from(mNickName));
				mec.requestMaxStanzasHistory(1);
	            MucEnterConfiguration mucEnterConfig = mec.build();
				mMultiUserChat.join(mucEnterConfig);
				mMultiUserChat.addMessageListener(new MessageListener() {

					@Override
					public void processMessage(Message message) {
						Log.info("MUC message recieved is::{}",message.getBody());

					}
				});
				return true;

			} catch (Exception e) {
				Log.error("joinRoom", e);

			}
		}
		return false;
	}



	public boolean leaveRoom(String mGroupChatName) {
		Log.debug("leaveRoom " + mGroupChatName);

		try {
			MultiUserChat mMultiUserChat = groupchats.get(mGroupChatName);
			mMultiUserChat.leave();
			return true;

		} catch (Exception e) {
			Log.error("leaveRoom", e);
			return false;
		}
	}




	private void sendPacket(TopLevelStreamElement stanza)
	{
		sendPacket(stanza.toXML(StreamOpen.CLIENT_NAMESPACE).toString());
		firePacketSendingListeners((Stanza) stanza);
	}

	public void sendPacket(String data)
	{
		try {
			Log.debug("sendPacket " + data );
			smackConnection.getRouter().route(DocumentHelper.parseText(data).getRootElement());

		} catch ( Exception e ) {
			Log.error( "An error occurred while attempting to route the packet : ", e );
		}
	}

	@Override
	public void sendNonza(Nonza element) {
		TopLevelStreamElement stanza = (TopLevelStreamElement) element;
		sendPacket(stanza);
	}

	@Override
	protected void sendStanzaInternal(Stanza packet) {
		TopLevelStreamElement stanza = (TopLevelStreamElement) packet;
		sendPacket(stanza);
	}

	public void enableStreamFeature(ExtensionElement streamFeature) {
		addStreamFeature(streamFeature);
	}

	@Override
	public boolean isSecureConnection() {
		return false;
	}

	@Override
	public boolean isUsingCompression() {
		return false;
	}
	@Override
	protected void connectInternal() throws SmackException, IOException, XMPPException, InterruptedException {
		Log.debug("connectInternal " + config.getUsername());

		streamId = "botuser" + new Random(new Date().getTime()).nextInt();
		smackConnection = new SmackConnection(streamId, this);
		connected = true;
		saslFeatureReceived.reportSuccess();
		tlsHandled.reportSuccess();

	}


	private EntityFullJid getUserJid()
	{
		try {
			return JidCreate.entityFullFrom(config.getUsername() + "@" + config.getXMPPServiceDomain() + "/" + config.getResource());
		}
		catch (XmppStringprepException e) {
			throw new IllegalStateException(e);
		}
	}

	@Override
	protected void loginInternal(String username, String password, Resourcepart resource) throws XMPPException
	{
		Log.info("loginInternal  "+user);
		try {
			AuthToken authToken = null;

			if (username == null || password == null || "".equals(username) || "".equals(password))
			{
				String user = resource.toString();
				if (username != null && !"".equals(username)) user = username;
				authToken = new AuthToken(user, anonymous);

			} else {
				username = username.toLowerCase().trim();
				user = getUserJid();
				JID userJid = XMPPServer.getInstance().createJID(username, null);

				session = (LocalClientSession) SessionManager.getInstance().getSession(userJid);

				if (session != null)
				{
					session.close();
					SessionManager.getInstance().removeSession(session);
				}


				try {
					authToken = AuthFactory.authenticate( username, password );

				} catch ( UnauthorizedException e ) {
					authToken = new AuthToken(resource.toString(), true);
				}
			}

			session = SessionManager.getInstance().createClientSession( smackConnection, (Locale) null );
			smackConnection.setRouter( new SessionPacketRouter( session ) );
			session.setAuthToken(authToken, resource.toString());
			authenticated = true;

			afterSuccessfulLogin(false);

		} catch (Exception e) {
			Log.error("loginInternal", e);
		}
	}

	@Override
	protected void shutdown() {
		Log.debug("shutdown " + config.getUsername());

		user = null;
		authenticated = false;
		reconnect = true;

		try {
			JID userJid = XMPPServer.getInstance().createJID(getUsername(), config.getResource().toString());

			session = (LocalClientSession) SessionManager.getInstance().getSession(userJid);

			if (session != null)
			{
				session.close();
				SessionManager.getInstance().removeSession(session);
			}

			//RESTServicePlugin.getInstance().removeServlets(sseHolder);

		} catch (Exception e) {
			Log.error("shutdown", e);
		}
	}
	public String getUsername()
	{
		return config.getUsername().toString();
	}

	// -------------------------------------------------------
	//
	// SmackConnection
	//
	// -------------------------------------------------------

	public class SmackConnection extends VirtualConnection
	{
		private SessionPacketRouter router;
		private String remoteAddr;
		private String hostName;
		private LocalClientSession session;
		private boolean isSecure = false;
		private XmppChatManager connection;

		public SmackConnection(String hostName, XmppChatManager connection)
		{
			this.remoteAddr = hostName;
			this.hostName = hostName;
			this.connection = connection;
		}

		public void setConnection(XmppChatManager connection) {
			this.connection = connection;
		}

		public boolean isSecure() {
			return isSecure;
		}

		public void setSecure(boolean isSecure) {
			this.isSecure = isSecure;
		}

		public SessionPacketRouter getRouter()
		{
			return router;
		}

		public void setRouter(SessionPacketRouter router)
		{
			this.router = router;
		}

		public void closeVirtualConnection()
		{
			Log.debug("SmackConnection - close ");

			if (this.connection!= null) this.connection.shutdown();
		}

		public byte[] getAddress() {
			return remoteAddr.getBytes();
		}

		public String getHostAddress() {
			return remoteAddr;
		}

		public String getHostName()  {
			return ( hostName != null ) ? hostName : "0.0.0.0";
		}

		public void systemShutdown() {

		}

		public void deliver(org.xmpp.packet.Packet packet) throws UnauthorizedException
		{
			deliverRawText(packet.toXML());
		}

		public void deliverRawText(String text)
		{
			int pos = text.indexOf("<message ");

			if (pos > -1)
			{
				text = text.substring(0, pos + 9) + "xmlns=\"jabber:client\"" + text.substring(pos + 8);
			}

			Log.debug("SmackConnection - deliverRawText\n" + text);


			Stanza stanza = connection.handleParser(text);

			if (stanza != null)
			{
				processMessageStanza(stanza);
			}
		}

		@Override
		public org.jivesoftware.openfire.spi.ConnectionConfiguration getConfiguration()
		{
			return null;
		}

		public Certificate[] getPeerCertificates() {
			return null;
		}

	}



	public void processMessageStanza(Stanza packet)
	{
		Log.debug("Received packet: \n" + packet.toXML(StreamOpen.CLIENT_NAMESPACE));
	}


	public Stanza handleParser(String xml)
	{
		Stanza stanza = null;

		try {
			stanza = PacketParserUtils.parseStanza(xml);
		}
		catch (Exception e) {
			Log.error("handleParser failed");
		}

		if (stanza != null) {
			invokeStanzaCollectorsAndNotifyRecvListeners(stanza);
		}

		return stanza;
	}

	// -------------------------------------------------------
	//
	// ClientServlet
	//
	// -------------------------------------------------------



	public static class OpenfireConfiguration extends ConnectionConfiguration
	{
		protected OpenfireConfiguration(Builder builder) {
			super(builder);
		}

		public static Builder builder() {
			return new Builder();
		}

		public static final class Builder extends ConnectionConfiguration.Builder<Builder, OpenfireConfiguration> {

			private Builder() {
			}

			@Override
			public OpenfireConfiguration build() {
				return new OpenfireConfiguration(this);
			}

			@Override
			protected Builder getThis() {
				return this;
			}
		}
	}




	// -------------------------------------------------------
	//
	// InvitationListener
	//
	// -------------------------------------------------------

	@Override
	public void invitationReceived(XMPPConnection xmppConnection, MultiUserChat multiUserChat, EntityJid inviter, String reason, String password, Message message, MUCUser.Invite invitation)
	{
		try {
			String room = multiUserChat.getRoom().toString();
			Log.info("invitationReceived::::room{}   inviter{}",room ,inviter);
			joinRoom(room, this.getUserJid().toString());

		} catch (Exception e) {
			Log.error("invitationReceived", e);
		}
	}
}

ExamplePlugin:

public class ExamplePlugin implements Plugin
{

    @Override
    public void initializePlugin( PluginManager manager, File pluginDirectory )
    {
    	
    	//configure username and password
    	String username="bot";
    	String password="123456";
    	String totp=null;
    	
      try {
     // OpenfireConnection connection = null;
     // boolean anonymous = true;
     // Log.debug("/{username}/login " + loginUser);
    	XmppChatManager chatManager=XmppChatManager.createConnection(username, password,false); 
    	if (chatManager == null)
          {
              Log.info("COnnection is NULLL::::::::::");
          }

      } catch (Exception e) {
      	Log.error("Exception occuered::",e);
      }
	}

    @Override
    public void destroyPlugin()
    {
        Log.info("Destroying Example Plugin");
    }

	
}

Plugin code(example code which does the same thing as explained above):
exampleplugin.jar (35.6 MB)
example-plugin.zip (35.7 MB)

I have uploaded my self created chatbot plugin to github:
You can fork it and so on… have fun…

2 Likes

does this work in cluster?
have you checked my project, any feedback on that?

Sorry i did not check your project…
the bot should runs in a clustered environment, but it will only connect from senior cluster member.

@totzkotz thanks for the response!!. I have checked your code i think you are also using smack lib for creating connection. The one more problem which I am facing in my plugin is after deploy if it is taking more memory. Before deployment the memory consumption openfire JAVA heap bar is 120mb but after deployent it created only one connection and joined 1 chatroom romm only but the memory consumtpion went to 400 to 650mb. So, didn’t got what is going wrong.

I use a combination of smack for the connection and direct server interaction via plugin code… runs just fine… also using aiml for the bot ai