/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/* vim:set expandtab ts=4 shiftwidth=4: */
/*
 * Copyright (C) 2008 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General
 * Public License along with this library; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 * Authors: Lin Ma <lin.ma@sun.com>
 */

#include "config.h"
#include <port.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <errno.h>
#include <glib.h>
#include "fen-data.h"
#include "fen-kernel.h"
#include "fen-missing.h"
#include "fen-dump.h"

#define PROCESS_EVENTQ_TIME 10 /* in milliseconds */
#define PAIR_EVENTS_TIMEVAL 00000 /* in microseconds */
#define PAIR_EVENTS_INC_TIMEVAL 0000 /* in microseconds */
#define SCAN_CHANGINGS_TIME 50 /* in milliseconds */
#define SCAN_CHANGINGS_MAX_TIME (4*100) /* in milliseconds */
#define SCAN_CHANGINGS_MIN_TIME (4*100) /* in milliseconds */
#define INIT_CHANGES_NUM 2
#define BASE_NUM 2

#ifdef GIO_COMPILATION
#define FD_W if (fd_debug_enabled) g_warning
static gboolean fd_debug_enabled = FALSE;
#else
#include "gam_error.h"
#define FD_W(...) GAM_DEBUG(DEBUG_INFO, __VA_ARGS__)
#endif

G_LOCK_EXTERN (fen_lock);

/* fdata objects whose destruction had to be postponed, see fdata_delete(). */
static GList *deleting_data = NULL;
/* Source id of the idle handler draining deleting_data; 0 when not installed. */
static guint deleting_data_id = 0;

/* User callbacks registered through fdata_class_init(). */
static void (*emit_once_cb) (fdata *f, int events, gpointer sub);
static void (*emit_cb) (fdata *f, int events);
static int (*_event_converter) (int event);

static gboolean fdata_delete (fdata* f);
static gint fdata_sub_find (gpointer a, gpointer b);
static void scan_children (node_t *f);
static void scan_known_children (node_t* f);

/**
 * add_missing_cb:
 * @parent: the node the new node is attached to; must not be NULL.
 * @user_data: the file name of the new node, passed as a gchar*.
 *
 * Node operation callback that adds a node named @user_data under @parent.
 *
 * Returns: the value returned by add_node().
 */
node_t*
add_missing_cb (node_t* parent, gpointer user_data)
{
    g_assert (parent);
    FD_W ("%s p:0x%p %s\n", __func__, parent, (gchar*)user_data);
    return add_node (parent, (gchar*)user_data);
}

/**
 * pre_del_cb:
 * @node: the node about to be removed; must not be NULL.
 * @user_data: unused.
 *
 * Node operation callback run before a node is deleted. A node whose fdata
 * is not passive vetoes the removal; otherwise the attached fdata (if any)
 * is deleted.
 *
 * Returns: FALSE to veto the removal, TRUE to let it proceed.
 */
gboolean
pre_del_cb (node_t* node, gpointer user_data)
{
    fdata* data;

    g_assert (node);
    data = node_get_data (node);
    FD_W ("%s node:0x%p %s\n", __func__, node, NODE_NAME(node));
    if (data != NULL) {
        if (!FN_IS_PASSIVE(data)) {
            return FALSE;
        }
        fdata_delete (data);
    }
    return TRUE;
}

/*
 * Integer power: returns x raised to y (1 when y is 0). The arithmetic is
 * unsigned, so the result wraps around for large exponents.
 */
static guint
_pow (guint x, guint y)
{
    guint z = 1;

    for (; y > 0; y--) {
        z *= x;
    }
    return z;
}

/*
 * Computes the delay, in milliseconds, before the next changed-scan of
 * @data. The delay grows exponentially with data->changed_event_num and is
 * clamped to [SCAN_CHANGINGS_MIN_TIME, SCAN_CHANGINGS_MAX_TIME].
 */
