C++程序  |  774行  |  25.1 KB

/* GIO - GLib Input, Output and Streaming Library
 * 
 * Copyright (C) 2006-2007 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * Author: Christian Kellner <gicmo@gnome.org> 
 */

#include "config.h"
#include "gbufferedoutputstream.h"
#include "goutputstream.h"
#include "gsimpleasyncresult.h"
#include "string.h"
#include "glibintl.h"

#include <gioalias.h>

/**
 * SECTION:gbufferedoutputstream
 * @short_description: Buffered Output Stream
 * @include: gio/gio.h
 * @see_also: #GFilterOutputStream, #GOutputStream
 * 
 * Buffered output stream implements #GFilterOutputStream and provides 
 * for buffered writes. 
 * 
 * By default, #GBufferedOutputStream's buffer size is set at 4 kilobytes.
 * 
 * To create a buffered output stream, use g_buffered_output_stream_new(), 
 * or g_buffered_output_stream_new_sized() to specify the buffer's size 
 * at construction.
 * 
 * To get the size of a buffer within a buffered output stream, use 
 * g_buffered_output_stream_get_buffer_size(). To change the size of a 
 * buffered output stream's buffer, use 
 * g_buffered_output_stream_set_buffer_size(). Note that the buffer's 
 * size cannot be reduced below the size of the data within the buffer.
 **/

/* Default size, in bytes, of the internal buffer; used as the default
 * value of the "buffer-size" property. */
#define DEFAULT_BUFFER_SIZE 4096

/* Private per-instance state of a GBufferedOutputStream. */
struct _GBufferedOutputStreamPrivate {
  /* Staging buffer allocated with g_malloc() and released in finalize. */
  guint8 *buffer; 
  /* Allocated size of @buffer, in bytes. */
  gsize   len;
  /* Number of bytes currently held in @buffer; also the offset at which
   * the next write stores its data. */
  goffset pos;
  /* When TRUE, a write that does not fit enlarges @buffer instead of
   * waiting for it to be flushed. */
  gboolean auto_grow;
};

enum {
  PROP_0,
  PROP_BUFSIZE,
  PROP_AUTO_GROW
};

/* Forward declarations of the virtual-method implementations that
 * g_buffered_output_stream_class_init() installs into the class. */

/* GObject virtual methods */
static void     g_buffered_output_stream_set_property (GObject      *object,
                                                       guint         prop_id,
                                                       const GValue *value,
                                                       GParamSpec   *pspec);

static void     g_buffered_output_stream_get_property (GObject    *object,
                                                       guint       prop_id,
                                                       GValue     *value,
                                                       GParamSpec *pspec);
static void     g_buffered_output_stream_finalize     (GObject *object);


/* Synchronous GOutputStream virtual methods */
static gssize   g_buffered_output_stream_write        (GOutputStream *stream,
                                                       const void    *buffer,
                                                       gsize          count,
                                                       GCancellable  *cancellable,
                                                       GError       **error);
static gboolean g_buffered_output_stream_flush        (GOutputStream    *stream,
                                                       GCancellable  *cancellable,
                                                       GError          **error);
static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
                                                       GCancellable   *cancellable,
                                                       GError        **error);

/* Asynchronous GOutputStream virtual methods (start/finish pairs) */
static void     g_buffered_output_stream_write_async  (GOutputStream        *stream,
                                                       const void           *buffer,
                                                       gsize                 count,
                                                       int                   io_priority,
                                                       GCancellable         *cancellable,
                                                       GAsyncReadyCallback   callback,
                                                       gpointer              data);
static gssize   g_buffered_output_stream_write_finish (GOutputStream        *stream,
                                                       GAsyncResult         *result,
                                                       GError              **error);
static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
                                                       int                   io_priority,
                                                       GCancellable         *cancellable,
                                                       GAsyncReadyCallback   callback,
                                                       gpointer              data);
static gboolean g_buffered_output_stream_flush_finish (GOutputStream        *stream,
                                                       GAsyncResult         *result,
                                                       GError              **error);
