文章目录 1. 说明 2. 准备工作 3. 代码 3.1 导入库: 3.2 遍历发票指定处理方式 3.3 发票识别相关函数 3.4 发票字段定位函数 3.6 识别记录相关函数 3.6 识别结果校验 3.7 文件预处理等其他函数 3.8 main主函数
1. 说明
1.1 以paddle识别引擎为基础的增值税发票识别程序,可批量识别和累积纸质发票和电子发票数据。已在生产环境中测试。
1.2 识别的源发票数据:- 文件夹中存放的用高速连续发票扫描仪批量扫描的JPG格式图片- 文件夹中汇集的电子发票PDF格式文件
1.3 可选择识别引擎:快速-mb、平衡-sv、精细-pp(总体上,预识别用mb,精细识别用pp,速度和精确度都比较好)。
1.4 适配断续工作,跳过已扫描的重复发票,边识别边存储。
1.5 可装在闲置低配置的win7老台式,资源利用,识别速度视电脑配置差异大概2-3秒一张。
1.6 在实际生产环境中测试,如果纸质发票不清晰,综合识别准确率大概85%-95%左右。如果数电发票比较多,识别准确率大概达到97%以上。
1.7 对于识别有误或缺失的数据,在结果中提示错误并链接原发票文件,以便人工直接对照修改。
1.8 其他: - 公司名称税号可在代码中预置设定好,位置在发票字段定位函数Loc_range_content_pandas。- 可自行预置对方公司名称错误的更正,详细可在Check_result函数中此处文字内容"字段修正:公司名错别字"所在位置的字典修改。
2. 准备工作
2.1 准备工作发票电子文件夹:已用高速连续发票扫描仪扫描完纸质发票的图片文件夹,和已汇集的电子发票PDF格式文件夹。
2.2 安装好辅助程序 acrobat pro dc
2.3 语言环境 anaconda,python3.7(虚拟环境)
2.4 环境中安装好所需要的库(自行安装好虚拟环境中所需的第三方库):cv2(opencv-python), numpy, paddlehub, pandas, psutil, openpyxl, paddleocr, pillow, pyzbar, pymupdf(fitz), pywin32(Pdf_tans_to 中的 win32com 需要)。其余用到的 imghdr, shutil, glob, pathlib, tkinter, zipfile(ZipFile) 属于 Python 标准库,一般无需另行安装。
3. 代码
3.1 导入库:
import imghdr
import math
import os
import re
import shutil
from collections import OrderedDict
from datetime import datetime
from glob import glob
from pathlib import Path
from tkinter import filedialog
from tkinter import Tk
import cv2
import numpy as np
import paddlehub as hub
import pandas as pd
import psutil
from openpyxl import cell, load_workbook
from openpyxl. styles import Font, colors
from paddleocr import PaddleOCR, draw_ocr
from PIL import Image, ImageDraw, ImageEnhance, ImageFont
from pyzbar import pyzbar
from zipfile import ZipFile
import fitz
3.2 遍历发票指定处理方式
def _index_known_invoices(frame, code_column, inv_dict):
    """Add every (invoice number -> file path) pair stored in ``frame`` to ``inv_dict``."""
    if frame.empty:
        return
    for _, row in frame.iterrows():
        code = row[code_column]
        # A NaN that was read back from Excel is not the ``np.nan`` singleton,
        # so an identity test lets it through; pd.isna catches every NaN.
        if pd.isna(code):
            continue
        inv_dict.setdefault(code, []).append(row['file_path'])


def _already_logged(frame, file_path):
    """Return True when ``file_path`` is already present in ``frame['file_path']``."""
    if frame.empty or 'file_path' not in frame.columns:
        return False
    return bool((frame['file_path'] == file_path).any())


def _register_invoice(inv_dict, inv_code, file_path):
    """Record ``file_path`` under ``inv_code``; return True if the number now maps to several files."""
    paths = inv_dict.setdefault(inv_code, [])
    if file_path not in paths:
        paths.append(file_path)
    return len(paths) > 1


def _first_value(entry):
    """Return the first value of a Series (or of a ``[Series, ...]`` list), or '' when there is none."""
    if isinstance(entry, list):
        if not entry:
            return ''
        entry = entry[0]
    if entry is None or len(entry) == 0:
        return ''
    return entry.values[0]


def _record_duplicate(duplicate_pandas, inv_dict, inv_code, file_path):
    """Append a duplicate row, persist the duplicate sheet, print the skip notice; return the new frame."""
    row = pd.DataFrame(data={'重复票号': [inv_code], 'file_path': [file_path]})
    if duplicate_pandas.empty:
        duplicate_pandas = row
    else:
        duplicate_pandas = pd.concat([duplicate_pandas, row], ignore_index=True, axis=0)
    Log_result_file(duplicate_pandas, result_file_path, duplicate_sheet_name)
    print(datetime.now().strftime("%H:%M:%S"), file_path, 'Skip. ', '\n\t\tDuplicate:', inv_code, inv_dict[inv_code][0])
    return duplicate_pandas


def walk_folder_ocr(origin_pandas, duplicate_pandas, origin_folder_path, **walk_folder_args):
    """Recognise every ``.pdf`` / ``.jpg`` invoice directly inside ``origin_folder_path``.

    Files already listed in ``origin_pandas`` or ``duplicate_pandas`` are skipped,
    so an interrupted run can be resumed. After each file the accumulated result
    (or duplicate) table is written out through ``Log_result_file``.

    Args:
        origin_pandas: DataFrame of results from earlier runs (may be empty).
        duplicate_pandas: DataFrame of duplicates from earlier runs (may be empty).
        origin_folder_path: folder that holds the invoice files.
        **walk_folder_args: must contain ``ocr_engines``, ``temp_folder_path``
            and ``engine_switch`` (engine used for the first, coarse pass).

    Returns:
        ``(result_pandas, duplicate_pandas)`` after processing.

    NOTE(review): this function also reads the module-level names
    ``precise_engine``, ``result_file_path``, ``result_sheet_name``,
    ``duplicate_sheet_name``, ``ocr_excel_out``, ``draw_result_out`` and
    ``enhance``; they are not defined in this part of the file - confirm they
    are set before the call.
    NOTE(review): the original indentation was lost; the nesting below is a
    best-effort reconstruction and should be checked against the author's copy.
    """
    ocr_engines = walk_folder_args['ocr_engines']
    temp_folder_path = walk_folder_args['temp_folder_path']
    prepare_engine = walk_folder_args['engine_switch']
    result_pandas = origin_pandas
    invoice_suffixes = ('.pdf', '.jpg')
    columns_list = ['01票号', '02代码', '03日期', '04购方', '05购方税号', '06品名', '07单位', '08数量', '09单价',
                    '10税前', '11税率', '12税额', '13合计税前', '14合计税额', '15总额', '16大写', '17销方', '18销方税号']

    # Suffixes are compared case-insensitively so that "X.JPG" is counted and processed too.
    cnt_file = len({p.resolve() for p in Path(origin_folder_path).glob("*")
                    if p.suffix.lower() in invoice_suffixes})

    # invoice number -> list of files carrying that number
    inv_dict = {}
    _index_known_invoices(result_pandas, '01票号', inv_dict)
    _index_known_invoices(duplicate_pandas, '重复票号', inv_dict)

    cnt_done = 0
    cnt_duplicate = 0
    if not origin_pandas.empty:
        cnt_done = len(origin_pandas.loc[origin_pandas['file_path'].notnull(), :])
    if not duplicate_pandas.empty:
        cnt_duplicate = len(duplicate_pandas.loc[duplicate_pandas['file_path'].notnull(), :])

    for file_name in os.listdir(origin_folder_path):
        file_path = os.path.join(origin_folder_path, file_name)
        if not os.path.isfile(file_path):
            continue
        pr, nm, fr, ex = pathsplit(file_path)
        ext = ex.lower()
        if ext not in invoice_suffixes:
            continue
        if _already_logged(result_pandas, file_path) or _already_logged(duplicate_pandas, file_path):
            continue

        result_series_orderdic = OrderedDict()
        err_info = ''
        inv_code = ''

        if ext == '.pdf':
            # Electronic invoice: convert the PDF to a workbook and parse that.
            pdf_trans_folder_path = os.path.join(temp_folder_path, 'temp_pdf_trans_excel')
            os.makedirs(pdf_trans_folder_path, exist_ok=True)
            pdf_trans_file_path = os.path.join(pdf_trans_folder_path, fr + '.xlsx')
            if not os.path.exists(pdf_trans_file_path):
                # Pdf_tans_to returns None when the conversion produced no file.
                pdf_trans_file_path = Pdf_tans_to(file_path, pdf_trans_file_path, trans_type='.xlsx',
                                                  temp_pdf_trans_excel_out=True)
            if pdf_trans_file_path and os.path.exists(pdf_trans_file_path):
                result_series_orderdic, err_info, inv_dict = Tele_inv_ocr(
                    ocr_engines, result_series_orderdic, inv_dict, file_path, pdf_trans_file_path,
                    err_info, engine_switch=precise_engine)

            if len(result_series_orderdic) != 0:
                code = _first_value(result_series_orderdic.get('01票号'))
                if len(code) > 0:
                    inv_code = code
                    if _register_invoice(inv_dict, inv_code, file_path):
                        duplicate_pandas = _record_duplicate(duplicate_pandas, inv_dict, inv_code, file_path)
                        cnt_duplicate = cnt_duplicate + 1
                        continue
            else:
                # Nothing could be read from the workbook: render the PDF as an image instead.
                pdf_trans_jpg_file_path = os.path.join(temp_folder_path, 'temp_pdf_trans_jpg', fr + '.jpg')
                pdf_trans_jpg_file_path = Pdf_tans_jpg(file_path, pdf_trans_jpg_file_path,
                                                       temp_pdf_trans_jpg_out=True)
                if pdf_trans_jpg_file_path and os.path.exists(pdf_trans_jpg_file_path):
                    print('\n\nPDF转成图片识别:', pdf_trans_jpg_file_path, '【此模块待添加。】\n\n')
        else:
            # Scanned paper invoice (.jpg).
            known_dict = {}
            origin_df = pd.DataFrame()
            # NOTE(review): Ocr_func names its workbooks 'result(<engine>)_<name>.xlsx', so this
            # 'result_<name>.xlsx' cache path looks like it can never exist - confirm intent.
            img_trans_xls_path = os.path.join(temp_folder_path, 'temp_img_trans_excel', 'result_' + fr + '.xlsx')
            if os.path.exists(img_trans_xls_path):
                origin_df = pd.read_excel(img_trans_xls_path, sheet_name=0, header=0, index_col=0,
                                          na_values=None, keep_default_na=False, dtype=object)
            else:
                # The QR code, when readable, gives the invoice number before any OCR is
                # run, so duplicates can be skipped without paying for recognition.
                known_dict = Crop_known_from_qrcode(file_path)
                code = _first_value(known_dict.get('01票号'))
                if len(code) > 0:
                    inv_code = code
                    if _register_invoice(inv_dict, inv_code, file_path):
                        duplicate_pandas = _record_duplicate(duplicate_pandas, inv_dict, inv_code, file_path)
                        cnt_duplicate = cnt_duplicate + 1
                        continue
                origin_df = Ocr_func(ocr_engines, img_path=file_path, temp_folder_path=temp_folder_path,
                                     range_title='', known_dict=known_dict, ocr_excel_out=ocr_excel_out,
                                     draw_result_out=draw_result_out, engine_switch=prepare_engine)

            if not origin_df.empty:
                result_series_orderdic, err_info = Loc_range_content_pandas(
                    ocr_engines, origin_df, result_series_orderdic, err_info, known_dict, file_path,
                    temp_folder_path, enhance=enhance, engine_switch=precise_engine)
                code = _first_value(result_series_orderdic.get('01票号'))
                if len(code) > 0:
                    inv_code = code
                    _register_invoice(inv_dict, inv_code, file_path)

            if len(inv_code) > 0 and inv_code in inv_dict and len(inv_dict[inv_code]) > 1:
                duplicate_pandas = _record_duplicate(duplicate_pandas, inv_dict, inv_code, file_path)
                cnt_duplicate = cnt_duplicate + 1
                continue

        # One column per recognised field; entries are either a Series or a [Series, ...] list.
        bind_df = pd.DataFrame([
            result_series_orderdic[series_title][0]
            if isinstance(result_series_orderdic[series_title], list)
            else result_series_orderdic[series_title]
            for series_title in result_series_orderdic
        ]).T
        if len(bind_df) == 0:
            bind_df = pd.DataFrame(columns=columns_list)

        result_df = bind_df.copy()
        result_df['file_path'] = ''
        # .loc both creates the first row of an empty frame and writes into an existing
        # one; DataFrame.append no longer exists in pandas 2.x and writing through
        # ``.values`` may silently hit a copy.
        first_label = result_df.index[0] if len(result_df) > 0 else 0
        result_df.loc[first_label, 'file_path'] = file_path
        result_df['err_info'] = ''
        result_df.loc[result_df.index[0], 'err_info'] = err_info
        result_df = Fill_na_result(result_df)

        if result_pandas.empty:
            result_pandas = result_df
        else:
            result_pandas = pd.concat([result_pandas, result_df], ignore_index=True, axis=0)
        result_pandas = Check_result(result_pandas)

        # Persist after every file so that an interrupted run loses nothing.
        Log_result_file(result_pandas, result_file_path, result_sheet_name)
        Add_hyperlink(result_file_path, result_sheet_name)
        cnt_done = cnt_done + 1
        print(datetime.now().strftime("%H:%M:%S"), file_name, inv_code,
              'done: ' + str(cnt_done) + ' / ' + str(cnt_file))

    return result_pandas, duplicate_pandas
