ma-cleaner移除图片水印大概流程:先在本地创建一个服务,然后通过浏览器标注需要处理的水印,然后发送请求,本地进行处理,并返回。因此,视频去水印的原理就是先提取视频的每一帧,然后依次发送给lama-cleaner进行处理,最终再合成一个视频。

创作灵感来自:

GitHub – david419kr/video-watermark-removal-script-for-lama-cleaner: A simple script for removing video watermark, using Lama Cleaner. Only tested at NVIDIA windows environment.

一、安装lama-cleaner

pip install lama-cleaner

一、代码思路

1.启动服务,在pycharm的终端里输入以下内容

lama-cleaner –model=lama –device=cpu –port=8080

2.判断文件是类型,图片、视频、其他

file_type = is_video_or_image(self.file_path)

3.获取视频中的一张图片,用来标注水印的位置

def get_mask_img(self):

cap = cv2.VideoCapture(self.file_path)

# 设置视频帧指针到第100帧

cap.set(cv2.CAP_PROP_POS_FRAMES, 99) # 注意:帧计数从0开始,所以第100帧是第99索引

success, frame = cap.read() # 读取当前帧(第100帧)

if success: # 确保成功读取帧

cv2.imwrite(‘mask_frame.png’, frame)

else:

print(“Failed to retrieve frame.”)

cap.release()

input(‘mask修改好了,请回车’)

self.mask = open(‘mask_frame.png’, “rb”).read()

mask_frame.png获取到后,用ps或其他图片软件将需要去除的水印区域用白色填充,图片的其他部分用黑色填充。python lama-cleaner批量去水印mask_frame.png获取到后,用ps或其他图片软件将需要去除的水印区域用白色填充,图片的其他部分用黑色填充。

4.将视频提取为每一帧

def video_to_frames(self):

# 确保存放帧的目录存在

if not os.path.exists(self.frames_path):

os.makedirs(self.frames_path)

else:

recreate_folder(self.frames_path)

if not os.path.exists(self.frames_out_folder):

os.makedirs(self.frames_out_folder)

# 使用OpenCV读取视频

vidcap = cv2.VideoCapture(self.file_path)

# 获取视频的总帧数

self.fps = int(vidcap.get(cv2.CAP_PROP_FPS))

success, image = vidcap.read()

count = 0

while success:

# 保存当前帧图片

frame_path = os.path.join(self.frames_path, f”{str(count).zfill(8)}.jpg”)

#cv2.imwrite(frame_path, image)不能保存中文路径

cv2.imencode(‘.jpg’, image)[1].tofile(frame_path) # 存储成功

success, image = vidcap.read() # 读取下一帧

count += 1

print(f”Finished! Extracted {count} frames.”)

5.提取音频

def extract_audio_from_video(self):

“””

从视频文件中提取音频并保存。

参数:

file_path (str): 视频文件的路径。

audio_path (str): 要保存的音频文件的路径。

“””

if not os.path.exists(self.audio_path):

os.makedirs(self.audio_path)

video_clip = VideoFileClip(self.file_path)

audio_clip = video_clip.audio

audio_clip.write_audiofile(os.path.join(self.audio_path, ‘out.mp3’))

audio_clip.close()

video_clip.close()

6.开始处理水印

for i in tqdm(os.listdir(self.frames_path), desc=”Processing frames”):

self.removeWatermark(img_path=os.path.join(self.frames_path, i), mask=self.mask, image_name=i)

def removeWatermark(self, img_path, mask, image_name):

url = “127.0.0.1:8080/inpaint

# print(img_path)

image = open(img_path, “rb”).read()