static void     g_buffered_output_stream_close_async  (GOutputStream        *stream,
                                                       int                   io_priority,
                                                       GCancellable         *cancellable,
                                                       GAsyncReadyCallback   callback,
                                                       gpointer              data);
static gboolean g_buffered_output_stream_close_finish (GOutputStream        *stream,
                                                       GAsyncResult         *result,
                                                       GError              **error);

/* Defines the GBufferedOutputStream type with G_TYPE_FILTER_OUTPUT_STREAM
 * as its parent type.  The class and instance initializers it refers to
 * are g_buffered_output_stream_class_init() and
 * g_buffered_output_stream_init() below. */
G_DEFINE_TYPE (GBufferedOutputStream,
               g_buffered_output_stream,
               G_TYPE_FILTER_OUTPUT_STREAM)


/* Class initializer for GBufferedOutputStream.
 *
 * Reserves the private instance structure, installs the GObject and
 * GOutputStream virtual-method implementations defined in this file and
 * declares the "buffer-size" and "auto-grow" properties. */
static void
g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
{
  GObjectClass       *gobject_class = G_OBJECT_CLASS (klass);
  GOutputStreamClass *stream_class  = G_OUTPUT_STREAM_CLASS (klass);
  GParamSpec         *pspec;

  g_type_class_add_private (klass, sizeof (GBufferedOutputStreamPrivate));

  /* GObject virtual methods */
  gobject_class->get_property = g_buffered_output_stream_get_property;
  gobject_class->set_property = g_buffered_output_stream_set_property;
  gobject_class->finalize     = g_buffered_output_stream_finalize;

  /* Synchronous stream operations */
  stream_class->write_fn = g_buffered_output_stream_write;
  stream_class->flush    = g_buffered_output_stream_flush;
  stream_class->close_fn = g_buffered_output_stream_close;

  /* Asynchronous stream operations, as start/finish pairs */
  stream_class->write_async  = g_buffered_output_stream_write_async;
  stream_class->write_finish = g_buffered_output_stream_write_finish;
  stream_class->flush_async  = g_buffered_output_stream_flush_async;
  stream_class->flush_finish = g_buffered_output_stream_flush_finish;
  stream_class->close_async  = g_buffered_output_stream_close_async;
  stream_class->close_finish = g_buffered_output_stream_close_finish;

  /* "buffer-size": read/write, also applied at construction time;
   * valid range is 1..G_MAXUINT, default DEFAULT_BUFFER_SIZE. */
  pspec = g_param_spec_uint ("buffer-size",
                             P_("Buffer Size"),
                             P_("The size of the backend buffer"),
                             1,
                             G_MAXUINT,
                             DEFAULT_BUFFER_SIZE,
                             G_PARAM_READWRITE | G_PARAM_CONSTRUCT |
                             G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK |
                             G_PARAM_STATIC_BLURB);
  g_object_class_install_property (gobject_class, PROP_BUFSIZE, pspec);

  /* "auto-grow": read/write boolean, FALSE by default. */
  pspec = g_param_spec_boolean ("auto-grow",
                                P_("Auto-grow"),
                                P_("Whether the buffer should automatically grow"),
                                FALSE,
                                G_PARAM_READWRITE |
                                G_PARAM_STATIC_NAME | G_PARAM_STATIC_NICK |
                                G_PARAM_STATIC_BLURB);
  g_object_class_install_property (gobject_class, PROP_AUTO_GROW, pspec);
}

/**
 * g_buffered_output_stream_get_buffer_size:
 * @stream: a #GBufferedOutputStream.
 * 
 * Reports how large the internal buffer of @stream currently is.
 * 
 * Returns: the current size of the buffer, in bytes. If @stream is not
 * a #GBufferedOutputStream the precondition check fails and -1
 * (converted to #gsize) is returned.
 **/
gsize
g_buffered_output_stream_get_buffer_size (GBufferedOutputStream *stream)
{
  GBufferedOutputStreamPrivate *priv;

  g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), -1);

  priv = stream->priv;

  return priv->len;
}

