# Python使用脚本进行用户信用评分体系计算的案例教程

2017-09-27

Python使用脚本进行用户信用评分体系计算的案例教程。项目的目的是根据各影响因子和权重计算出对应用户的信用分值。数据源涉及到库表（Hive&Mysql）的数据、打点的日志数据，数据分别在Hive及Mysql中装载及计算，相对有些复杂。

mysql关联更新;

mysql数据装载，中文乱码的处理;

1、日期处理的函数脚本

/Users/nisj/PycharmProjects/BiDataProc/Demand/CreditScore/DateCalc.py

```# -*- coding=utf-8 -*-
import warnings
import datetime
import calendar

warnings.filterwarnings("ignore")

def getNowDay():
DayNow = datetime.datetime.today().strftime(&#39;%Y-%m-%d&#39;)
return DayNow

def getYesterDay():
YesterDay = datetime.datetime.today() - datetime.timedelta(days=1)
YesterDay = YesterDay.strftime(&#39;%Y-%m-%d&#39;)
return YesterDay

def getDaysXAgo(pt_day):
DaysXAgo = datetime.datetime.strptime(pt_day, &#39;%Y-%m-%d&#39;) - datetime.timedelta(days=7)
DaysXAgo = DaysXAgo.strftime(&#39;%Y-%m-%d&#39;)
return DaysXAgo

def getDays1Ago(pt_day):
Days1Ago = datetime.datetime.strptime(pt_day, &#39;%Y-%m-%d&#39;) - datetime.timedelta(days=1)
Days1Ago = Days1Ago.strftime(&#39;%Y-%m-%d&#39;)
return Days1Ago

def getAllMonthOfYear():
Month_list = [&#39;{num:02d}&#39;.format(num=i) for i in range(1, 13, 1)]
return Month_list

def getAllHourOfDay():
Hour_list = [&#39;{num:02d}&#39;.format(num=i) for i in range(0, 24, 1)]
return Hour_list

def getDayOfMonth(year, month):
day_list = range(calendar.monthrange(year, month)[1]+1)[1:]
day_list2 = []
for day in day_list:
day_list2.append(str(day).zfill(2))
return day_list2

def dateRange(beginDate, endDate):
dates = []
dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
date = beginDate[:]
while date <= endDate:
dates.append(date)
dt = dt + datetime.timedelta(1)
date = dt.strftime("%Y-%m-%d")
return dates

# for date in dateRange(&#39;2017-02-01&#39;, &#39;2017-03-14&#39;):
#     year = date[0:4]
#     month = date[5:7]
#     day = date[8:10]
#     print year, month, day

# print getAllMonthOfYear()
# print getAllHourOfDay()
# print getNowDay(),getYesterDay()
# print getDayOfMonth(year=2017, month=10)
# print dateRange(beginDate=&#39;2017-09-01&#39;, endDate=&#39;2017-11-19&#39;)
# print getDaysXAgo(&#39;2017-12-01&#39;)
```

2、源表数据处理
/Users/nisj/PycharmProjects/BiDataProc/Demand/CreditScore/tabBasicDataProc.py