static guint
get_scalable_scan_time (fdata* data)
{
    guint sleep_time;

    /* Calculate from num = 0 */
    sleep_time = _pow (BASE_NUM, data->changed_event_num) * SCAN_CHANGINGS_TIME;
    if (sleep_time < SCAN_CHANGINGS_MIN_TIME) {
        sleep_time = SCAN_CHANGINGS_MIN_TIME;
    } else if (sleep_time > SCAN_CHANGINGS_MAX_TIME) {
        sleep_time = SCAN_CHANGINGS_MAX_TIME;
        /*
         * Restart the back-off counter. The previous code stored
         * INIT_CHANGES_NUM into change_update_id, which is a timer source
         * id that every caller overwrites right after this call; the
         * counter itself was never reset and could grow without bound.
         */
        data->changed_event_num = INIT_CHANGES_NUM;
    }
    FD_W ("SCALABE SCAN num:time [ %4u : %4u ] %s\n",
      data->changed_event_num, sleep_time, FN_NAME(data));
    return sleep_time;
}

/* Returns TRUE if @val1 is strictly earlier than @val2. */
static gboolean
g_timeval_lt (GTimeVal *val1, GTimeVal *val2)
{
    if (val1->tv_sec < val2->tv_sec)
        return TRUE;

    if (val1->tv_sec > val2->tv_sec)
        return FALSE;

    /* val1->tv_sec == val2->tv_sec */
    if (val1->tv_usec < val2->tv_usec)
        return TRUE;

    return FALSE;
}

/**
 * scan_known_children:
 * @f: the directory node to scan.
 *
 * Reads the directory named by @f and, for every entry that already has a
 * child node with non-passive fdata which is not yet monitored, adds it to
 * the port and emits FN_EVENT_CREATED on success. Entries without a node
 * are ignored.
 *
 * Unsafe, need lock.
 */
static void
scan_known_children (node_t* f)
{
    GDir *dir;
    GError *err = NULL;

    FD_W ("%s %s [0x%p]\n", __func__, NODE_NAME(f), f);
    /*
     * Current fdata must be directly monitored. Be sure it is 1 level monitor.
     */
    dir = g_dir_open (NODE_NAME(f), 0, &err);
    if (dir) {
        const char *basename;

        while ((basename = g_dir_read_name (dir))) {
            node_t* childf = NULL;
            fdata* data;

            /*
             * If the node is existed, and isn't ported, then emit created
             * event. Ignore others.
             */
            childf = children_find (f, basename);
            if (childf &&
              (data = node_get_data (childf)) != NULL &&
              !FN_IS_PASSIVE (data)) {
                if (!is_monitoring (data) &&
                  port_add (&data->fobj, &data->len, data)) {
                    fdata_emit_events (data, FN_EVENT_CREATED);
                }
            }
        }
        g_dir_close (dir);
    } else {
        /* Never use the error text itself as the format string. */
        FD_W ("%s", err->message);
        g_error_free (err);
    }
}

/**
 * scan_children:
 * @f: the directory node to scan.
 *
 * Reads the directory named by @f. Every entry gets a child node and an
 * fdata (created on demand); entries that are not yet monitored are added
 * to the port and FN_EVENT_CREATED is emitted for each successful add.
 */
static void
scan_children (node_t *f)
{
    GDir *dir;
    GError *err = NULL;

    FD_W ("%s %s [0x%p]\n", __func__, NODE_NAME(f), f);
    /*
     * Current fdata must be directly monitored. Be sure it is 1 level monitor.
     */
    dir = g_dir_open (NODE_NAME(f), 0, &err);
    if (dir) {
        const char *basename;

        while ((basename = g_dir_read_name (dir))) {
            node_t* childf = NULL;
            fdata* data;

            childf = children_find (f, basename);
            if (childf == NULL) {
                gchar *filename;

                filename = g_build_filename (NODE_NAME(f), basename, NULL);
                childf = add_node (f, filename);
                g_assert (childf);
                data = fdata_new (childf, FALSE);
                g_free (filename);
            }
            if ((data = node_get_data (childf)) == NULL) {
                data = fdata_new (childf, FALSE);
            }
            /* Be sure data isn't ported and add to port successfully */
            /* Don't need delete it, it will be deleted by the parent */
            if (is_monitoring (data)) {
                /* Ignored */
            } else if (/* !is_ported (data) && */
                port_add (&data->fobj, &data->len, data)) {
                fdata_emit_events (data, FN_EVENT_CREATED);
            }
        }
        g_dir_close (dir);
    } else {
        /* Never use the error text itself as the format string. */
        FD_W ("%s", err->message);
        g_error_free (err);
    }
}

