OpenDNSSEC-signer  1.3.4
/build/buildd/opendnssec-1.3.4/signer/src/daemon/worker.c
Go to the documentation of this file.
00001 /*
00002  * $Id: worker.c 5637 2011-09-15 09:09:12Z matthijs $
00003  *
00004  * Copyright (c) 2009 NLNet Labs. All rights reserved.
00005  *
00006  * Redistribution and use in source and binary forms, with or without
00007  * modification, are permitted provided that the following conditions
00008  * are met:
00009  * 1. Redistributions of source code must retain the above copyright
00010  *    notice, this list of conditions and the following disclaimer.
00011  * 2. Redistributions in binary form must reproduce the above copyright
00012  *    notice, this list of conditions and the following disclaimer in the
00013  *    documentation and/or other materials provided with the distribution.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
00016  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00017  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
00019  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00020  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
00021  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
00023  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
00024  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
00025  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  *
00027  */
00028 
00034 #include "adapter/adapi.h"
00035 #include "daemon/engine.h"
00036 #include "daemon/worker.h"
00037 #include "shared/allocator.h"
00038 #include "scheduler/schedule.h"
00039 #include "scheduler/task.h"
00040 #include "shared/locks.h"
00041 #include "shared/log.h"
00042 #include "shared/status.h"
00043 #include "shared/util.h"
00044 #include "signer/tools.h"
00045 #include "signer/zone.h"
00046 #include "signer/zonedata.h"
00047 
00048 #include <time.h> /* time() */
00049 
/* Lookup table pairing each worker_id value with the name that worker2str()
 * returns for it (used as the "[name[num]]" prefix in log messages).
 * The trailing { 0, NULL } entry presumably marks the end of the table for
 * ods_lookup_by_id() -- confirm against the lookup helper. */
