83 #include <drizzled/error.h>
84 #include <drizzled/thr_lock.h>
85 #include <drizzled/session.h>
86 #include <drizzled/session/times.h>
87 #include <drizzled/sql_base.h>
88 #include <drizzled/lock.h>
89 #include <drizzled/pthread_globals.h>
90 #include <drizzled/internal/my_sys.h>
91 #include <drizzled/pthread_globals.h>
92 #include <drizzled/plugin/storage_engine.h>
93 #include <drizzled/util/test.h>
94 #include <drizzled/open_tables_state.h>
95 #include <drizzled/table/cache.h>
96 #include <drizzled/catalog/instance.h>
101 #include <functional>
103 #include <boost/thread/shared_mutex.hpp>
104 #include <boost/thread/condition_variable.hpp>
// Mutex / condition-variable pair used by the global read lock functions in this file:
// waiters pass both to enter_cond() and block in COND_global_read_lock.wait(), and
// state changes are announced with COND_global_read_lock.notify_all().
111 static boost::mutex LOCK_global_read_lock;
112 static boost::condition_variable_any COND_global_read_lock;
// Forward declaration; the definition appears later in this file.
// Takes a handler error code and a C string, and reports the failure through my_error().
119 static void print_lock_error(
int error,
const char *);
// Lookup table that turns the result of thr_multi_lock() (cast to int and used as the
// index) into a drizzled::error_t. The order of the entries is therefore significant.
143 static drizzled::error_t thr_lock_errno_to_mysql[]=
144 { EE_OK, EE_ERROR_FIRST, ER_LOCK_WAIT_TIMEOUT, ER_LOCK_DEADLOCK };
// Session::lockTables: lock `count` tables for this session.
// Visible steps: build the lock descriptor, honour the global read lock, notify the
// involved storage engines, take the external locks, then take the thread locks.
// NOTE(review): only selected lines of this function are visible here; branch bodies,
// loop structure and return statements must be confirmed against the full source.
174 DrizzleLock *Session::lockTables(Table **tables, uint32_t count, uint32_t flags)
176 DrizzleLock *sql_lock;
177 Table *write_lock_used;
178 vector<plugin::StorageEngine *> involved_engines;
// Build the lock descriptor with should_lock == true; get_lock_data() reports through
// write_lock_used whether a write-type lock was requested.
182 if (! (sql_lock= get_lock_data(tables, count,
true, &write_lock_used)))
// A write lock was requested while the global read lock counter is non-zero, and the
// caller did not pass DRIZZLE_LOCK_IGNORE_GLOBAL_READ_LOCK.
185 if (global_read_lock && write_lock_used and (not (flags & DRIZZLE_LOCK_IGNORE_GLOBAL_READ_LOCK)))
// Arguments are (abort_on_refresh, is_not_commit).
191 if (wait_if_global_read_lock(1, 1))
// Detects that the refresh version changed since this session's tables were opened
// (the action taken is not visible in this excerpt).
198 if (open_tables.version != g_refresh_version)
206 set_proc_info(
"Notify start statement");
211 if (sql_lock->sizeTable())
213 size_t num_tables= sql_lock->sizeTable();
214 plugin::StorageEngine *engine;
// Ids of engines already collected, so each engine is notified at most once.
215 std::set<size_t> involved_slots;
217 for (
size_t x= 1; x <= num_tables; x++, tables++)
219 engine= (*tables)->cursor->getEngine();
// Engine already recorded (presumably skipped; the statement under this test is not visible).
221 if (involved_slots.count(engine->getId()) > 0)
224 involved_engines.push_back(engine);
225 involved_slots.insert(engine->getId());
// Call startStatement(this) on every collected engine.
228 for_each(involved_engines.begin(),
229 involved_engines.end(),
230 bind2nd(mem_fun(&plugin::StorageEngine::startStatement),
this));
233 set_proc_info(
"External lock");
240 if (sql_lock->sizeTable() && lock_external(sql_lock->getTable(), sql_lock->sizeTable()))
246 set_proc_info(
"Table lock");
// Copy the sizeLock() lock entries to the slots directly after them; thr_multi_lock()
// below is handed that copy rather than the original entries.
248 memcpy(sql_lock->getLocks() + sql_lock->sizeLock(),
249 sql_lock->getLocks(),
250 sql_lock->sizeLock() *
sizeof(*sql_lock->getLocks()));
253 drizzled::error_t rc;
// Translate thr_multi_lock()'s result into an error code through the lookup table.
254 rc= thr_lock_errno_to_mysql[(int) thr_multi_lock(*
this,
255 sql_lock->getLocks() +
256 sql_lock->sizeLock(),
257 sql_lock->sizeLock(),
// Release the external locks and report rc. Presumably this is the failure path; the
// guarding condition is not visible in this excerpt.
261 if (sql_lock->sizeTable())
262 unlock_external(sql_lock->getTable(), sql_lock->sizeTable());
264 my_error(rc, MYF(0));
274 unlockTables(sql_lock);
279 times.set_time_after_lock();
// Session::lock_external: call ha_external_lock() on each of `count` tables.
// NOTE(review): only selected lines are visible; the lock_type assignments and the
// return statements are not shown here.
285 int Session::lock_external(Table **tables, uint32_t count)
// The counter is 1-based and `tables` advances together with it.
288 for (uint32_t i= 1 ; i <= count ; i++, tables++)
290 assert((*tables)->reginfo.lock_type >= TL_READ);
// Tables opened read-only, and lock types in [TL_READ, TL_READ_NO_INSERT], form one
// case (presumably the read-lock case; the assignment under this test is not visible).
292 if ((*tables)->db_stat & HA_READ_ONLY ||
293 ((*tables)->reginfo.lock_type >= TL_READ &&
294 (*tables)->reginfo.lock_type <= TL_READ_NO_INSERT))
// A non-zero result is treated as an error and reported with the engine's name.
297 if ((error=(*tables)->cursor->ha_external_lock(
this,lock_type)))
299 print_lock_error(error, (*tables)->cursor->getEngine()->getName().c_str());
// Release with F_UNLCK and record the unlocked state. Presumably this rolls back tables
// locked earlier in this call; the enclosing loop is not visible.
303 (*tables)->cursor->ha_external_lock(
this, F_UNLCK);
304 (*tables)->current_lock=F_UNLCK;
// Clear HA_BLOCK_LOCK and remember which lock type the table now holds
// (presumably reached only when the lock call succeeded).
310 (*tables)->db_stat &= ~ HA_BLOCK_LOCK;
311 (*tables)->current_lock= lock_type;
// Session::unlockTables: release everything described by sql_lock.
// Thread locks are released first (when there are any), then the external locks
// (when there are any tables).
318 void Session::unlockTables(DrizzleLock *sql_lock)
320 if (sql_lock->sizeLock())
321 sql_lock->unlock(sql_lock->sizeLock());
322 if (sql_lock->sizeTable())
323 unlock_external(sql_lock->getTable(), sql_lock->sizeTable());
// Session::unlockSomeTables: unlock the `count` tables starting at `table`.
// Lock data is rebuilt for just those tables with should_lock == false, and the result
// is passed to unlockTables().
// NOTE(review): the tail of the get_lock_data() call is not visible in this excerpt.
333 void Session::unlockSomeTables(
Table **table, uint32_t count)
336 Table *write_lock_used;
337 if ((sql_lock= get_lock_data(table, count,
false,
339 unlockTables(sql_lock);
// NOTE(review): the header of the function containing these lines is not visible in this
// excerpt; the comments describe only what the visible statements do.
// Pass over the thread locks: an entry whose type is >= TL_WRITE_ALLOW_READ is swapped
// into the slot lock_local points at (presumably `found` counts such entries; the
// increments are not visible).
353 for (i=found=0 ; i < sql_lock->sizeLock(); i++)
355 if (sql_lock->getLocks()[i]->type >= TL_WRITE_ALLOW_READ)
357 std::swap(*lock_local, sql_lock->getLocks()[i]);
// Unlock (i - found) entries starting at lock_local, then set the lock count to `found`.
365 thr_multi_unlock(lock_local, i - found);
366 sql_lock->setLock(found);
// Same partitioning for the table array, keyed on each table's reginfo.lock_type.
371 Table **table= sql_lock->getTable();
372 for (i=found=0 ; i < sql_lock->sizeTable() ; i++)
375 if ((uint32_t) sql_lock->getTable()[i]->reginfo.lock_type >= TL_WRITE_ALLOW_READ)
377 std::swap(*table, sql_lock->getTable()[i]);
// Externally unlock (i - found) tables starting at `table`, then resize the table array to `found`.
385 unlock_external(table, i - found);
386 sql_lock->resizeTable(found);
// Final pass over the tables that remain (its body is not visible in this excerpt).
389 table= sql_lock->getTable();
391 for (i= 0; i < sql_lock->sizeTable(); i++)
// Session::removeLock: unlock a single table by delegating to unlockSomeTables()
// with a one-element table array.
422 void Session::removeLock(
Table *table)
424 unlockSomeTables(&table, 1);
// Session::abortLock: call abort_locks() on every thread lock belonging to `table`.
// The lock data is obtained with should_lock == false.
// NOTE(review): the tail of the get_lock_data() call is not visible in this excerpt.
430 void Session::abortLock(
Table *table)
433 Table *write_lock_used;
435 if ((locked= get_lock_data(&table, 1,
false,
438 for (uint32_t x= 0; x < locked->sizeLock(); x++)
439 locked->getLocks()[x]->lock->abort_locks();
// Session::abortLockForThread: for every thread lock belonging to `table`, call
// abort_locks_for_thread() with the thread id of the session currently using the table
// (table->in_use->thread_id).
// NOTE(review): how the bool result is computed is not visible in this excerpt.
457 bool Session::abortLockForThread(
Table *table)
460 Table* write_lock_used;
// Lock data is obtained with should_lock == false.
461 if (
DrizzleLock* locked= get_lock_data(&table, 1,
false, &write_lock_used))
463 for (uint32_t i= 0; i < locked->sizeLock(); i++)
465 if (locked->getLocks()[i]->lock->abort_locks_for_thread(table->
in_use->thread_id))
// Session::unlock_external: release the external lock on tables that still hold one.
// NOTE(review): the loop construct, the assignment of error_code and the return
// statement are not visible in this excerpt.
475 int Session::unlock_external(
Table **table, uint32_t count)
// Tables already marked F_UNLCK are left alone.
482 if ((*table)->current_lock != F_UNLCK)
// A non-zero result is treated as an error and reported with the engine's name.
485 if ((error=(*table)->cursor->ha_external_lock(
this, F_UNLCK)))
488 print_lock_error(error_code, (*table)->cursor->getEngine()->getName().c_str());
// NOTE(review): the start of this function's header is not visible in this excerpt.
// Judging by the trailing parameters and the call sites in this file this is
// get_lock_data(); confirm against the full source.
// should_lock:     when false, store_lock() is called with TL_IGNORE instead of the
//                  table's own lock type.
// write_lock_used: set to a table whose lock type is >= TL_WRITE_ALLOW_WRITE.
509 bool should_lock,
Table **write_lock_used)
513 Table **to, **table_buf;
// First pass over the input tables; lock_count starts at 0.
516 for (uint32_t i= lock_count= 0 ; i < count ; i++)
518 Table *t= table_ptr[i];
// Only tables whose engine does NOT set HTON_BIT_SKIP_STORE_LOCK pass this test.
520 if (! (t->getEngine()->check_flag(HTON_BIT_SKIP_STORE_LOCK)))
// Write cursors into sql_lock's lock and table arrays.
537 locks= locks_buf= sql_lock->getLocks();
538 to= table_buf= sql_lock->getTable();
// Second pass: collect the lock entries of each table.
540 for (uint32_t i= 0; i < count ; i++)
543 thr_lock_type lock_type;
// Engines with HTON_BIT_SKIP_STORE_LOCK are tested for again here (presumably skipped).
545 if (table_ptr[i]->getEngine()->check_flag(HTON_BIT_SKIP_STORE_LOCK))
549 lock_type= table->reginfo.lock_type;
550 assert (lock_type != TL_WRITE_DEFAULT);
551 if (lock_type >= TL_WRITE_ALLOW_WRITE)
553 *write_lock_used=table;
// A write-type lock on a table opened read-only is reported as ER_OPEN_AS_READONLY.
554 if (table->
db_stat & HA_READ_ONLY)
556 my_error(ER_OPEN_AS_READONLY, MYF(0), table->getAlias());
// Record how many lock entries have been stored so far.
558 sql_lock->setLock(locks - sql_lock->getLocks());
// store_lock() returns the advanced write cursor into the lock array.
564 locks= table->
cursor->
store_lock(
this, locks, should_lock ? lock_type : TL_IGNORE);
// Number of entries this table contributed (locks_start is set on a line not visible here).
569 table->
lock_count= (uint32_t) (locks - locks_start);
// Final lock count: the distance between the write cursor and the start of the array.
588 sql_lock->setLock(locks - locks_buf);
// NOTE(review): the header of the function containing these lines is not visible in this
// excerpt; the comments describe only what the visible statements do.
// Schema and table name of table_list are passed as arguments to a call whose start is
// not visible (presumably constructing `identifier`).
623 table_list->getSchemaName(),
624 table_list->getTableName());
// Visit every table cache entry stored under identifier's key.
627 table::CacheRange ppp= table::getCache().equal_range(identifier.getKey());
628 for (table::CacheMap::const_iterator iter= ppp.first; iter != ppp.second; ++iter)
630 Table *table= iter->second;
631 if (table->reginfo.lock_type < TL_WRITE)
// The cached table is in use by this very session.
633 if (table->
in_use ==
this)
635 table->getMutableShare()->resetVersion();
636 table->locked_by_name=
true;
// Store the table pointer in the TableList entry.
643 table_list->
table=
reinterpret_cast<Table*
>(table);
// The result is removeTable()'s return value normalised through test().
646 return (test(table::Cache::removeTable(*
this, identifier, RTFC_NO_FLAG)));
// TableList::unlock_table_name: hand this entry's `table`, cast to table::Concurrent*,
// to table::remove_table().
// NOTE(review): any guard around the call is not visible in this excerpt.
650 void TableList::unlock_table_name()
654 table::remove_table(static_cast<table::Concurrent *>(table));
// locked_named_table: walk the next_local chain of table_list and call
// table::Cache::areTablesUsed() for each entry's table.
// NOTE(review): how `result` feeds the return value is not visible in this excerpt.
660 static bool locked_named_table(TableList *table_list)
662 for (; table_list; table_list=table_list->next_local)
664 Table *table= table_list->table;
// Temporarily set the table's next pointer to NULL for the areTablesUsed() call
// (presumably so only this table is examined), then restore it.
667 Table *save_next= table->getNext();
668 table->setNext(NULL);
669 bool result= table::Cache::areTablesUsed(table_list->table, 0);
670 table->setNext(save_next);
// Session::wait_for_locked_table_names: wait on COND_refresh for as long as
// locked_named_table() reports true for table_list.
// NOTE(review): the kill check and the return statements are not visible in this excerpt.
679 bool Session::wait_for_locked_table_names(TableList *table_list)
684 assert(ownership of table::Cache::mutex());
687 while (locked_named_table(table_list))
// The table cache mutex is locked again explicitly after the wait returns.
694 wait_for_condition(table::Cache::mutex(), COND_refresh);
695 table::Cache::mutex().lock();
// NOTE(review): the header of the function containing these lines is not visible in this
// excerpt; the comments describe only what the visible statements do.
// Starts as true and is set to false on a path whose condition is not visible.
717 bool got_all_locks=
true;
727 got_all_locks=
false;
// The wait for the named tables is attempted only when got_all_locks is false.
731 if (not got_all_locks && wait_for_locked_table_names(table_list))
// Session::lock_table_names_exclusively: call lock_table_names() for the list and mark
// tables as open placeholders.
// NOTE(review): the loop around the placeholder assignment and the return statements
// are not visible in this excerpt.
758 bool Session::lock_table_names_exclusively(
TableList *table_list)
760 if (lock_table_names(table_list))
769 table->table->open_placeholder= 1;
// TableList::unlock_table_names: call unlock_table_name() on each entry of the
// next_local chain, starting at this entry and stopping before last_table.
797 void TableList::unlock_table_names(
TableList *last_table)
799 for (
TableList *table_iter=
this; table_iter != last_table; table_iter= table_iter->
next_local)
801 table_iter->unlock_table_name();
// print_lock_error: translate a handler error code into an ER_* code and report it.
//   error: handler error code (HA_ERR_*).
//   table: C string used as the message argument for ER_ILLEGAL_HA (call sites in this
//          file pass the storage engine's name).
// NOTE(review): the switch header and any default case are not visible in this excerpt.
807 static void print_lock_error(
int error,
const char *table)
809 drizzled::error_t textno;
812 case HA_ERR_LOCK_WAIT_TIMEOUT:
813 textno=ER_LOCK_WAIT_TIMEOUT;
815 case HA_ERR_READ_ONLY_TRANSACTION:
816 textno=ER_READ_ONLY_TRANSACTION;
818 case HA_ERR_LOCK_DEADLOCK:
819 textno=ER_LOCK_DEADLOCK;
821 case HA_ERR_WRONG_COMMAND:
822 textno=ER_ILLEGAL_HA;
// ER_ILLEGAL_HA is reported with the string argument; the other call passes the numeric
// error code (presumably the else branch; the `else` itself is not visible).
829 if ( textno == ER_ILLEGAL_HA )
830 my_error(textno, MYF(ME_BELL+ME_OLDWIN+ME_WAITTANG), table);
832 my_error(textno, MYF(ME_BELL+ME_OLDWIN+ME_WAITTANG), error);
// Counters behind the global read lock, as used by the functions in this file:
//   global_read_lock                  decremented in unlockGlobalReadLock(); a non-zero
//                                     value makes must_wait() consider blocking.
//   global_read_lock_blocks_commit    incremented in makeGlobalReadLockBlockCommit(),
//                                     decremented again when that call is killed or the
//                                     lock is released.
//   protect_against_global_read_lock  incremented in wait_if_global_read_lock(),
//                                     decremented in startWaitingGlobalReadLock().
//   waiting_for_read_lock             raised while lockGlobalReadLock() waits.
911 volatile uint32_t global_read_lock=0;
912 volatile uint32_t global_read_lock_blocks_commit=0;
913 static volatile uint32_t protect_against_global_read_lock=0;
914 static volatile uint32_t waiting_for_read_lock=0;
// Session::lockGlobalReadLock: take the global read lock for this session.
// NOTE(review): only selected lines are visible; the branch conditions around the two
// exit_cond() calls and the return statements are not shown.
916 bool Session::lockGlobalReadLock()
// Nothing is waited for unless this session currently holds no global read lock.
918 if (isGlobalReadLock() == Session::NONE)
920 const char *old_message;
921 LOCK_global_read_lock.lock();
922 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
923 "Waiting to get readlock");
// Counted as a waiter for the duration of the wait loop below.
925 waiting_for_read_lock++;
// Adopt the mutex locked above so it can be passed to wait().
926 boost::mutex::scoped_lock scopedLock(LOCK_global_read_lock, boost::adopt_lock_t());
// Wait until no statement is protected against the global read lock, or this session is killed.
927 while (protect_against_global_read_lock && not getKilled())
928 COND_global_read_lock.wait(scopedLock);
929 waiting_for_read_lock--;
// Detach the lock object from the mutex (presumably so its destructor does not unlock it;
// exit_cond() is called afterwards).
930 scopedLock.release();
933 exit_cond(old_message);
936 setGlobalReadLock(Session::GOT_GLOBAL_READ_LOCK);
938 exit_cond(old_message);
// Session::unlockGlobalReadLock: give up this session's global read lock.
// NOTE(review): the use of `tmp` and the condition guarding notify_all() are not
// visible in this excerpt.
953 void Session::unlockGlobalReadLock(
void)
// Checks whether this session holds a global read lock at all.
957 if (not isGlobalReadLock())
// The counters are updated while LOCK_global_read_lock is held.
961 boost::mutex::scoped_lock scopedLock(LOCK_global_read_lock);
962 tmp= --global_read_lock;
// Also undo the commit-blocking count if this session had raised it.
963 if (isGlobalReadLock() == Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT)
964 --global_read_lock_blocks_commit;
969 COND_global_read_lock.notify_all();
971 setGlobalReadLock(Session::NONE);
// must_wait: predicate used by wait_if_global_read_lock(). It is false whenever
// global_read_lock is zero; the second operand involves global_read_lock_blocks_commit.
// NOTE(review): the line that uses is_not_commit is not visible in this excerpt.
974 static inline bool must_wait(
bool is_not_commit)
976 return (global_read_lock &&
978 global_read_lock_blocks_commit));
// Session::wait_if_global_read_lock: block while must_wait(is_not_commit) holds.
//   abort_on_refresh: when true, the wait also ends once open_tables.version differs
//                     from g_refresh_version.
//   is_not_commit:    forwarded to must_wait(); also the value returned on the
//                     early-exit path below.
// NOTE(review): only selected lines are visible; how `result` is set and returned is
// not shown.
981 bool Session::wait_if_global_read_lock(
bool abort_on_refresh,
bool is_not_commit)
983 const char *old_message= NULL;
984 bool result= 0, need_exit_cond;
// The caller must not own the table cache mutex.
991 safe_mutex_assert_not_owner(table::Cache::mutex().native_handle());
993 LOCK_global_read_lock.lock();
// need_exit_cond records whether the waiting path was entered.
994 if ((need_exit_cond= must_wait(is_not_commit)))
// This session itself holds a global read lock: report
// ER_CANT_UPDATE_WITH_READLOCK and return instead of waiting.
996 if (isGlobalReadLock())
999 my_message(ER_CANT_UPDATE_WITH_READLOCK,
1000 ER(ER_CANT_UPDATE_WITH_READLOCK), MYF(0));
1001 LOCK_global_read_lock.unlock();
1007 return is_not_commit;
1009 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
1010 "Waiting for release of readlock");
1012 while (must_wait(is_not_commit) && not getKilled() &&
1013 (!abort_on_refresh || open_tables.version == g_refresh_version))
// Adopt the already-held mutex for the wait() call.
// NOTE(review): no release() is visible here; confirm the lock object does not unlock
// the mutex when it goes out of scope.
1015 boost::mutex::scoped_lock scoped(LOCK_global_read_lock, boost::adopt_lock_t());
1016 COND_global_read_lock.wait(scoped);
// Register this statement as protected against the global read lock.
1023 if (not abort_on_refresh && not result)
1024 protect_against_global_read_lock++;
// Leave through exit_cond() if enter_cond() was called; the plain unlock() is
// presumably the other branch (the `else` is not visible).
1030 if (unlikely(need_exit_cond))
1032 exit_cond(old_message);
1036 LOCK_global_read_lock.unlock();
// Session::startWaitingGlobalReadLock: drop this statement's protection against the
// global read lock and notify waiters.
// NOTE(review): the statement under the first test and the condition guarding
// notify_all() are not visible in this excerpt.
1043 void Session::startWaitingGlobalReadLock()
1045 if (unlikely(isGlobalReadLock()))
1048 LOCK_global_read_lock.lock();
// tmp is true when the protection count reaches zero while a session is waiting for the
// read lock or commits are being blocked.
1049 bool tmp= (!--protect_against_global_read_lock && (waiting_for_read_lock || global_read_lock_blocks_commit));
1050 LOCK_global_read_lock.unlock();
1053 COND_global_read_lock.notify_all();
// Session::makeGlobalReadLockBlockCommit: raise global_read_lock_blocks_commit and wait
// until protect_against_global_read_lock drops to zero.
// NOTE(review): only selected lines are visible; the declaration of `error` and the
// return statements are not shown.
1057 bool Session::makeGlobalReadLockBlockCommit()
1060 const char *old_message;
// Checks that this session is in the GOT_GLOBAL_READ_LOCK state.
1065 if (isGlobalReadLock() != Session::GOT_GLOBAL_READ_LOCK)
1067 LOCK_global_read_lock.lock();
1069 global_read_lock_blocks_commit++;
1070 old_message= enter_cond(COND_global_read_lock, LOCK_global_read_lock,
1071 "Waiting for all running commits to finish");
1072 while (protect_against_global_read_lock && not getKilled())
// Adopt the already-held mutex for wait(), then detach from it again (presumably so the
// lock object does not unlock the mutex on destruction).
1074 boost::mutex::scoped_lock scopedLock(LOCK_global_read_lock, boost::adopt_lock_t());
1075 COND_global_read_lock.wait(scopedLock);
1076 scopedLock.release();
// If the session was killed while waiting, take back the increment made above.
1078 if ((error= test(getKilled())))
1080 global_read_lock_blocks_commit--;
1084 setGlobalReadLock(Session::MADE_GLOBAL_READ_LOCK_BLOCK_COMMIT);
1087 exit_cond(old_message);
1114 COND_refresh.notify_all();
1115 COND_global_read_lock.notify_all();
void broadcast_refresh(void)
static void reset_lock_data_and_free(DrizzleLock *&lock)
UNIV_INTERN ulint lock_table(ulint flags, dict_table_t *table, enum lock_mode mode, que_thr_t *thr)
TODO: Rename this file - func.h is stupid.
Table * table
opened table
void unlock_table_names(TableList *last_table=NULL)
virtual THR_LOCK_DATA ** store_lock(Session *, THR_LOCK_DATA **to, enum thr_lock_type)