您当前的位置:首页 > IT编程 > python
| C语言 | Java | VB | VC | python | Android | TensorFlow | C++ | oracle | 学术与代码 | cnn卷积神经网络 | gnn | 图像修复 | Keras | 数据集 | Neo4j | 自然语言处理 | 深度学习 | 医学CAD | 医学影像 | 超参数 | pointnet | pytorch | 异常检测 |

自学教程:Python大批量写入数据(百万级别)的方法

51自学网 2023-07-22 10:35:55
  python
这篇教程Python大批量写入数据(百万级别)的方法写得很实用,希望能帮到您。

背景

现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。

方案

方案一:多线程+协程+异步MySql方案二:多线程+MySql批量插入

代码

    1,先通过pandas读取所有csv数据存入列表。
    2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程)
    3,方案二 线程内以  executemany 方法批量插入所有数据。
    4,方案一 线程内使用异步事件循环遍历所有数据异步插入。 
    5,方案一纯属没事找事型。

方案二

import threadingimport pandas as pdimport asyncioimport timeimport aiomysqlimport pymysqldata=[]error_data=[]def run(start,end):    global data    global error_data    print("start"+threading.current_thread().name)    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))    mysdb = getDb("*", *, "*", "*", "*")    cursor = mysdb.cursor()    sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""    cursor.executemany(sql,data[start:end])    mysdb.commit()    mysdb.close()    print("end" + threading.current_thread().name)    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))def csv_file_read_use_pd(csvFile):    csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='/t')    csv_result = csv_result.fillna(value="None")    result = csv_result.values.tolist()    return resultclass MyDataBase:    def __init__(self,host=None,port=None,username=None,password=None,database=None):        self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database)    def close(self):        self.db.close()def getDb(host,port,username,password,database):    MyDb = MyDataBase(host, port, username, password,database)    return MyDb.dbdef main(csvFile):    global data  #获取全局对象  csv全量数据    #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行    thread_lens=20    csv_result=csv_file_read_use_pd(csvFile)    day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))    for item in csv_result:        item.insert(0,day)    data=csv_result    thread_exe_count_list=[]   #线程需要执行的区间    csv_lens=len(csv_result)    avg = csv_lens // thread_lens    remainder=csv_lens % thread_lens    # 0,27517  27517,55,034    nowIndex=0    for i in range(thread_lens):        temp=[nowIndex,nowIndex+avg]        nowIndex=nowIndex+avg        thread_exe_count_list.append(temp)    thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程    # print(thread_exe_count_list)    #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])    for i in range(thread_lens):        sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))        sub_thread.start()        sub_thread.join()        time.sleep(3)if __name__=="__main__":    #csv_file_read_use_pd("分公司箱型箱量.csv")    main("分公司箱型箱量.csv")

方案一

import threadingimport pandas as pdimport asyncioimport timeimport aiomysqldata=[]error_data=[]async def async_basic(loop,start,end):    global data    global error_data    print("start"+threading.current_thread().name)    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))    conn = await aiomysql.connect(        host="*",        port=*,        user="*",        password="*",        db="*",        loop=loop    )    day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))    sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""    async with conn.cursor() as cursor:        for item in data[start:end]:            params=[day]            params.extend(item)            try:                x=await cursor.execute(sql,params)                if x==0:                    error_data.append(item)                print(threading.current_thread().name+"   result "+str(x))            except Exception as e:                print(e)                error_data.append(item)                time.sleep(10)                pass    await conn.close()    #await conn.commit()    #关闭连接池    # pool.close()    # await pool.wait_closed()    print("end" + threading.current_thread().name)    print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))def csv_file_read_use_pd(csvFile):    csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='/t')    csv_result = csv_result.fillna(value="None")    result = csv_result.values.tolist()    return resultdef th(start,end):    loop = asyncio.new_event_loop()    loop.run_until_complete(async_basic(loop,start,end))def main(csvFile):    global data  #获取全局对象  csv全量数据    #读取所有的数据   将所有数据均分成   thread_lens   份 分发给  thread_lens  个线程去执行    thread_lens=20    csv_result=csv_file_read_use_pd(csvFile)    data=csv_result    thread_exe_count_list=[]   #线程需要执行的区间    csv_lens=len(csv_result)    avg = csv_lens // thread_lens    remainder=csv_lens % thread_lens    # 0,27517  27517,55,034    nowIndex=0    for i in range(thread_lens):        temp=[nowIndex,nowIndex+avg]        nowIndex=nowIndex+avg        thread_exe_count_list.append(temp)    thread_exe_count_list[-1:][0][1]+=remainder  #余数分给最后一个线程    print(thread_exe_count_list)    #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1])    for i in range(thread_lens):        sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],))        sub_thread.start()        time.sleep(3)if __name__=="__main__":    #csv_file_read_use_pd("分公司箱型箱量.csv")    main("分公司箱型箱量.csv")

总结

到此这篇关于Python大批量写入数据的文章就介绍到这了,更多相关Python大批量写入数据内容请搜索51zixue.net以前的文章或继续浏览下面的相关文章希望大家以后多多支持51zixue.net!


Python使用Matplotlib库创建3D 图形和交互式图形详解
Python爬虫原理与基本请求库urllib详解
51自学网自学EXCEL、自学PS、自学CAD、自学C语言、自学css3实例,是一个通过网络自主学习工作技能的自学平台,网友喜欢的软件自学网站。
京ICP备13026421号-1