// Java program | 214 lines | 6.65 KB

/*
 * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *   - Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *   - Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 *   - Neither the name of Oracle nor the names of its
 *     contributors may be used to endorse or promote products derived
 *     from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
 * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
 * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * This source code is provided to illustrate the usage of a given feature
 * or technique and has been deliberately simplified. Additional steps
 * required for a production-quality application, such as security checks,
 * input validation and proper error handling, might not be present in
 * this sample code.
 */


import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Client represents a remote connection to the chat server.
 * It contains methods for reading and writing messages from the
 * channel.
 * Messages are considered to be separated by newline, so incomplete
 * messages are buffered in the {@code Client}.
 *
 * All reads and writes are asynchronous and use the NIO.2 asynchronous
 * channel API ({@link AsynchronousSocketChannel} with
 * {@link CompletionHandler} callbacks).
 *
 * Outgoing buffers are queued and written one at a time, so at most one
 * write is outstanding on the channel. The queue and the {@code writing}
 * flag are guarded by the queue's monitor; the message buffer is guarded
 * by its own monitor.
 */
class Client {
    /** Capacity, in bytes, of the buffer allocated for each read. */
    private static final int READ_BUFFER_SIZE = 256;

    private final AsynchronousSocketChannel channel;
    /** Current reader; replaceable through {@link #setReader(ClientReader)}. */
    private final AtomicReference<ClientReader> reader;
    /** Volatile because it is set and read from different completion-handler threads. */
    private volatile String userName;
    /** Holds received text until a complete newline-terminated message is available. */
    private final StringBuilder messageBuffer = new StringBuilder();

    /** Buffers waiting to be written. Guarded by {@code synchronized (queue)}. */
    private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>();
    /** True while some thread owns the write loop. Guarded by {@code synchronized (queue)}. */
    private boolean writing = false;

    /**
     * Creates a client for the given channel.
     *
     * @param channel the channel connected to the remote peer
     * @param reader the initial reader used by {@link #run()} and
     *               {@link #writeMessageFrom(Client, String)}
     */
    public Client(AsynchronousSocketChannel channel, ClientReader reader) {
        this.channel = channel;
        this.reader = new AtomicReference<ClientReader>(reader);
    }

    /**
     * Enqueues a write of the buffer to the channel.
     * The call is asynchronous so the buffer is not safe to modify after
     * passing the buffer here.
     *
     * @param buffer the buffer to send to the channel
     */
    private void writeMessage(final ByteBuffer buffer) {
        boolean threadShouldWrite = false;

        synchronized (queue) {
            queue.add(buffer);
            // Currently no thread writing, make this thread dispatch a write
            if (!writing) {
                writing = true;
                threadShouldWrite = true;
            }
        }

        if (threadShouldWrite) {
            writeFromQueue();
        }
    }

    /**
     * Takes the next queued buffer and starts writing it. If the queue is
     * empty, the write loop is released so that the next call to
     * {@code writeMessage} starts a new one.
     */
    private void writeFromQueue() {
        ByteBuffer buffer;

        synchronized (queue) {
            buffer = queue.poll();
            if (buffer == null) {
                writing = false;
            }
        }

        // Decide on the local value only: once the lock is released another
        // thread may already have set 'writing' to true again for its own
        // buffer, so re-reading the shared flag here could lead to a write of
        // a null buffer.
        if (buffer != null) {
            writeBuffer(buffer);
        }
    }

    /**
     * Writes the buffer to the channel, re-issuing the write until the
     * buffer is drained, and then continues with the next queued buffer.
     *
     * @param buffer the buffer to write
     */
    private void writeBuffer(ByteBuffer buffer) {
        channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                if (buffer.hasRemaining()) {
                    // Partial write, send the rest of the same buffer
                    channel.write(buffer, buffer, this);
                } else {
                    // Go back and check if there is new data to write
                    writeFromQueue();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                // Release the write loop and drop the pending buffers;
                // otherwise 'writing' would stay true forever and every
                // later message would accumulate in the queue unsent.
                synchronized (queue) {
                    queue.clear();
                    writing = false;
                }
                // Close the channel so that other outstanding operations on
                // it fail as well instead of waiting on a broken connection.
                close();
            }
        });
    }

    /**
     * Sends a message.
     * The string is encoded with the platform default charset.
     * NOTE(review): an explicit charset would be more portable, but it has to
     * match whatever the receiving/decoding side uses - confirm before changing.
     *
     * @param string the message
     */
    public void writeStringMessage(String string) {
        writeMessage(ByteBuffer.wrap(string.getBytes()));
    }

    /**
     * Send a message from a specific client. The message is prefixed with
     * the sender's user name and is only sent if the current reader reports
     * that it accepts messages.
     *
     * @param client the message is sent from
     * @param message to send
     */
    public void writeMessageFrom(Client client, String message) {
        if (reader.get().acceptsMessages()) {
            writeStringMessage(client.getUserName() + ": " + message);
        }
    }

    /**
     * Enqueue a read. A new buffer is allocated for the read and passed to
     * the handler as its attachment. Nothing happens if the channel is
     * already closed.
     *
     * @param completionHandler callback on completed read
     */
    public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) {
        // Check first so that no buffer is allocated for a closed channel
        if (!channel.isOpen()) {
            return;
        }
        ByteBuffer input = ByteBuffer.allocate(READ_BUFFER_SIZE);
        channel.read(input, input, completionHandler);
    }

    /**
     * Closes the channel. An {@link IOException} raised while closing is
     * printed to standard error and otherwise ignored.
     */
    public void close() {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Run the current state's actions by delegating to the current reader.
     */
    public void run() {
        reader.get().run(this);
    }

    /**
     * @param userName the name used as prefix for messages sent from this client
     */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * Replaces the current reader.
     *
     * @param reader the reader used from now on
     */
    public void setReader(ClientReader reader) {
        this.reader.set(reader);
    }

    /**
     * @return the user name, or null if none has been set
     */
    public String getUserName() {
        return userName;
    }

    /**
     * Appends received text to the internal message buffer.
     *
     * @param message the text to append
     */
    public void appendMessage(String message) {
        synchronized (messageBuffer) {
            messageBuffer.append(message);
        }
    }

    /**
     * Removes and returns the next message from the buffer. The returned
     * string includes the terminating newline.
     *
     * @return the next newline separated message in the buffer. null is returned if the buffer
     * doesn't contain any newline.
     */
    public String nextMessage() {
        synchronized (messageBuffer) {
            int nextNewline = messageBuffer.indexOf("\n");
            if (nextNewline == -1) {
                return null;
            }
            String message = messageBuffer.substring(0, nextNewline + 1);
            messageBuffer.delete(0, nextNewline + 1);
            return message;
        }
    }
}