56 #include <drizzled/internal/my_sys.h>
57 #include <drizzled/internal/thread_var.h>
58 #include <drizzled/statistics_variables.h>
59 #include <drizzled/pthread_globals.h>
61 #include <drizzled/session.h>
64 #include <drizzled/internal/m_string.h>
68 #if TIME_WITH_SYS_TIME
69 # include <sys/time.h>
73 # include <sys/time.h>
79 #include <drizzled/util/test.h>
81 #include <boost/interprocess/sync/lock_options.hpp>
/* Number of seconds added to the current time to build the timed-wait deadline in wait_for_lock(). */
87 uint64_t table_lock_wait_timeout;
/* Lock type that thr_lock() substitutes for a TL_WRITE_CONCURRENT_INSERT request. */
88 static enum thr_lock_type thr_upgraded_concurrent_insert_lock = TL_WRITE;
/*
  Threshold compared against THR_LOCK::write_lock_count when a waiting writer
  is about to be promoted; the visible code resets the counter and releases
  waiting readers when it is exceeded.
*/
91 uint64_t max_write_lock_count= UINT64_MAX;
/*
  Predicate over two lock owners; callers in this file use the result directly
  as a boolean condition.
  NOTE(review): the return type and body are not part of this listing.
*/
98 thr_lock_owner_equal(THR_LOCK_OWNER *rhs, THR_LOCK_OWNER *lhs)
/*
  Initialise the four lock lists of a THR_LOCK (active readers, waiting
  readers, waiting writers, active writers).

  Each list keeps a `last` pointer that addresses the slot where the next
  element will be linked; pointing it at the list's own `data` head is how
  the rest of this file represents an empty list.
*/
106 void thr_lock_init(THR_LOCK *lock)
108 lock->read.last= &lock->read.data;
109 lock->read_wait.last= &lock->read_wait.data;
110 lock->write_wait.last= &lock->write_wait.data;
111 lock->write.last= &lock->write.data;
/* Record the id of the calling thread (as reported by internal::my_thread_var2()) in thread_id. */
115 void THR_LOCK_INFO::init()
/* thread_id is later matched against the argument of THR_LOCK::abort_locks_for_thread(). */
117 thread_id= internal::my_thread_var2()->id;
/*
  Initialise a lock request.

  @param lock_arg   Lock this request belongs to.
                    NOTE(review): its use is not visible in this listing.
  @param param_arg  Opaque pointer stored in status_param.
*/
123 void THR_LOCK_DATA::init(THR_LOCK *lock_arg,
void *param_arg)
128 status_param= param_arg;
/*
  Scan a lock list, starting at `data` and following `next`, for an entry
  whose owner compares equal to `owner`.

  thr_lock() uses the result as a boolean when deciding whether a read
  request may be granted ahead of a waiting writer.
  NOTE(review): the return statements are not part of this listing.
*/
134 have_old_read_lock(THR_LOCK_DATA *data, THR_LOCK_OWNER *owner)
136 for ( ; data ; data=data->next)
138 if (thr_lock_owner_equal(data->owner, owner))
/*
  Queue `data` on the wait list `wait` and block the session's thread on its
  `suspend` condition variable until the request is resolved, the thread is
  flagged as aborted, or the timed wait expires.

  @param session  Session whose thread variables supply the condition
                  variable and the abort flag.
  @param wait     Wait list (read_wait or write_wait) the request joins.
  @param data     The lock request being waited for.

  @return THR_LOCK_SUCCESS, THR_LOCK_WAIT_TIMEOUT, or THR_LOCK_ABORTED (the
          initial value of `result`).
          NOTE(review): the return statement itself is not part of this listing.
*/
147 static enum enum_thr_lock_result wait_for_lock(Session &session,
struct st_lock_list *wait, THR_LOCK_DATA *data)
149 boost::condition_variable_any *cond= &session.getThreadVar()->suspend;
/* Pessimistic default: only overwritten on timeout or success below. */
150 enum enum_thr_lock_result result= THR_LOCK_ABORTED;
/* Derived from the owner's n_cursors; its use is not visible in this listing. */
151 bool can_deadlock= test(data->owner->info->n_cursors);
/* Link the request in at the tail of the wait list. */
155 data->prev= wait->last;
156 wait->last= &data->next;
159 current_global_counters.locks_waited++;
/* Publish which mutex/condition this thread is blocked on in its thread variables. */
162 session.getThreadVar()->current_mutex= data->lock->native_handle();
163 session.getThreadVar()->current_cond= &session.getThreadVar()->suspend;
/* A non-NULL data->cond marks the request as still waiting; wakers reset it to NULL. */
164 data->cond= &session.getThreadVar()->suspend;;
166 while (not session.getThreadVar()->abort)
/* adopt_lock_t: wrap the lock's mutex without locking it again. */
168 boost::mutex::scoped_lock scoped(*data->lock->native_handle(), boost::adopt_lock_t());
/* Deadline = now + table_lock_wait_timeout seconds. */
173 xtime_get(&xt, boost::TIME_UTC_);
174 xt.sec += table_lock_wait_timeout;
/* timed_wait() returning false means the deadline passed without a notification. */
175 if (not cond->timed_wait(scoped, xt))
177 result= THR_LOCK_WAIT_TIMEOUT;
/* cond was cleared by another thread, i.e. the request was granted or aborted. */
199 if (data->cond == NULL)
/* Either still queued (cond set) or marked TL_UNLOCK by an abort. */
206 if (data->cond || data->type == TL_UNLOCK)
/* Unlink the request from the wait list, fixing the tail pointer if it was last. */
210 if (((*data->prev)=data->next))
212 data->next->prev= data->prev;
216 wait->last=data->prev;
218 data->type= TL_UNLOCK;
224 result= THR_LOCK_SUCCESS;
226 data->lock->unlock();
/* Under the thread-variable mutex, clear the published wait state. */
229 boost::mutex::scoped_lock scopedLock(session.getThreadVar()->mutex);
230 session.getThreadVar()->current_mutex= NULL;
231 session.getThreadVar()->current_cond= NULL;
/*
  Try to obtain a lock of `lock_type` for the request `data`.

  The request is either linked straight into the active read/write list,
  rejected (THR_LOCK_ABORTED / THR_LOCK_DEADLOCK), or handed to
  wait_for_lock() on the appropriate wait queue.

  @param session    Session on whose behalf the lock is taken.
  @param data       Lock request; data->lock is the THR_LOCK being acquired.
  @param owner      Owner used for the deadlock check at the end.
  @param lock_type  Requested lock type.

  NOTE(review): braces, `else` lines and early returns are not part of this
  listing; the comments below describe only the visible conditions.
*/
237 static enum enum_thr_lock_result thr_lock(Session &session, THR_LOCK_DATA *data, THR_LOCK_OWNER *owner,
enum thr_lock_type lock_type)
239 THR_LOCK *lock= data->lock;
240 enum enum_thr_lock_result result= THR_LOCK_SUCCESS;
241 struct st_lock_list *wait_queue;
242 THR_LOCK_DATA *lock_owner;
246 data->type=lock_type;
/* Read-type request: lock_type is at or below TL_READ_NO_INSERT. */
249 if ((
int) lock_type <= (
int) TL_READ_NO_INSERT)
/* There is an active write lock. */
252 if (lock->write.data)
/*
  The read may coexist with the writer when the writer has the same owner,
  or when the writer's type is at most TL_WRITE_CONCURRENT_INSERT and either
  the request is at most TL_READ_WITH_SHARED_LOCKS or the writer is neither
  TL_WRITE_CONCURRENT_INSERT nor TL_WRITE_ALLOW_READ.
*/
263 if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
264 (lock->write.data->type <= TL_WRITE_CONCURRENT_INSERT &&
265 (((
int) lock_type <= (
int) TL_READ_WITH_SHARED_LOCKS) ||
266 (lock->write.data->type != TL_WRITE_CONCURRENT_INSERT &&
267 lock->write.data->type != TL_WRITE_ALLOW_READ))))
/* Append the request to the tail of the active read list. */
269 (*lock->read.last)=data;
270 data->prev=lock->read.last;
271 lock->read.last= &data->next;
/* read_no_write_count tracks active TL_READ_NO_INSERT holders. */
272 if (lock_type == TL_READ_NO_INSERT)
273 lock->read_no_write_count++;
274 current_global_counters.locks_immediate++;
/* The writer is TL_WRITE_ONLY (the type set by abort_locks()): refuse the request. */
277 if (lock->write.data->type == TL_WRITE_ONLY)
280 data->type=TL_UNLOCK;
281 result= THR_LOCK_ABORTED;
/*
  Grant the read when no writer is waiting, the first waiting writer's type
  is at most TL_WRITE_DEFAULT, or this owner already holds a read lock.
*/
285 else if (!lock->write_wait.data ||
286 lock->write_wait.data->type <= TL_WRITE_DEFAULT ||
287 have_old_read_lock(lock->read.data, data->owner))
289 (*lock->read.last)=data;
290 data->prev=lock->read.last;
291 lock->read.last= &data->next;
292 if (lock_type == TL_READ_NO_INSERT)
293 lock->read_no_write_count++;
294 current_global_counters.locks_immediate++;
/* The read request has to wait. */
302 wait_queue= &lock->read_wait;
/* Write-type request: a concurrent-insert request is replaced by the upgraded type. */
306 if (lock_type == TL_WRITE_CONCURRENT_INSERT)
307 data->type=lock_type= thr_upgraded_concurrent_insert_lock;
309 if (lock->write.data)
311 if (lock->write.data->type == TL_WRITE_ONLY)
/* Only the owner of the TL_WRITE_ONLY lock passes; everyone else is aborted. */
314 if (!thr_lock_owner_equal(data->owner, lock->write.data->owner))
317 data->type=TL_UNLOCK;
318 result= THR_LOCK_ABORTED;
/*
  Grant alongside the active writer when it has the same owner, or when both
  are TL_WRITE_ALLOW_WRITE and no writer is waiting.
*/
328 if (thr_lock_owner_equal(data->owner, lock->write.data->owner) ||
329 (lock_type == TL_WRITE_ALLOW_WRITE &&
330 !lock->write_wait.data &&
331 lock->write.data->type == TL_WRITE_ALLOW_WRITE))
/* Append the request to the tail of the active write list. */
338 (*lock->write.last)=data;
339 data->prev=lock->write.last;
340 lock->write.last= &data->next;
341 current_global_counters.locks_immediate++;
347 if (!lock->write_wait.data)
/*
  No writer is waiting: grant when there are no active readers, or when the
  type is at most TL_WRITE_CONCURRENT_INSERT and either it is neither
  TL_WRITE_CONCURRENT_INSERT nor TL_WRITE_ALLOW_WRITE, or no
  TL_READ_NO_INSERT reader is active.
*/
349 if (!lock->read.data ||
350 (lock_type <= TL_WRITE_CONCURRENT_INSERT &&
351 ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
352 lock_type != TL_WRITE_ALLOW_WRITE) ||
353 !lock->read_no_write_count)))
355 (*lock->write.last)=data;
356 data->prev=lock->write.last;
357 lock->write.last= &data->next;
358 current_global_counters.locks_immediate++;
/* The write request has to wait. */
363 wait_queue= &lock->write_wait;
/*
  Deadlock check: look at the first active reader, or else the first active
  writer; if it belongs to the same owner info as the requester, report
  THR_LOCK_DEADLOCK.
*/
371 lock_owner= lock->read.data ? lock->read.data : lock->write.data;
372 if (lock_owner && lock_owner->owner->info == owner->info)
374 result= THR_LOCK_DEADLOCK;
/* Block on the chosen wait queue. */
379 return(wait_for_lock(session, wait_queue, data));
/*
  Move the waiting read requests of `lock` onto the active read list.

  @param lock                     Lock whose read_wait list is processed.
  @param using_concurrent_insert  When set, TL_READ_NO_INSERT requests are
                                  taken back off the read list and re-queued
                                  on read_wait.

  NOTE(review): braces, `else` lines and the notification of each waiter are
  not part of this listing.
*/
388 static void free_all_read_locks(THR_LOCK *lock,
bool using_concurrent_insert)
390 THR_LOCK_DATA *data= lock->read_wait.data;
/* Splice the whole read_wait chain onto the tail of the active read list. */
393 (*lock->read.last)=data;
394 data->prev=lock->read.last;
395 lock->read.last=lock->read_wait.last;
/* read_wait is now empty. */
398 lock->read_wait.last= &lock->read_wait.data;
/* Saved before the request is modified; its use is not visible in this listing. */
402 boost::condition_variable_any *cond= data->cond;
403 if ((
int) data->type == (int) TL_READ_NO_INSERT)
405 if (using_concurrent_insert)
/* Unlink from the read list and append to read_wait again. */
411 if (((*data->prev)=data->next))
412 data->next->prev=data->prev;
414 lock->read.last=data->prev;
415 *lock->read_wait.last= data;
416 data->prev= lock->read_wait.last;
417 lock->read_wait.last= &data->next;
420 lock->read_no_write_count++;
424 }
while ((data=data->next));
/* Terminate the (possibly refilled) read_wait list. */
425 *lock->read_wait.last=0;
/* No reader is left waiting: restart the write-lock counter. */
426 if (!lock->read_wait.data)
427 lock->write_lock_count=0;
/*
  Release the lock held through `data`: unlink it from its active list and
  mark the request as TL_UNLOCK.

  NOTE(review): the `else` line, the body of the TL_WRITE_CONCURRENT_INSERT
  test and any mutex handling are not part of this listing.
*/
432 static void thr_unlock(THR_LOCK_DATA *data)
434 THR_LOCK *lock=data->lock;
435 enum thr_lock_type lock_type=data->type;
/* Unlink; if it was the last element, move the tail pointer of the read or write list back. */
438 if (((*data->prev)=data->next))
439 data->next->prev= data->prev;
440 else if (lock_type <= TL_READ_NO_INSERT)
441 lock->read.last=data->prev;
443 lock->write.last=data->prev;
444 if (lock_type >= TL_WRITE_CONCURRENT_INSERT)
/* One TL_READ_NO_INSERT holder fewer. */
448 if (lock_type == TL_READ_NO_INSERT)
449 lock->read_no_write_count--;
450 data->type=TL_UNLOCK;
/*
  Promotion of waiting requests once locks have been released.

  NOTE(review): the function header is not part of this listing; presumably
  this is the body of wake_up_waiters(THR_LOCK *lock) - confirm. Braces, some
  condition lines and the notifications are missing as well.
*/
467 enum thr_lock_type lock_type;
/* Nothing can be promoted while a write lock is active. */
469 if (!lock->write.data)
471 data=lock->write_wait.data;
/* No active readers either. */
472 if (!lock->read.data)
476 (!lock->read_wait.data || lock->read_wait.data->type <= TL_READ_WITH_SHARED_LOCKS))
/* Compare the write-lock counter against max_write_lock_count. */
478 if (lock->write_lock_count++ > max_write_lock_count)
481 lock->write_lock_count=0;
482 if (lock->read_wait.data)
484 free_all_read_locks(lock,0);
/* Move the first waiting writer from write_wait to the active write list. */
490 if (((*data->prev)=data->next))
491 data->next->prev= data->prev;
493 lock->write_wait.last=data->prev;
494 (*lock->write.last)=data;
495 data->prev=lock->write.last;
497 lock->write.last= &data->next;
500 boost::condition_variable_any *cond= data->cond;
/* Tests whether the moved writer and the next waiting writer are both TL_WRITE_ALLOW_WRITE. */
504 if (data->type != TL_WRITE_ALLOW_WRITE ||
505 !lock->write_wait.data ||
506 lock->write_wait.data->type != TL_WRITE_ALLOW_WRITE)
508 data=lock->write_wait.data;
510 if (data->type >= TL_WRITE)
/* Release waiting readers; TL_READ_NO_INSERT ones stay queued for these two writer types. */
514 if (lock->read_wait.data)
515 free_all_read_locks(lock,
517 (data->type == TL_WRITE_CONCURRENT_INSERT ||
518 data->type == TL_WRITE_ALLOW_WRITE));
/* Same compatibility test that thr_lock() applies to a write request when readers are active. */
521 (lock_type=data->type) <= TL_WRITE_CONCURRENT_INSERT &&
522 ((lock_type != TL_WRITE_CONCURRENT_INSERT &&
523 lock_type != TL_WRITE_ALLOW_WRITE) ||
524 !lock->read_no_write_count))
527 boost::condition_variable_any *cond= data->cond;
528 if (((*data->prev)=data->next))
529 data->next->prev= data->prev;
531 lock->write_wait.last=data->prev;
532 (*lock->write.last)=data;
533 data->prev=lock->write.last;
534 lock->write.last= &data->next;
538 }
/* Keep promoting while consecutive waiting writers are TL_WRITE_ALLOW_WRITE. */
while (lock_type == TL_WRITE_ALLOW_WRITE &&
539 (data=lock->write_wait.data) &&
540 data->type == TL_WRITE_ALLOW_WRITE);
541 if (lock->read_wait.data)
542 free_all_read_locks(lock,
543 (lock_type == TL_WRITE_CONCURRENT_INSERT ||
544 lock_type == TL_WRITE_ALLOW_WRITE));
/* No writer is waiting but readers are: release them all. */
546 else if (!data && lock->read_wait.data)
548 free_all_read_locks(lock,0);
/*
  Ordering predicate used by sort_locks().

  The sort key of a request is the address of its lock minus its lock type
  (taken as uint32_t), so requests on the same lock are grouped together and,
  within one lock, a higher lock type yields a smaller key and sorts first.

  Every use of A and B is parenthesised so that any pointer-valued expression
  (for example `*pos`) can be passed safely; `A->lock` would otherwise expand
  to `*pos->lock` and bind incorrectly.
*/
#define LOCK_CMP(A,B) ((unsigned char*) ((A)->lock) - (uint32_t) ((A)->type) < (unsigned char*) ((B)->lock) - (uint32_t) ((B)->type))
/*
  Sort an array of lock requests in place using LOCK_CMP as the ordering.

  @param data   Array of request pointers.
  @param count  Number of entries in `data`.

  NOTE(review): the visible loop shape looks like an insertion sort (each
  element is compared with its predecessors and shifted back), but the
  assignments doing the shifting are not part of this listing - confirm.
*/
565 static void sort_locks(THR_LOCK_DATA **data,uint32_t count)
567 THR_LOCK_DATA **pos,**end,**prev,*tmp;
571 for (pos=data+1,end=data+count; pos < end ; pos++)
/* Only act when the element is out of order with respect to its predecessor. */
574 if (LOCK_CMP(tmp,pos[-1]))
579 }
/* Walk back until the start of the array or an element that sorts before tmp. */
while (--prev != data && LOCK_CMP(tmp,prev[-1]));
/*
  Acquire several locks on behalf of one owner.

  The requests are first sorted with sort_locks() and then locked one by one
  with thr_lock(), each using the type already stored in the request.

  @param session  Session on whose behalf the locks are taken.
  @param data     Array of lock requests; reordered in place by the sort.
  @param count    Number of entries in `data`.
  @param owner    Owner passed to every thr_lock() call.

  @return THR_LOCK_SUCCESS when every request was granted.
          NOTE(review): the return on the failure path is not part of this listing.
*/
586 enum enum_thr_lock_result
587 thr_multi_lock(Session &session, THR_LOCK_DATA **data, uint32_t count, THR_LOCK_OWNER *owner)
589 THR_LOCK_DATA **pos,**end;
591 sort_locks(data,count);
593 for (pos=data,end=data+count; pos < end ; pos++)
595 enum enum_thr_lock_result result= thr_lock(session, *pos, owner, (*pos)->type);
596 if (result != THR_LOCK_SUCCESS)
/* Roll back: release only the requests that precede the failing one. */
598 thr_multi_unlock(data,(uint32_t) (pos-data));
602 return(THR_LOCK_SUCCESS);
/*
  Walk the first `count` requests in `data` and act on those that are not
  already marked TL_UNLOCK.

  NOTE(review): the statement executed for each such request is not part of
  this listing; presumably thr_unlock(*pos) - confirm.
*/
607 void thr_multi_unlock(THR_LOCK_DATA **data,uint32_t count)
609 THR_LOCK_DATA **pos,**end;
611 for (pos=data,end=data+count; pos < end ; pos++)
/* Skip requests that hold nothing. */
613 if ((*pos)->type != TL_UNLOCK)
/*
  Same traversal as thr_multi_unlock(), applied to the first `count` requests
  returned by getLocks().

  NOTE(review): the statement executed for each request that is not
  TL_UNLOCK is not part of this listing.
*/
618 void DrizzleLock::unlock(uint32_t count)
620 THR_LOCK_DATA **pos,**end;
622 for (pos= getLocks(),end= getLocks()+count; pos < end ; pos++)
/* Skip requests that hold nothing. */
624 if ((*pos)->type != TL_UNLOCK)
/*
  Abort every request waiting on this lock.

  Under the lock's mutex, each waiting reader and writer is marked TL_UNLOCK,
  its condition variable is notified and its `cond` is cleared; both wait
  lists are then emptied. The first active write lock is switched to
  TL_WRITE_ONLY, which thr_lock() treats as a reason to abort new requests.
*/
634 void THR_LOCK::abort_locks()
636 boost::mutex::scoped_lock scopedLock(mutex);
638 for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
/* Mark the waiter as killed, wake it, and flag it as no longer queued. */
640 local_data->type= TL_UNLOCK;
642 local_data->cond->notify_one();
643 local_data->cond= NULL;
645 for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
647 local_data->type= TL_UNLOCK;
648 local_data->cond->notify_one();
649 local_data->cond= NULL;
/* Reset both wait lists to the empty state. */
651 read_wait.last= &read_wait.data;
652 write_wait.last= &write_wait.data;
653 read_wait.data= write_wait.data=0;
/* NOTE(review): dereferences write.data; a guarding null check is not visible in this listing - confirm. */
655 write.data->type=TL_WRITE_ONLY;
/*
  Abort the waiting requests on this lock that belong to one thread.

  @param thread_id_arg  Thread id matched against owner->info->thread_id of
                        every waiting reader and writer.

  Under the lock's mutex, each matching request is marked TL_UNLOCK, its
  condition variable is notified, and it is unlinked from its wait list.

  NOTE(review): the bool result and the `else` lines are not part of this listing.
*/
665 bool THR_LOCK::abort_locks_for_thread(uint64_t thread_id_arg)
669 boost::mutex::scoped_lock scopedLock(mutex);
670 for (THR_LOCK_DATA *local_data= read_wait.data; local_data ; local_data= local_data->next)
672 if (local_data->owner->info->thread_id == thread_id_arg)
674 local_data->type= TL_UNLOCK;
/*
  NOTE(review): unlike the write_wait loop below, no `cond= NULL` reset is
  visible after this notify - confirm against the full source.
*/
677 local_data->cond->notify_one();
/* Unlink from read_wait, moving the tail pointer back if this was the last element. */
680 if (((*local_data->prev)= local_data->next))
681 local_data->next->prev= local_data->prev;
683 read_wait.last= local_data->prev;
686 for (THR_LOCK_DATA *local_data= write_wait.data; local_data ; local_data= local_data->next)
688 if (local_data->owner->info->thread_id == thread_id_arg)
690 local_data->type= TL_UNLOCK;
692 local_data->cond->notify_one();
693 local_data->cond= NULL;
/* Unlink from write_wait, moving the tail pointer back if this was the last element. */
695 if (((*local_data->prev)= local_data->next))
696 local_data->next->prev= local_data->prev;
698 write_wait.last= local_data->prev;
static void wake_up_waiters(THR_LOCK *lock)
Wake up all threads whose pending requests for the lock can be satisfied.
TODO: Rename this file - func.h is stupid.