```# -*- coding=utf-8 -*-
import time
import datetime
import os
import re
import warnings

warnings.filterwarnings("ignore")

def CsPayActiveProc(pt_day):
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
drop table if exists xcs_pay_active; \
create table xcs_pay_active(pt_day varchar(10),uid bigint(20),pay_active_flag int,etl_time datetime,KEY idx_uid (uid)) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)
calcData=os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
with tab_pay_sum as( \
select pt_day,uid,sum(amount) pay_amount \
from data_chushou_pay_info \
where state=0 and pt_day = &#39;{pt_day}&#39; \
group by pt_day,uid) \
select a1.pt_day,a1.uid,case when a1.pay_amount>a2.pay_amount_per then 1 else 0 end pay_active_flag \
from tab_pay_sum a1 \
left join (select pt_day,sum(pay_amount)/count(distinct uid) pay_amount_per \
from tab_pay_sum \
group by pt_day) a2 on a1.pt_day=a2.pt_day \
where a1.pay_amount>a2.pay_amount_per;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_pay_active(pt_day,uid,pay_active_flag,etl_time) values "
for nrpd in nrpd_list:
pt_day = nrpd[0]
uid = nrpd[1]
pay_active_flag = nrpd[2]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (pt_day,uid,pay_active_flag,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_pay_active(pt_day,uid,pay_active_flag,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

def CsViewActiveProc(pt_day):
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
drop table if exists xcs_view_active; \
create table xcs_view_active(pt_day varchar(10),uid bigint(20),view_active_flag int,etl_time datetime,KEY idx_uid (uid)) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)
calcData=os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
with tab_view_sum as( \
select pt_day,uid,sum(view_time) view_time \
from recommend_data_view \
where pt_day = &#39;{pt_day}&#39; \
group by pt_day,uid) \
select a1.pt_day,a1.uid,case when a1.view_time>a2.view_time_per then 1 else 0 end view_active_flag \
from tab_view_sum a1 \
left join (select pt_day,sum(view_time)/count(distinct uid) view_time_per \
from tab_view_sum \
group by pt_day) a2 on a1.pt_day=a2.pt_day \
where a1.view_time>a2.view_time_per;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_view_active(pt_day,uid,view_active_flag,etl_time) values "
for nrpd in nrpd_list:
pt_day = nrpd[0]
uid = nrpd[1]
view_active_flag = nrpd[2]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (pt_day,uid,view_active_flag,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_view_active(pt_day,uid,view_active_flag,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

def CsMessageActiveProc(pt_day):
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
drop table if exists xcs_message_active; \
create table xcs_message_active(pt_day varchar(10),uid bigint(20),message_active_flag int,etl_time datetime,KEY idx_uid (uid)) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)
calcData=os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
with tab_message_sum as( \
select pt_day,uid,count(*) message_cnt \
from oss_chushou_message_send \
where pt_day = &#39;{pt_day}&#39; \
group by pt_day,uid) \
select a1.pt_day,a1.uid,case when a1.message_cnt>a2.message_cnt_per then 1 else 0 end message_active_flag \
from tab_message_sum a1 \
left join (select pt_day,sum(message_cnt)/count(distinct uid) message_cnt_per \
from tab_message_sum \
group by pt_day) a2 on a1.pt_day=a2.pt_day \
where a1.message_cnt>a2.message_cnt_per;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_message_active(pt_day,uid,message_active_flag,etl_time) values "
for nrpd in nrpd_list:
pt_day = nrpd[0]
uid = nrpd[1]
message_active_flag = nrpd[2]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (pt_day, uid, message_active_flag, etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_message_active(pt_day,uid,message_active_flag,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
drop table if exists xcs_gift_active; \
create table xcs_gift_active(pt_day varchar(10),room_creator_uid bigint(20),gift_active_flag int,etl_time datetime,KEY idx_uid (room_creator_uid)) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)
calcData=os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
where pt_day = &#39;{pt_day}&#39; \
group by pt_day,room_creator_uid) \
group by pt_day) a2 on a1.pt_day=a2.pt_day \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
for nrpd in nrpd_list:
pt_day = nrpd[0]
room_creator_uid = nrpd[1]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (pt_day,room_creator_uid,gift_active_flag,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

def CsLiveActiveProc(pt_day):
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
drop table if exists xcs_live_active; \
create table xcs_live_active(pt_day varchar(10),room_creator_uid bigint(20),live_active_flag int,etl_time datetime,KEY idx_uid (room_creator_uid)) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)
calcData=os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
with tab_live_sum as( \
select a1.pt_day,a2.creator_uid,a1.anthor_live_time \
from (select a1.pt_day,a1.roomid,sum(live_minute) anthor_live_time \
from (select pt_day,room_id roomid,hour(time_interval)*60+minute(time_interval)+second(time_interval)/60 live_minute from ( \
select pt_day,room_id,cast(updated_time as timestamp)-cast(switch_time as timestamp) time_interval \
from honeycomb_all_live_history_status \
where pt_day = &#39;{pt_day}&#39;) x) a1 \
group by a1.pt_day,a1.roomid) a1 \
left join oss_room_v2 a2 on a1.roomid=a2.id \
where a2.pt_day=&#39;{yesterday}&#39;) \
select a1.pt_day,a1.creator_uid,case when a1.anthor_live_time>a2.anthor_live_time_per then 1 else 0 end live_active_flag \
from tab_live_sum a1 \
left join (select pt_day,sum(anthor_live_time)/count(distinct creator_uid) anthor_live_time_per \
from tab_live_sum \
group by pt_day) a2 on a1.pt_day=a2.pt_day \
where a1.anthor_live_time>a2.anthor_live_time_per;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_live_active(pt_day,room_creator_uid,live_active_flag,etl_time) values "
for nrpd in nrpd_list:
pt_day = nrpd[0]
room_creator_uid = nrpd[1]
live_active_flag = nrpd[2]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (pt_day,room_creator_uid,live_active_flag,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_live_active(pt_day,room_creator_uid,live_active_flag,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

def CsUserSilentProc(pt_day):
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
drop table if exists xcs_user_silent; \
create table xcs_user_silent(pt_day varchar(10),uid bigint(20),user_silent_flag int,etl_time datetime,KEY idx_uid (uid)) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)
calcData=os.popen("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P50506 -uMysqlUser -pMysqlPass -N -e "use jellyfish_server; \
select substr(created_time,1,10) pt_day,uid,1 user_silent_flag \
from silent_user \
where substr(created_time,1,10)=&#39;{pt_day}&#39; \
group by substr(created_time,1,10),uid;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_user_silent(pt_day,uid,user_silent_flag,etl_time) values "
for nrpd in nrpd_list:
pt_day = nrpd[0]
uid = nrpd[1]
user_silent_flag = nrpd[2]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (pt_day,uid,user_silent_flag,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_user_silent(pt_day,uid,user_silent_flag,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

def CsUpperManageProc(pt_day):
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
drop table if exists xcs_upper_manage; \
create table xcs_upper_manage(pt_day varchar(10),uid bigint(20),room_id bigint(20),etl_time datetime,KEY idx_uid (uid)) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)
calcData=os.popen("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P50506 -uMysqlUser -pMysqlPass -N -e "use jellyfish_server; \
select substr(created_time,1,10) pt_day,uid,room_id \
from room_manager \
where substr(created_time,1,10)=&#39;{pt_day}&#39; \
group by substr(created_time,1,10),uid,room_id;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_upper_manage(pt_day,uid,room_id,etl_time) values "
for nrpd in nrpd_list:
pt_day = nrpd[0]
uid = nrpd[1]
room_id = nrpd[2]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (pt_day,uid,room_id,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_upper_manage(pt_day,uid,room_id,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

# pt_day = &#39;2017-09-19&#39;
# CsPayActiveProc(pt_day)
# CsViewActiveProc(pt_day)
# CsMessageActiveProc(pt_day)
# CsLiveActiveProc(pt_day)
# CsUserSilentProc(pt_day)
# CsUpperManageProc(pt_day)```

