/*
* Conditions Of Use
*
* This software was developed by employees of the National Institute of
* Standards and Technology (NIST), an agency of the Federal Government.
* Pursuant to title 15 Untied States Code Section 105, works of NIST
* employees are not subject to copyright protection in the United States
* and are considered to be in the public domain. As a result, a formal
* license is not needed to use the software.
*
* This software is provided by NIST as a service and is expressly
* provided "AS IS." NIST MAKES NO WARRANTY OF ANY KIND, EXPRESS, IMPLIED
* OR STATUTORY, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTY OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT
* AND DATA ACCURACY. NIST does not warrant or make any representations
* regarding the use of the software or the results thereof, including but
* not limited to the correctness, accuracy, reliability or usefulness of
* the software.
*
* Permission to use this software is contingent upon your acceptance
* of the terms of this agreement
*
* .
*
*/
package gov.nist.javax.sip.parser;
import gov.nist.core.InternalErrorHandler;
import gov.nist.javax.sip.stack.SIPStackTimerTask;
import java.io.*;
import java.util.*;
/**
* Input class for the pipelined parser. Buffer all bytes read from the socket
* and make them available to the message parser.
*
* @author M. Ranganathan (Contains a bug fix contributed by Rob Daugherty (
* Lucent Technologies) )
*
*/
public class Pipeline extends InputStream {
private LinkedList buffList;
private Buffer currentBuffer;
private boolean isClosed;
private Timer timer;
private InputStream pipe;
private int readTimeout;
private TimerTask myTimerTask;
class MyTimer extends SIPStackTimerTask {
Pipeline pipeline;
private boolean isCancelled;
protected MyTimer(Pipeline pipeline) {
this.pipeline = pipeline;
}
protected void runTask() {
if (this.isCancelled)
return;
try {
pipeline.close();
} catch (IOException ex) {
InternalErrorHandler.handleException(ex);
}
}
public boolean cancel() {
boolean retval = super.cancel();
this.isCancelled = true;
return retval;
}
}
class Buffer {
byte[] bytes;
int length;
int ptr;
public Buffer(byte[] bytes, int length) {
ptr = 0;
this.length = length;
this.bytes = bytes;
}
public int getNextByte() {
int retval = bytes[ptr++] & 0xFF;
return retval;
}
}
public void startTimer() {
if (this.readTimeout == -1)
return;
// TODO make this a tunable number. For now 4 seconds
// between reads seems reasonable upper limit.
this.myTimerTask = new MyTimer(this);
this.timer.schedule(this.myTimerTask, this.readTimeout);
}
public void stopTimer() {
if (this.readTimeout == -1)
return;
if (this.myTimerTask != null)
this.myTimerTask.cancel();
}
public Pipeline(InputStream pipe, int readTimeout, Timer timer) {
// pipe is the Socket stream
// this is recorded here to implement a timeout.
this.timer = timer;
this.pipe = pipe;
buffList = new LinkedList();
this.readTimeout = readTimeout;
}
public void write(byte[] bytes, int start, int length) throws IOException {
if (this.isClosed)
throw new IOException("Closed!!");
Buffer buff = new Buffer(bytes, length);
buff.ptr = start;
synchronized (this.buffList) {
buffList.add(buff);
buffList.notifyAll();
}
}
public void write(byte[] bytes) throws IOException {
if (this.isClosed)
throw new IOException("Closed!!");
Buffer buff = new Buffer(bytes, bytes.length);
synchronized (this.buffList) {
buffList.add(buff);
buffList.notifyAll();
}
}
public void close() throws IOException {
this.isClosed = true;
synchronized (this.buffList) {
this.buffList.notifyAll();
}
// JvB: added
this.pipe.close();
}
public int read() throws IOException {
// if (this.isClosed) return -1;
synchronized (this.buffList) {
if (currentBuffer != null
&& currentBuffer.ptr < currentBuffer.length) {
int retval = currentBuffer.getNextByte();
if (currentBuffer.ptr == currentBuffer.length)
this.currentBuffer = null;
return retval;
}
// Bug fix contributed by Rob Daugherty.
if (this.isClosed && this.buffList.isEmpty())
return -1;
try {
// wait till something is posted.
while (this.buffList.isEmpty()) {
this.buffList.wait();
if (this.isClosed)
return -1;
}
currentBuffer = (Buffer) this.buffList.removeFirst();
int retval = currentBuffer.getNextByte();
if (currentBuffer.ptr == currentBuffer.length)
this.currentBuffer = null;
return retval;
} catch (InterruptedException ex) {
throw new IOException(ex.getMessage());
} catch (NoSuchElementException ex) {
ex.printStackTrace();
throw new IOException(ex.getMessage());
}
}
}
}