/**
 * g_buffered_output_stream_set_buffer_size:
 * @stream: a #GBufferedOutputStream.
 * @size: a #gsize.
 *
 * Sets the size of the internal buffer to @size.
 *
 * If the buffer already exists, data that is currently buffered is
 * preserved and the new size is never smaller than the amount of
 * buffered data. Nothing happens when @size equals the current size;
 * otherwise a notification for the "buffer-size" property is emitted.
 **/    
void
g_buffered_output_stream_set_buffer_size (GBufferedOutputStream *stream,
                                          gsize                  size)
{
  GBufferedOutputStreamPrivate *priv;

  g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));

  priv = stream->priv;

  /* Requested size already in effect: no reallocation, no notification. */
  if (priv->len == size)
    return;

  if (priv->buffer == NULL)
    {
      /* No buffer yet: allocate a fresh, empty one. */
      priv->buffer = g_malloc (size);
      priv->len = size;
      priv->pos = 0;
    }
  else
    {
      guint8 *replacement;

      /* Never shrink below the data that is still waiting to be flushed. */
      size = MAX (size, priv->pos);

      /* Move the pending bytes over; the write position stays as it was. */
      replacement = g_malloc (size);
      memcpy (replacement, priv->buffer, priv->pos);
      g_free (priv->buffer);

      priv->buffer = replacement;
      priv->len = size;
    }

  g_object_notify (G_OBJECT (stream), "buffer-size");
}

/**
 * g_buffered_output_stream_get_auto_grow:
 * @stream: a #GBufferedOutputStream.
 * 
 * Queries whether the buffer of @stream is enlarged automatically
 * as data is added.
 * 
 * Returns: %TRUE if the @stream's buffer automatically grows,
 * %FALSE otherwise (also when @stream is not a #GBufferedOutputStream).
 **/  
gboolean
g_buffered_output_stream_get_auto_grow (GBufferedOutputStream *stream)
{
  GBufferedOutputStreamPrivate *priv;

  g_return_val_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream), FALSE);

  priv = stream->priv;

  return priv->auto_grow;
}

/**
 * g_buffered_output_stream_set_auto_grow:
 * @stream: a #GBufferedOutputStream.
 * @auto_grow: a #gboolean.
 *
 * Sets whether or not the @stream's buffer should automatically grow.
 * If @auto_grow is true, then each write will just make the buffer
 * larger, and you must manually flush the buffer to actually write out
 * the data to the underlying stream.
 *
 * A notification for the "auto-grow" property is emitted only when the
 * setting actually changes.
 **/
void
g_buffered_output_stream_set_auto_grow (GBufferedOutputStream *stream,
                                        gboolean               auto_grow)
{
  GBufferedOutputStreamPrivate *priv;

  g_return_if_fail (G_IS_BUFFERED_OUTPUT_STREAM (stream));

  priv = stream->priv;

  /* Collapse any non-zero value to TRUE so the comparison below is exact. */
  auto_grow = auto_grow ? TRUE : FALSE;

  if (priv->auto_grow == auto_grow)
    return;

  priv->auto_grow = auto_grow;
  g_object_notify (G_OBJECT (stream), "auto-grow");
}

/* GObject::set_property implementation: forwards "buffer-size" and
 * "auto-grow" to the public setters and warns about any other id. */
static void
g_buffered_output_stream_set_property (GObject      *object,
                                       guint         prop_id,
                                       const GValue *value,
                                       GParamSpec   *pspec)
{
  GBufferedOutputStream *stream = G_BUFFERED_OUTPUT_STREAM (object);

  if (prop_id == PROP_BUFSIZE)
    {
      g_buffered_output_stream_set_buffer_size (stream, g_value_get_uint (value));
    }
  else if (prop_id == PROP_AUTO_GROW)
    {
      g_buffered_output_stream_set_auto_grow (stream, g_value_get_boolean (value));
    }
  else
    {
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
    }
}

/* GObject::get_property implementation: stores the current buffer size
 * or auto-grow flag into @value and warns about any other id. */