3.3 发票识别相关函数
def Ocr_func(ocr_engines, img_path, temp_folder_path, range_title='', known_dict=None, ocr_excel_out=True, draw_result_out=False, engine_switch=0) -> object:
    """Run one OCR engine on ``img_path`` and return the recognised text boxes.

    Args:
        ocr_engines: container indexed by ``engine_switch`` holding the engine objects.
        img_path: image file to recognise.
        temp_folder_path: folder under which 'temp_img_trans_excel' and
            'temp_draw_result' are created.
        range_title: optional field name appended to the output file names.
        known_dict: accepted for call compatibility; not used by this function.
        ocr_excel_out: when true, the result table is also saved as .xlsx.
        draw_result_out: when true (engine 1 only), an annotated image is saved.
        engine_switch: 0 -> 'mb', 1 -> 'pp' (PaddleOCR object), 2 -> 'sv';
            0 and 2 are used through ``recognize_text``.

    Returns:
        DataFrame with one row per text box. Columns are content, a confidence
        column ('trust' for engine 1, 'confdence' for engines 0/2) and the
        corner coordinates luw, luh, ruw, ruh, rdw, rdh, ldw, ldh.

    Raises:
        ValueError: ``engine_switch`` is not 0, 1 or 2.
    """
    p, n, fr, ex = pathsplit(img_path)
    temp_img_trans_excel_folder = os.path.join(temp_folder_path, 'temp_img_trans_excel')
    temp_draw_result_folder = os.path.join(temp_folder_path, 'temp_draw_result')

    engine_names = {0: 'mb', 1: 'pp', 2: 'sv'}
    if engine_switch not in engine_names:
        # Previously this fell through and failed later with an unbound local.
        raise ValueError('engine_switch must be 0, 1 or 2, got %r' % (engine_switch,))
    engine = engine_names[engine_switch]

    if range_title == '':
        img_trans_xls_name = 'result(' + engine + ')_' + fr + '.xlsx'
    else:
        img_trans_xls_name = 'result(' + engine + ')_' + fr + '_' + range_title + '.xlsx'
    img_trans_xls_path = os.path.join(temp_img_trans_excel_folder, img_trans_xls_name)

    if not os.path.exists(temp_img_trans_excel_folder):
        Create_clear_dir(temp_img_trans_excel_folder)
    if not os.path.exists(temp_draw_result_folder):
        Create_clear_dir(temp_draw_result_folder)

    if engine_switch == 1:
        paddleOcr = ocr_engines[engine_switch]
        # NOTE(review): assumes ocr() returns a flat list of [box, (text, score)]
        # entries - confirm against the installed PaddleOCR version.
        results = paddleOcr.ocr(img_path, cls=True)
        df0 = pd.DataFrame(data=results, columns=['pix', 'result'])
        df1 = pd.concat(
            [pd.DataFrame(df0['pix'].values.tolist(), columns=['lu', 'ru', 'rd', 'ld']),
             pd.DataFrame(df0['result'].values.tolist(), columns=['content', 'trust'])],
            axis=1)
        df = df1[['content', 'trust']]
        # Split every corner point [w, h] into two scalar columns.
        for title in ['lu', 'ru', 'rd', 'ld']:
            corner = pd.DataFrame(df1[title].values.tolist(), columns=[title + 'w', title + 'h'])
            df = pd.concat([df, corner], axis=1)

        if ocr_excel_out == True:
            df.to_excel(img_trans_xls_path, index=False)

        if draw_result_out == True:
            from PIL import Image
            image = Image.open(img_path).convert('RGB')
            # Use the OCR output itself; the old code iterated an empty
            # placeholder string, so no box was ever drawn.
            boxes = [line[0] for line in results]
            txts = [line[1][0] for line in results]
            scores = [line[1][1] for line in results]
            im_show = draw_ocr(image, boxes, txts, scores, font_path='./fonts/simfang.ttf')
            im_show = Image.fromarray(im_show)
            if range_title == '':
                draw_result_name = 'draw_result_' + fr + ex
            else:
                draw_result_name = 'draw_result_' + fr + '_' + range_title + ex
            draw_result_path = os.path.join(temp_draw_result_folder, draw_result_name)
            im_show.save(draw_result_path)
    else:
        hubOcr = ocr_engines[engine_switch]
        img = cv_imread(img_path)
        np_images = [img]
        hub_result = hubOcr.recognize_text(images=np_images, use_gpu=False,
                                           output_dir=temp_draw_result_folder, visualization=True,
                                           box_thresh=0.5, text_thresh=0.5)
        results = hub_result[0]['data']
        # 'confdence' (sic) is kept: it is the column name written to the workbooks.
        column_list = ['content', 'confdence', 'luw', 'luh', 'ruw', 'ruh', 'rdw', 'rdh', 'ldw', 'ldh']
        rows = []
        for infomation in results:
            box = infomation['text_box_position']
            rows.append([infomation['text'], infomation['confidence'],
                         box[0][0], box[0][1], box[1][0], box[1][1],
                         box[2][0], box[2][1], box[3][0], box[3][1]])
        # Built in one go instead of concatenating row by row; dtype=object
        # matches what the row-wise construction used to produce.
        df = pd.DataFrame(rows, columns=column_list, dtype=object)
        if ocr_excel_out == True:
            df.to_excel(img_trans_xls_path, index=False)

    return df
def Crop_known_from_qrcode(file_path) -> dict:
    """Read the invoice QR code and return the fields it carries.

    The QR area is cropped into '<temp_folder_path>/temp_crop_qrcode/qrcode_<name>'
    and decoded. The decoded text is split on ',' and fields 2..5 are stored as
    one-element Series under '02代码', '01票号', '13合计税前' and '03日期'.

    Args:
        file_path: path of the scanned invoice image.

    Returns:
        dict of field name -> ``pd.Series``; empty when no usable QR code was read.

    NOTE(review): relies on the module-level ``temp_folder_path`` - confirm it is
    defined before the call.
    """
    known_dict = {}
    pr, nm, fr, ex = pathsplit(file_path)
    qrcode_folder_path = os.path.join(temp_folder_path, 'temp_crop_qrcode')
    if not os.path.exists(qrcode_folder_path):
        Create_clear_dir(qrcode_folder_path)
    qrcode_file_path = os.path.join(qrcode_folder_path, 'qrcode_' + nm)

    qrcode_result = ''
    if Crop_qrcode_image(file_path, qrcode_file_path) == True:
        qrcode_result = qrcode_recongnize(qrcode_file_path)

    if len(qrcode_result) > 20:
        qrcode_list = qrcode_result.split(',')
        # The date key is '03日期' like everywhere else in the program (it used to
        # be stored under '04日期', which no other block looks up).
        field_titles = ['02代码', '01票号', '13合计税前', '03日期']
        # Fields are taken from positions 2..5; ignore payloads that are too short.
        if len(qrcode_list) >= len(field_titles) + 2:
            for index, range_title in enumerate(field_titles):
                known_dict[range_title] = pd.Series(data=qrcode_list[index + 2], name=range_title)
    return known_dict
def Crop_qrcode_image(origin_file_path, crop_file_path):
    """Cut the fixed QR-code window out of an invoice scan and save it enlarged.

    Rows 100-400 and columns 50-350 of the image are cropped, resized to
    1200 x 1200 and written to ``crop_file_path`` as JPEG.

    Args:
        origin_file_path: path of the scanned invoice image.
        crop_file_path: where the enlarged crop is written.

    Returns:
        True when ``crop_file_path`` exists afterwards, otherwise False (also
        when the image could not be read or is smaller than the crop window).
    """
    img_inv = cv_imread(origin_file_path)
    if img_inv is None:
        # presumably cv_imread yields None for undecodable files - verify against its definition
        return False
    img_crop = img_inv[100:400, 50:350]
    if img_crop.size == 0:
        # The scan is smaller than the window; resizing an empty array would raise.
        return False
    img_magnify = cv2.resize(img_crop, (1200, 1200))
    # imencode + tofile instead of imwrite so that non-ASCII paths work.
    cv2.imencode('.jpg', img_magnify)[1].tofile(crop_file_path)
    return os.path.exists(crop_file_path)
def qrcode_recongnize(file_path, method='cv2', drawframe=False, enhance=False):
    """Decode the QR code / barcode contained in an image file.

    Args:
        file_path: image to decode.
        method: 'cv2' reads the file through ``cv_imread`` and decodes the
            grayscale image; 'pil' opens it with Pillow.
        drawframe: ('cv2' only) also save a copy annotated with the decoded
            text as 'draw_qrcode_<name>' next to the input.
        enhance: ('pil' only) sharpen and raise the contrast before decoding.

    Returns:
        The decoded text, or '' when nothing was decoded or ``method`` is unknown.
    """
    pr, nm = os.path.split(file_path)
    output_img_path = os.path.join(pr, 'draw_qrcode_' + nm)

    if method == 'cv2':
        img = cv_imread(file_path)
        gray_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        barcodeData = ''
        for barcode in pyzbar.decode(gray_img):
            (x, y, w, h) = barcode.rect
            cv2.rectangle(img, (x, y), (x + w, y + h), (255, 255, 0), 2)
            barcodeData = barcode.data.decode("utf-8")
            # Stop at the first code whose payload is longer than 20 characters.
            if len(barcodeData) > 20:
                if drawframe == True:
                    from PIL import Image, ImageFont, ImageDraw
                    img_PIL = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
                    font = ImageFont.truetype('STFANGSO.TTF', 25)
                    fillColor = (0, 255, 0)
                    position = (x, y - 25)
                    draw = ImageDraw.Draw(img_PIL)
                    draw.text(position, barcodeData, font=font, fill=fillColor)
                    img_PIL.save(output_img_path, 'jpeg')
                break
        return barcodeData

    if method == 'pil':
        from PIL import Image, ImageEnhance
        img = Image.open(file_path).convert('RGB')
        if enhance == True:
            img = ImageEnhance.Brightness(img).enhance(1.0)
            img = ImageEnhance.Sharpness(img).enhance(1.5)
            img = ImageEnhance.Contrast(img).enhance(2.0)
        img = img.convert('L')
        decoded = pyzbar.decode(img)
        if not decoded:
            # Same "nothing found" answer as the 'cv2' branch instead of an IndexError.
            return ''
        return decoded[0][0].decode('utf-8')

    # Unknown method: callers take len() of the result, so return an empty string.
    return ''
def _center_limit_mask(df, limits):
    """Add 'center_w'/'center_h' to ``df`` and return a mask of boxes whose centre lies inside ``limits``."""
    center_w_min, center_w_max, center_h_min, center_h_max = limits
    df['center_w'] = (df['luw'] + df['ruw']) // 2
    df['center_h'] = (df['luh'] + df['ldh']) // 2
    return ((center_w_min <= df['center_w']) & (df['center_w'] <= center_w_max)
            & (center_h_min <= df['center_h']) & (df['center_h'] <= center_h_max))