/*
 * Idle callback that retries the postponed deletions in deleting_data.
 *
 * Runs only when fen_lock can be taken without blocking; otherwise it
 * returns TRUE so that it is called again later. Returns FALSE, and clears
 * deleting_data_id, once the list is empty.
 */
static gboolean
scan_deleting_data (gpointer data)
{
    fdata *f;
    GList* i;
    GList* deleted_list = NULL;
    gboolean ret = TRUE;

    if (G_TRYLOCK (fen_lock)) {
        /* Collect the links whose fdata was really freed ... */
        for (i = deleting_data; i; i = i->next) {
            f = (fdata*)i->data;
            if (fdata_delete (f)) {
                deleted_list = g_list_prepend (deleted_list, i);
            }
        }

        /* ... and unlink them only after the iteration is over. */
        for (i = deleted_list; i; i = i->next) {
            deleting_data = g_list_remove_link (deleting_data,
              (GList *)i->data);
            g_list_free_1 ((GList *)i->data);
        }
        g_list_free (deleted_list);

        if (deleting_data == NULL) {
            deleting_data_id = 0;
            ret = FALSE;
        }
        G_UNLOCK (fen_lock);
    }
    return ret;
}

/**
 * is_monitoring:
 * @data: the fdata to query.
 *
 * Returns: TRUE if @data is ported or has a changed-scan timer pending.
 */
gboolean
is_monitoring (fdata* data)
{
    return is_ported (data) || data->change_update_id > 0;
}

/**
 * get_parent_data:
 * @data: the fdata to query.
 *
 * Returns: the fdata attached to the parent node of @data, or NULL when
 * @data has no node or its node is a top node.
 */
fdata*
get_parent_data (fdata* data)
{
    if (FN_NODE(data) && !IS_TOPNODE(FN_NODE(data))) {
        return node_get_data (FN_NODE(data)->parent);
    }
    return NULL;
}

/**
 * get_parent_node:
 * @data: the fdata to query.
 *
 * Returns: the parent of the node of @data, or NULL when @data has no node.
 */
node_t*
get_parent_node (fdata* data)
{
    if (FN_NODE(data)) {
        return (FN_NODE(data)->parent);
    }
    return NULL;
}

/**
 * fdata_new:
 * @node: the node the new fdata describes; must not be NULL.
 * @is_mondir: initial value of the is_dir field.
 *
 * Allocates a zeroed fdata for @node, copies the node name, creates an
 * empty event queue and attaches the fdata to @node.
 *
 * Returns: the new fdata.
 */
fdata *
fdata_new (node_t* node, gboolean is_mondir)
{
    fdata *f = NULL;

    g_assert (node);
    if ((f = g_new0 (fdata, 1)) != NULL) {
        FN_NODE(f) = node;
        FN_NAME(f) = g_strdup (NODE_NAME(node));
        f->is_dir = is_mondir;
        f->eventq = g_queue_new ();
        FD_W ("[ %s ] 0x%p %s\n", __func__, f, FN_NAME(f));
        node_set_data (node, f);
    }
    return f;
}

/*
 * Tries to destroy @f, which must be passive.
 *
 * The fdata is removed from the port and detached from its node at once.
 * If a changed-scan timer or an event-queue timer still refers to it, the
 * fdata is marked cancelled, queued on deleting_data for a later retry from
 * scan_deleting_data(), and FALSE is returned. Otherwise all queued events
 * and the fdata itself are freed and TRUE is returned.
 */
