/*
* libhdfs engine
*
* this engine helps perform read/write operations on an hdfs cluster using
* libhdfs. hdfs does not support modification of data once a file is created.
*
* so, to mimic that, create many files of small size (e.g. 256k), and this
* engine selects a file based on the offset generated by fio.
*
* thus, random reads and writes can also be achieved with this logic.
*
* NOTE: please set the environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
* to appropriate values for this engine to work properly
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/uio.h>
#include <errno.h>
#include <assert.h>
#include "../fio.h"
#include "hdfs.h"
/*
 * Per-engine state; fio_hdfsio_setup() allocates it and stores it in
 * td->io_ops->data.
 */
struct hdfsio_data {
/* name node host, parsed from the job's filename option ("host,port") */
char host[256];
/* name node port, parsed from the job's filename option */
int port;
/* libhdfs connection, established in fio_hdfsio_open_file() */
hdfsFS fs;
/* handle of the currently open ".f<id>" file; also used as a scratch
 * handle while reading ".fcount" and ".fbs" */
hdfsFile fp;
/* size of each backing file; fio offsets are divided by it to pick a file */
unsigned long fsbs;
/* number of backing files */
unsigned long fscount;
/* offset-derived id the open file was selected for; -1 until a file is open */
unsigned long curr_file_id;
/* largest numjobs value seen by fio_hdfsio_setup() */
unsigned int numjobs;
/* getpid() % numjobs, added to every computed file id */
unsigned int fid_correction;
};
/*
 * Read one unsigned long from the start of the HDFS file 'name' (relative to
 * the current working directory) into *out.
 *
 * *out is written only when the whole value was read and it is non-zero, so
 * a missing, short or bogus file never clobbers the value already configured
 * (a zero would later be used as a divisor).
 */
static void fio_hdfsio_read_param(struct hdfsio_data *hd, const char *name,
				  unsigned long *out)
{
	unsigned long val = 0;
	int got;

	hd->fp = hdfsOpenFile(hd->fs, name, O_RDONLY, 0, 0, 0);
	if (!hd->fp)
		return;

	got = hdfsRead(hd->fs, hd->fp, &val, sizeof(val));
	hdfsCloseFile(hd->fs, hd->fp);
	/* the handle is dead now; do not leave it dangling in hd */
	hd->fp = NULL;

	if (got == (int) sizeof(val) && val)
		*out = val;
}

/*
 * Refresh hd->fscount and hd->fsbs from the ".fcount" and ".fbs" files kept
 * in /.perftest on the cluster.
 *
 * This is best effort: when a file is absent or unreadable the value that is
 * already in hd is kept.
 *
 * Returns 0, or -1 when hd has no connection to read from.
 */
static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
{
	/* make sure that hdfsConnect is invoked before executing this function */
	if (!hd->fs)
		return -1;

	hdfsSetWorkingDirectory(hd->fs, "/.perftest");
	fio_hdfsio_read_param(hd, ".fcount", &hd->fscount);
	fio_hdfsio_read_param(hd, ".fbs", &hd->fsbs);
	return 0;
}
/*
 * Make sure the HDFS file that backs io_u->offset is open in hd->fp.
 *
 * The wanted file id is offset / fsbs plus the per-process correction. If
 * that id is the one the open file was selected for, nothing is done (note
 * that the direction of the I/O is not considered in that case). Otherwise
 * the open file is closed and ".f<id>" is opened. A file that is missing or
 * smaller than fsbs cannot serve a read, so the following ids are tried in
 * turn, wrapping at fscount.
 *
 * Returns 0 on success and non-zero when the I/O direction is not a read or
 * write, when fsbs/fscount are zero, or when no file could be opened.
 */
