In telecom lingo, HLR or (Home Location Register) is simply a database that contains all sort of information about each mobile phone subscriber. The HLR database is usually built directly on binary system file, which allows for very fast data manipulation, but creates integrity and consistency dilemma from a security point-of-view.
I wrote this HLR decoder back in 2012 to decode Ericson HLR R13.2/14.1 and it was my very first Python application. The goal was to decode two HLR dump files and perform consistency check between them. The code is certainly not optimized and not complete, yet it can successfully decode the following records:
- ADM record: administrative data.
- ENDOFPRINTOUT record: end of printout information.
- SUBSDATA record: permanent subscribers data.
- EXTSUBSDATA record: extended subscriber data.
- CUG tag: Closed User Group data.
- GPRS tag: General Packet Radio Service data.
How to use the decoder:
First, you need to install & configure Python. If you’re a Windows user, you can follow this tutorial How to Setup a Proper Python Environment on Windows
Second, download the source code from this link and follow the instructions in the “readme.txt” file.
Feel free to use the code as you like (it’s published under Creative Commons (CC) Attribution License see: http://creativecommons.org/licenses/by/4.0/ )
Source-Code:
# Input: HLR binary dump file (ex: HLRDUMP.dat)
# Output: HLR Hexadecimal file (ex: HLRDUMP_hex.hex)
# Usage: C:\>python cmd_hexify.py "C:\>HLR\HLRDUMP.dat"
import os
import sys
from datetime import datetime
def getLastPosition(f):
f.seek(0, os.SEEK_END)
lpos = max(0, f.tell())
f.seek(0,0)
while lpos > 0 :
f.seek(lpos-1, 0)
if f.read(1).encode("hex") != '00' :
break
lpos -= 1
return f.tell()
def sanitizepath(path):
if len(path) == 0:
return;
snpath = r'"' + path + '"'
if path[0] == "\"":
snpath = snpath[1:]
if path[-1] == "\"":
snpath = snpath[0:-1]
return snpath
def toHex(in_path, out_path, offset):
try:
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
with open(out_path, 'ab') as outFile :
#Get EOF position
eof = getLastPosition(inFile)
lenPool = eof-offset+1
i = 0
inFile.seek(offset, 0)
sys.stdout.write('>>call file.read(%s)' % in_path)
pool = inFile.read(eof-offset)
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>call file.write(%s)' % out_path)
bulk = []
while i < lenPool :
bulk.append(pool[i:i+1])
i+=1
if i % 10240 == 0 :
outFile.write("".join(bulk).encode("hex"))
bulk = []
if bulk != []:
outFile.write("".join(bulk).encode("hex"))
sys.stdout.write('\t[DONE]\n')
return True
except IOError as (errno, strerror) :
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def main(argv=sys.argv):
try:
if argv is None :
argv = sys.argv
dumps = []
hexFiles = []
if len(sys.argv) > 1 :
print '\n$ time:\t%s\n' % datetime.now()
dumps.append(sys.argv[1])
hexFiles.append(sys.argv[1].replace('.dat', '_hex.hex'))
print '>invoke Hexify(%s)' % sys.argv[1]
done = toHex(dumps[0], hexFiles[0], 0)
if done:
print '>invoke Hexify(%s)\t[DONE]' % sys.argv[1]
print '\n$ time:\t%s\n' % datetime.now()
except:
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
finally:
print '>invoke sys.exit(main(sys.argv))'
##raw_input("Press any key to continue...")
if __name__ == "__main__" :
sys.exit(main(sys.argv))
# Input: - HLR binary dump file (ex: HLRDUMP.dat)
# - HLR SUBSDATA record file (ex: HLRDUMP_subsdata.hex)
# Output: HLR decoded CSV file (ex: HLRDUMP_parsed.csv)
# Usage: C:\>python cmd_decode_subsdata.py "D:\>HLR\HLRDUMP.dat" "D:\>HLR\HLRDUMP_subsdata.hex"
import sys
import os
import binascii
from datetime import datetime
if not 'DEBUG' in dir() :
"""
Debugging indicator.
- True: Output debugging results,
- False: Omit debugging results.
"""
DEBUG = False #or True
def enum(**enums):
"""
Function return generic enumeration type
- enums: enumerated list
"""
return type('Enum', (), enums)
ENCODING = enum(BIN=0, BCD=1, TBCD=2, HEX=16, ISO=8) #Define encoing types
def byte_to_binary(n):
"""
Convert bytes to binary
"""
return ''.join(str((n & (1 << i)) and 1) for i in reversed(range(8)))
def hex_to_binary(h):
"""
Convert hex to binary
"""
return ''.join(byte_to_binary(ord(b)) for b in binascii.unhexlify(h))
def getLastPosition(in_path):
"""
Function that returns the last non 0x00 position.
- in_path: the input file full-path + extension.
"""
try:
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position to EOF
inFile.seek(0, os.SEEK_END)
#Get EOF position
lpos = max(0, inFile.tell())
while lpos > 0 :
inFile.seek(lpos-1, 0)
if inFile.read(1).encode("hex") != '00' :
break
lpos -= 1
if DEBUG : print "[DEBUG]\r\n%d\r\n[/DEBUG]\r\n" % inFile.tell() ## Debugging
return inFile.tell()
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
#Raise the Exception and stop the program execution
raise
def sanitizepath(path):
if len(path) == 0:
return;
snpath = r'"' + path + '"'
if path[0] == "\"":
snpath = snpath[1:]
if path[-1] == "\"":
snpath = snpath[0:-1]
return os.path.normpath(os.path.normcase(snpath))
def decode(value, encoding):
"""
Function that decodes byte(s) according to the encoding type
- value: value to decode.
- encoding: encoding type.
"""
#Decode to Binary
if encoding == ENCODING.BIN :
return hex_to_binary(value.encode("hex"))
#Decode to BCD
if encoding == ENCODING.BCD :
bcd = value.encode("hex").lower()
return bcd[:bcd.find('f')]
#Decode to ISO
if encoding == ENCODING.ISO :
return value
#Decode to Hex
if encoding == ENCODING.HEX :
return value.encode("hex")
def getRecordAMD(in_path, out_path, offset):
"""
Function that returns the ADM Record.
The ADM record is stored in the file when subscriber data output is initiated.
This record consists of administrative data.
- in_path: the input file full-path + extension.
"""
try:
#Set the record length
length = 32
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset, 0)
sys.stdout.write('>>>call file.read(%s)' % in_path)
#Read the file AMD Record
pool = inFile.read(length)
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>extract data')
if DEBUG : print "[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool ## Debugging
out = []
#Create AMD Record Type
out.append( decode(pool[:1], ENCODING.ISO) + ',') # 1 bytes
#Create Exhange Idenity
out.append( decode(pool[1:13], ENCODING.ISO) + ',') # 12 bytes, ISO Coded
#Create Starting Year
out.append( decode(pool[13:15], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Month
out.append( decode(pool[15:17], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Day
out.append( decode(pool[17:19], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Hour
out.append( decode(pool[19:21], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Minute
out.append( decode(pool[21:23], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Determine parameter given in command
temp = decode(pool[23:24], ENCODING.BIN)[4:]
params = []
if temp[3] == '0':
params.append("1,")
else:
params.append("2,")
if temp[2] == '0':
params.append("3,")
else:
params.append("4,")
if temp[1] == '0':
params.append("5,")
else:
params.append("5,")
if temp[0] == '0':
params.append("7,")
else:
params.append("8,")
#Create Parameter given in command
out.append( "".join(params) ) # 1 bytes, Binary Coded
#Create Parameter value
##out.append( decode(pool[24:32], ENCODING.BCD) + ',') # 8 bytes, BCD Coded
out.append( decode(pool[24:32], ENCODING.BCD) ) # 8 bytes, BCD Coded
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>call file.write(%s)' % out_path)
#Open a file for write
with open(out_path, 'a') as outFile :
#Write to File
outFile.write("".join(out))
outFile.write('\n')
sys.stdout.write('\t[DONE]\n')
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def getRecordEND(in_path, out_path, offset):
"""
Function that returns the ENDOFPRINTOUT Record.
The ENDOFPRINTOUT record is stored in the file when subscriber data output is finished..
This record indicates end of printout.
- in_path: the input file full-path + extension.
"""
try:
#Set the record length
length = 12
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset-length, 0)
sys.stdout.write('>>>call file.read(%s)' % in_path)
#Read the file ENDOFPRINTOUT Record
pool = inFile.read(length)
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>extract data')
if DEBUG : print "[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool ## Debugging
out = []
#Create ENDOFPRINTOUT Record Type
out.append( decode(pool[:1], ENCODING.ISO) + ',') # 1 byte
#Create Ending Year
out.append( decode(pool[1:3], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Month
out.append( decode(pool[3:5], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Day
out.append( decode(pool[5:7], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Hour
out.append( decode(pool[7:9], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Minute
out.append( decode(pool[9:11], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Determine the type of end of print
##out.append( decode(pool[11:12], ENCODING.ISO) + ',') # 1 bytes, ISO Coded
out.append( decode(pool[11:12], ENCODING.ISO) ) # 1 bytes, ISO Coded
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>call file.write(%s)' % out_path)
#Open a file for write
with open(out_path, 'a') as outFile :
#Write to File
outFile.write("".join(out))
outFile.write('\n')
sys.stdout.write('\t[DONE]\n')
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def getRecordSUBSDATA(in_path, out_path, offset):
"""
Function that returns the SUBSDATA Record.
A subscriber data block consists of two types of records per connected Subscriber:
1. one SUBSDATA record with permanent subscriber data, and
2. one optional EXTSUBSDATA record with extended subscriber data.
- in_path: the input file full-path + extension.
"""
try:
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset, 0)
root = None
print '>>>call file.read(%s)' % in_path
print '>>>extract data'
print '>>>call file.write(%s)' % out_path
#Open a file for write
with open(out_path, 'a') as outFile :
for line in inFile.xreadlines() :
#Create Root
out = []
#Extract SUBSDATA Record type
out.append( chr(int(line[:2],16)) + ',')
#Extract Record number
##out.append( str(int(line[2:8],16)) + ',')
#Extract ACIMSI number
out.append( line[8:24].replace('f', '') + ',')
#Extract MSISDN
out.append( line[24:40].replace('f', '') + ',')
#Extract Number of additional MSISDN
out.append( str(int(line[40:42],16)) + ',')
#Extract Additional MSISDN Information
pos,i = 42,0
while i < int(line[40:42],16) :
#Extract Additional MSISDN
out.append( line[42+i*22:(42+i*22+16)].replace('f', '') + ',')
#Extract BC number tied to Additional MSISDN
out.append( str(int(line[42+i*22+16:42+i*22+20],16)) + ',')
#Extract Service pointer for BS that represents the BC number
out.append( str(int(line[42+i*22+20:42+i*22+22],16)) + ',')
i = i+1
#Calculate current position
if i != 0:
i -= 1
pos = 42+i*22+22
i = 1
#Extract SUD Information
while line[pos:pos+2] not in ['ff', 'fe', ''] :
#Extract Service pointer for permanent SUD
out.append( str(int(line[pos:pos+2], 16)) + ',')
#Extract SUD value
out.append( str(int(line[pos+2:pos+6], 16)) + ',')
pos,i = pos+6, i+1
#Check if IMEISV identifier is included
if line[pos:pos+2] == 'fe' :
#Extract IMEISV identifier
out.append( line[pos+2:pos+18] + ',')
#Extract End of record
##out.append( line[pos:pos+2] + ',')
out.append( line[pos:pos+2] )
#Append record length in bytes
##out.append( str((pos+2)/2) + ',')
#Write to File
outFile.write("".join(out))
outFile.write('\n')
print '>>>call file.read(%s)\t[DONE]' % in_path
print '>>>extract data\t[DONE]'
print '>>>call file.write(%s)\t[DONE]' % out_path
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def ExtractDump(dumps, hexFiles):
#Loop through each dump file
index = 0
for dump in dumps:
#Set output file
outPath = dump.replace('.dat','_parsed.csv')
#Extract AMD record
print '>>invoke getRecordAMD(%s)' % dump
done = getRecordAMD(dump, outPath, 0)
if done:
print '>>invoke getRecordAMD(%s)\t[DONE]' % dump
else:
raise
#Extract SUBDATA record
print '>>invoke getRecordSUBSDATA(%s)' % dump
done = getRecordSUBSDATA(hexFiles[index], outPath, 0)
if done:
print '>>invoke getRecordSUBSDATA(%s)\t[DONE]' % dump
else:
raise
#Extract ENDPRINT record
print '>>invoke getRecordEND(%s)' % dump
done = getRecordEND(dump, outPath, getLastPosition(dump))
if done:
print '>>invoke getRecordEND(%s)\t[DONE]' % dump
else:
raise
index += 1
def main(argv=sys.argv):
try:
if argv is None :
argv = sys.argv
dumps = []
hexFiles = []
if len(sys.argv) > 2 :
print '\n$ time:\t%s\n' % datetime.now()
dumps.append(sys.argv[1])
hexFiles.append(sys.argv[2])
print '>invoke ExtractDump(%s)' % sys.argv[1]
ExtractDump([dumps[0]], [hexFiles[0]])
print '>invoke ExtractDump(%s)\t[DONE]' % sys.argv[1]
print '\n$ time:\t%s\n' % datetime.now()
except:
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
finally:
print 'sys.exit(main(sys.argv))'
##raw_input("Press any key to continue...")
if __name__ == "__main__" :
sys.exit(main(sys.argv))
# Input: - HLR binary dump file (ex: CUGDUMP.dat)
# - HLR SUBSDATA record with CUG Tag file (ex: CUGDUMP_subsdata.hex)
# Output: HLR decoded CSV file (ex: CUGDUMP_parsed.csv)
# Usage: C:\>python cmd_decode_cug.py "D:\>HLR\CUGDUMP.dat" "D:\>HLR\CUGDUMP_subsdata.hex"
import sys
import os
import binascii
from datetime import datetime
if not 'DEBUG' in dir() :
"""
Debugging indicator.
- True: Output debugging results,
- False: Omit debugging results.
"""
DEBUG = False #or True
def enum(**enums):
"""
Function return generic enumeration type
- enums: enumerated list
"""
return type('Enum', (), enums)
ENCODING = enum(BIN=0, BCD=1, TBCD=2, HEX=16, ISO=8) #Define encoing types
def byte_to_binary(n):
"""
Convert bytes to binary
"""
return ''.join(str((n & (1 << i)) and 1) for i in reversed(range(8)))
def hex_to_binary(h):
"""
Convert hex to binary
"""
return ''.join(byte_to_binary(ord(b)) for b in binascii.unhexlify(h))
def getLastPosition(in_path):
"""
Function that returns the last non 0x00 position.
- in_path: the input file full-path + extension.
"""
try:
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position to EOF
inFile.seek(0, os.SEEK_END)
#Get EOF position
lpos = max(0, inFile.tell())
while lpos > 0 :
inFile.seek(lpos-1, 0)
if inFile.read(1).encode("hex") != '00' :
break
lpos -= 1
if DEBUG : print "[DEBUG]\r\n%d\r\n[/DEBUG]\r\n" % inFile.tell() ## Debugging
return inFile.tell()
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
#Raise the Exception and stop the program execution
raise
def sanitizepath(path):
if len(path) == 0:
return;
snpath = r'"' + path + '"'
if path[0] == "\"":
snpath = snpath[1:]
if path[-1] == "\"":
snpath = snpath[0:-1]
return os.path.normpath(os.path.normcase(snpath))
def decode(value, encoding):
"""
Function that decodes byte(s) according to the encoding type
- value: value to decode.
- encoding: encoding type.
"""
#Decode to Binary
if encoding == ENCODING.BIN :
return hex_to_binary(value.encode("hex"))
#Decode to BCD
if encoding == ENCODING.BCD :
bcd = value.encode("hex").lower()
return bcd[:bcd.find('f')]
#Decode to ISO
if encoding == ENCODING.ISO :
return value
#Decode to Hex
if encoding == ENCODING.HEX :
return value.encode("hex")
def getRecordAMD(in_path, out_path, offset):
"""
Function that returns the ADM Record.
The ADM record is stored in the file when subscriber data output is initiated.
This record consists of administrative data.
- in_path: the input file full-path + extension.
"""
try:
#Set the record length
length = 32
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset, 0)
sys.stdout.write('>>>call file.read(%s)' % in_path)
#Read the file AMD Record
pool = inFile.read(length)
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>extract data')
if DEBUG : print "[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool ## Debugging
out = []
#Create AMD Record Type
out.append( decode(pool[:1], ENCODING.ISO) + ',') # 1 bytes
#Create Exhange Idenity
out.append( decode(pool[1:13], ENCODING.ISO) + ',') # 12 bytes, ISO Coded
#Create Starting Year
out.append( decode(pool[13:15], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Month
out.append( decode(pool[15:17], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Day
out.append( decode(pool[17:19], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Hour
out.append( decode(pool[19:21], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Minute
out.append( decode(pool[21:23], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Determine parameter given in command
temp = decode(pool[23:24], ENCODING.BIN)[4:]
params = []
if temp[3] == '0':
params.append("1,")
else:
params.append("2,")
if temp[2] == '0':
params.append("3,")
else:
params.append("4,")
if temp[1] == '0':
params.append("5,")
else:
params.append("5,")
if temp[0] == '0':
params.append("7,")
else:
params.append("8,")
#Create Parameter given in command
out.append( "".join(params) ) # 1 bytes, Binary Coded
#Create Parameter value
##out.append( decode(pool[24:32], ENCODING.BCD) + ',') # 8 bytes, BCD Coded
out.append( decode(pool[24:32], ENCODING.BCD) ) # 8 bytes, BCD Coded
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>call file.write(%s)' % out_path)
#Open a file for write
with open(out_path, 'a') as outFile :
#Write to File
outFile.write("".join(out))
outFile.write('\n')
sys.stdout.write('\t[DONE]\n')
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def getRecordEND(in_path, out_path, offset):
"""
Function that returns the ENDOFPRINTOUT Record.
The ENDOFPRINTOUT record is stored in the file when subscriber data output is finished..
This record indicates end of printout.
- in_path: the input file full-path + extension.
"""
try:
#Set the record length
length = 12
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset-length, 0)
sys.stdout.write('>>>call file.read(%s)' % in_path)
#Read the file ENDOFPRINTOUT Record
pool = inFile.read(length)
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>extract data')
if DEBUG : print "[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool ## Debugging
out = []
#Create ENDOFPRINTOUT Record Type
out.append( decode(pool[:1], ENCODING.ISO) + ',') # 1 byte
#Create Ending Year
out.append( decode(pool[1:3], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Month
out.append( decode(pool[3:5], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Day
out.append( decode(pool[5:7], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Hour
out.append( decode(pool[7:9], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Minute
out.append( decode(pool[9:11], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Determine the type of end of print
##out.append( decode(pool[11:12], ENCODING.ISO) + ',') # 1 bytes, ISO Coded
out.append( decode(pool[11:12], ENCODING.ISO) ) # 1 bytes, ISO Coded
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>call file.write(%s)' % out_path)
#Open a file for write
with open(out_path, 'a') as outFile :
#Write to File
outFile.write("".join(out))
outFile.write('\n')
sys.stdout.write('\t[DONE]\n')
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def getRecordSUBSDATA(in_path, out_path, offset):
"""
Function that returns the SUBSDATA Record.
A subscriber data block consists of two types of records per connected Subscriber:
1. one SUBSDATA record with permanent subscriber data, and
2. one optional EXTSUBSDATA record with extended subscriber data.
- in_path: the input file full-path + extension.
"""
try:
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset, 0)
root = None
print '>>>call file.read(%s)' % in_path
print '>>>extract data'
print '>>>call file.write(%s)' % out_path
#Open a file for write
with open(out_path, 'a') as outFile :
for line in inFile.xreadlines() :
#Create Root
out = []
#Extract SUBSDATA Record type
out.append( chr(int(line[:2],16)) + ',')
#Extract Record number
##out.append( str(int(line[2:8],16)) + ',')
#Extract ACIMSI number
out.append( line[8:24].replace('f', '') + ',')
#Extract MSISDN
out.append( line[24:40].replace('f', '') + ',')
#Extract Number of additional MSISDN
out.append( str(int(line[40:42],16)) + ',')
#Extract Additional MSISDN Information
pos,i = 42,0
while i < int(line[40:42],16) :
#Extract Additional MSISDN
out.append( line[42+i*22:(42+i*22+16)].replace('f', '') + ',')
#Extract BC number tied to Additional MSISDN
out.append( str(int(line[42+i*22+16:42+i*22+20],16)) + ',')
#Extract Service pointer for BS that represents the BC number
out.append( str(int(line[42+i*22+20:42+i*22+22],16)) + ',')
i = i+1
#Calculate current position
if i != 0:
i -= 1
pos = 42+i*22+22
i = 1
#Extract SUD Information
while line[pos:pos+2] not in ['ff', 'fe', ''] :
#Extract Service pointer for permanent SUD
out.append( str(int(line[pos:pos+2], 16)) + ',')
#Extract SUD value
out.append( str(int(line[pos+2:pos+6], 16)) + ',')
pos,i = pos+6, i+1
#Check if IMEISV identifier is included
if line[pos:pos+2] == 'fe' :
#Extract IMEISV identifier
out.append( line[pos+2:pos+18] + ',')
#Extract End of record SUBSDATA
out.append( line[pos:pos+2] + ',')
pos += 2; #Calculate current position
##################################
# Extract Record EXTSUBSDATA Type
out.append( chr(int(line[pos:pos+2],16)) + ',')
# Tag Closed User Group (CUG) data (Numeral 1), 1 Byte
cugtag = int(line[pos+2:pos+4],16)
if cugtag != 1 : # No Tag CUG Data
#Extract End of record SUBSDATA
##out.append( line[pos+2:pos+4] + ',')
out.append( line[pos+2:pos+4] )
pos += 4; #Calculate current position
#Append record length in bytes
##out.append( str((pos+2)/2) + ',')
#Write to File
outFile.write("".join(out))
outFile.write('\n')
continue
out.append( str(cugtag) + ',')
pos += 4; #Calculate current position
# Extract CUG & BSG Data
while (line[pos:pos+4] != "8104"):
# Subtag IntraCUG data (Numeral 128), 1 Byte
out.append( str(int(line[pos:pos+2],16)) + ',')
# Length of IntraCUG data (Numeral 9 - 14), 1 Byte
out.append( str(int(line[pos+2:pos+4],16)) + ',')
# CUG Index (Numeral 0 - 32767), 2 Byte
out.append( str(int(line[pos+4:pos+8],16)) + ',')
# CUG Interlock Code Network Identity (Digit string 0000 - 9999), 2 Byte
out.append( str(line[pos+8:pos+12]) + ',')
# Interlock Code (Numeral 0 - 65535), 2 Byte
out.append( str(int(line[pos+12:pos+16],16)) + ',')
# CUG restrictions (Numeral 0 - 2), 1 Byte
out.append( str(int(line[pos+16:pos+18],16)) + ',')
# Number of BSGs affected (Numeral 1 - 6), 1 Byte
bsgnum = int(line[pos+18:pos+20],16)
out.append( str(bsgnum) + ',')
pos += 20; #Calculate current position
# BSG service pointer (Numeral 0 - 255), 1 Byte
for i in range(0, bsgnum):
out.append( str(int(line[pos:pos+2],16)) + ',')
pos += 2; #Calculate current position
# Extract Sub CUG Data
for i in range(0, bsgnum):
# Subtag InterCUG data (Numeral 129), 1 Byte
out.append( str(int(line[pos:pos+2],16)) + ',')
# Length of InterCUG data (Numeral 4), 1 Byte
out.append( str(int(line[pos+2:pos+4],16)) + ',')
# BSG service pointer (Numeral 0 - 255), 1 Byte
out.append( str(int(line[pos+4:pos+6],16)) + ',')
# Inter CUG accessibility (Numeral 0 - 3), 1 Byte
out.append( str(int(line[pos+6:pos+8],16)) + ',')
# Preferential CUG (Numeral 0 - 32767, 65535), 2 Byte
out.append( str(int(line[pos+8:pos+12],16)) + ',')
pos += 12; #Calculate current position
#Extract End of Record EXTSUBSDATA
##out.append( line[pos:pos+2] + ',')
out.append( line[pos:pos+2] )
pos += 2; #Calculate current position
##################################
#Append record length in bytes
##out.append( str((pos+2)/2) + ',')
#Write to File
outFile.write("".join(out))
outFile.write('\n')
print '>>>call file.read(%s)\t[DONE]' % in_path
print '>>>extract data\t[DONE]'
print '>>>call file.write(%s)\t[DONE]' % out_path
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def ExtractDump(dumps, hexFiles):
#Loop through each dump file
index = 0
for dump in dumps:
#Set output file
outPath = dump.replace('.dat','_parsed.csv')
#Extract AMD record
print '>>invoke getRecordAMD(%s)' % dump
done = getRecordAMD(dump, outPath, 0)
if done:
print '>>invoke getRecordAMD(%s)\t[DONE]' % dump
else:
raise
#Extract SUBDATA record
print '>>invoke getRecordSUBSDATA(%s)' % dump
done = getRecordSUBSDATA(hexFiles[index], outPath, 0)
if done:
print '>>invoke getRecordSUBSDATA(%s)\t[DONE]' % dump
else:
raise
#Extract ENDPRINT record
print '>>invoke getRecordEND(%s)' % dump
done = getRecordEND(dump, outPath, getLastPosition(dump))
if done:
print '>>invoke getRecordEND(%s)\t[DONE]' % dump
else:
raise
index += 1
def main(argv=sys.argv):
try:
if argv is None :
argv = sys.argv
dumps = []
hexFiles = []
if len(sys.argv) > 2 :
print '\n$ time:\t%s\n' % datetime.now()
dumps.append(sys.argv[1])
hexFiles.append(sys.argv[2])
print '>invoke ExtractDump(%s)' % sys.argv[1]
ExtractDump([dumps[0]], [hexFiles[0]])
print '>invoke ExtractDump(%s)\t[DONE]' % sys.argv[1]
print '\n$ time:\t%s\n' % datetime.now()
except:
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
finally:
print 'sys.exit(main(sys.argv))'
##raw_input("Press any key to continue...")
if __name__ == "__main__" :
sys.exit(main(sys.argv))
# Input: - HLR binary dump file (ex: GPRSDUMP.dat)
# - HLR SUBSDATA record with GPRSDUMP Tag file (ex: GPRSDUMP_subsdata.hex)
# Output: HLR decoded CSV file (ex: GPRSDUMP_parsed.csv)
# Usage: C:\>python cmd_decode_gprs.py "D:\>HLR\GPRSDUMP.dat" "D:\>HLR\GPRSDUMP_subsdata.hex"
import sys
import os
import binascii
from datetime import datetime
if not 'DEBUG' in dir() :
"""
Debugging indicator.
- True: Output debugging results,
- False: Omit debugging results.
"""
DEBUG = False #or True
def enum(**enums):
"""
Function return generic enumeration type
- enums: enumerated list
"""
return type('Enum', (), enums)
ENCODING = enum(BIN=0, BCD=1, TBCD=2, HEX=16, ISO=8) #Define encoing types
def byte_to_binary(n):
"""
Convert bytes to binary
"""
return ''.join(str((n & (1 << i)) and 1) for i in reversed(range(8)))
def hex_to_binary(h):
"""
Convert hex to binary
"""
return ''.join(byte_to_binary(ord(b)) for b in binascii.unhexlify(h))
def getLastPosition(in_path):
"""
Function that returns the last non 0x00 position.
- in_path: the input file full-path + extension.
"""
try:
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position to EOF
inFile.seek(0, os.SEEK_END)
#Get EOF position
lpos = max(0, inFile.tell())
while lpos > 0 :
inFile.seek(lpos-1, 0)
if inFile.read(1).encode("hex") != '00' :
break
lpos -= 1
if DEBUG : print "[DEBUG]\r\n%d\r\n[/DEBUG]\r\n" % inFile.tell() ## Debugging
return inFile.tell()
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
#Raise the Exception and stop the program execution
raise
def sanitizepath(path):
if len(path) == 0:
return;
snpath = r'"' + path + '"'
if path[0] == "\"":
snpath = snpath[1:]
if path[-1] == "\"":
snpath = snpath[0:-1]
return os.path.normpath(os.path.normcase(snpath))
def decode(value, encoding):
"""
Function that decodes byte(s) according to the encoding type
- value: value to decode.
- encoding: encoding type.
"""
#Decode to Binary
if encoding == ENCODING.BIN :
return hex_to_binary(value.encode("hex"))
#Decode to BCD
if encoding == ENCODING.BCD :
bcd = value.encode("hex").lower()
return bcd[:bcd.find('f')]
#Decode to ISO
if encoding == ENCODING.ISO :
return value
#Decode to Hex
if encoding == ENCODING.HEX :
return value.encode("hex")
def getRecordAMD(in_path, out_path, offset):
"""
Function that returns the ADM Record.
The ADM record is stored in the file when subscriber data output is initiated.
This record consists of administrative data.
- in_path: the input file full-path + extension.
"""
try:
#Set the record length
length = 32
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset, 0)
sys.stdout.write('>>>call file.read(%s)' % in_path)
#Read the file AMD Record
pool = inFile.read(length)
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>extract data')
if DEBUG : print "[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool ## Debugging
out = []
#Create AMD Record Type
out.append( decode(pool[:1], ENCODING.ISO) + ',') # 1 bytes
#Create Exhange Idenity
out.append( decode(pool[1:13], ENCODING.ISO) + ',') # 12 bytes, ISO Coded
#Create Starting Year
out.append( decode(pool[13:15], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Month
out.append( decode(pool[15:17], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Day
out.append( decode(pool[17:19], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Hour
out.append( decode(pool[19:21], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Starting Minute
out.append( decode(pool[21:23], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Determine parameter given in command
temp = decode(pool[23:24], ENCODING.BIN)[4:]
params = []
if temp[3] == '0':
params.append("1,")
else:
params.append("2,")
if temp[2] == '0':
params.append("3,")
else:
params.append("4,")
if temp[1] == '0':
params.append("5,")
else:
params.append("5,")
if temp[0] == '0':
params.append("7,")
else:
params.append("8,")
#Create Parameter given in command
out.append( "".join(params) ) # 1 bytes, Binary Coded
#Create Parameter value
##out.append( decode(pool[24:32], ENCODING.BCD) + ',') # 8 bytes, BCD Coded
out.append( decode(pool[24:32], ENCODING.BCD) ) # 8 bytes, BCD Coded
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>call file.write(%s)' % out_path)
#Open a file for write
with open(out_path, 'a') as outFile :
#Write to File
outFile.write("".join(out))
outFile.write('\n')
sys.stdout.write('\t[DONE]\n')
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def getRecordEND(in_path, out_path, offset):
"""
Function that returns the ENDOFPRINTOUT Record.
The ENDOFPRINTOUT record is stored in the file when subscriber data output is finished..
This record indicates end of printout.
- in_path: the input file full-path + extension.
"""
try:
#Set the record length
length = 12
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset-length, 0)
sys.stdout.write('>>>call file.read(%s)' % in_path)
#Read the file ENDOFPRINTOUT Record
pool = inFile.read(length)
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>extract data')
if DEBUG : print "[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool ## Debugging
out = []
#Create ENDOFPRINTOUT Record Type
out.append( decode(pool[:1], ENCODING.ISO) + ',') # 1 byte
#Create Ending Year
out.append( decode(pool[1:3], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Month
out.append( decode(pool[3:5], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Day
out.append( decode(pool[5:7], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Hour
out.append( decode(pool[7:9], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Create Ending Minute
out.append( decode(pool[9:11], ENCODING.ISO) + ',') # 2 bytes, ISO Coded
#Determine the type of end of print
##out.append( decode(pool[11:12], ENCODING.ISO) + ',') # 1 bytes, ISO Coded
out.append( decode(pool[11:12], ENCODING.ISO) ) # 1 bytes, ISO Coded
sys.stdout.write('\t[DONE]\n')
sys.stdout.write('>>>call file.write(%s)' % out_path)
#Open a file for write
with open(out_path, 'a') as outFile :
#Write to File
outFile.write("".join(out))
outFile.write('\n')
sys.stdout.write('\t[DONE]\n')
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def getRecordSUBSDATA(in_path, out_path, offset):
"""
Function that returns the SUBSDATA Record.
A subscriber data block consists of two types of records per connected Subscriber:
1. one SUBSDATA record with permanent subscriber data, and
2. one optional EXTSUBSDATA record with extended subscriber data.
- in_path: the input file full-path + extension.
"""
try:
#Open a file for read in binary mode
with open(in_path, 'rb') as inFile :
#Set the file's current position
inFile.seek(offset, 0)
root = None
print '>>>call file.read(%s)' % in_path
print '>>>extract data'
print '>>>call file.write(%s)' % out_path
#Open a file for write
with open(out_path, 'a') as outFile :
for line in inFile.xreadlines() :
#Create Root
out = []
#Extract SUBSDATA Record type
#out.append( chr(int(line[:2],16)) + ',')
#Extract Record number
##out.append( str(int(line[2:8],16)) + ',')
#Extract ACIMSI number
out.append( line[8:24].replace('f', '') + ',')
#Extract MSISDN
out.append( line[24:40].replace('f', '') + ',')
#Extract Number of additional MSISDN
#out.append( str(int(line[40:42],16)) + ',')
#Extract Additional MSISDN Information
pos,i = 42,0
while i < int(line[40:42],16) :
#Extract Additional MSISDN
out.append( line[42+i*22:(42+i*22+16)].replace('f', '') + ',')
#Extract BC number tied to Additional MSISDN
#out.append( str(int(line[42+i*22+16:42+i*22+20],16)) + ',')
#Extract Service pointer for BS that represents the BC number
#out.append( str(int(line[42+i*22+20:42+i*22+22],16)) + ',')
i = i+1
#Calculate current position
if i != 0:
i -= 1
pos = 42+i*22+22
i = 1
#Extract SUD Information
while line[pos:pos+2] not in ['ff', 'fe', ''] :
#Extract Service pointer for permanent SUD
#out.append( str(int(line[pos:pos+2], 16)) + ',')
#Extract SUD value
#out.append( str(int(line[pos+2:pos+6], 16)) + ',')
pos,i = pos+6, i+1
#Check if IMEISV identifier is included
#if line[pos:pos+2] == 'fe' :
#Extract IMEISV identifier
#out.append( line[pos+2:pos+18] + ',')
#Extract End of record SUBSDATA
#out.append( line[pos:pos+2] + ',')
pos += 2; #Calculate current position
##################################
# Extract Record EXTSUBSDATA Type
#out.append( chr(int(line[pos:pos+2],16)) + ',')
# Tag GPRS data (Numeral 5), 1 Byte
gprstag = int(line[pos+2:pos+4],16)
if gprstag != 5 : # No Tag GPRS Data
#Extract End of record SUBSDATA
##out.append( line[pos+2:pos+4] + ',')
#out.append( line[pos+2:pos+4] )
pos += 4; #Calculate current position
#Append record length in bytes
##out.append( str((pos+2)/2) + ',')
#Write to File
outFile.write("".join(out))
outFile.write('\n')
continue
#out.append( str(gprstag) + ',')
# Subtag Rel-4 PDP data (Numeral 130), 1 Byte
#out.append( str(int(line[pos+4:pos+6],16)) + ',')
# Length of PDP data (Numeral 10-271), 2 Byte
#out.append( str(int(line[pos+6:pos+10],16)) + ',')
# Number of PDP contexts (Numeral 1 - 10), 1 Byte
pdpnum = int(line[pos+10:pos+12],16)
out.append( str(pdpnum) + ',')
pos += 12; #Calculate current position
# Extract PDP Data
for i in range(0, pdpnum):
# PDP context identifier (Numeral 1 - 10), 1 Byte
out.append( 'PDPID-' + str(int(line[pos:pos+2],16)) + ',')
# PDP addressing type (Numeral 0 - 2), 1 Byte
pdpaddtype = int(line[pos+2:pos+4],16)
#out.append( str(pdpaddtype) + ',')
# Check PDP Addressing type (dynamic, IPv4, or IPv6)
if pdpaddtype == 0 : # PDP Addressing type is dynamic
out.append('PDPADD-0,')
# Access Point Name (APN) identifier (Numeral 0 - 16383, 65535), 2 Byte
out.append( 'APNID-' + str(int(line[pos+4:pos+8],16)) + ',')
# Extended Quality Of Service (QOS) identifier (Numeral 0 - 4095), 2 Byte
out.append( 'QOSID-' + str(int(line[pos+8:pos+12],16)) + ',')
# Visited Public Land Mobile Network (VPLMN) address allowed (Numeral 0 - 1), 1 Byte
out.append( 'VPLMN-' + str(int(line[pos+12:pos+14],16)) + ',')
# PDP type (Numeral 0 - 2), 1 Byte
out.append( 'PDPT-' + str(int(line[pos+14:pos+16],16)) + ',')
# PDP context charging characteristics Indicator (Numeral 0, 255), 1 Byte
pdpcontextcharging = int(line[pos+16:pos+18],16)
out.append( 'PDPCH-' + str(pdpcontextcharging) + ',')
if pdpcontextcharging == 255 :
# Enhanced PDP context charging characteristics value (Numeral 0 - 65535), 2 Byte
out.append( 'PDPECH-' + str(int(line[pos+18:pos+22],16)) + ',')
pos += 22; #Calculate current position
else:
pos += 18; #Calculate current position
elif pdpaddtype == 1 : # PDP Addressing type is IPv4
# %todo%
#raise Exception("# %todo%\tIPv4")
#print "\n# %todo%\tIPv4\n"
# IPv4 PDP address
out.append( 'PDPIPv4-' + str(int(line[pos+4:pos+6],16)) + '.' + str(int(line[pos+6:pos+8],16)) + '.' + str(int(line[pos+8:pos+10],16)) + '.' + str(int(line[pos+10:pos+12],16)) + ',')
# Access Point Name (APN) identifier (Numeral 0 - 16383, 65535), 2 Byte
out.append( 'APNID-' + str(int(line[pos+12:pos+16],16)) + ',')
# Extended Quality Of Service (QOS) identifier (Numeral 0 - 4095), 2 Byte
out.append( 'QOSID-' + str(int(line[pos+16:pos+20],16)) + ',')
# Visited Public Land Mobile Network (VPLMN) address allowed (Numeral 0 - 1), 1 Byte
out.append( 'VPLMN-' + str(int(line[pos+20:pos+22],16)) + ',')
# PDP type (Numeral 0 - 2), 1 Byte
out.append( 'PDPT-' + str(int(line[pos+22:pos+24],16)) + ',')
# PDP context charging characteristics Indicator (Numeral 0, 255), 1 Byte
pdpcontextcharging = int(line[pos+24:pos+26],16)
out.append( 'PDPCH-' + str(pdpcontextcharging) + ',')
if pdpcontextcharging == 255 :
# Enhanced PDP context charging characteristics value (Numeral 0 - 65535), 2 Byte
out.append( 'PDPECH' + str(int(line[pos+26:pos+30],16)) + ',')
pos += 30; #Calculate current position
else:
pos += 26; #Calculate current position
elif pdpaddtype == 2 : # PDP Addressing type is IPv6
# %todo%
print "XOXO"
raise Exception("# %todo%\tIPv6")
#Extract End of Record EXTSUBSDATA
##out.append( line[pos:pos+2] + ',')
#out.append( line[pos:pos+2] )
pos += 2; #Calculate current position
##################################
#Append record length in bytes
##out.append( str((pos+2)/2) + ',')
#Write to File
outFile.write("".join(out))
outFile.write('\n')
print '>>>call file.read(%s)\t[DONE]' % in_path
print '>>>extract data\t[DONE]'
print '>>>call file.write(%s)\t[DONE]' % out_path
return True
except IOError as (errno, strerror):
#Output IO error message
print "\nException:\n$ I/O error({0}): {1}\n".format(errno, strerror)
except:
#Output Exception message
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
#Close opened file
if 'inFile' in dir() and not inFile.closed :
inFile.close()
if 'outFile' in dir() and not outFile.closed :
outFile.close()
#Raise the Exception and stop the program execution
raise
def ExtractDump(dumps, hexFiles):
#Loop through each dump file
index = 0
for dump in dumps:
#Set output file
outPath = dump.replace('.dat','_parsed.csv')
#Extract AMD record
print '>>invoke getRecordAMD(%s)' % dump
done = getRecordAMD(dump, outPath, 0)
if done:
print '>>invoke getRecordAMD(%s)\t[DONE]' % dump
else:
raise
#Extract SUBDATA record
print '>>invoke getRecordSUBSDATA(%s)' % dump
done = getRecordSUBSDATA(hexFiles[index], outPath, 0)
if done:
print '>>invoke getRecordSUBSDATA(%s)\t[DONE]' % dump
else:
raise
#Extract ENDPRINT record
print '>>invoke getRecordEND(%s)' % dump
done = getRecordEND(dump, outPath, getLastPosition(dump))
if done:
print '>>invoke getRecordEND(%s)\t[DONE]' % dump
else:
raise
index += 1
def main(argv=sys.argv):
try:
if argv is None :
argv = sys.argv
dumps = []
hexFiles = []
if len(sys.argv) > 2 :
print '\n$ time:\t%s\n' % datetime.now()
dumps.append(sys.argv[1])
hexFiles.append(sys.argv[2])
print '>invoke ExtractDump(%s)' % sys.argv[1]
ExtractDump([dumps[0]], [hexFiles[0]])
print '>invoke ExtractDump(%s)\t[DONE]' % sys.argv[1]
print '\n$ time:\t%s\n' % datetime.now()
except:
print "\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0]
finally:
print 'sys.exit(main(sys.argv))'
##raw_input("Press any key to continue...")
if __name__ == "__main__" :
sys.exit(main(sys.argv))