static gboolean
fdata_delete (fdata *f)
{
    fnode_event_t *ev;

    FD_W ("[ TRY %s ] 0x%p id[%4d:%4d] %s\n",
      __func__, f, f->eventq_id, f->change_update_id, FN_NAME(f));
    g_assert (FN_IS_PASSIVE(f));

    port_remove (f);
    /* missing_remove (f); */

    if (f->node != NULL) {
        node_set_data (f->node, NULL);
        f->node = NULL;
    }

    if (f->change_update_id > 0 || f->eventq_id > 0) {
        /* Queue it only once: a cancelled fdata is no longer "living". */
        if (FN_IS_LIVING(f)) {
            f->is_cancelled = TRUE;
            deleting_data = g_list_prepend (deleting_data, f);
            if (deleting_data_id == 0) {
                deleting_data_id = g_idle_add (scan_deleting_data, NULL);
                g_assert (deleting_data_id > 0);
            }
        }
        return FALSE;
    }
    FD_W ("[ %s ] 0x%p %s\n", __func__, f, FN_NAME(f));

    while ((ev = g_queue_pop_head (f->eventq)) != NULL) {
        fnode_event_delete (ev);
    }

    g_queue_free (f->eventq);
    g_free (FN_NAME(f));
    g_free (f);
    return TRUE;
}

/**
 * fdata_reset:
 * @data: the fdata to reset; must not be NULL.
 *
 * Drops every event still waiting in the event queue of @data.
 */
void
fdata_reset (fdata* data)
{
    fnode_event_t *ev;

    g_assert (data);

    while ((ev = g_queue_pop_head (data->eventq)) != NULL) {
        fnode_event_delete (ev);
    }
}

/* GCompareFunc-style pointer identity test: 0 when a == b, 1 otherwise. */
static gint
fdata_sub_find (gpointer a, gpointer b)
{
    if (a != b) {
        return 1;
    } else {
        return 0;
    }
}

/**
 * fdata_sub_add:
 * @f: the fdata.
 * @sub: the subscription to register; must not be registered yet.
 *
 * Prepends @sub to the subscription list of @f.
 */
void
fdata_sub_add (fdata *f, gpointer sub)
{
    FD_W ("[%s] [data: 0x%p ] [s: 0x%p ] %s\n", __func__, f, sub, FN_NAME(f));
    g_assert (g_list_find_custom (f->subs, sub,
          (GCompareFunc)fdata_sub_find) == NULL);
    f->subs = g_list_prepend (f->subs, sub);
}

/**
 * fdata_sub_remove:
 * @f: the fdata.
 * @sub: the subscription to unregister; must be registered.
 *
 * Removes @sub from the subscription list of @f.
 */
void
fdata_sub_remove (fdata *f, gpointer sub)
{
    GList *l;

    FD_W ("[%s] [data: 0x%p ] [s: 0x%p ] %s\n", __func__, f, sub, FN_NAME(f));
    l = g_list_find_custom (f->subs, sub, (GCompareFunc)fdata_sub_find);
    g_assert (l != NULL);
    g_assert (sub == l->data);
    f->subs = g_list_delete_link (f->subs, l);
}

/**
 * fdata_adjust_deleted:
 * @f: the fdata whose file could not be ported.
 *
 * Adjust self on failing to Port.
 *
 * If @f is still of interest (it is not passive, has children, or its
 * parent fdata is not passive), the parent is ported instead, creating the
 * parent fdata when needed and recursing upwards when that port fails; a
 * node without parent is moved to the missing list. Otherwise the node of
 * @f is removed.
 */
void
fdata_adjust_deleted (fdata* f)
{
    node_t* parent;
    fdata* pdata;
    node_op_t op = {NULL, NULL, pre_del_cb, NULL};

    parent = get_parent_node (f);
    pdata = get_parent_data (f);

    if (!FN_IS_PASSIVE(f) ||
      children_num (FN_NODE(f)) > 0 ||
      (pdata && !FN_IS_PASSIVE(pdata))) {
        if (parent) {
            if (pdata == NULL) {
                pdata = fdata_new (parent, FALSE);
            }
            g_assert (pdata);
            if (!port_add (&pdata->fobj, &pdata->len, pdata)) {
                fdata_adjust_deleted (pdata);
            }
        } else {
            /* f is root: it's a top node, we move it to missing list. */
            g_assert (IS_TOPNODE(FN_NODE(f)));
            missing_add (f);
        }
    } else {
#ifdef GIO_COMPILATION
        pending_remove_node (FN_NODE(f), &op);
#else
        remove_node (FN_NODE(f), &op);
#endif
    }
}