response = requests.post(

url,

files={“image”: image, “mask”: mask},

data={

“ldmSteps”: 25,

“ldmSampler”: “plms”,

“hdStrategy”: “Crop”,

“zitsWireframe”: True,

“hdStrategyCropMargin”: 196,

“hdStrategyCropTrigerSize”: 800,

“hdStrategyResizeLimit”: 2048,

“prompt”: “”,

“negativePrompt”: “”,

“useCroper”: False,

“croperX”: 64,

“croperY”: -16,

“croperHeight”: 512,

“croperWidth”: 512,

“sdScale”: 1.0,

“sdMaskBlur”: 5,

“sdStrength”: 0.75,

“sdSteps”: 50,

“sdGuidanceScale”: 7.5,

“sdSampler”: “uni_pc”,

“sdSeed”: -1,

“sdMatchHistograms”: False,

“cv2Flag”: “INPAINT_NS”,

“cv2Radius”: 5,

“paintByExampleSteps”: 50,

“paintByExampleGuidanceScale”: 7.5,

“paintByExampleMaskBlur”: 5,

“paintByExampleSeed”: -1,

“paintByExampleMatchHistograms”: False,

“paintByExampleExampleImage”: None,

“p2pSteps”: 50,

“p2pImageGuidanceScale”: 1.5,

“p2pGuidanceScale”: 7.5,

“controlnet_conditioning_scale”: 0.4,

“controlnet_method”: “control_v11p_sd15_canny”,

“paint_by_example_example_image”: None,

},

)

