图像畸变-径向切向畸变实时图像RTSP推流

实验环境

注意:ffmpeg进程stdin写入两张图片的时间间隔不能太长,否则mediamtx会出现对应的推流session超时退出。

在这里插入图片描述
在这里插入图片描述

实验效果

在这里插入图片描述

全部代码

my_util.py

#进度条
import os
import sys
import time
import shutil
import logging
import time
from datetime import datetimedef print_progress_bar(iteration, total, prefix='', suffix='', decimals=1, length=100, fill='█', print_end="\r"):"""调用在Python终端中打印自定义进度条的函数iteration - 当前迭代(Int)total - 总迭代(Int)prefix - 前缀字符串(Str)suffix - 后缀字符串(Str)decimals - 正数的小数位数(Int)length - 进度条的长度(Int)fill - 进度条填充字符(Str)print_end - 行尾字符(Str)"""percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total)))filled_length = int(length * iteration // total)bar = fill * filled_length + '-' * (length - filled_length)print(f'\r{prefix} |{bar}| {percent}% {suffix}', end=print_end)# 打印新行,完成进度条if iteration == total:print()class Logger(object):"""终端打印不同颜色的日志"""ch = logging.StreamHandler()  # 创建日志处理器对象,在__init__外创建,是类当中的静态属性,不是__init__中的实例属性# #创建静态的日志处理器可以减少内存消耗# # 创建 FileHandler 实例,指定日志文件路径# ch = logging.FileHandler(filename='app1.log')def __init__(self):self.logger = logging.getLogger()  # 创建日志记录对象self.logger.setLevel(logging.DEBUG)  # 设置日志等级info,其他低于此等级的不打印def debug(self, message):self.fontColor('\033[0;37m%s\033[0m')self.logger.debug(message)def info(self, message):self.fontColor('\033[0;32m%s\033[0m')self.logger.info(message)def warning(self, message):self.fontColor('\033[0;33m%s\033[0m')self.logger.warning(message)def error(self, message):self.fontColor('\033[0;31m%s\033[0m')self.logger.error(message)def fontColor(self, color):formatter = logging.Formatter(color % '%(asctime)s - %(name)s - %(levelname)s - %(message)s')  # 控制日志输出颜色self.ch.setFormatter(formatter)self.logger.addHandler(self.ch)  # 向日志记录对象中加入日志处理器对象def delete_files(folder_path, max_files):"""监控指定文件夹中的文件数量,并在超过max_files时删除最旧的文件。"""print("进入删除图片文件夹"+folder_path)print("需要删除文件数量")print(max_files)if True:# 获取文件夹中的文件列表files = os.listdir(folder_path)file_count = len(files)print(f"当前文件夹 {folder_path} 中的文件数量: {file_count}")# 如果文件数量超过max_files,则删除最旧的文件if file_count > max_files:# 获取文件夹中所有文件的完整路径,并带上修改时间file_paths_with_mtime = [(os.path.join(folder_path, f), os.path.getmtime(os.path.join(folder_path, f))) forf in files]# 按修改时间排序sorted_files = sorted(file_paths_with_mtime, key=lambda x: x[1])# 删除最旧的文件,直到文件数量在阈值以下for file_path, mtime in sorted_files[:file_count - max_files]:try:os.remove(file_path)print(f"已删除文件: {file_path}")except OSError as e:print(f"删除文件时出错: {e.strerror}")def copy_file(src, dst):shutil.copy2(src, dst)  # copy2会尝试保留文件的元数据def end_sentence(text, max_length):'''保证在max_length长度前以句号或点号结束文本:param text: 文本:param max_length: 最大长度:return:'''# 如果文本长度已经超过最大长度,则直接截断if len(text) > max_length:text = text[:max_length]# print("结果长度 {}".format(len(text)))# 查找句号的位置(en)period_index = max(text.rfind('.'), text.rfind(','),text.rfind(':'), text.rfind(';'),text.rfind('!'), text.rfind('?'))  # 从后往前找,找到最后一个句号# 如果找到了句号且它在最大长度内if period_index != -1 and (period_index + 1 < max_length ormax_length == -1):# 如果需要替换,则替换句号text = text[:period_index] + '.'# 查找句号的位置(cn)period_index = max(text.rfind('。'), text.rfind(','),text.rfind(':'), text.rfind(';'),text.rfind('!'), text.rfind('?'))  # 从后往前找,找到最后一个句号# 如果找到了句号且它在最大长度内if period_index != -1 and (period_index + 1 < max_length ormax_length == -1):# 如果需要替换,则替换句号text = text[:period_index] + '。'return textimport base64def encode_base64(input_string):"""对字符串进行Base64编码"""encoded_bytes = base64.b64encode(input_string.encode('utf-8'))encoded_string = encoded_bytes.decode('utf-8')return encoded_stringdef decode_base64(input_string):"""对Base64编码的字符串进行解码"""decoded_bytes = base64.b64decode(input_string.encode('utf-8'))decoded_string = decoded_bytes.decode('utf-8')return decoded_stringimport socketdef get_local_ip():try:# 创建一个 UDP 套接字s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)# 连接到一个公共的 IP 地址和端口s.connect(("8.8.8.8", 80))# 获取本地 IP 地址local_ip = s.getsockname()[0]s.close()return local_ipexcept Exception as e:print(f"获取本地 IP 地址时出错: {e}")return None

make_pics.py

import numpy as np
import cv2
import math
import time
from PIL import Image, ImageDraw, ImageFontdef distort_image(image, k1, k2, p1, p2):"""对图像应用径向和切向畸变:param image: 输入图像:param k1: 径向畸变系数:param k2: 径向畸变系数:param p1: 切向畸变系数:param p2: 切向畸变系数:return: 畸变后的图像"""h, w = image.shape[:2]camera_matrix = np.array([[w, 0, w / 2],[0, h, h / 2],[0, 0, 1]], dtype=np.float32)distort_coeffs = np.array([k1, k2, p1, p2, 0], dtype=np.float32)# 生成畸变映射map1, map2 = cv2.initUndistortRectifyMap(camera_matrix, distort_coeffs, np.eye(3), camera_matrix, (w, h), cv2.CV_32FC1)# 应用畸变映射distorted_img = cv2.remap(image, map1, map2, cv2.INTER_LINEAR)return distorted_imgdef put_chinese_text(img, text, position, font_path, font_size, color):"""在图像上添加中文文字:param img: 输入图像:param text: 要添加的文字:param position: 文字位置:param font_path: 字体文件路径:param font_size: 字体大小:param color: 文字颜色:return: 添加文字后的图像"""img_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))draw = ImageDraw.Draw(img_pil)font = ImageFont.truetype(font_path, font_size)draw.text(position, text, font=font, fill=color)return cv2.cvtColor(np.array(img_pil), cv2.COLOR_RGB2BGR)def make_pics(pic_path="picture.jpg", k1=-0.5, k2=0.0, p1=0.0, p2=0.0):# 生成棋盘图像chessboard = np.zeros((400, 400, 3), dtype=np.uint8)for i in range(0, 400, 40):for j in range(0, 400, 40):if (i // 40 + j // 40) % 2 == 0:chessboard[i:i + 40, j:j + 40] = [255, 255, 255]# 生成雷达图radar = chessboard.copy()x0, y0 = radar.shape[1] // 2, radar.shape[0] // 2for radius in range(0, 400, 40):cv2.circle(radar, (x0, y0), radius, (0, 0, 255), 1)# 绘制径向线for angle in range(0, 360, 40):# 使用最大半径 400 计算径向线的终点坐标x = int(x0 + 400 * math.cos(math.radians(angle)))y = int(y0 + 400 * math.sin(math.radians(angle)))cv2.line(radar, (x0, y0), (x, y), (0, 0, 255), 1)font_size = 15font_color = (250, 100, 0)combined_distorted_chessboard = distort_image(radar, k1, k2, p1, p2)text1 = "k1={:.2f},k2={:.2f},p1={:.2f},p2={:.2f}".format(k1,k2,p1,p2)text2 = "图像畸变"combined_distorted_chessboard = put_chinese_text(combined_distorted_chessboard, text1, (10, 30), 'simhei.ttf', font_size, font_color)combined_distorted_chessboard = put_chinese_text(combined_distorted_chessboard, text2, (10, 60), 'simhei.ttf', font_size, font_color)# 保存图像cv2.imwrite(pic_path, combined_distorted_chessboard)# cv2.imshow(pic_path, combined_distorted_chessboard)# cv2.waitKey(0)# cv2.destroyAllWindows()returnif False:for k1 in np.arange(-100,100,0.1):for k2 in np.arange(-100, 100, 0.1):for p1 in np.arange(-100, 100, 0.1):for p2 in np.arange(-100, 100, 0.1):make_pics("picture.jpg", k1, k2, p1, p2)

pic_2_rtsp.py

import numpy as np
import make_pics
import sys
import msvcrt
import subprocess
import time
import shlex
import my_util
from PIL import Image, ImageDraw
import random
import oslog = my_util.Logger()
# RTSP_DEF_IP = "192.168.31.185"
RTSP_DEF_IP = my_util.get_local_ip()
RTSP_PORT = 8554
local_ip = my_util.get_local_ip()
if local_ip:RTSP_URL = "rtsp://{}:{}/live".format(local_ip, RTSP_PORT)
else:RTSP_URL = "rtsp://{}:{}/live".format(RTSP_DEF_IP, RTSP_PORT)
frame_duration = 1/25  # 每张图片显示的时长(秒),process.stdin.wirte写入速度需要够快,否则可能接收端接受数据不足无法获取解码信息(抓包看看)
frame_num = 0old_picname = "past.jpg"
new_picname = "now.jpg"
k1 = k2 = p1 = p2 = 0.0def generate_orig(old_picname):"""生成默认图片"""image = Image.new('RGB', (640, 480), color=(random.randint(0, 255), random.randint(0, 255), random.randint(0, 255)))draw = ImageDraw.Draw(image)draw.text((100, 100), 'No Signal', fill=(255, 255, 255))image.save(old_picname)returndef generate_image(new_picname, k1, k2, p1, p2):"""生成图片"""make_pics.make_pics(new_picname, k1, k2, p1, p2)return# 构建 ffmpeg 命令使用图片序列
command_line = 'ffmpeg -loglevel error -re -i - -c:v libx264 -pix_fmt yuv420p -r {} -f rtsp {}'.format(1/frame_duration, RTSP_URL)
command_list = shlex.split(command_line)log.debug(command_list)def start_process():global processprocess = subprocess.Popen(command_list, stdin=subprocess.PIPE, text=False)log.info("Process started.")start_time = time.time()class QuitException(Exception):passtry:# 默认先生成初始图step_length = 0.125generate_orig(old_picname)start_process()while True:for p1 in np.arange(-1 * step_length, step_length, step_length):for p2 in np.arange(-1 * step_length, step_length, step_length):for k1 in np.arange(-1, 1, step_length):for k2 in np.arange(-1, 1, step_length):log.debug("畸变系数 k1={}, k2={}, p1={}, p2={}".format(k1, k2, p1, p2))generate_image(new_picname, k1, k2, p1, p2)if msvcrt.kbhit():  # 检查是否有键盘输入input_char = msvcrt.getch().decode('utf-8')if input_char == 'q' or input_char == 'Q':try:# 向进程的标准输入发送 'q' 并换行if process.stdin:process.stdin.write('q\n'.encode())process.stdin.flush()except Exception as e:passraise QuitException()# 持续生成新图片替换旧图片try:if os.path.exists(new_picname):with open(new_picname, 'rb') as f:process.stdin.write(f.read())else:with open(old_picname, 'rb') as f:process.stdin.write(f.read())except Exception as e:log.error(f"Error writing to process stdin: {e}")log.info("Restarting process...")process.terminate()try:process.wait(timeout=1)except subprocess.TimeoutExpired:process.kill()start_process()time.sleep(frame_duration)except QuitException:pass
finally:try:process.terminate()try:process.wait(timeout=1)except subprocess.TimeoutExpired:process.kill()except Exception:passtry:if os.path.exists(new_picname):os.remove(new_picname)except Exception as e:log.error(f"Error removing {new_picname}: {e}")end_time = time.time()time_cnt = end_time - start_timelog.info("FFmpeg进程已执行{}秒并通过输入 'q' 退出。".format(round(time_cnt)))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/78825.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis Sentinel 和 Redis Cluster 各自的原理、优缺点及适用场景是什么?

我们来详细分析下 Redis Sentinel (哨兵) 和 Redis Cluster (集群) 这两种方案的原理和使用场景。 Redis Sentinel (哨兵) 原理: Sentinel 本身是一个或一组独立于 Redis 数据节点的进程。它的核心职责是监控一个 Redis 主从复制 (Master-Slave) 架构。多个 Sentinel 进程协同…

基于机器学习的电影票房预测

目录 摘 要(完整下载链接附在文末) Abstract 1 绪 论 1.1 研究背景概述 1.2 国内外相关领域研究进展 1.3 电影票房预测技术概览 1.3.1 利用人口统计学特征的方法 1.3.2 基于机器学习的预测模型 2 机器学习相关理论介绍与分析 2.1 机器学习算法理论 2.1.1卷积…

SVMSPro平台获取HTTP-FLV规则

SVMSPro平台获取HTTP-FLV规则 HTTP-FLV的服务端口为&#xff1a;53372&#xff0c;如需要公网访问需要开启这个端口 这里讲的是如何获取长效URL&#xff0c;短效&#xff08;时效性&#xff09;URL也支持&#xff0c;下回讲 一、如何获取HTTP-FLV实时流视频 http://host:po…

ARM架构的微控制器总线矩阵

在 ARM 架构的微控制器&#xff08;MCU&#xff09;中&#xff0c;总线矩阵&#xff08;Bus Matrix&#xff09; 是总线系统的核心互连结构&#xff0c;负责协调多个主设备&#xff08;如 CPU、DMA、以太网控制器等&#xff09;对多个从设备&#xff08;如 Flash、SRAM、外设等…

AI赋能金融:智能投顾、风控与反欺诈的未来

AI赋能金融&#xff1a;智能投顾、风控与反欺诈的未来 系统化学习人工智能网站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目录 AI赋能金融&#xff1a;智能投顾、风控与反欺诈的未来摘要引言一、智能投顾&#xff1a;从经验驱动到人机协同…

【机器学习】朴素贝叶斯

目录 一、朴素贝叶斯的算法原理 1.1 定义 1.2 贝叶斯定理 1.3 条件独立性假设 二、朴素贝叶斯算法的几种常见类型 2.1 高斯朴素贝叶斯 (Gaussian Naive Bayes) 【训练阶段】 - 从数据中学习模型参数 【预测阶段】 - 对新样本 Xnew​ 进行分类 2. 2 多项式朴素贝叶斯 (…

鸿蒙 ArkTS 组件 通用事件 通用属性 速查表

ArkTS 组件 组件 通用事件 速查表 通用事件事件名称简要说明点击事件onClick(event: Callback<ClickEvent>, distanceThreshold: number): T相较于原有 onClick 接口&#xff0c;新增 distanceThreshold 参数作为点击事件移动阈值&#xff0c;当手指的移动距离超出所设…

Java云原生+quarkus

一、Java如何实现云原生应用&#xff1f; 传统的 Java 框架&#xff08;如 Spring Boot&#xff09;虽然功能强大&#xff0c;但在云原生场景下可能显得笨重。以下是一些更适合云原生的轻量级框架&#xff1a; Quarkus(推荐) 专为云原生和 Kubernetes 设计的 Java 框架。支持…

C语言教程(二十三):C 语言强制类型转换详解

一、强制类型转换的概念 强制类型转换是指在程序中手动将一个数据类型的值转换为另一种数据类型。在某些情况下,编译器可能不会自动进行类型转换,或者自动转换的结果不符合我们的预期,这时就需要使用强制类型转换来明确指定要进行的类型转换。 二、强制类型转换的语法 强制类…

Spring Boot × K8s 监控实战-集成 Prometheus 与 Grafana

在微服务架构中&#xff0c;应用的可观测性至关重要。Kubernetes 已成为容器化部署的标准&#xff0c;但其自身的监控能力有限&#xff0c;需要与其他工具集成才能实现详细的运行数据采集与分析。 本文将通过 Spring Boot Kubernetes Prometheus Grafana 实战&#xff0c;打…

phpstudy修改Apache端口号

1. 修改Listen.conf文件 本地phpstudy安装目录&#xff1a; 2.其他问题 ① 修改httpd.conf不起作用 ② 直接通过控制面板配置好像有延迟缓存

(done) 吴恩达版提示词工程 6. 转换 (翻译,通用翻译,语气风格变换,文本格式转换,拼写检查和语法检查)

视频&#xff1a;https://www.bilibili.com/video/BV1Z14y1Z7LJ/?spm_id_from333.337.search-card.all.click&vd_source7a1a0bc74158c6993c7355c5490fc600 别人的笔记&#xff1a;https://zhuanlan.zhihu.com/p/626966526 6. 转换任务&#xff08;Transforming&#xff0…

什么是静态住宅ip,跨境电商为什么要用静态住宅ip

在数字时代&#xff0c;IP地址不仅是设备联网的“ID”&#xff0c;更是跨境电商运营中的关键工具。尤其对于需要长期稳定、安全操作的场景&#xff0c;静态住宅IP逐渐成为行业首选。 一、什么是静态住宅IP&#xff1f; 静态住宅IP&#xff08;Static Residential IP&#xff0…

Qemu-STM32(十七):STM32F103加入AFIO控制器

概述 本文主要描述了在Qemu平台中&#xff0c;如何添加STM32F103的AFIO控制器模拟代码&#xff0c;AFIO是属于GPIO引脚复用配置的功能。 参考资料 STM32F1XX TRM手册&#xff0c;手册编号&#xff1a;RM0008 添加步骤 1、在hw/arm/Kconfig文件中添加STM32F1XX_AFIO&#x…

QuecPython+audio:实现音频的录制与播放

概述 QuecPython 作为专为物联网设计的开发框架&#xff0c;通过高度封装的 Python 接口为嵌入式设备提供了完整的音频处理能力。本文主要介绍如何利用 QuecPython 快速实现音频功能的开发。 核心优势 极简开发&#xff1a;3行代码完成基础音频录制与播放。快速上手&#xf…

企业架构之旅(3):TOGAF ADM架构愿景的核心价值

一、引言&#xff1a;为什么架构愿景是企业架构的「导航图」 在企业数字化转型的浪潮中&#xff0c;TOGAF ADM&#xff08;架构开发方法&#xff09;作为公认的企业架构「方法论圣经」&#xff0c;其首个关键阶段 —— 架构愿景&#xff08;Architecture Vision&#xff09;&a…

C++:Lambda表达式

C&#xff1a;Lambda表达式 C中lambda的基本语法1. 捕获列表&#xff08;Capture List&#xff09;2. 示例代码示例 1&#xff1a;简单的lambda示例 2&#xff1a;捕获变量示例 3&#xff1a;按引用捕获示例 4&#xff1a;捕获所有变量示例 5&#xff1a;作为函数参数 3. lambd…

被关在idea小黑屏里写spark程序

一、先在idea中添加Scala插件 二、使用Maven创建新项目 1.启动idea,选择新建项目。之后的设置如下&#xff1a; 2.将Scala添加到全局库中&#xff08;注意&#xff1a;Scala的版本不宜太高&#xff0c;最好是2-12.否则后面会报下面这个错误 E:\tool接口\SparkCore_01\src\mai…

自动化立库/AGV物流仿真详细步骤

以下是一种可以在预算和周期内实现自动化立库及AGV 方案仿真分析的方法&#xff1a; 一、工具选择 软件工具FlexSim&#xff1a;这是一款流行的离散事件仿真软件。它具有直观的图形用户界面&#xff0c;通过简单的拖拽操作就可以构建自动化立库和 AGV 的模型。其内置的丰富的…

使用springboot+easyexcel实现导出excel并合并指定单元格

1&#xff1a;准备一个单元格合并策略类代码&#xff1a; import com.alibaba.excel.metadata.Head; import com.alibaba.excel.metadata.data.WriteCellData; import com.alibaba.excel.write.handler.CellWriteHandler; import com.alibaba.excel.write.metadata.holder.Writ…