/*
 * Timeout callback that re-examines @f after a change event.
 *
 * Takes fen_lock for its whole duration. The file is stat'ed: if the stat
 * fails a FILE_DELETE event is queued; if the size changed a pending
 * FILE_MODIFIED event is queued and a new timeout is scheduled; otherwise
 * the timestamps are refreshed, directories are rescanned and the file is
 * re-associated with the port (queuing FILE_DELETE when that fails).
 *
 * Always returns FALSE so that the current timeout source is removed.
 * change_update_id is cleared only at the very end, after any
 * fdata_add_event() call made from here.
 */
static gboolean
fdata_adjust_changed (fdata *f)
{
    fnode_event_t *ev;
    struct stat buf;
    fdata* pdata;

    G_LOCK (fen_lock);
    pdata = get_parent_data (f);

    if (!FN_IS_LIVING(f) ||
      (children_num (FN_NODE(f)) == 0 &&
        FN_IS_PASSIVE(f) &&
        pdata && FN_IS_PASSIVE(pdata))) {
        f->change_update_id = 0;
        G_UNLOCK (fen_lock);
        return FALSE;
    }

    FD_W ("[ %s ] %s\n", __func__, FN_NAME(f));
    if (FN_STAT (FN_NAME(f), &buf) != 0) {
        FD_W ("LSTAT [%-20s] %s\n", FN_NAME(f), g_strerror (errno));
        goto L_delete;
    }
    f->is_dir = S_ISDIR (buf.st_mode) ? TRUE : FALSE;
    if (f->len != buf.st_size) {
        /* FD_W ("LEN [%lld:%lld] %s\n", f->len, buf.st_size, FN_NAME(f)); */
        f->len = buf.st_size;
        ev = fnode_event_new (FILE_MODIFIED, TRUE, f);
        if (ev != NULL) {
            ev->is_pending = TRUE;
            fdata_add_event (f, ev);
        }
        /* Fdata is still changing, so scalable scan */
        f->change_update_id = g_timeout_add (get_scalable_scan_time (f),
          (GSourceFunc)fdata_adjust_changed,
          (gpointer)f);
        G_UNLOCK (fen_lock);
        return FALSE;
    } else {
        f->changed_event_num = 0;
        f->fobj.fo_atime = buf.st_atim;
        f->fobj.fo_mtime = buf.st_mtim;
        f->fobj.fo_ctime = buf.st_ctim;
        if (FN_IS_DIR(f)) {
            if (FN_IS_MONDIR(f)) {
                scan_children (FN_NODE(f));
            } else {
                scan_known_children (FN_NODE(f));
                if ((children_num (FN_NODE(f)) == 0 &&
                      FN_IS_PASSIVE(f) &&
                      pdata && FN_IS_PASSIVE(pdata))) {
                    port_remove (f);
                    goto L_exit;
                }
            }
        }
        if (!port_add_simple (&f->fobj, f)) {
        L_delete:
            ev = fnode_event_new (FILE_DELETE, FALSE, f);
            if (ev != NULL) {
                fdata_add_event (f, ev);
            }
        }
    }
L_exit:
    f->change_update_id = 0;
    G_UNLOCK (fen_lock);
    return FALSE;
}

/**
 * fdata_emit_events_once:
 * @f: the fdata the event belongs to.
 * @event: the event, converted with the registered converter before use.
 * @sub: the subscription passed on to the registered emit-once callback.
 *
 * Forwards a single event to the emit-once callback registered with
 * fdata_class_init().
 */
void
fdata_emit_events_once (fdata *f, int event, gpointer sub)
{
    emit_once_cb (f, _event_converter (event), sub);
}

/**
 * fdata_emit_events:
 * @f: the fdata the event belongs to.
 * @event: the event, converted with the registered converter before use.
 *
 * Forwards an event to the emit callback registered with
 * fdata_class_init().
 */
