One minute cron Python script reads from MySQL and updates Redis based on calculations
The following code works and will be put into cron on an AWS server (I'm not sure of its specifications, cores, kernels, etc.) at a one-minute frequency.
However, it's my first time putting a script into production, so I'd like some help reviewing this code. Please look it over and point out any hidden mistakes, anything that's risky to run, or anything that's just bad design. It currently finishes within a minute because I've limited the design to optimizing 100 rows of data at a time.
Could this be made faster, so that I can optimize even more rows? Hypothetically, the row count can reach the tens of thousands, but since I sort the rows by cost they become less valuable further down the list. Still, if it can be done, it will help once the product scales.
Stack: MySQL is where the main data is stored and fetched from; the Redis database is where the changes need to go. This script will be placed in a one-minute cron on a separate server. All are independent AWS servers. I haven't done any Redis-based optimization while building this; I'm not sure how much of a difference it could make.
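For reference, a crontab entry for this schedule might look like the line below; the interpreter and paths are placeholders, not my actual setup:

# m h dom mon dow  command -- run every minute, appending output to a log
* * * * * /usr/bin/python /opt/scripts/optimizer.py >> /var/log/optimizer.log 2>&1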
import MySQLdb
import pandas as pd
import datetime
import redis
from decimal import Decimal, DecimalException
import math
import cPickle as pickle
import re
from pandas.util.testing import assert_frame_equal

conn = MySQLdb.connect(host="AWS_HOST", user="root", passwd="password", db="db_name", local_infile=1)
cursor = conn.cursor(MySQLdb.cursors.DictCursor)
cursor.execute("SET NAMES utf8")

# Shift to IST (UTC+5:30) and floor the minutes to the last half-hour boundary.
now = datetime.datetime.now()+datetime.timedelta(minutes=330)
subtract = now.minute if now.minute < 30 else now.minute-30
now-=datetime.timedelta(minutes=subtract)

read_query = """
select group, url, sum(acost)/1000000 cost, sum(rpm-acost)/1000000 pft,
sum(lg) imps, sum(acost)/1000000/sum(lg)*1000 cpm, sum(rpm)/1000000/sum(lg)*1000 rpm,
sum(rpm-acost)/1000000/sum(lg)*1000 ppm
from db_name.db
where dh = '{}'
group by group, url having pft < 0
order by sum(acost) desc;""".format(now.strftime('%Y-%m-%d %H:%M:00'))
cursor.execute(read_query)
cost_data_new = pd.DataFrame(list(cursor.fetchall()))
cost_data_old = pickle.load( open( "cost_data_old.p", "rb" ))

## manage structure
global changed,r
r = redis.Redis(host='AWS2_HOST', port=PORT, db=0)
changed=True
try:
    # assert_frame_equal raises when the frames differ, hence the try/except.
    if assert_frame_equal(cost_data_new, cost_data_old[['group','url','cost','pft','imps','cpm','rpm','ppm']], check_dtype=False):
        changed=False
except:
    changed=True

def rename_x(col_name):
    if re.match(r".*_(x)$", col_name):
        return (col_name[:-2])
    else:
        return col_name

def cal_rpm(rw):
    try:
        return (rw['pft_diff']+rw['cost_diff'])/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        #print rw['group'], rw['url'], rw['pft_diff'], rw['cost_diff'], rw['imps_diff']
        print "proceeding with 0"
        return 0

def cal_cpm(rw):
    try:
        return rw['cost_diff']/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        #print rw['group'], rw['url'], rw['cost_diff'], rw['imps_diff']
        print "proceeding with 0"
        return 0

def cal_ppm(rw):
    try:
        return rw['pft_diff']/rw['imps_diff']*1000
    except DecimalException:
        return 0
    except Exception as e:
        raise e
        print "proceeding with 0"
        return 0

def get_diff(new,old):
    diff = pd.merge(new,old,how='outer',on=['group','url'])
    diff.fillna(0,inplace=True)
    diff['pft_diff']=diff['pft_x']-diff['pft_y']
    diff['cost_diff']=diff['cost_x']-diff['cost_y']
    diff['imps_diff']=diff['imps_x']-diff['imps_y']
    diff['diff_rpm']=diff[['group','url','pft_diff','cost_diff','imps_diff']].apply(cal_rpm,axis=1)
    diff['diff_cpm']=diff[['group','url','cost_diff','imps_diff']].apply(cal_cpm,axis=1)
    diff['diff_ppm']=diff[['group','url','pft_diff','imps_diff']].apply(cal_ppm,axis=1)
    diff=diff.rename(columns=rename_x)
    diff.drop(list(diff.filter(regex = '_y')), axis = 1, inplace = True)
    try:
        del diff['optimized']
    except:
        pass
    return diff

def calc_bid_prob(lgpm):
    # Decaying curve: close to 100 for small lgpm, approaching beta*100 as lgpm grows.
    beta = 0.01
    alpha = 1
    infl = 13
    slope = 1
    prob=int(100 * (beta + (alpha - beta) / (1 + math.pow((lgpm / infl), slope))))
    return prob

def optimize_val(rw):
    ppm = rw['diff_ppm']
    lg = rw['imps_diff']
    rpm = rw['diff_rpm']
    cpm = rw['diff_cpm']
    bid = r.hget("plcmnt:{}".format(rw['group']),"{}".format(rw['url']))
    try:
        bid=int(bid)
    except:
        bid=20
    b_prob=r.hget("url:prob:{}".format(rw['group']),"{}".format(rw['url']))
    try:
        if ppm < -1:
            if rpm >= 2:
                new_bid = min((1-0.3)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid)
            else:
                new_bid = min((1-0.5)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
        elif (ppm >= -1) & (ppm < 0):
            if rpm >= 2:
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
            else:
                new_bid = min((1-0.3)*rpm,bid)
                new_bid_prob = min(calc_bid_prob(lg),b_prob)
                r.hset("plcmnt:{}".format(rw['group']),rw['url'],new_bid)
                r.hset("url:prob:{}".format(rw['group']),rw['url'],new_bid_prob)
        else:
            pass
        return 1
    except Exception as e:
        #log exception
        return 0

## if not changed, proceed with old, optimize unoptimized top 100 and store as new pickle
if changed:
    ## optimize diff_data
    cost_data_diff=get_diff(cost_data_new,cost_data_old[['group','url','cost','pft','imps','cpm','rpm','ppm']])
    cost_data_diff['optimized']=0
    cost_data_diff.sort_values(by=['cost_diff'],ascending=False,inplace=True)
    optimize=cost_data_diff.head(100)
    optimize['optimized']=optimize.apply(optimize_val,axis=1)
    pickle.dump(cost_data_diff,open("cost_data_old.p","wb"))
else:
    cost_data_old.sort_values(by=['cost'],ascending=False,inplace=True)
    optimize = cost_data_old[cost_data_old['optimized']==0].head(100)
    optimize['optimized']=optimize.apply(optimize_val,axis=1)
    pickle.dump(cost_data_old,open("cost_data_old.p","wb"))

import sys
sys.exit()
cost_data_old example (a wide dataframe; pandas prints it in three column blocks):
group acost_x cpm_x imps_x pft_x ppm_x rpm_x
0 6841 0.0002 0.12150000 2 -0.0002 -0.12150000 0E-8
1 6891 0.0002 0.19900000 1 -0.0002 -0.19900000 0E-8
2 7174 0.0001 0.14900000 1 -0.0001 -0.14900000 0E-8
3 6732 0.0001 0.14600000 1 -0.0001 -0.14600000 0E-8
4 6882 0.0001 0.13500000 1 -0.0001 -0.13500000 0E-8
5 6856 0.0001 0.10700000 1 -0.0001 -0.10700000 0E-8
6 6838 0.0001 0.08700000 1 -0.0001 -0.08700000 0E-8
7 6838 0.0001 0.08600000 1 -0.0001 -0.08600000 0E-8
8 6838 0.0001 0.08600000 1 -0.0001 -0.08600000 0E-8
url cost_y pft_y imps_y
0 url1.org 0.0002 -0.0002 2
1 url2.com 0.0002 -0.0002 1
2 url3.com 0.0001 -0.0001 1
3 url4.tv 0.0001 -0.0001 1
4 url5.com 0.0001 -0.0001 1
5 url6.com 0.0001 -0.0001 1
6 url7.com 0.0001 -0.0001 1
7 url8.com 0.0001 -0.0001 1
8 url9.com 0.0001 -0.0001 1
cpm_y rpm_y ppm_y pft_diff cost_diff imps_diff
0 0.12150000 0E-8 -0.12150000 0.0000 0.0000 0
1 0.19900000 0E-8 -0.19900000 0.0000 0.0000 0
2 0.14900000 0E-8 -0.14900000 0.0000 0.0000 0
3 0.14600000 0E-8 -0.14600000 0.0000 0.0000 0
4 0.13500000 0E-8 -0.13500000 0.0000 0.0000 0
5 0.10700000 0E-8 -0.10700000 0.0000 0.0000 0
6 0.08700000 0E-8 -0.08700000 0.0000 0.0000 0
7 0.08600000 0E-8 -0.08600000 0.0000 0.0000 0
8 0.08600000 0E-8 -0.08600000 0.0000 0.0000 0
cost_data_new shape: (1587, 8)
Tags: python, performance, python-2.7, pandas, redis
asked Apr 29 at 9:44 by Deepak
edited Apr 29 at 18:02 by Jamal
1 Answer
In no particular order:
- I have a hard time understanding why the get_diff function uses vectorized operations sometimes and apply some other times. If the aim is to avoid NaNs in the dataframe, you can simply use fillna afterwards. If you want to avoid the inf values that would come from a division by 0, you could replace them with zeros or NaNs depending on your use-case. In any case, the cal_xxx functions are better replaced by vectorized operations (a minimal sketch follows this list).
- Your lack of whitespace and your usage of acronyms as variable names make your code hard to read and understand.
- You open things that you never close; this includes files and database connections. The with statement and the closing utility are your friends here.
- assert_frame_equal can safely be replaced by equals, which behaves more nicely: it only returns True or False. The changed variable can then be safely eliminated.
- Your top-level code is better put into one or several functions to improve maintainability and testing. In the same vein, the global keyword is to be avoided, and having functions (such as optimize_val) rely on the external initialisation of an object they need is prone to errors. Instead, pass the object as a parameter; and if you can't control the calling point (such as with apply, which calls the function with a single argument), you can use functools.partial to bind some parameters beforehand.
- You should avoid putting credentials in your script; pass them on the command line (and use argparse, for instance, to retrieve them). A sketch of this is appended after the proposed code below.
- Don't fall into the habit of preparing SQL statements with format. Even though in this instance it sounds rather safe, you'd better train yourself to use parametrized statements.
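A minimal sketch of the vectorized replacement mentioned in the first point, using column names from the question; mapping the NaN produced by 0/0 back to zero, in addition to the inf from x/0, is my assumption about what the cal_* helpers intended:

import numpy as np
import pandas as pd

diff = pd.DataFrame({'cost_diff': [4.0, 2.0, 0.0],
                     'imps_diff': [2.0, 0.0, 0.0]})
# One vectorized division instead of a row-wise apply:
# x/0 yields +/-inf and 0/0 yields NaN instead of raising.
diff['diff_cpm'] = diff['cost_diff'] / diff['imps_diff'] * 1000
# Fall back to 0 in both failure modes, as cal_cpm did on DecimalException.
diff['diff_cpm'] = diff['diff_cpm'].replace([np.inf, -np.inf], 0).fillna(0)
print(diff)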
Proposed improvements:
import re
import math
import datetime
import cPickle as pickle
from functools import partial
from contextlib import closing

import redis
import MySQLdb
import pandas as pd

READ_QUERY = """
SELECT group, url,
       sum(lg) imps,
       sum(acost)/1000000 cost,
       sum(rpm-acost)/1000000 pft,
       sum(rpm)/1000000/sum(lg)*1000 rpm,
       sum(acost)/1000000/sum(lg)*1000 cpm,
       sum(rpm-acost)/1000000/sum(lg)*1000 ppm
FROM db_name.db
WHERE dh = %s
GROUP BY group, url HAVING pft < 0
ORDER BY sum(acost) DESC;"""

def read_database(host, user, password, database, local_infile=True):
    now = datetime.datetime.now() + datetime.timedelta(minutes=330)
    minutes = 0 if now.minute < 30 else 30
    date = now.replace(minute=minutes, second=0, microsecond=0)
    with closing(MySQLdb.connect(host=host, user=user, passwd=password, db=database, local_infile=int(local_infile))) as connection:
        cursor = connection.cursor(MySQLdb.cursors.DictCursor)
        cursor.execute("SET NAMES utf8")
        cursor.execute(READ_QUERY, (date,))
        return pd.DataFrame(list(cursor.fetchall()))

def rename_x(col_name):
    if re.match(r'.*_(x)$', col_name):
        return (col_name[:-2])
    else:
        return col_name

def get_diff(new, old):
    diff = pd.merge(new, old, how='outer', on=['group', 'url'])
    diff.fillna(0, inplace=True)
    diff['pft_diff'] = diff['pft_x'] - diff['pft_y']
    diff['cost_diff'] = diff['cost_x'] - diff['cost_y']
    diff['imps_diff'] = diff['imps_x'] - diff['imps_y']
    diff['diff_rpm'] = (diff['pft_diff'] + diff['cost_diff']) / diff['imps_diff'] * 1000
    diff['diff_cpm'] = diff['cost_diff'] / diff['imps_diff'] * 1000
    diff['diff_ppm'] = diff['pft_diff'] / diff['imps_diff'] * 1000
    diff = diff.rename(columns=rename_x)
    diff.drop(list(diff.filter(regex='_y')), axis=1, inplace=True)
    diff.replace(pd.np.inf, 0, inplace=True)
    diff['optimized'] = 0
    return diff

def calc_bid_prob(lgpm, alpha=1, beta=0.01, infl=13, slope=1):
    return int(100 * (beta + (alpha - beta) / (1 + math.pow((lgpm / infl), slope))))

def optimize_val(rw, redis):
    ppm = rw['diff_ppm']
    lg = rw['imps_diff']
    rpm = rw['diff_rpm']
    redis_group_plcmnt = 'plcmnt:{}'.format(rw['group'])
    redis_group_url = 'url:prob:{}'.format(rw['group'])
    redis_url = rw['url']
    bid = redis.hget(redis_group_plcmnt, redis_url)
    try:
        bid = int(bid)
    except ValueError:
        bid = 20
    b_prob = redis.hget(redis_group_url, redis_url)
    try:
        if ppm < 0:
            if ppm < -1:
                if rpm >= 2:
                    new_bid = min((1-0.3)*rpm, bid)
                else:
                    new_bid = min((1-0.5)*rpm, bid)
            else:
                if rpm < 2:
                    new_bid = min((1-0.3)*rpm, bid)
            new_bid_prob = min(calc_bid_prob(lg), b_prob)
            redis.hset(redis_group_plcmnt, redis_url, new_bid)
            redis.hset(redis_group_url, redis_url, new_bid_prob)
        return 1
    except Exception:
        return 0

def compare_and_optimize(cost_data, optimizer, filename='cost_data_old.p'):
    with open(filename, 'rb') as pickled_file:
        cost_data_old = pickle.load(pickled_file)
    if cost_data.equals(cost_data_old):
        # If not changed, proceed with old, optimize unoptimized top 100 and store as new pickle
        cost_data_result = cost_data_old
        cost_data_old.sort_values(by=['cost'], ascending=False, inplace=True)
        optimize = cost_data_old[cost_data_old['optimized'] == 0].head(100)
    else:
        # Optimize diff_data
        cost_data_result = get_diff(cost_data, cost_data_old[['group', 'url', 'cost', 'pft', 'imps', 'cpm', 'rpm', 'ppm']])
        cost_data_result.sort_values(by=['cost_diff'], ascending=False, inplace=True)
        optimize = cost_data_result.head(100)
    optimize['optimized'] = optimize.apply(optimizer, axis=1)
    with open(filename, 'wb') as pickle_file:
        pickle.dump(cost_data_result, pickle_file)

if __name__ == '__main__':
    cost_data = read_database('AWS_HOST', 'root', 'password', 'db_name', True)
    redis_db = redis.Redis(host='AWS2_HOST', port=PORT, db=0)
    compare_and_optimize(cost_data, partial(optimize_val, redis=redis_db))
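As a sketch of the credentials point from the list above, the hard-coded arguments in __main__ could come from the command line instead; the flag names here are illustrative, not part of the proposed code:

import argparse

parser = argparse.ArgumentParser(description='Read MySQL cost data and update Redis bids.')
parser.add_argument('--mysql-host', required=True)
parser.add_argument('--mysql-user', required=True)
parser.add_argument('--mysql-password', required=True)
parser.add_argument('--mysql-db', required=True)
args = parser.parse_args()

# Feed the parsed credentials to the functions above instead of literals.
cost_data = read_database(args.mysql_host, args.mysql_user,
                          args.mysql_password, args.mysql_db, True)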
answered Apr 30 at 13:40 by Mathias Ettinger

Thank you for the suggestions. Let me test this and get back to you. – Deepak, May 1 at 9:10