23 #include <drizzled/current_session.h>
24 #include <drizzled/error.h>
25 #include <drizzled/session.h>
26 #include <drizzled/statistics_variables.h>
27 #include <drizzled/system_variables.h>
35 #include <sys/socket.h>
// Bytes in the regular packet header: a 3-byte length written with int3store()
// followed by a 1-byte packet sequence number stored in byte 3.
59 #define NET_HEADER_SIZE 4
// Extra header bytes reserved for the compressed protocol; a 3-byte value is
// stored with int3store() directly after the regular header.
60 #define COMP_HEADER_SIZE 3
// 2^24 - 1: the largest value that fits in the 3-byte length field.
62 #define MAX_PACKET_LENGTH (256L*256L*256L-1)
// Fragment of the buffer set-up code; the enclosing signature is not part of
// this excerpt (presumably NET::init(int sock, uint32_t buffer_length) — confirm).
// max_packet is the current buffer capacity; max_packet_size is the ceiling
// that requested lengths are compared against before the buffer is grown.
72 max_packet= (uint32_t) buffer_length;
73 max_packet_size= max(buffer_length, drizzled::global_system_variables.max_allowed_packet);
// Allocate the payload area plus room for both headers.
// NOTE(review): no check of the malloc result is visible in this excerpt.
75 buff= (
unsigned char*) malloc((
size_t) max_packet + NET_HEADER_SIZE + COMP_HEADER_SIZE);
// buff_end sits max_packet bytes past buff, i.e. in front of the header slack.
76 buff_end= buff + max_packet;
// Start with zeroed sequence numbers, both cursors at the start of the buffer
// and no buffered input.
78 pkt_nr= compress_pkt_nr= 0;
79 write_pos= read_pos= buff;
81 where_b= remain_in_buf= 0;
// Hands vio to drizzled::safe_delete (presumably deletes the object and clears
// the pointer — confirm). The enclosing function is outside this excerpt.
94 drizzled::safe_delete(vio);
// Thin wrapper: forwards buf, buflen and port unchanged to vio->peer_addr()
// and returns that call's result.
// NOTE(review): vio is dereferenced without a null check on the visible lines.
97 bool NET::peer_addr(
char *buf,
size_t buflen, uint16_t& port)
99 return vio->peer_addr(buf, buflen, port);
// Thin wrapper: passes flag straight through to vio->keepalive().
// NOTE(review): vio is dereferenced without a null check on the visible lines.
102 void NET::keepalive(
bool flag)
104 vio->keepalive(flag);
// Returns whatever vio->get_fd() reports for the underlying vio object.
// NOTE(review): vio is dereferenced without a null check on the visible lines.
107 int NET::get_sd()
const
109 return vio->get_fd();
// Fragment of the buffer-growing helper; the signature is not in this excerpt
// (presumably drizzleclient_net_realloc(NET *net, size_t length) — confirm).
// Requests at or above net->max_packet_size are rejected and reported as
// ER_NET_PACKET_TOO_LARGE.
116 if (length >= net->max_packet_size)
120 net->last_errno= ER_NET_PACKET_TOO_LARGE;
121 my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
// Round the requested length up to a multiple of IO_SIZE
// (the mask trick assumes IO_SIZE is a power of two — TODO confirm).
124 size_t pkt_length = (length + IO_SIZE - 1) & ~(IO_SIZE - 1);
// Grow the buffer, again leaving room for both headers.
// NOTE(review): the realloc result is stored into net->buff on the next visible
// line and no NULL check appears in this excerpt; if none exists in the full
// function, a failed realloc would lose the old buffer pointer.
129 unsigned char* buff= (
unsigned char*)realloc((
char*) net->buff, pkt_length + NET_HEADER_SIZE + COMP_HEADER_SIZE);
// Both the buffer start and the write cursor now point at the new block.
130 net->buff=net->write_pos= buff;
// Record the new capacity and the matching end pointer.
131 net->buff_end= buff + (net->max_packet= (uint32_t) pkt_length);
// Fragment of a NET member function whose signature is outside this excerpt.
// write_pos differs from buff only when bytes have been appended to the buffer,
// so this tests for pending output (the action taken is not visible here).
138 if (buff != write_pos)
// Bring the plain packet counter in line with the compressed-stream counter
// (the condition guarding this assignment, if any, is not visible here).
145 pkt_nr= compress_pkt_nr;
// Fragment of the packet-writing routine that takes packet0/len; the signature
// is not in this excerpt (presumably drizzleclient_net_write — confirm).
// View the untyped payload as bytes.
167 const unsigned char* packet=
reinterpret_cast<const unsigned char*
>(packet0);
// Scratch space for one regular packet header.
168 unsigned char buff[NET_HEADER_SIZE];
// Special case for a NET without a vio (the action taken is not visible here).
169 if (unlikely(!net->vio))
// While at least MAX_PACKET_LENGTH bytes remain, each pass builds a header
// announcing a maximum-size chunk.
176 while (len >= MAX_PACKET_LENGTH)
178 const uint32_t z_size = MAX_PACKET_LENGTH;
// Header layout: 3-byte length, then the sequence number truncated to one byte;
// pkt_nr is post-incremented for every header produced.
179 int3store(buff, z_size);
180 buff[3]= (
unsigned char) net->pkt_nr++;
// Sequence byte of the header written after the loop (for the remaining data).
188 buff[3]= (
unsigned char) net->pkt_nr++;
// Fragment of the command-writing routine; only the tail of its parameter list
// is visible (presumably drizzleclient_net_write_command — confirm).
221 const unsigned char *header,
size_t head_len,
222 const unsigned char *packet,
size_t len)
// Total bytes to announce: header + body + 1 (the extra byte presumably being
// the command byte — confirm).
// NOTE(review): the size_t sum is narrowed to uint32_t here.
224 uint32_t length=len+1+head_len;
// The first packet header is one byte longer than a regular one.
225 unsigned char buff[NET_HEADER_SIZE+1];
226 uint32_t header_size=NET_HEADER_SIZE+1;
// The message does not fit into a single packet, so it is split.
230 if (length >= MAX_PACKET_LENGTH)
// Shrink the body share of the first chunk so that
// 1 + head_len + len == MAX_PACKET_LENGTH.
233 len= MAX_PACKET_LENGTH - 1 - head_len;
// Each pass announces a maximum-size chunk with the next sequence number.
236 int3store(buff, MAX_PACKET_LENGTH);
237 buff[3]= (
unsigned char) net->pkt_nr++;
243 length-= MAX_PACKET_LENGTH;
244 len= MAX_PACKET_LENGTH;
// Chunks after the first use the regular header size.
246 header_size= NET_HEADER_SIZE;
247 }
while (length >= MAX_PACKET_LENGTH);
// Header for the final (or only) packet: remaining length + sequence number.
250 int3store(buff,length);
251 buff[3]= (
unsigned char) net->pkt_nr++;
// Fragment of the buffered-write helper; the signature is not in this excerpt
// (presumably net_write_buff(NET *, const void *, uint32_t len) — confirm).
// View the untyped payload as bytes.
286 const unsigned char* packet=
reinterpret_cast<const unsigned char*
>(packet0);
// left_length = free space usable for this write. With compression enabled and
// a buffer larger than MAX_PACKET_LENGTH the space is capped at
// MAX_PACKET_LENGTH minus the bytes already buffered; otherwise it is whatever
// lies between write_pos and buff_end.
287 uint32_t left_length;
288 if (net->compress && net->max_packet > MAX_PACKET_LENGTH)
289 left_length= MAX_PACKET_LENGTH - (net->write_pos - net->buff);
291 left_length= (uint32_t) (net->buff_end - net->write_pos);
// The data does not fit into the free space.
293 if (len > left_length)
// The buffer already holds data: fill it up with the first left_length bytes;
// the size expression below (buffered bytes + left_length) is the argument of
// a call whose start is not visible in this excerpt.
295 if (net->write_pos != net->buff)
298 memcpy(net->write_pos,packet,left_length);
300 (
size_t) (net->write_pos - net->buff) + left_length))
// The buffer is empty again; skip the bytes just consumed.
302 net->write_pos= net->buff;
303 packet+= left_length;
// Remaining data is walked in MAX_PACKET_LENGTH-sized steps (what is done with
// each step is not visible here).
312 left_length= MAX_PACKET_LENGTH;
313 while (len > left_length)
317 packet+= left_length;
// Data larger than the whole buffer gets separate treatment (not visible here).
321 if (len > net->max_packet)
// Otherwise append the data to the buffer and advance the write cursor.
325 memcpy(net->write_pos,packet,len);
326 net->write_pos+= len;
// Fragment of the low-level send routine; the signature is not in this excerpt
// (presumably drizzleclient_net_real_write(NET *net, const unsigned char *packet,
// size_t len) — confirm).
// error_ == 2 is special-cased up front (what happens is not visible here).
348 if (net->error_ == 2)
// Compression path (its guarding condition is not in this excerpt): copy the
// payload into a fresh buffer, leaving header_length bytes in front of it.
// NOTE(review): no check of the malloc result is visible.
353 const uint32_t header_length=NET_HEADER_SIZE+COMP_HEADER_SIZE;
354 unsigned char* b= (
unsigned char*) malloc(len + NET_HEADER_SIZE + COMP_HEADER_SIZE);
355 memcpy(b+header_length,packet,len);
// Capacity of the compression output buffer: 120% of the input plus 12 bytes.
357 size_t complen= len * 120 / 100 + 12;
// NOTE(review): compbuf comes from new[] while b comes from malloc; neither
// release is visible in this excerpt — confirm each uses the matching
// deallocator.
358 unsigned char* compbuf=
new unsigned char[complen];
// compress() is given the capacity through tmp_complen; the value it leaves
// there is copied back into complen (Bytef/uLongf/Z_OK suggest zlib — confirm).
359 uLongf tmp_complen= complen;
360 int res= compress((Bytef*) compbuf, &tmp_complen,
361 (Bytef*) (b+header_length),
363 complen= tmp_complen;
// Compression failed or did not make the data smaller.
367 if (res != Z_OK || complen >= len)
371 size_t tmplen= complen;
// Fill in the headers: a 3-byte value right after the regular header and the
// compressed-stream sequence number in byte 3.
375 int3store(&b[NET_HEADER_SIZE],complen);
377 b[3]=(
unsigned char) (net->compress_pkt_nr++);
// Send loop state: pos walks from packet towards end; retry_count counts the
// retries used so far.
382 uint32_t retry_count= 0;
383 const unsigned char* pos= packet;
384 const unsigned char* end= pos + len;
// A result <= 0 from vio->write() enters the error/retry handling below.
391 if ((
long) (length= net->vio->
write( pos, (
size_t) (end-pos))) <= 0)
398 if (net->vio == NULL)
407 if (interrupted || length == 0)
// Ask the vio for blocking mode; the loop repeats while that call fails.
410 while (net->vio->
blocking(
true, &old_mode) < 0)
// Retry as long as the vio calls the error retryable and the retry budget
// (net->retry_count) is not used up.
412 if (net->vio->
should_retry() && retry_count++ < net->retry_count)
415 net->last_errno= ER_NET_PACKET_TOO_LARGE;
416 my_error(ER_NET_PACKET_TOO_LARGE, MYF(0));
424 if (retry_count++ < net->retry_count)
// Record whether the failure was an interruption or a generic write error.
433 net->last_errno= interrupted ? CR_NET_WRITE_INTERRUPTED : CR_NET_ERROR_ON_WRITE;
// Add the bytes written to the current session's bytes_sent counter.
440 current_session->status_var.bytes_sent+= length;
// Returns 1 if pos did not reach end (not everything was written), else 0.
446 return (
int) (pos != end);
// Fragment of the low-level receive routine; the signature is not in this
// excerpt (presumably my_real_read(NET *net, size_t *complen) — confirm).
463 uint32_t retry_count=0;
// len starts out as packet_error, so that is the result unless a length is
// parsed further down.
464 size_t len=packet_error;
// Bytes to read first: the regular header, plus the compression header when
// the connection is compressed.
465 uint32_t remain= net->compress ? NET_HEADER_SIZE+COMP_HEADER_SIZE : NET_HEADER_SIZE;
// Incoming bytes are stored starting at offset where_b in the buffer.
471 unsigned char* pos = net->buff + net->where_b;
// Two passes; pos and remain are re-targeted at the end of this fragment
// (remain = len), so the second pass reads len more bytes — presumably
// header first, then payload.
473 for (uint32_t i= 0; i < 2 ; i++)
// A result <= 0 from vio->read() enters the error/retry handling below.
478 if ((
long) (length= net->vio->
read(pos, remain)) <= 0L)
480 if (net->vio == NULL)
487 if (retry_count++ < net->retry_count)
// Record whether the failure was an interruption or a generic read error.
496 net->last_errno= net->vio->
was_interrupted() ? CR_NET_READ_INTERRUPTED : CR_NET_READ_ERROR;
// Progress accounting: fewer bytes outstanding, more bytes received in the
// current session's statistics.
499 remain -= (uint32_t) length;
501 current_session->status_var.bytes_received+= length;
// Byte 3 of the header must equal the expected sequence number (low 8 bits of
// pkt_nr); a mismatch is reported as ER_NET_PACKETS_OUT_OF_ORDER.
507 if (net->buff[net->where_b + 3] != (
unsigned char) net->pkt_nr)
511 my_error(ER_NET_PACKETS_OUT_OF_ORDER, MYF(0));
// Advance the sequence number and mirror it into the compressed-stream counter.
514 net->compress_pkt_nr= ++net->pkt_nr;
// 3-byte value from the compression header, passed back through *complen.
521 *complen=uint3korr(&(net->buff[net->where_b + NET_HEADER_SIZE]));
// 3-byte length from the regular header.
524 len=uint3korr(net->buff+net->where_b);
// helping = buffer offset the larger of the two sizes would reach.
527 helping = max(len,*complen) + net->where_b;
// That offset does not fit into the current buffer capacity.
529 if (helping >= net->max_packet)
// NOTE(review): this calls read() directly on the vio's descriptor and then
// asserts on the result; an I/O failure would abort a debug build and go
// unchecked when NDEBUG is defined. The surrounding control flow is not in this
// excerpt — confirm intent.
536 length= read(net->vio->
get_fd(), net->buff, min((
size_t)net->max_packet, len));
537 assert((
long)length > 0L);
// Set up the next pass: read len bytes starting at where_b.
545 pos=net->buff + net->where_b;
546 remain = (uint32_t) len;
// Fragment of the packet-reading routine; the signature is not in this excerpt
// (presumably drizzleclient_net_read(NET *net) — confirm).
// Branch for connections without compression.
576 if (not net->compress)
// A length of exactly MAX_PACKET_LENGTH starts a loop that keeps going while
// further lengths are also MAX_PACKET_LENGTH (multi-packet message).
579 if (len == MAX_PACKET_LENGTH)
// save_pos remembers where_b so it can be restored after the loop.
582 uint32_t save_pos = net->where_b;
583 size_t total_length= 0;
590 }
while (len == MAX_PACKET_LENGTH);
592 if (len != packet_error)
596 net->where_b = save_pos;
// Expose the payload start to the caller.
598 net->read_pos = net->buff + net->where_b;
// On success, store a terminating 0 byte directly after the payload.
600 if (len != packet_error)
601 net->read_pos[len]=0;
// The lines below use complen/uncompress(), so they presumably belong to the
// compressed-connection branch (the branch keyword itself is not visible).
610 uint32_t start_of_packet;
611 uint32_t first_packet_offset;
612 uint32_t read_length, multi_byte_packet=0;
// Data is left over from a previous call: resume from the saved buffer length
// and put back the byte that was overwritten with the 0 terminator (save_char
// is stored at the end of this fragment).
614 if (net->remain_in_buf)
616 buf_length= net->buf_length;
617 first_packet_offset= start_of_packet= (net->buf_length -
620 net->buff[start_of_packet]= net->save_char;
// Nothing buffered: start from an empty buffer.
625 buf_length= start_of_packet= first_packet_offset= 0;
// At least one full header is buffered past start_of_packet.
631 if (buf_length - start_of_packet >= NET_HEADER_SIZE)
// 3-byte length of the packet at start_of_packet.
633 read_length = uint3korr(net->buff+start_of_packet);
637 start_of_packet += NET_HEADER_SIZE;
// The whole packet (header + read_length bytes) is already buffered.
640 if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet)
// Follow-up part of a multi-packet message: the memmove (its source offset is
// partly outside this excerpt) together with buf_length -= NET_HEADER_SIZE
// removes NET_HEADER_SIZE bytes from the buffered data — presumably this
// packet's header, so the payloads become contiguous.
642 if (multi_byte_packet)
645 memmove(net->buff + first_packet_offset + start_of_packet,
646 net->buff + first_packet_offset + start_of_packet +
648 buf_length - start_of_packet);
649 start_of_packet += read_length;
650 buf_length -= NET_HEADER_SIZE;
// Otherwise step over header and payload.
653 start_of_packet+= read_length + NET_HEADER_SIZE;
// multi_byte_packet is cleared for a packet that is not maximum-size and set to
// NET_HEADER_SIZE otherwise (the else keyword is not visible in this excerpt).
655 if (read_length != MAX_PACKET_LENGTH)
657 multi_byte_packet= 0;
660 multi_byte_packet= NET_HEADER_SIZE;
// Compact: move the unread data to the front of the buffer and rebase the
// offsets accordingly.
662 if (first_packet_offset)
664 memmove(net->buff,net->buff+first_packet_offset,
665 buf_length-first_packet_offset);
666 buf_length-=first_packet_offset;
667 start_of_packet -= first_packet_offset;
668 first_packet_offset=0;
// Same compaction, on a second code path.
674 if (first_packet_offset)
676 memmove(net->buff,net->buff+first_packet_offset,
677 buf_length-first_packet_offset);
678 buf_length-=first_packet_offset;
679 start_of_packet -= first_packet_offset;
680 first_packet_offset=0;
// New data is read in behind what is already buffered.
683 net->where_b=buf_length;
684 if ((packet_len =
my_real_read(net,&complen)) == packet_error)
// Temporary buffer of complen bytes for the uncompress() output.
// NOTE(review): no check of the malloc result, and no matching free, is
// visible in this excerpt.
689 unsigned char * compbuf= (
unsigned char *) malloc(complen);
// uncompress() is given the capacity through tmp_complen; the value it leaves
// there is copied back into complen.
692 uLongf tmp_complen= complen;
693 int error= uncompress((Bytef*) compbuf, &tmp_complen,
694 (Bytef*) (net->buff + net->where_b),
696 complen= tmp_complen;
701 net->last_errno= CR_NET_UNCOMPRESS_ERROR;
// Copy the uncompressed bytes back over the data that was just read.
705 memcpy((net->buff + net->where_b), compbuf, complen);
716 buf_length+= complen;
// Publish the result: payload start, buffered byte count and how many bytes
// remain past the returned packet.
718 net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE;
719 net->buf_length= buf_length;
720 net->remain_in_buf= (uint32_t) (buf_length - start_of_packet);
721 len = ((uint32_t) (start_of_packet - first_packet_offset) - NET_HEADER_SIZE -
// Save the byte after the payload, then overwrite it with a terminating 0; the
// saved byte is put back at the top of this fragment on the next call.
723 net->save_char= net->read_pos[len];
724 net->read_pos[len]=0;
// Stores timeout in read_timeout_ and passes it to vio->timeout() with 0 as the
// first argument (set_write_timeout passes 1, so that argument presumably
// selects read vs. write — confirm against the vio class).
// Lines between the statements are missing from this excerpt, so any guard
// around the vio call cannot be seen here.
729 void NET::set_read_timeout(uint32_t timeout)
731 read_timeout_= timeout;
734 vio->timeout(0, timeout);
// Stores timeout in write_timeout_ and passes it to vio->timeout() with 1 as
// the first argument (set_read_timeout passes 0, so that argument presumably
// selects read vs. write — confirm against the vio class).
// Lines between the statements are missing from this excerpt, so any guard
// around the vio call cannot be seen here.
739 void NET::set_write_timeout(uint32_t timeout)
741 write_timeout_= timeout;
744 vio->timeout(1, timeout);
749 bool NET::write(
const void* data,
size_t size)
754 bool NET::write_command(
unsigned char command,
data_ref header,
data_ref body)
static bool drizzleclient_net_write(NET *net, const void *packet0, size_t len)
static uint32_t my_real_read(NET *net, size_t *complen)
void init(int sock, uint32_t buffer_length)
TODO: Rename this file - func.h is stupid.
bool was_interrupted() const
static bool drizzleclient_net_write_command(NET *net, unsigned char command, const unsigned char *header, size_t head_len, const unsigned char *packet, size_t len)
static int drizzleclient_net_real_write(NET *net, const unsigned char *packet, size_t len)
int blocking(bool set_blocking_mode, bool *old_mode)
bool should_retry() const
Virtual I/O layer, only used with TCP/IP sockets at the moment.
static uint32_t drizzleclient_net_read(NET *net)
size_t read(unsigned char *buf, size_t size)
size_t write(const unsigned char *buf, size_t size)
static bool drizzleclient_net_realloc(NET *net, size_t length)
static bool net_write_buff(NET *, const void *, uint32_t len)