C++程序  |  274行  |  6.15 KB

/*
 * thread_pool.cpp - Thread Pool
 *
 *  Copyright (c) 2017 Intel Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Author: Wind Yuan <feng.yuan@intel.com>
 */

#include "thread_pool.h"

#include <stdlib.h>

/* Lower bound that ThreadPool::set_threads applies to the requested minimum;
 * also the constructor default for both the minimum and maximum thread count. */
#define XCAM_POOL_MIN_THREADS 2
/* Upper bound that ThreadPool::set_threads applies to the requested maximum. */
#define XCAM_POOL_MAX_THREADS 1024

namespace XCam {

/*
 * Worker thread used by ThreadPool.
 *
 * Each call to loop() pops one UserData item from the pool's data queue and
 * passes it to ThreadPool::dispatch(). The worker keeps a SmartPtr to its pool
 * and accesses the pool's _mutex, _running, _free_threads and _data_queue
 * members directly.
 */
class UserThread
    : public Thread
{
public:
    // pool: the pool this worker serves; name: forwarded to the Thread base.
    UserThread (const SmartPtr<ThreadPool> &pool, const char *name)
        : Thread (name)
        , _pool (pool)
    {}

protected:
    // Overrides of the Thread hooks; see the definitions below.
    virtual bool started ();
    virtual void stopped ();
    virtual bool loop ();

private:
    // Pool served by this worker. NOTE(review): the pool also stores its
    // workers in _thread_list, so the two reference each other until
    // ThreadPool::stop() clears that list.
    SmartPtr<ThreadPool> _pool;
};

/*
 * Start hook of the worker: acquires and releases the pool mutex once, then
 * reports success.
 *
 * ThreadPool::start() and ThreadPool::queue() hold that mutex while they
 * create workers, so presumably this makes a freshly created worker wait
 * until its creator has finished (e.g. until start() has set _running).
 * NOTE(review): confirm that this barrier is the intent.
 */
bool
UserThread::started ()
{
    XCAM_ASSERT (_pool.ptr ());
    {
        // Nothing is done while holding the lock; only the wait matters.
        SmartLock barrier (_pool->_mutex);
    }
    return true;
}

// Stop hook of the worker (presumably invoked by the Thread base class when
// the thread ends -- confirm). Only emits a debug log line carrying the
// thread name and the address of this object.
void
UserThread::stopped ()
{
    XCAM_LOG_DEBUG ("thread(%s, %p) stopped", XCAM_STR(get_name ()), this);
}

bool
UserThread::loop ()
{
    XCAM_ASSERT (_pool.ptr ());
    {
        SmartLock lock (_pool->_mutex);
        if (!_pool->_running)
            return false;
    }

    SmartPtr<ThreadPool::UserData> data = _pool->_data_queue.pop ();
    if (!data.ptr ()) {
        XCAM_LOG_DEBUG ("user thread(%s) get null data, need stop", XCAM_STR (_pool->get_name ()));
        return false;
    }

    {
        SmartLock lock (_pool->_mutex);
        XCAM_ASSERT (_pool->_free_threads > 0);
        --_pool->_free_threads;
    }

    bool ret = _pool->dispatch (data);

    if (ret) {
        SmartLock lock (_pool->_mutex);
        ++_pool->_free_threads;
    }
    return ret;
}

bool
ThreadPool::dispatch (const SmartPtr<ThreadPool::UserData> &data)
{
    XCAM_FAIL_RETURN (
        ERROR, data.ptr(), true,
        "ThreadPool(%s) dispatch NULL data", XCAM_STR (get_name ()));
    XCamReturn err = data->run ();
    data->done (err);
    return true;
}

/*
 * Creates a stopped pool.
 *
 * name: optional label; when non-null, a private copy of at most
 *       XCAM_MAX_STR_SIZE characters is made with strndup().
 *
 * Both the minimum and the maximum thread count start at
 * XCAM_POOL_MIN_THREADS; use set_threads() to change them.
 */
ThreadPool::ThreadPool (const char *name)
    : _name (name ? strndup (name, XCAM_MAX_STR_SIZE) : NULL)
    , _min_threads (XCAM_POOL_MIN_THREADS)
    , _max_threads (XCAM_POOL_MIN_THREADS)
    , _allocated_threads (0)
    , _free_threads (0)
    , _running (false)
{
}

/*
 * Stops the pool (a no-op when it is not running) and releases the name copy
 * made by the constructor.
 */
ThreadPool::~ThreadPool ()
{
    stop ();

    // _name comes from strndup(), so it has to be returned with free().
    // NOTE(review): the previous xcam_mem_clear (_name) call appears to only
    // zero the pointer without freeing it -- confirm against its definition.
    // free(NULL) is a no-op, so an unnamed pool needs no special case.
    free (_name);
    _name = NULL;
}

bool
ThreadPool::set_threads (uint32_t min, uint32_t max)
{
    XCAM_FAIL_RETURN (
        ERROR, !_running, false,
        "ThreadPool(%s) set threads failed, need stop the pool first", XCAM_STR(get_name ()));

    if (min < XCAM_POOL_MIN_THREADS)
        min = XCAM_POOL_MIN_THREADS;
    if (max > XCAM_POOL_MAX_THREADS)
        max = XCAM_POOL_MAX_THREADS;

    if (min > max)
        min = max;

    _min_threads = min;
    _max_threads = max;
    return true;
}

bool
ThreadPool::is_running ()
{
    SmartLock locker(_mutex);
    return _running;
}

/*
 * Starts the pool: resets the thread counters, re-enables popping on the
 * data queue and creates _min_threads workers, all under the pool mutex.
 *
 * Returns XCAM_RETURN_NO_ERROR on success or when the pool is already
 * running; otherwise returns the error of the failed worker creation.
 *
 * NOTE(review): when a creation fails, the workers created so far stay in
 * _thread_list while _running remains false, and stop() returns early in
 * that state -- verify whether this path needs cleanup.
 */
XCamReturn
ThreadPool::start ()
{
    SmartLock guard (_mutex);
    if (_running)
        return XCAM_RETURN_NO_ERROR;

    _allocated_threads = 0;
    _free_threads = 0;
    _data_queue.resume_pop ();

    // Every successful creation bumps _allocated_threads by one, so this
    // runs exactly _min_threads times unless a creation fails.
    while (_allocated_threads < _min_threads) {
        XCamReturn err = create_user_thread_unsafe ();
        XCAM_FAIL_RETURN (
            ERROR, xcam_ret_is_ok (err), err,
            "thread pool(%s) start failed by creating user thread", XCAM_STR (get_name()));
    }

    XCAM_ASSERT (_allocated_threads == _min_threads);

    _running = true;
    return XCAM_RETURN_NO_ERROR;
}

/*
 * Stops the pool and all of its workers.
 *
 * Returns XCAM_RETURN_NO_ERROR in every case; when the pool is not running
 * nothing else is done.
 */
XCamReturn
ThreadPool::stop ()
{
    UserThreadList workers;

    // Phase 1: flip the flag and take over the worker list under the mutex,
    // so the calls below run without holding the lock.
    {
        SmartLock guard (_mutex);
        if (!_running)
            return XCAM_RETURN_NO_ERROR;

        _running = false;
        workers = _thread_list;
        _thread_list.clear ();
    }

    // Phase 2: signal every worker to stop before touching the queue.
    for (UserThreadList::iterator it = workers.begin (); it != workers.end (); ++it)
    {
        SmartPtr<UserThread> worker = *it;
        XCAM_ASSERT (worker.ptr ());
        worker->emit_stop ();
    }

    // Phase 3: pause popping and drop whatever is still queued.
    _data_queue.pause_pop ();
    _data_queue.clear ();

    // Phase 4: stop each worker.
    for (UserThreadList::iterator it = workers.begin (); it != workers.end (); ++it)
    {
        SmartPtr<UserThread> worker = *it;
        XCAM_ASSERT (worker.ptr ());
        worker->stop ();
    }

    // Phase 5: no workers remain, reset the bookkeeping.
    {
        SmartLock guard (_mutex);
        _allocated_threads = 0;
        _free_threads = 0;
    }

    return XCAM_RETURN_NO_ERROR;
}

/*
 * Creates and starts one worker named "<pool name>-<index>", appends it to
 * _thread_list and counts it as both allocated and free.
 *
 * Takes no lock itself; the callers in this file (start() and queue()) hold
 * _mutex around the call.
 *
 * Returns XCAM_RETURN_ERROR_THREAD when the worker cannot be started,
 * XCAM_RETURN_NO_ERROR otherwise.
 */
XCamReturn
ThreadPool::create_user_thread_unsafe ()
{
    char thread_name[256];
    snprintf (thread_name, sizeof (thread_name) - 1, "%s-%d", XCAM_STR (get_name()), _allocated_threads);

    SmartPtr<UserThread> worker = new UserThread (this, thread_name);
    XCAM_ASSERT (worker.ptr ());
    XCAM_FAIL_RETURN (
        ERROR, worker.ptr () && worker->start (), XCAM_RETURN_ERROR_THREAD,
        "ThreadPool(%s) create user thread failed by starting error", XCAM_STR (get_name()));

    _thread_list.push_back (worker);

    // A new worker starts out idle.
    ++_free_threads;
    ++_allocated_threads;
    XCAM_ASSERT (_free_threads <= _allocated_threads);

    return XCAM_RETURN_NO_ERROR;
}

/*
 * Queues one work item for execution by a worker thread.
 *
 * data: the item to run; must not be null.
 *
 * After the item has been pushed, one additional worker is created when no
 * worker is currently free and fewer than _max_threads workers exist.
 *
 * Returns XCAM_RETURN_ERROR_THREAD when the pool is not running (the item is
 * erased from the queue again if the pool stopped after the push) or when
 * the push fails. Returns the creation error when a worker cannot be created
 * and no worker exists at all. Returns XCAM_RETURN_NO_ERROR otherwise.
 */
XCamReturn
ThreadPool::queue (const SmartPtr<UserData> &data)
{
    XCAM_ASSERT (data.ptr ());
    {
        SmartLock locker (_mutex);
        if (!_running)
            return XCAM_RETURN_ERROR_THREAD;
    }

    // The push happens without holding _mutex.
    if (!_data_queue.push (data))
        return XCAM_RETURN_ERROR_THREAD;

    do {
        SmartLock locker(_mutex);

        // The pool may have been stopped between the check above and here.
        if (!_running) {
            _data_queue.erase (data);
            return XCAM_RETURN_ERROR_THREAD;
        }

        // The pool is already at its configured size limit.
        if (_allocated_threads >= _max_threads)
            break;

        // An idle worker is available to take the item, so growing the pool
        // is unnecessary. The pool only grows when every worker is busy.
        if (_free_threads)
            break;

        XCamReturn err = create_user_thread_unsafe ();
        if (!xcam_ret_is_ok (err) && _allocated_threads) {
            // Existing workers can still pick the item up later.
            XCAM_LOG_WARNING (
                "thread pool(%s) create new thread failed but queue data can continue",
                XCAM_STR (get_name ()));
            break;
        }

        XCAM_FAIL_RETURN (
            ERROR, xcam_ret_is_ok (err), err,
            "thread pool(%s) queue data failed by creating user thread", XCAM_STR (get_name()));

    } while (0);

    return XCAM_RETURN_NO_ERROR;
}

}