import json

from pyspark.sql.types import Row
from datetime import datetime, timedelta
from azure.common import AzureMissingResourceHttpError

from etl_settings import WASB_TEMPLATE
from etl_settings import blob_service, bu_blob_service, vault_blob_service
from etl_settings import VAULT_CONTAINER, VAULT_EVT_PARQUET, VAULT_TBL
from etl_settings import VAULT_BULK_UPLOAD_PREFIX
from etl_settings import TEMP_EVT_PATH
from etl_settings import SB_ETL_EVENT_Q
from etl_settings import format_vault_fpath
from util.log import getDebugLogger
from utils import WASB_RE
from utils import send_queue_message
from bulk import bu_extract_blob_info

from products.sharefile.xform import PRODUCT_NAMES as SF_PRODUCT_NAMES  # TODO

logger = getDebugLogger(__name__)

#
# Events are archived at
#   wasbs://cas-vault@${storage_account_url}/data.parquet
#
# Bulk uploads are archived at
#   wasbs://cas-vault@${storage_account_url}/uploads/${uuid}/${file}
#

RAW_DATA_TRANSFORMERS = [
    {
        "match": lambda evt: evt['prod'] in SF_PRODUCT_NAMES,
        "transformer": lambda body: body.replace('"$type"', '"type"').replace('"$values"', '"values"'),
    },
]
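# Illustrative only: a sketch of how the ShareFile transformer entry rewrites a
# raw body (the sample JSON below is made up, not taken from production data).
#
#   >>> body = '{"$type": "file", "$values": [1, 2]}'
#   >>> RAW_DATA_TRANSFORMERS[0]["transformer"](body)
#   '{"type": "file", "values": [1, 2]}'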

 

'''
root
 |-- SequenceNumber: long (nullable = true)
 |-- Offset: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Properties: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- member0: long (nullable = true)
 |    |    |-- member1: double (nullable = true)
 |    |    |-- member2: string (nullable = true)
 |    |    |-- member3: binary (nullable = true)
 |-- Body: binary (nullable = true)
'''

 

def eh_evt_2_events(eh_evt):
    try:
        body = eh_evt['Body'].decode('utf-8')
        j_obj = json.loads(body)
    except Exception:
        # Body is missing, not UTF-8, or not valid JSON; treat it as a standard EH event
        j_obj = None
    if not isinstance(j_obj, list):
        # no transformation if standard EH event
        return [eh_evt]
    else:
        # batched payload: make one new EH evt per element, each with its own Body
        eh_evt.pop('Body')
        return [dict(eh_evt, **{'Body': json.dumps(evt)}) for evt in j_obj]
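# Illustrative only (values below are made up): a record whose Body is a JSON
# array is fanned out into one record per element; a plain JSON-object Body
# passes through unchanged.
#
#   eh_evt_2_events({'Offset': '42', 'Body': b'[{"id": "a"}, {"id": "b"}]'})
#     -> [{'Offset': '42', 'Body': '{"id": "a"}'},
#         {'Offset': '42', 'Body': '{"id": "b"}'}]
#   eh_evt_2_events({'Offset': '42', 'Body': b'{"id": "a"}'})
#     -> [{'Offset': '42', 'Body': b'{"id": "a"}'}]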

 

def prepare_eh_evt_4_vault(eh_evt, batch_date, batch_key):
    try:
        evt = None
        body = eh_evt['Body'].decode('utf-8')
        evt = json.loads(body)
        xformed = False
        for xformer in RAW_DATA_TRANSFORMERS:
            if xformer["match"](evt):
                body = xformer["transformer"](body)
                xformed = True
        if xformed:
            evt = json.loads(body)
        evt["schema"] = evt.pop("$schema", None)
        # short-term: sharefile moved to "tenant": {"id": ...}
        tenant = evt["tenant"] if "tenant" in evt else {"id": evt['org'], "type": "orgid"}
        if 'id' in tenant:
            tenant_id = "%s:%s" % (tenant.get("type", "ccid"), tenant['id'])
        else:
            tenant_id = ','.join([":".join(x) for x in tenant.items()])
        eh_evt.update({
            'batch_date': batch_date,
            'batch_key': batch_key,
            'id': evt['id'],
            'type': evt['type'],
            'prod': evt['prod'].replace("NetScaler", "Netscaler"),  # FIXME: tmp workaround to avoid hive exception
            'org': evt.get('org', ""),
            'tenant': tenant_id.lower(),
            'date': evt['st'].split('T')[0].replace('-', ''),
            'body2': json.dumps(evt),
        })
        return eh_evt
    except Exception as err:
        # TODO: how to properly log/alert bad event? - CAS-1104 opened
        logger.exception('prepare_eh_evt_4_vault error!')
        eh_evt.update({
            'batch_date': batch_date,
            'batch_key': batch_key,
            'id': '',
            'type': '',
            'prod': 'error' if evt else 'jerror',
            'org': '',
            'tenant': '',
            'date': '',
            'body2': str(err),
        })
        return eh_evt
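# Illustrative only (sample values are made up): the two tenant shapes this
# function expects and the tenant_id it derives from each.
#
#   {"tenant": {"id": "Acme123", "type": "ccid"}, ...}  -> tenant "ccid:acme123"
#   {"org": "Org-42", ...} (no "tenant" key)            -> tenant "orgid:org-42"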

 

'''
root
 |-- Body: binary (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- Offset: string (nullable = true)
 |-- Properties: string (nullable = true)
 |-- SequenceNumber: long (nullable = true)
 |-- SystemProperties: string (nullable = true)
 |-- date: string (nullable = true)
 |-- id: string (nullable = true)
 |-- org: string (nullable = true)
 |-- prod: string (nullable = true)
 |-- publisher: string (nullable = true)
 |-- src: string (nullable = true)
 |-- type: string (nullable = true)
 |-- batch_date: date (nullable = true)
 |-- batch_key: string (nullable = true)
'''

 

# not using hive query to avoid loading whole partition
def archive_events(spark, evt_hub_rdd, batch_date, batch_key, vault_tbl=VAULT_TBL,
                   vault_parquet=VAULT_EVT_PARQUET):
    """
    Save the event hub data rdd to vault, and
    return the newly appended dataframe as rdd
    of CAS events (JSON object)
    """
    logger.debug("saving to vault ...")
    df = (evt_hub_rdd
          .flatMap(lambda evt: eh_evt_2_events(evt))
          .map(lambda evt: prepare_eh_evt_4_vault(evt, batch_date, batch_key))
          .map(lambda evt_dict: Row(**evt_dict))
          .toDF())
    path = format_vault_fpath(batch_date.isoformat(), batch_key, vault_parquet)
    logger.debug("archive_events save to %s ..." % (path,))
    start_t = datetime.now()

    if spark.catalog._jcatalog.tableExists("default", vault_tbl):
        df.write.save(format="parquet", mode="overwrite", partitionBy=['prod'], **{"path": path})

        # update partitions
        if df:
            # generate SQL statement for partition update
            df.cache()
            parts = df.select("prod").distinct().collect()
            part_template = "PARTITION (batch_date='{0}', batch_key='{1}', prod='{2}')"
            new_partitions = [part_template.format(batch_date, batch_key, p['prod']) for p in parts]
            add_partition_sql = ("ALTER TABLE {0} ADD IF NOT EXISTS {1} ".format(
                vault_tbl, " ".join(new_partitions)))

            logger.info('Update partitions: %s: %s ', vault_tbl, add_partition_sql)
            spark.sql(add_partition_sql).collect()
    else:
        logger.info('Create and Save hive table %s at %s', vault_tbl, vault_parquet)
        df.write.saveAsTable(vault_tbl,
                             format='parquet',
                             mode="append",
                             partitionBy=['batch_date', 'batch_key', 'prod'],
                             path=vault_parquet)

    logger.debug("archive_events took %s" % (datetime.now() - start_t,))
    df = spark.read.parquet(path)
    for r in df.groupBy('prod').count().collect():
        # Row(prod=u'Netscaler.MAS', count=2)
        send_queue_message(SB_ETL_EVENT_Q, json.dumps({
            "type": "archive_events",
            "id": batch_key,
            "prod": r['prod'],
            "count": r['count'],
        }))
    df = df.where("prod != 'error' and prod != 'jerror'")
    rdd = df.rdd.map(lambda row: dict(json.loads(row.body2), **{
        'tenant_id': row.tenant,
        'date': row.date,
        'batch_date': batch_date.isoformat(),
        'batch_key': batch_key,
    }))
    return df, rdd
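# Illustrative only: a sketch of how a batch driver might call archive_events,
# assuming it already holds a SparkSession and an RDD of raw Event Hub records
# (the variable names below are hypothetical, not part of this module).
#
#   batch_date = datetime.utcnow().date()
#   batch_key = "eh-capture-0001"   # made-up batch identifier
#   vault_df, cas_events_rdd = archive_events(spark, eh_capture_rdd, batch_date, batch_key)
#   # vault_df mirrors what was appended to the vault parquet; cas_events_rdd
#   # yields plain dicts (the parsed CAS event body plus tenant_id/date/batch_* fields).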

 

def get_tenant_id(event):
    # return (${type}:${id}, ${id}) as a tuple
    return (event['tenant_id'], event['tenant_id'].split(':', 1)[1])
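# Illustrative only (made-up value):
#
#   >>> get_tenant_id({'tenant_id': 'ccid:acme123'})
#   ('ccid:acme123', 'acme123')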

 

# https://info.citrite.net/display/CAS/Event+Data+Format
def archive_bulk_uploads(event, sas_token):
    uuid = event['id']
    tenant, _ = get_tenant_id(event)
    dvc = event.get('dvc', 'unknown')
    src_container, _, blob_list, _ = bu_extract_blob_info(event)
    if not src_container:
        logger.warning("archive_bulk_uploads_impl bypass %s" % src_container)
        return
    for src_blob, src, dst in zip(
            blob_list,
            [bu_blob_service.make_blob_url(src_container, b) for b in blob_list],
            ['{root}/prod={prod}/tenant={tenant}/dvc={dvc}/batch_date={batch_date}/{uuid}/{blob}'.format(
                root=VAULT_BULK_UPLOAD_PREFIX, prod=event['prod'], tenant=tenant.replace(':', '|'), dvc=dvc,
                batch_date=event['batch_date'], uuid=uuid, blob=b) for b in blob_list]):
        logger.debug("copying %s to %s ..." % (src, dst))
        src = src + "?" + sas_token
        vault_blob_service.copy_blob(VAULT_CONTAINER, dst, src)
        # bypass send_queue_message, temp patch for CAS-6378
        continue
        send_queue_message(SB_ETL_EVENT_Q, json.dumps({
            "type": "bu archived",
            "container": src_container,
            "src": src_blob,
            "dst": dst,
        }))
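# Illustrative only: with the destination template above, a copied blob lands
# inside VAULT_CONTAINER at a path shaped like (placeholder values):
#
#   ${VAULT_BULK_UPLOAD_PREFIX}/prod=Netscaler.MAS/tenant=ccid|acme123/dvc=unknown/
#       batch_date=2018-09-01/${uuid}/${blob}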

 

 

# TODO: partitioned vault storage, wait for tenant mapping
'''
def repartition_vault(job_ctxt, partitionBy=['prod', 'org'], repartitionDate=None):
    """
    """
    if repartitionDate is None:
        repartitionDate = (datetime.now() - timedelta(days=1)).isoformat().split('T')[0].replace('-', '')

    mv_fr_tbl, mv_fr_container, mv_fr_parquet = (VAULT_APPEND_TBL, VAULT_APPEND_CONTAINER, VAULT_APPEND_WASB)
    mv_to_tbl, mv_to_container, mv_to_parquet = (VAULT_MAIN_TBL, VAULT_MAIN_CONTAINER, VAULT_MAIN_WASB)
    print "moving data of {date} from {mv_fr_tbl} to {mv_to_tbl} ...".format(date=repartitionDate, mv_fr_tbl=mv_fr_tbl, mv_to_tbl=mv_to_tbl)
    df_2_move = job_ctxt.spark.sql("SELECT prod, org, src, date, raw FROM {tbl} WHERE date='{date}'".format(tbl=mv_fr_tbl, date=repartitionDate))
    df_2_move.write.saveAsTable(mv_to_tbl, format="parquet", mode="append", partitionBy=partitionBy, **{"path": mv_to_parquet})

    print "dropping data of {date} from {mv_fr_tbl} ...".format(date=repartitionDate, mv_fr_tbl=mv_fr_tbl)
    job_ctxt.spark.sql("ALTER TABLE {tbl} DROP IF EXISTS PARTITION(date='{date}')".format(tbl=mv_fr_tbl, date=repartitionDate)).collect()
    print "deleting data of {date} from {wasb} ...".format(date=repartitionDate, wasb=mv_fr_parquet)
    while True:
        bn_list = [b.name for b in blob_service.list_blobs(mv_fr_container, prefix='data.parquet/date={date}/'.format(date=repartitionDate))]
        if not bn_list:
            break
        print "{n} blobs to delete ...".format(n=len(bn_list))
        for bn in bn_list:
            print "deleting {bn} ...".format(bn=bn)
            blob_service.delete_blob(mv_fr_container, bn)
'''