// C++ source  |  151 lines  |  3.86 KB

// StreamBinder.cpp

#include "StdAfx.h"

#include "StreamBinder.h"
#include "../../Common/Defs.h"
#include "../../Common/MyCom.h"

using namespace NWindows;
using namespace NSynchronization;

class CSequentialInStreamForBinder:
  public ISequentialInStream,
  public CMyUnknownImp
{
public:
  MY_UNKNOWN_IMP

  STDMETHOD(Read)(void *data, UInt32 size, UInt32 *processedSize);
private:
  CStreamBinder *m_StreamBinder;
public:
  ~CSequentialInStreamForBinder() { m_StreamBinder->CloseRead(); }
  void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; }
};

// Delegates the read to the bound CStreamBinder and returns its result
// unchanged; data, size and processedSize are passed through as-is.
STDMETHODIMP CSequentialInStreamForBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
{
  return m_StreamBinder->Read(data, size, processedSize);
}

class CSequentialOutStreamForBinder:
  public ISequentialOutStream,
  public CMyUnknownImp
{
public:
  MY_UNKNOWN_IMP

  STDMETHOD(Write)(const void *data, UInt32 size, UInt32 *processedSize);

private:
  CStreamBinder *m_StreamBinder;
public:
  ~CSequentialOutStreamForBinder() {  m_StreamBinder->CloseWrite(); }
  void SetBinder(CStreamBinder *streamBinder) { m_StreamBinder = streamBinder; }
};

// Delegates the write to the bound CStreamBinder and returns its result
// unchanged; data, size and processedSize are passed through as-is.
STDMETHODIMP CSequentialOutStreamForBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
{
  return m_StreamBinder->Write(data, size, processedSize);
}


//////////////////////////
// CStreamBinder
// End of stream: _thereAreBytesToReadEvent is signaled while _bufferSize == 0.

// Creates the three events used by the binder, in this order:
// _allBytesAreWritenEvent, _thereAreBytesToReadEvent, _readStreamIsClosedEvent.
// Stops at the first Create() that returns a non-zero code and returns that
// code; otherwise returns the result of the last Create().
HRes CStreamBinder::CreateEvents()
{
  // NOTE(review): the 'true' argument presumably creates the event in the
  // signaled state -- confirm against the event class declaration.
  HRes res = _allBytesAreWritenEvent.Create(true);
  if (res == 0)
    res = _thereAreBytesToReadEvent.Create();
  if (res == 0)
    res = _readStreamIsClosedEvent.Create();
  return res;
}

// Prepares the binder for another pass: clears the running byte counter and
// resets the "bytes to read" and "read stream closed" events.
// _allBytesAreWritenEvent is not touched here.
void CStreamBinder::ReInit()
{
  ProcessedSize = 0;

  _thereAreBytesToReadEvent.Reset();
  _readStreamIsClosedEvent.Reset();
}


  
// Creates a reader endpoint and a writer endpoint, both bound to this binder,
// and stores them in *inStream and *outStream. Afterwards the pending-buffer
// state and the byte counter are cleared.
void CStreamBinder::CreateStreams(ISequentialInStream **inStream,
      ISequentialOutStream **outStream)
{
  // Reader side: bind first, then pass the pointer to the caller via Detach().
  CSequentialInStreamForBinder *readerSpec = new CSequentialInStreamForBinder;
  readerSpec->SetBinder(this);
  CMyComPtr<ISequentialInStream> readerHolder(readerSpec);
  *inStream = readerHolder.Detach();

  // Writer side: same sequence.
  CSequentialOutStreamForBinder *writerSpec = new CSequentialOutStreamForBinder;
  writerSpec->SetBinder(this);
  CMyComPtr<ISequentialOutStream> writerHolder(writerSpec);
  *outStream = writerHolder.Detach();

  // No data is pending and nothing has been read yet.
  ProcessedSize = 0;
  _bufferSize = 0;
  _buffer = NULL;
}

