作者:荣姐聊AI
链接:https://zhuanlan.zhihu.com/p/1926576366784091941
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

相信我们每个人,多少都和发票打过交道,有不少人都体验过报销时整理票据的烦恼。

如果你是开公司的小伙伴,还没有专职财务,那每月处理发票的滋味,就更是一言难尽了。

当然,如果只是一两张发票,现在随便一个AI模型都能帮你搞定。但真正的考验是,当几十上百张发票堆在面前,并且每一个数据都要求精准无误时,该怎么办?

尤其对于小微企业和代理记账公司来说,这几乎是每月固定的体力活。

当用AI处理掉这些琐碎事情的时候,真的挺有成就感的,不管是自己的需求还是帮朋友解决的问题,都能让我们暂时喘口气。

今天,我就想完整分享一下,我是如何用Dify来搭建一套批量处理发票的自动化流程,彻底告别这种烦恼。

下图是在官方税务查询的界面,但是数据还是不够明细。

1 准备工作

在正式搭建工作流之前,需要完成两项准备。

1.1 发票PDF文件转图片

先聊一下为什么要转成图片吧,开始用文档提取器识别发票的PDF文件,但是识别的效果不好,尤其是商品详情那块,格式有点乱,识别的结果总是串行。

所以最后决定用阿里千问的多模型来做发票识别,把本地发票的PDF文件统一转换成图片进行输出。

代码中需要用到一个开源工具poppler:

github.com/oschwartz106

转换代码:pdf_to_images.py

from pdf2image import convert_from_path
import os

# 配置参数
input_folder = 'pdf'  # 当前目录下的 pdf 文件夹
output_folder = 'output_images'  # 图片输出文件夹
poppler_path = os.path.abspath('poppler-24.08.0/Library/bin')

if not os.path.exists(output_folder):
    os.makedirs(output_folder)

# 批量处理 pdf 文件夹下所有 PDF 文件
for filename in os.listdir(input_folder):
    if filename.lower().endswith('.pdf'):
        pdf_path = os.path.join(input_folder, filename)
        pdf_base_name = os.path.splitext(os.path.basename(pdf_path))[0]
        try:
            for i, page in enumerate(convert_from_path(pdf_path, poppler_path=poppler_path)):
                img_path = os.path.join(output_folder, f'{pdf_base_name}_page_{i+1}.png')
                page.save(img_path, 'PNG')
            print(f'{filename} 已成功转换为图片并保存到文件夹!')
        except Exception as e:
            print(f'转换 {filename} 时出错:{e}') 

本地执行:

转换后的目录:

1.2 创建飞书多维表格

工作流最后是写入到飞书多维表格中,需要创建一个多维表格,记录url地址。

多维表格需要关联飞书应用,这块不会的参考我以前的文章:

Dify接入飞书多维表格

我这里是提取了发票上能提取的所有信息

开票日期是日期类型,合计金额、合计税额、价税合计三个字段为数字类型,其他为文本类型。

2 工作流搭建

工作流就是上传图片->大模型分析数据->代码节点格式转换->写入飞书多维表格->输出结果。

2.1 开始节点

开始节点增加一个upload参数,单文件,图片类型。

2.2 大模型发票识别节点

这是实现发票识别的核心节点。

  • 模型选择:我选用了支持多模态视觉理解的Qwen2.5-VL-72B-Instruct模型。
  • 视觉能力:必须打开该模型节点的“视觉”开关,并将开始节点的upload变量传入。
  • Prompt设计:为了让模型能稳定、准确地返回结构化数据,需要在提示词中明确定义AI的角色、任务,并给出了严格的JSON输出格式要求,包括处理项目列表、数据清洗规则等。

提示词如下:

# 角色
你是一个专业的发票信息提取助手。

# 任务
你的任务是从图片中,精准地提取所有关键信息,并严格按照指定的 JSON 格式输出。请忽略文本中与发票无关的干扰项(如“下载次数”)。

# 输入变量
发票的原始文本内容将通过变量 `{{upload}}` 传入。

# 输出要求
1.  **严格的JSON格式**:你的回答必须是一个完整的、格式正确的 JSON 对象,不能包含任何 JSON 之外的解释、注释或文字。
2.  **数据清洗**:
    * 对于金额和数量,只保留数字,去除货币符号(如 `¥`)。
    * 对于日期,统一输出为 `YYYY-MM-DD` 格式。
    * 对于税率,输出百分比字符串,例如 `"13%"`。
    * 如果某一项信息在发票中不存在,请使用 `null` 或者空字符串 `""` 作为值。
3.  **处理项目列表**:发票中的“项目名称”部分可能包含多个商品或服务,甚至可能包含折扣(金额为负数)。你需要将每一个项目都作为一个独立的对象,放入 `items` 数组中。

# JSON 输出格式