def Crop_ocr(ocr_engines, result_series_orderdic, known_dict, img_inv, file_path, crop_folder_path, set_h_adjust, cond_list, enhance=False, engine_switch=0):
    """Crop one field's region out of the invoice image, OCR it and keep the matching text.

    Args:
        ocr_engines: engine container handed on to ``Ocr_func``.
        result_series_orderdic: mapping that receives the result under the field name.
        known_dict: handed on to ``Ocr_func``.
        img_inv: the whole invoice image as an array.
        file_path: path of the invoice file; its name is used for the crop file.
        crop_folder_path: folder receiving the crop and the OCR by-products.
        set_h_adjust: vertical offset in pixels, scaled per field, applied to the
            crop window of the fields listed in ``adjust_ratio_dict``.
        cond_list: ``[range_title, loc_method, reg_type, reg, count_limit, loc_dict]``;
            ``loc_dict['crop']`` is ``[min_w, max_w, min_h, max_h]`` and the optional
            ``loc_dict['center_limit']`` restricts accepted box centres.
        enhance: when true, text-heavy fields are contrast-enhanced before OCR.
        engine_switch: engine index handed on to ``Ocr_func``.

    Returns:
        ``(result_series_orderdic, get_h_adjust)``. ``get_h_adjust`` is the
        measured vertical offset of the invoice number and is non-zero only for
        the '01票号' field.

    NOTE(review): the original indentation was lost; nesting is a best-effort
    reconstruction.
    """
    pr, nm, fr, ex = pathsplit(file_path)
    range_title = cond_list[0]
    reg_type = cond_list[2]
    reg = cond_list[3]
    loc_dict = cond_list[5]
    [min_w, max_w, min_h, max_h] = loc_dict['crop']

    # Fields further down the page follow the measured offset only partially.
    adjust_ratio_dict = {'02代码': 1, '03日期': 1, '10税前': 0.6, '11税率': 0.7, '12税额': 0.8}
    if range_title in adjust_ratio_dict:
        shift = int(set_h_adjust * adjust_ratio_dict[range_title])
        min_h = min_h - shift
        max_h = max_h - shift
    crop_center_h = (max_h - min_h) // 2
    img_crop = img_inv[min_h:max_h, min_w:max_w]

    enhance_title = ['04购方', '05购方税号', '06品名', '07单位', '16大写', '17销方', '18销方税号']
    if enhance == True and range_title in enhance_title:
        img_crop = pil_cv2(pil_enhance(cv2_pil(img_crop)))

    crop_file_path = os.path.join(crop_folder_path, 'crop_' + range_title + '_' + nm)
    cv2.imencode('.jpg', img_crop)[1].tofile(crop_file_path)
    df = Ocr_func(ocr_engines, img_path=crop_file_path, temp_folder_path=crop_folder_path,
                  range_title=range_title, known_dict=known_dict, ocr_excel_out=True,
                  draw_result_out=True, engine_switch=engine_switch)

    get_h_adjust = 0
    result_sr = pd.Series(name=range_title, dtype=object)

    content_result = None
    if reg_type == 'extract':
        # The pattern only selects rows; the full recognised text of each row is kept.
        content_result = pd.notna(df['content'].str.extract(reg).iloc[:, 0])
    elif reg_type == 'contains':
        content_result = df['content'].str.contains(reg)

    if content_result is not None:
        if 'center_limit' in loc_dict:
            content_result = content_result & _center_limit_mask(df, loc_dict['center_limit'])
        if reg_type == 'contains' and range_title == '07单位':
            # Drop the column headers themselves.
            content_result = content_result & ~df['content'].str.contains(r'单\s*位|数\s*量')

        hits = df.loc[content_result, :]
        if not hits.empty:
            if range_title == '01票号':
                # How far the invoice number sits from the middle of its crop window.
                data_center_h = (hits['luh'].values[0] + hits['ldh'].values[0]) // 2
                get_h_adjust = int(crop_center_h - data_center_h)

            temp_sr = hits.iloc[:, 0] if reg_type == 'extract' else hits['content']
            if range_title == '07单位':
                # The first matching text with header characters removed, split into
                # single characters. (The 'extract' path used to throw this list
                # away and then failed on an undefined name.)
                result_list = list(temp_sr.replace(to_replace='[单|位|数|量]', value='', regex=True).values[0])
            else:
                result_list = temp_sr.to_list()
            result_sr = pd.Series(data=result_list, name=range_title)

    result_series_orderdic[range_title] = result_sr
    return result_series_orderdic, get_h_adjust
def _rows_in_window(frame, keep, window):
    """Return the rows of ``frame`` selected by ``keep`` whose box centre lies inside ``window``."""
    w_min, w_max, h_min, h_max = window
    inside = ((w_min <= frame['center_w']) & (frame['center_w'] <= w_max)
              & (h_min <= frame['center_h']) & (frame['center_h'] <= h_max))
    return frame.loc[keep & inside, :]


def Loc_jpg_content(df, cond_list, order_dict):
    """Pick one field out of a whole-page OCR table by pattern and position.

    Args:
        df: OCR table with 'content', 'center_w', 'center_h', 'luw' and 'luh' columns.
        cond_list: ``[range_title, loc_method, reg_type, reg, count_limit, loc_dict]``.
            ``loc_dict['direct']`` holds a search window
            ``[w_min, w_max, h_min, h_max]``, optionally followed by four more
            numbers describing a fallback window.
        order_dict: mapping that receives the result.

    Returns:
        ``order_dict`` with ``order_dict[range_title] = [Series, loc_tuple]``.
        ``Series`` holds the matched text (the first capture group for
        'extract', the first column of ``df`` for 'contains'). ``loc_tuple``
        is the (luw, luh) corner of the first match, or ``[]`` when nothing
        matched.
    """
    range_title = cond_list[0]
    reg_type = cond_list[2]
    reg = cond_list[3]
    count_limit = cond_list[4]
    loc_dict = cond_list[5]
    windows = loc_dict['direct']
    primary = windows[0:4]
    fallback = windows[4:8] if len(windows) >= 8 else None

    result_sr = pd.Series(name=range_title, dtype=object)
    loc_tuple = []

    candidates = None
    first_only = False
    if reg_type == 'extract':
        candidates = df['content'].str.extract(reg)
        # Carry the geometry along so it is available for either window.
        for column in ('center_w', 'center_h', 'luw', 'luh'):
            candidates[column] = df[column]
        keep = candidates.iloc[:, 0].str.len() > 0
        first_only = count_limit == '1'
    elif reg_type == 'contains':
        candidates = df
        # na=False: rows without text cannot match and must not poison the mask.
        keep = df['content'].str.contains(reg, na=False)
        first_only = True

    if candidates is not None and len(candidates) > 0:
        hits = _rows_in_window(candidates, keep, primary)
        if not hits.empty:
            result_sr = hits.iloc[:, 0]
            if first_only:
                result_sr = result_sr.head(1)
        elif fallback is not None:
            # Nothing in the primary window: retry in the fallback window and
            # keep every match found there.
            hits = _rows_in_window(candidates, keep, fallback)
            result_sr = hits.iloc[:, 0]
        if not hits.empty:
            loc_tuple = hits.loc[:, ['luw', 'luh']].values[0]

    result_list = result_sr.to_list()
    order_dict[range_title] = [pd.Series(result_list, name=range_title), loc_tuple]
    return order_dict
def Loc_tele_content(df, known_dict, cond_list, order_dict):
    """Extract one field of an electronic invoice from its text table.

    Args:
        df: table with a 'content' column (text taken from the converted PDF).
        known_dict: field values already known from other sources, used as
            fallback or cross-check.
        cond_list: ``[range_title, loc_method, reg_type, reg, count_limit, ...]``.
            ``reg`` is an iterable of patterns tried in order until one matches.
            ``count_limit`` '1' keeps the first match, '-1' drops the first
            match when there are several, anything else keeps all matches.
        order_dict: mapping that receives the result.

    Returns:
        ``(order_dict, err_info)`` with ``order_dict[range_title] = [Series]``;
        ``err_info`` is always ''.

    NOTE(review): the original indentation was lost; nesting is a best-effort
    reconstruction.
    """
    range_title = cond_list[0]
    reg_type = cond_list[2]
    reg = cond_list[3]
    count_limit = cond_list[4]
    known_sr = pd.Series(name=range_title, dtype=object)
    result_sr = pd.Series(name=range_title, dtype=object)
    err_info = ''

    if range_title in known_dict:
        known_sr = pd.Series(data=known_dict[range_title], name=range_title)

    if reg_type == 'extract':
        matched = pd.Series(dtype=object)
        for pattern in reg:
            first_group = df['content'].str.extract(pattern).iloc[:, 0]
            matched = first_group[first_group.str.len() > 0]
            if len(matched) > 0:
                break

        if len(matched) > 0:
            if count_limit == '1':
                result_sr = matched.head(1)
            elif count_limit == '-1':
                result_sr = matched.head(1) if len(matched) == 1 else matched.tail(-1)
            else:
                result_sr = matched

        # Trim one leading / trailing whitespace character.
        result_sr = result_sr.replace(to_replace=r'^\s|\s$', value='', regex=True)

        if range_title == '13合计税前':
            if len(known_sr) > 0:
                if len(result_sr) == 0:
                    result_sr = known_sr
                elif result_sr.values[0] == known_sr.values[0]:
                    result_sr = known_sr.copy()
        elif range_title == '15总额':
            if '13合计税前' in known_dict:
                known_sr = pd.Series(data=known_dict['13合计税前'], name=range_title)
            if len(known_sr) > 0 and len(result_sr) > 0:
                if result_sr.values[0] == known_sr.values[0]:
                    result_sr = known_sr.copy()
        elif range_title == '06品名':
            if len(result_sr) > 0:
                # Re-join item names that the conversion split with whitespace
                # ("办公 用品" -> "办公用品"). Every occurrence is handled; the old loop
                # indexed with an (i, i) tuple and failed from the second match on.
                spaced = result_sr.str.extractall(r'([\u4e00-\u9fa5]+\s+[\u4e00-\u9fa5]+)')
                if len(spaced) > 0:
                    originals = spaced.iloc[:, 0]
                    compacted = originals.replace(r'\s+', '', regex=True)
                    for old_text, new_text in zip(originals, compacted):
                        result_sr = result_sr.replace(old_text, new_text, regex=True)
                # Remaining blanks separate different items.
                data = result_sr.iloc[0]
                if data.count(' ') > 0:
                    result_sr = pd.Series(data=data.split(' '), name=range_title)
        else:
            if len(result_sr) == 0 and len(known_sr) > 0:
                result_sr = known_sr.copy()

    result_sr.name = range_title
    result_sr.index = list(range(len(result_sr)))
    order_dict[range_title] = [result_sr]
    return order_dict, err_info


