目录
19. inline_mysql_socket_send函数 15
1. 前言
Review一同事的C++代码,发现其中有一个拼接而成的多记录INSERT语句可能超大(预计最大可超过1M,甚至10M也有可能,视实际记录条数而定)。担心包大存隐患,所以特意分析一下mysql_real_query函数的实现,以确保使用是否安全。研究对象为MySQL-8.0.14,其它版本可能有小许差异,但估计差异不会太大。
2. 调用路径
1) mysql_real_query调用路径
mysql_real_query -> mysql_send_query -> simple_command -> cli_advanced_command -> net_write_command -> net_write_buff -> net_write_packet -> net_write_raw_loop -> vio_write -> struct Vio::write(...) -> io_write -> inline_mysql_socket_send -> send // 系统调用 |
2) mysql_real_connect调用路径
// 在调用mysql_real_connect之前, // 需要先调用mysql_init完成MYSQL结构的初始化 mysql_real_connect(MYSQL*) -> my_net_init -> my_net_local_init // 初始化包大小max_packet_size |
3. MAX_PACKET_LENGTH宏
// Maximum length of protocol packet. // @ref page_protocol_basic_ok_packet length limit also restricted to this value // as any length greater than this value will have first byte of // @ref page_protocol_basic_ok_packet to be 254 thus does not // provide a means to identify if this is @ref page_protocol_basic_ok_packet or // @ref page_protocol_basic_eof_packet. // // 定义一个包的最大字节数(值为16MB), // 因为是个宏,所以无法编译期修改。 // 但是否意味着只能向MySQL server发送小于16M的包了? // 答案是否,如果包大小超过这个值, // 则返回错误CR_NET_PACKET_TOO_LARGE(ER_NET_PACKET_TOO_LARGE) // CR_NET_PACKET_TOO_LARGE的值为2020 #define MAX_PACKET_LENGTH (256L * 256L * 256L - 1) // 16MB |
4. DBUG_RETURN宏
// process exit from user function #define DBUG_LEAVE \ _db_return_(__LINE__, &_db_stack_frame_) #define DBUG_RETURN(a1) \ do { \ DBUG_LEAVE; \ return (a1); \ } while (0) |
5. COM_QUERY枚举值
// A list of all MySQL protocol commands // These are the top level commands the server can receive // while it listens for a new command in ::dispatch_command enum enum_server_command { 。。。。。。 // COM_QUERY为mysql_real_query对应的命令字 COM_QUERY // See @ref page_protocol_com_query 。。。。。。 }; |
6. mysql_query函数
// Do a query. If query returned rows, free old rows. // Read data by mysql_store_result or // by repeat call of mysql_fetch_row int STDCALL mysql_query(MYSQL *mysql, const char *query) { // 可以看到mysql_query和mysql_real_query实际是一样的 return mysql_real_query(mysql, query, (ulong)strlen(query)); } |
7. mysql_real_query函数
// Do a query. If query returned rows, free old rows. // Read data by mysql_store_result or // by repeat call of mysql_fetch_row // client.cc int STDCALL mysql_real_query( MYSQL *mysql, const char *query, ulong length) { int retval;
// 可以看到MySQL的日志记录十分详细, // 这样十分有利于分析和定位问题。 DBUG_ENTER("mysql_real_query"); // 跟踪日志 DBUG_PRINT("enter", ("handle: %p", mysql)); DBUG_PRINT("query", ("Query = '%-.*s'", (int)length, query)); DBUG_EXECUTE_IF("inject_ER_NET_READ_INTERRUPTED", { mysql->net.last_errno = ER_NET_READ_INTERRUPTED; DBUG_SET("-d,inject_ER_NET_READ_INTERRUPTED"); DBUG_RETURN(1); });
// 调用mysql_send_query if (mysql_send_query(mysql, query, length)) DBUG_RETURN(1);
// read_query_result指向cli_read_query_result // read_query_result的初始化,参见《MYSQL_METHODS结构体》一节 retval = (int)(*mysql->methods->read_query_result)(mysql); DBUG_RETURN(retval); } |
8. mysql_send_query函数
// Send the query and return so we can do something else. // Needs to be followed by mysql_read_query_result() // when we want to finish processing it. // client.cc int STDCALL mysql_send_query( MYSQL *mysql, const char *query, ulong length) { STATE_INFO *info; DBUG_ENTER("mysql_send_query");
if ((info = STATE_DATA(mysql))) free_state_change_info( static_cast( mysql->extension)); // 转为对simple_command的调用, // simple_command实际为一个指向 // cli_advanced_command的宏。 DBUG_RETURN(simple_command( mysql, COM_QUERY, (uchar *)query, length, 1)); // 最后一个参数值1表示跳过检查 } |
9. simple_command宏
// 注意传递给advanced_command的第3个和第4个参数值均为0, // 第3个和第4个参数分别为包头和包头长度 // advanced_command是指向的cli_advanced_command函数指针 // 注:mysql_real_query是一个simple command // 特点是:advanced_command的第3个和第4个参数为空。 #define simple_command(mysql,command,arg,length,skip_check) \ ((mysql)->methods \ ?(*(mysql)->methods->advanced_command)(mysql,command,0,0,arg, \ length,skip_check,NULL) \ :(set_mysql_error(mysql,CR_COMMANDS_OUT_OF_SYNC,unknown_sqlstate),1))
// 类似的: #define stmt_command(mysql, command, arg, length, stmt) \ ((mysql)->methods \ ?(*(mysql)->methods->advanced_command)(mysql,command,0,0,arg, \ length,1,stmt) \ :(set_mysql_error(mysql,CR_COMMANDS_OUT_OF_SYNC,unknown_sqlstate),1)) |
10. MYSQL_METHODS结构体
static MYSQL_METHODS client_methods = { // mysql_real_query调用mysql_send_query发送请求, // 调用cli_read_query_result接收请求结果, // 而mysql_send_query又调用了cli_advanced_command cli_read_query_result, /* read_query_result */ // 将结构体MYSQL_METHODS的advanced_command初始化为cli_advanced_command cli_advanced_command, /* advanced_command */ cli_read_rows, /* read_rows */ cli_use_result, /* use_result */ cli_fetch_lengths, /* fetch_lengths */ cli_flush_use_result, /* flush_use_result */ cli_read_change_user_result /* read_change_user_result */ #ifndef MYSQL_SERVER , cli_list_fields, /* list_fields */ cli_read_prepare_result, /* read_prepare_result */ cli_stmt_execute, /* stmt_execute */ cli_read_binary_rows, /* read_binary_rows */ cli_unbuffered_fetch, /* unbuffered_fetch */ cli_read_statistics, /* read_statistics */ cli_read_query_result, /* next_result */ cli_read_binary_rows, /* read_rows_from_cursor */ free_rows #endif }; |
11. cli_advanced_command函数
// client.cc bool cli_advanced_command( MYSQL *mysql, enum enum_server_command command, // COM_QUERY const uchar *header, size_t header_length, const uchar *arg, size_t arg_length, bool skip_check, MYSQL_STMT *stmt) { bool stmt_skip = stmt ? stmt->state != MYSQL_STMT_INIT_DONE : false; // 执行连接,如果需要 if (mysql->net.vio == 0) { // Do reconnect if possible if (mysql_reconnect(mysql) || stmt_skip) DBUG_RETURN(1); } 。。。。。。 // net_write_command只有发包,并没有收包, // 而且没有发现有设置错误码ER_NET_PACKET_TOO_LARGE的地方 if (net_write_command( net, (uchar)command, // COM_QUERY header, header_length, arg, arg_length)) { // 如果包太大,返回包太大错误 // 走读代码,发现不可能出现错误ER_NET_PACKET_TOO_LARGE, // 也许这是老的实现遗留的 // // 唯一设置错误码为ER_NET_PACKET_TOO_LARGE的地方是net_serv.cc中的net_realloc函数 if (net->last_errno == ER_NET_PACKET_TOO_LARGE) { set_mysql_error( mysql, CR_NET_PACKET_TOO_LARGE, unknown_sqlstate); goto end; } } 。。。。。。 result = 0; if (!skip_check) { // 如果skip_check值为0 result = ((mysql->packet_length = cli_safe_read_with_ok(mysql, 1, NULL)) == packet_error ? 1: 0); 。。。。。。 } |
12. int3store函数
// Stores an unsinged integer in a platform independent way // @param T The destination buffer. Must be at least 3 bytes long // @param A The integer to store. // _Example:_ // A @ref a_protocol_type_int3 "int \" with the value 1 is stored as: // ~~~~~~~~~~~~~~~~~~~~~ // 01 00 00 // ~~~~~~~~~~~~~~~~~~~~~ // 字节序转换 // 将一个整数转换成与平台无关的字节存储, // 因为只有双字节或以才有字节序的问题, // 单字节是不存在字节序问题的。 // 常规的做法一般是直接做主机字节序到 // 网络字节序的转换,这里的目的是节省一个字节, // 这可以通过net_write_command的实现看到, // T的第3个字节用作它途了。 static inline void int3store(uchar *T, uint A) { *(T) = (uchar)(A); *(T + 1) = (uchar)(A >> 8); *(T + 2) = (uchar)(A >> 16); } |
13. net_write_command函数
// Send a command to the server. // // 对于mysql_real_query是没有header部分的 // The reason for having both header and packet is so that libmysql // can easy add a header to a special command (like prepared statements) // without having to re-alloc the string. // // As the command is part of the first data packet, // we have to do some data // juggling to put the command in there, // without having to create a new // packet. // // This function will split big packets into sub-packets if needed. // (Each sub packet can only be 2^24 bytes) // // @param net NET handler // @param command Command in MySQL server (enum enum_server_command) // @param header Header to write after command // @param head_len Length of header // @param packet Query or parameter to query // @param len Length of packet // // @retval // 0 ok // @retval // 1 error // // net_serv.cc // // #define NET_HEADER_SIZE 4 // standard header size // // 大包分解成子包发送 bool net_write_command( NET *net, uchar command, // COM_QUERY const uchar *header, size_t head_len, // 注意mysql_real_query无header const uchar *packet, size_t len) { // 1 extra byte for command // 多留出1个字节存储命令字 size_t length = len + 1 + head_len; uchar buff[NET_HEADER_SIZE + 1]; // 5字节 uint header_size = NET_HEADER_SIZE + 1; // 5字节
// buff的第5个字节 // For first packet buff[4] = command; // 命令字(对于mysql_real_query值为COM_QUERY)
// 从下面可以看出, // 即使发送的包大于MAX_PACKET_LENGTH, // 也是可以发送的, // 函数net_write_command会将它按照 // MAX_PACKET_LENGTH为边界拆分成多个子包(sub packet)。 if (length >= MAX_PACKET_LENGTH) { // Take into account that we have the command in the first header len = MAX_PACKET_LENGTH - 1 - head_len; do { // 由于int3store的实现限定, // 因此MAX_PACKET_LENGTH的值不能超过3个字节 // // buff头3个字节存储了子包(sub packet)的长度,理论上最大可达:2^24字节, // 但因为MAX_PACKET_LENGTH只是个宏,所以实际限定为16M。 int3store(buff, MAX_PACKET_LENGTH); // int3store设置buff的前3个字节, // 下面这条语句设置buff的第4个字节 buff[3] = (uchar)net->pkt_nr++;
if (net_write_buff(net, buff, header_size) || // 包头大小,加命令字 net_write_buff(net, header, head_len) || // 对于mysql_real_query实际为空 net_write_buff(net, packet, len)) // 子包数据 { DBUG_RETURN(1); }
length -= MAX_PACKET_LENGTH; } while (length >= MAX_PACKET_LENGTH);
// Data left to be written // 这里做法可能和一般人想法不一样, // 一般的想法是if/else,而这里没有else, // 所以while循环的终止条件为: // length >= MAX_PACKET_LENGTH // 而不是: // length > 0 len = length; }
int3store(buff, static_cast(length)); buff[3] = (uchar)net->pkt_nr++; bool rc = net_write_buff(net,buff,header_size) || (head_len && net_write_buff(net,header,head_len)) || net_write_buff(net,packet,len) || net_flush(net); DBUG_RETURN(rc); } |
14. net_write_buff函数
// Caching the data in a local buffer before sending it. // Fill up net->buffer and send it to the client when full. // If the rest of the to-be-sent-packet is bigger than buffer, // send it in one big block (to avoid copying to internal buffer). // If not, copy the rest of the data to // the buffer and return without sending data. // // Notes the cached buffer can be sent as it is with 'net_flush()'. // In this code we have to be careful to not send a packet // longer than MAX_PACKET_LENGTH to net_write_packet() // if we are using the compressed protocol as we store // the length of the compressed packet in 3 bytes. // // @retval // 0 ok // @retval // 1 static bool net_write_buff( NET *net, const uchar *packet, size_t len) { 。。。。。。 // 只有buff存不下了(len > left_length), // 才调用net_write_packet真正发包。 if (len > left_length) { 。。。。。。 if (net_write_packet(net, packet, left_length)) return 1; 。。。。。。 }
// 如果len // 则不会真发送,而是缓存到struct NET的buff中 // struct NET { // MYSQL_VIO vio; // unsigned char *buff,*buff_end,*write_pos,*read_pos; // 。。。。。。 // } // 对于mysql_real_query,这里的len值可能为0,因为没有头(head) if (len > 0) memcpy(net->write_pos, packet, len); net->write_pos += len; return 0; } |
15. net_write_packet函数
// Write a MySQL protocol packet to the network handler. // // @param net NET handler. // @param packet The packet to write. // @param length Length of the packet. // // @remark The packet might be encapsulated into a compressed packet. // // @return true on error, false on success. bool net_write_packet( NET *net, const uchar *packet, size_t length) { bool res;
net->reading_or_writing = 2; const bool do_compress = net->compress; if (do_compress) { if ((packet = compress_packet(net, packet, &length)) == NULL) { net->error = 2; net->last_errno = ER_OUT_OF_RESOURCES; /* In the server, allocation failure raises a error. */ net->reading_or_writing = 0; DBUG_RETURN(true); } }
res = net_write_raw_loop(net, packet, length); if (do_compress) my_free((void *)packet); net->reading_or_writing = 0; } |
16. net_write_raw_loop函数
// Write a determined number of bytes to a network handler. // // @param net NET handler. // @param buf Buffer containing the data to be written. // @param count The length, in bytes, of the buffer. // // @return true on error, false on success. // // #define vio_write(vio, buf, size) \ // ((vio)->write)(vio, buf, size) // #define vio_was_timeout(vio) (vio)->was_timeout(vio) // // #define MYSQL_VIO struct Vio * static bool net_write_raw_loop( NET *net, const uchar *buf, size_t count) { // 同步阻塞方式发送,直到全部发完为止 while (count) { size_t sentcnt = vio_write(net->vio, buf, count);
// VIO_SOCKET_ERROR (-1) indicates an error. if (sentcnt == VIO_SOCKET_ERROR) { // A recoverable I/O error occurred? if (net_should_retry(net, &retry_count)) continue; else break; }
count -= sentcnt; buf += sentcnt; }
if (count) { // Socket should be closed. net->error = 2;
// Interrupted by a timeout? if (vio_was_timeout(net->vio)) net->last_errno = ER_NET_WRITE_INTERRUPTED; else net->last_errno = ER_NET_ERROR_ON_WRITE; }
return count != 0; } |
17. Vio结构体
// This structure is for every connection on both sides. // Note that it has a non-default move assignment operator, // so if adding more members, // you'll need to update operator= // // Vio实际是一个C++11类,非C的struct // violite.h struct Vio { MYSQL_SOCKET mysql_socket; // Instrumented socket 。。。。。。 // MySQL客户端库调用write往MySQL server发包, // write是指向什么的函数指针了?答案在vio.cc文件中。 // 指向vio_ssl_write或vio_write,但编译后只可能指向其中一个, // 但如果是WIN32,则可能指向vio_write_pipe或vio_write_shared_memory // 这里,我们只关注非WIN32环境。 // #ifdef HAVE_OPENSSL // if (type == VIO_TYPE_SSL) { // 。。。。。。 // vio->write = vio_ssl_write; // 。。。。。。 // return false; // } // 。。。。。。 // vio->write = vio_write; // 。。。。。。 // return false; size_t (*write)(MYSQL_VIO,const uchar*,size_t)={nullptr}; 。。。。。。 // Flag to indicate whether we are in poll or shutdown. // A true value of flag indicates either the socket // has called shutdown or it is sleeping on a poll call. // False value of this flag means that the socket is // not sleeping on a poll call. std::atomic_flag poll_shutdown_flag = ATOMIC_FLAG_INIT;
private: // 禁掉构造和析构函数 friend Vio *internal_vio_create(uint flags); explicit Vio(uint flags); ~Vio();
public: // 禁掉拷贝构造和拷贝赋值函数 Vio(const Vio &) = delete; Vio &operator=(const Vio &) = delete; Vio &operator=(Vio &&vio); }; |
18. vio_write函数
// #define mysql_socket_send(FD, B, N, FL) inline_mysql_socket_send(FD, B, N, FL) // viosocket.cc size_t vio_write(Vio *vio,const uchar *buf,size_t size) { 。。。。。。 // 调用inline_mysql_socket_send发送 while ((ret = mysql_socket_send( vio->mysql_socket, (SOCKBUF_T *)buf, size, flags)) == -1) { int error = socket_errno; 。。。。。。 // Wait for the output buffer to become writable if ((ret=vio_socket_io_wait(vio,VIO_IO_EVENT_WRITE))) break; } DBUG_RETURN(ret); } |
19. inline_mysql_socket_send函数
static inline ssize_t inline_mysql_socket_send( MYSQL_SOCKET mysql_socket, const SOCKBUF_T *buf, size_t n, int flags) { 。。。。。。 // Non instrumented code result = send(mysql_socket.fd, buf, IF_WIN((int), ) n, flags); 。。。。。。 return result; } |
20. net_realloc函数
调用者:net_read_packet和my_realloc_str,net_realloc会对包大小进行判断,如果超过大小,设置出错码为ER_NET_PACKET_TOO_LARGE。
// Realloc the packet buffer. // net_serv.cc bool net_realloc(NET *net, size_t length) { uchar *buff; size_t pkt_length; DBUG_ENTER("net_realloc"); DBUG_PRINT("enter", ("length: %lu", (ulong)length));
if (length >= net->max_packet_size) { DBUG_PRINT("error", ("Packet too large. Max size: %lu", net->max_packet_size)); /* @todo: 1 and 2 codes are identical. */ net->error = 1; // 设置出错码ER_NET_PACKET_TOO_LARGE net->last_errno = ER_NET_PACKET_TOO_LARGE; #ifdef MYSQL_SERVER my_error(ER_NET_PACKET_TOO_LARGE, MYF(0)); #endif DBUG_RETURN(1); } 。。。。。。 } |
21. net_read_packet函数
// Read one (variable-length) MySQL protocol packet. // A MySQL packet consists of a header and a payload. // // @remark Reads one packet to net->buff + net->where_b. // @remark Long packets are handled by my_net_read(). // @remark The network buffer is expanded if necessary. // // @return The length of the packet, or @c packet_error on error. // // net_serv.cc static size_t net_read_packet(NET *net, size_t *complen) { if (net_read_packet_header(net)) goto error; 。。。。。。 // Expand packet buffer if necessary // 函数net_realloc可能触发错误ER_NET_PACKET_TOO_LARGE if ((pkt_data_len >= net->max_packet) && net_realloc(net, pkt_data_len)) goto error; // Read the packet data (payload) if (net_read_raw_loop(net, pkt_len)) goto error; 。。。。。。 } |
22. mysql_real_connect函数
// client.cc MYSQL *STDCALL mysql_real_connect( MYSQL *mysql, // 如果传NULL,则会自动创建 const char *host, const char *user, const char *passwd, const char *db, uint port, const char *unix_socket, ulong client_flag) { 。。。。。。 NET *net = &mysql->net; 。。。。。。 if (my_net_init(net, net->vio)) { 。。。。。。 }
// Init with packet info // client.cc bool my_net_init(NET *net, Vio *vio) { net->vio = vio; // Set some limits my_net_local_init(net); 。。。。。。 }
// Functions called my my_net_init() to // set some application specific variables // libmysql.cc void my_net_local_init(NET *net) { (void)mysql_get_option(NULL, MYSQL_OPT_MAX_ALLOWED_PACKET, &local_max_allowed_packet); (void)mysql_get_option(NULL, MYSQL_OPT_NET_BUFFER_LENGTH, &local_net_buffer_length); 。。。。。。 // MY_MAX作用是取两者中的最大值 net->max_packet_size = MY_MAX(local_net_buffer_length, local_max_allowed_packet); }
int STDCALL mysql_get_option( MYSQL *mysql, enum mysql_option option, const void *arg) { if (!arg) DBUG_RETURN(1); switch (option) { 。。。。。。 // client.cc // ulong g_net_buffer_length = 8192; / 8KB // ulong g_max_allowed_packet = 1024L * 1024L * 1024L; // 1GB case MYSQL_OPT_MAX_ALLOWED_PACKET: if (mysql) *((ulong *)arg) = mysql->options.max_allowed_packet; else *((ulong *)arg) = g_max_allowed_packet; break; case MYSQL_OPT_NET_BUFFER_LENGTH: *((ulong *)arg) = g_net_buffer_length; break; 。。。。。。 default: DBUG_RETURN(1); } DBUG_RETURN(0); } |
23. mysql_init函数
// Init MySQL structure or allocate one MYSQL *STDCALL mysql_init(MYSQL *mysql) { if (mysql_server_init(0, NULL, NULL)) return 0; if (!mysql) { if (!(mysql=(MYSQL *)my_malloc( key_memory_MYSQL,sizeof(*mysql), MYF(MY_WME | MY_ZEROFILL)))) { set_mysql_error(NULL, CR_OUT_OF_MEMORY, unknown_sqlstate); return 0; } // 自己创建的,就需要负责释放, // 否则如果是调用者创建的,就不用管释放, // 注意,调用者可在栈上创建的。 mysql->free_me = 1; } else { memset(mysql, 0, sizeof(*(mysql))); mysql->charset = default_client_charset_info; 。。。。。。 } 。。。。。。 mysql->reconnect = 0; 。。。。。。 return mysql; } |
24. Packet Too Large
MySQL-8.0对“Packet Too Large”的官方说明:
MySQL 8.0的服务端和客户端可收和发的最大包大小为1GB。 包是指发送给服务端的单条SQL,或发送给客户端的单行数据,或master发给slave的binlog。 A communication packet is a single SQL statement sent to the MySQL server, a single row that is sent to the client, or a binary log event sent from a master replication server to a slave. The largest possible packet that can be transmitted to or from a MySQL 8.0 server or client is 1GB. When a MySQL client or the mysqld server receives a packet bigger than max_allowed_packet bytes, it issues an ER_NET_PACKET_TOO_LARGE error and closes the connection. With some clients, you may also get a Lost connection to MySQL server during query error if the communication packet is too large. Both the client and the server have their own max_allowed_packet variable, so if you want to handle big packets, you must increase this variable both in the client and in the server.
If you are using the mysql client program, its default max_allowed_packet variable is 16MB. To set a larger value, start mysql like this: shell> mysql --max_allowed_packet=32M
The server's default max_allowed_packet value is 64MB. You can increase this if the server needs to handle big queries (for example, if you are working with big BLOB columns). For example, to set the variable to 128MB, start the server like this: shell> mysqld --max_allowed_packet=128M
You can also use an option file to set max_allowed_packet. For example, to set the size for the server to 128MB, add the following lines in an option file: [mysqld] max_allowed_packet=128M |
25. 分析结论
1) 包的最大限制为1GB;
2) 如果包大小超过16M,则会被分解为多个子包,每个子包大小小于16M;
3) 分包是函数net_write_command的行为;
4) 即使被分解成了多个包,也并不立即发送;
5) 发送缓冲区满了才会立即发送;
6) 在函数net_read_packet中,调用net_realloc会判断包大小,如果超过大小,则错误码(last_errno)设置为ER_NET_PACKET_TOO_LARGE;
7) mysql_real_query并没有对要发送的大小进行判断,超过1G大小也是可以发的,但服务端会报错,这个由net_realloc决定,而调用者是;
8) 只有收包的时候才知道包大小是否超过了。