static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd = td->io_ops->data;
	const unsigned long no_file = (unsigned long) -1;
	hdfsFileInfo *fi;
	unsigned long want_id;
	unsigned long f_id;
	unsigned long tries;
	char fname[80];
	int open_flags;
	int usable;

	if (hd->curr_file_id == no_file) {
		/*
		 * First I/O on this connection: try to pick up the file size
		 * and count stored on the cluster. Best effort, the values
		 * already in hd are kept when that fails.
		 */
		fio_hdfsio_setup_fs_params(hd);
	}

	/* both are used as divisors below */
	if (!hd->fsbs || !hd->fscount) {
		log_err("hdfs: file size and file count must be non-zero\n");
		return 1;
	}

	/* find out file id based on the offset generated by fio */
	want_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
	if (want_id == hd->curr_file_id) {
		/* file is already open */
		return 0;
	}

	/* reject unsupported directions before touching the open file */
	if (io_u->ddir == DDIR_READ) {
		open_flags = O_RDONLY;
	} else if (io_u->ddir == DDIR_WRITE) {
		open_flags = O_WRONLY;
	} else {
		log_err("hdfs: Invalid I/O Operation\n");
		return 1;
	}

	if (hd->curr_file_id != no_file) {
		hdfsCloseFile(hd->fs, hd->fp);
		hd->fp = NULL;
		hd->curr_file_id = no_file;
	}

	/*
	 * The starting id may lie beyond fscount because of the correction,
	 * so allow that one plus a full lap over all files before giving up.
	 */
	f_id = want_id;
	for (tries = 0; tries <= hd->fscount; tries++) {
		snprintf(fname, sizeof(fname), ".f%lu", f_id);

		/* a write can always open its file; a read needs enough data */
		usable = (io_u->ddir == DDIR_WRITE);
		if (!usable) {
			fi = hdfsGetPathInfo(hd->fs, fname);
			if (fi) {
				usable = (fi->mSize >= hd->fsbs);
				hdfsFreeFileInfo(fi, 1);
			}
		}

		if (usable) {
			hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
					      hd->fsbs);
			if (hd->fp) {
				/*
				 * Remember the id derived from the offset,
				 * not the id that was finally opened, so that
				 * further I/O in the same range reuses this
				 * handle.
				 */
				hd->curr_file_id = want_id;
				return 0;
			}
		}

		/* file is missing, too small or failed to open: try the next */
		f_id = (f_id + 1) % hd->fscount;
	}

	hd->fp = NULL;
	log_err("hdfs: no usable file found for offset %llu\n",
		(unsigned long long) io_u->offset);
	return 1;
}
/*
 * Translate the byte count returned by a transfer into io_u completion state.
 *
 * A non-negative count that differs from the requested length is a short
 * transfer: the remainder is stored in io_u->resid and the error is cleared.
 * A negative count records errno in io_u->error. Any error present on the
 * io_u is reported through td_verror(). Always returns FIO_Q_COMPLETED.
 */
static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
{
	const int wanted = (int)io_u->xfer_buflen;

	if (ret != wanted && ret >= 0) {
		/* short transfer */
		io_u->resid = io_u->xfer_buflen - ret;
		io_u->error = 0;
		return FIO_Q_COMPLETED;
	}

	if (ret != wanted)
		io_u->error = errno;

	if (io_u->error)
		td_verror(td, io_u->error, "xfer");

	return FIO_Q_COMPLETED;
}
/*
 * Carry out one read or write on the file currently open in hd->fp and hand
 * the byte count to fio_io_end(). Any other direction is logged and passed
 * on as a transfer of zero bytes.
 */
static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd = td->io_ops->data;
	int nbytes = 0;

	switch (io_u->ddir) {
	case DDIR_READ:
		nbytes = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf,
				  io_u->xfer_buflen);
		break;
	case DDIR_WRITE:
		nbytes = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
				   io_u->xfer_buflen);
		break;
	default:
		log_err("hdfs: Invalid I/O Operation\n");
		break;
	}

	return fio_io_end(td, io_u, nbytes);
}
/*
 * Connect to the name node configured in hd->host / hd->port, switch to the
 * /.perftest working directory and compute this process' file id correction.
 *
 * 'f' is not used: all I/O goes to the ".f<id>" files picked in
 * fio_hdfsio_prep().
 *
 * Returns 0 on success, non-zero when the connection cannot be established.
 */
int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
{
	struct hdfsio_data *hd = td->io_ops->data;

	hd->fs = hdfsConnect(hd->host, hd->port);
	if (!hd->fs) {
		log_err("hdfs: failed to connect to %s:%d\n", hd->host,
			hd->port);
		return 1;
	}

	hdfsSetWorkingDirectory(hd->fs, "/.perftest");

	/* fio_hdfsio_setup() stores at least 1 in numjobs; never divide by 0 */
	hd->fid_correction = hd->numjobs ? (getpid() % hd->numjobs) : 0;
	return 0;
}
/*
 * Close the data file left open by fio_hdfsio_prep(), if any, and drop the
 * connection.
 *
 * The bookkeeping is reset as well, so that a later open starts from a clean
 * state instead of closing a handle that belongs to a dead connection.
 *
 * Returns 0 on success, non-zero when closing the data file failed.
 */
int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
{
	struct hdfsio_data *hd = td->io_ops->data;
	int ret = 0;

	/* hd->fp only refers to an open file while curr_file_id is set */
	if (hd->curr_file_id != (unsigned long) -1 && hd->fp) {
		if (hdfsCloseFile(hd->fs, hd->fp)) {
			log_err("hdfs: failed to close file\n");
			ret = 1;
		}
	}
	hd->fp = NULL;
	hd->curr_file_id = -1;

	if (hd->fs) {
		hdfsDisconnect(hd->fs);
		hd->fs = NULL;
	}
	return ret;
}
/*
 * Parse environment variable 'name' as a positive number into *out.
 *
 * Like atol(), parsing stops at the first non-digit. Returns 0 on success
 * and -1 when the variable is unset, does not start with a number, is out of
 * range or is not positive.
 */
