# C++ 调用 MYSQL API 连接池

# 高并发下频繁处理瓶颈

  • 建立通信: TCP三次握手
  • 数据库服务器的 连接认证
  • 服务器 关闭连接 的资源回收
  • 断开 通信的 TCP 四次挥手

如果客户端和服务端 频繁 进行类似操纵,影响整个 开发效率

# 数据库连接池

为了 提高 数据库 (关系型数据库) 的访问 瓶颈 ,除在服务器端添加缓存服务器缓存常用的数据,还可添加连接池来提高服务器访问效率

# 创建连接池思路

连接池URML

连接池主要用于 网络服务器端 ,用于同时接受 多个用户端 请求,数据库与数据库客户端采用 TCP通信 .

  • 数据库客户端和服务端先建立起 多个连接
  • 多线程通过 套接字通信 取出连接池中的一个连接,然后和服务器直接进行通信,通信之后再将此连接 还给连接池 (减少数据库连接和断开的次数)
  • 数据库连接池对应 C++ 中的一个数据库连接对象,即 单例模式
  • 连接池中包括数据库服务器连接对应的 IP,端口,用户,密码等信息
  • 对数据库对象存入 STL 当中,需要设置最大值,最小值限制队列
  • 多线程从连接池中取出数据库对象若有取出, 没有等待 调用算法
  • 对 连接池中的数据库连接 (空间时间长的即调度算法) 进行 适当 断开连接
  • 共享资源的访问,需要 互斥锁 (生产者消费者问题)

# 单例模式

  • 懒汉模式

当使用这个类的时候才创建它
创建对象时,加锁保证有且仅有一个
(有线程安全问题)

  • 饿汉模式

不管用不用它,只要类被创建,这个实例就有
没有线程安全问题

# 连接池算法实现 C++

换将配置参考 jsoncppMySQL API 文章
Jsoncpp 配置
MySQl 配置

连接池

# MysqlConnect.h

#include "MySQLConnect.h"
// 释放结果集空间
void MySqlConnect::freeResult()
{
	if (m_result)
	{
		// 释放数据库连接
		mysql_free_result(m_result);
		// 将数据库指针置空
		m_result = nullptr;
	}
}
// 连接 MySQL 数据库
MySqlConnect::MySqlConnect()
{
	// 初始化 MySQL
	m_conn = mysql_init(nullptr);
	// 设置 MySQL 的格式字符 utf8
	mysql_set_character_set(m_conn, "utf8");
}
//MySQL 的析构函数
MySqlConnect::~MySqlConnect()
{
	// 如果连接不为空
	if (m_conn != nullptr)
	{
		// 关闭 MySQL 连接
		mysql_close(m_conn);
	}
	freeResult();
}
bool MySqlConnect::connect(string user, string passwd, string dbName, string ip, unsigned short port)
{
	//ip 传入为 string,使用.str 将 ip 转为 char * 类型
	MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
	// 连接成功返回 true
	// 如果连接成功返回 TRUE,失败返回 FALSE
	return ptr != nullptr;
}
bool MySqlConnect::update(string sql)
{
	//query 执行成功返回 0
	try
	{
		if (mysql_query(m_conn, sql.c_str()))
		{
			throw invalid_argument("执行语句插入/更新/删除失败!请检查数据库");
			return false;
		};
	}
	catch (exception& e)
	{
		cout << e.what() << endl;
	}
	
	return true;
}
bool
MySqlConnect::query(string sql)
{
	freeResult();
	//query 执行成功返回 0
	try
	{
		if (mysql_query(m_conn, sql.c_str()))
		{
			throw invalid_argument("查询数据库失败!");
			return false;
		};
		m_result = mysql_store_result(m_conn);
	}
	catch (exception& e)
	{
		cout << e.what() << endl;
	}
	
	return true;
}
bool MySqlConnect::next()
{
	// 如果结果集为空则没有必要遍历
	if (m_result != nullptr)
	{
		// 保存着当前字段的所有列的数值
		m_row = mysql_fetch_row(m_result);
		// 如果字段不为空
		if (m_row != nullptr)
		{
			return true;
		}
		
	}
	return false;
}
string MySqlConnect::value(int index)
{
	// 表示列的数量
	int row_num = mysql_num_fields(m_result); // 函数得到结果集中的列数
	// 如果查询的的 index 列大于总列,或小于 0,是错误的
	if (index >= row_num || index < 0)
	{
		return string();
	}
	char* val = m_row[index]; // 若为二进制数据,中间是有 "\0" 的
	unsigned long length = mysql_fetch_lengths(m_result)[index];
	return string(val, length); // 传入 length 就不会以 "\0" 为结束符,而是通过长度把对应的字符转换为 string 类型
}
bool MySqlConnect::transaction()
{
	return mysql_autocommit(m_conn, false); // 函数返回值本身就是 bool 类型
}
bool MySqlConnect::commit()
{
	return mysql_commit(m_conn);// 提交
}
bool MySqlConnect::rollback()
{
	return mysql_rollback(m_conn);//bool 类型,函数成功返回 TRUE,失败返回 FALSE
}
// 刷新起始空闲时间
void MySqlConnect::refreshAliveTime()
{
	m_alivetime = steady_clock::now();
}
long long MySqlConnect::getAliveTime()
{
	nanoseconds res = steady_clock::now() - m_alivetime;
	// 将纳秒转换为毫秒
	milliseconds millsec = duration_cast<milliseconds>(res);
	return millsec.count();
}