static void
g_buffered_output_stream_get_property (GObject    *object,
                                       guint       prop_id,
                                       GValue     *value,
                                       GParamSpec *pspec)
{
  GBufferedOutputStreamPrivate *priv = G_BUFFERED_OUTPUT_STREAM (object)->priv;

  if (prop_id == PROP_BUFSIZE)
    {
      g_value_set_uint (value, priv->len);
    }
  else if (prop_id == PROP_AUTO_GROW)
    {
      g_value_set_boolean (value, priv->auto_grow);
    }
  else
    {
      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
    }
}

/* GObject::finalize implementation: releases the internal buffer and
 * then chains up to the parent class. */
static void
g_buffered_output_stream_finalize (GObject *object)
{
  GBufferedOutputStreamPrivate *priv = G_BUFFERED_OUTPUT_STREAM (object)->priv;
  GObjectClass *parent_class;

  g_free (priv->buffer);

  parent_class = G_OBJECT_CLASS (g_buffered_output_stream_parent_class);
  parent_class->finalize (object);
}

/* Instance initializer: looks up the private data area of the new
 * instance and caches the pointer in stream->priv. */
static void
g_buffered_output_stream_init (GBufferedOutputStream *stream)
{
  GBufferedOutputStreamPrivate *priv;

  priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
                                      G_TYPE_BUFFERED_OUTPUT_STREAM,
                                      GBufferedOutputStreamPrivate);
  stream->priv = priv;
}

/**
 * g_buffered_output_stream_new:
 * @base_stream: a #GOutputStream.
 * 
 * Creates a new buffered output stream that wraps @base_stream. The
 * buffer size is left at the default of the "buffer-size" property.
 * 
 * Returns: a #GOutputStream for the given @base_stream, or %NULL if
 * @base_stream is not a #GOutputStream.
 **/  
GOutputStream *
g_buffered_output_stream_new (GOutputStream *base_stream)
{
  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);

  return g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
                       "base-stream", base_stream,
                       NULL);
}

/**
 * g_buffered_output_stream_new_sized:
 * @base_stream: a #GOutputStream.
 * @size: a #gsize.
 * 
 * Creates a new buffered output stream with a given buffer size.
 *
 * The size is handed over through the unsigned-integer "buffer-size"
 * property, so @size is converted to #guint before being passed on.
 * 
 * Returns: a #GOutputStream with an internal buffer set to @size, or
 * %NULL if @base_stream is not a #GOutputStream.
 **/  
GOutputStream *
g_buffered_output_stream_new_sized (GOutputStream *base_stream,
                                    gsize          size)
{
  GOutputStream *stream;

  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), NULL);

  /* "buffer-size" is installed with g_param_spec_uint(), so the variadic
   * argument must really be a guint.  Passing a gsize unconverted puts a
   * value of a different width into the argument list on platforms where
   * gsize is wider than guint. */
  stream = g_object_new (G_TYPE_BUFFERED_OUTPUT_STREAM,
                         "base-stream", base_stream,
                         "buffer-size", (guint) size,
                         NULL);

  return stream;
}

/* Writes the buffered bytes to the base stream.
 *
 * Whatever part of the buffer was written is discarded; bytes that could
 * not be written are moved to the front of the buffer so that they stay
 * pending.  Returns the result of g_output_stream_write_all(), or FALSE
 * when the filter has no valid base stream. */
static gboolean
flush_buffer (GBufferedOutputStream  *stream,
              GCancellable           *cancellable,
              GError                 **error)
{
  GBufferedOutputStreamPrivate *priv = stream->priv;
  GOutputStream                *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
  gsize                         written = 0;
  gsize                         leftover;
  gboolean                      ok;

  g_return_val_if_fail (G_IS_OUTPUT_STREAM (base_stream), FALSE);

  ok = g_output_stream_write_all (base_stream,
                                  priv->buffer,
                                  priv->pos,
                                  &written,
                                  cancellable,
                                  error);

  /* Keep any unwritten tail, shifted to the start of the buffer. */
  leftover = priv->pos - written;
  if (leftover > 0)
    g_memmove (priv->buffer, priv->buffer + written, leftover);

  priv->pos -= written;

  return ok;
}

