飞书多维表格如何跨表复制?
〇、DuckCP开源
今天开源了一个数据同步的小工具 DuckCP[1],支持以下类型的数据源之间同步数据:
- PostgreSQL 数据库:以及兼容的数据库。例如 Hologres。
- MaxCompute 数据库:即 ODPS。
- SQLite 数据库:文件型 OLTP 数据库。
- DuckDB 数据库:文件型 OLAP 数据库。
- 本地文件:基于 DuckDB 实现,支持 CSV、Parquet、JSON 三种格式的读写。
- 飞书多维表格:基于 DuckDB 实现。多维表格被映射成 DuckDB 的只读 View,可执行 SQL 查询。
DuckCP于2023年9月开始在公司内部使用,前后重写了6个版本。最近遇到一位朋友有类似的需求,但他是要从钉钉同步数据到数据库,所以我上个月开始筹备开源,方便自主添加需要的功能。
也是好久没写文章了,就想着给大家讲讲故事,介绍一下DuckCP的前世今生。
第一版:DuckCP的诞生
2023年的时候,我接触到了保险线上获客业务,业务流程包含以下关键步骤:
- 公司在小红书、抖音等内容平台获客,找到对保险产品感兴趣的线索。
- 保险顾问在收到预约后会主动联系,通过电话沟通等方式进一步明确诉求,线索转为商机;然后按需分配给保险经纪公司或指定的经纪人。
- 保险经纪人会按需制定保险方案,并反馈进展。
最初大家使用Excel管理线索的信息与跟进状态。业务验证能跑通后,大家想借助系统提高管理效率。从市面上买一套成熟的客户关系管理系统(CRM,Customer Relationship Management)成本较高,普遍需要额外的定制开发以实现与保险经纪公司的云服平台打通。
经过调研,我推荐用飞书的多维表格与任务来实现:
- 首先创建一份『线索池』文档,记录所有线索的个人信息、分配信息、建联信息、跟进信息等,并用仪表盘创建业务大屏。
- 然后为每位提供线索的流量方创建各自的『线索提交』文档,并生成表单页面,方便提交新的线索。
- 最后为每位经纪人或保险经纪公司也创建各自的『商机』文档,记录商机的跟进状态。
表格的使用流程是:
- 流量方通过『线索提交』的表单提交新的线索后,线索将自动同步到『线索池』文档。
- 『线索池』文档配置了自动化流程,当收到新的线索需要跟进时,就会给保险顾问创建飞书任务,并发送飞书消息通知。
- 保险顾问评估完成后,关闭无效线索,或分配线索给经纪人或经纪公司;相应的飞书任务自动完成;有效线索同步到经纪人或经纪公司的『商机』文档。
- 『商机文档』同样配置了自动化流程,当收到新的商机需要跟进时,就会给内部的经纪人创建飞书任务,并发送飞书消息通知;或调用经纪公司的云服平台接口,同步新的商机。
- 经纪人修改商机的状态,或收到经纪公司云服平台的变更通知后,跟进状态会同步回『线索池』。
- 『线索池』中状态发生变更后,同样也会把状态同步回『线索提交』的文档内。
飞书已经满足大部分需求,并且不用做任何前端开发,只需要实现多维表格之间以及多维表格与远程接口之间同步数据就行,所以我用Python快速开发了内部DuckCP的第一版,用简单粗暴的方式实现数据同步功能。
第二版:领域特定语言
随着对接的保险经纪公司越来越多,不同的云服平台的数据字段、加密方式等都不尽相同。第一版代码写得比较随意,没有仔细推敲和设计,所以代码迅速腐化,变得像意大利面条一样,每次新接入一家经纪公司,需要修改好多处代码。于是我开始重写第二版:
- 根据积累的业务知识,我定义了一套统一的“线索”领域实体,其他实体都是“线索”的子集。
- 把所有接口操作统一成
insert
、update
、delete
和select
四个操作“线索”实体的CRUD接口; - 在CRUD基础上定义一套类似SQL的领域特定语言(DSL,Domain Specific Language),实现用于跨数据源同步数据。
后续新增平台时,只需实现CRUD四个接口,就能统一用DSL实现数据同步了。因为Python的DSL能力较弱,第二版我采用Kotlin/Native实现。以下是DSL示例:
select(客户手机, 平台昵称, 客户性别, 客户省份, 客户偏好, 客户留咨, 笔记标题)
.from(线索池)
.where((线索去向 eq "大童") and (是否提交 eq "false"))
.into(大童保险)
.returning
.updateTo(线索池)
上述代码从『线索池』文档中筛选出分配给“大童保险经纪公司”的并且尚未提交的线索;然后调用大童DAC系统的接口推送线索;最后把接口返回的业务编号等信息更新回『线索池』。
第三版:支持ODPS
Kotlin/Native开发构建过程有点久,但构建出来的文件尺寸很小,开发效率、程序运行性能、稳定性也都很优秀。第二版我们用了很久,到了今年3月我们遇到一个新业务,需要从阿里云的ODPS(即MaxCompute)同步数据到多维表格。但我发现,这次接入ODPS不像之前接入RESTful API那么简单:
- 我们使用的是免费版多维表格,每张数据表最多只能保存2万行数据;之前业务热数据并不多,可以全量保存在多维表格中;并且之前业务逻辑也只需要搬运数据,不需要加工数据,因此DSL可以满足需求。
- 这次业务需要处理的数据远超2万行,需要先执行一些复杂的SQL,把数据加工成数量较少的统计结果,再同步到多维表格用于展示。DSL无法满足复杂SQL的场景。
为快速支持业务,我想了一个折中的方案:
- 使用阿里云提供的命令行工具(odpscmd[2])执行复杂的SQL查询;
- 再用
tunnel
指令把查询结果下载到本地的CSV文件中; - DuckCP读取本地CSV文件的内容,并同步到多维表格。
因此,第三版的DuckCP并没有真正支持ODPS,只是支持读取本地CSV文件。
第四版:改回Python
通过调用多个命令才能完成一件事情,这显然不符合我的品味;而且odpscmd这个命令行工具很难用,执行速度慢,日志也没法关闭。所以第三版暂时能满足业务需求后,我正好腾出空挡去改造DuckCP,让它真正支持ODPS数据源。
我在阿里云MaxCompute产品文档里翻了半天,在API目录[3]文档里至今都找不到查询数据的API,好像是只能通过SDK执行SQL查询,但官方只提供了Java、Python、Go三种编程语言的SDK。经过权衡,我最终选择用Python重写DuckCP,因为用Kotlin/JVM重写Kotlin/Native代码的工作量也很大,Go语言我不熟悉,从简便的角度只有Python这一个选项了。
这次改造放弃了之前DSL的思路,改成执行外部SQL文件,后续数据处理逻辑发生变化时就不需要修改Python代码,直接修改SQL语句就行。改造完成后只需一条命令就能完成数据同步,性能比第三版执行多条命令要好。
第五版:支持Hologres
第四版完成没多久又来了新的需求:支持阿里云的实时数仓Hologres;并且Hologres数据需要与ODPS数据汇总计算。我的思路是:
- 使用psycopg2连接Hologres:因为Hologres兼容PostgreSQL。
- 使用crontab创建多个数据同步任务:因为Hologres数据实时更新,ODPS数据每天只更新一次,不同的数据源更新频率不同。
- 使用DuckDB在本地汇总两个数据源的数据:在DuckDB内可以JOIN不同数据源的数据。
因此,DuckCP的逻辑变成了:
- 同步ODPS数据至DuckDB;
- 同步Hologres数据至DuckDB;
- 同步DuckDB数据至多维表格。
改造后还带来了一个额外的好处:DuckDB文件内拥有全量数据。可以下载一个数据库的副本到本地用于分析,体验可比访问ODPS好太多了!
第六版:DuckCP开源
如前文所述,DuckCP在公司内部用了一年多,从未考虑过对外开源。因为核心功能只是把数据从其他数据源同步到多维表格,场景特别窄;即使有其他人遇到相同需求,自己也能快速实现。所以我一直不觉得把DuckCP开源出去有什么价值,直到最近我遇到以下两个场景:
- 在另一家公司遇到几乎一模一样的场景:也需要把Hologres和ODPS的数据同步到多维表格,但他们不需要汇总两个数据源的数据,因此不需要DuckDB做中间层。
- 如前文所述,一位在制造业工作的朋友希望把数据从钉钉同步到SQL Server数据库,并且需要在Windows上运行。
第一个场景让我相信还是会遇到有相似需求的客户的,并且他们可能没有程序员能帮助他们解决这些简单的需求;同时也让我意识到我把功能写得太死板了——数据只能固定地从Hologres/ODPS到DuckDB再到多维表格——应该做成允许任意两个数据源之间同步数据,然后允许随意组合。
第二个场景更是让我醍醐灌顶。其实除了电商、金融等少数行业,大多数行业的信息化程度都非常低,往往有非常多的信息孤岛,也有很多特殊的数据源。之前在为制造业做MES时,就遇到过一家企业采购了不同厂家的WMS系统与AGV系统,结果数据无法互通,两个厂家都要求很高的定制开发费用;还遇到过一家企业需要通过工业以太网PROFINET从西门子的PLC读取数据,并同步给远程的HTTP服务……这样的例子数不胜数。我当时也都是一个萝卜一个坑地处理,没想过搭建一套通用的数据交换网格。
这两个场景彻底改变了我的想法,于是我又再次重写了DuckCP,用以下四个概念描述所有数据同步任务:
- 数据仓库(Repository):定义各类数据仓库的连接信息。
- 存储单元(Storage):定义数据仓库内的存储单元。例如数据库的表、本地目录下的文件等。
- 迁移任务(Transformer):定义来源仓库、目标存储单元、迁移脚本(SQL)等迁移信息。
- 迁移作业(Task):定义可同时执行的迁移任务,及其执行顺序。
通过迁移任务,实现任意类型的数据仓库之间同步数据。后续接入新类型的数据源,就只需要实现Repository的接口即可。目前已内置了PostgreSQL、ODPS、DuckDB、SQLite、多维表格、本地文件(CSV、Parquet、JSON)六种数据仓库类型;同时,这次使用sqlglot[4]把DuckDB的SQL语句下推到多维表格的查询操作,真正实现统一用SQL查询所有数据源。
目前开源版的DuckCP已发布到Github上,后续我计划把之前在工业领域做的PROFINET等特殊数据源也加进来,希望能慢慢打磨出数据同步的netcat。
参考资料
[1] DuckCP: https://github.com/redraiment/duckcp
[2] odpscmd: https://help.aliyun.com/zh/maxcompute/user-guide/maxcompute-client
[3] API目录: https://help.aliyun.com/zh/maxcompute/user-guide/api-maxcompute-2022-01-04-dir/
[4] sqlglot: https://sqlglot.com