# MysqlConnect.cpp

#include "MySQLConnect.h"
// 释放结果集空间
void MySqlConnect::freeResult()
{
	if (m_result)
	{
		// 释放数据库连接
		mysql_free_result(m_result);
		// 将数据库指针置空
		m_result = nullptr;
	}
}
// 连接 MySQL 数据库
MySqlConnect::MySqlConnect()
{
	// 初始化 MySQL
	m_conn = mysql_init(nullptr);
	// 设置 MySQL 的格式字符 utf8
	mysql_set_character_set(m_conn, "utf8");
}
//MySQL 的析构函数
MySqlConnect::~MySqlConnect()
{
	// 如果连接不为空
	if (m_conn != nullptr)
	{
		// 关闭 MySQL 连接
		mysql_close(m_conn);
	}
	freeResult();
}
bool MySqlConnect::connect(string user, string passwd, string dbName, string ip, unsigned short port)
{
	//ip 传入为 string,使用.str 将 ip 转为 char * 类型
	MYSQL* ptr = mysql_real_connect(m_conn, ip.c_str(), user.c_str(), passwd.c_str(), dbName.c_str(), port, nullptr, 0);
	// 连接成功返回 true
	// 如果连接成功返回 TRUE,失败返回 FALSE
	return ptr != nullptr;
}
bool MySqlConnect::update(string sql)
{
	//query 执行成功返回 0
	try
	{
		if (mysql_query(m_conn, sql.c_str()))
		{
			throw invalid_argument("执行语句插入/更新/删除失败!请检查数据库");
			return false;
		};
	}
	catch (exception& e)
	{
		cout << e.what() << endl;
	}
	
	return true;
}
bool
MySqlConnect::query(string sql)
{
	freeResult();
	//query 执行成功返回 0
	try
	{
		if (mysql_query(m_conn, sql.c_str()))
		{
			throw invalid_argument("查询数据库失败!");
			return false;
		};
		m_result = mysql_store_result(m_conn);
	}
	catch (exception& e)
	{
		cout << e.what() << endl;
	}
	
	return true;
}
bool MySqlConnect::next()
{
	// 如果结果集为空则没有必要遍历
	if (m_result != nullptr)
	{
		// 保存着当前字段的所有列的数值
		m_row = mysql_fetch_row(m_result);
		// 如果字段不为空
		if (m_row != nullptr)
		{
			return true;
		}
		
	}
	return false;
}
string MySqlConnect::value(int index)
{
	// 表示列的数量
	int row_num = mysql_num_fields(m_result); // 函数得到结果集中的列数
	// 如果查询的的 index 列大于总列,或小于 0,是错误的
	if (index >= row_num || index < 0)
	{
		return string();
	}
	char* val = m_row[index]; // 若为二进制数据,中间是有 "\0" 的
	unsigned long length = mysql_fetch_lengths(m_result)[index];
	return string(val, length); // 传入 length 就不会以 "\0" 为结束符,而是通过长度把对应的字符转换为 string 类型
}
bool MySqlConnect::transaction()
{
	return mysql_autocommit(m_conn, false); // 函数返回值本身就是 bool 类型
}
bool MySqlConnect::commit()
{
	return mysql_commit(m_conn);// 提交
}
bool MySqlConnect::rollback()
{
	return mysql_rollback(m_conn);//bool 类型,函数成功返回 TRUE,失败返回 FALSE
}
// 刷新起始空闲时间
void MySqlConnect::refreshAliveTime()
{
	m_alivetime = steady_clock::now();
}
long long MySqlConnect::getAliveTime()
{
	nanoseconds res = steady_clock::now() - m_alivetime;
	// 将纳秒转换为毫秒
	milliseconds millsec = duration_cast<milliseconds>(res);
	return millsec.count();
}