/* GOutputStream::write_fn implementation.
 *
 * Copies up to @count bytes from @buffer into the internal buffer.  When
 * auto-grow is enabled and the data does not fit, the buffer is enlarged
 * first; otherwise, when the buffer is completely full, it is flushed to
 * the base stream before copying.
 *
 * Returns the number of bytes stored, which can be less than @count when
 * the free space is smaller, or -1 if flushing the buffer failed (with
 * @error set by the flush). */
static gssize
g_buffered_output_stream_write  (GOutputStream *stream,
                                 const void    *buffer,
                                 gsize          count,
                                 GCancellable  *cancellable,
                                 GError       **error)
{
  GBufferedOutputStream        *bstream = G_BUFFERED_OUTPUT_STREAM (stream);
  GBufferedOutputStreamPrivate *priv = bstream->priv;
  gsize                         room;

  room = priv->len - priv->pos;

  if (priv->auto_grow && room < count)
    {
      /* At least double the buffer, or more if that is still too small. */
      gsize grown = MAX (priv->len * 2, priv->len + count);

      g_buffered_output_stream_set_buffer_size (bstream, grown);
    }
  else if (room == 0)
    {
      if (!flush_buffer (bstream, cancellable, error))
        return -1;
    }

  /* Growing or flushing may have changed the free space: measure again. */
  room = priv->len - priv->pos;
  if (room < count)
    count = room;

  memcpy (priv->buffer + priv->pos, buffer, count);
  priv->pos += count;

  return count;
}

/* GOutputStream::flush implementation: writes the buffered bytes to the
 * base stream and, if that succeeded, flushes the base stream as well.
 * Returns FALSE as soon as either step fails. */
static gboolean
g_buffered_output_stream_flush (GOutputStream  *stream,
                                GCancellable   *cancellable,
                                GError        **error)
{
  GBufferedOutputStream *bstream = G_BUFFERED_OUTPUT_STREAM (stream);
  GOutputStream         *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;

  if (!flush_buffer (bstream, cancellable, error))
    return FALSE;

  return g_output_stream_flush (base_stream, cancellable, error);
}

/* GOutputStream::close_fn implementation.
 *
 * Writes the buffered bytes to the base stream and then, if the filter is
 * configured to close its base stream, closes it.  The base stream is
 * closed even when writing the buffer failed; in that case the error of
 * the failed write is the one reported and the close result is ignored. */