00050 ods_lookup_table worker_str[] = {
00051     { WORKER_WORKER, "worker" },
00052     { WORKER_DRUDGER, "drudger" },
00053     { 0, NULL }
00054 };
00055 
00056 
00061 worker_type*
00062 worker_create(allocator_type* allocator, int num, worker_id type)
00063 {
00064     worker_type* worker;
00065 
00066     if (!allocator) {
00067         return NULL;
00068     }
00069     ods_log_assert(allocator);
00070 
00071     worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
00072     if (!worker) {
00073         return NULL;
00074     }
00075 
00076     ods_log_debug("create worker[%i]", num +1);
00077     lock_basic_init(&worker->worker_lock);
00078     lock_basic_set(&worker->worker_alarm);
00079     lock_basic_lock(&worker->worker_lock);
00080     worker->allocator = allocator;
00081     worker->thread_num = num +1;
00082     worker->engine = NULL;
00083     worker->task = NULL;
00084     worker->working_with = TASK_NONE;
00085     worker->need_to_exit = 0;
00086     worker->type = type;
00087     worker->clock_in = 0;
00088     worker->jobs_appointed = 0;
00089     worker->jobs_completed = 0;
00090     worker->jobs_failed = 0;
00091     worker->sleeping = 0;
00092     worker->waiting = 0;
00093     lock_basic_unlock(&worker->worker_lock);
00094     return worker;
00095 }
00096 
00097 
00102 static const char*
00103 worker2str(worker_id type)
00104 {
00105     ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
00106     if (lt) {
00107         return lt->name;
00108     }
00109     return NULL;
00110 }
00111 
00112 
00117 static int
00118 worker_fulfilled(worker_type* worker)
00119 {
00120     return (worker->jobs_completed + worker->jobs_failed) ==
00121         worker->jobs_appointed;
00122 }
00123 
00124 
00129 static void
00130 worker_perform_task(worker_type* worker)
00131 {
00132     engine_type* engine = NULL;
00133     zone_type* zone = NULL;
00134     task_type* task = NULL;
00135     task_id what = TASK_NONE;
00136     time_t when = 0;
00137     time_t never = (3600*24*365);
00138     ods_status status = ODS_STATUS_OK;
00139     int fallthrough = 0;
00140     int backup = 0;
00141     char* working_dir = NULL;
00142     char* cfg_filename = NULL;
00143     uint32_t tmpserial = 0;
00144     time_t start = 0;
00145     time_t end = 0;
00146 
00147     /* sanity checking */
00148     if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
00149         return;
00150     }
00151     ods_log_assert(worker);
00152     ods_log_assert(worker->task);
00153     ods_log_assert(worker->task->zone);
00154 
00155     engine = (engine_type*) worker->engine;
00156     task = (task_type*) worker->task;
00157     zone = (zone_type*) worker->task->zone;
00158     ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
00159        worker2str(worker->type), worker->thread_num, task_what2str(task->what),
00160        task_who2str(task->who), (uint32_t) worker->clock_in);
00161 
00162     /* do what you have been told to do */
00163     switch (task->what) {
00164         case TASK_SIGNCONF:
00165             worker->working_with = TASK_SIGNCONF;
00166             /* perform 'load signconf' task */
00167             ods_log_verbose("[%s[%i]] load signconf for zone %s",
00168                 worker2str(worker->type), worker->thread_num,
00169                 task_who2str(task->who));
00170             status = zone_load_signconf(zone, &what);
00171             if (status == ODS_STATUS_UNCHANGED) {
00172                 if (!zone->signconf->last_modified) {
00173                     ods_log_debug("[%s[%i]] no signconf.xml for zone %s yet",
00174                         worker2str(worker->type), worker->thread_num,
00175                         task_who2str(task->who));
00176                 }
00177                 status = ODS_STATUS_ERR;
00178             }
00179 
00180             /* what to do next */
00181             when = time_now();
00182             if (status == ODS_STATUS_UNCHANGED) {
00183                 if (task->halted != TASK_NONE) {
00184                     goto task_perform_continue;
00185                 } else {
00186                     status = ODS_STATUS_OK;
00187                 }
00188             }
00189 
00190             if (status == ODS_STATUS_OK) {
00191                 status = zone_publish_dnskeys(zone, 0);
00192             }
00193             if (status == ODS_STATUS_OK) {
00194                 status = zone_prepare_nsec3(zone, 0);
00195             }
00196             if (status == ODS_STATUS_OK) {
00197                 status = zonedata_commit(zone->zonedata);
00198             }
00199 
00200             if (status == ODS_STATUS_OK) {
00201                 zone->prepared = 1;
00202                 task->interrupt = TASK_NONE;
00203                 task->halted = TASK_NONE;
00204             } else {
00205                 if (task->halted == TASK_NONE) {
00206                     goto task_perform_fail;
00207                 }
00208                 goto task_perform_continue;
00209             }
00210             fallthrough = 0;
00211             break;
00212         case TASK_READ:
00213             worker->working_with = TASK_READ;
00214             /* perform 'read input adapter' task */
00215             ods_log_verbose("[%s[%i]] read zone %s",
00216                 worker2str(worker->type), worker->thread_num,
00217                 task_who2str(task->who));
00218             if (!zone->prepared) {
00219                 ods_log_debug("[%s[%i]] no valid signconf.xml for zone %s yet",
00220                     worker2str(worker->type), worker->thread_num,
00221                     task_who2str(task->who));
00222                 status = ODS_STATUS_ERR;
00223             } else {
00224                 status = tools_input(zone);
00225             }
00226 
00227             /* what to do next */
00228             what = TASK_NSECIFY;
00229             when = time_now();
00230             if (status != ODS_STATUS_OK) {
00231                 if (task->halted == TASK_NONE) {
00232                     goto task_perform_fail;
00233                 }
00234                 goto task_perform_continue;
00235             }
00236             fallthrough = 1;
00237         case TASK_NSECIFY:
00238             worker->working_with = TASK_NSECIFY;
00239             ods_log_verbose("[%s[%i]] nsecify zone %s",
00240                 worker2str(worker->type), worker->thread_num,
00241                 task_who2str(task->who));
00242             status = tools_nsecify(zone);
00243 
00244             /* what to do next */
00245             what = TASK_SIGN;
00246             when = time_now();
00247             if (status == ODS_STATUS_OK) {
00248                 if (task->interrupt > TASK_SIGNCONF) {
00249                     task->interrupt = TASK_NONE;
00250                     task->halted = TASK_NONE;
00251                 }
00252             } else {
00253                 if (task->halted == TASK_NONE) {
00254                     goto task_perform_fail;
00255                 }
00256                 goto task_perform_continue;
00257             }
00258             fallthrough = 1;
00259         case TASK_SIGN:
00260             worker->working_with = TASK_SIGN;
00261             ods_log_verbose("[%s[%i]] sign zone %s",
00262                 worker2str(worker->type), worker->thread_num,
00263                 task_who2str(task->who));
00264             tmpserial = zone->zonedata->internal_serial;
00265             status = zone_update_serial(zone);
00266             if (status != ODS_STATUS_OK) {
00267                 ods_log_error("[%s[%i]] unable to sign zone %s: "
00268                     "failed to increment serial",
00269                     worker2str(worker->type), worker->thread_num,
00270                     task_who2str(task->who));
00271             } else {
00272                 /* start timer */
00273                 start = time(NULL);
00274                 if (zone->stats) {
00275                     lock_basic_lock(&zone->stats->stats_lock);
00276                     if (!zone->stats->start_time) {
00277                         zone->stats->start_time = start;
00278                     }
00279                     zone->stats->sig_count = 0;
00280                     zone->stats->sig_soa_count = 0;
00281                     zone->stats->sig_reuse = 0;
00282                     zone->stats->sig_time = 0;
00283                     lock_basic_unlock(&zone->stats->stats_lock);
00284                 }
00285 
00286                 /* queue menial, hard signing work */
00287                 status = zonedata_queue(zone->zonedata, engine->signq, worker);
00288                 ods_log_debug("[%s[%i]] wait until drudgers are finished "
00289                     " signing zone %s, %u signatures queued",
00290                     worker2str(worker->type), worker->thread_num,
00291                     task_who2str(task->who), worker->jobs_appointed);
00292 
00293                 /* sleep until work is done */
00294                 if (!worker->need_to_exit) {
00295                     worker_sleep_unless(worker, 0);
00296                 }
00297                 if (worker->jobs_failed) {
00298                     ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00299                         "signatures failed", worker2str(worker->type),
00300                         worker->thread_num, task_who2str(task->who),
00301                         worker->jobs_failed, worker->jobs_appointed);
00302                     status = ODS_STATUS_ERR;
00303                 } else if (!worker_fulfilled(worker)) {
00304                     ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00305                         "signatures completed", worker2str(worker->type),
00306                         worker->thread_num, task_who2str(task->who),
00307                         worker->jobs_completed, worker->jobs_appointed);
00308                     status = ODS_STATUS_ERR;
00309                 } else if (worker->need_to_exit) {
00310                     ods_log_debug("[%s[%i]] sign zone %s failed: worker "
00311                         "needs to exit", worker2str(worker->type),
00312                         worker->thread_num, task_who2str(task->who));
00313                     status = ODS_STATUS_ERR;
00314                 } else {
00315                     ods_log_debug("[%s[%i]] sign zone %s ok: %u of %u "
00316                         "signatures succeeded", worker2str(worker->type),
00317                         worker->thread_num, task_who2str(task->who),
00318                         worker->jobs_completed, worker->jobs_appointed);
00319                     ods_log_assert(worker->jobs_appointed ==
00320                         worker->jobs_completed);
00321                 }
00322                 worker->jobs_appointed = 0;
00323                 worker->jobs_completed = 0;
00324                 worker->jobs_failed = 0;
00325 
00326                 /* stop timer */
00327                 end = time(NULL);
00328                 if (status == ODS_STATUS_OK && zone->stats) {
00329                     lock_basic_lock(&zone->stats->stats_lock);
00330                     zone->stats->sig_time = (end-start);
00331                     lock_basic_unlock(&zone->stats->stats_lock);
00332                 }
00333             }
00334 
00335             /* what to do next */
00336             if (status != ODS_STATUS_OK) {
00337                 /* rollback serial */
00338                 zone->zonedata->internal_serial = tmpserial;
00339                 if (task->halted == TASK_NONE) {
00340                     goto task_perform_fail;
00341                 }
00342                 goto task_perform_continue;
00343             } else {
00344                 if (task->interrupt > TASK_SIGNCONF) {
00345                     task->interrupt = TASK_NONE;
00346                     task->halted = TASK_NONE;
00347                 }
00348             }
00349             what = TASK_AUDIT;
00350             when = time_now();
00351             fallthrough = 1;
00352         case TASK_AUDIT:
00353             worker->working_with = TASK_AUDIT;
00354             if (zone->signconf->audit) {
00355                 ods_log_verbose("[%s[%i]] audit zone %s",
00356                     worker2str(worker->type), worker->thread_num,
00357                     task_who2str(task->who));
00358                 working_dir = strdup(engine->config->working_dir);
00359                 cfg_filename = strdup(engine->config->cfg_filename);
00360                 status = tools_audit(zone, working_dir, cfg_filename);
00361                 if (working_dir)  { free((void*)working_dir); }
00362                 if (cfg_filename) { free((void*)cfg_filename); }
00363                 working_dir = NULL;
00364                 cfg_filename = NULL;
00365             } else {
00366                 status = ODS_STATUS_OK;
00367             }
00368 
00369             /* what to do next */
00370             if (status != ODS_STATUS_OK) {
00371                 if (task->halted == TASK_NONE) {
00372                     goto task_perform_fail;
00373                 }
00374                 goto task_perform_continue;
00375             }
00376             what = TASK_WRITE;
00377             when = time_now();
00378             fallthrough = 1;
00379         case TASK_WRITE:
00380             worker->working_with = TASK_WRITE;
00381             ods_log_verbose("[%s[%i]] write zone %s",
00382                 worker2str(worker->type), worker->thread_num,
00383                 task_who2str(task->who));
00384 
00385             status = tools_output(zone);
00386             zone->processed = 1;
00387 
00388             /* what to do next */
00389             if (status != ODS_STATUS_OK) {
00390                 if (task->halted == TASK_NONE) {
00391                     goto task_perform_fail;
00392                 }
00393                 goto task_perform_continue;
00394             } else {
00395                 if (task->interrupt > TASK_SIGNCONF) {
00396                     task->interrupt = TASK_NONE;
00397                     task->halted = TASK_NONE;
00398                 }
00399             }
00400             if (duration2time(zone->signconf->sig_resign_interval)) {
00401                 what = TASK_SIGN;
00402                 when = time_now() +
00403                     duration2time(zone->signconf->sig_resign_interval);
00404             } else {
00405                 what = TASK_NONE;
00406                 when = time_now() + never;
00407             }
00408             backup = 1;
00409             fallthrough = 0;
00410             break;
00411         case TASK_NONE:
00412             worker->working_with = TASK_NONE;
00413             ods_log_warning("[%s[%i]] none task for zone %s",
00414                 worker2str(worker->type), worker->thread_num,
00415                 task_who2str(task->who));
00416             when = time_now() + never;
00417             fallthrough = 0;
00418             break;
00419         default:
00420             ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
00421                 worker2str(worker->type), worker->thread_num,
00422                 task_who2str(task->who));
00423             what = TASK_SIGNCONF;
00424             when = time_now();
00425             fallthrough = 0;
00426             break;
00427     }
00428 
00429     /* no error, reset backoff */
00430     task->backoff = 0;
00431 
00432     /* set next task */
00433     if (fallthrough == 0 && task->interrupt != TASK_NONE &&
00434         task->interrupt != what) {
00435         ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
00436             worker2str(worker->type), worker->thread_num,
00437             task_what2str(what), task_who2str(task->who));
00438 
00439         task->what = task->interrupt;
00440         task->when = time_now();
00441         task->halted = what;
00442     } else {
00443         ods_log_debug("[%s[%i]] next task %s for zone %s",
00444             worker2str(worker->type), worker->thread_num,
00445             task_what2str(what), task_who2str(task->who));
00446 
00447         task->what = what;
00448         task->when = when;
00449         if (!fallthrough) {
00450             task->interrupt = TASK_NONE;
00451             task->halted = TASK_NONE;
00452         }
00453     }
00454 
00455     /* backup the last successful run */
00456     if (backup) {
00457         status = zone_backup(zone);
00458         if (status != ODS_STATUS_OK) {
00459             ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
00460             worker2str(worker->type), worker->thread_num,
00461             task_who2str(task->who), ods_status2str(status));
00462             /* just a warning */
00463             status = ODS_STATUS_OK;
00464         }
00465         backup = 0;
00466     }
00467     return;
00468 
00469 task_perform_fail:
00470     /* in case of failure, also mark zone processed (for single run usage) */
00471     zone->processed = 1;
00472 
00473     if (task->backoff) {
00474         task->backoff *= 2;
00475         if (task->backoff > ODS_SE_MAX_BACKOFF) {
00476             task->backoff = ODS_SE_MAX_BACKOFF;
00477         }
00478     } else {
00479         task->backoff = 60;
00480     }
00481     ods_log_info("[%s[%i]] backoff task %s for zone %s with %u seconds",
00482         worker2str(worker->type), worker->thread_num,
00483         task_what2str(task->what), task_who2str(task->who), task->backoff);
00484 
00485     task->when = time_now() + task->backoff;
00486     return;
00487 
00488 task_perform_continue:
00489     ods_log_info("[%s[%i]] continue task %s for zone %s",
00490         worker2str(worker->type), worker->thread_num,
00491         task_what2str(task->halted), task_who2str(task->who));
00492 
00493     what = task->halted;
00494     task->what = what;
00495     task->when = time_now();
00496     task->interrupt = TASK_NONE;
00497     task->halted = TASK_NONE;
00498     if (zone->processed) {
00499         task->when += duration2time(zone->signconf->sig_resign_interval);
00500     }
00501     return;
00502 }
00503 
00504 
00509 static void
00510 worker_work(worker_type* worker)
00511 {
00512     time_t now, timeout = 1;
00513     zone_type* zone = NULL;
00514     ods_status status = ODS_STATUS_OK;
00515 
00516     ods_log_assert(worker);
00517     ods_log_assert(worker->type == WORKER_WORKER);
00518 
00519     while (worker->need_to_exit == 0) {
00520         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00521             worker->thread_num);
00522         lock_basic_lock(&worker->engine->taskq->schedule_lock);
00523         /* [LOCK] schedule */
00524         worker->task = schedule_pop_task(worker->engine->taskq);
00525         /* [UNLOCK] schedule */
00526         if (worker->task) {
00527             worker->working_with = worker->task->what;
00528             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00529 
00530             zone = worker->task->zone;
00531             lock_basic_lock(&zone->zone_lock);
00532             /* [LOCK] zone */
00533             ods_log_debug("[%s[%i]] start working on zone %s",
00534                 worker2str(worker->type), worker->thread_num, zone->name);
00535 
00536             worker->clock_in = time(NULL);
00537             worker_perform_task(worker);
00538 
00539             zone->task = worker->task;
00540 
00541             ods_log_debug("[%s[%i]] finished working on zone %s",
00542                 worker2str(worker->type), worker->thread_num, zone->name);
00543             /* [UNLOCK] zone */
00544 
00545             lock_basic_lock(&worker->engine->taskq->schedule_lock);
00546             /* [LOCK] zone, schedule */
00547             worker->task = NULL;
00548             worker->working_with = TASK_NONE;
00549             status = schedule_task(worker->engine->taskq, zone->task, 1);
00550             /* [UNLOCK] zone, schedule */
00551             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00552             lock_basic_unlock(&zone->zone_lock);
00553 
00554             timeout = 1;
00555         } else {
00556             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00557                 worker->thread_num);
00558 
00559             /* [LOCK] schedule */
00560             worker->task = schedule_get_first_task(worker->engine->taskq);
00561             /* [UNLOCK] schedule */
00562             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00563 
00564             now = time_now();
00565             if (worker->task && !worker->engine->taskq->loading) {
00566                 timeout = (worker->task->when - now);
00567             } else {
00568                 timeout *= 2;
00569                 if (timeout > ODS_SE_MAX_BACKOFF) {
00570                     timeout = ODS_SE_MAX_BACKOFF;
00571                 }
00572             }
00573             worker->task = NULL;
00574             worker_sleep(worker, timeout);
00575         }
00576     }
00577     return;
00578 }
00579 
00580 
00585 static void
00586 worker_drudge(worker_type* worker)
00587 {
00588     zone_type* zone = NULL;
00589     task_type* task = NULL;
00590     rrset_type* rrset = NULL;
00591     ods_status status = ODS_STATUS_OK;
00592     worker_type* chief = NULL;
00593     hsm_ctx_t* ctx = NULL;
00594 
00595     ods_log_assert(worker);
00596     ods_log_assert(worker->type == WORKER_DRUDGER);
00597 
00598     ctx = hsm_create_context();
00599     if (ctx == NULL) {
00600         ods_log_error("[%s[%i]] unable to drudge: error "
00601             "creating libhsm context", worker2str(worker->type),
00602             worker->thread_num);
00603     }
00604 
00605     while (worker->need_to_exit == 0) {
00606         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00607             worker->thread_num);
00608         chief = NULL;
00609         zone = NULL;
00610         task = NULL;
00611 
00612         lock_basic_lock(&worker->engine->signq->q_lock);
00613         /* [LOCK] schedule */
00614         rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
00615         /* [UNLOCK] schedule */
00616         lock_basic_unlock(&worker->engine->signq->q_lock);
00617         if (rrset) {
00618             /* set up the work */
00619             if (chief) {
00620                 task = chief->task;
00621             }
00622             if (task) {
00623                 zone = task->zone;
00624             }
00625             if (!zone) {
00626                 ods_log_error("[%s[%i]] unable to drudge: no zone reference",
00627                     worker2str(worker->type), worker->thread_num);
00628             }
00629             if (zone && ctx) {
00630                 ods_log_assert(rrset);
00631                 ods_log_assert(zone);
00632                 ods_log_assert(zone->dname);
00633                 ods_log_assert(zone->signconf);
00634                 ods_log_assert(ctx);
00635 
00636                 worker->clock_in = time(NULL);
00637                 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
00638                     chief->clock_in, zone->stats);
00639             } else {
00640                 status = ODS_STATUS_ASSERT_ERR;
00641             }
00642 
00643             if (chief) {
00644                 lock_basic_lock(&chief->worker_lock);
00645                 if (status == ODS_STATUS_OK) {
00646                     chief->jobs_completed += 1;
00647                 } else {
00648                     chief->jobs_failed += 1;
00649                     /* destroy context? */
00650                 }
00651                 lock_basic_unlock(&chief->worker_lock);
00652 
00653                 if (worker_fulfilled(chief) && chief->sleeping) {
00654                     ods_log_debug("[%s[%i]] wake up chief[%u], work is done",
00655                         worker2str(worker->type), worker->thread_num,
00656                         chief->thread_num);
00657                     worker_wakeup(chief);
00658                     chief = NULL;
00659                 }
00660             }
00661             rrset = NULL;
00662         } else {
00663             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00664                 worker->thread_num);
00665             worker_wait(&worker->engine->signq->q_lock,
00666                 &worker->engine->signq->q_threshold);
00667         }
00668     }
00669     /* wake up chief */
00670     if (chief && chief->sleeping) {
00671         ods_log_debug("[%s[%i]] wake up chief[%u], i am exiting",
00672             worker2str(worker->type), worker->thread_num, chief->thread_num);
00673          worker_wakeup(chief);
00674     }
00675 
00676     /* cleanup open HSM sessions */
00677     hsm_destroy_context(ctx);
00678     ctx = NULL;
00679     return;
00680 }
00681 
00682 
00687 void
00688 worker_start(worker_type* worker)
00689 {
00690     ods_log_assert(worker);
00691     switch (worker->type) {
00692         case WORKER_DRUDGER:
00693             worker_drudge(worker);
00694             break;
00695         case WORKER_WORKER:
00696             worker_work(worker);
00697             break;
00698         default:
00699             ods_log_error("[worker] illegal worker (id=%i)", worker->type);
00700             return;
00701     }
00702     return;
00703 }
00704 
00705 
00710 void
00711 worker_sleep(worker_type* worker, time_t timeout)
00712 {
00713     ods_log_assert(worker);
00714     lock_basic_lock(&worker->worker_lock);
00715     /* [LOCK] worker */
00716     worker->sleeping = 1;
00717     lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00718         timeout);
00719     /* [UNLOCK] worker */
00720     lock_basic_unlock(&worker->worker_lock);
00721     return;
00722 }
00723 
00724 
00729 void
00730 worker_sleep_unless(worker_type* worker, time_t timeout)
00731 {
00732     ods_log_assert(worker);
00733     lock_basic_lock(&worker->worker_lock);
00734     /* [LOCK] worker */
00735     while (!worker->need_to_exit && !worker_fulfilled(worker)) {
00736         worker->sleeping = 1;
00737         lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00738             timeout);
00739 
00740         ods_log_debug("[%s[%i]] somebody poked me, check completed jobs %u "
00741            "appointed, %u completed, %u failed", worker2str(worker->type),
00742            worker->thread_num, worker->jobs_appointed, worker->jobs_completed,
00743            worker->jobs_failed);
00744     }
00745     /* [UNLOCK] worker */
00746     lock_basic_unlock(&worker->worker_lock);
00747     return;
00748 }
00749 
00750 
00755 void
00756 worker_wakeup(worker_type* worker)
00757 {
00758     ods_log_assert(worker);
00759     if (worker && worker->sleeping && !worker->waiting) {
00760         ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
00761            worker->thread_num);
00762         lock_basic_lock(&worker->worker_lock);
00763         /* [LOCK] worker */
00764         lock_basic_alarm(&worker->worker_alarm);
00765         worker->sleeping = 0;
00766         /* [UNLOCK] worker */
00767         lock_basic_unlock(&worker->worker_lock);
00768     }
00769     return;
00770 }
00771 
00772 
00777 void
00778 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
00779 {
00780     lock_basic_lock(lock);
00781     /* [LOCK] worker */
00782     lock_basic_sleep(condition, lock, 0);
00783     /* [UNLOCK] worker */
00784     lock_basic_unlock(lock);
00785     return;
00786 }
00787 
00788 
00793 void
00794 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
00795 {
00796     lock_basic_lock(lock);
00797     /* [LOCK] lock */
00798     lock_basic_alarm(condition);
00799     /* [UNLOCK] lock */
00800     lock_basic_unlock(lock);
00801     return;
00802 }
00803 
00804 
00809 void
00810 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
00811 {
00812     lock_basic_lock(lock);
00813     /* [LOCK] lock */
00814     lock_basic_broadcast(condition);
00815     /* [UNLOCK] lock */
00816     lock_basic_unlock(lock);
00817     return;
00818 }
00819 
00820 
00825 void
00826 worker_cleanup(worker_type* worker)
00827 {
00828     allocator_type* allocator;
00829     cond_basic_type worker_cond;
00830     lock_basic_type worker_lock;
00831 
00832     if (!worker) {
00833         return;
00834     }
00835     allocator = worker->allocator;
00836     worker_cond = worker->worker_alarm;
00837     worker_lock = worker->worker_lock;
00838 
00839     allocator_deallocate(allocator, (void*) worker);
00840     lock_basic_destroy(&worker_lock);
00841     lock_basic_off(&worker_cond);
00842     return;
00843 }