# ConnectionPool.h

#pragma once
// 连接池头文件
#ifndef CONNECTIONPOOL_H
#define CONNECTIONPOOL_H
#include<queue>
#include "MySQLConnect.h"
#include <mutex> //C++ 独占的互斥锁
#include <condition_variable> // 引用 C++ 条件变量
using namespace std;
// 连接池
class ConnectionPool
{
public:
	// 静态实例,通过静态方法获得唯一的单例对象
	static ConnectionPool* getConnectPool();
	// 删除掉构造函数
	ConnectionPool(const ConnectionPool& obj) = delete;
	// 移动赋值函数重载,删除掉,防止对象的复制
	ConnectionPool& operator =(const ConnectionPool& obj)= delete;
	// 获取连接时返回一个可用的连接,返回共享的智能指针
	shared_ptr<MySqlConnect> getConnection();
	// 析构函数
	~ConnectionPool();
private:
	ConnectionPool();
	// 解析 JSON 文件的函数
	bool paraseJsonFile();
	// 用来生产数据库连接
	void produceConnection();
	
	// 用来销毁数据库连接 回收数据库连接
	void recycleConnection();
	// 添加连接
	void addConnection();
	// 数据库相关信息
	// 通过加载配置文件 Json,访问用户指定的数据库
	// 数据库 ip
	string m_ip; 
	// 数据库用户
	string m_user;
	// 数据库密码
	string m_passwd;
	// 数据库名称
	string m_dbName;
	// 数据库访问端口
	unsigned short m_port;
	// 设置连接上限
	int m_minSize;
	// 设置连接的上限
	int m_maxSize;
	// 设置线程等待最大时长,单位毫秒
	// 超时时长
	int m_timeout; 
	// 最大空闲时长单位毫秒
	int m_maxIdleTime;
	// 存储若干数据库连接队列
	queue<MySqlConnect*> m_connectionQ;
	// 设置互斥锁
	mutex m_mutexQ;
	// 设置条件变量
	condition_variable m_cond;
};
#endif // !CONNECTIONPOOL_H

# ConnectionPool.cpp