static gboolean
g_buffered_output_stream_close (GOutputStream  *stream,
                                GCancellable   *cancellable,
                                GError        **error)
{
  GBufferedOutputStream *bstream = G_BUFFERED_OUTPUT_STREAM (stream);
  GOutputStream         *base_stream = G_FILTER_OUTPUT_STREAM (bstream)->base_stream;
  gboolean               flushed;

  flushed = flush_buffer (bstream, cancellable, error);

  if (!g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
    return flushed;

  if (!flushed)
    {
      /* @error already describes the flush failure: close quietly. */
      g_output_stream_close (base_stream, cancellable, NULL);
      return flushed;
    }

  return g_output_stream_close (base_stream, cancellable, error);
}

/* ************************** */
/* Async stuff implementation */
/* ************************** */

/* TODO: This should be using the base class async ops, not threads */

/* Options read by flush_buffer_thread() to decide what to do after the
 * internal buffer has been written out. */
typedef struct {

  /* Also flush the base stream (only attempted if the buffer flush succeeded). */
  guint flush_stream : 1;
  /* Also close the base stream, if the filter is set to close it. */
  guint close_stream : 1;

} FlushData;

/* Destroy notify for a FlushData attached to an async result:
 * gives the slice-allocated structure back. */
static void
free_flush_data (gpointer data)
{
  FlushData *fdata = data;

  g_slice_free (FlushData, fdata);
}

/* Thread function shared by the async write, flush and close operations.
 * All three need the internal buffer to be flushed; flushing or closing
 * the base stream is just some additional work on top of that, selected
 * through the FlushData stored as the operation result pointer.
 *
 * The first error that occurs is the one stored in @result. */
static void
flush_buffer_thread (GSimpleAsyncResult *result,
                     GObject            *object,
                     GCancellable       *cancellable)
{
  GBufferedOutputStream *stream = G_BUFFERED_OUTPUT_STREAM (object);
  FlushData             *fdata = g_simple_async_result_get_op_res_gpointer (result);
  GOutputStream         *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
  GError                *error = NULL;
  gboolean               ok;

  ok = flush_buffer (stream, cancellable, &error);

  /* Flushing the base stream is pointless if our own buffer could not
   * be written; in that case only that error is reported. */
  if (ok && fdata->flush_stream)
    ok = g_output_stream_flush (base_stream, cancellable, &error);

  /* Closing is attempted even after a failure, but then its own result
   * is ignored so that the earlier error is the one reported. */
  if (fdata->close_stream &&
      g_filter_output_stream_get_close_base_stream (G_FILTER_OUTPUT_STREAM (stream)))
    {
      if (ok)
        ok = g_output_stream_close (base_stream, cancellable, &error);
      else
        g_output_stream_close (base_stream, cancellable, NULL);
    }

  if (!ok)
    {
      g_simple_async_result_set_from_error (result, error);
      g_error_free (error);
    }
}

/* State of one asynchronous write request.  A pointer to it is stored as
 * the operation result of the async result and is read both by
 * flush_buffer_thread() and by the write_finish function. */
typedef struct {
    
  /* Flush options for flush_buffer_thread(), which obtains them through
   * the same operation-result pointer. */
  FlushData fdata;

  /* Number of bytes the caller asked to write. */
  gsize  count;
  /* Caller's data; only the pointer is stored, nothing is copied until
   * the write_finish function runs. */
  const void  *buffer;

} WriteData;

/* Destroy notify for a WriteData attached to an async result:
 * gives the slice-allocated structure back. */
static void 
free_write_data (gpointer data)
{
  WriteData *wdata = data;

  g_slice_free (WriteData, wdata);
}

/* GOutputStream::write_async implementation.
 *
 * Records the request (@buffer pointer and @count) in a WriteData attached
 * to the async result.  No data is copied here: the copy into the internal
 * buffer is done by g_buffered_output_stream_write_finish().  @buffer is
 * stored by pointer only, so it has to stay valid until then.
 *
 * If the internal buffer still has free space, the operation completes
 * from an idle callback.  If the buffer is full, flush_buffer_thread() is
 * run in a thread first to write the buffer out to the base stream. */
static void
g_buffered_output_stream_write_async (GOutputStream        *stream,
                                      const void           *buffer,
                                      gsize                 count,
                                      int                   io_priority,
                                      GCancellable         *cancellable,
                                      GAsyncReadyCallback   callback,
                                      gpointer              data)
{
  GBufferedOutputStream *buffered_stream;
  GBufferedOutputStreamPrivate *priv;
  GSimpleAsyncResult *res;
  WriteData *wdata;

  buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
  priv = buffered_stream->priv;

  wdata = g_slice_new (WriteData);
  wdata->count  = count;
  wdata->buffer = buffer;
  /* g_slice_new() does not clear the memory, so give the flush options a
   * defined value on every path: a write only flushes the buffer, it
   * neither flushes nor closes the base stream. */
  wdata->fdata.flush_stream = FALSE;
  wdata->fdata.close_stream = FALSE;

  res = g_simple_async_result_new (G_OBJECT (stream),
                                   callback,
                                   data,
                                   g_buffered_output_stream_write_async);

  g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);

  if (priv->len - priv->pos > 0)
    {
      /* Free space available: nothing to flush, finish from idle. */
      g_simple_async_result_complete_in_idle (res);
    }
  else
    {
      /* Buffer full: make room by flushing it in a thread. */
      g_simple_async_result_run_in_thread (res, 
                                           flush_buffer_thread, 
                                           io_priority,
                                           cancellable);
    }

  /* Both completion paths keep their own reference on the result for as
   * long as they need it, so our initial reference is dropped in either
   * case.  (The idle path used to skip this and leaked the result.) */
  g_object_unref (res);
}

/* GOutputStream::write_finish implementation.
 *
 * Performs the actual copy for an asynchronous write: takes the request
 * stored by g_buffered_output_stream_write_async() and copies as many of
 * the requested bytes as fit into the free space of the internal buffer.
 *
 * Returns the number of bytes copied.  @error is not touched here. */