3、日志数据的oss下载及装载到mysql
/Users/nisj/PycharmProjects/BiDataProc/Demand/CreditScore/logBasicDataProc.py

```# -*- coding=utf-8 -*-
import os

from DateCalc import *

warnings.filterwarnings("ignore")

def getParameterInfo(date_start, date_end):
ParameterList = []
AccessKeyId = "AccessKeyId-string"
AccessKeySecret = "AccessKeySecret-string"
bucket = "TV-hz"
endpoint = "oss-cn-hangzhou-internal.aliyuncs.com"
server_list = ["sz-121-210", "sz-129-97", "sz-135-45", "sz-76-100", "sz-147-189", "sz-152-217", "sz-153-240", "sz-155-163", "sz-155-212", "sz-155-217", "sz-158-99", "sz-162-5", "sz-168-212", "sz-18-122", "sz-191-14", "sz-192-253", "sz-199-103", "sz-201-3", "sz-21-92", "sz-73-130", "sz-212-123", "sz-214-62", "sz-218-105", "sz-218-165", "sz-233-217", "sz-237-167", "sz-240-45", "sz-25-63", "sz-26-159", "sz-27-218", "sz-27-65", "sz-27-91", "sz-42-99", "sz-54-148", "sz-58-102", "sz-62-113", "sz-97-233", "sz-64-227", "sz-65-64", "sz-68-199", "sz-130-97", "sz-78-112", "sz-135-198", "sz-79-58", "sz-98-72", "sz-159-238", "sz-130-89", "sz-129-70", "sz-131-90"]
for server_name in server_list:
for date in dateRange(date_start, date_end):
year = date[0:4]
month = date[5:7]
day = date[8:10]
ParameterList.append((AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day))

return ParameterList

def getParameterInfo2(date_start, date_end):
ParameterList = []
AccessKeyId = "AccessKeyId-string"
AccessKeySecret = "AccessKeySecret-string"
bucket = "TV-hz"
endpoint = "oss-cn-hangzhou-internal.aliyuncs.com"
server_list = ["sz-217-183"]
for server_name in server_list:
for date in dateRange(date_start, date_end):
year = date[0:4]
month = date[5:7]
day = date[8:10]
ParameterList.append((AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day))

return ParameterList

def getOssFile2Hdfs(AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day):

date = year+"-"+month+"-"+day
ossFileName = "JellyFishLogs/"+server_name+"/"+year+"/"+month+"/"+day+"/jellyfish-server/user_credit_operator.log_"+date
# ossPath = "http://TV-hz.oss-cn-hangzhou-internal.aliyuncs.com"+ossFileName
# print ossPath

hdfsFilePath = "/tmp/nisj/CreditScore/"+date+"/"
hdfsFileName = hdfsFilePath+server_name+"#user_credit_operator_server."+date+".txt"

os.system("""source /etc/profile; \
hadoop distcp oss://%s:%s@%s.%s/%s hdfs:%s""" % (AccessKeyId, AccessKeySecret, bucket, endpoint, ossFileName, hdfsFileName))

def getOssFile2Hdfs2(AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day):

date = year+"-"+month+"-"+day
ossFileName = "JellyFishConsoleLogs/"+server_name+"/"+year+"/"+month+"/"+day+"/user_credit_operator.log_"+date
# ossPath = "http://TV-hz.oss-cn-hangzhou-internal.aliyuncs.com"+ossFileName
# print ossPath

hdfsFilePath = "/tmp/nisj/CreditScore/"+date+"/"
hdfsFileName = hdfsFilePath+server_name+"#user_credit_operator_console."+date+".txt"

os.system("""source /etc/profile; \
hadoop distcp oss://%s:%s@%s.%s/%s hdfs:%s""" % (AccessKeyId, AccessKeySecret, bucket, endpoint, ossFileName, hdfsFileName))

date = year+"-"+month+"-"+day
for dir_path in os.popen("""source /etc/profile; \
hadoop dfs -ls /tmp/nisj/CreditScore/%s/ | awk  -F &#39; &#39;  &#39;{print \$8}&#39;""" % (date)).readlines():
dir_path = dir_path.strip()
if len(dir_path) != 0:
os.system("""source /etc/profile; \

date = year+"-"+month+"-"+day
os.system("""source /etc/profile; \
awk  -F &#39; INFO : &#39; &#39;{print \$2}&#39; /home/hadoop/nisj/automationDemand/CreditScore/tmpLogDir/*user_credit_operator*.%s.txt > /home/hadoop/nisj/automationDemand/CreditScore/tmpLogDir/log_record_%s.txt """ % (date, date))
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass -e "use funnyai_data; \
delete from xcs_log_record where pt_day=&#39;{date}&#39;; \
load data local infile &#39;/home/hadoop/nisj/automationDemand/CreditScore/tmpLogDir/log_record_{date}.txt&#39; ignore into table xcs_log_record character set utf8 fields terminated by &#39;|&#39; enclosed by &#39;&#39; lines terminated by &#39;\n&#39; (uid,type,subtype,expiredtype,reason); \
update xcs_log_record set pt_day=&#39;{date}&#39; where pt_day is null;" """.format(date=date))

def logFromOss2Local(runDay):
date_start = runDay
date_end = runDay
os.system("""source /etc/profile; \
for Parameter in getParameterInfo(date_start, date_end):
AccessKeyId = Parameter[0]
AccessKeySecret = Parameter[1]
bucket = Parameter[2]
endpoint = Parameter[3]
server_name = Parameter[4]
year = Parameter[5]
month = Parameter[6]
day = Parameter[7]

getOssFile2Hdfs(AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day)

for Parameter in getParameterInfo2(date_start, date_end):
AccessKeyId = Parameter[0]
AccessKeySecret = Parameter[1]
bucket = Parameter[2]
endpoint = Parameter[3]
server_name = Parameter[4]
year = Parameter[5]
month = Parameter[6]
day = Parameter[7]

getOssFile2Hdfs2(AccessKeyId, AccessKeySecret, bucket, endpoint, server_name, year, month, day)

for date in dateRange(date_start, date_end):
year = date[0:4]
month = date[5:7]
day = date[8:10]

# test code
# runDay = &#39;2017-09-19&#39;
# logFromOss2Local(runDay)
```

