Drizzled Public API Documentation

transaction_writer.cc
1 /* -*- mode: c++; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2009 Sun Microsystems, Inc.
5  *
6  * Authors:
7  *
8  * Jay Pipes <joinfu@sun.com>
9  *
10  * This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; version 2 of the License.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22  */
23 
24 #include <config.h>
25 #include <drizzled/algorithm/crc32.h>
26 #include <drizzled/gettext.h>
27 #include <drizzled/replication_services.h>
28 
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <string>
33 #include <fstream>
34 #include <unistd.h>
35 
36 #if TIME_WITH_SYS_TIME
37 # include <sys/time.h>
38 # include <time.h>
39 #else
40 # if HAVE_SYS_TIME_H
41 # include <sys/time.h>
42 # else
43 # include <time.h>
44 # endif
45 #endif
46 
47 #include <drizzled/message/transaction.pb.h>
48 
49 #include <google/protobuf/io/coded_stream.h>
50 #include <google/protobuf/io/zero_copy_stream_impl.h>
51 
52 #include <drizzled/gettext.h>
53 
58 using namespace std;
59 using namespace drizzled;
60 using namespace google;
61 
62 static uint32_t server_id= 1;
63 static uint64_t transaction_id= 1;
64 
65 static uint64_t getNanoTimestamp()
66 {
67 #ifdef HAVE_CLOCK_GETTIME
68  struct timespec tp;
69  clock_gettime(CLOCK_REALTIME, &tp);
70  return (uint64_t) tp.tv_sec * 10000000
71  + (uint64_t) tp.tv_nsec;
72 #else
73  struct timeval tv;
74  gettimeofday(&tv,NULL);
75  return (uint64_t) tv.tv_sec * 10000000
76  + (uint64_t) tv.tv_usec * 1000;
77 #endif
78 }
79 
80 static void initTransactionContext(message::Transaction &transaction)
81 {
82  message::TransactionContext *ctx= transaction.mutable_transaction_context();
83  ctx->set_transaction_id(transaction_id++);
84  ctx->set_start_timestamp(getNanoTimestamp());
85  ctx->set_server_id(server_id);
86 }
87 
88 static void finalizeTransactionContext(message::Transaction &transaction)
89 {
90  message::TransactionContext *ctx= transaction.mutable_transaction_context();
91  ctx->set_end_timestamp(getNanoTimestamp());
92 }
93 
94 static void doCreateTable1(message::Transaction &transaction)
95 {
96  message::Statement *statement= transaction.add_statement();
97 
98  statement->set_type(message::Statement::RAW_SQL);
99  statement->set_sql("CREATE TABLE t1 (a VARCHAR(32) NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
100  statement->set_start_timestamp(getNanoTimestamp());
101  statement->set_end_timestamp(getNanoTimestamp());
102 }
103 
104 static void doCreateTable2(message::Transaction &transaction)
105 {
106  message::Statement *statement= transaction.add_statement();
107 
108  statement->set_type(message::Statement::RAW_SQL);
109  statement->set_sql("CREATE TABLE t2 (a INTEGER NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
110  statement->set_start_timestamp(getNanoTimestamp());
111  statement->set_end_timestamp(getNanoTimestamp());
112 }
113 
114 static void doCreateTable3(message::Transaction &transaction)
115 {
116  message::Statement *statement= transaction.add_statement();
117 
118  statement->set_type(message::Statement::RAW_SQL);
119  statement->set_sql("CREATE TABLE t3 (a INTEGER NOT NULL, b BLOB NOT NULL, PRIMARY KEY a) ENGINE=InnoDB");
120  statement->set_start_timestamp(getNanoTimestamp());
121  statement->set_end_timestamp(getNanoTimestamp());
122 }
123 
124 static void doSimpleInsert(message::Transaction &transaction)
125 {
126  message::Statement *statement= transaction.add_statement();
127 
128  /* Do generic Statement setup */
129  statement->set_type(message::Statement::INSERT);
130  statement->set_sql("INSERT INTO t1 (a) VALUES (\"1\"), (\"2\")");
131  statement->set_start_timestamp(getNanoTimestamp());
132 
133  /* Do INSERT-specific header and setup */
134  message::InsertHeader *header= statement->mutable_insert_header();
135 
136  /* Add table and field metadata for the statement */
137  message::TableMetadata *t_meta= header->mutable_table_metadata();
138  t_meta->set_schema_name("test");
139  t_meta->set_table_name("t1");
140 
141  message::FieldMetadata *f_meta= header->add_field_metadata();
142  f_meta->set_name("a");
143  f_meta->set_type(message::Table::Field::VARCHAR);
144 
145  /* Add new values... */
146  message::InsertData *data= statement->mutable_insert_data();
147  data->set_segment_id(1);
148  data->set_end_segment(true);
149 
150  message::InsertRecord *record1= data->add_record();
151  message::InsertRecord *record2= data->add_record();
152 
153  record1->add_insert_value("1");
154  record2->add_insert_value("2");
155 
156  statement->set_end_timestamp(getNanoTimestamp());
157 }
158 
159 static void doNonVarcharInsert(message::Transaction &transaction)
160 {
161  message::Statement *statement= transaction.add_statement();
162 
163  /* Do generic Statement setup */
164  statement->set_type(message::Statement::INSERT);
165  statement->set_sql("INSERT INTO t2 (a) VALUES (1), (2)");
166  statement->set_start_timestamp(getNanoTimestamp());
167 
168  /* Do INSERT-specific header and setup */
169  message::InsertHeader *header= statement->mutable_insert_header();
170 
171  /* Add table and field metadata for the statement */
172  message::TableMetadata *t_meta= header->mutable_table_metadata();
173  t_meta->set_schema_name("test");
174  t_meta->set_table_name("t2");
175 
176  message::FieldMetadata *f_meta= header->add_field_metadata();
177  f_meta->set_name("a");
178  f_meta->set_type(message::Table::Field::INTEGER);
179 
180  /* Add new values... */
181  message::InsertData *data= statement->mutable_insert_data();
182  data->set_segment_id(1);
183  data->set_end_segment(true);
184 
185  message::InsertRecord *record1= data->add_record();
186  message::InsertRecord *record2= data->add_record();
187 
188  record1->add_insert_value("1");
189  record2->add_insert_value("2");
190 
191  statement->set_end_timestamp(getNanoTimestamp());
192 }
193 
194 static void doBlobInsert(message::Transaction &transaction)
195 {
196  message::Statement *statement= transaction.add_statement();
197 
198  /* Do generic Statement setup */
199  statement->set_type(message::Statement::INSERT);
200  statement->set_sql("INSERT INTO t3 (a, b) VALUES (1, 'test\0me')", 43); /* 43 == length including \0 */
201  statement->set_start_timestamp(getNanoTimestamp());
202 
203  /* Do INSERT-specific header and setup */
204  message::InsertHeader *header= statement->mutable_insert_header();
205 
206  /* Add table and field metadata for the statement */
207  message::TableMetadata *t_meta= header->mutable_table_metadata();
208  t_meta->set_schema_name("test");
209  t_meta->set_table_name("t3");
210 
211  message::FieldMetadata *f_meta= header->add_field_metadata();
212  f_meta->set_name("a");
213  f_meta->set_type(message::Table::Field::INTEGER);
214 
215  f_meta= header->add_field_metadata();
216  f_meta->set_name("b");
217  f_meta->set_type(message::Table::Field::BLOB);
218 
219  /* Add new values... */
220  message::InsertData *data= statement->mutable_insert_data();
221  data->set_segment_id(1);
222  data->set_end_segment(true);
223 
224  message::InsertRecord *record1= data->add_record();
225 
226  record1->add_insert_value("1");
227  record1->add_insert_value("test\0me", 7); /* 7 == length including \0 */
228 
229  statement->set_end_timestamp(getNanoTimestamp());
230 }
231 
232 static void doSimpleDelete(message::Transaction &transaction)
233 {
234  message::Statement *statement= transaction.add_statement();
235 
236  /* Do generic Statement setup */
237  statement->set_type(message::Statement::DELETE);
238  statement->set_sql("DELETE FROM t1 WHERE a = \"1\"");
239  statement->set_start_timestamp(getNanoTimestamp());
240 
241  /* Do DELETE-specific header and setup */
242  message::DeleteHeader *header= statement->mutable_delete_header();
243 
244  /* Add table and field metadata for the statement */
245  message::TableMetadata *t_meta= header->mutable_table_metadata();
246  t_meta->set_schema_name("test");
247  t_meta->set_table_name("t1");
248 
249  message::FieldMetadata *f_meta= header->add_key_field_metadata();
250  f_meta->set_name("a");
251  f_meta->set_type(message::Table::Field::VARCHAR);
252 
253  /* Add new values... */
254  message::DeleteData *data= statement->mutable_delete_data();
255  data->set_segment_id(1);
256  data->set_end_segment(true);
257 
258  message::DeleteRecord *record1= data->add_record();
259 
260  record1->add_key_value("1");
261 
262  statement->set_end_timestamp(getNanoTimestamp());
263 }
264 
265 static void doSimpleUpdate(message::Transaction &transaction)
266 {
267  message::Statement *statement= transaction.add_statement();
268 
269  /* Do generic Statement setup */
270  statement->set_type(message::Statement::UPDATE);
271  statement->set_sql("UPDATE t1 SET a = \"5\" WHERE a = \"1\"");
272  statement->set_start_timestamp(getNanoTimestamp());
273 
274  /* Do UPDATE-specific header and setup */
275  message::UpdateHeader *header= statement->mutable_update_header();
276 
277  /* Add table and field metadata for the statement */
278  message::TableMetadata *t_meta= header->mutable_table_metadata();
279  t_meta->set_schema_name("test");
280  t_meta->set_table_name("t1");
281 
282  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
283  kf_meta->set_name("a");
284  kf_meta->set_type(message::Table::Field::VARCHAR);
285 
286  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
287  sf_meta->set_name("a");
288  sf_meta->set_type(message::Table::Field::VARCHAR);
289 
290  /* Add new values... */
291  message::UpdateData *data= statement->mutable_update_data();
292  data->set_segment_id(1);
293  data->set_end_segment(true);
294 
295  message::UpdateRecord *record1= data->add_record();
296 
297  record1->add_after_value("5");
298  record1->add_key_value("1");
299 
300  statement->set_end_timestamp(getNanoTimestamp());
301 }
302 
303 static void doMultiKeyUpdate(message::Transaction &transaction)
304 {
305  message::Statement *statement= transaction.add_statement();
306 
307  /* Do generic Statement setup */
308  statement->set_type(message::Statement::UPDATE);
309  statement->set_sql("UPDATE t1 SET a = \"5\"");
310  statement->set_start_timestamp(getNanoTimestamp());
311 
312  /* Do UPDATE-specific header and setup */
313  message::UpdateHeader *header= statement->mutable_update_header();
314 
315  /* Add table and field metadata for the statement */
316  message::TableMetadata *t_meta= header->mutable_table_metadata();
317  t_meta->set_schema_name("test");
318  t_meta->set_table_name("t1");
319 
320  message::FieldMetadata *kf_meta= header->add_key_field_metadata();
321  kf_meta->set_name("a");
322  kf_meta->set_type(message::Table::Field::VARCHAR);
323 
324  message::FieldMetadata *sf_meta= header->add_set_field_metadata();
325  sf_meta->set_name("a");
326  sf_meta->set_type(message::Table::Field::VARCHAR);
327 
328  /* Add new values... */
329  message::UpdateData *data= statement->mutable_update_data();
330  data->set_segment_id(1);
331  data->set_end_segment(true);
332 
333  message::UpdateRecord *record1= data->add_record();
334  message::UpdateRecord *record2= data->add_record();
335 
336  record1->add_after_value("5");
337  record1->add_key_value("1");
338  record2->add_after_value("5");
339  record2->add_key_value("2");
340 
341  statement->set_end_timestamp(getNanoTimestamp());
342 }
343 
344 static void writeTransaction(protobuf::io::CodedOutputStream *output, message::Transaction &transaction)
345 {
346  std::string buffer("");
347  finalizeTransactionContext(transaction);
348  transaction.SerializeToString(&buffer);
349 
350  size_t length= buffer.length();
351 
352  output->WriteLittleEndian32(static_cast<uint32_t>(ReplicationServices::TRANSACTION));
353  output->WriteLittleEndian32(static_cast<uint32_t>(length));
354  output->WriteString(buffer);
355  output->WriteLittleEndian32(drizzled::algorithm::crc32(buffer.c_str(), length)); /* checksum */
356 }
357 
358 int main(int argc, char* argv[])
359 {
360  GOOGLE_PROTOBUF_VERIFY_VERSION;
361  int file;
362 
363  if (argc != 2)
364  {
365  fprintf(stderr, _("Usage: %s TRANSACTION_LOG\n"), argv[0]);
366  return -1;
367  }
368 
369  if ((file= open(argv[1], O_APPEND|O_CREAT|O_SYNC|O_WRONLY, S_IRWXU)) == -1)
370  {
371  fprintf(stderr, _("Cannot open file: %s\n"), argv[1]);
372  return -1;
373  }
374 
375  protobuf::io::ZeroCopyOutputStream *raw_output= new protobuf::io::FileOutputStream(file);
376  protobuf::io::CodedOutputStream *coded_output= new protobuf::io::CodedOutputStream(raw_output);
377 
378  /* Write a series of statements which test each type of Statement */
379  message::Transaction transaction;
380 
381  /* Simple CREATE TABLE statements as raw sql */
382  initTransactionContext(transaction);
383  doCreateTable1(transaction);
384  writeTransaction(coded_output, transaction);
385  transaction.Clear();
386 
387  initTransactionContext(transaction);
388  doCreateTable2(transaction);
389  writeTransaction(coded_output, transaction);
390  transaction.Clear();
391 
392  /* Simple INSERT statement */
393  initTransactionContext(transaction);
394  doSimpleInsert(transaction);
395  writeTransaction(coded_output, transaction);
396  transaction.Clear();
397 
398  /* Write a DELETE and an UPDATE in one transaction */
399  initTransactionContext(transaction);
400  doSimpleDelete(transaction);
401  doSimpleUpdate(transaction);
402  writeTransaction(coded_output, transaction);
403  transaction.Clear();
404 
405  /* Test an INSERT into non-varchar columns */
406  initTransactionContext(transaction);
407  doNonVarcharInsert(transaction);
408  writeTransaction(coded_output, transaction);
409  transaction.Clear();
410 
411  /* Write an UPDATE which affects >1 row */
412  initTransactionContext(transaction);
413  doMultiKeyUpdate(transaction);
414  writeTransaction(coded_output, transaction);
415  transaction.Clear();
416 
417  /* Write an INSERT which writes BLOB data */
418  initTransactionContext(transaction);
419  doCreateTable3(transaction);
420  doBlobInsert(transaction);
421  writeTransaction(coded_output, transaction);
422  transaction.Clear();
423 
424  delete coded_output;
425  delete raw_output;
426 
427  return 0;
428 }
TODO: Rename this file - func.h is stupid.