|
10 | 10 |
|
11 | 11 | package com.tangosol.coherence.component.util.daemon.queueProcessor.packetProcessor;
|
12 | 12 |
|
| 13 | +import com.oracle.coherence.common.io.BufferManager; |
| 14 | + |
13 | 15 | import com.tangosol.coherence.component.net.Member;
|
14 | 16 | import com.tangosol.coherence.component.net.Message;
|
15 | 17 | import com.tangosol.coherence.component.net.Packet;
|
|
21 | 23 | import com.tangosol.coherence.component.net.packet.notifyPacket.Ack;
|
22 | 24 | import com.tangosol.coherence.component.net.packet.notifyPacket.Request;
|
23 | 25 | import com.tangosol.coherence.component.util.WindowedArray;
|
24 |
| -import com.tangosol.coherence.component.util.daemon.queueProcessor.packetProcessor.PacketPublisher; |
25 |
| -import com.oracle.coherence.common.io.BufferManager; |
26 | 26 | import com.tangosol.coherence.config.Config;
|
27 | 27 | import com.tangosol.io.nio.ByteBufferReadBuffer;
|
28 | 28 | import com.tangosol.net.internal.PacketComparator;
|
|
31 | 31 | import com.tangosol.util.ClassHelper;
|
32 | 32 | import com.tangosol.util.LongArray;
|
33 | 33 | import com.tangosol.util.SimpleLongArray;
|
| 34 | +import com.tangosol.util.SparseArray; |
| 35 | + |
34 | 36 | import java.io.IOException;
|
35 | 37 | import java.net.SocketAddress;
|
36 | 38 | import java.nio.ByteBuffer;
|
@@ -334,40 +336,47 @@ protected void cleanup(com.tangosol.coherence.component.net.Member member)
|
334 | 336 | // import java.util.Iterator;
|
335 | 337 |
|
336 | 338 | BufferManager mgr = getBufferManager();
|
337 |
| - |
338 |
| - for (Iterator iter = member.getMessagePile().iterator(); iter.hasNext();) |
| 339 | + |
| 340 | + SparseArray messagePile = member.getMessagePile(); |
| 341 | + if (messagePile != null) |
339 | 342 | {
|
340 |
| - Object o = iter.next(); |
341 |
| - if (o instanceof LongArray) // all we have are sequel packets |
| 343 | + for (Iterator iter = member.getMessagePile().iterator(); iter.hasNext(); ) |
342 | 344 | {
|
343 |
| - for (Iterator iterPacket = ((LongArray) o).iterator(); iterPacket.hasNext(); ) |
| 345 | + Object o = iter.next(); |
| 346 | + if (o instanceof LongArray) // all we have are sequel packets |
344 | 347 | {
|
345 |
| - mgr.release(((MessagePacket) iterPacket.next()).getByteBuffer()); |
| 348 | + for (Iterator iterPacket = ((LongArray) o).iterator(); iterPacket.hasNext(); ) |
| 349 | + { |
| 350 | + mgr.release(((MessagePacket) iterPacket.next()).getByteBuffer()); |
| 351 | + } |
346 | 352 | }
|
| 353 | + // else o instanceof Message; handled below |
347 | 354 | }
|
348 |
| - // else o instanceof Message; handled below |
| 355 | + member.setMessagePile(null); |
349 | 356 | }
|
350 |
| - member.setMessagePile(null); |
351 | 357 |
|
352 | 358 | WindowedArray la = member.getMessageIncoming();
|
353 |
| - for (long li = la.getFirstIndex(), le = la.getLastIndex(); li <= le; li = la.getFirstIndex()) |
| 359 | + if (la != null) |
354 | 360 | {
|
355 |
| - Message msg = (Message) la.remove(li); |
356 |
| - if (msg == null) |
| 361 | + for (long li = la.getFirstIndex(), le = la.getLastIndex(); li <= le; li = la.getFirstIndex()) |
357 | 362 | {
|
358 |
| - break; // signifies that the remainder or the indexes must be null |
359 |
| - } |
360 |
| - |
361 |
| - for (int i = 0, c = msg.getMessagePartCount(); i < c; ++i) |
362 |
| - { |
363 |
| - MessagePacket packet = msg.getPacket(i); |
364 |
| - if (packet != null) |
| 363 | + Message msg = (Message) la.remove(li); |
| 364 | + if (msg == null) |
365 | 365 | {
|
366 |
| - mgr.release(packet.getByteBuffer()); |
| 366 | + break; // signifies that the remainder or the indexes must be null |
| 367 | + } |
| 368 | + |
| 369 | + for (int i = 0, c = msg.getMessagePartCount(); i < c; ++i) |
| 370 | + { |
| 371 | + MessagePacket packet = msg.getPacket(i); |
| 372 | + if (packet != null) |
| 373 | + { |
| 374 | + mgr.release(packet.getByteBuffer()); |
| 375 | + } |
367 | 376 | }
|
368 | 377 | }
|
| 378 | + member.setMessageIncoming(null); |
369 | 379 | }
|
370 |
| - member.setMessageIncoming(null); |
371 | 380 | }
|
372 | 381 |
|
373 | 382 | /**
|
@@ -1104,7 +1113,7 @@ protected void onPacketDirected(com.tangosol.coherence.component.net.Member memb
|
1104 | 1113 | // sequel Packets can "find their way home"
|
1105 | 1114 | laPile.set(lFromMsgId, msg);
|
1106 | 1115 | }
|
1107 |
| - |
| 1116 | + |
1108 | 1117 | // check for completed Messages in the array
|
1109 | 1118 | if (lToMsgId == lMsgFirst)
|
1110 | 1119 | {
|
|
0 commit comments