static gssize
g_buffered_output_stream_write_finish (GOutputStream        *stream,
                                       GAsyncResult         *result,
                                       GError              **error)
{
  GBufferedOutputStreamPrivate *priv = G_BUFFERED_OUTPUT_STREAM (stream)->priv;
  GSimpleAsyncResult           *simple = G_SIMPLE_ASYNC_RESULT (result);
  WriteData                    *wdata;
  gssize                        count;

  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
            g_buffered_output_stream_write_async);

  wdata = g_simple_async_result_get_op_res_gpointer (simple);

  /* Limit the copy to the free space left in the buffer. */
  count = priv->len - priv->pos; 
  count = MIN (wdata->count, count);

  memcpy (priv->buffer + priv->pos, wdata->buffer, count);
  priv->pos += count;

  return count;
}

/* GOutputStream::flush_async implementation: runs flush_buffer_thread()
 * in a thread with a request to flush the internal buffer and then the
 * base stream, without closing it. */
static void
g_buffered_output_stream_flush_async (GOutputStream        *stream,
                                      int                   io_priority,
                                      GCancellable         *cancellable,
                                      GAsyncReadyCallback   callback,
                                      gpointer              data)
{
  GSimpleAsyncResult *res;
  FlushData          *fdata = g_slice_new (FlushData);

  fdata->close_stream = FALSE;
  fdata->flush_stream = TRUE;

  res = g_simple_async_result_new (G_OBJECT (stream),
                                   callback,
                                   data,
                                   g_buffered_output_stream_flush_async);
  g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);

  g_simple_async_result_run_in_thread (res,
                                       flush_buffer_thread,
                                       io_priority,
                                       cancellable);

  /* Our own reference is no longer needed once the job is scheduled. */
  g_object_unref (res);
}

/* GOutputStream::flush_finish implementation.
 *
 * Only checks (with a warning) that @result was created by
 * g_buffered_output_stream_flush_async(); it always returns TRUE and does
 * not set @error itself. */
static gboolean
g_buffered_output_stream_flush_finish (GOutputStream        *stream,
                                       GAsyncResult         *result,
                                       GError              **error)
{
  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);

  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
            g_buffered_output_stream_flush_async);

  return TRUE;
}

/* GOutputStream::close_async implementation: runs flush_buffer_thread()
 * in a thread with a request to flush the internal buffer and then close
 * the base stream (if the filter is set to close it). */
static void
g_buffered_output_stream_close_async (GOutputStream        *stream,
                                      int                   io_priority,
                                      GCancellable         *cancellable,
                                      GAsyncReadyCallback   callback,
                                      gpointer              data)
{
  GSimpleAsyncResult *res;
  FlushData          *fdata;

  fdata = g_slice_new (FlushData);
  /* g_slice_new() does not clear the memory and flush_buffer_thread()
   * reads both flags, so flush_stream must be set explicitly.  FALSE
   * mirrors the synchronous close, which writes out the buffer but does
   * not flush the base stream before closing it. */
  fdata->flush_stream = FALSE;
  fdata->close_stream = TRUE;

  res = g_simple_async_result_new (G_OBJECT (stream),
                                   callback,
                                   data,
                                   g_buffered_output_stream_close_async);

  g_simple_async_result_set_op_res_gpointer (res, fdata, free_flush_data);

  g_simple_async_result_run_in_thread (res, 
                                       flush_buffer_thread, 
                                       io_priority,
                                       cancellable);
  g_object_unref (res);
}

/* GOutputStream::close_finish implementation.
 *
 * Only checks (with a warning) that @result was created by
 * g_buffered_output_stream_close_async(); it always returns TRUE and does
 * not set @error itself. */
static gboolean
g_buffered_output_stream_close_finish (GOutputStream        *stream,
                                       GAsyncResult         *result,
                                       GError              **error)
{
  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);

  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
            g_buffered_output_stream_close_async);

  return TRUE;
}

#define __G_BUFFERED_OUTPUT_STREAM_C__
#include "gioaliasdef.c"