def Get_known_from_from_xls_image(origin_pdf_xls_path, paddle_ocr):
    """Collect invoice fields from the pictures embedded in a converted workbook.

    The workbook is opened as a zip archive and every member under 'xl/media'
    is extracted into a folder named after the workbook. Square pictures with
    an edge of 80-200 pixels are decoded as the invoice QR code; pictures lower
    than 50 pixels are enlarged and passed to ``paddle_ocr``. Amounts and tax
    numbers found in the OCR text fill further entries.

    Args:
        origin_pdf_xls_path: path of the .xlsx produced from the invoice PDF.
        paddle_ocr: OCR engine whose ``ocr(image, cls=True)`` is called.

    Returns:
        dict with whichever of '01票号', '02代码', '03日期', '13合计税前', '14合计税额',
        '15总额', '05购方税号', '18销方税号' could be determined.

    NOTE(review): the original indentation was lost; in particular it is not
    certain whether the amount / tax-number analysis ran once after the loop
    (as here) or once per picture.
    """
    xls_file_path = origin_pdf_xls_path
    pr, nm = os.path.split(xls_file_path)
    fr = os.path.splitext(nm)[0]
    unzip_path = os.path.join(pr, fr)
    result_title = ['content']
    result_df = pd.DataFrame(columns=result_title)
    known_dict = {}
    draw_result_out = True
    # Falls back to the workbook name when the archive holds no picture.
    temp_img_fr = fr

    wb = load_workbook(xls_file_path)
    ws = wb['Table 1']

    if not os.path.exists(unzip_path):
        os.mkdir(unzip_path)
    if draw_result_out == True:
        draw_result_folder = os.path.join(unzip_path, 'draw_result')
        if not os.path.exists(draw_result_folder):
            os.mkdir(draw_result_folder)

    with ZipFile(xls_file_path) as f:
        for file in f.namelist():
            if not file.startswith("xl/media"):
                continue
            f.extract(file, path=unzip_path)
            temp_img_name = os.path.split(file)[1]
            temp_img_fr = os.path.splitext(temp_img_name)[0]
            tempimg_path = os.path.join(unzip_path, file)
            img = cv_imread(tempimg_path)
            if img is None:
                # presumably a media member OpenCV cannot decode - skip it
                continue
            # shape[:2] also works for single-channel images.
            h, w = img.shape[:2]

            if 80 <= max(h, w) <= 200 and h == w:
                codedata = pyzbar.decode(img)
                if len(codedata) > 0:
                    data_str = codedata[0].data.decode()
                    if len(data_str) > 20:
                        data_list = data_str.split(',')
                        # Index 5 is read below, so at least six fields are required.
                        if len(data_list) > 5:
                            # NOTE(review): the first three values are stored as 1-tuples,
                            # exactly as the original trailing commas did - confirm
                            # whether plain strings were intended.
                            known_dict['01票号'] = data_list[3],
                            known_dict['02代码'] = data_list[2],
                            known_dict['03日期'] = data_list[5],
                            known_dict['13合计税前'] = data_list[4]

            if h < 50:
                # Small text snippets: enlarge and pad before OCR.
                enlarge = 4
                img_new = new(img, enlarge)
                edge = 20
                color = (255, 255, 255)
                img_large = cv2.copyMakeBorder(img_new, edge, edge, edge, edge,
                                               cv2.BORDER_CONSTANT, value=color)
                enlarge_img_folder = os.path.join(unzip_path, 'img_enlarge')
                if not os.path.exists(enlarge_img_folder):
                    os.mkdir(enlarge_img_folder)
                enlarge_img_path = os.path.join(enlarge_img_folder, 'enlarge_' + temp_img_name)
                cv2.imencode(".jpg", img_large)[1].tofile(enlarge_img_path)

                result = paddle_ocr.ocr(img_large, cls=True)
                if len(result) > 0:
                    df = pd.DataFrame(data=[line[1][0] for line in result], columns=result_title)
                    result_df = Collect_df(result_df, df)
                    if draw_result_out == True:
                        from PIL import Image
                        image = Image.open(enlarge_img_path).convert('RGB')
                        boxes = [line[0] for line in result]
                        txts = [line[1][0] for line in result]
                        scores = [line[1][1] for line in result]
                        im_show = draw_ocr(image, boxes, txts, scores, font_path='./fonts/simfang.ttf')
                        im_show = Image.fromarray(im_show)
                        draw_result_path = os.path.join(draw_result_folder, 'draw_' + temp_img_name)
                        im_show.save(draw_result_path)

    # Amounts: with the pre-tax total known from the QR code, the largest other
    # amount is taken as the grand total and the smallest as the tax.
    temp_df = result_df.loc[:, 'content'].str.extract('[¥¥]([.0-9]+)')
    temp_df.columns = ['content']
    amount_df = temp_df.loc[temp_df['content'].notna(), :]
    if len(amount_df) >= 3 and '13合计税前' in known_dict:
        sqhj = float(known_dict['13合计税前'])
        amount_df = amount_df.astype(float)
        if sqhj > 1:
            values = amount_df.loc[amount_df['content'] != sqhj, 'content'].values
            if len(values) > 0:
                known_dict['15总额'] = max(values)
                known_dict['14合计税额'] = min(values)

    # Tax numbers: 18 characters starting with 91; the one that is not ours is the seller's.
    temp_df = result_df.loc[:, 'content'].str.extract(r'^(91\S{16})$')
    temp_df.columns = ['content']
    tax_numbers_df = temp_df.loc[temp_df['content'].notna(), :]
    if len(tax_numbers_df) > 0:
        our_number = '你公司的税号'
        known_dict['05购方税号'] = our_number
        values = tax_numbers_df.loc[tax_numbers_df['content'] != our_number, 'content'].values
        if len(values) > 0:
            known_dict['18销方税号'] = values[0]

    img_ocr_result_folder = os.path.join(unzip_path, 'result')
    if not os.path.exists(img_ocr_result_folder):
        os.mkdir(img_ocr_result_folder)
    img_ocr_result_path = os.path.join(img_ocr_result_folder, temp_img_fr + '.xlsx')
    result_df.to_excel(img_ocr_result_path)
    return known_dict
def Pdf_tans_to(file_path, pdf_trans_to_file_path, trans_type='.xlsx',
                temp_pdf_trans_excel_out=True):
    """Export a PDF to another format through the Acrobat COM interface.

    Args:
        file_path: Path of the source PDF.
        pdf_trans_to_file_path: Path the converted file is saved to.
        trans_type: '.xlsx' selects the Acrobat xlsx converter; any other
            value (including '.txt') selects the plain-text converter.
        temp_pdf_trans_excel_out: Accepted for interface compatibility; not
            read by this function.

    Returns:
        ``pdf_trans_to_file_path`` if that file exists afterwards, else None.
    """
    import winerror
    from win32com.client.dynamic import ERRORS_BAD_CONTEXT, Dispatch

    # Register the error code only once instead of growing the list per call.
    if winerror.E_NOTIMPL not in ERRORS_BAD_CONTEXT:
        ERRORS_BAD_CONTEXT.append(winerror.E_NOTIMPL)

    output_folder_path = os.path.split(pdf_trans_to_file_path)[0]
    if not os.path.exists(output_folder_path):
        Create_clear_dir(output_folder_path)

    if trans_type == '.xlsx':
        trans_engion = 'com.adobe.acrobat.xlsx'
    else:
        trans_engion = 'com.adobe.acrobat.plain-text'

    # Pre-bind the COM handles so the cleanup below is safe even when
    # Dispatch() itself fails (previously a NameError in `finally`).
    AvDoc = None
    pdDoc = None
    jsObject = None
    try:
        AvDoc = Dispatch("AcroExch.AVDoc")
        if AvDoc.Open(file_path, ""):
            pdDoc = AvDoc.GetPDDoc()
            jsObject = pdDoc.GetJSObject()
            jsObject.SaveAs(pdf_trans_to_file_path, trans_engion)
    except Exception as e:
        # Best effort: report the failure; the caller sees None below.
        print(str(e))
    finally:
        if AvDoc is not None:
            AvDoc.Close(True)
        # Drop the COM references explicitly.
        jsObject = None
        pdDoc = None
        AvDoc = None

    if os.path.exists(pdf_trans_to_file_path):
        return pdf_trans_to_file_path
    return None


def Pdf_tans_jpg(file_path, pdf_trans_jpg_file_path, temp_pdf_trans_jpg_out=True):
    """Render a PDF to an image file at 2x zoom with PyMuPDF.

    Every page is written to the same ``pdf_trans_jpg_file_path``, so for a
    multi-page PDF only the last page remains on disk. The data is written
    with ``writePNG`` regardless of the extension of the target path.
    NOTE(review): both look unintended but are kept for compatibility.

    Args:
        file_path: Path of the source PDF.
        pdf_trans_jpg_file_path: Path of the image file to write.
        temp_pdf_trans_jpg_out: Accepted for interface compatibility; not
            read by this function.

    Returns:
        ``pdf_trans_jpg_file_path`` if that file exists afterwards, else None.
    """
    output_folder_path = os.path.split(pdf_trans_jpg_file_path)[0]
    if not os.path.exists(output_folder_path):
        Create_clear_dir(output_folder_path)

    doc = fitz.open(file_path)
    try:
        for pg in range(doc.pageCount):
            page = doc[pg]
            # 2x zoom on both axes, no rotation.
            trans = fitz.Matrix(2.0, 2.0).preRotate(0)
            pm = page.getPixmap(matrix=trans, alpha=False)
            pm.writePNG(pdf_trans_jpg_file_path)
    finally:
        # Release the document handle even if rendering fails.
        doc.close()

    if os.path.exists(pdf_trans_jpg_file_path):
        return pdf_trans_jpg_file_path
    return None


def pil_enhance(img):
    """Sharpen and boost contrast of a PIL image, then convert it to 'L' mode.

    Brightness is applied with factor 1.0, sharpness with 1.5 and contrast
    with 2.0, in that order.
    """
    img = ImageEnhance.Brightness(img).enhance(1.0)
    img = ImageEnhance.Sharpness(img).enhance(1.5)
    img = ImageEnhance.Contrast(img).enhance(2.0)
    return img.convert('L')


def new(img, enlarge):
    """Upscale an image array by pixel replication.

    Each source pixel becomes an ``enlarge`` x ``enlarge`` square. The
    previous implementation hard-coded the factor 4 and ignored ``enlarge``;
    for ``enlarge == 4`` the output is unchanged.

    Args:
        img: Array whose first two axes are rows and columns.
        enlarge: Integer replication factor.

    Returns:
        A float64 array (the dtype the original ``np.zeros`` buffer had).
    """
    scaled = np.repeat(np.repeat(img, enlarge, axis=0), enlarge, axis=1)
    return scaled.astype(np.float64)


