python pillow模块详解
2024年7月3日大约 9 分钟约 2586 字
一个图片功能封装示例
image_process.py
"""
图像处理模块
依赖:pip install pillow requests alibabacloud_imageseg20191230 alibabacloud_ocr_api20210707==2.0.1 alibabacloud_darabonba_stream
"""
from typing import Literal, Union
import base64
import urllib.request
import io
import json
import os
import requests
from PIL import Image, ExifTags
from alibabacloud_ocr_api20210707.client import Client as ocr_api20210707Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ocr_api20210707 import models as ocr_api_20210707_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_imageseg20191230.client import Client
from alibabacloud_imageseg20191230.models import SegmentCommonImageAdvanceRequest
from alibabacloud_imageseg20191230.client import Client as imageseg20191230Client
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util.models import RuntimeOptions
from alibabacloud_imageseg20191230 import models as imageseg_20191230_models
from alibabacloud_tea_util.client import Client as UtilClient
class ImageProcess:
"""图像处理综合类"""
def __init__(self, data_type: Literal['base64', 'path', 'url', 'stream', 'bytes'], data):
# 将所有的输入数据类型转换为字节IO流
if data_type == 'base64':
decoded_data = base64.b64decode(data)
self.stream_data = io.BytesIO(decoded_data)
elif data_type == 'path':
with open(data, 'rb') as file:
file_data = file.read()
self.stream_data = io.BytesIO(file_data)
elif data_type == 'url':
response = requests.get(data)
response.raise_for_status()
self.stream_data = io.BytesIO(response.content)
elif data_type == 'stream':
self.stream_data = data
elif data_type == 'bytes':
self.stream_data = io.BytesIO(data)
else:
raise ValueError('不能识别的data_type数据')
# 分析图像格式
# self.stream_data.seek(0) # 重置流的位置
with Image.open(self.stream_data) as img:
self.format = img.format # 注意:格式得在应用旋转前获取,旋转后会丢失EXIF信息
# self.stream_data.seek(0) # 重置流的位置
# 通过django settings导入阿里云秘钥
from django.conf import settings
self.ALIBABA_CLOUD_ACCESS_KEY_ID = settings.ALIBABA_CLOUD_ACCESS_KEY_ID
self.ALIBABA_CLOUD_ACCESS_KEY_SECRET = settings.ALIBABA_CLOUD_ACCESS_KEY_SECRET
@property
def size_mb(self) -> float:
"""获取图片大小(MB)"""
self.stream_data.seek(0, io.SEEK_END) # 移动到流的末尾
size = self.stream_data.tell() / 1024 / 1024 # 计算大小(MB)
self.stream_data.seek(0) # 重置流的位置
return size
@property
def width(self) -> int:
"""获取图片宽度(px)"""
with Image.open(self.stream_data) as img:
# 检查并应用 EXIF 旋转信息
img = self.__apply_exif_rotation(img)
return img.size[0]
@property
def height(self) -> int:
"""获取图片高度(px)"""
with Image.open(self.stream_data) as img:
# 检查并应用 EXIF 旋转信息
img = self.__apply_exif_rotation(img)
return img.size[1]
def __apply_exif_rotation(self, img):
"""
检查并应用图像的 EXIF 旋转信息。
使用 with Image.open(self.stream_data) as img: 打开图像后,图像旋转方向可能有问题,需要在打开图像后应用图片旋转信息
"""
orientation = None
try:
# 寻找 Orientation 标签的编码
for tag, value in ExifTags.TAGS.items():
if value == 'Orientation':
orientation = tag
break
# 如果找到了 Orientation 标签
if orientation and img._getexif():
exif = dict(img._getexif().items())
# 应用 EXIF 旋转
if exif.get(orientation) == 3:
img = img.rotate(180, expand=True)
elif exif.get(orientation) == 6:
img = img.rotate(270, expand=True)
elif exif.get(orientation) == 8:
img = img.rotate(90, expand=True)
except (AttributeError, KeyError, IndexError):
# EXIF 信息不可用或不存在旋转信息
pass
return img
def resize_image(self, max_width, max_height):
"""
调整图像大小,使其宽度不超过 max_width,高度不超过 max_height,
同时保持宽高比不变。
参数:
is_return: 是否返回,如果返回则不保存到self.stream_data中
"""
# 确保当前流在开始位置
self.stream_data.seek(0)
# 打开图像
with Image.open(self.stream_data) as img:
# 检查并应用 EXIF 旋转信息
img = self.__apply_exif_rotation(img)
# 计算新的尺寸,保持宽高比不变
ratio = min(max_width / img.width, max_height / img.height)
new_size = (int(img.width * ratio), int(img.height * ratio))
# 调整图像大小
img_resized = img.resize(new_size, Image.Resampling.LANCZOS)
# 保存调整后的图像回流
output_stream = io.BytesIO()
img_resized.save(output_stream, format=self.format)
output_stream.seek(0)
# 重置流的位置
self.stream_data.seek(0)
# 更新流数据
self.stream_data = output_stream
def auto_compress(self):
"""图片调整像素大小时,会做自动压缩,这个压缩效果很好"""
self.resize_image(self.width, self.height)
def compress_to_size(self, target_size_mb: float, quality_step=5):
"""
压缩图像,使其大小接近 target_size_mb(兆字节)。
quality_step 定义了每次尝试压缩时质量降低的步长。
"""
# if self.format == 'PNG':
# raise ValueError('png不支持图像质量调整')
# 确保当前流在开始位置
self.stream_data.seek(0)
# 打开图像
with Image.open(self.stream_data) as img:
# 检查并应用 EXIF 旋转信息
img = self.__apply_exif_rotation(img)
if self.format == 'PNG':
img = img.convert('RGB')
# 从高质量开始尝试压缩
quality = 95
while quality > 0:
# 创建一个新的 BytesIO 对象用于存储压缩后的图像
output_stream = io.BytesIO()
# 以当前质量保存图像
img.save(output_stream, format='JPEG', quality=quality, optimize=True)
# 检查压缩后的大小是否符合要求
current_size_mb = output_stream.tell() / 1024 / 1024
if current_size_mb <= target_size_mb:
# 更新流数据和文件大小
self.stream_data = output_stream
return output_stream
else:
# 减小质量并重试
quality -= quality_step
# 如果循环结束仍未达到目标大小,则抛出错误
raise ValueError("无法将图像压缩到目标大小")
def auto_revise_image_from_tianrang(self):
"""天壤自动校正图像
云市场控制台:https://market.console.aliyun.com/
应用主页:https://market.aliyun.com/products/57124001/cmapi00061683.html
API调试:https://market.aliyun.com/apimarket/detail/cmapi00061683
API文档:https://netmarket.oss-cn-hangzhou.aliyuncs.com/b1bdb0c3062e4916963069d0c32ba88d.pdf
"""
# 图片大小处理
if self.size_mb >= 5:
self.auto_compress()
if self.size_mb >= 5:
self.compress_to_size(5)
# API details
api_url = 'http://scanimage.market.alicloudapi.com/scan_image'
appcode = 'bb438a4c603d438e81a86a8356f5f12d' # 这里是阿里云市场应用的appcode
payload = {
'media_id': self.get_base64(), # 需去掉前缀data:image/png;base63
'type': 'color', # 返回彩色
'keep_ori': 'false', # 图片会旋转成正方向
'keep_distortion': 'false' # 图片会进行自动裁边和畸变矫正
}
headers = {
'Authorization': 'APPCODE ' + appcode,
'Content-Type': 'application/json; charset=UTF-8'
}
response = requests.post(api_url, data=json.dumps(payload), headers=headers)
result = response.json()
if result.get('code') == 0:
base64_data = result['data']['data']['mediaId']
# 解码 base64 字符串以获得二进制数据
image_data = base64.b64decode(base64_data)
# 使用解码后的数据创建 BytesIO 对象
self.stream_data = io.BytesIO(image_data)
def auto_revise_image_from_tianrang2(self):
"""天壤自动校正图像,接口二
云市场控制台:https://market.console.aliyun.com/
应用主页:https://market.aliyun.com/products/57124001/cmapi00060054.html
API调试:https://market.aliyun.com/apimarket/detail/cmapi00060054
API文档:https://netmarket.oss-cn-hangzhou.aliyuncs.com/8626137f126e41b4a141f1bea45ccf24.pdf
"""
# 图片大小处理
if self.size_mb >= 5:
self.auto_compress()
if self.size_mb >= 5:
self.compress_to_size(5)
# API details
api_url = 'http://docdewarp.market.alicloudapi.com/doc-dewarping'
appcode = 'bb438a4c603d438e81a86a8356f5f12d' # 这里是阿里云市场应用的appcode
payload = {
'media_id': self.get_base64(), # 需去掉前缀data:image/png;base63
'keep_ori': False, # 图片会旋转成正方向
}
headers = {
'Authorization': 'APPCODE ' + appcode,
'Content-Type': 'application/json; charset=UTF-8'
}
response = requests.post(api_url, json=payload, headers=headers)
result = response.json()
if result.get('code') == 0:
base64_data = result['data']['media_id']
# 解码 base64 字符串以获得二进制数据
image_data = base64.b64decode(base64_data)
# 使用解码后的数据创建 BytesIO 对象
self.stream_data = io.BytesIO(image_data)
def auto_revise_image_from_aike(self):
"""艾科瑞特 图像扭曲矫正
云市场控制台:https://market.console.aliyun.com/
应用主页:https://market.aliyun.com/products/57124001/cmapi034677.html
API文档:https://apihub.icredit.link/api-79746323
API调试:https://market.aliyun.com/apimarket/detail/cmapi034677
手机拍照的高清图像识别会有点问题
"""
api_url = 'http://redirect.market.alicloudapi.com/ai_image_process/distortion_correction/v1'
appcode = 'bb438a4c603d438e81a86a8356f5f12d' # 这里是阿里云市场应用的appcode
payload = {
'IMAGE_TYPE': '0', # 0: 图片base64,1: 图片URL(需要urlencode编码)
'IMAGE': self.get_base64(),
}
headers = {
'Authorization': 'APPCODE ' + appcode,
'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8'
}
response = requests.post(api_url, data=payload, headers=headers)
if response.status_code == 200:
result = response.json()
image_base64_data = result['智能图像透视矫正方向检测实体信息']['图像透视矫正BASE64编码']
base64_data = image_base64_data.split(',')[1]
# 解码 base64 字符串以获得二进制数据
image_data = base64.b64decode(base64_data)
# 使用解码后的数据创建 BytesIO 对象
self.stream_data = io.BytesIO(image_data)
def __aliyun_ocr(self, file_url: str = None, type: str = 'General', **kwargs):
"""
阿里云 OCR 文字识别
OCR 管理后台:https://ocr.console.aliyun.com/overview
OCR OpenAPI文档:https://api.aliyun.com/api-tools/sdk/ocr-api?language=python-tea
依赖: pip install alibabacloud_ocr_api20210707==2.0.1 alibabacloud_darabonba_stream
参数:
type (str): 识别类型,General Advanced
file_url (str): 文件URL
file_stream (str): 文件bytes流
注意:
file_url 与 file_stream 二选一
"""
# 确保流数据在开始位置
self.stream_data.seek(0)
config = open_api_models.Config(access_key_id=self.ALIBABA_CLOUD_ACCESS_KEY_ID,
access_key_secret=self.ALIBABA_CLOUD_ACCESS_KEY_SECRET)
# Endpoint 请参考 https://api.aliyun.com/product/ocr-api
config.endpoint = f'ocr-api.cn-hangzhou.aliyuncs.com'
client: ocr_api20210707Client = ocr_api20210707Client(config)
recognize_all_text_request = ocr_api_20210707_models.RecognizeAllTextRequest(
type=type,
body=self.stream_data,
url=file_url,
**kwargs
)
runtime = util_models.RuntimeOptions()
try:
return client.recognize_all_text_with_options(recognize_all_text_request, runtime)
except Exception as error:
# 如有需要,请打印 error
print(error.message)
def get_general_orc_text(self, *args, **kwargs):
"""通用文字识别"""
result = self.__aliyun_ocr(type='General', *args, **kwargs)
# content = result.body.data.content
result_data = ''
tmp_data = ''
for i in result.body.data.sub_images[0].block_info.block_details:
if i.block_confidence > 95:
# result_data += i.block_content
if len(i.block_content) == 1:
tmp_data += i.block_content
continue
if len(tmp_data) != 0:
result_data += tmp_data
result_data += '\r\n'
tmp_data = ''
result_data += i.block_content
result_data += '\r\n'
# result_data += content
return result_data
def auto_png_split(self, is_crop=True):
"""智能抠图转png
is_crop代表是否自动剪裁
示例代码:https://help.aliyun.com/zh/viapi/use-cases/general-image-segmentation?spm=a2c4g.11186623.0.i0
python依赖:pip install alibabacloud_imageseg20191230
上传要求:JPEG、JPG、PNG(不支持8位、16位、64位PNG)、BMP、WEBP。 图片大小:小于3MB。 图片分辨率:分辨率大于32×32小于2000×2000。
"""
# 检查图像尺寸和大小并调整
if self.width >= 2000 or self.height >= 2000:
self.resize_image(1999, 1999)
if self.size_mb >= 3:
self.auto_compress()
if self.size_mb >= 3:
self.compress_to_size(3)
# 原图像
source_stream_data = self.stream_data
source_stream_data.read()
source_size_mb = source_stream_data.tell()
# 确保当前流在开始位置
source_stream_data.seek(0)
# 打开图像
with Image.open(source_stream_data) as img:
source_width = img.width
source_height = img.height
config = Config(
# 创建AccessKey ID和AccessKey Secret,请参考https://help.aliyun.com/document_detail/175144.html。
# 如果您用的是RAM用户的AccessKey,还需要为RAM用户授予权限AliyunVIAPIFullAccess,请参考https://help.aliyun.com/document_detail/145025.html。
# 从环境变量读取配置的AccessKey ID和AccessKey Secret。运行代码示例前必须先配置环境变量。
access_key_id=self.ALIBABA_CLOUD_ACCESS_KEY_ID,
access_key_secret=self.ALIBABA_CLOUD_ACCESS_KEY_SECRET,
# 访问的域名。
endpoint='imageseg.cn-shanghai.aliyuncs.com',
# 访问的域名对应的region
region_id='cn-shanghai'
)
# 确保流数据在开始位置
self.stream_data.seek(0)
segment_common_image_request = SegmentCommonImageAdvanceRequest()
segment_common_image_request.image_urlobject = self.stream_data
# 对边缘空白区域进行裁剪
if is_crop:
segment_common_image_request.return_form = 'crop'
runtime = RuntimeOptions()
try:
# 初始化Client
client = Client(config)
response = client.segment_common_image_advance(segment_common_image_request, runtime)
# 获取图片URL结果
result_image_url = response.body.data.image_url
# 打印URL
# print(result_image_url)
# 转换为字节流
with urllib.request.urlopen(result_image_url) as image_url_response:
result_image_bytes = image_url_response.read()
self.stream_data = io.BytesIO(result_image_bytes)
self.stream_data.read()
new_size_mb = self.stream_data.tell()
# 确保当前流在开始位置
self.stream_data.seek(0)
# 打开图像
with Image.open(self.stream_data) as img:
new_width = img.width
new_height = img.height
split_mb_ratio = new_size_mb / source_size_mb
split_width_ratio = new_width / source_width
split_height_ratio = new_height / source_height
if split_mb_ratio <= 0.35 or (split_width_ratio <= 0.1 and split_height_ratio <= 0.1):
self.stream_data = source_stream_data
return
self.format = 'PNG'
except Exception as error:
if hasattr(error, 'code'):
return None, error.code
else:
return None, str(error)
def get_avatar_png(self, is_show=False) -> io.BytesIO:
"""将人体抠出来返回
对与未处理的拍照图片可能识别不了,尝试抠图或者校正后再试
API示例:https://next.api.aliyun.com/api/imageseg/2019-12-30/SegmentBody
要求:图像分辨率小于2000*2000,图片大小小于3MB
参数:
is_show: 是否打开调试状态,显示处理完后的头像
"""
# 备份原数据,完成后还原
old_stream_data = self.stream_data
# 检查图像尺寸和大小并调整
if self.width >= 2000 or self.height >= 2000:
self.resize_image(1999, 1999)
if self.size_mb >= 3:
self.auto_compress()
if self.size_mb >= 3:
self.compress_to_size(3)
config = open_api_models.Config(
# 必填,您的 AccessKey ID,
access_key_id=self.ALIBABA_CLOUD_ACCESS_KEY_ID,
# 必填,您的 AccessKey Secret,
access_key_secret=self.ALIBABA_CLOUD_ACCESS_KEY_SECRET
)
# Endpoint 请参考 https://api.aliyun.com/product/imageseg
config.endpoint = f'imageseg.cn-shanghai.aliyuncs.com'
client = imageseg20191230Client(config)
segment_body_request = imageseg_20191230_models.SegmentBodyAdvanceRequest()
segment_body_request.image_urlobject = self.stream_data
segment_body_request.return_form = 'crop'
runtime = util_models.RuntimeOptions()
try:
# response = client.segment_body_with_options(segment_body_request, runtime)
response = client.segment_body_advance(segment_body_request, runtime)
# 获取图片URL结果
result_image_url = response.body.data.image_url
# 转换为字节流
with urllib.request.urlopen(result_image_url) as image_url_response:
result_image_bytes = image_url_response.read()
image_stream = io.BytesIO(result_image_bytes)
# 打印
if is_show:
print(result_image_url)
self.show_image(image_stream)
return result_image_bytes
except Exception as error:
# 错误 message
print(error.message)
# 诊断地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
finally:
# 还原数据
self.stream_data = old_stream_data
def show_image(self, stream_data=None):
"""
使用默认图像查看器展示图像,并在控制台打印图像的大小、像素和格式信息。
"""
if not stream_data:
stream_data = self.stream_data
# 确保流数据在开始位置
stream_data.seek(0)
# 计算大小(MB)
stream_data.seek(0, io.SEEK_END) # 移动到流的末尾
size_mb = stream_data.tell() / 1024 / 1024 # 计算大小(MB)
stream_data.seek(0) # 重置流的位置
# 从流中加载图像
img = Image.open(stream_data)
img = self.__apply_exif_rotation(img)
# 图像信息
info_text = f"尺寸(像素): {img.size[0]} x {img.size[1]}\n" \
f"文件大小(MB): {size_mb:.2f}\n" \
f"格式: {img.format}"
print(info_text)
# 显示图像
img.show()
def save_to_file(self, file_path):
"""
将图像保存到文件。
"""
with open(file_path, 'wb') as file:
file.write(self.stream_data.getvalue())
def get_stream(self):
"""
返回一个包含图像数据的 I/O 流。
"""
# 创建一个新的 BytesIO 对象,以免更改原始流的位置
return io.BytesIO(self.stream_data.getvalue())
def get_base64(self):
"""
返回图像数据的 base64 编码字符串。
"""
return base64.b64encode(self.stream_data.getvalue()).decode()
def get_bytes(self):
"""
返回图像数据的字节串。
"""
return self.stream_data.getvalue()
if __name__ == '__main__':
# 加载Django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "conf.settings")
import django
django.setup()
# img_process = ImageProcess('path', '../../docs/tmp/jianli_avatar.png')
# img_process = ImageProcess('path', '../../docs/tmp/aa.png')
result = ''
for i in ['../../docs/tmp/test4.jpg','../../docs/tmp/test5.jpg','../../docs/tmp/test6.jpg']:
img_process = ImageProcess('path', i)
img_process.auto_png_split()
img_process.show_image()
img_process.auto_revise_image_from_tianrang2()
img_process.show_image()
# img_process.resize_image(1279, 1279)
# img_process.auto_compress()
# img_process.auto_revise_image_from_tianrang2()
# img_process.get_avatar_png(is_show=True)
# img_process.show_image()
# content = img_process.get_general_orc_text()
# print(content)
# img_process.save_to_file('../../docs/tmp/aa_resized.jpg')
result += img_process.get_general_orc_text()
print(result)
from utils.api.aliyun.aliyun_nlp import ResumeNPL
result_data = ResumeNPL().get_from_pdf(resume_text=result)
# pdf_file_obj = open('../../docs/tmp/111111.pdf','rb')
# result_data = ResumeNPL().get_from_pdf(pdf_file_obj=pdf_file_obj)
print(result_data)