造个Python轮子,实现根据Excel生成Model和数据导入脚本:世界观天下
最近遇到一个需求,有几十个Excel,每个的字段都不一样,然后都差不多是第一行是表头,后面几千上万的数据,需要把这些Excel中的数据全都加入某个已经上线的Django项目
这就需要每个Excel建个表,然后一个个导入了
这样的效率太低,不能忍
(相关资料图)
所以我造了个自动生成 Model 和导入脚本的轮子
2思路首先拿出 pandas,它的 DataFrame 用来处理数据很方便
pandas 加载 Excel 之后,提取表头,我们要通过表头来生成数据表的字段。有些 Excel 的表头是中文的,需要先做个转换。
一开始我是想用翻译API,全都翻译成英文,不过发现免费的很慢有限额,微软、DeepL都要申请,很麻烦。索性用个拼音转换库,全都转换成拼音得了~
然后字段的长度也要确定,或者全部用不限制长度的 TextField
权衡一下,我还是做一下字段长度判定的逻辑,遍历整个表,找出各个字段最长的数据,然后再加一个偏移量,作为最大长度。
接着生成 Model 类,这里我用 jinja2 模板语言,先把大概的模板写好,然后根据提取出来的字段名啥的生成。
最后生成 admin 配置和导入脚本,同理,也是用 jinja2 模板。
3实现简单介绍下思路,现在开始上代码。
就几行而已,Python很省代码~
模型首先定义俩模型
字段模型classField(object):def__init__(self,name:str,verbose_name:str,max_length:int=128):self.name=nameself.verbose_name=verbose_nameself.max_length=max_lengthdef__str__(self):returnf"Model模型{self.name}:{self.verbose_name}"def__repr__(self):returnself.__str__()
为了符合Python关于变量的命名规范,snake_name属性是用正则表达式实现驼峰命名转蛇形命名
classModel(object):def__init__(self,name:str,verbose_name:str,id_field:Field,fields:List[Field]):self.name=nameself.verbose_name=verbose_nameself.id_field=id_fieldself.fields:List[Field]=fields@propertydefsnake_name(self):importrepattern=re.compile(r"(?{self.name}:{self.verbose_name}"def__repr__(self):returnself.__str__()代码模板
使用 jinja2 实现。
本身 jinja2 是 Flask、Django 之类的框架用来渲染网页的。
不过单独使用的效果也不错,我的 DjangoStarter 框架也是用这个 jinja2 来自动生成 CRUD 代码~
Model模板# -*- coding:utf-8 -*-from django.db import modelsclass {{ model.name }}(models.Model): """{{ model.verbose_name }}""" {% for field in model.fields -%} {{ field.name }} = models.CharField("{{ field.verbose_name }}", default="", null=True, blank=True, max_length={{ field.max_length }}) {% endfor %} class Meta: db_table = "{{ model.snake_name }}" verbose_name = "{{ model.verbose_name }}" verbose_name_plural = verbose_nameAdmin配置模板
@admin.register({{ model.name }})class {{ model.name }}Admin(admin.ModelAdmin): list_display = [{% for field in model.fields %}"{{ field.name }}", {% endfor %}] list_display_links = None def has_add_permission(self, request): return False def has_delete_permission(self, request, obj=None): return False def has_view_permission(self, request, obj=None): return False数据导入脚本
这里做了几件事:
使用 pandas 处理空值,填充空字符串已有数据进行批量更新新数据批量插入更新逻辑麻烦一点,因为数据库一般都有每次最大更新数量的限制,所以我做了分批处理,通过 update_data_once_max_lines控制每次最多同时更新多少条数据。
def import_{{ model.snake_name }}(): file_path = path_proc(r"{{ excel_filepath }}") logger.info(f"读取文件: {file_path}") xlsx = pd.ExcelFile(file_path) df = pd.read_excel(xlsx, 0, header={{ excel_header }}) df.fillna("", inplace=True) logger.info("开始处理数据") id_field_list = {{ model.name }}.objects.values_list("{{ model.id_field.name }}", flat=True) item_list = list({{ model.name }}.objects.all()) def get_item(id_value): for i in item_list: if i.shen_qing_ren_zheng_jian_hao_ma == id_value: return i return None insert_data = [] update_data_once_max_lines = 100 update_data_sub_set_index = 0 update_data = [[]] update_fields = set() for index, row in df.iterrows(): if "{{ model.id_field.verbose_name }}" not in row: logger.error("id_field {} is not existed".format("{{ model.id_field.verbose_name }}")) continue if row["{{ model.id_field.verbose_name }}"] in id_field_list: item = get_item(row["{{ model.id_field.verbose_name }}"]) {% for field in model.fields -%} if "{{ field.verbose_name }}" in row: if item.{{ field.name }} != row["{{ field.verbose_name }}"]: item.{{ field.name }} = row["{{ field.verbose_name }}"] update_fields.add("{{ field.name }}") {% endfor %} if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines: update_data_sub_set_index += 1 update_data.append([]) update_data[update_data_sub_set_index].append(item) else: # {% for field in model.fields -%}{{ field.verbose_name }},{%- endfor %} model_obj = {{ model.name }}() {% for field in model.fields -%} if "{{ field.verbose_name }}" in row: model_obj.{{ field.name }} = row["{{ field.verbose_name }}"] {% endfor %} insert_data.append(model_obj) logger.info("开始批量导入") {{ model.name }}.objects.bulk_create(insert_data) logger.info("导入完成") if len(update_data[update_data_sub_set_index]) > 0: logger.info("开始批量更新") for index, update_sub in enumerate(update_data): logger.info(f"正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 条数据") {{ model.name }}.objects.bulk_update(update_sub, list(update_fields)) logger.info("更新完成")主体代码
剩下的全是核心代码了
引用依赖先把用到的库导入
importosimportrefromtypingimportList,Optionalfrompypinyinimportpinyin,lazy_pinyin,Stylefromjinja2importEnvironment,PackageLoader,FileSystemLoader
或者后面直接去我的完整代码里面拿也行~
类老规矩,我封装了一个类。
构造方法需要指定 Excel 文件地址,还有表头的行索引。
classExcelToModel(object):def__init__(self,filepath,header_index=0):self.filepath=filepathself.header_index=header_indexself.columns=[]self.fields:List[Field]=[]self.base_dir=os.path.dirname(os.path.abspath(__file__))self.template_path=os.path.join(self.base_dir,"templates")self.jinja2_env=Environment(loader=FileSystemLoader(self.template_path))self.load_file()
这里面有个 self.load_file()后面再贴。
字段名中文转拼音用了 pypinyin这个库,感觉还不错。
转换后用正则表达式,去除符号,只保留英文和数字。
代码如下,也是放在 ExcelToModel类里边。
@staticmethoddefto_pinyin(text:str)->str:pattern=r"~`!#$%^&*()_+-=|\";"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\["text=re.sub(r"[%s]+"%pattern,"",text)return"_".join(lazy_pinyin(text,style=Style.NORMAL))加载文件
拿出万能的 pandas,按照前面说的思路,提取表头转换成字段,并且遍历数据确定每个字段的最大长度,我这里偏移值是32,即在当前数据最大长度基础上加上32个字符。
defload_file(self):importpandasaspdxlsx=pd.ExcelFile(self.filepath)df=pd.read_excel(xlsx,0,header=self.header_index)df.fillna("",inplace=True)self.columns=list(df.columns)forcolinself.columns:field=Field(self.to_pinyin(col),col)self.fields.append(field)forindex,rowindf.iterrows():item_len=len(str(row[col]))ifitem_len>field.max_length:field.max_length=item_len+32print(field.verbose_name,field.name,field.max_length)
如果觉得这样生成表太慢,可以把确定最大长度的这块代码去掉,就下面这块代码
forindex,rowindf.iterrows():item_len=len(str(row[col]))ifitem_len>field.max_length:field.max_length=item_len+32
手动指定最大长度或者换成不限制长度的 TextField就行。
生成文件先构造个 context 然后直接用 jinja2 的 render功能生成代码。
为了在导入时判断数据存不存在,生成代码时要指定 id_field_verbose_name,即Excel文件中类似“证件号码”、“编号”之类的列名,注意是Excel中的表头列名。
deffind_field_by_verbose_name(self,verbose_name)->Optional[Field]:forfieldinself.fields:iffield.verbose_name==verbose_name:returnfieldreturnNonedefgenerate_file(self,model_name:str,verbose_name:str,id_field_verbose_name:str,output_filepath:str):template=self.jinja2_env.get_template("output.jinja2")context={"model":Model(model_name,verbose_name,self.find_field_by_verbose_name(id_field_verbose_name),self.fields),"excel_filepath":self.filepath,"excel_header":self.header_index,}withopen(output_filepath,"w+",encoding="utf-8")asf:render_result=template.render(context)f.write(render_result)使用
看代码。
tool=ExcelToModel("file.xlsx")tool.generate_file("CitizenFertility","房价与居民生育率","证件号码","output/citizen_fertility.py")
生成出来的代码都在一个文件里,请根据实际情况放到项目的各个位置。
4完整代码发布到Github了
地址: https://github.com/Deali-Axy/excel_to_model
5小结目前看来完美契合需求,极大节省工作量~
实际跑起来,不得不吐槽 Python 羸弱的性能,占内存还大… 凑合着用吧。也许后面有时间会优化一下~
相关阅读
-
世界热推荐:今晚7:00直播丨下一个突破...
今晚19:00,Cocos视频号直播马上点击【预约】啦↓↓↓在运营了三年... -
NFT周刊|Magic Eden宣布支持Polygon网...
Block-986在NFT这样的市场,每周都会有相当多项目起起伏伏。在过去... -
环球今亮点!头条观察 | DeFi的兴衰与...
在比特币得到机构关注之后,许多财务专家预测世界将因为加密货币的... -
重新审视合作,体育Crypto的可靠关系才能双赢
Block-987即使在体育Crypto领域,人们的目光仍然集中在FTX上。随着... -
简讯:前端单元测试,更进一步
前端测试@2022如果从2014年Jest的第一个版本发布开始计算,前端开发... -
焦点热讯:刘强东这波操作秀
近日,刘强东发布京东全员信,信中提到:自2023年1月1日起,逐步为...