void
fdata_emit_events (fdata *f, int event)
{
    emit_cb (f, _event_converter (event));
}

/*
 * Timeout callback that handles one queued event of the fdata in @udata.
 *
 * Takes fen_lock while working. Non-pending events are emitted to the
 * clients, then the node state is adjusted according to the event type.
 * Returns TRUE while an event was processed, so that the timeout keeps
 * firing; returns FALSE and clears eventq_id when the fdata is not living
 * or the queue is empty.
 */
static gboolean
process_events (gpointer udata)
{
    fdata* f;
    fnode_event_t* ev;
    int e;

    /* FD_W ("IN <======== %s\n", __func__); */

    f = (fdata*)udata;
    FD_W ("%s 0x%p id:%-4d %s\n", __func__, f, f->eventq_id, FN_NAME(f));

    G_LOCK (fen_lock);

    if (!FN_IS_LIVING(f)) {
        f->eventq_id = 0;
        G_UNLOCK (fen_lock);
        return FALSE;
    }

    if ((ev = (fnode_event_t*)g_queue_pop_head (f->eventq)) != NULL) {
        /* Send events to clients. */
        e = ev->e;
        if (!ev->is_pending) {
#ifdef GIO_COMPILATION
            if (ev->has_twin) {
                fdata_emit_events (f, FILE_ATTRIB);
            }
#endif
            fdata_emit_events (f, ev->e);
        }

        fnode_event_delete (ev);
        ev = NULL;

        /* Adjust node state. */
        /*
         * Note the node has been created, so we can delete create event in
         * optimizing. To reduce the statings, we add it to Port on
         * discovering it then emit CREATED event. So we don't need to do
         * anything here.
         */
        switch (e) {
        case FILE_MODIFIED:
        case MOUNTEDOVER:
        case UNMOUNTED:
            /* If the event is a changed event, then pending process it */
            if (f->change_update_id == 0) {
                f->change_update_id = g_timeout_add (get_scalable_scan_time(f),
                  (GSourceFunc)fdata_adjust_changed,
                  (gpointer)f);
                g_assert (f->change_update_id > 0);
            }
            break;
        case FILE_ATTRIB:
            g_assert (f->change_update_id == 0);
            if (!port_add (&f->fobj, &f->len, f)) {
                ev = fnode_event_new (FILE_DELETE, FALSE, f);
                if (ev != NULL) {
                    fdata_add_event (f, ev);
                }
            }
            break;
        case FILE_DELETE: /* Ignored */
            break;
        default:
            g_assert_not_reached ();
            break;
        }
        /* Process one event a time */
        G_UNLOCK (fen_lock);
        return TRUE;
    }
    f->eventq_id = 0;
    G_UNLOCK (fen_lock);
    /* FD_W ("OUT ========> %s\n", __func__); */
    return FALSE;
}

/**
 * fdata_add_event:
 * @f: the fdata the event belongs to.
 * @ev: the event; ownership passes to this function.
 *
 * Time-stamps @ev and queues it on @f, merging it with events already
 * waiting at the tail of the queue where possible, and starts the
 * process_events() timeout if it is not running.
 *
 * A FILE_DELETE event is never queued: it drops every queued event, is
 * emitted immediately and triggers fdata_adjust_deleted(). Events for an
 * fdata that is not living are discarded.
 */