def Pil_make_border(image, edge=20):
    """Centre a PIL image on a white RGB canvas that is ``edge`` px larger.

    The canvas grows by ``edge`` in total per dimension, so the visible
    margin on each side is ``edge // 2``.
    """
    iw, ih = image.size
    w, h = iw + edge, ih + edge
    new_image = Image.new('RGB', (w, h), (255, 255, 255))
    # The original resized the image to its own size first, which is a no-op.
    new_image.paste(image, ((w - iw) // 2, (h - ih) // 2))
    return new_image
3.4 发票字段定位函数
def Loc_range_content_pandas(ocr_engines, df, result_series_orderdic, err_info, known_dict,
                             file_path, temp_folder_path, enhance=False, engine_switch=0):
    """Locate every invoice field on a scanned (paper) invoice image.

    The amount-in-words field ('16大写') is searched first and its position is
    used as the anchor; every other field is described by a box given as
    offsets from that anchor. Each field is then resolved in this order,
    stopping at the first method that yields a non-empty result: values
    already in ``known_dict``, then ``Crop_ocr`` on the 'crop' box, then
    ``Loc_jpg_content`` on the full-page OCR table ``df``.

    Args:
        ocr_engines: OCR engines, passed through to ``Crop_ocr``.
        df: Full-page OCR table; must have 'content', 'luw', 'rdw', 'luh'
            and 'rdh' columns. 'center_w' / 'center_h' are added in place.
        result_series_orderdic: Mapping updated with one entry per field.
        err_info: Error text accumulated so far; messages are appended.
        known_dict: Values already known for this invoice.
        file_path: Path of the invoice image.
        temp_folder_path: Folder under which the 'crop' folder is created.
        enhance: Passed through to ``Crop_ocr``.
        engine_switch: Passed through to ``Crop_ocr``.

    Returns:
        ``(result_series_orderdic, err_info)``.
    """
    # Company presets that the article tells users to edit here; they are not
    # read anywhere in this function.
    user_name, user_code = '你的公司名称', '你公司的税号'
    # NOTE(review): the converted column is discarded, so this line has no
    # effect. Assigning it back would turn NaN into 'nan' -- confirm before
    # changing.
    df['content'].astype(str)
    df['center_w'] = (df.loc[:, 'luw'] + df.loc[:, 'rdw']) / 2
    df['center_h'] = (df.loc[:, 'luh'] + df.loc[:, 'rdh']) / 2

    w_ratio = 1
    h_ratio = 1

    # Search window for the amount-in-words anchor.
    w_this_loc_tradtitle = 240
    h_this_loc_tradtitle = 1170
    min_w_zero_distance, max_w_zero_distance, min_h_zero_distance, max_h_zero_distance \
        = 521, 1550, -33, 98
    min_w_zero = w_this_loc_tradtitle + w_ratio * min_w_zero_distance
    max_w_zero = w_this_loc_tradtitle + w_ratio * max_w_zero_distance
    min_h_zero = h_this_loc_tradtitle + h_ratio * min_h_zero_distance
    max_h_zero = h_this_loc_tradtitle + h_ratio * max_h_zero_distance
    loc_trad_range = [min_w_zero, max_w_zero, min_h_zero, max_h_zero]
    cond_trad = ['16大写', 'direct', 'contains',
                 '[圆角分整零壹贰叁肆伍陆柒捌玖拾佰仟万亿]{2,}', '1',
                 {'direct': loc_trad_range}]
    known_dict = Loc_jpg_content(df, cond_trad, order_dict=known_dict)
    if len(known_dict['16大写'][1]) > 0:
        (w_zero, h_zero) = known_dict['16大写'][1]
    else:
        err_info = err_info + '识别失败!未找到大写金额内容。'
        # Fall back to a fixed anchor position.
        w_zero = 750
        h_zero = 1180

    def _box(w_from, w_to, h_from, h_to):
        """Return [w_min, w_max, h_min, h_max] for offsets from the anchor."""
        return [int(w_zero + w_ratio * w_from), int(w_zero + w_ratio * w_to),
                int(h_zero + h_ratio * h_from), int(h_zero + h_ratio * h_to)]

    # Each entry: [field, methods, match mode, regex, '1' or 'n', boxes].
    range_list = [
        ['01票号', ['known', 'crop'], 'extract', '^\D*(\d{8})$', '1',
         {'crop': _box(1430, 1685, -990, -900), 'known': known_dict}],
        ['02代码', ['known', 'crop'], 'extract', '([a-zA-Z0-9]{10})$', '1',
         {'crop': _box(-475, 80, -1100, -920)}],
        ['03日期', ['known', 'crop'], 'extract',
         '(\d{4}\s*年\s*\d{2}\s*月\s*\d{2}\s*日)$', '1',
         {'direct': _box(1100, 1637, -925, -840),
          'crop': _box(1300, 1637, -925, -840)}],
        ['04购方', ['crop'], 'extract', '([\(\)()\u4e00-\u9fa5]{8,30})', '1',
         {'crop': _box(-320, 600, -800, -680)}],
        ['05购方税号', ['direct'], 'extract', '([a-zA-Z0-9]{18})$', '1',
         {'direct': _box(-240, 540, -800, -680),
          'crop': _box(-320, 600, -800, -680)}],
        ['06品名', ['crop'], 'contains',
         '^[\*冰水米\+]?(\S*[制品]\S*[\*冰水米\+]?\S+)$', 'n',
         {'crop': _box(-670, 640, -560, -100),
          'center_limit': [10, 500, 10, 450]}],
        ['07单位', ['crop'], 'contains', '^\D{1,8}$', 'n',
         {'crop': _box(-670, 640, -560, -100),
          'center_limit': [820, 1100, 10, 450]}],
        ['08数量', ['crop'], 'contains', '^\d+$|^\d+\.\d+$', 'n',
         {'crop': _box(440, 640, -510, -100)}],
        ['09单价', ['crop'], 'contains', '^[\.::]?\d+[\.::]?\s*\d*\s*$', 'n',
         {'crop': _box(635, 890, -510, -100)}],
        ['10税前', ['crop'], 'contains',
         '^\s*[+-]?(?:\d+|\d{1,3}(?:,\d{3})*)[\.::]\s*\d{2}\s*$', 'n',
         {'crop': _box(980, 1240, -510, -100)}],
        ['11税率', ['crop'], 'contains', '^\d{1,2}\s*%$', '1',
         {'crop': _box(1240, 1350, -510, -100)}],
        ['12税额', ['crop'], 'contains',
         '^\s*[+-]?(?:\d+|\d{1,3}(?:,\d{3}))[\.::]?\s*\d{0,2}\s*\D*', 'n',
         {'crop': _box(1380, 1700, -510, -100)}],
        # NOTE(review): 's*' in the three total patterns below is probably a
        # typo for '\s*'; left as-is because the patterns are not anchored at
        # the start -- confirm.
        ['13合计税前', ['known', 'crop'], 'contains',
         '[¥¥]?s*[+-]?(?:\d+|\d{1,3}(?:,\d{3})*)[\.::]\s*\d{2}\s*$', '1',
         {'crop': _box(880, 1235, -100, -10), 'known': known_dict}],
        ['14合计税额', ['crop'], 'contains',
         '[¥¥]?s*[+-]?(?:\d+|\d{1,3}(?:,\d{3})*)[\.::]?\s*\d{0,2}\s*$', '1',
         {'crop': _box(1300, 1710, -110, 0)}],
        ['15总额', ['crop'], 'contains',
         '[¥¥]?s*[+-]?(?:\d+|\d{1,3}(?:,\d{3})*)[\.::]\s*\d{2}\s*$', '1',
         {'crop': _box(1220, 1700, -20, 70)}],
        ['16大写', ['known'], known_dict],
        ['17销方', ['crop'], 'extract',
         '([\(\)()\u4e00-\u9fa5]{8,30}[办|处|公|司|厂|社|部])$', '1',
         {'crop': _box(-280, 540, 60, 165)}],
        ['18销方税号', ['direct'], 'extract', '([a-zA-Z0-9]{18})$', '1',
         {'direct': _box(-260, 600, 100, 220),
          'crop': _box(-320, 600, 100, 220)}],
    ]

    img_inv = cv_imread(file_path)
    # The original reset err_info to '' here, which threw away the
    # "amount in words not found" message appended above; it is now kept.
    crop_folder_path = os.path.join(temp_folder_path, 'crop')
    set_h_adjust = 0
    for cond_list in range_list:
        range_title = cond_list[0]
        loc_method = cond_list[1]
        result_series_orderdic[range_title] = pd.Series()

        if 'known' in loc_method and range_title in known_dict:
            result_series_orderdic[range_title] = known_dict[range_title]
            if len(result_series_orderdic[range_title]) > 0:
                continue

        if 'crop' in loc_method:
            if not os.path.exists(crop_folder_path):
                Create_clear_dir(crop_folder_path)
            result_series_orderdic, get_h_adjust = Crop_ocr(
                ocr_engines, result_series_orderdic, known_dict, img_inv, file_path,
                crop_folder_path, set_h_adjust, cond_list, enhance,
                engine_switch=engine_switch)
            # Only the invoice-number crop may update the vertical adjustment
            # that is handed to the following crops.
            if range_title == '01票号' and get_h_adjust > 5:
                set_h_adjust = get_h_adjust
            if len(result_series_orderdic[range_title]) > 0:
                continue

        if 'direct' in loc_method:
            result_series_orderdic = Loc_jpg_content(
                df, cond_list, order_dict=result_series_orderdic)

    return result_series_orderdic, err_info
def Tele_inv_ocr(ocr_engines, result_series_orderdic, inv_dict, file_path, excel_file_path,
                 err_info, engine_switch=0):
    """Extract invoice fields from the Excel export of an electronic invoice.

    The first sheet is read as text, each row is flattened into a single
    '|'-joined string, and a list of per-field regular expressions is handed
    to ``Loc_tele_content`` one field at a time.

    Args:
        ocr_engines: OCR engines; ``ocr_engines[engine_switch]`` is passed to
            ``Get_known_from_from_xls_image``.
        result_series_orderdic: Returned unchanged when the sheet does not
            contain '发票'; otherwise replaced by a fresh ``OrderedDict``.
        inv_dict: Returned unchanged.
        file_path: Not read by this function.
        excel_file_path: Path of the Excel file exported from the PDF.
        err_info: Overwritten by this function (see Returns).
        engine_switch: Index into ``ocr_engines``.

    Returns:
        ``(result_series_orderdic, err_info, inv_dict)``. ``err_info`` is
        'inv character not found.' on the early exit, otherwise whatever the
        last ``Loc_tele_content`` call returned.
    """
    df_org = pd.read_excel(excel_file_path, sheet_name=0, header=None, index_col=None,
                           na_values='', keep_default_na=True, dtype=object)
    df_org = df_org.fillna('')
    df_org = df_org.astype(str)
    # Collapse line breaks / runs of whitespace, then strip leading blanks.
    df_org = df_org.replace(to_replace='\\n|\s+', value=' ', regex=True)
    df_org = df_org.replace(to_replace='^\s+', value='', regex=True)

    # Flatten every row into one '|'-separated string.
    df_new = pd.DataFrame(data='', index=df_org.index, columns=['content'])
    for col in df_org.columns:
        df_new['content'] = df_new['content'] + '|' + df_org[col]
    df_new = df_new.replace(to_replace='\|+', value='|', regex=True)
    df_new = df_new.replace(to_replace='^\||\|+$', value='', regex=True)

    # Without the word '发票' anywhere the sheet is not treated as an invoice.
    if not df_new['content'].str.contains('发票').any():
        err_info = 'inv character not found.'
        return result_series_orderdic, err_info, inv_dict

    known_dict = Get_known_from_from_xls_image(excel_file_path, ocr_engines[engine_switch])

    # Each entry: [field, methods, match mode, candidate regexes, mode flag].
    range_list = [
        ['01票号', ['direct'], 'extract', ['发票号码[:|:]?\s*(\d+)'], '1', ],
        ['02代码', ['direct'], 'extract', ['发票代码[:|:]?\s*(\d+)'], '1', ],
        ['03日期', ['direct'], 'extract', ['(\d{4}\s*年\s*\d{2}\s*月\s*\d{2}\s*日)'], '1', ],
        ['04购方', ['direct'], 'extract',
         ['^购买方信息\|名称:(.+?) 统一社会信用代码/纳税人识别号:',
          '名\s*称:\s*(.+?)\s*纳税人识别号'], '1'],
        # Fixed: the first pattern used '{18?}', which is not a quantifier and
        # therefore never matched an 18-character tax ID.
        ['05购方税号', ['direct'], 'extract',
         ['购买[\D]+纳税人识别号:[\|\s]*([0-9A-Z]{18})',
          '纳税人识别号:([a-zA-Z0-9]{18})', ], '1'],
        # NOTE(review): the second pattern ends in a bare '|' (alternation
        # with the empty string); probably meant '\|' -- confirm.
        ['06品名', ['direct'], 'extract',
         ['^项目名称\s*(.+)合\s*计\|',
          '^项目名称\s*(.+)合|', ], '1'],
        ['07单位', ['direct'], 'extract',
         ['^([\u4e00-\u9fa5]+)[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+$',
          '\|单\s*([\u4e00-\u9fa5]+)\|位\|',
          '\|?单\s*\|?\s*价\s*\|?\s*([\u4e00-\u9fa5]{1,3})\s*[.0-9]+',
          '\|?单[\s\|]*价[\|\s]*([\u4e00-\u9fa5]{1,3})\s*[.0-9]+',
          '\|?单[\s\|]*位[\|\s]*([\u4e00-\u9fa5]{1,3})[\|\s]*数[\|\s]*量[\|\s]*[.0-9]+[\|\s]*单[\|\s]*价[\|\s]*[.0-9]+', ], 'n'],
        ['08数量', ['direct'], 'extract',
         ['^[\u4e00-\u9fa5]+[\|\s]*([.0-9]+)[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+$',
          '量\s*([.0-9]+)\s*\|单',
          '\|?单[\s\|]*价[\|\s]*[\u4e00-\u9fa5]{1,3}\s*([.0-9]+)',
          '量[\s\|]*单[\s\|]*价[\|\s]*([.0-9]+)\s+[.0-9]+',
          '([.0-9]+)[\s\|]+[.0-9]+[\s\|]+[.0-9]+[\s\|]+[.0-9]+[\s\|]+[.0-9]+'], 'n'],
        ['09单价', ['direct'], 'extract',
         ['^[\u4e00-\u9fa5]+[\|\s]*[.0-9]+[\|\s]*([.0-9]+)[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+$',
          '价\s*([.0-9]+)\s*\|金',
          '\|?单[\s\|]*价[\|\s]*[\u4e00-\u9fa5]{1,3}\s*[.0-9]+[\|\s]+([.0-9]+)',
          '量[\s\|]*单[\s\|]*价[\|\s]*[.0-9]+\s+([.0-9]+)',
          '[.0-9]+[\s\|]+([.0-9]+)[\s\|]+[.0-9]+[\s\|]+[.0-9]+[\s\|]+[.0-9]+'], 'n'],
        ['10税前', ['direct'], 'extract',
         ['^[\u4e00-\u9fa5]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*([.0-9]+)[\|\s]*[.0-9]+[\|\s]*[.0-9]+$',
          '[率|\|]\s*([.0-9]+)\s+[0-9]{1,2}%[\||税]',
          '金\s*额\s*([.0-9]+)[\|\s]*税率\s*[.0-9]+%[\|\s]*税\s*额',
          '[.0-9]+[\s\|]+[.0-9]+[\s\|]+([.0-9]+)[\s\|]+[.0-9]+[\s\|]+[.0-9]+'], 'n'],
        ['11税率', ['direct'], 'extract',
         ['^[\u4e00-\u9fa5]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*([.0-9]+)[\|\s]*[.0-9]+$',
          '[率|\|]\s*[.0-9]+\s+([0-9]{1,2}%)[\||税]',
          '金\s*额\s*[.0-9]+[\|\s]*税\s*率\s*([.0-9]+%)[\|\s]*税\s*额',
          '[.0-9]+[\s\|]+[.0-9]+[\s\|]+[.0-9]+[\s\|]+([.0-9]+)[\s\|]+[.0-9]+'], '1'],
        ['12税额', ['direct'], 'extract',
         ['^[\u4e00-\u9fa5]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*[.0-9]+[\|\s]*([.0-9]+)$',
          '税\s*[\|]?\s*额\s*[\|]?\s*([.0-9]+)',
          '[.0-9]+[\s\|]+[.0-9]+[\s\|]+[.0-9]+[\s\|]+[.0-9]+[\s\|]+([.0-9]+)'], 'n'],
        ['13合计税前', ['direct'], 'extract',
         ['[¥¥](-?\d+\.\d{0,2})[\|\s][¥¥]',
          '^([.0-9]+)\|[.0-9]+$'], '1'],
        ['14合计税额', ['direct'], 'extract',
         ['[¥¥]-?\d+\.\d+[\|\s][¥¥](-?\d+\.\d+)',
          '^[.0-9]+\|([.0-9]+)$'], '1'],
        # NOTE(review): '(小写)' / '(大写)' below are ASCII parentheses and so
        # act as capture groups; they look like full-width brackets lost in
        # transcription -- confirm against the original script.
        ['15总额', ['direct'], 'extract',
         ['(小写)[¥¥](.+)',
          '价税合计[\|\s]*[零壹贰叁肆伍陆柒捌玖拾佰仟亿角分圆整]{2,}[\|\s]*[¥¥]?([.0-9]+)$'], '1'],
        ['16大写', ['direct'], 'extract',
         ['^价税合计(大写)\|(.+)\|(小写)',
          '价税合计[\|\s]*([零壹贰叁肆伍陆柒捌玖拾佰仟亿角分圆整]{2,})'], '1'],
        ['17销方', ['direct'], 'extract',
         ['销售方信息\|名称:(.+?) 统一社会信用代码',
          '销售方\s*\|\s*名\s*称:\s*([\u4e00-\u9fa5]+)\s*纳税人识别号'], '1'],
        ['18销方税号', ['direct'], 'extract',
         ['销售[\D]+纳税人识别号:[\|\s]*([0-9A-Z]{18})',
          '纳税人识别号:([a-zA-Z0-9]{18})'], '-1'],
    ]

    result_series_orderdic = OrderedDict()
    for cond_list in range_list:
        # err_info is replaced on every call, so only the last field's value
        # survives. NOTE(review): confirm whether it should accumulate.
        result_series_orderdic, err_info = Loc_tele_content(
            df_new, known_dict, cond_list, result_series_orderdic)
    return result_series_orderdic, err_info, inv_dict
3.5 识别记录相关函数
def Log_result_file(result_pandas, result_file_path, result_sheet_name):
    """Write the result table to one sheet of the result workbook.

    An existing workbook is opened in append mode and the target sheet is
    replaced; a missing workbook is created (append mode would raise on a
    missing file). The writer is closed even when writing fails.

    Args:
        result_pandas: DataFrame to store (written without the index).
        result_file_path: Path of the .xlsx workbook.
        result_sheet_name: Name of the sheet to (re)write.

    Returns:
        True once the workbook has been saved.
    """
    if os.path.exists(result_file_path):
        writer_options = {'mode': 'a', 'if_sheet_exists': 'replace'}
    else:
        # 'if_sheet_exists' is only accepted together with append mode.
        writer_options = {'mode': 'w'}
    with pd.ExcelWriter(result_file_path, engine='openpyxl', **writer_options) as writer:
        result_pandas.to_excel(writer, sheet_name=result_sheet_name, index=False)
    return True
def Add_hyperlink(result_file_path, result_sheet_name):
    """Style columns 19-20 of the result sheet and link column 19 to its file.

    The sheet is moved one position towards the front and the workbook's
    active-sheet index is set to 0. For every non-empty cell from row 2 down:
    column 19 gets a hyperlink '..\\<file name>' (file name taken from the
    cell text via ``pathsplit``) and an italic font with colour index 4;
    column 20 gets a non-italic font with colour index 2. The workbook is
    saved back to ``result_file_path``.

    Returns:
        True.
    """
    workbook = load_workbook(result_file_path)
    workbook.move_sheet(result_sheet_name, offset=-1)
    sheet = workbook[result_sheet_name]
    workbook._active_sheet_index = 0

    link_font = Font(color=colors.Color(index=4), italic=True)
    plain_font = Font(color=colors.Color(index=2), italic=False)

    columns = sheet.iter_cols(min_row=2, max_row=sheet.max_row, min_col=19, max_col=20)
    for column_cells in columns:
        for ws_cell in column_cells:
            text = ws_cell.value
            if text is None or len(text) == 0:
                continue
            if ws_cell.column != 19:
                ws_cell.font = plain_font
                continue
            file_name = pathsplit(text)[1]
            ws_cell.hyperlink = '..\\' + file_name
            ws_cell.font = link_font

    workbook.save(result_file_path)
    return True
def Collect_df(collect_df, item_df):
    """Append ``item_df`` to ``collect_df`` and renumber the index from 1.

    * An empty ``item_df`` returns ``collect_df`` untouched.
    * An empty ``collect_df`` is replaced by ``item_df``.
    * When ``collect_df`` has column labels that ``item_df`` lacks, copies of
      both frames get positional integer labels (0, 1, ...) before stacking,
      and the narrower one is padded with '' columns.
    * Otherwise the frames are concatenated as they are.

    The final index comes from ``reset_nature_index``.
    """
    if len(item_df) == 0:
        return collect_df

    if collect_df.empty:
        merged = item_df
    else:
        labels_missing_in_item = set(collect_df.columns) - set(item_df.columns)
        if len(labels_missing_in_item) > 0:
            left = collect_df.copy()
            right = item_df.copy()
            left_width = len(left.columns)
            right_width = len(right.columns)
            # NOTE(review): the original also built "title row + data" frames
            # here but overwrote them before use, so the old column titles
            # are dropped -- confirm that is intended.
            left.columns = list(range(left_width))
            right.columns = list(range(right_width))
            widest = max(left_width, right_width)
            # NOTE(review): the inclusive upper bound adds one column more
            # than the wider frame has; kept as in the original.
            if widest - left_width > 0:
                for position in range(left_width, widest + 1):
                    left[position] = ''
            if widest - right_width > 0:
                for position in range(right_width, widest + 1):
                    right[position] = ''
            collect_df, item_df = left, right
        merged = pd.concat([collect_df, item_df], ignore_index=True, axis=0)

    return reset_nature_index(merged)
def Log_df_to_file(df, save_path, sheet_name, keep_exists=True):
    """Store ``df`` in one sheet of an existing workbook.

    With ``keep_exists`` the rows already on the sheet are read back and
    ``df`` (relabelled with positional integer columns) is appended through
    ``Collect_df``; otherwise ``df`` replaces the sheet. Nothing is written
    when the resulting table is empty, but the workbook is still opened and
    saved.

    Unlike the previous version, the caller's ``df`` is no longer modified:
    the column relabelling happens on a copy. The existing sheet is read
    before the writer is opened and the writer is closed even on errors.

    Args:
        df: DataFrame to log.
        save_path: Path of an existing .xlsx workbook.
        sheet_name: Sheet to write.
        keep_exists: Append to the sheet's current content when True.

    Returns:
        True.
    """
    pandas_write = pd.DataFrame()
    if not df.empty:
        if keep_exists:
            positional_df = df.copy()
            positional_df.columns = list(range(len(positional_df.columns)))
            existing_df = pd.read_excel(save_path, sheet_name=sheet_name, index_col=0,
                                        header=0, keep_default_na=True, dtype=object)
            pandas_write = Collect_df(existing_df, positional_df)
        else:
            pandas_write = df

    with pd.ExcelWriter(save_path, engine='openpyxl', mode='a',
                        if_sheet_exists='replace') as writer:
        if not pandas_write.empty:
            pandas_write.to_excel(writer, sheet_name=sheet_name)
    return True


def reset_nature_index(df):
    """Renumber ``df``'s index in place as 1..len(df) and return ``df``."""
    df.index = list(range(1, len(df) + 1))
    return df
3.6 识别结果校验
def Check_result(result_pandas):
    """Clean the recognised invoice table and record validation messages.

    Works on a string copy of ``result_pandas``: numeric columns are stripped
    of currency/percent/CJK characters, seller names and other columns are
    corrected from a built-in dict and an optional correction workbook, and
    each invoice (a run of rows starting where 'file_path' is non-empty) is
    checked for blank fields, buyer tax number, product-name format and
    arithmetic consistency. Messages are written to the 'err_info' column of
    the invoice's first row.

    NOTE(review): the nesting below was reconstructed from a
    whitespace-collapsed listing; places where more than one nesting was
    possible are marked.
    NOTE(review): ``user_code`` is not defined in this function; it has to
    exist as a module-level name when this runs.

    Returns:
        The cleaned copy, or ``result_pandas`` itself when it has no rows.
    """
    if len(result_pandas) == 0:
        return result_pandas
    edit_pandas = result_pandas.copy()
    edit_pandas = edit_pandas.fillna('')
    edit_pandas = edit_pandas.astype(str)
    temp_title_list = edit_pandas.columns.tolist()
    # Any existing 'err_info' content is blanked here.
    edit_pandas['err_info'] = ''
    pandas_title_list = edit_pandas.columns.tolist()
    inv_title_list = pandas_title_list[0:-2]
    # Columns that may hold one value per detail line.
    detail_title_list = ['06品名', '07单位', '08数量', '09单价', '10税前', '12税额']
    # Columns converted to float for the arithmetic checks.
    num_title_list = ['08数量', '09单价', '10税前', '11税率', '12税额', \
                      '13合计税前', '14合计税额', '15总额']
    # Columns read only from the first row of each invoice.
    one_row_title_list = ['01票号', '02代码', '03日期', '04购方', '05购方税号', '13合计税前', '14合计税额', '15总额', '16大写', '17销方', '18销方税号']
    one_row_title_list.sort()
    # Strip currency signs, percent signs, brackets, whitespace and CJK
    # characters from the numeric columns, then turn colons into dots.
    edit_pandas.loc[:, num_title_list] = \
        edit_pandas.loc[:, num_title_list].replace(to_replace='[¥¥%\s()\(\)\u4e00-\u9fa5]', value='', regex=True)
    edit_pandas.loc[:, num_title_list] = \
        edit_pandas.loc[:, num_title_list].replace(to_replace='[::]', value='.', regex=True)
    edit_pandas.loc[:, '05购方税号'] = \
        edit_pandas.loc[:, '05购方税号'].replace(to_replace='[::]', value='', regex=True)
    # Product names: a leading 米/水/冰 and any + * # become a space, then
    # one leading whitespace character is removed.
    edit_pandas.loc[:, '06品名'] = \
        edit_pandas.loc[:, '06品名'].replace(to_replace='^[米水冰]|[\+\*#]', value=' ', regex=True)
    edit_pandas.loc[:, '06品名'] = \
        edit_pandas.loc[:, '06品名'].replace(to_replace='^\s', value='', regex=True)
    # Built-in seller-name corrections (exact whole-cell replacement).
    comp_dict = {'A有限公司': 'AA有限公司', }
    edit_pandas = edit_pandas.replace({'17销方': comp_dict})
    # Optional user-maintained corrections: for a column X present in both
    # tables, column 'X修正' holds the replacement for the regex in column X.
    replace_file = 'D:\\pyscripts\\发票修正.xlsx'
    if os.path.exists(replace_file):
        replace_df = pd.read_excel(replace_file, sheet_name=0, header=0, keep_default_na=True, dtype=object)
        if not replace_df.empty:
            replace_df = replace_df.fillna('')
            edit_df_title_list = edit_pandas.columns.to_list()
            replace_df_title_list = replace_df.columns.to_list()
            for _, title in enumerate(replace_df_title_list):
                if title in edit_df_title_list:
                    if not replace_df.loc[replace_df[title] != '', :].empty:
                        replace_title = title + '修正'
                        if replace_title in replace_df_title_list:
                            # NOTE(review): rows whose pattern cell is '' are
                            # applied too -- confirm that is harmless.
                            for _, row in enumerate(replace_df[[title, replace_title]].iterrows()):
                                str_origin = row[1].values[0]
                                str_replace = row[1].values[1]
                                edit_pandas[title] = edit_pandas[title].replace(to_replace=str_origin, value=str_replace, regex=True)
    # An invoice starts at each row with a non-empty 'file_path' and ends on
    # the row before the next start (the last one ends at the last row).
    row_start_index = edit_pandas.loc[edit_pandas['file_path'].str.len() > 0, 'file_path'].index
    row_start_list = row_start_index.to_list()
    temp_index = row_start_index - 1
    temp_list = temp_index.to_list()
    row_end_list = temp_list[1:]
    row_pandas_last = edit_pandas.index[-1]
    row_end_list.append(row_pandas_last)
    rows_tuple = zip(row_start_list, row_end_list)
    for i, (row_start, row_end) in enumerate(rows_tuple):
        err_info = ''
        err_blank = ''
        err_code = ''
        err_product = ''
        err_num = ''
        # Index labels are used as positions here.
        # NOTE(review): presumably relies on a default 0..n-1 index -- confirm.
        this_inv_pandas = edit_pandas.iloc[row_start:row_end+1, :]
        # Keep the trailing number of each numeric cell; blanks become 0.
        num_extract_reg = '((?:\d+|\d{0,3}(?:,\d{3})*)\.?\d{0,})\s*$'
        for _, num_title in enumerate(num_title_list):
            this_inv_pandas.loc[:, num_title] = this_inv_pandas.loc[:, num_title].str.extract(num_extract_reg)
        this_inv_pandas.loc[:, num_title_list] = this_inv_pandas.loc[:, num_title_list].replace('^$', '0', regex=True)
        this_inv_pandas.loc[:, num_title_list] = this_inv_pandas.loc[:, num_title_list].astype(float)
        # A rate above 1 is taken to be a percentage.
        if this_inv_pandas.loc[:, '11税率'].values[0] > 1:
            this_inv_pandas.loc[:, '11税率'] = this_inv_pandas.loc[:, '11税率'] / 100
        num_sum_pretax_amount = round(sum(this_inv_pandas['10税前'].values), 2)
        num_total_pretax_amount = this_inv_pandas['13合计税前'].values[0]
        num_total_tax = this_inv_pandas['14合计税额'].values[0]
        num_sum_detail_tax = round(sum(this_inv_pandas['12税额'].values), 2)
        num_total_amount = this_inv_pandas['15总额'].values[0]
        sum_total = num_total_pretax_amount + num_total_tax
        title_blank_list = []
        err_inv_list = []
        # Detail columns: any blank or zero cell marks the column as missing.
        for _, title in enumerate(detail_title_list):
            cond1 = this_inv_pandas.loc[:, title] == ''
            cond2 = this_inv_pandas.loc[:, title] == 0
            cond = cond1 | cond2
            count_blank = len(this_inv_pandas.loc[cond, :])
            if count_blank > 0:
                title_blank_list.append(title)
            if title == '06品名':
                cond = this_inv_pandas.loc[:, title].str.contains('品[\u4e00-\u9fa5]')
                product_wrong_df = this_inv_pandas.loc[cond, '06品名']
                count_product_err = len(product_wrong_df)
                if count_product_err > 0:
                    err_product = err_product + 'Check product name:' + ','.join(product_wrong_df.to_list()) + '.'
                # err_blank is still '' at this point, so this test is always
                # true. NOTE(review): nesting level of this check is a guess.
                if '品名' not in err_blank:
                    if len(this_inv_pandas.loc[~this_inv_pandas['06品名'].str.contains('[\u4e00-\u9fa5]\s[\u4e00-\u9fa5]'), :]) > 0:
                        err_product = err_product + '品名格式不符“类品+空格+品名”.'
        # Header columns: only the first row of the invoice is inspected.
        for _, title in enumerate(one_row_title_list):
            # '发票号码' is not in one_row_title_list, so this branch never
            # runs as written. NOTE(review): it also indexes a Series with
            # column names and tests temp_df where temp_check_df was
            # probably meant.
            if title == '发票号码':
                temp_df = this_inv_pandas.loc[this_inv_pandas['file_path'] != '', '发票号码']
                temp_df['发票号长度'] = temp_df['发票号'].apply(lambda x: len(x))
                temp_check_df = temp_df.loc[~((temp_df['发票号长度'] == 8) | (temp_df['发票号长度'] == 20)), :]
                if len(temp_check_df) > 0:
                    err_inv_list.append('Inv number lenth illegal')
                temp_check_df = temp_df.loc[temp_df['发票号'].str.contains('\D'), :]
                if len(temp_df) > 0:
                    err_inv_list.append('Inv number character illegal')
            cond1 = this_inv_pandas.loc[this_inv_pandas.index[0], title] == ''
            cond2 = this_inv_pandas.loc[this_inv_pandas.index[0], title] == 0
            cond = cond1 | cond2
            if cond == True:
                if title == '02代码':
                    # A 20-character invoice number is accepted without a
                    # separate invoice code.
                    if len(this_inv_pandas.loc[this_inv_pandas.index[0], '01票号']) == 20:
                        continue
                if title == '15总额':
                    # Try to recover a missing total from the amount in
                    # words, else from pre-tax total + tax total.
                    txt = this_inv_pandas.loc[this_inv_pandas.index[0], '16大写']
                    if not txt == '':
                        trad = txt.split('|')[0]
                        # Common OCR confusions among the numeral characters.
                        repl_dict = {'参': '叁', '柴': '柒', '什': '仟'}
                        trad = repl_by_dict(trad, repl_dict)
                        money = trad_to_int(trad)
                        if not money == trad:
                            money = float(money)
                            if money > 0:
                                this_inv_pandas.loc[this_inv_pandas.index[0], title] = money
                                continue
                    # NOTE(review): which `if` this `else` pairs with is a
                    # guess.
                    else:
                        if num_total_pretax_amount > 0 and num_total_tax > 0:
                            this_inv_pandas.loc[this_inv_pandas.index[0], title] = sum_total
                            continue
                if title == '16大写':
                    continue
                title_blank_list.append(title)
        if '05购方税号' not in title_blank_list:
            if this_inv_pandas['05购方税号'].values[0] != user_code:
                err_code = '购方税号[' + this_inv_pandas['05购方税号'].values[0] + ']不是“' + user_code + '”。'
        if len(title_blank_list) > 0:
            title_blank_list.sort()
            err_blank = 'Null:[' + ','.join(title_blank_list) + ']。'
        # Arithmetic checks, all rounded to 2 decimals.
        diff_pretax_amount = round(num_total_pretax_amount - num_sum_pretax_amount, 2)
        if diff_pretax_amount != 0:
            err_num = err_num + '税前之和≠合计税前[' + str(num_total_pretax_amount) + ' - ' + str(num_sum_pretax_amount) + ' = ' + str(diff_pretax_amount) + ']。'
        sum_total_pretax_tax = round(num_total_pretax_amount + num_total_tax, 2)
        diff_total = round(num_total_amount - sum_total_pretax_tax, 2)
        diff_tax = round(num_total_tax - num_sum_detail_tax, 2)
        if diff_total != 0:
            err_num = err_num + '税前合计与税额合计之和≠发票总额[' + str(sum_total_pretax_tax) + '≠' + str(num_total_amount) + ']。'
        if diff_tax != 0:
            err_num = err_num + '明细税额之和≠14合计税额:[' + str(num_sum_detail_tax) + ' ≠ ' + str(num_total_tax) + ']。'
        # Per-line check: quantity * unit price must equal the pre-tax amount.
        quantity_price_df = this_inv_pandas.loc[:, ['06品名', '08数量', '09单价', '10税前']]
        quantity_price_df['diff_quantity_price'] = quantity_price_df['08数量'] * quantity_price_df['09单价'] - quantity_price_df['10税前']
        round_quantity_price_df = quantity_price_df.loc[:, 'diff_quantity_price'].astype(float).round(2)
        quantity_price_df['diff_quantity_price'] = round_quantity_price_df
        diff_quantity_price_df = quantity_price_df.loc[quantity_price_df['diff_quantity_price'] != 0, :]
        if not diff_quantity_price_df.empty:
            str_temp_quantity = ','.join(diff_quantity_price_df['08数量'].astype(str).tolist())
            str_temp_price = ','.join(diff_quantity_price_df['09单价'].astype(str).tolist())
            str_temp_pretax_amount = ','.join(diff_quantity_price_df['10税前'].astype(str).tolist())
            str_temp_diff = ','.join(diff_quantity_price_df['diff_quantity_price'].astype(str).tolist())
            err_num = err_num + '量*价≠税前,差异明细:[' + str_temp_quantity + ']×[' + str_temp_price + ']-[' + str_temp_pretax_amount + ']=[' + str_temp_diff + ']。'
        err_inv = '票号格式错误[' + ','.join(err_inv_list) + ']。' if len(err_inv_list) > 0 else ''
        err_info = err_inv + err_blank + err_code + err_product + err_num
        err_before = this_inv_pandas.loc[:, 'err_info'].values[0]
        err_info = err_before + err_info
        # Messages go on the invoice's first row; the slice is written back.
        this_inv_pandas.loc[this_inv_pandas.index[0], 'err_info'] = err_info
        edit_pandas.iloc[row_start:row_end + 1, :] = this_inv_pandas
    result_pandas = edit_pandas
    return result_pandas
3.7 文件预处理等其他函数
def cv2_pil(img_cv):
    """Convert an OpenCV BGR image array into a PIL image."""
    rgb_pixels = cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB)
    return Image.fromarray(rgb_pixels)
def cv_imread(file_path):
    """Read an image as a colour array by decoding the file's raw bytes.

    The file is loaded with ``np.fromfile`` and decoded with
    ``cv2.imdecode`` rather than opened by ``cv2.imread``.
    """
    raw_bytes = np.fromfile(file_path, dtype=np.uint8)
    return cv2.imdecode(raw_bytes, cv2.IMREAD_COLOR)
def trad_to_int(money):
    """Convert an amount written in Chinese financial numerals to a number string.

    Handles digits 零-玖, the units 拾/佰/仟, the section units 万 and 亿, and
    the fractions 角 (0.1) and 分 (0.01). All other characters (for example
    圆 or 整) are ignored. Compared with the previous version, 亿 and 万 now
    scale only their own section (so '壹亿贰仟万' is 120000000), and
    fractions are accumulated as whole cents to avoid float noise.

    Args:
        money: Text containing the amount in words.

    Returns:
        The amount as a string -- integer form when no 角/分 occurs,
        otherwise a float form such as '12345.67'. If ``money`` contains none
        of the recognised numerals it is returned unchanged.
    """
    if re.search(r"[零壹贰叁肆伍陆柒捌玖拾佰仟亿角分]+", money) is None:
        return money

    digit_values = {'零': 0, '壹': 1, '贰': 2, '叁': 3, '肆': 4,
                    '伍': 5, '陆': 6, '柒': 7, '捌': 8, '玖': 9}
    small_units = {'拾': 10, '佰': 100, '仟': 1000}
    cent_units = {'角': 10, '分': 1}

    total = 0        # value of the sections already closed by 万 / 亿
    section = 0      # value of the section currently being read
    pending = 0      # last digit that has not met its unit yet
    cents = 0
    has_fraction = False
    for char in money:
        if char in digit_values:
            # A digit not followed by a unit counts as plain units.
            section += pending
            pending = digit_values[char]
        elif char in small_units:
            section += pending * small_units[char]
            pending = 0
        elif char == '万':
            total += (section + pending) * 10000
            section = pending = 0
        elif char == '亿':
            total = (total + section + pending) * 100000000
            section = pending = 0
        elif char in cent_units:
            cents += pending * cent_units[char]
            pending = 0
            has_fraction = True

    yuan = total + section + pending
    if has_fraction:
        return str((yuan * 100 + cents) / 100)
    return str(yuan)


def Fill_na_result(result_df):
    """Normalise dates and fill missing values in the result table.

    '03日期' is passed through ``delta_date``, '11税率' is forward-filled,
    columns 0-6 and 15+ get '' and columns 7-14 get '0' for missing cells.
    ``result_df`` is modified in place by the column assignments.

    Returns:
        The filled DataFrame.
    """
    result_df.loc[:, '03日期'] = result_df.loc[:, '03日期'].apply(delta_date)
    # .ffill() replaces the deprecated fillna(method='ffill').
    result_df.loc[:, '11税率'] = result_df.loc[:, '11税率'].ffill()
    result_df.iloc[:, 0:7] = result_df.iloc[:, 0:7].fillna('')
    result_df.iloc[:, 7:15] = result_df.iloc[:, 7:15].fillna('0')
    result_df.iloc[:, 15:] = result_df.iloc[:, 15:].fillna('')
    result_df = result_df.fillna('')
    return result_df


def delta_date(para):
    """Turn a day-serial integer into a 'YYYY-MM-DD' string.

    Integers (Python or numpy) are counted in days from 1899-12-30; floats
    become ''; every other value is returned unchanged.
    """
    if isinstance(para, (int, np.integer)):
        day = pd.to_datetime('1899-12-30') + pd.Timedelta(days=int(para))
        return day.strftime("%Y-%m-%d")
    if isinstance(para, float):
        return ''
    return para
def repl_by_dict(my_str, repl_dict):
    """Apply every ``old -> new`` pair of ``repl_dict`` to ``my_str`` in turn.

    Replacements run one after another in the dict's iteration order, so a
    later pair also sees text produced by an earlier one.
    """
    replaced = my_str
    for old in repl_dict:
        replaced = replaced.replace(old, repl_dict[old])
    return replaced
def pathsplit(f) -> tuple:
    """Split a path into (parent folder, file name, stem, lower-case extension)."""
    parent, fullname = os.path.split(f)
    frontname = os.path.splitext(fullname)[0]
    # The extension is taken from the full path and lower-cased.
    extname = str.lower(os.path.splitext(f)[1])
    return parent, fullname, frontname, extname
def Create_clear_dir(folder_path):
    """Make sure ``folder_path`` exists and contains no files.

    An existing folder has every file below it removed (sub-folders are kept,
    only emptied). A missing folder is created together with any missing
    parent folders -- the previous ``os.mkdir`` raised when a parent did not
    exist.

    Returns:
        True if ``folder_path`` exists afterwards, else False.
    """
    if os.path.exists(folder_path):
        for dirpath, _dirnames, filenames in os.walk(folder_path):
            for file_name in filenames:
                os.remove(os.path.join(dirpath, file_name))
    else:
        os.makedirs(folder_path, exist_ok=True)
    return os.path.exists(folder_path)
def delFolderorFile(folder_path):
    """Delete a file, or a folder together with everything inside it.

    Returns:
        False when ``folder_path`` does not exist; None after a deletion.
    """
    if not os.path.exists(folder_path):
        return False

    if os.path.isfile(folder_path):
        os.remove(folder_path)
        return

    # Empty the folder first (recursing into sub-folders), then remove it.
    for entry_name in os.listdir(folder_path):
        entry_path = os.path.join(folder_path, entry_name)
        if not os.path.isdir(entry_path):
            os.unlink(entry_path)
            continue
        delFolderorFile(entry_path)
    os.rmdir(folder_path)
def cal_angle(p1, p2):
    """Return the angle, in degrees, of the vector from ``p1`` to ``p2``.

    Each point is an (x-coordinate, y-coordinate) pair. The angle comes from
    ``math.atan2`` of the coordinate differences.
    """
    delta_x = p2[0] - p1[0]
    delta_y = p2[1] - p1[1]
    return math.degrees(math.atan2(delta_y, delta_x))
3.8 main主函数
def _ask_yes_no(prompt):
    """Prompt until the user answers 'y' or 'n'; return the lower-cased answer."""
    while True:
        answer = input(prompt).strip().lower()
        if answer in ('y', 'n'):
            return answer
        print('Please input "y" or "n".')


def _read_result_sheet(file_path, sheet_name):
    """Read ``sheet_name`` from a workbook, falling back to the first sheet on ValueError."""
    read_args = dict(header=0, keep_default_na=True, dtype=object)
    try:
        return pd.read_excel(file_path, sheet_name=sheet_name, **read_args)
    except ValueError:
        return pd.read_excel(file_path, sheet_name=0, **read_args)


if __name__ == '__main__':
    # Script entry point: pick the invoice folder, optionally reuse a previous
    # result workbook, run the OCR engines over the folder, then check and
    # save the result and print a summary.
    #
    # NOTE(review): every name below stays a module-level variable on purpose.
    # Functions defined elsewhere in this file may read some of them as
    # globals, so none of the seemingly unused ones are removed.
    print('\n', datetime.now().strftime("%H:%M:%S"), 'Program start running...\n')
    killexcel()

    user_name = ''
    user_code = ''
    reserve_template_before = True
    ocr_excel_out = True
    draw_result_out = True
    enhance = False
    acumulate_input = 'y'
    prepare_engine = 1
    precise_engine = 1

    # ---- choose the folder holding the invoice images / PDFs ----
    root = Tk()
    print('Please choose the images folder:')
    origin_folder_path = filedialog.askdirectory()
    # Destroy the Tk root on every path, including the early exit below.
    root.destroy()
    if len(origin_folder_path) > 0:
        origin_folder_path = origin_folder_path.replace('/', '\\')
        print(datetime.now().strftime("%H:%M:%S"), 'The images folder you chose:', origin_folder_path)
    else:
        print(datetime.now().strftime("%H:%M:%S"), 'No file chosen. \nQuit.')
        exit()

    # ---- result workbook and its timestamped backup name ----
    result_folder_name = 'result'
    result_sheet_name = 'result'
    result_folder_path = os.path.join(origin_folder_path, result_folder_name)
    if not os.path.exists(result_folder_path):
        Create_clear_dir(result_folder_path)
    result_file_name = 'result' + '.xlsx'
    result_file_path = os.path.join(result_folder_path, result_file_name)
    run_renew = True
    pr, nm, fr, ex = pathsplit(result_file_path)
    now = datetime.now()
    back_str = now.strftime("%Y%m%d_%H%M%S")
    back_file_name = fr + '_' + back_str + ex
    back_file_path = os.path.join(result_folder_path, back_file_name)
    origin_pandas = pd.DataFrame()
    t0 = datetime.now()

    if os.path.exists(result_file_path):
        print(datetime.now().strftime("%H:%M:%S"), f'Found previous result: {result_file_path} .')
        print('\nChoose please: \n"y" - run the orgnize engine. "n" - only check the result, do not run engine.\n')
        # Re-prompt on anything but y/n: a mistyped answer used to fall
        # through and let the run overwrite the workbook without a backup.
        ocr_input = _ask_yes_no('Input(y/n):\n')
        origin_pandas = pd.DataFrame()
        if ocr_input == 'n':
            # Check-only mode: back up, re-check the stored result, and quit.
            shutil.copy(result_file_path, back_file_path)
            origin_pandas = _read_result_sheet(result_file_path, result_sheet_name)
            result_pandas = Check_result(origin_pandas)
            Log_result_file(result_pandas, result_file_path, result_sheet_name)
            Add_hyperlink(result_file_path, result_sheet_name)
            print('\n')
            print(datetime.now().strftime("%H:%M:%S"), 'Done.《', result_file_path, '》checked over.')
            exit()
        if ocr_input == 'y':
            print('\nChoose run method: \n"y" - Run acumulated to the existed result. \n"n" - Run fresh and delete all existed results and template folders.\n')
            acumulate_input = _ask_yes_no('Input(y/n):\n')
            if acumulate_input == 'y':
                # Accumulate: keep the temp folder and continue from the
                # backed-up previous result.
                reserve_template_before = True
                shutil.copy(result_file_path, back_file_path)
                origin_pandas = _read_result_sheet(result_file_path, result_sheet_name)
            else:
                # Fresh run: the prompt promises that results AND temp files
                # are removed, so the temp folder must not be preserved.
                reserve_template_before = False
                Create_clear_dir(result_folder_path)

    if not origin_pandas.empty:
        origin_pandas = Fill_na_result(origin_pandas)

    temp_folder_name = 'temp'
    temp_folder_path = os.path.join(origin_folder_path, temp_folder_name)
    if not reserve_template_before or not os.path.exists(temp_folder_path):
        Create_clear_dir(temp_folder_path)

    # ---- start the OCR engines ----
    print(datetime.now().strftime("%H:%M:%S"), 'Start the engine...')
    mobile_ocr = hub.Module(name="chinese_ocr_db_crnn_mobile")
    paddle_ocr = PaddleOCR(enable_mkldnn=True, use_angle_cls=True, lang='ch')
    ocr_engines = [mobile_ocr, paddle_ocr]
    print(datetime.now().strftime("%H:%M:%S"), 'Engine start running...')

    result_pandas_orderdic = OrderedDict()
    duplicate_sheet_name = 'duplicate'
    duplicate_pandas = pd.DataFrame()
    try:
        duplicate_pandas = pd.read_excel(back_file_path, sheet_name=duplicate_sheet_name,
                                         header=0, keep_default_na=True, dtype=object)
    except Exception:
        # Best effort: the backup or its 'duplicate' sheet may not exist
        # (first run / fresh run). Start with an empty table in that case.
        pass
    if duplicate_pandas.empty:
        duplicate_pandas.to_excel(result_file_path, sheet_name=duplicate_sheet_name, index=False)

    # Count candidate files the same way the processing loop selects them:
    # regular files only, extension compared case-insensitively.
    cnt_file = len({p.resolve() for p in Path(origin_folder_path).glob("*")
                    if p.is_file() and p.suffix.lower() in (".jpg", ".pdf")})
    cnt_done_pre = 0
    cnt_duplicate_pre = 0
    if acumulate_input.lower() == 'y':
        if not origin_pandas.empty:
            cnt_done_pre = len(origin_pandas.loc[origin_pandas['file_path'].notnull(), :])
        if not duplicate_pandas.empty:
            cnt_duplicate_pre = len(duplicate_pandas.loc[duplicate_pandas['file_path'].notnull(), :])

    inv_dict = {}
    walk_folder_args = {'ocr_engines': ocr_engines,
                        'temp_folder_path': temp_folder_path,
                        'engine_switch': prepare_engine}
    result_pandas, duplicate_pandas = walk_folder_ocr(origin_pandas, duplicate_pandas,
                                                      origin_folder_path, **walk_folder_args)
    print('\n')
    print(datetime.now().strftime("%H:%M:%S"), 'Get the result.')

    # ---- how many files were handled during this run ----
    cnt_done = 0
    cnt_duplicate = 0
    if not result_pandas.empty:
        cnt_done = len(result_pandas.loc[(result_pandas['file_path'] != '')
                                         & (result_pandas['file_path'].notnull()), :]) - cnt_done_pre
    if not duplicate_pandas.empty:
        cnt_duplicate = len(duplicate_pandas.loc[(duplicate_pandas['file_path'] != '')
                                                 & (duplicate_pandas['file_path'].notnull()), :]) - cnt_duplicate_pre

    if not result_pandas.empty:
        print(datetime.now().strftime("%H:%M:%S"), 'Checking result data...')
        result_pandas = Check_result(result_pandas)
        Log_result_file(result_pandas, result_file_path, result_sheet_name)
        print(datetime.now().strftime("%H:%M:%S"), 'Result data check over.')
        Add_hyperlink(result_file_path, result_sheet_name)

    # Drop the engine references once recognition is finished.
    paddle_ocr = None
    server_ocr = None
    mobile_ocr = None
    ocr_engines = None

    print('\toriginal image path: ' + origin_folder_path)
    print('\toutput file path: ' + result_file_path)
    t1 = datetime.now()
    tx = t1 - t0
    # Average seconds per file; stays 0 when nothing was processed.
    cnt_processed = cnt_done + cnt_duplicate
    v = round(tx.total_seconds() / cnt_processed, 2) if cnt_processed else 0
    print('\n')
    print(t1, '\n Done. Time spent: ', str(tx).split('.')[0],
          '. Files total: ' + str(cnt_file)
          + '. Already done before start: ' + str(cnt_done_pre)
          + '. Already find duplicate before start: ' + str(cnt_duplicate_pre)
          + '. \n Files recognized this time total: ' + str(cnt_done + cnt_duplicate)
          + ', valid: ' + str(cnt_done) + ', duplicate: ' + str(cnt_duplicate)
          + ', Everage: ' + str(v) + ' s.\n')

    cnt_done_total = cnt_done_pre + cnt_done
    cnt_duplicate_total = cnt_duplicate_pre + cnt_duplicate
    # NOTE(review): the check expects as many duplicates as valid invoices,
    # presumably because each paper invoice is scanned as two copies - confirm.
    if cnt_done_total != cnt_duplicate_total:
        print('Warning: 有效发票数:' + str(cnt_done_total) + ' 重复发票数:' + str(cnt_duplicate_total) + ', 检查是否有发票号码错误。')