```json
{
  "invoice_title": "string",
  "invoice_number": "string",
  "issue_date": "string",
  "buyer_info": {
    "name": "string",
    "tax_id": "string"
  },
  "seller_info": {
    "name": "string",
    "tax_id": "string"
  },
  "items": [
    {
      "name": "string",
      "model": "string",
      "unit": "string",
      "quantity": "number | null",
      "unit_price": "number | null",
      "amount": "number",
      "tax_rate": "string",
      "tax_amount": "number"
    }
  ],
  "total_amount_exclusive_tax": "number",
  "total_tax_amount": "number",
  "total_amount_inclusive_tax": {
    "in_words": "string",
    "in_figures": "number"
  },
  "remarks": "string",
  "issuer": "string"
}

2.3 代码节点

LLM节点返回的JSON数据无法直接被飞书插件使用,代码节点就是把大模型分析的json数组转成飞书多维表格插件可以识别的格式。

这里需要注意两个问题:

1.飞书API要求的日期格式为毫秒级时间戳;

2.LLM的输出可能被Markdown代码块包裹。

完整代码如下:

import json
import re
from datetime import datetime, timezone

def main(output: str) -> dict:
    # 去除 markdown 代码块标记
    output = re.sub(r"^```[\w]*\s*", "", output.strip(), flags=re.IGNORECASE)
    output = re.sub(r"```$", "", output.strip())
    output = output.strip()

    data = json.loads(output)
    # 日期字段转为时间戳(毫秒)
    date_str = data.get("issue_date", "").replace("/", "-").replace(".", "-").strip()
    # 转为时间戳(毫秒)
    if date_str:
        dt = datetime.strptime(date_str, "%Y-%m-%d")
        # 如果你需要时区,改为 dt.replace(tzinfo=timezone.utc)
        timestamp = int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
    else:
        timestamp = None

    items_str = json.dumps(data["items"], ensure_ascii=False)
    record = {
        "发票抬头(invoice_title)": data.get("invoice_title", ""),
        "发票号码(invoice_number)": data.get("invoice_number", ""),
        "开票日期(issue_date)": timestamp,
        "购买方名称(buyer_info.name)": data["buyer_info"].get("name", ""),
        "购买方税号(buyer_info.tax_id)": data["buyer_info"].get("tax_id", ""),
        "销售方名称(seller_info.name)": data["seller_info"].get("name", ""),
        "销售方税号(seller_info.tax_id)": data["seller_info"].get("tax_id", ""),
        "商品详情(items)": items_str,
        "合计金额(total_amount_exclusive_tax)": data.get("total_amount_exclusive_tax", ""),
        "合计税额(total_tax_amount)": data.get("total_tax_amount", ""),
        "价税合计大写(total_amount_inclusive_tax.in_words)": data["total_amount_inclusive_tax"].get("in_words", ""),
        "价税合计小写(total_amount_inclusive_tax.in_figures)": data["total_amount_inclusive_tax"].get("in_figures", ""),
        "备注(remarks)": data.get("remarks", ""),
        "开票人(issuer)": data.get("issuer", "")
    }
    return {
        "result": json.dumps([record], ensure_ascii=False)
    }

2.4 飞书多维表格插件

填写两个参数。

  • app_token为之前创建的飞书多维表格的url地址。
  • 记录列表是代码节点转换的结果。

这里注意一下,飞书多维表格的日期类型,传入的是时间戳,所以代码节点这块做了一层转换的。

2.5 结束节点

为了测试方便,返回LLM节点识别出来的发票数据。

3 测试

3.1 工作流测试

上传一个发票图片进行测试。

整体JSON结构如下:

```json
{
  "invoice_title": "电子发票(普通发票)",
  "invoice_number": "2511XXX120",
  "issue_date": "2025-06-06",
  "buyer_info": {
    "name": "中国XXX大学",
    "tax_id": "A1XXX9621"
  },
  "seller_info": {
    "name": "北京XXX限公司",
    "tax_id": "91XXXXX057T"
  },
  "items": [
    {
      "name": "*电子元件*RFID读卡器",
      "model": "FX600-B",
      "unit": "个",
      "quantity": 1,
      "unit_price": 116.831683168317,
      "amount": 116.83,
      "tax_rate": "1%",
      "tax_amount": 1.17
    },
    {
      "name": "*电子元件*NFC标签卡",
      "model": "M1",
      "unit": "个",
      "quantity": 25,
      "unit_price": 1.039603960396,
      "amount": 25.99,
      "tax_rate": "1%",
      "tax_amount": 0.26
    }
  ],
  "total_amount_exclusive_tax": 142.82,
  "total_tax_amount": 1.43,
  "total_amount_inclusive_tax": {
    "in_words": "壹佰肆拾肆圆贰角伍分",
    "in_figures": 144.25
  },
  "remarks": "",
  "issuer": "XX"
}
```

飞书多维表格会多一行记录。代码工作流执行正常。

3.2 API接口批量执行测试

