python 验证代码
#!/usr/bin/python
# -*- coding: utf-8 -*-
import urllib.parse
import urllib
import urllib.request
import urllib.parse
import urllib.error
import os, sys, time, logging, json, shutil
from datetime import datetime
import configparser
from logging.handlers import TimedRotatingFileHandler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
# Directories this script reads from and writes to.
CHECK_FILE_PATH = "G:/数据文件/数据上报/实名验证/待验数据/"  # watched for new request files
RCE_FILE_PATH = "G:/数据文件/数据上报/实名验证/接收数据/"  # verification results are written here
SAVE_FILE_PATH = "G:/数据文件/数据上报/实名验证/通过数据/"  # one file per ID card that passed
LOG_DIR = "G:/数据文件/数据上报/实名验证/log/"

# Credentials and endpoint of the verification service.
# NOTE(review): the endpoint is plain HTTP, so the ID number and name posted to
# it travel unencrypted -- confirm whether the provider offers HTTPS.
appCode = "######"
url = "http://eid.shumaidata.com/eid/check"
headers = {"Authorization": "APPCODE {}".format(appCode)}
# Shared record layout for the console and the log file.
_LOG_FORMAT = '%(asctime)s[%(levelname)s]: %(message)s'

# Root/console logging.  The ERROR threshold also applies to the module logger
# below, so the info-level calls in this file are not emitted.
logging.basicConfig(level=logging.ERROR, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# The rotating handler opens its file immediately and does not create missing
# parent directories, so make sure the log directory exists first.
os.makedirs(LOG_DIR, exist_ok=True)

# Daily-rotating log file, keeping the seven most recent days.
file_handler = TimedRotatingFileHandler(
    filename=os.path.join(LOG_DIR, 'app.log'),
    when='midnight',
    interval=1,
    backupCount=7,
    # Messages contain Chinese text and player-supplied names; an explicit
    # encoding avoids depending on the platform default.
    encoding='utf-8',
)
# basicConfig's format only reaches the root console handler; without this the
# file would contain bare messages with no timestamp or level.
file_handler.setFormatter(logging.Formatter(_LOG_FORMAT))
logger.addHandler(file_handler)
# Known response codes (string keys) and their descriptions.  Used to recognise
# the code of a reply and to describe a failed verification in the log.
err_info = dict([
    ("0", "成功"),
    ("400", "参数错误"),
    ("20010", "身份证号为空或非法"),
    ("20310", "姓名为空或非法"),
    ("404", "请求资源不存在"),
    ("500", "服务器内部错误,请联系服务商"),
    ("501", "第三方服务异常"),
    ("604", "接口停用"),
    ("1001", "其他,以实际返回为准"),
])
# Configure logging.
# NOTE: logging.getLogger() returns the same object for the same name, so this
# re-binds `logger` to the logger already obtained earlier in this file; the
# line is redundant but harmless.
logger = logging.getLogger(__name__)
# Get the file name without its ".txt" extension.
def preFilename(abs_file_name):
    """Return the base name of *abs_file_name* with a trailing ".txt" removed.

    Only a ".txt" at the very end of the name is stripped.  (Splitting on
    ".txt" would cut the name at the first occurrence anywhere in it, e.g.
    "a.txt.bak.txt" -> "a".)  Names without that extension are returned
    unchanged.

    Args:
        abs_file_name: Path (or bare name) of the file.

    Returns:
        The file name without directory and without the ".txt" suffix.
    """
    file_name = os.path.basename(abs_file_name)
    suffix = ".txt"
    if file_name.endswith(suffix):
        return file_name[:-len(suffix)]
    return file_name
# Delete a request file that has already been handled.
def del_old_file(abs_file_name):
    """Remove *abs_file_name*, logging instead of raising on failure.

    Args:
        abs_file_name: Path of the file to delete.

    Returns:
        True when the file was removed, False when removal raised (the error
        is written to the module logger).
    """
    try:
        os.remove(abs_file_name)
    except Exception as e:
        # Best effort: a failed clean-up must not abort the caller.
        logger.error(f"删除文件 {abs_file_name} 异常: {e}")
        return False
    return True
# Read one JSON-encoded option from an INI file.
def readini(abs_file_name, section, item):
    """Load option *item* of *section* from an INI file and decode it as JSON.

    Args:
        abs_file_name: Path of the INI file.
        section: Section name to look in.
        item: Option name whose value holds a JSON document.

    Returns:
        The decoded JSON value, or False when the file, section or option is
        missing, the option is blank, or reading/decoding fails (each failure
        is logged).  A decoded value that is itself falsy is returned as-is.
    """
    try:
        # The value is a JSON payload, not a template: with the default
        # interpolation any "%" inside it would make get() raise.
        record = configparser.ConfigParser(interpolation=None)
        # ConfigParser.read() silently skips files it cannot open, which would
        # surface later as a misleading "section not found".  Opening the file
        # here lets a missing file reach the FileNotFoundError handler below.
        # NOTE(review): no encoding is given, so the platform default is used
        # (as before); writeini() in this file writes gbk -- confirm the
        # producer of these files matches.
        with open(abs_file_name) as ini_file:
            record.read_file(ini_file)
        data = record.get(section, item)
        # Treat a blank option as "no data".
        if data.strip() == "":
            logger.info(f"文件 {abs_file_name} 中的 section {section} 项 {item} 为空")
            return False
        # strict=False tolerates raw control characters inside JSON strings.
        return json.loads(data, strict=False)
    except FileNotFoundError:
        logger.error(f"文件 {abs_file_name} 不存在")
        return False
    except configparser.NoSectionError:
        logger.error(f"在文件 {abs_file_name} 中找不到指定的 section: {section}")
        return False
    except configparser.NoOptionError:
        logger.error(f"在文件 {abs_file_name} 中找不到指定的 option: {item}")
        return False
    except json.JSONDecodeError as jde:
        logger.error(f"解析 JSON 数据时出错: {jde} 文件 {abs_file_name}, section {section}, item {item}")
        return False
    except Exception as e:
        logger.error(f"读取 ini 数据文件异常: {e} 文件 {abs_file_name}, section {section}, item {item}")
        return False
# Write a verification result as a single-section INI file.
def writeini(abs_file_name, section, data):
    """Write *data* as the options of *section* into a new INI file.

    Any existing file at *abs_file_name* is overwritten.

    Args:
        abs_file_name: Path of the file to write.
        section: Name of the single section to create.
        data: Mapping of option name to value; non-string values are written
            as their ``str()`` form.

    Returns:
        True on success, False on any error (which is logged).
    """
    try:
        # Values are stored verbatim: with the default interpolation a lone
        # "%" in a value is rejected by set().
        record = configparser.ConfigParser(interpolation=None)
        record.add_section(section)
        for k, v in data.items():
            # ConfigParser.set() only accepts strings; numeric fields in the
            # data would otherwise abort the whole write.
            record.set(section, k, v if isinstance(v, str) else str(v))
        # NOTE(review): gbk is kept from the original code, presumably because
        # the consumer of these files expects it -- text that GBK cannot
        # represent ends up in the generic handler below.
        with open(abs_file_name, 'w', encoding='gbk') as configfile:
            record.write(configfile)
        return True
    except FileNotFoundError:
        logger.error(f"文件未找到:{abs_file_name}")
        return False
    except PermissionError:
        logger.error(f"写入文件时权限不足:{abs_file_name}")
        return False
    except IOError as e:  # remaining I/O errors
        logger.error(f"写入ini数据文件异常:{e}")
        return False
    except Exception as e:
        logger.error(f"未知错误:{e}")
        return False
def clear_folder(path):
    """Delete every regular file directly inside *path*.

    Subdirectories and their contents are left untouched.  Failures are
    logged per file and do not stop the sweep.

    Args:
        path: Directory to empty.  If it is not an existing directory an
            error is logged and nothing is done.
    """
    # Use the module logger (not the root `logging` functions) so these
    # messages reach the same handlers as the rest of the file.
    if not os.path.isdir(path):
        logger.error(f"Invalid directory path: {path}")
        return
    for file_name in os.listdir(path):
        file_path = os.path.join(path, file_name)
        try:
            if os.path.isfile(file_path):
                logger.info(f"Deleting file: {file_path}")
                os.remove(file_path)
        except Exception as e:
            logger.error(f"Error while deleting {file_path}: {e}")
class JSFileEventHandler(FileSystemEventHandler):
    """Verify the identity carried by each newly created request file.

    For every file created under the pending-data directory the handler reads
    the player's identity record, posts it to the verification service, writes
    the outcome into the result directory and finally removes the request file.
    """

    # Upper bound in seconds for one HTTP round trip; without it a stalled
    # connection would block on_created indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        super().__init__()

    def on_created(self, event):
        """Process one file-creation event.

        Directories and paths outside the pending-data folder are ignored, and
        so are files whose identity record cannot be read.  Once a record has
        been read, the request file is always removed, whatever the outcome.
        """
        if event.is_directory:
            return
        # Short pause kept from the original code, presumably to let the writer
        # finish the file before it is read -- TODO confirm 10 ms is enough.
        time.sleep(0.01)
        if u"待验数据" not in event.src_path:
            return
        player_data = readini(event.src_path, u"身份信息", u"数据")
        if not player_data:
            return
        try:
            try:
                data = {
                    "idcard": player_data[u"身份证号"],
                    "name": player_data[u"姓名"],
                    "server": player_data[u"分区"],
                    "username": player_data[u"角色"],
                }
            except (KeyError, TypeError) as e:
                # A record with missing fields (or one that is not a mapping)
                # must not raise out of the event handler.
                logger.error(u"文件 {} 身份信息不完整: {}".format(event.src_path, e))
                return
            self._verify(event.src_path, data)
        finally:
            del_old_file(event.src_path)

    def _verify(self, src_path, data):
        """Post *data* to the verification service and record the outcome."""
        uid = preFilename(src_path)
        # The result file carries the same name as the request file.
        result_file = os.path.join(RCE_FILE_PATH, os.path.basename(src_path))
        try:
            fdata = urllib.parse.urlencode(data).encode(encoding='UTF8')
            req = urllib.request.Request(url, headers=headers, data=fdata)
            # The context manager closes the connection even if read() fails.
            with urllib.request.urlopen(req, timeout=self.REQUEST_TIMEOUT) as response:
                content = response.read()
            rec_data = json.loads(content.decode('UTF-8'))
            if rec_data:
                self._record_reply(rec_data, data, uid, result_file)
        except urllib.error.HTTPError as e:
            self._record_http_error(e, data, uid, result_file)
        except urllib.error.URLError as e:
            logger.error(u"{}-{}-网络请求失败: {}".format(data["server"], data["username"], e.reason))
        except Exception as e:
            logger.error(u"{}-{}-未知错误: {}".format(data["server"], data["username"], str(e)))

    def _record_reply(self, rec_data, data, uid, result_file):
        """Write the result of a successfully received reply."""
        # NOTE(review): the source's indentation was lost; the failure log is
        # assumed to be the `else` of this code/result check -- confirm.
        if rec_data["code"] in err_info and rec_data["result"]:
            chk_res = rec_data["result"]
            chk_res["code"] = rec_data["code"]
            chk_res["message"] = rec_data["message"]
            chk_res["time"] = datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
            writeini(result_file, uid, chk_res)
            # Identity confirmed: keep a copy keyed by ID number so other
            # servers can check it later.
            if rec_data["code"] == "0" and chk_res["res"] == "1":
                writeini(os.path.join(SAVE_FILE_PATH, chk_res["idcard"] + ".txt"), u"通过数据", chk_res)
        else:
            logger.error(u"{}-{}-实名验证失败:{}".format(data["server"], data["username"], err_info.get(rec_data["code"])))

    def _record_http_error(self, error, data, uid, result_file):
        """Write the code/message carried by an HTTP error response, if any."""
        content = ""
        try:
            content = error.read().decode('UTF-8')
            rec_data = json.loads(content)
            if rec_data:
                chk_res = {
                    "server": data["server"],
                    "username": data["username"],
                    "code": rec_data["code"],
                    "message": rec_data["message"],
                    "time": datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
                }
                writeini(result_file, uid, chk_res)
        except Exception as parse_err:
            # This runs inside an `except` clause of the caller, where the
            # sibling handlers no longer apply; an error body that is not the
            # expected JSON document must not raise out of the event handler.
            logger.error(u"{}-{}-实名验证异常响应解析失败: {}".format(data["server"], data["username"], parse_err))
        # NOTE(review): logged for every HTTP error; the original nesting of
        # this line was lost with the indentation -- confirm.
        logger.error(u"{}-{}-实名验证返回异常:{}".format(data["server"], data["username"], content))
if __name__ == '__main__':
    observer = Observer()
    create_event_handler = JSFileEventHandler()
    dirs = [CHECK_FILE_PATH, ]
    # Start clean: remove results and requests left over from a previous run.
    clear_folder(RCE_FILE_PATH)
    for watch_dir in dirs:
        clear_folder(watch_dir)
        observer.schedule(create_event_handler, watch_dir, recursive=False)
    try:
        observer.start()
        # The work happens on the observer's thread; the main thread only
        # waits for Ctrl-C, so there is no need to wake up every 10 ms.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Always stop before joining: joining a still-running observer after
        # an unexpected error would block forever.
        observer.stop()
        # Joining a thread that never started raises RuntimeError.
        if observer.is_alive():
            observer.join()