23 #include <plugin/slave/queue_producer.h>
24 #include <drizzled/errmsg_print.h>
25 #include <drizzled/sql/result_set.h>
26 #include <drizzled/execute.h>
27 #include <drizzled/gettext.h>
28 #include <drizzled/message/transaction.pb.h>
29 #include <boost/lexical_cast.hpp>
30 #include <google/protobuf/text_format.h>
40 QueueProducer::~QueueProducer()
46 bool QueueProducer::init()
49 return reconnect(
true);
52 bool QueueProducer::process()
54 if (_saved_max_commit_id == 0)
56 if (not queryForMaxCommitId(&_saved_max_commit_id))
58 if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
66 _last_error_message=
"Master offline";
78 enum drizzled::error_t err;
79 while ((err= (queryForReplicationEvents(_saved_max_commit_id))) == EE_OK)
84 if (_last_return == DRIZZLE_RETURN_LOST_CONNECTION)
92 _last_error_message=
"Master offline";
105 void QueueProducer::shutdown()
107 setIOState(_last_error_message,
false);
112 bool QueueProducer::reconnect(
bool initial_connection)
114 if (not initial_connection)
116 errmsg_printf(error::ERROR, _(
"Lost connection to master. Reconnecting."));
119 _is_connected=
false;
120 _last_return= DRIZZLE_RETURN_OK;
121 _last_error_message.clear();
122 boost::posix_time::seconds duration(_seconds_between_reconnects);
124 uint32_t attempts= 1;
126 setIOState(
"Connecting...",
true);
128 while (not openConnection())
131 snprintf(buf,
sizeof(buf),_(
"Connection attempt %d of %d failed, sleeping for %d seconds and retrying. %s"), attempts, _max_reconnects, _seconds_between_reconnects, _last_error_message.c_str());
133 setIOState(buf,
true);
134 if (attempts++ == _max_reconnects)
136 boost::this_thread::sleep(duration);
139 setIOState(_is_connected ? _(
"Connected") : _(
"Disconnected"),
true);
141 return _is_connected;
144 bool QueueProducer::openConnection()
146 if ((_drizzle= drizzle_create()) == NULL)
148 _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
149 _last_error_message=
"Replication slave: ";
150 _last_error_message.append(drizzle_error(_drizzle));
151 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
155 if ((_connection= drizzle_con_create(_drizzle)) == NULL)
157 _last_return= DRIZZLE_RETURN_INTERNAL_ERROR;
158 _last_error_message=
"Replication slave: ";
159 _last_error_message.append(drizzle_error(_drizzle));
160 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
164 drizzle_con_set_tcp(_connection, _master_host.c_str(), _master_port);
165 drizzle_con_set_auth(_connection, _master_user.c_str(), _master_pass.c_str());
167 drizzle_return_t ret= drizzle_con_connect(_connection);
169 if (ret != DRIZZLE_RETURN_OK)
172 _last_error_message=
"Replication slave: ";
173 _last_error_message.append(drizzle_error(_drizzle));
174 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
183 bool QueueProducer::closeConnection()
185 drizzle_return_t ret;
186 drizzle_result_st result;
188 _is_connected=
false;
190 if (drizzle_quit(_connection, &result, &ret) == NULL)
193 drizzle_result_free(&result);
197 drizzle_result_free(&result);
202 bool QueueProducer::queryForMaxCommitId(uint64_t *max_commit_id)
211 string sql(
"SELECT MAX(x.cid) FROM"
212 " (SELECT MAX(`commit_order`) AS cid FROM `sys_replication`.`queue`"
213 " WHERE `master_id` = "
214 + boost::lexical_cast<string>(masterId())
215 +
" UNION ALL SELECT `last_applied_commit_id` AS cid"
216 +
" FROM `sys_replication`.`applier_state` WHERE `master_id` = "
217 + boost::lexical_cast<string>(masterId())
221 Execute execute(*(_session.get()),
true);
222 execute.run(sql, result_set);
223 assert(result_set.getMetaData().getColumnCount() == 1);
226 uint32_t found_rows= 0;
227 while (result_set.next())
229 string value= result_set.getString(0);
231 if ((value ==
"") || (found_rows == 1))
234 assert(result_set.isNull(0) ==
false);
235 *max_commit_id= boost::lexical_cast<uint64_t>(value);
241 _last_error_message=
"Could not determine last committed transaction.";
248 bool QueueProducer::queryForTrxIdList(uint64_t max_commit_id,
249 vector<uint64_t> &list)
252 string sql(
"SELECT `id` FROM `data_dictionary`.`sys_replication_log`"
253 " WHERE `commit_id` > ");
254 sql.append(boost::lexical_cast<string>(max_commit_id));
255 sql.append(
" AND `originating_server_uuid` != ");
257 sql.append(_session.get()->getServerUUID());
259 sql.append(
" ORDER BY `commit_id` LIMIT 25");
261 drizzle_return_t ret;
262 drizzle_result_st result;
263 drizzle_query_str(_connection, &result, sql.c_str(), &ret);
265 if (ret != DRIZZLE_RETURN_OK)
268 _last_error_message=
"Replication slave: ";
269 _last_error_message.append(drizzle_error(_drizzle));
270 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
271 drizzle_result_free(&result);
275 ret= drizzle_result_buffer(&result);
277 if (ret != DRIZZLE_RETURN_OK)
280 _last_error_message=
"Replication slave: ";
281 _last_error_message.append(drizzle_error(_drizzle));
282 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
283 drizzle_result_free(&result);
289 while ((row= drizzle_row_next(&result)) != NULL)
293 list.push_back(boost::lexical_cast<uint32_t>(row[0]));
298 _last_error_message=
"Replication slave: Unexpected NULL for trx id";
299 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
300 drizzle_result_free(&result);
305 drizzle_result_free(&result);
310 bool QueueProducer::queueInsert(
const char *trx_id,
312 const char *commit_id,
313 const char *originating_server_uuid,
314 const char *originating_commit_id,
316 const char *msg_length)
320 message.ParseFromArray(msg, boost::lexical_cast<int>(msg_length));
325 string sql=
"INSERT INTO `sys_replication`.`queue`"
326 " (`master_id`, `trx_id`, `seg_id`, `commit_order`,"
327 " `originating_server_uuid`, `originating_commit_id`, `msg`) VALUES (";
328 sql.append(boost::lexical_cast<string>(masterId()));
334 sql.append(commit_id);
335 sql.append(
", '", 3);
336 sql.append(originating_server_uuid);
337 sql.append(
"' , ", 4);
338 sql.append(originating_commit_id);
339 sql.append(
", '", 3);
348 google::protobuf::TextFormat::PrintToString(message, &message_text);
353 string::iterator it= message_text.begin();
354 for (; it != message_text.end(); ++it)
358 it= message_text.insert(it,
'\\');
361 else if (*it ==
'\'')
363 it= message_text.insert(it,
'\\');
365 it= message_text.insert(it,
'\\');
368 else if (*it ==
'\\')
370 it= message_text.insert(it,
'\\');
372 it= message_text.insert(it,
'\\');
374 it= message_text.insert(it,
'\\');
379 it= message_text.insert(it,
'\\');
384 sql.append(message_text);
387 vector<string> statements;
388 statements.push_back(sql);
390 if (not executeSQL(statements))
396 uint64_t tmp_commit_id= boost::lexical_cast<uint64_t>(commit_id);
397 if (tmp_commit_id > _saved_max_commit_id)
398 _saved_max_commit_id= tmp_commit_id;
404 enum drizzled::error_t QueueProducer::queryForReplicationEvents(uint64_t max_commit_id)
406 vector<uint64_t> trx_id_list;
408 if (not queryForTrxIdList(max_commit_id, trx_id_list))
411 if (trx_id_list.size() == 0)
419 string sql=
"SELECT `id`, `segid`, `commit_id`, `originating_server_uuid`,"
420 " `originating_commit_id`, `message`, `message_len` "
421 " FROM `data_dictionary`.`sys_replication_log` WHERE `id` IN (";
423 for (
size_t x= 0; x < trx_id_list.size(); x++)
427 sql.append(boost::lexical_cast<string>(trx_id_list[x]));
431 sql.append(
" ORDER BY `commit_id` ASC");
433 drizzle_return_t ret;
434 drizzle_result_st result;
435 drizzle_query_str(_connection, &result, sql.c_str(), &ret);
437 if (ret != DRIZZLE_RETURN_OK)
440 _last_error_message=
"Replication slave: ";
441 _last_error_message.append(drizzle_error(_drizzle));
442 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
443 drizzle_result_free(&result);
449 ret= drizzle_result_buffer(&result);
451 if (ret != DRIZZLE_RETURN_OK)
454 _last_error_message=
"Replication slave: ";
455 _last_error_message.append(drizzle_error(_drizzle));
456 errmsg_printf(error::ERROR, _(
"%s"), _last_error_message.c_str());
457 drizzle_result_free(&result);
463 while ((row= drizzle_row_next(&result)) != NULL)
465 if (not queueInsert(row[0], row[1], row[2], row[3], row[4], row[5], row[6]))
467 errmsg_printf(error::ERROR,
468 _(
"Replication slave: Unable to insert into queue."));
469 drizzle_result_free(&result);
474 drizzle_result_free(&result);
480 void QueueProducer::setIOState(
const string &err_msg,
bool status)
482 vector<string> statements;
488 sql=
"UPDATE `sys_replication`.`io_state` SET `status` = 'STOPPED'";
492 sql=
"UPDATE `sys_replication`.`io_state` SET `status` = 'RUNNING'";
495 sql.append(
", `error_msg` = '", 17);
499 for (it= msg.begin(); it != msg.end(); ++it)
503 it= msg.insert(it,
'\'');
508 it= msg.insert(it,
'\\');
514 sql.append(
"' WHERE `master_id` = ");
515 sql.append(boost::lexical_cast<string>(masterId()));
517 statements.push_back(sql);
518 executeSQL(statements);
TODO: Rename this file - func.h is stupid.