static int fio_hdfsio_env_ulong(const char *name, unsigned long *out)
{
	const char *str = getenv(name);
	char *end;
	long val;

	if (!str)
		return -1;

	errno = 0;
	val = strtol(str, &end, 10);
	if (end == str || errno == ERANGE || val <= 0)
		return -1;

	*out = (unsigned long) val;
	return 0;
}

/*
 * One-time engine setup: allocate the engine state, split the job's filename
 * option ("host,port") into host and port, take the file count and file size
 * from FIO_HDFS_FCOUNT / FIO_HDFS_BS and report fscount * fsbs to fio as the
 * size of the single file the job sees.
 *
 * The comma in td->o.filename is overwritten with a space while parsing.
 *
 * Returns 0 on success (or when the state already exists) and 1 on error;
 * nothing is stored in td->io_ops->data on error.
 */
static int fio_hdfsio_setup(struct thread_data *td)
{
	/* at least one job has to be there! */
	static unsigned int numjobs = 1;
	struct hdfsio_data *hd;
	struct fio_file *f;
	unsigned long fscount;
	unsigned long fsbs;
	char *sep;

	if (td->o.numjobs > numjobs)
		numjobs = td->o.numjobs;

	if (td->io_ops->data)
		return 0;

	/*
	 * IMHO the file count and size should be read from the cluster here
	 * (connect, fio_hdfsio_setup_fs_params(), disconnect), but somehow
	 * calling it here doesn't seem to work - some problem with libhdfs
	 * that needs to be debugged. So, as an alternate, environment
	 * variables are used here and the values are read from the cluster
	 * in fio_hdfsio_prep().
	 */
	if (!getenv("FIO_HDFS_FCOUNT") || !getenv("FIO_HDFS_BS")) {
		log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
		return 1;
	}
	if (fio_hdfsio_env_ulong("FIO_HDFS_FCOUNT", &fscount) ||
	    fio_hdfsio_env_ulong("FIO_HDFS_BS", &fsbs)) {
		/* zero would end up as a divisor when files are selected */
		log_err("hdfs: FIO_HDFS_FCOUNT and FIO_HDFS_BS must be positive numbers\n");
		return 1;
	}

	sep = td->o.filename ? strchr(td->o.filename, ',') : NULL;
	if (!sep) {
		log_err("hdfs: filename must be given as host,port\n");
		return 1;
	}

	hd = calloc(1, sizeof(*hd));
	if (!hd) {
		log_err("hdfs: failed to allocate engine data\n");
		return 1;
	}

	/* separate host and port from filename; 255 matches sizeof(hd->host) - 1 */
	*sep = ' ';
	if (sscanf(td->o.filename, "%255s%d", hd->host, &(hd->port)) < 1) {
		log_err("hdfs: no host name found in filename\n");
		free(hd);
		return 1;
	}

	hd->fscount = fscount;
	hd->fsbs = fsbs;
	hd->curr_file_id = -1;
	hd->numjobs = numjobs;
	td->io_ops->data = hd;

	/* set f->real_file_size from the file count and file size */
	f = td->files[0];
	f->real_file_size = (unsigned long long) fscount * fsbs;
	td->o.nr_files = 1;
	fio_file_set_size_known(f);

	return 0;
}
/*
 * Engine descriptor: the name of the engine plus the callbacks implemented
 * in this file. It is passed to register_ioengine() / unregister_ioengine().
 */
static struct ioengine_ops ioengine_hdfs = {
.name = "libhdfs",
.version = FIO_IOOPS_VERSION,
.setup = fio_hdfsio_setup,
.prep = fio_hdfsio_prep,
.queue = fio_hdfsio_queue,
.open_file = fio_hdfsio_open_file,
.close_file = fio_hdfsio_close_file,
/* NOTE(review): presumably marks the engine as synchronous - confirm in fio.h */
.flags = FIO_SYNCIO,
};
/*
 * Register the libhdfs engine descriptor with fio.
 * NOTE(review): fio_init presumably makes this run at program start - confirm.
 */
static void fio_init fio_hdfsio_register(void)
{
register_ioengine(&ioengine_hdfs);
}
/*
 * Remove the libhdfs engine descriptor from fio again.
 * NOTE(review): fio_exit presumably makes this run at program exit - confirm.
 */
static void fio_exit fio_hdfsio_unregister(void)
{
unregister_ioengine(&ioengine_hdfs);
}