void
fdata_add_event (fdata *f, fnode_event_t *ev)
{
    fnode_event_t *tail;

    if (!FN_IS_LIVING(f)) {
        fnode_event_delete (ev);
        return;
    }

    FD_W ("%s %d\n", __func__, ev->e);
    g_get_current_time (&ev->t);
    /*
     * If created/deleted events of child node happened, then we use parent
     * event queue to handle.
     * If child node emits deleted event, it seems no changes for the parent
     * node, but the attr is changed. So we may try to cancel processing the
     * coming changed events of the parent node.
     */
    tail = (fnode_event_t*)g_queue_peek_tail (f->eventq);
    switch (ev->e) {
    case FILE_RENAME_FROM:
    case FILE_RENAME_TO:
    case FILE_ACCESS:
        fnode_event_delete (ev);
        g_assert_not_reached ();
        return;
    case FILE_DELETE:
        /* clear changed event number */
        f->changed_event_num = 0;
        /*
         * We will cancel all previous events.
         */
        while ((tail = (fnode_event_t*)g_queue_pop_tail (f->eventq)) != NULL) {
            fnode_event_delete (tail);
        }
        /*
         * Given a node "f" is deleted, process it ASAP.
         */
        fdata_emit_events (f, ev->e);
        fnode_event_delete (ev);
        fdata_adjust_deleted (f);
        return;
    case FILE_MODIFIED:
    case UNMOUNTED:
    case MOUNTEDOVER:
        /* count one more changed event; feeds the scalable scan back-off */
        f->changed_event_num ++;
        /* fallthrough */
    case FILE_ATTRIB:
    default:
        /*
         * If in the time range, we will try optimizing
         * (changed+) to (changed)
         * (attrchanged changed) to ([changed, attrchanged])
         * (event attrchanged) to ([event, attrchanged])
         */
        if (tail) {
            do {
                if (tail->e == ev->e) {
                    if (g_timeval_lt (&ev->t, &tail->t)) {
                        /*
                         * Unlink the tail before freeing it; a freed event
                         * must never stay reachable through the queue.
                         */
                        g_queue_pop_tail (f->eventq);
                        /* Add the increment */
                        g_time_val_add (&ev->t, PAIR_EVENTS_INC_TIMEVAL);
                        /* skip the previous event */
                        FD_W ("SKIPPED -- %s\n", _event_string (tail->e));
                        fnode_event_delete (tail);
                    } else {
                        break;
                    }
                } else if (ev->e == FILE_MODIFIED && tail->e == FILE_ATTRIB) {
                    ev->has_twin = TRUE;
                    /* Same rule here: unlink first, then free. */
                    g_queue_pop_tail (f->eventq);
                    fnode_event_delete (tail);
                } else if (ev->e == FILE_ATTRIB && f->change_update_id > 0) {
                    tail->has_twin = TRUE;
                    /* skip the current event */
                    fnode_event_delete (ev);
                    return;
                } else {
                    break;
                }
            } while ((tail = (fnode_event_t*)g_queue_peek_tail (f->eventq)) != NULL);
        }
    }

    /* must add the threshold time */
    g_time_val_add (&ev->t, PAIR_EVENTS_TIMEVAL);

    g_queue_push_tail (f->eventq, ev);

    /* starting process_events */
    if (f->eventq_id == 0) {
        f->eventq_id = g_timeout_add (PROCESS_EVENTQ_TIME,
          process_events,
          (gpointer)f);
        g_assert (f->eventq_id > 0);
    }
    FD_W ("%s 0x%p id:%-4d %s\n", __func__, f, f->eventq_id, FN_NAME(f));
}

/**
 * fdata_class_init:
 * @user_emit_cb: callback used by fdata_emit_events().
 * @user_emit_once_cb: callback used by fdata_emit_events_once().
 * @user_event_converter: converter applied to every event before emission.
 *
 * Registers the user callbacks and initializes the port layer with
 * fdata_add_event() as its event sink.
 *
 * Returns: FALSE if any argument is NULL or port_class_init() fails,
 * TRUE otherwise.
 */
gboolean
fdata_class_init (void (*user_emit_cb) (fdata*, int),
  void (*user_emit_once_cb) (fdata*, int, gpointer),
  int (*user_event_converter) (int event))
{
    FD_W ("%s\n", __func__);
    if (user_emit_cb == NULL) {
        return FALSE;
    }
    if (user_emit_once_cb == NULL) {
        return FALSE;
    }
    if (user_event_converter == NULL) {
        return FALSE;
    }
    emit_cb = user_emit_cb;
    emit_once_cb = user_emit_once_cb;
    _event_converter = user_event_converter;

    if (!port_class_init (fdata_add_event)) {
        FD_W ("port_class_init failed.");
        return FALSE;
    }
    return TRUE;
}