C++程序  |  138行  |  3.4 KB

/*
 * Copyright (C) 2015 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fec_private.h"

struct process_info {
    int id;
    fec_handle *f;
    uint8_t *buf;
    size_t count;
    uint64_t offset;
    read_func func;
    ssize_t rc;
    size_t errors;
};

/* thread function  */
static void * __process(void *cookie)
{
    process_info *p = static_cast<process_info *>(cookie);

    debug("thread %d: [%" PRIu64 ", %" PRIu64 ")", p->id, p->offset,
        p->offset + p->count);

    p->rc = p->func(p->f, p->buf, p->count, p->offset, &p->errors);
    return p;
}

/* launches a maximum number of threads to process a read */
ssize_t process(fec_handle *f, uint8_t *buf, size_t count, uint64_t offset,
        read_func func)
{
    check(f);
    check(buf)
    check(func);

    if (count == 0) {
        return 0;
    }

    int threads = sysconf(_SC_NPROCESSORS_ONLN);

    if (threads < WORK_MIN_THREADS) {
        threads = WORK_MIN_THREADS;
    } else if (threads > WORK_MAX_THREADS) {
        threads = WORK_MAX_THREADS;
    }

    uint64_t start = (offset / FEC_BLOCKSIZE) * FEC_BLOCKSIZE;
    size_t blocks = fec_div_round_up(count, FEC_BLOCKSIZE);

    if ((size_t)threads > blocks) {
        threads = (int)blocks;
    }

    size_t count_per_thread = fec_div_round_up(blocks, threads) * FEC_BLOCKSIZE;
    size_t left = count;
    uint64_t pos = offset;
    uint64_t end = start + count_per_thread;

    debug("%d threads, %zu bytes per thread (total %zu)", threads,
        count_per_thread, count);

    std::vector<pthread_t> handles;
    process_info info[threads];
    ssize_t rc = 0;

    /* start threads to process queue */
    for (int i = 0; i < threads; ++i) {
        check(left > 0);

        info[i].id = i;
        info[i].f = f;
        info[i].buf = &buf[pos - offset];
        info[i].count = (size_t)(end - pos);
        info[i].offset = pos;
        info[i].func = func;
        info[i].rc = -1;
        info[i].errors = 0;

        if (info[i].count > left) {
            info[i].count = left;
        }

        pthread_t thread;

        if (pthread_create(&thread, NULL, __process, &info[i]) != 0) {
            error("failed to create thread: %s", strerror(errno));
            rc = -1;
        } else {
            handles.push_back(thread);
        }

        pos = end;
        end  += count_per_thread;
        left -= info[i].count;
    }

    check(left == 0);

    ssize_t nread = 0;

    /* wait for all threads to complete */
    for (auto thread : handles) {
        process_info *p = NULL;

        if (pthread_join(thread, (void **)&p) != 0) {
            error("failed to join thread: %s", strerror(errno));
            rc = -1;
        } else if (!p || p->rc == -1) {
            rc = -1;
        } else {
            nread += p->rc;
            f->errors += p->errors;
        }
    }

    if (rc == -1) {
        errno = EIO;
        return -1;
    }

    return nread;
}