with open(f'{self.frames_out_folder}/{image_name}’, “wb”) as f:

f.write(response.content)

# print(image_name)

7.合成视频

def frames_to_video(self):

“””

将指定文件夹内的图像帧合并成视频文件。

参数:

– input_folder: 包含图像帧的文件夹路径。

– output_video_file: 输出视频文件的路径。

– fps: 视频的帧率(每秒帧数)。

“””

output_video_file = os.path.join(self.video_folder, ‘out.mp4’)

if not os.path.exists(self.video_folder):

os.makedirs(self.video_folder)

# 获取文件夹内所有文件的列表,并排序(确保顺序正确)

frame_files = sorted([os.path.join(self.frames_out_folder, f) for f in os.listdir(self.frames_out_folder) if

os.path.isfile(os.path.join(self.frames_out_folder, f))])

# 读取第一帧以获取视频尺寸

frame = cv2.imread(frame_files[0])

height, width, layers = frame.shape

# 定义视频编码器和创建VideoWriter对象

fourcc = cv2.VideoWriter_fourcc(*’mp4v’) # 或者使用 ‘XVID’,根据输出格式而定

video = cv2.VideoWriter(output_video_file, fourcc, self.fps, (width, height))

print(output_video_file)

# 遍历所有帧,添加到视频中

for file in frame_files:

frame = cv2.imread(file)

video.write(frame)

# 释放VideoWriter对象

video.release()

8.音频和视频合并

def merge_audio_video(self):

“””

合并音频和视频文件。

参数:

video_file_path (str): 视频文件的路径。

audio_file_path (str): 音频文件的路径。

output_file_path (str): 输出视频文件的路径。

“””

video_file_path = os.path.join(self.video_folder, ‘out.mp4’)

audio_file_path = os.path.join(self.audio_path, ‘out.mp3’)

name = os.path.basename(self.file_path)

output_file_path = os.path.join(self.result_folder, name)

if not os.path.exists(self.result_folder):

os.makedirs(self.result_folder)

# 加载视频文件

video_clip = VideoFileClip(video_file_path)

# 加载音频文件

audio_clip = AudioFileClip(audio_file_path)

# 设置视频的音频为加载的音频文件

final_clip = video_clip.set_audio(audio_clip)

# 导出合并后的视频文件

print(output_file_path)

final_clip.write_videofile(output_file_path, codec=’libx264′, audio_codec=’aac’)

二、完整的批量处理代码

# lama-cleaner –model=lama –device=cpu –port=8080

import cv2

import requests

import shutil

from moviepy.editor import VideoFileClip, AudioFileClip

import os

from tqdm import tqdm

def recreate_folder(folder_path):

try:

shutil.rmtree(folder_path) # 删除文件夹及其所有内容

os.makedirs(folder_path) # 创建新的同名文件夹

except Exception as e:

print(‘Failed to delete and recreate folder. Reason: %s’ % e)

def is_video_or_image(file_path):

video_extensions = [‘.mp4’, ‘.mov’, ‘.avi’, ‘.mkv’]

image_extensions = [‘.jpg’, ‘.jpeg’, ‘.png’, ‘.gif’]

_, file_extension = os.path.splitext(file_path)

if file_extension.lower() in video_extensions:

return ‘video’

elif file_extension.lower() in image_extensions:

return ‘image’

else:

return ‘unknown’

class removeVideoWatermark():

def __init__(self, file_path):

self.mask = None

self.file_path = file_path

self.fps = 30

folder_name = os.path.basename(self.file_path).split(‘.’)[0]

self.tmp_folder = os.path.join(‘C:\\Users\\35785\\Desktop\\tmp’, folder_name)

self.frames_path = os.path.join(‘C:\\Users\\35785\\Desktop\\tmp’, folder_name, ‘frames’)

self.audio_path = os.path.join(‘C:\\Users\\35785\\Desktop\\tmp’, folder_name, ‘audio’)

self.frames_out_folder = os.path.join(‘C:\\Users\\35785\\Desktop\\tmp’, folder_name, ‘frames_out’)

self.video_folder = os.path.join(‘C:\\Users\\35785\\Desktop\\tmp’, folder_name, ‘video’)

self.result_folder = os.path.join(‘C:\\Users\\35785\\Desktop\\tmp’, folder_name, ‘result’)

def start(self):

file_type = is_video_or_image(self.file_path)

if file_type == ‘image’:

# 待完善

pass

elif file_type == ‘video’:

self.get_mask_img()

self.video_to_frames()

self.extract_audio_from_video()

# 不使用多线程

for i in tqdm(os.listdir(self.frames_path), desc=”Processing frames”):

self.removeWatermark(img_path=os.path.join(self.frames_path, i), mask=self.mask, image_name=i)

self.frames_to_video()

self.merge_audio_video()

else:

pass

def get_mask_img(self):

cap = cv2.VideoCapture(self.file_path)

# 设置视频帧指针到第100帧

cap.set(cv2.CAP_PROP_POS_FRAMES, 99) # 注意:帧计数从0开始,所以第100帧是第99索引

success, frame = cap.read() # 读取当前帧(第100帧)

if success: # 确保成功读取帧

cv2.imwrite(‘mask_frame.png’, frame)

else:

print(“Failed to retrieve frame.”)

cap.release()

input(‘mask修改好了,请回车’)

self.mask = open(‘mask_frame.png’, “rb”).read()

def merge_audio_video(self):

“””

合并音频和视频文件。

参数:

video_file_path (str): 视频文件的路径。

audio_file_path (str): 音频文件的路径。

output_file_path (str): 输出视频文件的路径。

“””

video_file_path = os.path.join(self.video_folder, ‘out.mp4’)

audio_file_path = os.path.join(self.audio_path, ‘out.mp3’)

name = os.path.basename(self.file_path)

output_file_path = os.path.join(self.result_folder, name)

if not os.path.exists(self.result_folder):

os.makedirs(self.result_folder)

# 加载视频文件

video_clip = VideoFileClip(video_file_path)

# 加载音频文件

audio_clip = AudioFileClip(audio_file_path)

# 设置视频的音频为加载的音频文件

final_clip = video_clip.set_audio(audio_clip)

# 导出合并后的视频文件

print(output_file_path)

final_clip.write_videofile(output_file_path, codec=’libx264′, audio_codec=’aac’)

def extract_audio_from_video(self):

“””

从视频文件中提取音频并保存。

参数:

file_path (str): 视频文件的路径。

audio_path (str): 要保存的音频文件的路径。

“””

if not os.path.exists(self.audio_path):

os.makedirs(self.audio_path)

video_clip = VideoFileClip(self.file_path)

audio_clip = video_clip.audio

audio_clip.write_audiofile(os.path.join(self.audio_path, ‘out.mp3’))

audio_clip.close()

video_clip.close()

def video_to_frames(self):

# 确保存放帧的目录存在

if not os.path.exists(self.frames_path):

os.makedirs(self.frames_path)

else:

recreate_folder(self.frames_path)

if not os.path.exists(self.frames_out_folder):

os.makedirs(self.frames_out_folder)

# 使用OpenCV读取视频

vidcap = cv2.VideoCapture(self.file_path)

# 获取视频的总帧数

self.fps = int(vidcap.get(cv2.CAP_PROP_FPS))

success, image = vidcap.read()

count = 0

while success:

# 保存当前帧图片

frame_path = os.path.join(self.frames_path, f”{str(count).zfill(8)}.jpg”)

# cv2.imwrite(frame_path, image)中文路径会保存不了

cv2.imencode(‘.jpg’, image)[1].tofile(frame_path) # 存储成功

success, image = vidcap.read() # 读取下一帧

count += 1

print(f”Finished! Extracted {count} frames.”)

def removeWatermark(self, img_path, mask, image_name):

url = “127.0.0.1:8080/inpaint

# print(img_path)

image = open(img_path, “rb”).read()

response = requests.post(

url,

files={“image”: image, “mask”: mask},

data={

“ldmSteps”: 25,

“ldmSampler”: “plms”,

“hdStrategy”: “Crop”,

“zitsWireframe”: True,

“hdStrategyCropMargin”: 196,

“hdStrategyCropTrigerSize”: 800,

“hdStrategyResizeLimit”: 2048,

“prompt”: “”,

“negativePrompt”: “”,

“useCroper”: False,

“croperX”: 64,

“croperY”: -16,

“croperHeight”: 512,

“croperWidth”: 512,

“sdScale”: 1.0,

“sdMaskBlur”: 5,

“sdStrength”: 0.75,

“sdSteps”: 50,

“sdGuidanceScale”: 7.5,

“sdSampler”: “uni_pc”,

“sdSeed”: -1,

“sdMatchHistograms”: False,

“cv2Flag”: “INPAINT_NS”,

“cv2Radius”: 5,

“paintByExampleSteps”: 50,

“paintByExampleGuidanceScale”: 7.5,

“paintByExampleMaskBlur”: 5,

“paintByExampleSeed”: -1,

“paintByExampleMatchHistograms”: False,

“paintByExampleExampleImage”: None,

“p2pSteps”: 50,

“p2pImageGuidanceScale”: 1.5,

“p2pGuidanceScale”: 7.5,

“controlnet_conditioning_scale”: 0.4,

“controlnet_method”: “control_v11p_sd15_canny”,

“paint_by_example_example_image”: None,

},

)

with open(f'{self.frames_out_folder}/{image_name}’, “wb”) as f:

f.write(response.content)

# print(image_name)

def frames_to_video(self):

“””

将指定文件夹内的图像帧合并成视频文件。

参数:

– input_folder: 包含图像帧的文件夹路径。

– output_video_file: 输出视频文件的路径。

– fps: 视频的帧率(每秒帧数)。

“””

output_video_file = os.path.join(self.video_folder, ‘out.mp4’)

if not os.path.exists(self.video_folder):

os.makedirs(self.video_folder)

# 获取文件夹内所有文件的列表,并排序(确保顺序正确)

frame_files = sorted([os.path.join(self.frames_out_folder, f) for f in os.listdir(self.frames_out_folder) if

os.path.isfile(os.path.join(self.frames_out_folder, f))])

# 读取第一帧以获取视频尺寸

frame = cv2.imread(frame_files[0])

height, width, layers = frame.shape

# 定义视频编码器和创建VideoWriter对象

fourcc = cv2.VideoWriter_fourcc(*’mp4v’) # 或者使用 ‘XVID’,根据输出格式而定

video = cv2.VideoWriter(output_video_file, fourcc, self.fps, (width, height))

print(output_video_file)

# 遍历所有帧,添加到视频中

for file in frame_files:

frame = cv2.imread(file)

video.write(frame)

# 释放VideoWriter对象

video.release()

a = removeVideoWatermark(file_path=r’C:\Users\35785\Downloads\Video\tmp.mp4′)

a.start()