// Copies up to 'size' bytes from the buffer currently published by Write()
// into 'data'.
//
//   data          - destination buffer, at least 'size' bytes.
//   size          - maximum number of bytes to copy; 0 returns immediately.
//   processedSize - optional; receives the number of bytes copied. It is
//                   zeroed on entry, so it holds a defined value even when
//                   the wait on the event fails and the function returns early.
//
// Returns S_OK on success, or the non-zero code from the event wait.
// A successful call that reports 0 bytes for size > 0 means the pending
// buffer was empty when the event was signaled (end of stream, see
// CloseWrite()).
HRESULT CStreamBinder::Read(void *data, UInt32 size, UInt32 *processedSize)
{
  if (processedSize != NULL)
    *processedSize = 0;
  UInt32 sizeToRead = size;
  if (size > 0)
  {
    // NOTE(review): Lock() presumably blocks until the event is signaled --
    // confirm against the event class.
    RINOK(_thereAreBytesToReadEvent.Lock());
    sizeToRead = MyMin(_bufferSize, size);
    if (_bufferSize > 0)
    {
      memcpy(data, _buffer, sizeToRead);
      // Advance past the consumed part of the writer's buffer.
      _buffer = ((const Byte *)_buffer) + sizeToRead;
      _bufferSize -= sizeToRead;
      if (_bufferSize == 0)
      {
        // Buffer fully consumed: stop further reads and signal the event
        // that Write() is waiting on.
        _thereAreBytesToReadEvent.Reset();
        _allBytesAreWritenEvent.Set();
      }
    }
  }
  if (processedSize != NULL)
    *processedSize = sizeToRead;
  ProcessedSize += sizeToRead;
  return S_OK;
}

// Marks the read side as closed by signaling _readStreamIsClosedEvent.
// Write() waits on this event, so a writer blocked there stops waiting.
void CStreamBinder::CloseRead()
{
  _readStreamIsClosedEvent.Set();
}

// Publishes 'data' to the reader and blocks until the reader has consumed all
// of it or the read side has been closed.
//
//   data          - source buffer; only the pointer is stored, so it must stay
//                   valid until this call returns.
//   size          - number of bytes to hand over; 0 returns S_OK immediately.
//   processedSize - optional; receives 'size' on success. It is zeroed on
//                   entry, so it holds a defined value on the S_FALSE path
//                   instead of being left unwritten.
//
// Returns S_OK when the whole buffer was consumed, S_FALSE when the wait ended
// for any other reason (read side closed, or the wait itself failed).
HRESULT CStreamBinder::Write(const void *data, UInt32 size, UInt32 *processedSize)
{
  if (processedSize != NULL)
    *processedSize = 0;
  if (size > 0)
  {
    // Publish the buffer before signaling the reader.
    _buffer = data;
    _bufferSize = size;
    _allBytesAreWritenEvent.Reset();
    _thereAreBytesToReadEvent.Set();

    // Wake up on whichever comes first: buffer drained or reader closed.
    HANDLE events[2];
    events[0] = _allBytesAreWritenEvent;
    events[1] = _readStreamIsClosedEvent;
    DWORD waitResult = ::WaitForMultipleObjects(2, events, FALSE, INFINITE);
    if (waitResult != WAIT_OBJECT_0 + 0)
    {
      // Not woken by "all bytes written": report that the data was not
      // (fully) delivered. *processedSize stays 0.
      return S_FALSE;
    }
  }
  if (processedSize != NULL)
    *processedSize = size;
  return S_OK;
}

// Marks the write side as finished by signaling _thereAreBytesToReadEvent.
// Precondition: _bufferSize must be 0 here. A Read() then sees the event
// with an empty buffer and reports 0 bytes, which is the end-of-stream
// condition.
void CStreamBinder::CloseWrite()
{
  _thereAreBytesToReadEvent.Set();
}