前段时间有位客户问: 你们的程序能不能给我们生成个 txt 文件,把新增的员工都放进来,字段也不需要太多,就要 员工姓名/卡号/员工编号/员工职位/公司 这些字段就行了,然后我们的程序会去读取这个 txt 文件,拿里面的内容,读完之后会把这个文件删掉
我: 可以接受延迟吗?可能没办法实时生成,写个脚本,用定时任务去跑还是可以实现的
客户: 可以呀,那个脚本 30mins 跑一次就行
这篇文章就记录一下大概实现思路,后续如果遇到类似需求,就可以直接 copy 代码了
实现思路:
- 定义全局变量 update_time
- 判断 update_time 是否为空
  - 为空,说明是第一次查询数据,取查到的最后一条记录的 create_time ,赋值给 update_time
  - 不为空,说明不是第一次查询数据,查询时只取 create_time > update_time 的记录,同时更新 update_time
- 判断 txt 文件是否存在
  - 存在,则 txt 表头不需要重新生成
  - 不存在, txt 表头需要重新生成
其实逻辑很简单, Python 类库也很丰富,接下来就上代码:
import pymysql
import os
import argparse
import schedule
import datetime
# Watermark for incremental queries, shared across scheduled runs.
# It is "" until get_person_info() has fetched at least one row; after that it
# holds the gmt_create value of the last row returned by the most recent
# non-empty query, and later queries only select rows newer than it.
update_time = ""
def handle_variables(server_ip, password):
    """Publish the database connection settings as module-level globals.

    Args:
        server_ip: Value stored in the global ``real_ip``.
        password: Value stored in the global ``root_password``.

    Returns:
        None. The function only rebinds the two globals so that other
        functions in this module can read them without extra parameters.
    """
    global real_ip, root_password
    real_ip, root_password = server_ip, password
def _split_person_name(person_name):
    """Split a stored name into ``(first_name, last_name)``.

    The first whitespace-separated token is treated as the first name and the
    remainder as the last name. A ``None`` or blank name yields two empty
    strings instead of raising.
    """
    name_parts = [] if person_name is None else str(person_name).split(None, 1)
    first_name = name_parts[0] if name_parts else ''
    last_name = name_parts[1].strip() if len(name_parts) > 1 else ''
    return first_name, last_name


def _lookup_department_name(cur, department_id, cache):
    """Return ``zone.name`` for *department_id*, or ``''`` if there is none.

    *cache* maps ids already resolved during this run to their names, so each
    distinct department is queried at most once.
    """
    if department_id is None:
        return ''
    if department_id not in cache:
        # Parameterized so the driver does the quoting/escaping.
        cur.execute("select `name` from zone where id = %s", (department_id,))
        zone_row = cur.fetchone()
        if zone_row is None or zone_row[0] is None:
            cache[department_id] = ''
        else:
            cache[department_id] = str(zone_row[0])
    return cache[department_id]


def _format_person_line(person_info, department_name):
    """Build one ``', '``-separated output line for a ``person`` row."""
    first_name, last_name = _split_person_name(person_info[0])
    # card number, employee id, designation: NULL becomes an empty field.
    optional_fields = ['' if value is None else str(value)
                       for value in person_info[1:4]]
    fields = [last_name, first_name] + optional_fields + [department_name]
    return ', '.join(fields) + '\n'


def get_person_info():
    """Append newly created ``person`` rows to ``add_person.txt``.

    On the first run (global ``update_time`` is ``""``) every row of the
    ``person`` table is exported; on later runs only rows whose ``gmt_create``
    is greater than ``update_time``. Each row becomes one line of the form
    ``Lastname, Firstname, CardNumber, Employee ID, Designation, Department``.
    The header line is written whenever the file is new or empty (the file may
    have been removed since the previous run).

    Reads the globals ``real_ip`` and ``root_password`` for the connection and
    updates the global ``update_time`` after a successful write.

    Returns:
        None.

    Raises:
        Whatever ``pymysql`` raises for connection or query failures; only
        ``IOError`` from the file handling is caught and reported.
    """
    global update_time
    connection = pymysql.connect(host=real_ip,
                                 user='root',
                                 password=root_password,
                                 db='xxx')
    try:
        # NOTE(review): `postion` is kept exactly as spelled in the original
        # query -- confirm against the table schema before "fixing" it.
        select_sql = ("select `name`,`card_num`,`code`,`postion`,"
                      "`company_id`,`gmt_create` from person")
        with connection.cursor() as cur:
            # Ordering by gmt_create makes the last fetched row the newest one,
            # which is what the watermark update below relies on.
            if update_time == "":
                cur.execute(select_sql + " order by `gmt_create`")
            else:
                cur.execute(
                    select_sql + " where `gmt_create` > %s order by `gmt_create`",
                    (update_time,))
            query_person_info_list = cur.fetchall()

            department_names = {}
            lines = [
                _format_person_line(
                    person_info,
                    _lookup_department_name(cur, person_info[4], department_names))
                for person_info in query_person_info_list
            ]

        # Append mode creates the file when it is missing and never rewrites
        # existing content; checking the size on the open handle avoids the
        # exists()-then-open race with the consumer deleting the file.
        with open('add_person.txt', 'a') as f:
            if os.fstat(f.fileno()).st_size == 0:
                f.write('Lastname, Firstname, CardNumber, Employee ID, Designation, Department \n')
            f.writelines(lines)

        # Advance the watermark only after the rows were written, so a failed
        # write does not silently drop them from the next run.
        if query_person_info_list:
            newest_create_time = query_person_info_list[-1][-1]
            if isinstance(newest_create_time, datetime.datetime):
                # Keep sub-second precision: if the column stores fractional
                # seconds, truncating would re-select the same rows next run.
                newest_create_time = newest_create_time.strftime('%Y-%m-%d %H:%M:%S.%f')
            if newest_create_time is not None:
                update_time = newest_create_time
    except IOError:
        print('Error: get update person info fail')
    finally:
        connection.close()
if __name__ == '__main__':
    # Script entry point: read the connection settings from the command line,
    # register get_person_info as a recurring job and poll the scheduler forever.
    import time

    parser = argparse.ArgumentParser(description="please enter")
    parser.add_argument("--server_ip", "-server", help="server ip", default="", type=str, required=False)
    parser.add_argument("--password", "-p", help="devops infra password", default="", type=str, required=False)
    args = parser.parse_args()
    # let the variable become a global variable, no need to pass it
    handle_variables(args.server_ip, args.password)
    # Intended production interval is 30 minutes:
    #   schedule.every(30).minutes.do(get_person_info)
    # for test -- execute the method every 30 seconds
    schedule.every(30).seconds.do(get_person_info)
    while True:
        # run task
        schedule.run_pending()
        # Pause between polls; without this the loop spins at 100% CPU while
        # waiting for the next job to become due.
        time.sleep(1)
以上~