#include "ConnectionPool.h"
#include <json/json.h>
#include <fstream>
#include <mysql.h>
#include <thread> // 加载多线程
using namespace Json;
// 静态函数
ConnectionPool* ConnectionPool::getConnectPool()
{
    static ConnectionPool pool; // 静态局部对象,不管后面调用多少次,得到的都是同一块内存地址
    return &pool;
}
// 打开数据库信息文件,并判断是否读取到相关信息
bool ConnectionPool::paraseJsonFile()
{
    try
    {
        ifstream ifs("dbconf.json");
        Reader rd;
        Value root;
        rd.parse(ifs, root);
        if (root.isObject())
        {
            m_ip = root["ip"].asString();
            m_port = root["port"].asInt();
            m_user = root["userName"].asString();
            m_passwd = root["password"].asString();
            m_dbName = root["dbName"].asString();
            m_minSize = root["minSize"].asInt();
            m_maxSize = root["maxSize"].asInt();
            m_maxIdleTime = root["maxIdlTime"].asInt();
            m_timeout = root["timeout"].asInt();
            return true;
        }
        throw("读取连接数据库json失败!");
        return false;
    }
    catch (exception& e)
    {
        cout << e.what() << endl;
    }
    
    
}
// 子线程对应的任务函数,生成新的可用连接
void ConnectionPool::produceConnection()
{
    //
    while (true) 
    {
        // 判断当前连接池是否够用
        //uniuqe 模版类,mutex 互斥锁类型 locker 对象管理
        unique_lock<mutex> locker(m_mutexQ);
        while (m_connectionQ.size() >= m_minSize)
        {
            // 阻塞条件变量
            m_cond.wait(locker);
        }
        // 生产一个数据库连接
        addConnection();
        // 调用对应的唤醒函数,唤醒的所有消费者
        m_cond.notify_all();
    }
}
// 当空闲的链接数量过多
void ConnectionPool::recycleConnection()
{
    while (true)
    {
        // 休息一段时间,每隔一秒种,进行一次检测
        this_thread::sleep_for(chrono::milliseconds(500));
        // 进行加锁
        lock_guard<mutex>locker(m_mutexQ);
        // 当大于最小连接数
        while (m_connectionQ.size() > m_minSize)
        {
            // 先进后出
            MySqlConnect* conn = m_connectionQ.front(); // 取出队头元素
            // 判断队头元素存活时长是不是大于指定的最长存活时长
            if (conn->getAliveTime() >= m_maxIdleTime)
            {
                m_connectionQ.pop(); // 将队头的链接销毁
                delete conn;
            }
            else
            {
                break;
            }
        }
    }
}
void ConnectionPool::addConnection()
{
    MySqlConnect* conn = new MySqlConnect;
    conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
    // 数据库连接之后就开始记录时间错
    conn->refreshAliveTime();
    m_connectionQ.push(conn);
}
shared_ptr<MySqlConnect> ConnectionPool::getConnection()
{
    // 封装互斥锁,保证线程安全
    unique_lock<mutex> locker(m_mutexQ);
    // 检查是否有可用的连接,如果没有阻塞一会
    while (m_connectionQ.empty())
    {
        if (cv_status::timeout == m_cond.wait_for(locker, chrono::milliseconds(m_timeout)))
 
        {
            if (m_connectionQ.empty())
            {
                continue;
            }
        }
    }
    shared_ptr<MySqlConnect> connptr(m_connectionQ.front(), [this](MySqlConnect* conn) 
        {
            // 加锁
            lock_guard<mutex> locker(m_mutexQ);
           // 刷新起始空闲时间
            conn->refreshAliveTime();   
            m_connectionQ.push(conn);
        });
    m_connectionQ.pop();
    m_cond.notify_all();// 唤醒生产者
    return connptr;
}
// 线程池析构函数
ConnectionPool::~ConnectionPool()
{
    while (m_connectionQ.empty())
    {
        MySqlConnect* conn = m_connectionQ.front();
        m_connectionQ.pop();
        delete conn;
    }
}
// 构造函数的实现
ConnectionPool::ConnectionPool()
{
    // 加载配置文件
    if (!paraseJsonFile())
    {
        cout << "数据库连接失败" << endl;
        return;
    }
    // 初始化配置连接数
    for (int i = 0; i < m_minSize; ++i)  // 连接数
    { 
        // 如果队列总数小于最大数量
        if (m_connectionQ.size() < m_maxSize)
        {
            // 实例化对象
            MySqlConnect* conn = new MySqlConnect;
            // 链接数据库
            conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
            m_connectionQ.push(conn);
        }
        // 当连接总数大于允许连接的最大数量
        else
        {
            cout << "当前连接数量以超过允许连接的总数" << endl;
            break;
        }
    }
    // 当前实例对象 this 指针,单例模式,
    thread producer(&ConnectionPool::produceConnection,this); // 生成线程池的连接
    thread recycler(&ConnectionPool::recycleConnection,this); // 有没有需要销毁的连接
    /*
    将 producer 线程与当前线程分离,使得它们可以独立执行,
    */
    // 主线程和子线程分离
    producer.detach();
    recycler.detach();
}

# main.cpp