3.2.1 创建API密钥

记住这个API秘钥:app-zT4UQvqqeLKfq0hX8sKDAOxj,后面脚本中会用到。

3.2.2 编写脚本调用工作流API

批量处理本地发票文件,是转换好的图片文件。

这个脚本会遍历本地

output_images文件夹中的所有图片,对每张图片执行两个步骤:

  • 调用Dify的/v1/files/upload接口上传图片文件。
  • 使用返回的file_id,调用/v1/workflows/run接口来执行工作流。

参数类型记得是图片,与开始节点我们选择的类型一致。

batch_invoice_recognize.py

import os
import requests
import json
import csv

# 配置区
API_KEY = "app-zT4UQvqqeLKfq0hX8sKDAOxj"
USER = "rongjie"
DIFY_SERVER = "http://127.0.0.1"  # 你的Dify服务器地址
DIR_PATH = "output_images"         # 需要批量处理的图片目录
RESULT_FILE = "result.csv"         # 结果保存文件

def upload_image(file_path, api_key):
    url = f'{DIFY_SERVER}/v1/files/upload'
    headers = {'Authorization': f'Bearer {api_key}'}
    ext = os.path.splitext(file_path)[-1].lower()
    mime = 'image/png' if ext == '.png' else 'image/jpeg'
    with open(file_path, 'rb') as file:
        files = {'file': (os.path.basename(file_path), file, mime)}
        data = {'user': USER}
        response = requests.post(url, headers=headers, files=files, data=data)
    if response.status_code == 201:
        print(f"[上传成功] {file_path}")
        resp = response.json()
        print("上传返回信息:", resp)
        return resp['id']
    else:
        print(f"[上传失败] {file_path},状态码: {response.status_code},返回: {response.text}")
        return None

def run_workflow(file_id, api_key):
    url = f"{DIFY_SERVER}/v1/workflows/run"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    data = {
        "inputs": {
            "upload": {
                "type": "image",
                "transfer_method": "local_file",
                "upload_file_id": file_id
            }
        },
        "user": USER,
        "response_mode": "blocking"
    }
    import pprint
    print("工作流调用参数:")
    pprint.pprint(data)
    response = requests.post(url, headers=headers, json=data)
    print("工作流返回内容:", response.status_code, response.text)
    if response.status_code == 200:
        resp_json = response.json()
        wf_status = resp_json.get("data", {}).get("status")
        if wf_status == "succeeded":
            outputs = resp_json.get("data", {}).get("outputs")
            print(f"[工作流执行成功] 文件ID: {file_id}")
            return resp_json, True, outputs
        else:
            error_info = resp_json.get("data", {}).get("error") or resp_json.get("message")
            print(f"[工作流内部失败] 文件ID: {file_id},状态: {wf_status}")
            return resp_json, False, error_info
    else:
        print(f"[工作流执行失败] 文件ID: {file_id},状态码: {response.status_code},返回: {response.text}")
        return {"status": "error", "message": f"Failed to execute workflow, status code: {response.status_code}"}, False, response.text

def batch_process_images(dir_path, result_file):
    results = []
    for filename in os.listdir(dir_path):
        file_path = os.path.join(dir_path, filename)
        if not os.path.isfile(file_path) or not filename.lower().endswith((".jpg", ".jpeg", ".png")):
            continue
        print(f"\n处理文件: {file_path}")
        file_id = upload_image(file_path, API_KEY)
        upload_status = "成功" if file_id else "失败"
        workflow_status = "失败"  # 默认失败
        result_info = ""
        if file_id:
            result, success, info = run_workflow(file_id, API_KEY)
            workflow_status = "成功" if success else "失败"
            result_info = json.dumps(info, ensure_ascii=False) if info else ""
        else:
            result_info = "上传失败"
        results.append({
            "filename": filename,
            "upload_status": upload_status,
            "workflow_status": workflow_status,
            "result_info": result_info
        })
    # 写入CSV文件
    with open(result_file, "w", newline='', encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=["filename", "upload_status", "workflow_status", "result_info"])
        writer.writeheader()
        for row in results:
            writer.writerow(row)
    print(f"\n所有结果已写入 {result_file}")

if __name__ == "__main__":
    batch_process_images(DIR_PATH, RESULT_FILE) 

结果会记录到本地result.csv文件中。记录文件上传的状态、工作流执行的结果、工作流输出的结果。

截取的部分飞书多维表格的数据

经过这一系列操作,成功打造了一个高效的自动化工具。

现在,只需要将发票的PDF文件放入指定文件夹,运行两个脚本,所有发票的数据就能自动、准确地归档到飞书多维表格中,甚至可以直接利用表格生成各种维度的统计图表。

看到数据唰唰地自动写入表格,我心里总会涌起一种特别的开心。

看看下个阶段是不是优化一下这个流程,比如Python程序做成GUI界面,或者把记录到飞书表格换成写到本地Excel表格。