Drizzled Public API Documentation

sql_insert.cc
1 /* Copyright (C) 2000-2006 MySQL AB
2 
3  This program is free software; you can redistribute it and/or modify
4  it under the terms of the GNU General Public License as published by
5  the Free Software Foundation; version 2 of the License.
6 
7  This program is distributed in the hope that it will be useful,
8  but WITHOUT ANY WARRANTY; without even the implied warranty of
9  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10  GNU General Public License for more details.
11 
12  You should have received a copy of the GNU General Public License
13  along with this program; if not, write to the Free Software
14  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
15 
16 
17 /* Insert of records */
18 
19 #include <config.h>
20 #include <cstdio>
21 #include <drizzled/sql_select.h>
22 #include <drizzled/show.h>
23 #include <drizzled/error.h>
24 #include <drizzled/name_resolution_context_state.h>
25 #include <drizzled/probes.h>
26 #include <drizzled/sql_base.h>
27 #include <drizzled/sql_load.h>
28 #include <drizzled/field/epoch.h>
29 #include <drizzled/lock.h>
30 #include <drizzled/sql_table.h>
31 #include <drizzled/pthread_globals.h>
32 #include <drizzled/transaction_services.h>
33 #include <drizzled/plugin/transactional_storage_engine.h>
34 #include <drizzled/select_insert.h>
35 #include <drizzled/select_create.h>
36 #include <drizzled/table/shell.h>
37 #include <drizzled/alter_info.h>
38 #include <drizzled/sql_parse.h>
39 #include <drizzled/sql_lex.h>
40 #include <drizzled/statistics_variables.h>
41 #include <drizzled/session/transactions.h>
42 #include <drizzled/open_tables_state.h>
43 #include <drizzled/table/cache.h>
44 #include <drizzled/create_field.h>
45 
46 namespace drizzled {
47 
48 extern plugin::StorageEngine *heap_engine;
49 extern plugin::StorageEngine *myisam_engine;
50 
51 /*
52  Check if insert fields are correct.
53 
54  SYNOPSIS
55  check_insert_fields()
56  session The current thread.
57  table The table for insert.
58  fields The insert fields.
59  values The insert values.
60  check_unique If duplicate values should be rejected.
61 
62  NOTE
63  Clears TIMESTAMP_AUTO_SET_ON_INSERT from table->timestamp_field_type
64  or leaves it as is, depending on if timestamp should be updated or
65  not.
66 
67  RETURN
68  0 OK
69  -1 Error
70 */
71 
72 static int check_insert_fields(Session *session, TableList *table_list,
73  List<Item> &fields, List<Item> &values,
74  bool check_unique,
75  table_map *)
76 {
77  Table *table= table_list->table;
78 
79  if (fields.size() == 0 && values.size() != 0)
80  {
81  if (values.size() != table->getShare()->sizeFields())
82  {
83  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
84  return -1;
85  }
86  clear_timestamp_auto_bits(table->timestamp_field_type,
87  TIMESTAMP_AUTO_SET_ON_INSERT);
88  /*
89  No fields are provided so all fields must be provided in the values.
90  Thus we set all bits in the write set.
91  */
92  table->setWriteSet();
93  }
94  else
95  { // Part field list
96  Select_Lex *select_lex= &session->lex().select_lex;
97  Name_resolution_context *context= &select_lex->context;
98  Name_resolution_context_state ctx_state;
99  int res;
100 
101  if (fields.size() != values.size())
102  {
103  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1L);
104  return -1;
105  }
106 
107  session->dup_field= 0;
108 
109  /* Save the state of the current name resolution context. */
110  ctx_state.save_state(context, table_list);
111 
112  /*
113  Perform name resolution only in the first table - 'table_list',
114  which is the table that is inserted into.
115  */
116  table_list->next_local= 0;
117  context->resolve_in_table_list_only(table_list);
118  res= setup_fields(session, 0, fields, MARK_COLUMNS_WRITE, 0, 0);
119 
120  /* Restore the current context. */
121  ctx_state.restore_state(context, table_list);
122 
123  if (res)
124  return -1;
125 
126  if (check_unique && session->dup_field)
127  {
128  my_error(ER_FIELD_SPECIFIED_TWICE, MYF(0), session->dup_field->field_name);
129  return -1;
130  }
131  if (table->timestamp_field) // Don't automaticly set timestamp if used
132  {
133  if (table->timestamp_field->isWriteSet())
134  {
135  clear_timestamp_auto_bits(table->timestamp_field_type,
136  TIMESTAMP_AUTO_SET_ON_INSERT);
137  }
138  else
139  {
140  table->setWriteSet(table->timestamp_field->position());
141  }
142  }
143  }
144 
145  return 0;
146 }
147 
148 
149 /*
150  Check update fields for the timestamp field.
151 
152  SYNOPSIS
153  check_update_fields()
154  session The current thread.
155  insert_table_list The insert table list.
156  table The table for update.
157  update_fields The update fields.
158 
159  NOTE
160  If the update fields include the timestamp field,
161  remove TIMESTAMP_AUTO_SET_ON_UPDATE from table->timestamp_field_type.
162 
163  RETURN
164  0 OK
165  -1 Error
166 */
167 
168 static int check_update_fields(Session *session, TableList *insert_table_list,
169  List<Item> &update_fields,
170  table_map *)
171 {
172  Table *table= insert_table_list->table;
173  bool timestamp_mark= false;
174 
175  if (table->timestamp_field)
176  {
177  /*
178  Unmark the timestamp field so that we can check if this is modified
179  by update_fields
180  */
181  timestamp_mark= table->write_set->test(table->timestamp_field->position());
182  table->write_set->reset(table->timestamp_field->position());
183  }
184 
185  /* Check the fields we are going to modify */
186  if (setup_fields(session, 0, update_fields, MARK_COLUMNS_WRITE, 0, 0))
187  return -1;
188 
189  if (table->timestamp_field)
190  {
191  /* Don't set timestamp column if this is modified. */
192  if (table->timestamp_field->isWriteSet())
193  {
194  clear_timestamp_auto_bits(table->timestamp_field_type,
195  TIMESTAMP_AUTO_SET_ON_UPDATE);
196  }
197 
198  if (timestamp_mark)
199  {
200  table->setWriteSet(table->timestamp_field->position());
201  }
202  }
203  return 0;
204 }
205 
206 
215 static
217  thr_lock_type *lock_type,
218  enum_duplicates duplic,
219  bool )
220 {
221  if (duplic == DUP_UPDATE ||
222  (duplic == DUP_REPLACE && *lock_type == TL_WRITE_CONCURRENT_INSERT))
223  {
224  *lock_type= TL_WRITE_DEFAULT;
225  return;
226  }
227 }
228 
229 
/*
  Run an INSERT with an explicit list of value rows.

  Flow, as visible in the body:
    1. upgrade_lock_type() may change table_list->lock_type, then the
       tables are opened and locked with session->openTablesLock().
    2. prepare_insert() is called with the first value row.
    3. Every further row must have the same number of values as the first
       one (otherwise ER_WRONG_VALUE_COUNT_ON_ROW with the row counter) and
       is passed through setup_fields().
    4. Each row is copied into the record with fill_record() and written
       with write_record().
    5. Bulk insert is ended, the id to report is computed and the result is
       sent with session->my_ok().

  Returns true on error (each error exit fires DRIZZLE_INSERT_DONE(1, 0)),
  false on success.

  NOTE(review): the listing numbers embedded in this text skip 257, 286,
  305, 325, 336, 365, 422, 442, 475, 477 and 489, so statements appear to
  be missing: `ctx_state` is used but not declared in the visible body,
  each `if (table != NULL)` seems to have lost the statement it guarded,
  and the `id=` conditional expression lacks operands. Compare with the
  upstream file before relying on this text.
*/
238 bool insert_query(Session *session,TableList *table_list,
239  List<Item> &fields,
240  List<List_item> &values_list,
241  List<Item> &update_fields,
242  List<Item> &update_values,
243  enum_duplicates duplic,
244  bool ignore)
245 {
246  int error;
247  bool transactional_table, joins_freed= false;
248  bool changed;
249  uint32_t value_count;
250  ulong counter = 1;
251  uint64_t id;
252  CopyInfo info;
253  Table *table= 0;
254  List<List_item>::iterator its(values_list.begin());
255  List_item *values;
256  Name_resolution_context *context;
258  Item *unused_conds= 0;
259 
260 
261  /*
262  Upgrade lock type if the requested lock is incompatible with
263  the current connection mode or table operation.
264  */
265  upgrade_lock_type(session, &table_list->lock_type, duplic,
266  values_list.size() > 1);
267 
268  if (session->openTablesLock(table_list))
269  {
270  DRIZZLE_INSERT_DONE(1, 0);
271  return true;
272  }
273 
274  session->set_proc_info("init");
275  session->used_tables=0;
/* The first row defines the value count every other row must match. */
276  values= its++;
277  value_count= values->size();
278 
279  if (prepare_insert(session, table_list, table, fields, values,
280  update_fields, update_values, duplic, &unused_conds,
281  false,
282  (fields.size() || !value_count ||
283  (0) != 0), !ignore))
284  {
/* NOTE(review): as written, this `if` guards the next `if`; listing line 286 is absent - confirm. */
285  if (table != NULL)
287  if (!joins_freed)
288  free_underlaid_joins(session, &session->lex().select_lex);
289  session->setAbortOnWarning(false);
290  DRIZZLE_INSERT_DONE(1, 0);
291  return true;
292  }
293 
294  /* mysql_prepare_insert set table_list->table if it was not set */
295  table= table_list->table;
296 
297  context= &session->lex().select_lex.context;
298  /*
299  These three asserts test the hypothesis that the resetting of the name
300  resolution context below is not necessary at all since the list of local
301  tables for INSERT always consists of one table.
302  */
/* NOTE(review): only two asserts are visible here; listing line 305 is absent. */
303  assert(!table_list->next_local);
304  assert(!context->table_list->next_local);
306 
307  /* Save the state of the current name resolution context. */
308  ctx_state.save_state(context, table_list);
309 
310  /*
311  Perform name resolution only in the first table - 'table_list',
312  which is the table that is inserted into.
313  */
314  table_list->next_local= 0;
315  context->resolve_in_table_list_only(table_list);
316 
/* Validate the remaining rows; `counter` is the 1-based row number used in the error. */
317  while ((values= its++))
318  {
319  counter++;
320  if (values->size() != value_count)
321  {
322  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
323 
324  if (table != NULL)
326  if (!joins_freed)
327  free_underlaid_joins(session, &session->lex().select_lex);
328  session->setAbortOnWarning(false);
329  DRIZZLE_INSERT_DONE(1, 0);
330 
331  return true;
332  }
333  if (setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0))
334  {
335  if (table != NULL)
337  if (!joins_freed)
338  free_underlaid_joins(session, &session->lex().select_lex);
339  session->setAbortOnWarning(false);
340  DRIZZLE_INSERT_DONE(1, 0);
341  return true;
342  }
343  }
/* Rewind the row iterator for the insert pass below. */
344  its= values_list.begin();
345 
346  /* Restore the current context. */
347  ctx_state.restore_state(context, table_list);
348 
349  /*
350  Fill in the given fields and dump it to the table cursor
351  */
352  info.ignore= ignore;
353  info.handle_duplicates=duplic;
354  info.update_fields= &update_fields;
355  info.update_values= &update_values;
356 
357  /*
358  Count warnings for all inserts.
359  For single line insert, generate an error if try to set a NOT NULL field
360  to NULL.
361  */
362  session->count_cuted_fields= ignore ? CHECK_FIELD_WARN : CHECK_FIELD_ERROR_FOR_NULL;
363 
364  session->cuted_fields = 0L;
366 
367  error=0;
368  session->set_proc_info("update");
369  if (duplic == DUP_REPLACE)
370  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
371  if (duplic == DUP_UPDATE)
372  table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
/* Plain scope block: it is not controlled by the `if` above. */
373  {
374  if (duplic != DUP_ERROR || ignore)
375  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
376  table->cursor->ha_start_bulk_insert(values_list.size());
377  }
378 
379 
380  session->setAbortOnWarning(not ignore);
381 
382  table->mark_columns_needed_for_insert();
383 
/* Insert pass: one fill_record() + write_record() per row. */
384  while ((values= its++))
385  {
386  if (fields.size() || !value_count)
387  {
388  table->restoreRecordAsDefault(); // Get empty record
389  if (fill_record(session, fields, *values))
390  {
/* Multi-row statement with no error raised on the session: count the row and move on. */
391  if (values_list.size() != 1 && ! session->is_error())
392  {
393  info.records++;
394  continue;
395  }
396  /*
397  TODO: set session->abort_on_warning if values_list.elements == 1
398  and check that all items return warning in case of problem with
399  storing field.
400  */
401  error=1;
402  break;
403  }
404  }
405  else
406  {
407  table->restoreRecordAsDefault(); // Get empty record
408 
409  if (fill_record(session, table->getFields(), *values))
410  {
411  if (values_list.size() != 1 && ! session->is_error())
412  {
413  info.records++;
414  continue;
415  }
416  error=1;
417  break;
418  }
419  }
420 
421  // Release latches in case bulk insert takes a long time
/* NOTE(review): no statement follows the comment above; listing line 422 is absent. */
423 
424  error=write_record(session, table ,&info);
425  if (error)
426  break;
427  session->row_count++;
428  }
429 
430  free_underlaid_joins(session, &session->lex().select_lex);
431  joins_freed= true;
432 
433  /*
434  Now all rows are inserted. Time to update logs and sends response to
435  user
436  */
437  {
438  /*
439  Do not do this release if this is a delayed insert, it would steal
440  auto_inc values from the delayed_insert thread as they share Table.
441  */
/* NOTE(review): the "release" the comment above refers to is not visible; listing line 442 is absent. */
443  if (table->cursor->ha_end_bulk_insert() && !error)
444  {
445  table->print_error(errno,MYF(0));
446  error=1;
447  }
448  if (duplic != DUP_ERROR || ignore)
449  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
450 
451  transactional_table= table->cursor->has_transactions();
452 
453  changed= (info.copied || info.deleted || info.updated);
454  if ((changed && error <= 0) || session->transaction.stmt.hasModifiedNonTransData())
455  {
456  if (session->transaction.stmt.hasModifiedNonTransData())
457  session->transaction.all.markModifiedNonTransData();
458  }
/* The void cast keeps the variable referenced apart from the assert below. */
459  (void)(transactional_table);
460  assert(transactional_table || !changed || session->transaction.stmt.hasModifiedNonTransData());
461 
462  }
463  session->set_proc_info("end");
464  /*
465  We'll report to the client this id:
466  - if the table contains an autoincrement column and we successfully
467  inserted an autogenerated value, the autogenerated value.
468  - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
469  called, X.
470  - if the table contains an autoincrement column, and some rows were
471  inserted, the id of the last "inserted" row (if IGNORE, that value may not
472  have been really inserted but ignored).
473  */
/* NOTE(review): the conditional expression below is incomplete as shown; listing lines 475 and 477 are absent. */
474  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
476  (session->arg_of_last_insert_id_function ?
478  ((table->next_number_field && info.copied) ?
479  table->next_number_field->val_int() : 0));
480  table->next_number_field=0;
481  session->count_cuted_fields= CHECK_FIELD_IGNORE;
482  table->auto_increment_field_not_null= false;
483  if (duplic == DUP_REPLACE)
484  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
485 
486  if (error)
487  {
488  if (table != NULL)
490  if (!joins_freed)
491  free_underlaid_joins(session, &session->lex().select_lex);
492  session->setAbortOnWarning(false);
493  DRIZZLE_INSERT_DONE(1, 0);
494  return true;
495  }
496 
/* Single row with no warning text needed: plain OK; otherwise an ER_INSERT_INFO summary is attached. */
497  if (values_list.size() == 1 && (!(session->options & OPTION_WARNINGS) ||
498  !session->cuted_fields))
499  {
500  session->row_count_func= info.copied + info.deleted + info.updated;
501  session->my_ok((ulong) session->rowCount(),
502  info.copied + info.deleted + info.touched, id);
503  }
504  else
505  {
506  char buff[160];
507  if (ignore)
508  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
509  (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
510  else
511  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
512  (ulong) (info.deleted + info.updated), (ulong) session->cuted_fields);
513  session->row_count_func= info.copied + info.deleted + info.updated;
514  session->my_ok((ulong) session->rowCount(),
515  info.copied + info.deleted + info.touched, id, buff);
516  }
517  session->status_var.inserted_row_count+= session->rowCount();
518  session->setAbortOnWarning(false);
519  DRIZZLE_INSERT_DONE(0, session->rowCount());
520 
521  return false;
522 }
523 
524 
525 /*
526  Check if table can be updated
527 
528  SYNOPSIS
529  prepare_insert_check_table()
530  session Thread handle
531  table_list Table list
532  fields List of fields to be updated
533  where Pointer to where clause
534  select_insert Check is making for SELECT ... INSERT
535 
536  RETURN
537  false ok
538  true ERROR
539 */
540 
541 static bool prepare_insert_check_table(Session *session, TableList *table_list,
542  List<Item> &,
543  bool select_insert)
544 {
545 
546 
547  /*
548  first table in list is the one we'll INSERT into, requires INSERT_ACL.
549  all others require SELECT_ACL only. the ACL requirement below is for
550  new leaves only anyway (view-constituents), so check for SELECT rather
551  than INSERT.
552  */
553 
554  return setup_tables_and_check_access(session, &session->lex().select_lex.context,
555  &session->lex().select_lex.top_join_list, table_list, &session->lex().select_lex.leaf_tables, select_insert);
556 }
557 
558 
559 /*
560  Prepare items in INSERT statement
561 
562  SYNOPSIS
563  prepare_insert()
564  session Thread handler
565  table_list Global/local table list
566  table Table to insert into (can be NULL if table should
567  be taken from table_list->table)
568  where Where clause (for insert ... select)
569  select_insert true if INSERT ... SELECT statement
570  check_fields true if need to check that all INSERT fields are
571  given values.
572  abort_on_warning whether to report if some INSERT field is not
573  assigned as an error (true) or as a warning (false).
574 
575  TODO (in far future)
576  In cases of:
577  INSERT INTO t1 SELECT a, sum(a) as sum1 from t2 GROUP BY a
578  ON DUPLICATE KEY ...
579  we should be able to refer to sum1 in the ON DUPLICATE KEY part
580 
581  WARNING
582  You MUST set table->insert_values to 0 after calling this function
583  before releasing the table object.
584 
585  RETURN VALUE
586  false OK
587  true error
588 */
589 
590 bool prepare_insert(Session *session, TableList *table_list,
591  Table *table, List<Item> &fields, List_item *values,
592  List<Item> &update_fields, List<Item> &update_values,
593  enum_duplicates duplic,
594  COND **,
595  bool select_insert,
596  bool check_fields, bool abort_on_warning)
597 {
598  Select_Lex *select_lex= &session->lex().select_lex;
599  Name_resolution_context *context= &select_lex->context;
600  Name_resolution_context_state ctx_state;
601  bool insert_into_view= (0 != 0);
602  bool res= 0;
603  table_map map= 0;
604 
605  /* INSERT should have a SELECT or VALUES clause */
606  assert (!select_insert || !values);
607 
608  /*
609  For subqueries in VALUES() we should not see the table in which we are
610  inserting (for INSERT ... SELECT this is done by changing table_list,
611  because INSERT ... SELECT share Select_Lex it with SELECT.
612  */
613  if (not select_insert)
614  {
615  for (Select_Lex_Unit *un= select_lex->first_inner_unit();
616  un;
617  un= un->next_unit())
618  {
619  for (Select_Lex *sl= un->first_select();
620  sl;
621  sl= sl->next_select())
622  {
623  sl->context.outer_context= 0;
624  }
625  }
626  }
627 
628  if (duplic == DUP_UPDATE)
629  {
630  /* it should be allocated before Item::fix_fields() */
631  table_list->set_insert_values();
632  }
633 
634  if (prepare_insert_check_table(session, table_list, fields, select_insert))
635  return true;
636 
637 
638  /* Prepare the fields in the statement. */
639  if (values)
640  {
641  /* if we have INSERT ... VALUES () we cannot have a GROUP BY clause */
642  assert (!select_lex->group_list.elements);
643 
644  /* Save the state of the current name resolution context. */
645  ctx_state.save_state(context, table_list);
646 
647  /*
648  Perform name resolution only in the first table - 'table_list',
649  which is the table that is inserted into.
650  */
651  table_list->next_local= 0;
652  context->resolve_in_table_list_only(table_list);
653 
654  res= check_insert_fields(session, context->table_list, fields, *values,
655  !insert_into_view, &map) ||
656  setup_fields(session, 0, *values, MARK_COLUMNS_READ, 0, 0);
657 
658  if (!res && check_fields)
659  {
660  bool saved_abort_on_warning= session->abortOnWarning();
661 
662  session->setAbortOnWarning(abort_on_warning);
663  res= check_that_all_fields_are_given_values(session,
664  table ? table :
665  context->table_list->table,
666  context->table_list);
667  session->setAbortOnWarning(saved_abort_on_warning);
668  }
669 
670  if (!res && duplic == DUP_UPDATE)
671  {
672  res= check_update_fields(session, context->table_list, update_fields, &map);
673  }
674 
675  /* Restore the current context. */
676  ctx_state.restore_state(context, table_list);
677 
678  if (not res)
679  res= setup_fields(session, 0, update_values, MARK_COLUMNS_READ, 0, 0);
680  }
681 
682  if (res)
683  return res;
684 
685  if (not table)
686  table= table_list->table;
687 
688  if (not select_insert && unique_table(table_list, table_list->next_global, true))
689  {
690  my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->alias);
691  return true;
692  }
693 
694  if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
695  table->prepare_for_position();
696 
697  return false;
698 }
699 
700 
701  /* Check if there is more uniq keys after field */
702 
703 static int last_uniq_key(Table *table,uint32_t keynr)
704 {
705  while (++keynr < table->getShare()->sizeKeys())
706  if (table->key_info[keynr].flags & HA_NOSAME)
707  return 0;
708  return 1;
709 }
710 
711 
712 /*
713  Write a record to table with optional deleting of conflicting records,
714  invoke proper triggers if needed.
715 
716  SYNOPSIS
717  write_record()
718  session - thread context
719  table - table to which record should be written
720  info - CopyInfo structure describing handling of duplicates
721  and which is used for counting number of records inserted
722  and deleted.
723 
724  NOTE
725  Once this record will be written to table after insert trigger will
726  be invoked. If instead of inserting new record we will update old one
727  then both on update triggers will work instead. Similarly both on
728  delete triggers will be invoked if we will delete conflicting records.
729 
730  Sets session->transaction.stmt.modified_non_trans_data to true if table which is updated didn't have
731  transactions.
732 
  NOTE(review): no trigger invocation is visible in the body below; the
  trigger wording above may be inherited text - confirm.

  Control flow summary (from the body):
  - For DUP_REPLACE / DUP_UPDATE the insert is retried in a loop; each
  failure is classified with is_fatal_error() and, for a duplicate key,
  the conflicting row is read and then updated or deleted.
  - For other modes a single insert is attempted; with info->ignore set, an
  error that is_fatal_error(error, HA_CHECK_DUP) does not report as
  fatal is swallowed.
  - info->records is incremented on every call; info->copied, deleted,
  updated and touched are incremented on the corresponding paths.

733  RETURN VALUE
734  0 - success
735  non-0 - error
736 */
737 
738 
739 int write_record(Session *session, Table *table,CopyInfo *info)
740 {
741  int error;
742  std::vector<unsigned char> key;
743  boost::dynamic_bitset<> *save_read_set, *save_write_set;
744  uint64_t prev_insert_id= table->cursor->next_insert_id;
745  uint64_t insert_id_for_cur_row= 0;
746 
747 
748  info->records++;
/* Remember the column bitmaps so they can be put back on the exit paths below. */
749  save_read_set= table->read_set;
750  save_write_set= table->write_set;
751 
752  if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE)
753  {
/* Loop body runs only while insertRecord() reports an error. */
754  while ((error=table->cursor->insertRecord(table->getInsertRecord())))
755  {
756  uint32_t key_nr;
757  /*
758  If we do more than one iteration of this loop, from the second one the
759  row will have an explicit value in the autoinc field, which was set at
760  the first call of handler::update_auto_increment(). So we must save
761  the autogenerated value to avoid session->insert_id_for_cur_row to become
762  0.
763  */
764  if (table->cursor->insert_id_for_cur_row > 0)
765  insert_id_for_cur_row= table->cursor->insert_id_for_cur_row;
766  else
767  table->cursor->insert_id_for_cur_row= insert_id_for_cur_row;
768  bool is_duplicate_key_error;
769  if (table->cursor->is_fatal_error(error, HA_CHECK_DUP))
770  goto err;
771  is_duplicate_key_error= table->cursor->is_fatal_error(error, 0);
772  if (!is_duplicate_key_error)
773  {
774  /*
775  We come here when we had an ignorable error which is not a duplicate
776  key error. In this we ignore error if ignore flag is set, otherwise
777  report error as usual. We will not do any duplicate key processing.
778  */
779  if (info->ignore)
780  goto gok_or_after_err; /* Ignoring a not fatal error, return 0 */
781  goto err;
782  }
783  if ((int) (key_nr = table->get_dup_key(error)) < 0)
784  {
785  error= HA_ERR_FOUND_DUPP_KEY; /* Database can't find key */
786  goto err;
787  }
788  /* Read all columns for the row we are going to replace */
789  table->use_all_columns();
790  /*
791  Don't allow REPLACE to replace a row when a auto_increment column
792  was used. This ensures that we don't get a problem when the
793  whole range of the key has been used.
794  */
795  if (info->handle_duplicates == DUP_REPLACE &&
796  table->next_number_field &&
797  key_nr == table->getShare()->next_number_index &&
798  (insert_id_for_cur_row > 0))
799  goto err;
/* Fetch the conflicting row into the update record: by stored position when the engine has HTON_BIT_DUPLICATE_POS, otherwise by key lookup. */
800  if (table->cursor->getEngine()->check_flag(HTON_BIT_DUPLICATE_POS))
801  {
802  if (table->cursor->rnd_pos(table->getUpdateRecord(),table->cursor->dup_ref))
803  goto err;
804  }
805  else
806  {
807  if (table->cursor->extra(HA_EXTRA_FLUSH_CACHE)) /* Not needed with NISAM */
808  {
809  error=errno;
810  goto err;
811  }
812 
/* The key buffer is sized once and reused on later iterations. */
813  if (not key.size())
814  {
815  key.resize(table->getShare()->max_unique_length);
816  }
817  key_copy(&key[0], table->getInsertRecord(), table->key_info+key_nr, 0);
818  if ((error=(table->cursor->index_read_idx_map(table->getUpdateRecord(),key_nr,
819  &key[0], HA_WHOLE_KEY,
820  HA_READ_KEY_EXACT))))
821  goto err;
822  }
823  if (info->handle_duplicates == DUP_UPDATE)
824  {
825  /*
826  We don't check for other UNIQUE keys - the first row
827  that matches, is updated. If update causes a conflict again,
828  an error is returned
829  */
830  assert(table->insert_values.size());
831  table->storeRecordAsInsert();
832  table->restoreRecord();
833  assert(info->update_fields->size() ==
834  info->update_values->size());
/* A fill_record() failure jumps to before_err, which skips print_error(). */
835  if (fill_record(session, *info->update_fields,
836  *info->update_values,
837  info->ignore))
838  goto before_err;
839 
840  table->cursor->restore_auto_increment(prev_insert_id);
841  if (table->next_number_field)
842  table->cursor->adjust_next_insert_id_after_explicit_value(
843  table->next_number_field->val_int());
844  info->touched++;
845 
846  if (! table->records_are_comparable() || table->compare_records())
847  {
848  if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
849  table->getInsertRecord())) &&
850  error != HA_ERR_RECORD_IS_THE_SAME)
851  {
852  if (info->ignore &&
853  !table->cursor->is_fatal_error(error, HA_CHECK_DUP_KEY))
854  {
855  goto gok_or_after_err;
856  }
857  goto err;
858  }
859 
860  if (error != HA_ERR_RECORD_IS_THE_SAME)
861  info->updated++;
862  else
863  error= 0;
864  /*
865  If ON DUP KEY UPDATE updates a row instead of inserting one, it's
866  like a regular UPDATE statement: it should not affect the value of a
867  next SELECT LAST_INSERT_ID() or insert_id().
868  Except if LAST_INSERT_ID(#) was in the INSERT query, which is
869  handled separately by Session::arg_of_last_insert_id_function.
870  */
871  insert_id_for_cur_row= table->cursor->insert_id_for_cur_row= 0;
872  info->copied++;
873  }
874 
/* NOTE(review): the adjust call and info->touched++ below repeat the ones a few lines above on the same path, so touched grows by two per row here - confirm this is intended. */
875  if (table->next_number_field)
876  table->cursor->adjust_next_insert_id_after_explicit_value(
877  table->next_number_field->val_int());
878  info->touched++;
879 
880  goto gok_or_after_err;
881  }
882  else /* DUP_REPLACE */
883  {
884  /*
885  The manual defines the REPLACE semantics that it is either
886  an INSERT or DELETE(s) + INSERT; FOREIGN KEY checks in
887  InnoDB do not function in the defined way if we allow MySQL
888  to convert the latter operation internally to an UPDATE.
889  We also should not perform this conversion if we have
890  timestamp field with ON UPDATE which is different from DEFAULT.
891  Another case when conversion should not be performed is when
892  we have ON DELETE trigger on table so user may notice that
893  we cheat here. Note that it is ok to do such conversion for
894  tables which have ON UPDATE but have no ON DELETE triggers,
895  we just should not expose this fact to users by invoking
896  ON UPDATE triggers.
897  */
898  if (last_uniq_key(table,key_nr) &&
899  !table->cursor->referenced_by_foreign_key() &&
900  (table->timestamp_field_type == TIMESTAMP_NO_AUTO_SET ||
901  table->timestamp_field_type == TIMESTAMP_AUTO_SET_ON_BOTH))
902  {
903  if ((error=table->cursor->updateRecord(table->getUpdateRecord(),
904  table->getInsertRecord())) &&
905  error != HA_ERR_RECORD_IS_THE_SAME)
906  goto err;
/* The in-place update is counted as a delete; `copied` is incremented at after_n_copied_inc. */
907  if (error != HA_ERR_RECORD_IS_THE_SAME)
908  info->deleted++;
909  else
910  error= 0;
911  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
912  /*
913  Since we pretend that we have done insert we should call
914  its after triggers.
915  */
916  goto after_n_copied_inc;
917  }
918  else
919  {
920  if ((error=table->cursor->deleteRecord(table->getUpdateRecord())))
921  goto err;
922  info->deleted++;
923  if (!table->cursor->has_transactions())
924  session->transaction.stmt.markModifiedNonTransData();
925  /* Let us attempt do write_row() once more */
926  }
927  }
928  }
/* Reached once insertRecord() returned 0, either at once or after conflicting rows were deleted. */
929  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
930  /*
931  Restore column maps if they were replaced during a duplicate key
932  problem.
933  */
934  if (table->read_set != save_read_set ||
935  table->write_set != save_write_set)
936  table->column_bitmaps_set(*save_read_set, *save_write_set);
937  }
938  else if ((error=table->cursor->insertRecord(table->getInsertRecord())))
939  {
940  if (!info->ignore ||
941  table->cursor->is_fatal_error(error, HA_CHECK_DUP))
942  goto err;
943  table->cursor->restore_auto_increment(prev_insert_id);
944  goto gok_or_after_err;
945  }
946 
/* Success path that counts the row as copied. */
947 after_n_copied_inc:
948  info->copied++;
949  session->record_first_successful_insert_id_in_cur_stmt(table->cursor->insert_id_for_cur_row);
950 
/* Common return-0 exit; also used when an error was deliberately ignored. */
951 gok_or_after_err:
952  if (!table->cursor->has_transactions())
953  session->transaction.stmt.markModifiedNonTransData();
954  return 0;
955 
/* Failure exit: record and report the error, then fall through to the cleanup below. */
956 err:
957  info->last_errno= error;
958  /* current_select is NULL if this is a delayed insert */
959  if (session->lex().current_select)
960  session->lex().current_select->no_error= 0; // Give error
961  table->print_error(error,MYF(0));
962 
/* Failure cleanup without reporting: restores the auto-increment state and the column bitmaps. */
963 before_err:
964  table->cursor->restore_auto_increment(prev_insert_id);
965  table->column_bitmaps_set(*save_read_set, *save_write_set);
966  return 1;
967 }
968 
969 
970 /******************************************************************************
971  Check that all fields with arn't null_fields are used
972 ******************************************************************************/
973 
974 int check_that_all_fields_are_given_values(Session *session, Table *entry,
975  TableList *)
976 {
977  int err= 0;
978 
979  for (Field **field=entry->getFields() ; *field ; field++)
980  {
981  if (not (*field)->isWriteSet())
982  {
983  /*
984  * If the field doesn't have any default value
985  * and there is no actual value specified in the
986  * INSERT statement, throw error ER_NO_DEFAULT_FOR_FIELD.
987  */
988  if (((*field)->flags & NO_DEFAULT_VALUE_FLAG) &&
989  ((*field)->real_type() != DRIZZLE_TYPE_ENUM))
990  {
991  my_error(ER_NO_DEFAULT_FOR_FIELD, MYF(0), (*field)->field_name);
992  err= 1;
993  }
994  }
995  else
996  {
997  /*
998  * However, if an actual NULL value was specified
999  * for the field and the field is a NOT NULL field,
1000  * throw ER_BAD_NULL_ERROR.
1001  *
1002  * Per the SQL standard, inserting NULL into a NOT NULL
1003  * field requires an error to be thrown.
1004  */
1005  if (((*field)->flags & NOT_NULL_FLAG) &&
1006  (*field)->is_null())
1007  {
1008  my_error(ER_BAD_NULL_ERROR, MYF(0), (*field)->field_name);
1009  err= 1;
1010  }
1011  }
1012  }
1013  return session->abortOnWarning() ? err : 0;
1014 }
1015 
1016 /***************************************************************************
1017  Store records in INSERT ... SELECT *
1018 ***************************************************************************/
1019 
1020 
1021 /*
1022  make insert specific preparation and checks after opening tables
1023 
1024  SYNOPSIS
1025  insert_select_prepare()
1026  session thread handler
1027 
1028  RETURN
1029  false OK
1030  true Error
1031 */
1032 
1033 bool insert_select_prepare(Session *session)
1034 {
1035  LEX *lex= &session->lex();
1036  Select_Lex *select_lex= &lex->select_lex;
1037 
1038  /*
1039  Select_Lex do not belong to INSERT statement, so we can't add WHERE
1040  clause if table is VIEW
1041  */
1042 
1043  if (prepare_insert(session, lex->query_tables,
1044  lex->query_tables->table, lex->field_list, 0,
1045  lex->update_list, lex->value_list,
1046  lex->duplicates,
1047  &select_lex->where, true, false, false))
1048  return true;
1049 
1050  /*
1051  exclude first table from leaf tables list, because it belong to
1052  INSERT
1053  */
1054  assert(select_lex->leaf_tables != 0);
1055  lex->leaf_tables_insert= select_lex->leaf_tables;
1056  /* skip all leaf tables belonged to view where we are insert */
1057  select_lex->leaf_tables= select_lex->leaf_tables->next_leaf;
1058  return false;
1059 }
1060 
1061 
1062 select_insert::select_insert(TableList *table_list_par, Table *table_par,
1063  List<Item> *fields_par,
1064  List<Item> *update_fields,
1065  List<Item> *update_values,
1066  enum_duplicates duplic,
1067  bool ignore_check_option_errors) :
1068  table_list(table_list_par), table(table_par), fields(fields_par),
1069  autoinc_value_of_last_inserted_row(0),
1070  insert_into_view(table_list_par && 0 != 0)
1071 {
1072  info.handle_duplicates= duplic;
1073  info.ignore= ignore_check_option_errors;
1074  info.update_fields= update_fields;
1075  info.update_values= update_values;
1076 }
1077 
1078 
1079 int
1080 select_insert::prepare(List<Item> &values, Select_Lex_Unit *u)
1081 {
1082  int res;
1083  table_map map= 0;
1084  Select_Lex *lex_current_select_save= session->lex().current_select;
1085 
1086 
1087  unit= u;
1088 
1089  /*
1090  Since table in which we are going to insert is added to the first
1091  select, LEX::current_select should point to the first select while
1092  we are fixing fields from insert list.
1093  */
1094  session->lex().current_select= &session->lex().select_lex;
1095  res= check_insert_fields(session, table_list, *fields, values,
1096  !insert_into_view, &map) ||
1097  setup_fields(session, 0, values, MARK_COLUMNS_READ, 0, 0);
1098 
1099  if (!res && fields->size())
1100  {
1101  bool saved_abort_on_warning= session->abortOnWarning();
1102  session->setAbortOnWarning(not info.ignore);
1103  res= check_that_all_fields_are_given_values(session, table_list->table,
1104  table_list);
1105  session->setAbortOnWarning(saved_abort_on_warning);
1106  }
1107 
1108  if (info.handle_duplicates == DUP_UPDATE && !res)
1109  {
1110  Name_resolution_context *context= &session->lex().select_lex.context;
1111  Name_resolution_context_state ctx_state;
1112 
1113  /* Save the state of the current name resolution context. */
1114  ctx_state.save_state(context, table_list);
1115 
1116  /* Perform name resolution only in the first table - 'table_list'. */
1117  table_list->next_local= 0;
1118  context->resolve_in_table_list_only(table_list);
1119 
1120  res= res || check_update_fields(session, context->table_list,
1121  *info.update_fields, &map);
1122  /*
1123  When we are not using GROUP BY and there are no ungrouped aggregate functions
1124  we can refer to other tables in the ON DUPLICATE KEY part.
1125  We use next_name_resolution_table descructively, so check it first (views?)
1126  */
1127  assert (!table_list->next_name_resolution_table);
1128  if (session->lex().select_lex.group_list.elements == 0 and
1129  not session->lex().select_lex.with_sum_func)
1130  /*
1131  We must make a single context out of the two separate name resolution contexts :
1132  the INSERT table and the tables in the SELECT part of INSERT ... SELECT.
1133  To do that we must concatenate the two lists
1134  */
1135  table_list->next_name_resolution_table=
1136  ctx_state.get_first_name_resolution_table();
1137 
1138  res= res || setup_fields(session, 0, *info.update_values,
1139  MARK_COLUMNS_READ, 0, 0);
1140  if (!res)
1141  {
1142  /*
1143  Traverse the update values list and substitute fields from the
1144  select for references (Item_ref objects) to them. This is done in
1145  order to get correct values from those fields when the select
1146  employs a temporary table.
1147  */
1148  List<Item>::iterator li(info.update_values->begin());
1149  Item *item;
1150 
1151  while ((item= li++))
1152  {
1153  item->transform(&Item::update_value_transformer,
1154  (unsigned char*)session->lex().current_select);
1155  }
1156  }
1157 
1158  /* Restore the current context. */
1159  ctx_state.restore_state(context, table_list);
1160  }
1161 
1162  session->lex().current_select= lex_current_select_save;
1163  if (res)
1164  return 1;
1165  /*
1166  if it is INSERT into join view then check_insert_fields already found
1167  real table for insert
1168  */
1169  table= table_list->table;
1170 
1171  /*
1172  Is table which we are changing used somewhere in other parts of
1173  query
1174  */
1175  if (unique_table(table_list, table_list->next_global))
1176  {
1177  /* Using same table for INSERT and SELECT */
1178  session->lex().current_select->options|= OPTION_BUFFER_RESULT;
1179  session->lex().current_select->join->select_options|= OPTION_BUFFER_RESULT;
1180  }
1181  else if (not (session->lex().current_select->options & OPTION_BUFFER_RESULT))
1182  {
1183  /*
1184  We must not yet prepare the result table if it is the same as one of the
1185  source tables (INSERT SELECT). The preparation may disable
1186  indexes on the result table, which may be used during the select, if it
1187  is the same table (Bug #6034). Do the preparation after the select phase
1188  in select_insert::prepare2().
1189  We won't start bulk inserts at all if this statement uses functions or
1190  should invoke triggers since they may access to the same table too.
1191  */
1192  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1193  }
1194  table->restoreRecordAsDefault(); // Get empty record
1195  table->next_number_field=table->found_next_number_field;
1196 
1197  session->cuted_fields=0;
1198 
1199  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1200  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1201 
1202  if (info.handle_duplicates == DUP_REPLACE)
1203  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1204 
1205  if (info.handle_duplicates == DUP_UPDATE)
1206  table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1207 
1208  session->setAbortOnWarning(not info.ignore);
1209  table->mark_columns_needed_for_insert();
1210 
1211 
1212  return res;
1213 }
1214 
1215 
1216 /*
1217  Finish the preparation of the result table.
1218 
1219  SYNOPSIS
1220  select_insert::prepare2()
1221  void
1222 
1223  DESCRIPTION
1224  If the result table is the same as one of the source tables (INSERT SELECT),
1225  the result table is not finally prepared at the join prepair phase.
1226  Do the final preparation now.
1227 
1228  RETURN
1229  0 OK
1230 */
1231 
1232 int select_insert::prepare2(void)
1233 {
1234  if (session->lex().current_select->options & OPTION_BUFFER_RESULT)
1235  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1236 
1237  return 0;
1238 }
1239 
1240 
/*
  Cleanup hook. It is not expected to be called: the body is a single
  unconditional assertion failure.
*/
1241 void select_insert::cleanup()
1242 {
1243  /* select_insert/select_create are never re-used in prepared statement */
1244  assert(0);
1245 }
1246 
1247 select_insert::~select_insert()
1248 {
1249 
1250  if (table)
1251  {
1252  table->next_number_field=0;
1253  table->auto_increment_field_not_null= false;
1254  table->cursor->ha_reset();
1255  }
1256  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1257  session->setAbortOnWarning(false);
1258  return;
1259 }
1260 
1261 
1262 bool select_insert::send_data(List<Item> &values)
1263 {
1264 
1265  bool error= false;
1266 
1267  if (unit->offset_limit_cnt)
1268  { // using limit offset,count
1269  unit->offset_limit_cnt--;
1270  return false;
1271  }
1272 
1273  session->count_cuted_fields= CHECK_FIELD_WARN; // Calculate cuted fields
1274  store_values(values);
1275  session->count_cuted_fields= CHECK_FIELD_IGNORE;
1276  if (session->is_error())
1277  return true;
1278 
1279  // Release latches in case bulk insert takes a long time
1280  plugin::TransactionalStorageEngine::releaseTemporaryLatches(session);
1281 
1282  error= write_record(session, table, &info);
1283  table->auto_increment_field_not_null= false;
1284 
1285  if (!error)
1286  {
1287  if (info.handle_duplicates == DUP_UPDATE)
1288  {
1289  /*
1290  Restore fields of the record since it is possible that they were
1291  changed by ON DUPLICATE KEY UPDATE clause.
1292 
1293  If triggers exist then whey can modify some fields which were not
1294  originally touched by INSERT ... SELECT, so we have to restore
1295  their original values for the next row.
1296  */
1297  table->restoreRecordAsDefault();
1298  }
1299  if (table->next_number_field)
1300  {
1301  /*
1302  If no value has been autogenerated so far, we need to remember the
1303  value we just saw, we may need to send it to client in the end.
1304  */
1305  if (session->first_successful_insert_id_in_cur_stmt == 0) // optimization
1306  autoinc_value_of_last_inserted_row=
1307  table->next_number_field->val_int();
1308  /*
1309  Clear auto-increment field for the next record, if triggers are used
1310  we will clear it twice, but this should be cheap.
1311  */
1312  table->next_number_field->reset();
1313  }
1314  }
1315  return(error);
1316 }
1317 
1318 
1319 void select_insert::store_values(List<Item> &values)
1320 {
1321  if (fields->size())
1322  fill_record(session, *fields, values, true);
1323  else
1324  fill_record(session, table->getFields(), values, true);
1325 }
1326 
/*
  Report an error for this statement by forwarding the error code and
  message text to my_message() with MYF(0) flags.
*/
1327 void select_insert::send_error(drizzled::error_t errcode,const char *err)
1328 {
1329  my_message(errcode, err, MYF(0));
1330 }
1331 
1332 
1333 bool select_insert::send_eof()
1334 {
1335  int error;
1336  bool const trans_table= table->cursor->has_transactions();
1337  uint64_t id;
1338  bool changed;
1339 
1340  error= table->cursor->ha_end_bulk_insert();
1341  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1342  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1343 
1344  if ((changed= (info.copied || info.deleted || info.updated)))
1345  {
1346  if (session->transaction.stmt.hasModifiedNonTransData())
1347  session->transaction.all.markModifiedNonTransData();
1348  }
1349  assert(trans_table || !changed ||
1350  session->transaction.stmt.hasModifiedNonTransData());
1351 
1352  table->cursor->ha_release_auto_increment();
1353 
1354  if (error)
1355  {
1356  table->print_error(error,MYF(0));
1357  DRIZZLE_INSERT_SELECT_DONE(error, 0);
1358  return 1;
1359  }
1360  char buff[160];
1361  if (info.ignore)
1362  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1363  (ulong) (info.records - info.copied), (ulong) session->cuted_fields);
1364  else
1365  snprintf(buff, sizeof(buff), ER(ER_INSERT_INFO), (ulong) info.records,
1366  (ulong) (info.deleted+info.updated), (ulong) session->cuted_fields);
1367  session->row_count_func= info.copied + info.deleted + info.updated;
1368 
1369  id= (session->first_successful_insert_id_in_cur_stmt > 0) ?
1370  session->first_successful_insert_id_in_cur_stmt :
1371  (session->arg_of_last_insert_id_function ?
1372  session->first_successful_insert_id_in_prev_stmt :
1373  (info.copied ? autoinc_value_of_last_inserted_row : 0));
1374  session->my_ok((ulong) session->rowCount(),
1375  info.copied + info.deleted + info.touched, id, buff);
1376  session->status_var.inserted_row_count+= session->rowCount();
1377  DRIZZLE_INSERT_SELECT_DONE(0, session->rowCount());
1378  return 0;
1379 }
1380 
1381 void select_insert::abort() {
1382 
1383 
1384  /*
1385  If the creation of the table failed (due to a syntax error, for
1386  example), no table will have been opened and therefore 'table'
1387  will be NULL. In that case, we still need to execute the rollback
1388  and the end of the function.
1389  */
1390  if (table)
1391  {
1392  bool changed, transactional_table;
1393 
1394  table->cursor->ha_end_bulk_insert();
1395 
1396  changed= (info.copied || info.deleted || info.updated);
1397  transactional_table= table->cursor->has_transactions();
1398  (void)(transactional_table);
1399  (void)(changed);
1400  assert(transactional_table || !changed ||
1401  session->transaction.stmt.hasModifiedNonTransData());
1402  table->cursor->ha_release_auto_increment();
1403  }
1404 
1405  if (DRIZZLE_INSERT_SELECT_DONE_ENABLED())
1406  {
1407  DRIZZLE_INSERT_SELECT_DONE(0, info.copied + info.deleted + info.updated);
1408  }
1409 
1410  return;
1411 }
1412 
1413 
1414 /***************************************************************************
1415  CREATE TABLE (SELECT) ...
1416 ***************************************************************************/
1417 
1418 /*
1419  Create table from lists of fields and items (or just return Table
1420  object for pre-opened existing table).
1421 
1422  SYNOPSIS
1423  create_table_from_items()
1424  session in Thread object
1425  create_info in Create information (like MAX_ROWS, ENGINE or
1426  temporary table flag)
1427  create_table in Pointer to TableList object providing database
1428  and name for table to be created or to be open
1429  alter_info in/out Initial list of columns and indexes for the table
1430  to be created
1431  items in List of items which should be used to produce rest
1432  of fields for the table (corresponding fields will
1433  be added to the end of alter_info->create_list)
1434  lock out Pointer to the DrizzleLock object for table created
1435  (or open temporary table) will be returned in this
1436  parameter. Since this table is not included in
1437  Session::lock caller is responsible for explicitly
1438  unlocking this table.
1439  hooks
1440 
1441  NOTES
1442  This function behaves differently for base and temporary tables:
1443  - For base table we assume that either table exists and was pre-opened
1444  and locked at openTablesLock() stage (and in this case we just
1445  emit error or warning and return pre-opened Table object) or special
1446  placeholder was put in table cache that guarantees that this table
1447  won't be created or opened until the placeholder will be removed
1448  (so there is an exclusive lock on this table).
1449  - We don't pre-open existing temporary table, instead we either open
1450  or create and then open table in this function.
1451 
1452  Since this function contains some logic specific to CREATE TABLE ...
1453  SELECT it should be changed before it can be used in other contexts.
1454 
1455  RETURN VALUES
1456  non-zero Pointer to Table object for table created or opened
1457  0 Error
1458 */
1459 
1460 static Table *create_table_from_items(Session *session, HA_CREATE_INFO *create_info,
1461  TableList *create_table,
1462  message::Table &table_proto,
1463  AlterInfo *alter_info,
1464  List<Item> *items,
1465  bool is_if_not_exists,
1466  DrizzleLock **lock,
1467  const identifier::Table& identifier)
1468 {
1469  TableShare share(message::Table::INTERNAL);
1470  uint32_t select_field_count= items->size();
1471  /* Add selected items to field list */
1472  List<Item>::iterator it(items->begin());
1473  Item *item;
1474  Field *tmp_field;
1475 
1476  if (not (identifier.isTmp()) && create_table->table->db_stat)
1477  {
1478  /* Table already exists and was open at openTablesLock() stage. */
1479  if (is_if_not_exists)
1480  {
1481  create_info->table_existed= 1; // Mark that table existed
1482  push_warning_printf(session, DRIZZLE_ERROR::WARN_LEVEL_NOTE,
1483  ER_TABLE_EXISTS_ERROR, ER(ER_TABLE_EXISTS_ERROR),
1484  create_table->getTableName());
1485  return create_table->table;
1486  }
1487 
1488  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1489  return NULL;
1490  }
1491 
1492  {
1493  table::Shell tmp_table(share); // Used during 'CreateField()'
1494 
1495  if (not table_proto.engine().name().compare("MyISAM"))
1496  tmp_table.getMutableShare()->db_low_byte_first= true;
1497  else if (not table_proto.engine().name().compare("MEMORY"))
1498  tmp_table.getMutableShare()->db_low_byte_first= true;
1499 
1500  tmp_table.in_use= session;
1501 
1502  while ((item=it++))
1503  {
1504  CreateField *cr_field;
1505  Field *field, *def_field;
1506  if (item->type() == Item::FUNC_ITEM)
1507  {
1508  if (item->result_type() != STRING_RESULT)
1509  {
1510  field= item->tmp_table_field(&tmp_table);
1511  }
1512  else
1513  {
1514  field= item->tmp_table_field_from_field_type(&tmp_table, 0);
1515  }
1516  }
1517  else
1518  {
1519  field= create_tmp_field(session, &tmp_table, item, item->type(),
1520  (Item ***) 0, &tmp_field, &def_field, false,
1521  false, false, 0);
1522  }
1523 
1524  if (!field ||
1525  !(cr_field=new CreateField(field,(item->type() == Item::FIELD_ITEM ?
1526  ((Item_field *)item)->field :
1527  (Field*) 0))))
1528  {
1529  return NULL;
1530  }
1531 
1532  if (item->maybe_null)
1533  {
1534  cr_field->flags &= ~NOT_NULL_FLAG;
1535  }
1536 
1537  alter_info->create_list.push_back(cr_field);
1538  }
1539  }
1540 
1541  /*
1542  Create and lock table.
1543 
1544  Note that we either creating (or opening existing) temporary table or
1545  creating base table on which name we have exclusive lock. So code below
1546  should not cause deadlocks or races.
1547  */
1548  Table *table= 0;
1549  {
1550  if (not create_table_no_lock(session,
1551  identifier,
1552  create_info,
1553  table_proto,
1554  alter_info,
1555  false,
1556  select_field_count,
1557  is_if_not_exists))
1558  {
1559  if (create_info->table_existed && not identifier.isTmp())
1560  {
1561  /*
1562  This means that someone created table underneath server
1563  or it was created via different mysqld front-end to the
1564  cluster. We don't have much options but throw an error.
1565  */
1566  my_error(ER_TABLE_EXISTS_ERROR, MYF(0), create_table->getTableName());
1567  return NULL;
1568  }
1569 
1570  if (not identifier.isTmp())
1571  {
1572  /* CREATE TABLE... has found that the table already exists for insert and is adapting to use it */
1573  boost::mutex::scoped_lock scopedLock(table::Cache::mutex());
1574 
1575  if (create_table->table)
1576  {
1577  table::Concurrent *concurrent_table= static_cast<table::Concurrent *>(create_table->table);
1578 
1579  if (concurrent_table->reopen_name_locked_table(create_table, session))
1580  {
1581  (void)plugin::StorageEngine::dropTable(*session, identifier);
1582  }
1583  else
1584  {
1585  table= create_table->table;
1586  }
1587  }
1588  else
1589  {
1590  (void)plugin::StorageEngine::dropTable(*session, identifier);
1591  }
1592  }
1593  else
1594  {
1595  if (not (table= session->openTable(create_table, (bool*) 0,
1596  DRIZZLE_OPEN_TEMPORARY_ONLY)) &&
1597  not create_info->table_existed)
1598  {
1599  /*
1600  This shouldn't happen as creation of temporary table should make
1601  it preparable for open. But let us do close_temporary_table() here
1602  just in case.
1603  */
1604  session->open_tables.drop_temporary_table(identifier);
1605  }
1606  }
1607  }
1608  if (not table) // open failed
1609  return NULL;
1610  }
1611 
1612  table->reginfo.lock_type=TL_WRITE;
1613  if (not ((*lock)= session->lockTables(&table, 1, DRIZZLE_LOCK_IGNORE_FLUSH)))
1614  {
1615  if (*lock)
1616  {
1617  session->unlockTables(*lock);
1618  *lock= 0;
1619  }
1620 
1621  if (not create_info->table_existed)
1622  session->drop_open_table(table, identifier);
1623 
1624  return NULL;
1625  }
1626 
1627  return table;
1628 }
1629 
1630 
1631 int
1632 select_create::prepare(List<Item> &values, Select_Lex_Unit *u)
1633 {
1634  DrizzleLock *extra_lock= NULL;
1635  /*
1636  For replication, the CREATE-SELECT statement is written
1637  in two pieces: the first transaction messsage contains
1638  the CREATE TABLE statement as a CreateTableStatement message
1639  necessary to create the table.
1640 
1641  The second transaction message contains all the InsertStatement
1642  and associated InsertRecords that should go into the table.
1643  */
1644 
1645  unit= u;
1646 
1647  if (not (table= create_table_from_items(session, create_info, create_table,
1648  table_proto,
1649  alter_info, &values,
1650  is_if_not_exists,
1651  &extra_lock, identifier)))
1652  {
1653  return(-1); // abort() deletes table
1654  }
1655 
1656  if (extra_lock)
1657  {
1658  assert(m_plock == NULL);
1659 
1660  if (identifier.isTmp())
1661  m_plock= &m_lock;
1662  else
1663  m_plock= &session->open_tables.extra_lock;
1664 
1665  *m_plock= extra_lock;
1666  }
1667 
1668  if (table->getShare()->sizeFields() < values.size())
1669  {
1670  my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), 1);
1671  return(-1);
1672  }
1673 
1674  /* First field to copy */
1675  field= table->getFields() + table->getShare()->sizeFields() - values.size();
1676 
1677  /* Mark all fields that are given values */
1678  for (Field **f= field ; *f ; f++)
1679  {
1680  table->setWriteSet((*f)->position());
1681  }
1682 
1683  /* Don't set timestamp if used */
1684  table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
1685  table->next_number_field=table->found_next_number_field;
1686 
1687  table->restoreRecordAsDefault(); // Get empty record
1688  session->cuted_fields=0;
1689  if (info.ignore || info.handle_duplicates != DUP_ERROR)
1690  table->cursor->extra(HA_EXTRA_IGNORE_DUP_KEY);
1691 
1692  if (info.handle_duplicates == DUP_REPLACE)
1693  table->cursor->extra(HA_EXTRA_WRITE_CAN_REPLACE);
1694 
1695  if (info.handle_duplicates == DUP_UPDATE)
1696  table->cursor->extra(HA_EXTRA_INSERT_WITH_UPDATE);
1697 
1698  table->cursor->ha_start_bulk_insert((ha_rows) 0);
1699  session->setAbortOnWarning(not info.ignore);
1700  if (check_that_all_fields_are_given_values(session, table, table_list))
1701  return 1;
1702 
1703  table->mark_columns_needed_for_insert();
1704  table->cursor->extra(HA_EXTRA_WRITE_CACHE);
1705  return 0;
1706 }
1707 
/*
  Copy the selected values into the record of the created table via
  fill_record(), starting at 'field' (the first column to copy, as set in
  select_create::prepare()).
*/
1708 void select_create::store_values(List<Item> &values)
1709 {
1710  fill_record(session, field, values, true);
1711 }
1712 
1713 
/*
  Report an error for CREATE TABLE ... SELECT by delegating to
  select_insert::send_error() with the same arguments.
*/
1714 void select_create::send_error(drizzled::error_t errcode,const char *err)
1715 {
1716  /*
1717  This will execute any rollbacks that are necessary before writing
1718  the transaction cache.
1719 
1720  We disable the binary log since nothing should be written to the
1721  binary log. This disabling is important, since we potentially do
1722  a "roll back" of non-transactional tables by removing the table,
1723  and the actual rollback might generate events that should not be
1724  written to the binary log.
1725 
   NOTE(review): no rollback or binary-log handling is visible in this
   function body itself; the text above may be stale - confirm.
1726  */
1727  select_insert::send_error(errcode, err);
1728 }
1729 
1730 
1731 bool select_create::send_eof()
1732 {
1733  bool tmp=select_insert::send_eof();
1734  if (tmp)
1735  abort();
1736  else
1737  {
1738  /*
1739  Do an implicit commit at end of statement for non-temporary
1740  tables. This can fail, but we should unlock the table
1741  nevertheless.
1742  */
1743  if (!table->getShare()->getType())
1744  {
1745  TransactionServices::autocommitOrRollback(*session, 0);
1746  (void) session->endActiveTransaction();
1747  }
1748 
1749  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1750  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1751  if (m_plock)
1752  {
1753  session->unlockTables(*m_plock);
1754  *m_plock= NULL;
1755  m_plock= NULL;
1756  }
1757  }
1758  return tmp;
1759 }
1760 
1761 
1762 void select_create::abort()
1763 {
1764  /*
1765  In select_insert::abort() we roll back the statement, including
1766  truncating the transaction cache of the binary log. To do this, we
1767  pretend that the statement is transactional, even though it might
1768  be the case that it was not.
1769 
1770  We roll back the statement prior to deleting the table and prior
1771  to releasing the lock on the table, since there might be potential
1772  for failure if the rollback is executed after the drop or after
1773  unlocking the table.
1774 
1775  We also roll back the statement regardless of whether the creation
1776  of the table succeeded or not, since we need to reset the binary
1777  log state.
1778  */
1779  select_insert::abort();
1780 
1781  if (m_plock)
1782  {
1783  session->unlockTables(*m_plock);
1784  *m_plock= NULL;
1785  m_plock= NULL;
1786  }
1787 
1788  if (table)
1789  {
1790  table->cursor->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
1791  table->cursor->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
1792  if (not create_info->table_existed)
1793  session->drop_open_table(table, identifier);
1794  table= NULL; // Safety
1795  }
1796 }
1797 
1798 } /* namespace drizzled */
void my_ok(ha_rows affected_rows=0, ha_rows found_rows_arg=0, uint64_t passed_id=0, const char *message=NULL)
Definition: session.cc:1877
uint32_t row_count
Definition: session.h:447
void ha_release_auto_increment()
Definition: cursor.cc:576
void set_proc_info(const char *info)
Definition: session.h:609
ha_rows cuted_fields
Definition: session.h:408
TODO: Rename this file - func.h is stupid.
Table * table
opened table
Definition: table_list.h:145
static void upgrade_lock_type(Session *, thr_lock_type *lock_type, enum_duplicates duplic, bool)
Definition: sql_insert.cc:216
Field * found_next_number_field
Definition: table.h:143
uint64_t prev_insert_id(uint64_t nr, drizzle_system_variables *variables)
Definition: cursor.cc:341
uint64_t first_successful_insert_id_in_prev_stmt
Definition: session.h:379
uint64_t first_successful_insert_id_in_cur_stmt
Definition: session.h:385
Field * create_tmp_field(Session *session, Table *table, Item *item, Item::Type type, Item ***copy_func, Field **from_field, Field **default_field, bool group, bool modify_item, bool make_copy_field, uint32_t convert_blob_length)
Definition: item.cc:1677
TableList * next_name_resolution_table
Definition: table_list.h:197
TableList * next_local
Definition: table_list.h:107
bool insert_query(Session *session, TableList *table, List< Item > &fields, List< List_item > &values, List< Item > &update_fields, List< Item > &update_values, enum_duplicates flag, bool ignore)
Definition: sql_insert.cc:238
table_map used_tables
Definition: session.h:430
Field * next_number_field
Definition: table.h:142
void free_underlaid_joins(Session *session, Select_Lex *select)
Definition: sql_select.cc:6655
uint64_t options
Definition: session.h:400
Cursor * cursor
Definition: table.h:68
int64_t row_count_func
Definition: session.h:401
bool openTablesLock(TableList *)
Definition: session.cc:1767
bool is_error() const
Definition: session.cc:1871