4、用户基础信息数据准备
/Users/nisj/PycharmProjects/BiDataProc/Demand/CreditScore/userBasicDataProc.py
```# -*- coding=utf-8 -*-
import time
import os
import re
from DateCalc import *

warnings.filterwarnings("ignore")

def CsUserInfoProc(pt_day):
yesterday=getYesterDay()
DaysXAgo=getDaysXAgo(pt_day)
# os.system("""source /etc/profile; \
#             /usr/lib/hive-current/bin/hive -e " \
#             drop table if exists xcs_user_info; \
#             create table xcs_user_info as \
#             from oss_chushou_user_profile \
#             where pt_day=&#39;{yesterday}&#39; and state=0 and last_login_time between &#39;{DaysXAgo}&#39; and &#39;{pt_day}&#39;;" \
#             """.format(pt_day=pt_day, yesterday=yesterday, DaysXAgo=DaysXAgo))

os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
drop table if exists xcs_user_info; \
CREATE TABLE xcs_user_info ( \
uid bigint(20), \
nickname varchar(2000), \
etl_time datetime, \
KEY idx_uid (uid) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
" """)
calcData = os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
from oss_chushou_user_profile \
where pt_day=&#39;{yesterday}&#39; and state=0 and last_login_time between &#39;{DaysXAgo}&#39; and &#39;{pt_day}&#39;;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_user_info(uid,nickname,last_login_time,etl_time) values "
for nrpd in nrpd_list:
uid = nrpd[0]
nickname = str(nrpd[1]).replace("""`""", """""").replace("""\&#39;""", """""").replace("""\"""", """""").replace("""\\""", """""")
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (uid,nickname,last_login_time,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_user_info(uid,nickname,last_login_time,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

def CsRoomPartInfo(pt_day):
yesterday=getYesterDay()
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
drop table if exists xcs_room_part_info; \
CREATE TABLE xcs_room_part_info ( \
room_id bigint(20), \
subcriber_count bigint(20), \
etl_time datetime, \
KEY idx_uid (room_id) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8; \
;" """)

roomPartData = os.popen("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -N -e "use funnyai_data; \
select room_id from xcs_upper_manage where pt_day=&#39;{pt_day}&#39; group by room_id;" """.format(pt_day=pt_day)).readlines();
roomIdCon = &#39;&#39;
for roomId in roomPartData:
roomIdCon = roomId.replace(&#39;\n&#39;, &#39;&#39;) + &#39;,&#39; +roomIdCon
roomIdCon = roomIdCon[:-1]

calcData = os.popen("""source /etc/profile; \
/usr/lib/hive-current/bin/hive -e " \
select id,subcriber_count \
from oss_room_v2 \
where pt_day=&#39;{yesterday}&#39; and id in({roomIdCon}) \
;" \

nrpd_list = []
for nrp_list in calcData:
nrp = re.split(&#39;\t&#39;, nrp_list.replace(&#39;\n&#39;, &#39;&#39;))
nrpd_list.append(nrp)

i = 0
insert_sql_text = "insert into xcs_room_part_info(room_id,subcriber_count,etl_time) values "
for nrpd in nrpd_list:
room_id = nrpd[0]
subcriber_count = nrpd[1]
etl_time = time.strftime(&#39;%Y-%m-%d %X&#39;, time.localtime())

i += 1

insert_sql_text = insert_sql_text + "( &#39;%s&#39;,&#39;%s&#39;,&#39;%s&#39;)," % (room_id,subcriber_count,etl_time)

if (i % 1000 == 0):
insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

insert_sql_text = "insert into xcs_room_part_info(room_id,subcriber_count,etl_time) values "

insert_sql_text = insert_sql_text[0:-1] + ";"
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
{insert_sql_text} \
" """.format(insert_sql_text=insert_sql_text))

# pt_day = &#39;2017-09-19&#39;
# CsUserInfoProc(pt_day)
# CsRoomPartInfo(pt_day)
```

5、结果数据的汇总计算
/Users/nisj/PycharmProjects/BiDataProc/Demand/CreditScore/resultDataProc.py
```# -*- coding=utf-8 -*-
import os
from DateCalc import *

warnings.filterwarnings("ignore")

def CsResultTabCreate():
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
drop table if exists xcs_user_credit_score; \
CREATE TABLE xcs_user_credit_score ( \
pt_day varchar(10), \
uid bigint(20) DEFAULT NULL, \
nickname varchar(2000) DEFAULT NULL, \
seqing_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
jubao_score int(10) DEFAULT NULL, \
chengpaopao_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
weifan_score int(10) DEFAULT NULL, \
pay_active_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
view_active_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
message_active_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
gift_active_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
live_active_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
upper_manage_score decimal(48,4) DEFAULT NULL, \
user_silent_score int(10) NOT NULL DEFAULT &#39;0&#39;, \
user_currday_increment_score decimal(48,4) DEFAULT NULL, \
user_currday_score decimal(48,4) DEFAULT &#39;0&#39;, \
etl_time datetime DEFAULT CURRENT_TIMESTAMP, \
KEY idx_pt_day_uid (pt_day,uid) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8;" """)

def CsResultDataInsertAndUpdate(pt_day):
Days1Ago=getDays1Ago(pt_day)
os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
delete from xcs_user_credit_score where pt_day=&#39;{pt_day}&#39;;
case when a2.uid is not null then -10000 else 0 end seqing_score, \
case when a3.uid is not null then -50*a3.jubao_cnt else 0 end jubao_score, \
case when a4.uid is not null then -2000 else 0 end chengpaopao_score, \
case when a5.uid is not null then a5.weifan_score else 0 end weifan_score, \
case when a6.uid is not null then 50 else 0 end pay_active_score, \
case when a7.uid is not null then 20 else 0 end view_active_score, \
case when a8.uid is not null then 10 else 0 end message_active_score, \
case when a9.room_creator_uid is not null then 30 else 0 end gift_active_score, \
case when a10.room_creator_uid is not null then 20 else 0 end live_active_score, \
case when a11.uid is not null then a11.upper_manage_score else 0 end upper_manage_score, \
case when a12.uid is not null then -300 else 0 end user_silent_score \
from xcs_user_info a1 \
left join (select uid from xcs_log_record where pt_day=&#39;{pt_day}&#39; and type=1 group by uid) a2 on a1.uid=a2.uid \
left join (select uid,count(*) jubao_cnt from xcs_log_record where pt_day=&#39;{pt_day}&#39; and type=2 group by uid) a3 on a1.uid=a3.uid \
left join (select uid from xcs_log_record where pt_day=&#39;{pt_day}&#39; and type=3 group by uid) a4 on a1.uid=a4.uid \
left join (select uid,sum(case when subtype in(1,2) then -50 when subtype in(5,6) then -500 when subtype=0 then -2000 else 0 end) weifan_score from xcs_log_record where pt_day=&#39;{pt_day}&#39; and type=4 group by uid) a5 on a1.uid=a5.uid \
left join xcs_pay_active a6 on a1.uid=a6.uid \
left join xcs_view_active a7 on a1.uid=a7.uid \
left join xcs_message_active a8 on a1.uid=a8.uid \
left join xcs_gift_active a9 on a1.uid=a9.room_creator_uid \
left join xcs_live_active a10 on a1.uid=a10.room_creator_uid \
left join (select a11_1.uid,sum(a11_2.subcriber_count/1000) upper_manage_score \
from xcs_upper_manage a11_1 \
left join xcs_room_part_info a11_2 on a11_1.room_id=a11_2.room_id \
group by a11_1.uid) a11 on a1.uid=a11.uid \
left join xcs_user_silent a12 on a1.uid=a12.uid) x \
;" """.format(pt_day=pt_day))

os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
update xcs_user_credit_score a1,xcs_user_credit_score a2 \
set a1.user_currday_score=ifnull(a1.user_currday_increment_score+a2.user_currday_score,0) \
where a1.pt_day=&#39;{pt_day}&#39; and a2.pt_day=&#39;{Days1Ago}&#39; \
and a1.uid=a2.uid;" """.format(pt_day=pt_day, Days1Ago=Days1Ago))

os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
update xcs_user_credit_score a1,xcs_user_credit_score a2 \
set a1.user_currday_score=ifnull((case when a2.user_currday_score-10>0 then a2.user_currday_score-10 else 0 end),0) \
where a1.pt_day=&#39;{pt_day}&#39; and a2.pt_day=&#39;{Days1Ago}&#39; \
and a1.uid=a2.uid \
and (a1.seqing_score=0 and a1.user_silent_score=0 and a1.jubao_score=0 and a1.chengpaopao_score=0 and a1.weifan_score=0 and a1.upper_manage_score=0 and a1.pay_active_score=0 and a1.view_active_score=0 and a1.message_active_score=0 and a1.gift_active_score=0 and a1.live_active_score=0) \
and a2.user_currday_score>0 \
;" """.format(pt_day=pt_day, Days1Ago=Days1Ago))

os.system("""source /etc/profile; \
/usr/bin/mysql -hMysqlHost -P6603 -uMysqlUser -pMysqlPass --default-character-set=utf8 -e "use funnyai_data; \
update xcs_user_credit_score a1,xcs_user_credit_score a2 \
set a1.user_currday_score=ifnull((case when a1.user_currday_score+10<0 then a1.user_currday_score+10 else 0 end),0) \
where a1.pt_day=&#39;{pt_day}&#39; and a2.pt_day=&#39;{Days1Ago}&#39; \
and a1.uid=a2.uid \
and (a1.seqing_score=0 and a1.user_silent_score=0 and a1.jubao_score=0 and a1.chengpaopao_score=0 and a1.weifan_score=0) \
and a1.user_currday_score<0 \
;" """.format(pt_day=pt_day, Days1Ago=Days1Ago))

# pt_day = &#39;2017-09-19&#39;
# CsResultDataInsertAndUpdate(pt_day)```

6、数据的总体批量调度脚本
/Users/nisj/PycharmProjects/BiDataProc/Demand/CreditScore/CreditScore_Ctl.py
```# -*- coding=utf-8 -*-
from tabBasicDataProc import *
from logBasicDataProc import *
from userBasicDataProc import *
from resultDataProc import *

warnings.filterwarnings("ignore")

def CreditScore_ctl(pt_day):
# 表数据装载
CsPayActiveProc(pt_day)
CsViewActiveProc(pt_day)
CsMessageActiveProc(pt_day)
CsLiveActiveProc(pt_day)
CsUserSilentProc(pt_day)
CsUpperManageProc(pt_day)

# 打点日志数据装载
logFromOss2Local(runDay=pt_day)

# 用户及房间基础信息装载
CsUserInfoProc(pt_day)
CsRoomPartInfo(pt_day)

# 最终数据的计算与更新
# CsResultTabCreate()
CsResultDataInsertAndUpdate(pt_day)

# Batch Code
pt_day = getYesterDay()
CreditScore_ctl(pt_day)

# pt_day_list=[&#39;2017-09-21&#39;, &#39;2017-09-22&#39;, &#39;2017-09-23&#39;]
# for pt_day in pt_day_list:
#     print pt_day
#     CreditScore_ctl(pt_day)```

```[hadoop@emr-worker-9 CreditScore]\$ crontab -l