#define _CRT_SECURE_NO_WARNINGS
#include <json/json.h>
#include <fstream>
#include <iostream>
#include <stdio.h>
#include <mysql.h>
#include <memory>
#include "MySQLConnect.h" //MySQL
#include "ConnectionPool.h" // 连接池
using namespace std;
// 单线程:使用 / 不使用连接池
// 多线程:使用 / 不使用连接池
// 循环插入操作,从 begin 到 end
void op1(int begin, int end)
{
	for (int i = begin; i < end; ++i)
	{
		// 建立 MySQL 连接类
		MySqlConnect conn;
		// 调用 connect 方法连接
		if (conn.connect("root", "5211314", "cpp", "localhost"))
		{
			// 建立 sql 数组
			char sql[1024] = { 0 };
			//sprintf 用于将格式化的字符串写入到一个字符数组中
			// 第一个参数 str 用于存储格式化后的字符串
			// 第二个 format 是一个格式化字符串,用于指定输出的格式,
			// 第三个为参数
			// 执行插入语句
			sprintf(sql, "insert into person values(%d,25,'man','tom')", i);
			// 执行插入操作
			
			conn.update(sql);
		}
		else
		{
			cout << "请检查数据库连接,数据库连接失败" << endl;
		}
		
	}
}
// 使用数据库连接池
void op2(ConnectionPool* pool, int begin, int end)
{
	for (int i = begin; i < end; ++i)
	{
		// 返回共享的智能指针
		shared_ptr<MySqlConnect> conn = pool->getConnection();
		
		char sql[1024] = { 0 };
		// 执行插入语句
		sprintf(sql, "insert into person values(%d,25,'man','tom')", i);
		// 执行
		conn->update(sql);
	}
}
// 单线程测试函数
void test1()
{
#if 0
	// 非连接池,单线程,用时:59071963900 纳秒,59071 毫秒 ,59 秒
	cout << "非连接池,单线程测试:" << endl;
	// 绝对时钟
	//now:表示当前时间的时间点
	steady_clock::time_point begin = steady_clock::now();
	op1(0, 5000); // 单线程每次插入数据都要建立数据库连接,而且析构数据库连接 TCP 连接浪费较多时间
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "非连接池,单线程,用时:" << length.count() << "纳秒,"
		<< length.count() / 1000000 << "毫秒 ," 
		<< length.count() / 1000000000 << "秒" << endl;
#else
	// 连接池,单  线程,用时:10798039100 纳秒,10798 毫秒 ,10 秒
	cout << "连接池,单线程测试:" << endl;
	ConnectionPool* pool = ConnectionPool::getConnectPool();
	// 绝对时钟
	steady_clock::time_point begin = steady_clock::now();
	op2(pool,0,5000);
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "连接池,单  线程,用时:" << length.count() << "纳秒,"
		<< length.count() / 1000000 << "毫秒 ,"
		<< length.count() / 1000000000 << "秒" << endl;
#endif
}
//
void test2()
{
#if 0
	// 多线程非连接池,用时:23374580700 纳秒,23374 毫秒 ,23 秒
	cout << "多线程非连接池:" << endl;
	// 额外的数据库连接
	MySqlConnect conn;
	conn.connect("root", "5211314", "cpp", "localhost");
	steady_clock::time_point begin = steady_clock::now();
	// 创建线程
	thread t1(op1, 0, 1000);
	thread t2(op1, 1000, 2000);
	thread t3(op1, 2000, 3000);
	thread t4(op1, 3000, 4000);
	thread t5(op1, 4000, 5000);
	// 使用 join 方法,等待线程执行完毕
	t1.join();
	t2.join();
	t3.join();
	t4.join();
	t5.join();
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "多线程非连接池,用时:" << length.count() << "纳秒,"
		<< length.count() / 1000000 << "毫秒 ,"
		<< length.count() / 1000000000 << "秒" << endl;
#else
	// 连接池,单  线程,用时:5333870000 纳秒,5333 毫秒 ,5 秒
	cout << "多线程连接池:" << endl;
	ConnectionPool* pool = ConnectionPool::getConnectPool();
	steady_clock::time_point begin = steady_clock::now();
	thread t1(op2,pool, 0, 1000);
	thread t2(op2, pool, 1000, 2000);
	thread t3(op2, pool, 2000, 3000);
	thread t4(op2, pool, 3000, 4000);
	thread t5(op2, pool, 4000, 5000);
	t1.join();
	t2.join();
	t3.join();
	t4.join();
	t5.join();
	steady_clock::time_point end = steady_clock::now();
	auto length = end - begin;
	cout << "连接池,单  线程,用时:" << length.count() << "纳秒,"
		<< length.count() / 1000000 << "毫秒 ,"
		<< length.count() / 1000000000 << "秒" << endl;
#endif
}
int query()
{
	// 建立 MySQL 连接
	MySqlConnect conn;
	// 调用 connect 方法建立数据库
	conn.connect("root", "5211314", "cpp", "localhost");
	// 执行插入语句
	string sql="insert into person values(5,25,'man','tom')";
	// 执行插入,更新,删除操作
	
		
	bool flag = conn.update(sql);
	//boolalpha 以 true 和 false 形式输出
	cout << boolalpha<<  "flag value: " << flag << endl;
	
		
	
	sql = "select * from person";
	// 查询数据库
	
	conn.query(sql);
	// 返回结果集记录
	while (conn.next())
	{
		cout << conn.value(0) << ", "
			<< conn.value(1) << ", "
			<< conn.value(2) << ", "
			<< conn.value(3) << endl;
	}
	
	return 0;
}
int main(void)
{
	// 检测 MySQL 的环境以及 SQL 的环境
	printf("MySQL Environment Successful\n");
	// 直接操控 MySQL 进行管理
	//query();
	//test1();
	test2();
	return 0;
}

参考资料:

  • B 站爱编程的大丙
  • 《深入设计模式》- 亚历山大什韦茨