Simultaneously reading and writing over a socket

Most MIDP2 phones don't allow simultaneous reading and writing over a socket. Unfortunately, Smack does that all the time (using multiple threads) in PacketReader and PacketWriter. Any tips on the best way to get rid of this behaviour would be very welcome. I've tried some things myself, but it got rather ugly really fast!

Cheers,

Berco

Berco,

One thing you could try is adding a single lock object that both the reader and writer have to synchronize on before doing their operations. I have no idea whether that would work, however, and you'd need to be pretty careful to make sure that no deadlocks occurred.

Regards,

Matt

Hi,

Not trying to be a smart ass here (honest!), but synchronizing access on the outside is not likely to work, because the inside is what works asynchronously and accesses the critical resource.

The only sane way I can think of is synchronizing on the resource that needs protection, i.e. the connection. Currently the Writer and Reader(?) synchronize on their own queue. If they both used a connection-related monitor, that should work.

Unfortunately this behavior is not desired in J2SE environments, as it throttles throughput. Maybe the solution is to make it configurable when instantiating the connection. E.g. the connection has two monitor references, one for outgoing traffic and one for incoming traffic. A parameter in the constructor could instruct the connection to instantiate either a) a single monitor and assign it to both references, or b) [J2SE] two separate monitors and assign each to one of the references.

That shouldn't be too much coding, and afaik wouldn't break any outside code, but I haven't put too much thought into it. I might very well be wrong.
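A minimal sketch of the configurable-monitor idea, assuming a small helper class: the name `ConnectionMonitors`, its fields and the `halfDuplex` flag are hypothetical and not part of Smack.

```java
// Hypothetical helper, not part of Smack: holds the two monitor references
// described above and decides at construction time whether they are shared.
class ConnectionMonitors {
    final Object incoming; // monitor the reader would synchronize on
    final Object outgoing; // monitor the writer would synchronize on

    ConnectionMonitors(boolean halfDuplex) {
        incoming = new Object();
        // a) half-duplex (e.g. MIDP): one monitor assigned to both references
        // b) full-duplex (J2SE): two separate monitors, so reads and writes
        //    never wait for each other
        outgoing = halfDuplex ? incoming : new Object();
    }
}
```

The reader and writer code would then always synchronize on `incoming` and `outgoing` respectively, and only the constructor argument decides whether the two can run at the same time.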

Cheers,

Mariano

Mariano,

Yeah, I'm not sure exactly what might work. At this time, we don't have any plans to officially support J2ME in Smack. So, you'd need to make these changes in a local copy of the source. If enough people need J2ME support, perhaps we could have a special Smack edition for J2ME as a companion project.

Regards,

Matt

Hi Mariano,

Thanks for the feedback.

I'm trying to figure out what exactly to synchronize in my code. In PacketWriter.parsePackets() I could do the following:


synchronized (connection) {
    while (!done) {
        Packet packet = nextPacket();
        writer.write(packet.toXML());
        writer.flush();
    }
}


And in PacketReader.parsePackets() the following:


synchronized (connection) {
    int eventType = parser.getEventType();
}


I thought this would sync the reading/writing by using the lock (a monitor) on the connection object, but it appears to result in a deadlock. That might be due to the other synchronizations going on, but that's pretty difficult to find out.

Bottom line: I'm not sure which part of the code to synchronize. If you (or somebody else on this list) have a suggestion, that would be very welcome.

Cheers,

Berco

Hi Berco,

Thanks for the feedback.

You're welcome. Unfortunately I am pretty busy myself at the moment, so I can't provide you with an implementation.

I don't know the answers without trying, but I'd like to give you some pointers, which may or may not help you.

I don't know anything about MIDP. What exactly are the constraints? You can read and write, but not at the same time?

So it is kind of half-duplex?

I'm trying to figure out what exactly to synchronize in my code.

In PacketWriter.parsePackets() I could do the following:

You mean writePackets(), right?



synchronized (connection) {
    while (!done) {
        Packet packet = nextPacket();
        writer.write(packet.toXML());
        writer.flush();
    }
}



a) Synchronisation should be as granular as possible, so I would try something like this:

existing code:

while (!done) {
    Packet packet = nextPacket();
    if (packet != null) {
        synchronized (writer) {
            writer.write(packet.toXML());
            writer.flush();
        }
    }
}

You just want to make sure that the actual write over the wire won't happen at the same time as a read, so I would change “writer” to “connection” in the synchronized statement.

You also need to do that in KeepAliveTask.run().

Be aware that there are some more calls to writer.xx without any synchronisation in the existing code. These seem to be for opening and closing the stream. Not sure if a problem can arise here. Needs some brain cycles.

Are you using Eclipse? Hit Ctrl+Shift+U when your cursor is on “writer”, and you'll see all occurrences at once.

Every write()/flush() is a candidate for synchronisation.
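One way to keep those write sites consistent is to route them all through a single helper that takes the shared lock. This is only a sketch: `PacketIo` and `writeLocked` are hypothetical names, and the lock argument stands in for the connection object.

```java
import java.io.IOException;
import java.io.Writer;

// Hypothetical helper, not part of Smack: every place that currently calls
// writer.write()/writer.flush() directly could call this instead.
class PacketIo {
    static void writeLocked(Writer writer, Object connectionLock, String xml)
            throws IOException {
        // Hold the lock only for the actual write over the wire,
        // not while waiting for the next packet to send.
        synchronized (connectionLock) {
            writer.write(xml);
            writer.flush();
        }
    }
}
```

Keeping the wait for the next packet outside the synchronized block is what makes the lock granular: the writer holds it only for the duration of one write and flush.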

And in PacketReader.parsePackets() the following:



synchronized (connection) {
    int eventType = parser.getEventType();
}



OK, here is the real problem and likely the cause of the deadlock.

The read calls seem to be synchronous and blocking. At some point the reader will have no more bytes to return, and it will block while holding the lock indefinitely: new packets mostly arrive as a response to something you write to the server, and the write cannot happen because it needs the same lock.

The only way the lock can be released is when the server sends something without the client asking for it (like an IM from another client).

So this can't work.

Two choices:

a) Check whether the XMLPull API lets you set a timeout, or offers another method which takes a timeout. I doubt that this is available. If it is, implement a spin lock like the one below.

b) The source of the XMLPullParser is a Reader.

PacketReader.constructor():

parser.setInput(connection.reader);

This can be the solution to your problem. It would be very clean to also go directly to the source, i.e. the code touching the wire, as in the above-mentioned code.

Create a class like

public class CooperativeReader extends Reader {
    private Reader source;
    private Object lock;

    public CooperativeReader(Reader source, Object lock) {
        this. … blablba
    }

    public int read(char cbuf[], int off, int len) throws IOException {
        synchronized(lock) {
            while(true) {
                if (source.isReady())
                    return source.read(cbuf, off, len);
                lock.wait(2000); // This releases the lock!
            }
        }
    }

About isReady():

/**
 * Tell whether this stream is ready to be read.
 *
 * @return True if the next read() is guaranteed not to block for input,
 * false otherwise. Note that returning false does not guarantee that the
 * next read will block.
 *
 * @exception IOException If an I/O error occurs
 */
public boolean ready() throws IOException {

In the constructor do the following:

PacketReader.constructor():

parser.setInput(new CooperativeReader(connection.reader, connection));

You should also add “connection.notifyAll();” as the last statement in the synchronized blocks in the PacketWriter to let this one wake up earlier than after 2000 ms. For tests it would be enough to reduce 2000 ms to 100 ms.

If this works, we need to add something to make it more production-stable, like adding a timeout to read or checking for close() etc.

Will see about that then.

I might be totally wrong. I didn't test that, but it makes sense to me. I have been wrong before though, and I am in a hurry right now.

I thought this would sync the reading/writing by using

the lock (a monitor) on the connection object, but it

appears to result in a deadlock. That might be due to

the other synchronizations going on, but that's pretty

difficult to find out.

Well, first of all: Running into deadlocks so easily is a good thing ™.

Second best to the real solution

The real problems arise when you find deadlocks only in some hard-to-reproduce constellations.

These things are really funny after you shipped

Cheers,

Mariano

Hi Mariano,

I don't know the answers without trying, but I'd like to give you some pointers, which may or may not help you.

Thank you very much for your (quick) help! Very much appreciated.

I don't know anything about MIDP. What exactly are the constraints? You can read and write, but not at the same time? So it is kind of half-duplex?

Yes.

You mean writePackets(), right?

Yes, sorry about that.

so I would change “writer” to “connection”.

You also need to do that in KeepAliveTask.run().

Hmmm… I can't find that class anywhere. Are you sure that is the correct name?

Be aware that there are some more calls to writer.xx without any synchronisation in the existing code. This seems to be opening and closing the stream. Not sure if a problem can arise here. Needs some brain cycles.

Are you using Eclipse? Hit Ctrl+Shift+U when your cursor is on “writer”, you'll see all occurrences at once.

Wow! Excellent! Another little Eclipse gem! Thanks for the tip.

Every write()/flush() is a candidate for synchronisation.

OK, I'll change all 6 of them.

Ok, here is the real problem and likely the cause of the deadlock.

Sounds reasonable.

Two choices:

a) Look at the XMLPull API if you can set a timeout or use another method which takes a timeout. I doubt a bit that this is available. In case it is, implement a spin lock like below.

b) The source of the XMLPullParser is a Reader.

PacketReader.constructor():

parser.setInput(connection.reader);

This can be the solution to your problem. It would be very clean to also go directly to the source, i.e. the code touching the wire, as in the above-mentioned code.

Create a class like

public class CooperativeReader extends Reader {

You should also add “connection.notifyAll();” as the last statement in the synchronized blocks in the PacketWriter to let this one wake up earlier than after 2000 ms. For tests it would be enough to reduce 2000 ms to 100 ms.

I'll test it out first thing in the morning tomorrow!

If this works, we need to add something to make it more production stable, like adding a timeout to read or checking for close() etc.

Will see about that then.

Yep, I'll first test it out before worrying about production stability.

I might be totally wrong. I didn't test that, but it makes sense to me. I have been wrong before though and I am in a hurry right now.

Hurry? Wow, amazing that you could still come up with such an elaborate solution. Respect!

Well, first of all: Running into deadlocks so easily is a good thing ™.

Second best to the real solution

HHH! That keeps the faith up!

The real problems arise when you find deadlocks just in some hard to reproduce constellations.

These things are really funny after you shipped

I hope you're not speaking from experience, are you?

Again, thanks for your excellent response and I'll let you know the results asap.

Cheers,

Berco

Hi Berco,

just a quick answer for now.

You also need to do that in KeepAliveTask.run().

Hmmm… I can't find that class anywhere. Are you sure that is the correct name?

It's an inner class of PacketWriter.

Don't thank me too early. Let's see if it works

Greetings to the Netherlands,

Mariano

Hi Mariano,

KeepAliveTask is not a subclass of my PacketWriter, which might be due to the fact that I'm not porting the latest version of Smack?

As you suggest, I'm adding the CooperativeReader class as input for my parser, but I run into a few issues:

About isReady():

Is that a method I should add to the InputStreamReader (meaning I have to extend the latter and add the isReady() method)? If so, any suggestion how to decide whether the next read() is guaranteed not to block for input?

In CooperativeReader.read() there is a call:

source.read(cbuf, off, len);

How do I know what exactly to read?

Again, thanks for your excellent help and greetings to all Germans!

Berco

Hi Berco,

KeepAliveTask is not a subclass of my PacketWriter,

I was talking about an inner class.

which might be due to the fact that I'm not porting the latest version of Smack?

Hit Ctrl+Shift+T and type KeepAliveTask. Eclipse will show it to you.

I am using Smack 1.3.0.

Why do you say “your” PacketWriter? Because it is your copy of the PacketWriter, or did you implement one yourself?

As you suggest, I'm adding the CooperativeReader class as input for my parser, but I run into a few issues:

No surprise here. I've just been providing a rough outline.

About isReady():

Is that a method I should add to the InputStreamReader (meaning I have to extend the latter and add the isReady() method)?

No, just take the code I wrote and that should be it.

It might be that some other methods or the other variations of read() need to do some synchronisation, but this can likely be delegated to the one read method I wrote.

If so, any suggestion how to decide whether the next read() is guaranteed not to block for input?

Don't do it yourself. See my code. It delegates this to the original source, and as long as the implementation used supports isReady(), all is fine. If not, we will see about that then.

In CooperativeReader.read() there is a call:

source.read(cbuf, off, len);

How do I know what exactly to read?

You don't have to. You may want to look up the decorator pattern. Quite interesting. Readers/Writers and Input/OutputStreams use this pattern.

Basically you augment something else with some new behaviour, but stay interface-compatible.

In this case you will be called with parameters that you just pass on to the “source”, which is the actual reader and implements the same interface as you do. You also enhance the delegating call by providing synchronisation.

Is that any clearer now?
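To make the decorator idea concrete, here is a small, unrelated sketch: a Reader that wraps another Reader, passes every call through unchanged, and adds one piece of behaviour (counting characters). `CountingReader` is a made-up name used only for illustration.

```java
import java.io.IOException;
import java.io.Reader;

// Illustrative decorator: same interface as the wrapped Reader,
// plus a running count of the characters that passed through.
class CountingReader extends Reader {
    private final Reader source;
    private long count;

    CountingReader(Reader source) {
        this.source = source;
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        // The caller decides what to read; we only pass the request on.
        int n = source.read(cbuf, off, len);
        if (n > 0) {
            count += n;
        }
        return n;
    }

    @Override
    public void close() throws IOException {
        source.close();
    }

    long getCount() {
        return count;
    }
}
```

Because it is still a Reader, it can be handed to anything that expects one, for example `new BufferedReader(new CountingReader(someReader))`. The CooperativeReader follows the same shape, with synchronisation as the added behaviour instead of counting.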

Again, thanks for your excellent help and greetings to all Germans!

Alright, will pass that on when I meet them all

The real problems arise when you find deadlocks just in some hard to reproduce constellations.

These things are really funny after you shipped

I hope you're not speaking from experience, are you?

Well, I honestly believe that such a bug never slipped into production, but other bugs did. We have a pretty large application with 41 external interfaces and loads of products we integrate, which is used by up to 500 users concurrently. We use multi-threading very cautiously and, as I said above, there hasn't been one multi-threading bug in production, but that can be even worse.

It is very hard to find a bug when there is none. And having an environment with so many outside systems makes it really tough to impossible to reproduce an error outside of the production environment, which, of course, cannot be fiddled with.

To cut a long story short: Never had any problems with threading, but have lots of painful experience hunting for potential bugs …

Cheers,

Mariano

Hi Mariano,

KeepAliveTask is not a subclass of my PacketWriter,

I was talking about an inner class.

Yep, I know, but it's not in version 1.2.1 of Smack.

I am using Smack 1.3.0.

Why do you say “your” PacketWriter? Because it is your copy of the PacketWriter, or did you implement one yourself?

I just meant my copy, I didn't implement it myself…

About isReady():

Is that a method I should add to the InputStreamReader (meaning I have to extend the latter and add the isReady() method)?

No, just take the code I wrote and that should be it.

I'm a little confused here. Do you mean that a Reader should already have an isReady() method? That isn't the case in either J2ME or J2SE. What I did for now is change Reader.isReady() to Reader.ready() (which DOES exist in Reader). After that the deadlock still occurs: PacketReader.writePacket() runs into a deadlock. It seems to be in PacketReader.parsePackets(), where there's a connectionIDLock. I'm not totally sure though.

If so, any suggestion how to decide whether the next read() is guaranteed not to block for input?

Don't do it yourself. See my code. It is delegating this to the original source and as long as the used implementation supports isReady() all is fine. If not, we will see about that then.

It isn't the case, so I resorted to calling Reader.ready(), but I'm not sure that works.

It is very hard to find a bug, when there is none.

HHH! LOL!

Thanks again for your great help!

Cheers,

Berco

Hi Berco,

Yep, I know, but it's not in version 1.2.1 of Smack.

I am using Smack 1.3.0.

Any reason you don't upgrade to a current version?

isReady()/ready()

My bad. Yes, of course, I meant to say “ready()”. I just wrote that from memory without having an API handy.

It isn't the case, so I resorted to calling Reader.ready(), but I'm not sure that works.

Sure, that was what I meant.

still deadlocking …

Hmmh. OK, I think I have to try it myself then. Unfortunately I am currently working six days a week. Hopefully I will find some time on Sunday for doing that, but I can't promise, as I am not sure what other chores are waiting for me on Sunday.

Btw. you could annotate your code around ready() etc. with println()s to see whether ready() is implemented properly in the classes actually implementing it, and in what situations the deadlock occurs. Eclipse also offers support for watching monitors. The view is called “Threads and Monitors”.

Cheers,

Mariano

Hi Mariano,

Any reason you don't upgrade to a current version?

Yes, I started by porting version 1.2.1 to J2ME, and doing it all again for version 1.3 would take me a lot of time. The moment I've got this all working, I will post all the changes I've made to this forum so others can work on an “official” port of Smack to J2ME.

still deadlocking …

Hmmh. Ok, I think I have to try it myself then. Unfortunately I am currently working six days a week, but hopefully I will find some time on Sunday for doing that, but I don't promise that, as I am not sure what other chores are waiting for me on Sunday.

That would be very nice of you. In the meantime I'll keep forking the code.

Btw. You could annotate your code like ready() etc. with println()s to see if ready() is implemented properly in the classes actually implementing it and in what situations the deadlock occurs.

Yes, but ready() is implemented in the Reader class of J2ME, of which the source IS available, but it would be quite a hassle to edit that.

Also Eclipse is offering support for watching monitors. The view is called “Threads and Monitors”.

Yes, but the problem is that running J2ME applications is not easy from within Eclipse. I've used the excellent EclipseME plugin, but the folder hierarchy that I have is unusable for that plugin (these wireless toolkits are rather picky regarding folder structures), so I have to resort to the standard Wireless Toolkit from Sun. This means that I can't use Eclipse's function for watching monitors. I've read somewhere that you can do remote debugging with Eclipse, so I'll look into that.

I'll keep you informed.

Cheers,

Berco

Yes, but ready() is implemented in the Reader class of J2ME, of which the source IS available, but it would be quite a hassle to edit that.

But you did implement ready() in your own Reader like I described? Just annotate the calls like

System.out.println(Thread.currentThread() + ": before call to source.ready()");

System.out.println(Thread.currentThread() + ": after call to source.ready(), result=" + source.ready());

I've read somewhere that you can do remote debugging with Eclipse, so I'll look into that.

Yes, pretty easy to do for J2SE. Not sure if a J2ME VM supports that.

I'll keep you informed.

Thx.

Mariano

I've read somewhere that you can do remote debugging with Eclipse, so I'll look into that.

I just tried Eclipse's remote debugging utility, but unfortunately the Wireless Toolkit's JVM doesn't support the retrieval of monitor information. At least, that's what Eclipse's “Threads and Monitors” window says. The exact error is:

“The currently selected VM does not support the retrieval of monitor information”

So unfortunately remotely watching monitors is not an option.

But you did implement ready() in your own Reader like I described?

You provided read(), but not ready() (only a signature). I probably misinterpreted your example, but I created a CooperativeReader, added a read() to it and that's it. The read() looks as follows:


public int read(char cbuf[], int off, int len) throws IOException {
    synchronized (lock) {
        while (true) {
            System.out.println(Thread.currentThread() + ": before call to source.ready()");
            if (source.ready()) {
                System.out.println(Thread.currentThread() + ": after call to source.ready(), result=" + source.ready());
                return source.read(cbuf, off, len);
            }
            try {
                lock.wait(2000); // This releases the lock!
            } catch (InterruptedException e) {
                System.out.println("CooperativeReader.read: lock interrupted");
                e.printStackTrace();
            }
        }
    }
}


If I should add a ready() to CooperativeReader that overrides that of Reader, I wouldn't know how to make sure the next read() call wouldn't block. Could it be:


public boolean ready() throws IOException {
    return source.ready();
}


Cheers,

Berco

public boolean ready() throws IOException {
    return source.ready();
}

Exactly! That was my intention… Sorry for not being more specific. I am still not sure if the whole approach would work, but without this it can't work, because as far as I remember the base Reader.ready() always returns false.
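Put together, the pieces discussed so far might look like the sketch below. It is untested against Smack or a MIDP device and is written for J2SE; the constructor body, the 100 ms wait and the handling of InterruptedException are assumptions filled in for illustration.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Reader;

// Sketch of the CooperativeReader idea: a decorator that only touches the
// underlying reader while holding the shared lock, and releases the lock
// (via wait) whenever the source reports that a read might block.
class CooperativeReader extends Reader {
    private final Reader source;
    private final Object lock; // assumed to be the same object the writer locks on

    CooperativeReader(Reader source, Object lock) {
        this.source = source;
        this.lock = lock;
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        synchronized (lock) {
            while (!source.ready()) {
                try {
                    // wait() releases the lock, so a writer can get in;
                    // a notifyAll() from the writer wakes us up early.
                    lock.wait(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("interrupted while waiting for input");
                }
            }
            return source.read(cbuf, off, len);
        }
    }

    @Override
    public boolean ready() throws IOException {
        synchronized (lock) {
            return source.ready(); // delegate instead of inheriting Reader's default
        }
    }

    @Override
    public void close() throws IOException {
        source.close();
    }
}
```

Whether this helps depends entirely on the wrapped reader's ready() reporting accurately; if it returns true while the next read() would still block, the lock is held during that blocking read.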

Cheers,

Mariano

OK, the deadlock still occurs. It appears that the login times out due to a deadlock. This may in turn be caused by PacketReader deadlocking. Here are some System.outs (I hope they make at least SOME sense):


PacketWriter.writePackets

PacketWriter.writePackets: != done

PacketReader.parsePackets()

PacketReader.parsePackets(): got eventType: 0

PacketReader.parsePackets: getting next eventType

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: after call to source.ready(), result= true

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: after call to source.ready(), result= true

PacketReader.parsePackets: got next eventType: 2

PacketReader.parsePackets(): got eventType: 2

PacketReader.parsePackets(): got START_TAG

PacketReader.parsePackets: getting next eventType

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

XMPPConnection.init: packetReader started

ProviderManager.static: trying to get providerStream

ProviderManager.static: got providerStream

ProviderManager.static: got factory

ProviderManager.static: got parser

ProviderManager: parser.getEventType(); 0

ProviderManager: iq provider

ProviderManager: iq provider

ProviderManager: extensionprovider

ProviderManager: extensionprovider

ProviderManager: extensionprovider

ProviderManager: extensionprovider

MainUI: can't connect to server

org.jivesoftware.smack.XMPPException: No response from the server.

at org.jivesoftware.smack.XMPPConnection.login(+114)

at org.jivesoftware.smack.XMPPConnection.login(+8)

at MainUI$ConnectionRunner.run(+73)

PacketWriter.writePackets: != done

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: after call to source.ready(), result= true

PacketReader.parsePackets: got next eventType: 2

PacketReader.parsePackets(): got eventType: 2

PacketReader.parsePackets(): got START_TAG

PacketReader.parseIQ()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: after call to source.ready(), result= true

PacketReader.parsePackets: getting next eventType

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

PacketWriter.writePackets: == done

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

CooperativeReader.read: Thread[@223940028,5]: before call to source.ready()

Berco,

huuh, bugger.

My bad. My perception was that ready() would take a peek at new packets without needing to call read(). That should be achievable, as it eventually delegates to StreamDecoder, which is from NIO. Unfortunately the sources for StreamDecoder are not available?! ;-(

So solving this needs some more effort. We need to take the peek ourselves in a separate thread from the CooperativeReader. The second thread needs to call read() (blocking), and before doing that it would set an instance variable, boolean blocking, to true. After it has read a byte, it needs to store that byte in an instance variable “peekByte” and set “blocking” to false. In the original read method we can get rid of ready() ;-( and instead check whether “blocking” is true. In that case we do the same spin lock as with ready(). If it is false, we take the peekByte, read the “remaining” bytes (because we already read one while taking the peek) with a call to the source, and return the concatenated byte array constructed from the peekByte and the remaining bytes.

That is more work than before, but should be less than 30 lines of actual code (without blank lines, comments etc.).

Sorry for not being more verbose and responsive, but I have to come up with a concept and the accompanying paper for a new feature we are going to implement, due Monday morning, and I am not making very good progress, which could seriously jeopardize my Sunday.
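The peek-thread scheme described above could be sketched roughly as follows. This is an untested J2SE illustration, not a finished port: the class name `PeekingReader`, the `failure` field, the daemon thread and the use of ready() to avoid blocking on the “remaining” characters are all assumptions added here, and the code works on chars rather than bytes because it wraps a Reader.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.Reader;

class PeekingReader extends Reader {
    private final Reader source;
    private final Object lock;        // shared with the writer
    private boolean blocking = true;  // true while the peek thread sits in read()
    private int peekChar;             // char fetched by the peek thread, -1 at end of stream
    private IOException failure;      // error seen by the peek thread, if any

    PeekingReader(Reader source, Object lock) {
        this.source = source;
        this.lock = lock;
        Thread peeker = new Thread(new Runnable() {
            public void run() { peekLoop(); }
        });
        peeker.setDaemon(true);
        peeker.start();
    }

    private void peekLoop() {
        try {
            while (true) {
                int c = source.read();        // blocking read, done outside the lock
                synchronized (lock) {
                    peekChar = c;
                    blocking = false;
                    lock.notifyAll();
                    if (c < 0) return;        // end of stream, nothing more to peek
                    while (!blocking) lock.wait(); // until the char has been consumed
                }
            }
        } catch (IOException e) {
            synchronized (lock) { failure = e; blocking = false; lock.notifyAll(); }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        if (len == 0) return 0;
        synchronized (lock) {
            try {
                while (blocking) lock.wait(2000); // releases the lock while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new InterruptedIOException("interrupted while waiting for input");
            }
            if (failure != null) throw failure;
            if (peekChar < 0) return -1;
            cbuf[off] = (char) peekChar;          // the char taken by the peek
            int n = 1;
            if (len > 1 && source.ready()) {      // fetch the remaining chars, if any
                int more = source.read(cbuf, off + 1, len - 1);
                if (more > 0) n += more;
            }
            blocking = true;                      // let the peek thread read again
            lock.notifyAll();
            return n;
        }
    }

    @Override
    public void close() throws IOException {
        source.close();
    }
}
```

One thing to keep in mind with this design: the peek thread's blocking read() runs outside the lock, so a write can still overlap with it. Whether a MIDP device tolerates that is not something this sketch can answer.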

Cheers,

Mariano

huuh, bugger.

HHH! Yep, bugger.

My bad. My perception was that ready() would take a peek at new packets, without needing to call read(). That is something which should be accomplishable as it is eventually delegating to StreamDecoder, which is from NIO. Unfortunately the sources for StreamDecoder are not available?! ;-(

They probably are, since it's part of the CLDC/MIDP API. I'm not sure if we want to go there though.

So solving this needs some more effort. We need to take the peek ourself in a separate Thread from the CooperativeReader. The second thread needs to call read() (blocking) and before doing that would set an instance variable, boolean blocking, to true. After it read a byte it needs to store that byte in an instance variable “peekByte” and set “blocking” to false. In the original read method we can get rid of ready() ;-( and instead see, if “blocking” is true. In that case we do the same spinlock as with ready(). If false we take the peekByte and read the “remaining” bytes (because we already read one while taking the peek) with a call to the source and return the concatenated byte array constructed from the peekByte and the remaining Bytes.

This might indeed work, although it's getting more cumbersome than I expected in the beginning. I didn't expect avoiding simultaneous reading/writing would be so difficult. But anyway, thanks to your excellent help I'm getting a crash course in multithreaded programming, which is very nice.

Sorry for not being more verbose and responsive, but I have to come up with a concept and the according paper for a new feature we are going to implement until Monday morning and I am not making very good progress, which could seriously jeopardize my Sunday.

I think you're very responsive! Thanks for that. Good luck with the concept and paper; we'll see if that jeopardizes your Sunday.

Ein gutes Wochenende (have a good weekend)!

Berco