In telecom lingo, the HLR (Home Location Register) is simply a database that contains all sorts of information about each mobile phone subscriber. The HLR database is usually built directly on a binary system file, which allows for very fast data manipulation but creates an integrity and consistency dilemma from a security point of view.
I wrote this HLR decoder back in 2012 to decode Ericsson HLR R13.2/14.1 dumps, and it was my very first Python application. The goal was to decode two HLR dump files and perform a consistency check between them. The code is certainly not optimized and not complete, yet it can successfully decode the following records:
- ADM record: administrative data.
- ENDOFPRINTOUT record: end of printout information.
- SUBSDATA record: permanent subscribers data.
- EXTSUBSDATA record: extended subscriber data.
- CUG tag: Closed User Group data.
- GPRS tag: General Packet Radio Service data.
How to use the decoder:
First, you need to install and configure Python. If you’re a Windows user, you can follow this tutorial: How to Setup a Proper Python Environment on Windows.
Second, download the source code from this link and follow the instructions in the “readme.txt” file.
Feel free to use the code as you like; it’s published under the Creative Commons (CC) Attribution License, see: http://creativecommons.org/licenses/by/4.0/
Source-Code:
# Input:  HLR binary dump file (ex: HLRDUMP.dat)
# Output: HLR Hexadecimal file (ex: HLRDUMP_hex.hex)
# Usage:  C:\>python cmd_hexify.py "C:\HLR\HLRDUMP.dat"
#
# The script only uses constructs that behave the same on Python 2.6+ and
# Python 3 (no print statement, no str.encode("hex"), no tuple-unpacking
# "except ... as (a, b)").
import binascii
import os
import sys
from datetime import datetime

# Number of bytes handled per step when scanning or converting the dump.
_CHUNK_SIZE = 64 * 1024


def getLastPosition(f):
    """Return the offset just past the last non-0x00 byte of *f*.

    - f: file object opened in binary mode and seekable.

    Trailing 0x00 bytes are treated as padding.  An empty file, or a file
    made only of 0x00 bytes, yields 0.  On return the file position is left
    at the returned offset.
    """
    f.seek(0, os.SEEK_END)
    lpos = f.tell()
    # Walk backwards one chunk at a time instead of one byte at a time.
    while lpos > 0:
        step = min(_CHUNK_SIZE, lpos)
        f.seek(lpos - step, 0)
        kept = len(f.read(step).rstrip(b'\x00'))
        if kept:
            lpos = lpos - step + kept
            break
        lpos -= step
    f.seek(lpos, 0)
    return lpos


def sanitizepath(path):
    """Return *path* wrapped in double quotes (existing quotes are kept).

    - path: the path string; an empty string yields None.
    """
    if len(path) == 0:
        return None
    prefix = '' if path[0] == '"' else '"'
    suffix = '' if path[-1] == '"' else '"'
    return prefix + path + suffix


def _hexPathFor(dump_path):
    """Return the output name for *dump_path*: its extension becomes '_hex.hex'.

    Built from the extension only, so the result can never be equal to the
    input path (which would make toHex append hex text to the dump itself).
    """
    root, _ext = os.path.splitext(dump_path)
    return root + '_hex.hex'


def toHex(in_path, out_path, offset):
    """Append the hexadecimal text of a binary file to *out_path*.

    - in_path:  the input file full-path + extension.
    - out_path: the output file; it is opened in append mode, so an existing
                file is extended, not replaced.
    - offset:   position of the first byte to convert.

    Bytes from *offset* up to the last non-0x00 byte are converted to
    lowercase hex digits.  Returns True on success and False when an
    I/O error was reported; any other exception is reported and re-raised.
    """
    try:
        with open(in_path, 'rb') as inFile:
            with open(out_path, 'ab') as outFile:
                eof = getLastPosition(inFile)
                inFile.seek(offset, 0)
                sys.stdout.write('>>call file.read(%s)' % in_path)
                # A negative size would make read() return the whole file.
                pool = inFile.read(max(0, eof - offset))
                sys.stdout.write('\t[DONE]\n')
                sys.stdout.write('>>call file.write(%s)' % out_path)
                for begin in range(0, len(pool), _CHUNK_SIZE):
                    outFile.write(
                        binascii.hexlify(pool[begin:begin + _CHUNK_SIZE]))
                sys.stdout.write('\t[DONE]\n')
        return True
    except IOError as err:
        # Output IO error message
        sys.stdout.write("\nException:\n$ I/O error({0}): {1}\n\n".format(
            err.errno, err.strerror))
        return False
    except Exception:
        # Output Exception message, then let the caller see the failure
        sys.stdout.write(
            "\nException:\n$ Unexpected error:%s\n\n" % sys.exc_info()[0])
        raise


def main(argv=sys.argv):
    """Command-line entry point: hexify the dump named by argv[1].

    - argv: argument list (program name first); None means sys.argv.

    Returns the process exit status: 0 on success, 1 when the conversion
    failed, 2 when no dump file was given.
    """
    if argv is None:
        argv = sys.argv
    status = 0
    try:
        if len(argv) > 1:
            dump = argv[1]
            sys.stdout.write('\n$ time:\t%s\n\n' % datetime.now())
            sys.stdout.write('>invoke Hexify(%s)\n' % dump)
            if toHex(dump, _hexPathFor(dump), 0):
                sys.stdout.write('>invoke Hexify(%s)\t[DONE]\n' % dump)
            else:
                status = 1
            sys.stdout.write('\n$ time:\t%s\n\n' % datetime.now())
        else:
            sys.stdout.write('usage: cmd_hexify.py <HLR dump file>\n')
            status = 2
    except Exception:
        sys.stdout.write(
            "\nException:\n$ Unexpected error:%s\n\n" % sys.exc_info()[0])
        status = 1
    finally:
        sys.stdout.write('>invoke sys.exit(main(sys.argv))\n')
    return status


if __name__ == "__main__":
    sys.exit(main(sys.argv))
# Input:  HLR Hexadecimal file (ex: HLRDUMP_hex.hex)
# Output: HLR SUBSDATA record file (ex: HLRDUMP_subsdata.hex)
# Usage:  C:\>python cmd_extract_subsdata.py "D:\HLR\HLRDUMP_hex.hex"
#
# The script only uses constructs that behave the same on Python 2.6+ and
# Python 3.
import os
import sys
from datetime import datetime

# Number of bytes inspected per step when trimming 0x00 padding.
_CHUNK_SIZE = 64 * 1024
# "ff" closes one record and "32" opens the next record to extract.
_RECORD_MARK = b"ff32"
# "ff" followed by "33" closes the last extracted record.
_END_MARK = b"ff33"
# Hex digits skipped after a marker before the next marker is searched
# (same distance the searches have always used for the record marker).
_MARK_SKIP = 42


def getLastPosition(f):
    """Return the offset just past the last non-0x00 byte of *f*.

    - f: file object opened in binary mode and seekable.

    An empty file, or a file made only of 0x00 bytes, yields 0.  On return
    the file position is left at the returned offset.
    """
    f.seek(0, os.SEEK_END)
    lpos = f.tell()
    while lpos > 0:
        step = min(_CHUNK_SIZE, lpos)
        f.seek(lpos - step, 0)
        kept = len(f.read(step).rstrip(b'\x00'))
        if kept:
            lpos = lpos - step + kept
            break
        lpos -= step
    f.seek(lpos, 0)
    return lpos


def sanitizepath(path):
    """Return *path* wrapped in double quotes and normalised for the OS.

    - path: the path string; an empty string yields None.
    """
    if len(path) == 0:
        return None
    prefix = '' if path[0] == '"' else '"'
    suffix = '' if path[-1] == '"' else '"'
    return os.path.normpath(os.path.normcase(prefix + path + suffix))


def _findAligned(pool, marker, begin, base):
    """Return the first byte-aligned index >= *begin* of *marker*, or -1.

    Two hex digits encode one byte, so a marker that starts on an odd digit
    (counted from the start of the file, hence *base*) straddles two bytes
    and is not a real marker.
    """
    pos = pool.find(marker, max(0, begin))
    while pos >= 0 and (base + pos) % 2:
        pos = pool.find(marker, pos + 1)
    return pos


def _subsdataPathFor(hex_path):
    """Return the output name for *hex_path* ('..._subsdata.hex').

    Never equal to *hex_path*, so the input cannot be appended to itself.
    """
    suffix = '_hex.hex'
    if hex_path.endswith(suffix):
        return hex_path[:-len(suffix)] + '_subsdata.hex'
    root, _ext = os.path.splitext(hex_path)
    return root + '_subsdata.hex'


def readSUBDATA(in_path, out_path, offset):
    """Split a hexified dump into one record per line.

    - in_path:  the hex file full-path + extension.
    - out_path: the output file (opened in append mode).
    - offset:   position in the hex file where the search starts.

    Every span that starts right after an "ff32" marker's "ff" and runs
    through the "ff" of the next marker is written as one CRLF-terminated
    line.  The last record runs through the "ff" of the next "ff33"; when
    that marker is missing the rest of the data is written instead of an
    empty line.  Returns True on success and False when an I/O error was
    reported; any other exception is reported and re-raised.
    """
    try:
        with open(in_path, 'rb') as inFile:
            eof = getLastPosition(inFile)
            inFile.seek(offset, 0)
            sys.stdout.write('>>call file.read(%s)' % in_path)
            # A negative size would make read() return the whole file.
            pool = inFile.read(max(0, eof - offset))
        sys.stdout.write('\t[DONE]\n')
        sys.stdout.write('>>call file.write(%s)' % out_path)
        with open(out_path, 'ab') as outFile:
            start = _findAligned(pool, _RECORD_MARK, 0, offset)
            while start >= 0:
                following = _findAligned(
                    pool, _RECORD_MARK, start + _MARK_SKIP, offset)
                if following >= 0:
                    stop = following
                else:
                    stop = _findAligned(
                        pool, _END_MARK, start + _MARK_SKIP, offset)
                # +2 on both sides: drop the leading "ff", keep the closing one
                if stop >= 0:
                    record = pool[start + 2:stop + 2]
                else:
                    record = pool[start + 2:]
                outFile.write(record + b'\r\n')
                start = following
        sys.stdout.write('\t[DONE]\n')
        return True
    except IOError as err:
        # Output IO error message
        sys.stdout.write("\nException:\n$ I/O error({0}): {1}\n\n".format(
            err.errno, err.strerror))
        return False
    except Exception:
        # Output Exception message, then let the caller see the failure
        sys.stdout.write(
            "\nException:\n$ Unexpected error:%s\n\n" % sys.exc_info()[0])
        raise


def main(argv=sys.argv):
    """Command-line entry point: extract the records of argv[1].

    - argv: argument list (program name first); None means sys.argv.

    Returns the process exit status: 0 on success, 1 when the extraction
    failed, 2 when no hex file was given.
    """
    if argv is None:
        argv = sys.argv
    status = 0
    try:
        if len(argv) > 1:
            hexFile = argv[1]
            sys.stdout.write('\n$ time:\t%s\n\n' % datetime.now())
            sys.stdout.write('>invoke ExtractSUBDATA(%s)\n' % hexFile)
            if readSUBDATA(hexFile, _subsdataPathFor(hexFile), 0):
                sys.stdout.write(
                    '>invoke ExtractSUBDATA(%s)\t[DONE]\n' % hexFile)
            else:
                status = 1
            sys.stdout.write('\n$ time:\t%s\n\n' % datetime.now())
        else:
            sys.stdout.write('usage: cmd_extract_subsdata.py <hex file>\n')
            status = 2
    except Exception:
        sys.stdout.write(
            "\nException:\n$ Unexpected error:%s\n\n" % sys.exc_info()[0])
        status = 1
    finally:
        sys.stdout.write('sys.exit(main(sys.argv))\n')
    return status


if __name__ == "__main__":
    sys.exit(main(sys.argv))
# Input:  - HLR binary dump file (ex: HLRDUMP.dat)
#         - HLR SUBSDATA record file (ex: HLRDUMP_subsdata.hex)
# Output: HLR decoded CSV file (ex: HLRDUMP_parsed.csv)
# Usage:  C:\>python cmd_decode_subsdata.py "D:\HLR\HLRDUMP.dat" "D:\HLR\HLRDUMP_subsdata.hex"
#
# The script only uses constructs that behave the same on Python 2.6+ and
# Python 3.
import sys
import os
import binascii
from datetime import datetime

# Debugging indicator.
# - True: Output debugging results,
# - False: Omit debugging results.
if 'DEBUG' not in dir():
    DEBUG = False  # or True

# Number of bytes inspected per step when trimming 0x00 padding.
_SCAN_CHUNK = 64 * 1024


def enum(**enums):
    """
    Function return generic enumeration type
    - enums: enumerated list
    """
    return type('Enum', (), enums)


# Define encoding types
ENCODING = enum(BIN=0, BCD=1, TBCD=2, HEX=16, ISO=8)


def _say(text):
    """Write *text* plus a newline to stdout (what ``print text`` did)."""
    sys.stdout.write(text + '\n')


def _text(raw):
    """Return *raw* as the native str type.

    On Python 2 byte strings already are str and come back untouched; on
    Python 3 bytes are decoded as latin-1 so every byte maps to one character.
    """
    if isinstance(raw, str):
        return raw
    return raw.decode('latin-1')


def _reportIOError(err):
    """Print the I/O error message used by every record reader."""
    _say("\nException:\n$ I/O error({0}): {1}\n".format(
        err.errno, err.strerror))


def _reportUnexpected():
    """Print the type of the exception currently being handled."""
    _say("\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0])


def byte_to_binary(n):
    """ Convert a byte value to its 8-character binary string (MSB first) """
    return ''.join(str((n & (1 << i)) and 1) for i in reversed(range(8)))


def hex_to_binary(h):
    """ Convert a hex string to the binary string of the bytes it encodes """
    return ''.join(byte_to_binary(b) for b in bytearray(binascii.unhexlify(h)))


def getLastPosition(in_path):
    """
    Function that returns the last non 0x00 position.
    - in_path: the input file full-path + extension.

    The value is the offset just past the last non-0x00 byte; 0 for an empty
    or all-0x00 file.  An I/O error is reported and yields None; any other
    exception is reported and re-raised.
    """
    try:
        with open(in_path, 'rb') as inFile:
            inFile.seek(0, os.SEEK_END)
            lpos = inFile.tell()
            # Walk backwards one chunk at a time instead of byte by byte.
            while lpos > 0:
                step = min(_SCAN_CHUNK, lpos)
                inFile.seek(lpos - step, 0)
                kept = len(inFile.read(step).rstrip(b'\x00'))
                if kept:
                    lpos = lpos - step + kept
                    break
                lpos -= step
        if DEBUG:
            _say("[DEBUG]\r\n%d\r\n[/DEBUG]\r\n" % lpos)  # Debugging
        return lpos
    except IOError as err:
        _reportIOError(err)
        return None
    except Exception:
        _reportUnexpected()
        raise


def sanitizepath(path):
    """Return *path* wrapped in double quotes and normalised for the OS.

    - path: the path string; an empty string yields None.
    """
    if len(path) == 0:
        return None
    prefix = '' if path[0] == '"' else '"'
    suffix = '' if path[-1] == '"' else '"'
    return os.path.normpath(os.path.normcase(prefix + path + suffix))


def decode(value, encoding):
    """
    Function that decodes byte(s) according to the encoding type
    - value: value to decode (byte string).
    - encoding: encoding type, one of the ENCODING members.

    Returns a str; None for an encoding that is not implemented (TBCD).
    """
    if not isinstance(value, bytes):
        value = value.encode('latin-1')
    # Decode to Binary
    if encoding == ENCODING.BIN:
        return ''.join(byte_to_binary(b) for b in bytearray(value))
    # Decode to BCD: digits up to the first 'f' filler.  split() keeps the
    # whole value when there is no filler (find() + slice dropped a digit).
    if encoding == ENCODING.BCD:
        return _text(binascii.hexlify(value)).lower().split('f', 1)[0]
    # Decode to ISO
    if encoding == ENCODING.ISO:
        return _text(value)
    # Decode to Hex
    if encoding == ENCODING.HEX:
        return _text(binascii.hexlify(value))
    return None


def _formatAdm(pool):
    """Return the CSV text for the 32 bytes of an ADM record."""
    fields = [
        decode(pool[:1], ENCODING.ISO),      # Record type, 1 byte
        decode(pool[1:13], ENCODING.ISO),    # Exchange identity, 12 bytes
        decode(pool[13:15], ENCODING.ISO),   # Starting year, 2 bytes
        decode(pool[15:17], ENCODING.ISO),   # Starting month, 2 bytes
        decode(pool[17:19], ENCODING.ISO),   # Starting day, 2 bytes
        decode(pool[19:21], ENCODING.ISO),   # Starting hour, 2 bytes
        decode(pool[21:23], ENCODING.ISO),   # Starting minute, 2 bytes
    ]
    # Parameter given in command: low four bits of byte 23, lowest bit
    # first.  Each bit selects one value of its pair.  The third pair used
    # to emit "5" for both states, which made that bit invisible.
    flags = decode(pool[23:24], ENCODING.BIN)[4:]
    pairs = (("1", "2"), ("3", "4"), ("5", "6"), ("7", "8"))
    for bit, (cleared, raised) in zip(reversed(flags), pairs):
        fields.append(cleared if bit == '0' else raised)
    # Parameter value, 8 bytes, BCD Coded
    fields.append(decode(pool[24:32], ENCODING.BCD))
    return ','.join(fields)


def getRecordAMD(in_path, out_path, offset):
    """
    Function that returns the ADM Record.
    The ADM record is stored in the file when subscriber data output is
    initiated.  This record consists of administrative data.
    - in_path: the input file full-path + extension.
    - out_path: CSV file the decoded record is appended to.
    - offset: position of the record in the input file.

    Returns True on success, False when an I/O error was reported.  Any
    other problem (including a record shorter than 32 bytes) is reported and
    raised.
    """
    length = 32  # record length
    try:
        with open(in_path, 'rb') as inFile:
            inFile.seek(offset, 0)
            sys.stdout.write('>>>call file.read(%s)' % in_path)
            pool = inFile.read(length)
        sys.stdout.write('\t[DONE]\n')
        if len(pool) < length:
            raise ValueError('ADM record truncated: expected %d bytes, '
                             'got %d' % (length, len(pool)))
        sys.stdout.write('>>>extract data')
        if DEBUG:
            _say("[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % _text(pool))  # Debugging
        row = _formatAdm(pool)
        sys.stdout.write('\t[DONE]\n')
        sys.stdout.write('>>>call file.write(%s)' % out_path)
        with open(out_path, 'a') as outFile:
            outFile.write(row + '\n')
        sys.stdout.write('\t[DONE]\n')
        return True
    except IOError as err:
        _reportIOError(err)
        return False
    except Exception:
        _reportUnexpected()
        raise


def _formatEnd(pool):
    """Return the CSV text for the 12 bytes of an ENDOFPRINTOUT record."""
    return ','.join([
        decode(pool[:1], ENCODING.ISO),      # Record type, 1 byte
        decode(pool[1:3], ENCODING.ISO),     # Ending year, 2 bytes
        decode(pool[3:5], ENCODING.ISO),     # Ending month, 2 bytes
        decode(pool[5:7], ENCODING.ISO),     # Ending day, 2 bytes
        decode(pool[7:9], ENCODING.ISO),     # Ending hour, 2 bytes
        decode(pool[9:11], ENCODING.ISO),    # Ending minute, 2 bytes
        decode(pool[11:12], ENCODING.ISO),   # Type of end of print, 1 byte
    ])


def getRecordEND(in_path, out_path, offset):
    """
    Function that returns the ENDOFPRINTOUT Record.
    The ENDOFPRINTOUT record is stored in the file when subscriber data
    output is finished.  This record indicates end of printout.
    - in_path: the input file full-path + extension.
    - out_path: CSV file the decoded record is appended to.
    - offset: position just past the record (the record is the 12 bytes
      that end there).

    Returns True on success, False when an I/O error was reported.  Any
    other problem (including an offset that leaves no room for the record)
    is reported and raised.
    """
    length = 12  # record length
    try:
        if offset is None or offset < length:
            raise ValueError('no room for an ENDOFPRINTOUT record before '
                             'offset %r' % (offset,))
        with open(in_path, 'rb') as inFile:
            inFile.seek(offset - length, 0)
            sys.stdout.write('>>>call file.read(%s)' % in_path)
            pool = inFile.read(length)
        sys.stdout.write('\t[DONE]\n')
        sys.stdout.write('>>>extract data')
        if DEBUG:
            _say("[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % _text(pool))  # Debugging
        row = _formatEnd(pool)
        sys.stdout.write('\t[DONE]\n')
        sys.stdout.write('>>>call file.write(%s)' % out_path)
        with open(out_path, 'a') as outFile:
            outFile.write(row + '\n')
        sys.stdout.write('\t[DONE]\n')
        return True
    except IOError as err:
        _reportIOError(err)
        return False
    except Exception:
        _reportUnexpected()
        raise


def _decodeSubsdataLine(line):
    """Return the CSV text for one SUBSDATA record given as hex digits."""
    # Record type; the record number (digits 2-8) is not exported
    fields = [chr(int(line[:2], 16))]
    fields.append(line[8:24].replace('f', ''))    # IMSI, 'f' filler removed
    fields.append(line[24:40].replace('f', ''))   # MSISDN, 'f' filler removed
    extra = int(line[40:42], 16)                  # Number of additional MSISDN
    fields.append(str(extra))
    pos = 42
    for _ in range(extra):
        # Additional MSISDN, BC number tied to it, service pointer for the
        # BS that represents the BC number
        fields.append(line[pos:pos + 16].replace('f', ''))
        fields.append(str(int(line[pos + 16:pos + 20], 16)))
        fields.append(str(int(line[pos + 20:pos + 22], 16)))
        pos += 22
    # SUD information: (service pointer, value) pairs up to 'ff' / 'fe'
    while line[pos:pos + 2] not in ('ff', 'fe', ''):
        fields.append(str(int(line[pos:pos + 2], 16)))
        fields.append(str(int(line[pos + 2:pos + 6], 16)))
        pos += 6
    # 'fe' announces an IMEISV identifier
    if line[pos:pos + 2] == 'fe':
        fields.append(line[pos + 2:pos + 18])
    # End of record.
    # NOTE(review): with an IMEISV present this is still the 'fe' marker,
    # as it always was - confirm whether the byte after the IMEISV is meant.
    fields.append(line[pos:pos + 2])
    return ','.join(fields)


def getRecordSUBSDATA(in_path, out_path, offset):
    """
    Function that returns the SUBSDATA Record.
    A subscriber data block consists of two types of records per connected
    Subscriber:
    1. one SUBSDATA record with permanent subscriber data, and
    2. one optional EXTSUBSDATA record with extended subscriber data.
    - in_path: the record file (one hex-encoded record per line).
    - out_path: CSV file the decoded records are appended to.
    - offset: position in the record file where reading starts.

    Blank lines are skipped.  Returns True on success, False when an I/O
    error was reported; any other exception is reported and re-raised.
    """
    try:
        with open(in_path, 'rb') as inFile:
            inFile.seek(offset, 0)
            _say('>>>call file.read(%s)' % in_path)
            _say('>>>extract data')
            _say('>>>call file.write(%s)' % out_path)
            with open(out_path, 'a') as outFile:
                for raw in inFile:
                    # Drop the CR/LF so slices past the data are empty
                    line = _text(raw).strip()
                    if not line:
                        continue
                    outFile.write(_decodeSubsdataLine(line) + '\n')
        _say('>>>call file.read(%s)\t[DONE]' % in_path)
        _say('>>>extract data\t[DONE]')
        _say('>>>call file.write(%s)\t[DONE]' % out_path)
        return True
    except IOError as err:
        _reportIOError(err)
        return False
    except Exception:
        _reportUnexpected()
        raise


def _parsedPathFor(dump_path):
    """Return the CSV name for *dump_path* ('..._parsed.csv').

    Built from the extension only, so it can never equal the dump path.
    """
    root, _ext = os.path.splitext(dump_path)
    return root + '_parsed.csv'


def ExtractDump(dumps, hexFiles):
    """Decode every dump in *dumps* with the record file at the same index.

    - dumps: list of binary dump paths.
    - hexFiles: list of record files, parallel to *dumps*.

    Raises RuntimeError as soon as one record reader reports a failure.
    """
    for index, dump in enumerate(dumps):
        outPath = _parsedPathFor(dump)
        # Extract AMD record
        _say('>>invoke getRecordAMD(%s)' % dump)
        if not getRecordAMD(dump, outPath, 0):
            raise RuntimeError('getRecordAMD failed for %s' % dump)
        _say('>>invoke getRecordAMD(%s)\t[DONE]' % dump)
        # Extract SUBDATA record
        _say('>>invoke getRecordSUBSDATA(%s)' % dump)
        if not getRecordSUBSDATA(hexFiles[index], outPath, 0):
            raise RuntimeError('getRecordSUBSDATA failed for %s' % dump)
        _say('>>invoke getRecordSUBSDATA(%s)\t[DONE]' % dump)
        # Extract ENDPRINT record
        _say('>>invoke getRecordEND(%s)' % dump)
        if not getRecordEND(dump, outPath, getLastPosition(dump)):
            raise RuntimeError('getRecordEND failed for %s' % dump)
        _say('>>invoke getRecordEND(%s)\t[DONE]' % dump)


def main(argv=sys.argv):
    """Command-line entry point: decode argv[1] using the records in argv[2].

    - argv: argument list (program name first); None means sys.argv.

    Returns the process exit status: 0 on success, 1 when decoding failed,
    2 when the two file arguments are missing.
    """
    if argv is None:
        argv = sys.argv
    status = 0
    try:
        if len(argv) > 2:
            _say('\n$ time:\t%s\n' % datetime.now())
            _say('>invoke ExtractDump(%s)' % argv[1])
            ExtractDump([argv[1]], [argv[2]])
            _say('>invoke ExtractDump(%s)\t[DONE]' % argv[1])
            _say('\n$ time:\t%s\n' % datetime.now())
        else:
            _say('usage: cmd_decode_subsdata.py <dump file> <record file>')
            status = 2
    except Exception:
        _reportUnexpected()
        status = 1
    finally:
        _say('sys.exit(main(sys.argv))')
    return status


if __name__ == "__main__":
    sys.exit(main(sys.argv))
# Input:  - HLR binary dump file (ex: CUGDUMP.dat)
#         - HLR SUBSDATA record with CUG Tag file (ex: CUGDUMP_subsdata.hex)
# Output: HLR decoded CSV file (ex: CUGDUMP_parsed.csv)
# Usage:  C:\>python cmd_decode_cug.py "D:\HLR\CUGDUMP.dat" "D:\HLR\CUGDUMP_subsdata.hex"
#
# The script only uses constructs that behave the same on Python 2.6+ and
# Python 3.
import sys
import os
import binascii
from datetime import datetime

# Debugging indicator.
# - True: Output debugging results,
# - False: Omit debugging results.
if 'DEBUG' not in dir():
    DEBUG = False  # or True

# Number of bytes inspected per step when trimming 0x00 padding.
_SCAN_CHUNK = 64 * 1024
# Subtag (129) + length (4) that open every InterCUG entry.
_INTER_CUG_MARK = '8104'


def enum(**enums):
    """
    Function return generic enumeration type
    - enums: enumerated list
    """
    return type('Enum', (), enums)


# Define encoding types
ENCODING = enum(BIN=0, BCD=1, TBCD=2, HEX=16, ISO=8)


def _say(text):
    """Write *text* plus a newline to stdout (what ``print text`` did)."""
    sys.stdout.write(text + '\n')


def _text(raw):
    """Return *raw* as the native str type.

    On Python 2 byte strings already are str and come back untouched; on
    Python 3 bytes are decoded as latin-1 so every byte maps to one character.
    """
    if isinstance(raw, str):
        return raw
    return raw.decode('latin-1')


def _reportIOError(err):
    """Print the I/O error message used by every record reader."""
    _say("\nException:\n$ I/O error({0}): {1}\n".format(
        err.errno, err.strerror))


def _reportUnexpected():
    """Print the type of the exception currently being handled."""
    _say("\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0])


def byte_to_binary(n):
    """ Convert a byte value to its 8-character binary string (MSB first) """
    return ''.join(str((n & (1 << i)) and 1) for i in reversed(range(8)))


def hex_to_binary(h):
    """ Convert a hex string to the binary string of the bytes it encodes """
    return ''.join(byte_to_binary(b) for b in bytearray(binascii.unhexlify(h)))


def getLastPosition(in_path):
    """
    Function that returns the last non 0x00 position.
    - in_path: the input file full-path + extension.

    The value is the offset just past the last non-0x00 byte; 0 for an empty
    or all-0x00 file.  An I/O error is reported and yields None; any other
    exception is reported and re-raised.
    """
    try:
        with open(in_path, 'rb') as inFile:
            inFile.seek(0, os.SEEK_END)
            lpos = inFile.tell()
            # Walk backwards one chunk at a time instead of byte by byte.
            while lpos > 0:
                step = min(_SCAN_CHUNK, lpos)
                inFile.seek(lpos - step, 0)
                kept = len(inFile.read(step).rstrip(b'\x00'))
                if kept:
                    lpos = lpos - step + kept
                    break
                lpos -= step
        if DEBUG:
            _say("[DEBUG]\r\n%d\r\n[/DEBUG]\r\n" % lpos)  # Debugging
        return lpos
    except IOError as err:
        _reportIOError(err)
        return None
    except Exception:
        _reportUnexpected()
        raise


def sanitizepath(path):
    """Return *path* wrapped in double quotes and normalised for the OS.

    - path: the path string; an empty string yields None.
    """
    if len(path) == 0:
        return None
    prefix = '' if path[0] == '"' else '"'
    suffix = '' if path[-1] == '"' else '"'
    return os.path.normpath(os.path.normcase(prefix + path + suffix))


def decode(value, encoding):
    """
    Function that decodes byte(s) according to the encoding type
    - value: value to decode (byte string).
    - encoding: encoding type, one of the ENCODING members.

    Returns a str; None for an encoding that is not implemented (TBCD).
    """
    if not isinstance(value, bytes):
        value = value.encode('latin-1')
    # Decode to Binary
    if encoding == ENCODING.BIN:
        return ''.join(byte_to_binary(b) for b in bytearray(value))
    # Decode to BCD: digits up to the first 'f' filler.  split() keeps the
    # whole value when there is no filler (find() + slice dropped a digit).
    if encoding == ENCODING.BCD:
        return _text(binascii.hexlify(value)).lower().split('f', 1)[0]
    # Decode to ISO
    if encoding == ENCODING.ISO:
        return _text(value)
    # Decode to Hex
    if encoding == ENCODING.HEX:
        return _text(binascii.hexlify(value))
    return None


def _formatAdm(pool):
    """Return the CSV text for the 32 bytes of an ADM record."""
    fields = [
        decode(pool[:1], ENCODING.ISO),      # Record type, 1 byte
        decode(pool[1:13], ENCODING.ISO),    # Exchange identity, 12 bytes
        decode(pool[13:15], ENCODING.ISO),   # Starting year, 2 bytes
        decode(pool[15:17], ENCODING.ISO),   # Starting month, 2 bytes
        decode(pool[17:19], ENCODING.ISO),   # Starting day, 2 bytes
        decode(pool[19:21], ENCODING.ISO),   # Starting hour, 2 bytes
        decode(pool[21:23], ENCODING.ISO),   # Starting minute, 2 bytes
    ]
    # Parameter given in command: low four bits of byte 23, lowest bit
    # first.  Each bit selects one value of its pair.  The third pair used
    # to emit "5" for both states, which made that bit invisible.
    flags = decode(pool[23:24], ENCODING.BIN)[4:]
    pairs = (("1", "2"), ("3", "4"), ("5", "6"), ("7", "8"))
    for bit, (cleared, raised) in zip(reversed(flags), pairs):
        fields.append(cleared if bit == '0' else raised)
    # Parameter value, 8 bytes, BCD Coded
    fields.append(decode(pool[24:32], ENCODING.BCD))
    return ','.join(fields)


def getRecordAMD(in_path, out_path, offset):
    """
    Function that returns the ADM Record.
    The ADM record is stored in the file when subscriber data output is
    initiated.  This record consists of administrative data.
    - in_path: the input file full-path + extension.
    - out_path: CSV file the decoded record is appended to.
    - offset: position of the record in the input file.

    Returns True on success, False when an I/O error was reported.  Any
    other problem (including a record shorter than 32 bytes) is reported and
    raised.
    """
    length = 32  # record length
    try:
        with open(in_path, 'rb') as inFile:
            inFile.seek(offset, 0)
            sys.stdout.write('>>>call file.read(%s)' % in_path)
            pool = inFile.read(length)
        sys.stdout.write('\t[DONE]\n')
        if len(pool) < length:
            raise ValueError('ADM record truncated: expected %d bytes, '
                             'got %d' % (length, len(pool)))
        sys.stdout.write('>>>extract data')
        if DEBUG:
            _say("[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % _text(pool))  # Debugging
        row = _formatAdm(pool)
        sys.stdout.write('\t[DONE]\n')
        sys.stdout.write('>>>call file.write(%s)' % out_path)
        with open(out_path, 'a') as outFile:
            outFile.write(row + '\n')
        sys.stdout.write('\t[DONE]\n')
        return True
    except IOError as err:
        _reportIOError(err)
        return False
    except Exception:
        _reportUnexpected()
        raise


def _formatEnd(pool):
    """Return the CSV text for the 12 bytes of an ENDOFPRINTOUT record."""
    return ','.join([
        decode(pool[:1], ENCODING.ISO),      # Record type, 1 byte
        decode(pool[1:3], ENCODING.ISO),     # Ending year, 2 bytes
        decode(pool[3:5], ENCODING.ISO),     # Ending month, 2 bytes
        decode(pool[5:7], ENCODING.ISO),     # Ending day, 2 bytes
        decode(pool[7:9], ENCODING.ISO),     # Ending hour, 2 bytes
        decode(pool[9:11], ENCODING.ISO),    # Ending minute, 2 bytes
        decode(pool[11:12], ENCODING.ISO),   # Type of end of print, 1 byte
    ])


def getRecordEND(in_path, out_path, offset):
    """
    Function that returns the ENDOFPRINTOUT Record.
    The ENDOFPRINTOUT record is stored in the file when subscriber data
    output is finished.  This record indicates end of printout.
    - in_path: the input file full-path + extension.
    - out_path: CSV file the decoded record is appended to.
    - offset: position just past the record (the record is the 12 bytes
      that end there).

    Returns True on success, False when an I/O error was reported.  Any
    other problem (including an offset that leaves no room for the record)
    is reported and raised.
    """
    length = 12  # record length
    try:
        if offset is None or offset < length:
            raise ValueError('no room for an ENDOFPRINTOUT record before '
                             'offset %r' % (offset,))
        with open(in_path, 'rb') as inFile:
            inFile.seek(offset - length, 0)
            sys.stdout.write('>>>call file.read(%s)' % in_path)
            pool = inFile.read(length)
        sys.stdout.write('\t[DONE]\n')
        sys.stdout.write('>>>extract data')
        if DEBUG:
            _say("[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % _text(pool))  # Debugging
        row = _formatEnd(pool)
        sys.stdout.write('\t[DONE]\n')
        sys.stdout.write('>>>call file.write(%s)' % out_path)
        with open(out_path, 'a') as outFile:
            outFile.write(row + '\n')
        sys.stdout.write('\t[DONE]\n')
        return True
    except IOError as err:
        _reportIOError(err)
        return False
    except Exception:
        _reportUnexpected()
        raise


def _decodeSubsdataPart(line):
    """Decode the SUBSDATA part of *line*.

    Returns (fields, pos): the decoded values and the index of the
    end-of-record marker that closes the SUBSDATA part.
    """
    # Record type; the record number (digits 2-8) is not exported
    fields = [chr(int(line[:2], 16))]
    fields.append(line[8:24].replace('f', ''))    # IMSI, 'f' filler removed
    fields.append(line[24:40].replace('f', ''))   # MSISDN, 'f' filler removed
    extra = int(line[40:42], 16)                  # Number of additional MSISDN
    fields.append(str(extra))
    pos = 42
    for _ in range(extra):
        # Additional MSISDN, BC number tied to it, service pointer for the
        # BS that represents the BC number
        fields.append(line[pos:pos + 16].replace('f', ''))
        fields.append(str(int(line[pos + 16:pos + 20], 16)))
        fields.append(str(int(line[pos + 20:pos + 22], 16)))
        pos += 22
    # SUD information: (service pointer, value) pairs up to 'ff' / 'fe'
    while line[pos:pos + 2] not in ('ff', 'fe', ''):
        fields.append(str(int(line[pos:pos + 2], 16)))
        fields.append(str(int(line[pos + 2:pos + 6], 16)))
        pos += 6
    # 'fe' announces an IMEISV identifier (16 hex digits).  Step over it:
    # otherwise everything that follows is read from the IMEISV digits.
    # NOTE(review): assumes the end-of-record marker directly follows the
    # IMEISV - confirm against the dump format description.
    if line[pos:pos + 2] == 'fe':
        fields.append(line[pos + 2:pos + 18])
        pos += 18
    return fields, pos


def _decodeCugLine(line):
    """Return the CSV text for one SUBSDATA (+ EXTSUBSDATA/CUG) line."""
    fields, pos = _decodeSubsdataPart(line)
    # End of record SUBSDATA
    fields.append(line[pos:pos + 2])
    pos += 2
    # EXTSUBSDATA is optional: nothing left means the line is complete
    if not line[pos:pos + 2]:
        return ','.join(fields)
    # Record EXTSUBSDATA type
    fields.append(chr(int(line[pos:pos + 2], 16)))
    # Tag Closed User Group (CUG) data (Numeral 1), 1 Byte
    tag = line[pos + 2:pos + 4]
    cugtag = int(tag, 16)
    if cugtag != 1:
        # No CUG data: the raw tag byte closes the row
        fields.append(tag)
        return ','.join(fields)
    fields.append(str(cugtag))
    pos += 4
    # Reset per line so a count from a previous record can never leak in
    bsgnum = 0
    # IntraCUG entries run until the first InterCUG entry; also stop at the
    # end-of-record marker or the end of the data instead of running off it
    while (line[pos:pos + 4] != _INTER_CUG_MARK
           and line[pos:pos + 2] not in ('ff', '')):
        # Subtag IntraCUG data (Numeral 128), 1 Byte
        fields.append(str(int(line[pos:pos + 2], 16)))
        # Length of IntraCUG data (Numeral 9 - 14), 1 Byte
        fields.append(str(int(line[pos + 2:pos + 4], 16)))
        # CUG Index (Numeral 0 - 32767), 2 Byte
        fields.append(str(int(line[pos + 4:pos + 8], 16)))
        # CUG Interlock Code Network Identity (Digit string), 2 Byte
        fields.append(line[pos + 8:pos + 12])
        # Interlock Code (Numeral 0 - 65535), 2 Byte
        fields.append(str(int(line[pos + 12:pos + 16], 16)))
        # CUG restrictions (Numeral 0 - 2), 1 Byte
        fields.append(str(int(line[pos + 16:pos + 18], 16)))
        # Number of BSGs affected (Numeral 1 - 6), 1 Byte
        bsgnum = int(line[pos + 18:pos + 20], 16)
        fields.append(str(bsgnum))
        pos += 20
        # BSG service pointer (Numeral 0 - 255), 1 Byte each
        for _ in range(bsgnum):
            fields.append(str(int(line[pos:pos + 2], 16)))
            pos += 2
    # InterCUG entries: as many as the last BSG count, but only while the
    # data really carries an InterCUG subtag
    for _ in range(bsgnum):
        if line[pos:pos + 4] != _INTER_CUG_MARK:
            break
        # Subtag InterCUG data (Numeral 129), 1 Byte
        fields.append(str(int(line[pos:pos + 2], 16)))
        # Length of InterCUG data (Numeral 4), 1 Byte
        fields.append(str(int(line[pos + 2:pos + 4], 16)))
        # BSG service pointer (Numeral 0 - 255), 1 Byte
        fields.append(str(int(line[pos + 4:pos + 6], 16)))
        # Inter CUG accessibility (Numeral 0 - 3), 1 Byte
        fields.append(str(int(line[pos + 6:pos + 8], 16)))
        # Preferential CUG (Numeral 0 - 32767, 65535), 2 Byte
        fields.append(str(int(line[pos + 8:pos + 12], 16)))
        pos += 12
    # End of Record EXTSUBSDATA
    fields.append(line[pos:pos + 2])
    return ','.join(fields)


def getRecordSUBSDATA(in_path, out_path, offset):
    """
    Function that returns the SUBSDATA Record.
    A subscriber data block consists of two types of records per connected
    Subscriber:
    1. one SUBSDATA record with permanent subscriber data, and
    2. one optional EXTSUBSDATA record with extended subscriber data.
    - in_path: the record file (one hex-encoded record per line).
    - out_path: CSV file the decoded records are appended to.
    - offset: position in the record file where reading starts.

    Blank lines are skipped.  Returns True on success, False when an I/O
    error was reported; any other exception is reported and re-raised.
    """
    try:
        with open(in_path, 'rb') as inFile:
            inFile.seek(offset, 0)
            _say('>>>call file.read(%s)' % in_path)
            _say('>>>extract data')
            _say('>>>call file.write(%s)' % out_path)
            with open(out_path, 'a') as outFile:
                for raw in inFile:
                    # Drop the CR/LF so slices past the data are empty
                    line = _text(raw).strip()
                    if not line:
                        continue
                    outFile.write(_decodeCugLine(line) + '\n')
        _say('>>>call file.read(%s)\t[DONE]' % in_path)
        _say('>>>extract data\t[DONE]')
        _say('>>>call file.write(%s)\t[DONE]' % out_path)
        return True
    except IOError as err:
        _reportIOError(err)
        return False
    except Exception:
        _reportUnexpected()
        raise


def _parsedPathFor(dump_path):
    """Return the CSV name for *dump_path* ('..._parsed.csv').

    Built from the extension only, so it can never equal the dump path.
    """
    root, _ext = os.path.splitext(dump_path)
    return root + '_parsed.csv'


def ExtractDump(dumps, hexFiles):
    """Decode every dump in *dumps* with the record file at the same index.

    - dumps: list of binary dump paths.
    - hexFiles: list of record files, parallel to *dumps*.

    Raises RuntimeError as soon as one record reader reports a failure.
    """
    for index, dump in enumerate(dumps):
        outPath = _parsedPathFor(dump)
        # Extract AMD record
        _say('>>invoke getRecordAMD(%s)' % dump)
        if not getRecordAMD(dump, outPath, 0):
            raise RuntimeError('getRecordAMD failed for %s' % dump)
        _say('>>invoke getRecordAMD(%s)\t[DONE]' % dump)
        # Extract SUBDATA record
        _say('>>invoke getRecordSUBSDATA(%s)' % dump)
        if not getRecordSUBSDATA(hexFiles[index], outPath, 0):
            raise RuntimeError('getRecordSUBSDATA failed for %s' % dump)
        _say('>>invoke getRecordSUBSDATA(%s)\t[DONE]' % dump)
        # Extract ENDPRINT record
        _say('>>invoke getRecordEND(%s)' % dump)
        if not getRecordEND(dump, outPath, getLastPosition(dump)):
            raise RuntimeError('getRecordEND failed for %s' % dump)
        _say('>>invoke getRecordEND(%s)\t[DONE]' % dump)


def main(argv=sys.argv):
    """Command-line entry point: decode argv[1] using the records in argv[2].

    - argv: argument list (program name first); None means sys.argv.

    Returns the process exit status: 0 on success, 1 when decoding failed,
    2 when the two file arguments are missing.
    """
    if argv is None:
        argv = sys.argv
    status = 0
    try:
        if len(argv) > 2:
            _say('\n$ time:\t%s\n' % datetime.now())
            _say('>invoke ExtractDump(%s)' % argv[1])
            ExtractDump([argv[1]], [argv[2]])
            _say('>invoke ExtractDump(%s)\t[DONE]' % argv[1])
            _say('\n$ time:\t%s\n' % datetime.now())
        else:
            _say('usage: cmd_decode_cug.py <dump file> <record file>')
            status = 2
    except Exception:
        _reportUnexpected()
        status = 1
    finally:
        _say('sys.exit(main(sys.argv))')
    return status


if __name__ == "__main__":
    sys.exit(main(sys.argv))
# Input:  - HLR binary dump file (ex: GPRSDUMP.dat)
#         - HLR SUBSDATA record with GPRSDUMP Tag file (ex: GPRSDUMP_subsdata.hex)
# Output: HLR decoded CSV file (ex: GPRSDUMP_parsed.csv)
# Usage:  C:\>python cmd_decode_gprs.py "D:\>HLR\GPRSDUMP.dat" "D:\>HLR\GPRSDUMP_subsdata.hex"
#
# NOTE: written against the common subset of Python 2.7 and Python 3 (single
# argument print(), "except ... as err", binascii instead of the "hex" codec).
import sys
import os
import binascii
from datetime import datetime

# Debugging indicator.
#  - True:  output debugging results,
#  - False: omit debugging results.
# Only defined when the name does not exist yet at module level.
if 'DEBUG' not in dir():
    DEBUG = False  # or True


def enum(**enums):
    """
    Return a generic enumeration type.
     - enums: name=value pairs that become class attributes of the new type.
    """
    return type('Enum', (), enums)


# Encoding types understood by decode(); TBCD is declared but has no decoder.
ENCODING = enum(BIN=0, BCD=1, TBCD=2, HEX=16, ISO=8)


def byte_to_binary(n):
    """
    Convert the low 8 bits of the integer n to an 8 character '0'/'1' string
    (most significant bit first).
    """
    return format(n & 0xFF, '08b')


def hex_to_binary(h):
    """
    Convert a string of hex digits to the matching string of bits
    (8 bits per hex digit pair).
    """
    # bytearray() yields integers on both Python 2 and Python 3.
    return ''.join(byte_to_binary(b) for b in bytearray(binascii.unhexlify(h)))


def _hexlify(value):
    """Return the lowercase hex digits of the byte string value as native str."""
    digits = binascii.hexlify(value)
    # hexlify() returns bytes on Python 3; callers concatenate with str.
    return digits if isinstance(digits, str) else digits.decode('ascii')


def _report_error():
    """Print the script's exception banner for the exception being handled."""
    exc_type, exc = sys.exc_info()[:2]
    if isinstance(exc, IOError):
        print("\nException:\n$ I/O error({0}): {1}\n".format(exc.errno, exc.strerror))
    else:
        print("\nException:\n$ Unexpected error:%s\n" % exc_type)


def getLastPosition(in_path):
    """
    Function that returns the last non 0x00 position, i.e. the file length
    once trailing 0x00 bytes are ignored (0 for an empty or all-zero file).
     - in_path: the input file full-path + extension.
    Errors are printed and then re-raised.
    """
    try:
        # Open a file for read in binary mode
        with open(in_path, 'rb') as inFile:
            # Set the file's current position to EOF and get its offset
            inFile.seek(0, os.SEEK_END)
            lpos = max(0, inFile.tell())
            # Walk backwards over the trailing 0x00 padding
            while lpos > 0:
                inFile.seek(lpos - 1, 0)
                if inFile.read(1) != b'\x00':
                    break
                lpos -= 1
            if DEBUG:
                print("[DEBUG]\r\n%d\r\n[/DEBUG]\r\n" % lpos)  ## Debugging
            # Return lpos rather than inFile.tell(): tell() is 1, not 0, when
            # every byte of the file is 0x00.
            return lpos
    except Exception:
        _report_error()
        # Raise the Exception and stop the program execution
        raise


def sanitizepath(path):
    """
    Wrap path in double quotes (without doubling quotes that are already
    there) and normalise it with os.path.normcase/normpath.
    Returns None for an empty path.
    """
    if len(path) == 0:
        return None
    snpath = '"' + path + '"'
    if path[0] == '"':
        snpath = snpath[1:]
    if path[-1] == '"':
        snpath = snpath[0:-1]
    return os.path.normpath(os.path.normcase(snpath))


def decode(value, encoding):
    """
    Function that decodes byte(s) according to the encoding type
     - value: byte string to decode.
     - encoding: one of the ENCODING values.
    Returns a text string, or None for an encoding that has no decoder
    (ENCODING.TBCD or any unknown value).
    """
    # Decode to Binary
    if encoding == ENCODING.BIN:
        return hex_to_binary(_hexlify(value))
    # Decode to BCD: digits up to the first 'f' filler nibble.
    # partition() keeps every digit when there is no filler at all, whereas
    # slicing with find() == -1 would drop the last digit.
    if encoding == ENCODING.BCD:
        return _hexlify(value).lower().partition('f')[0]
    # Decode to ISO: the bytes are already characters
    if encoding == ENCODING.ISO:
        return value if isinstance(value, str) else value.decode('latin-1')
    # Decode to Hex
    if encoding == ENCODING.HEX:
        return _hexlify(value)
    return None


def getRecordAMD(in_path, out_path, offset):
    """
    Function that returns the ADM Record.
    The ADM record is stored in the file when subscriber data output is initiated.
    This record consists of administrative data.
     - in_path: the input file full-path + extension.
     - out_path: CSV file the decoded record is appended to.
     - offset: byte offset of the record inside in_path.
    Returns True on success; errors are printed and then re-raised.
    """
    try:
        # Set the record length
        length = 32
        # Open a file for read in binary mode
        with open(in_path, 'rb') as inFile:
            # Set the file's current position
            inFile.seek(offset, 0)
            sys.stdout.write('>>>call file.read(%s)' % in_path)
            # Read the file AMD Record
            pool = inFile.read(length)
            sys.stdout.write('\t[DONE]\n')

        sys.stdout.write('>>>extract data')
        if DEBUG:
            print("[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool)  ## Debugging
        out = []
        # (start, end) byte ranges of the ISO coded fields, in output order:
        # record type (1), exchange identity (12), then starting
        # year, month, day, hour, minute (2 bytes each)
        iso_fields = [(0, 1), (1, 13), (13, 15), (15, 17), (17, 19), (19, 21), (21, 23)]
        for start, end in iso_fields:
            out.append(decode(pool[start:end], ENCODING.ISO) + ',')
        # Determine parameter given in command: low nibble of byte 23,
        # one output value per bit, least significant bit first.
        temp = decode(pool[23:24], ENCODING.BIN)[4:]
        # NOTE(review): the original emitted "5," for both states of the third
        # bit; "6," follows the 1/2, 3/4, 7/8 pattern - confirm against the
        # record specification.
        choices = [("1,", "2,"), ("3,", "4,"), ("5,", "6,"), ("7,", "8,")]
        params = []
        for bit, (if_clear, if_set) in zip(reversed(temp), choices):
            params.append(if_clear if bit == '0' else if_set)
        if len(params) != len(choices):
            raise IndexError('ADM record is too short (%d bytes)' % len(pool))
        out.append("".join(params))  # 1 byte, Binary Coded
        # Create Parameter value (last field, no trailing comma)
        out.append(decode(pool[24:32], ENCODING.BCD))  # 8 bytes, BCD Coded
        sys.stdout.write('\t[DONE]\n')

        sys.stdout.write('>>>call file.write(%s)' % out_path)
        # Open a file for write (append: the CSV is built record by record)
        with open(out_path, 'a') as outFile:
            outFile.write("".join(out))
            outFile.write('\n')
        sys.stdout.write('\t[DONE]\n')
        return True
    except Exception:
        _report_error()
        # Raise the Exception and stop the program execution
        raise


def getRecordEND(in_path, out_path, offset):
    """
    Function that returns the ENDOFPRINTOUT Record.
    The ENDOFPRINTOUT record is stored in the file when subscriber data output is finished.
    This record indicates end of printout.
     - in_path: the input file full-path + extension.
     - out_path: CSV file the decoded record is appended to.
     - offset: byte offset just past the record (the record is the 12 bytes
       that end at offset).
    Returns True on success; errors are printed and then re-raised.
    """
    try:
        # Set the record length
        length = 12
        # Open a file for read in binary mode
        with open(in_path, 'rb') as inFile:
            # Set the file's current position to the start of the record
            inFile.seek(offset - length, 0)
            sys.stdout.write('>>>call file.read(%s)' % in_path)
            # Read the file ENDOFPRINTOUT Record
            pool = inFile.read(length)
            sys.stdout.write('\t[DONE]\n')

        sys.stdout.write('>>>extract data')
        if DEBUG:
            print("[DEBUG]\r\n%s\r\n[/DEBUG]\r\n" % pool)  ## Debugging
        # (start, end) byte ranges, all ISO coded: record type (1), ending
        # year, month, day, hour, minute (2 bytes each), type of end of print (1)
        iso_fields = [(0, 1), (1, 3), (3, 5), (5, 7), (7, 9), (9, 11), (11, 12)]
        # Joining with ',' leaves the last field without a trailing comma
        record = ",".join(decode(pool[start:end], ENCODING.ISO)
                          for start, end in iso_fields)
        sys.stdout.write('\t[DONE]\n')

        sys.stdout.write('>>>call file.write(%s)' % out_path)
        # Open a file for write
        with open(out_path, 'a') as outFile:
            outFile.write(record)
            outFile.write('\n')
        sys.stdout.write('\t[DONE]\n')
        return True
    except Exception:
        _report_error()
        # Raise the Exception and stop the program execution
        raise


def _field(line, start, width):
    """Return the hex field line[start:start+width] as a decimal string."""
    return str(int(line[start:start + width], 16))


def _parse_subsdata_line(line):
    """
    Decode one hex coded SUBSDATA (+ optional EXTSUBSDATA) record into the
    CSV text for its output row (every field is followed by a comma).
    All offsets below are in hex characters, i.e. 2 per byte.
    """
    out = []
    # line[0:2] is the record type and line[2:8] the record number (not output)
    # Extract ACIMSI number, 'f' is filler
    out.append(line[8:24].replace('f', '') + ',')
    # Extract MSISDN
    out.append(line[24:40].replace('f', '') + ',')
    # Extract Additional MSISDN Information: a count followed by 11 byte
    # entries (8 bytes number, 2 bytes BC number, 1 byte service pointer)
    extra = int(line[40:42], 16)
    for k in range(extra):
        start = 42 + k * 22
        out.append(line[start:start + 16].replace('f', '') + ',')
    pos = 42 + extra * 22
    # Skip SUD Information: 3 byte entries up to the 'ff' / 'fe' marker
    # NOTE(review): 'fe' announces an IMEISV identifier that is not consumed
    # here (the original had that code commented out) - confirm the layout.
    while line[pos:pos + 2] not in ('ff', 'fe', ''):
        pos += 6
    pos += 2  # End of record SUBSDATA

    # Record EXTSUBSDATA: type (1 byte) then Tag GPRS data (Numeral 5), 1 Byte
    tag = line[pos + 2:pos + 4]
    if not tag or int(tag, 16) != 5:
        # No EXTSUBSDATA record at all, or no Tag GPRS Data
        return "".join(out)
    # Subtag Rel-4 PDP data (1 Byte) and length of PDP data (2 Byte) are skipped
    # Number of PDP contexts (Numeral 1 - 10), 1 Byte
    pdpnum = int(line[pos + 10:pos + 12], 16)
    out.append(str(pdpnum) + ',')
    pos += 12

    # Extract PDP Data
    for _ in range(pdpnum):
        # PDP context identifier (Numeral 1 - 10), 1 Byte
        out.append('PDPID-' + _field(line, pos, 2) + ',')
        # PDP addressing type (Numeral 0 - 2), 1 Byte
        pdpaddtype = int(line[pos + 2:pos + 4], 16)
        if pdpaddtype == 0:
            # PDP Addressing type is dynamic: no address bytes
            out.append('PDPADD-0,')
            base = pos + 4
        elif pdpaddtype == 1:
            # PDP Addressing type is IPv4: 4 address bytes
            octets = [_field(line, pos + k, 2) for k in (4, 6, 8, 10)]
            out.append('PDPIPv4-' + '.'.join(octets) + ',')
            base = pos + 12
        elif pdpaddtype == 2:
            # PDP Addressing type is IPv6
            # %todo%
            raise NotImplementedError("# %todo%\tIPv6")
        else:
            # Without a known layout the position cannot be advanced, and the
            # remaining contexts would be decoded from the wrong bytes.
            raise ValueError('unknown PDP addressing type %d' % pdpaddtype)
        # Fields common to both layouts, relative to the end of the address:
        # Access Point Name (APN) identifier (Numeral 0 - 16383, 65535), 2 Byte
        out.append('APNID-' + _field(line, base, 4) + ',')
        # Extended Quality Of Service (QOS) identifier (Numeral 0 - 4095), 2 Byte
        out.append('QOSID-' + _field(line, base + 4, 4) + ',')
        # Visited Public Land Mobile Network (VPLMN) address allowed (Numeral 0 - 1), 1 Byte
        out.append('VPLMN-' + _field(line, base + 8, 2) + ',')
        # PDP type (Numeral 0 - 2), 1 Byte
        out.append('PDPT-' + _field(line, base + 10, 2) + ',')
        # PDP context charging characteristics Indicator (Numeral 0, 255), 1 Byte
        pdpcontextcharging = int(line[base + 12:base + 14], 16)
        out.append('PDPCH-' + str(pdpcontextcharging) + ',')
        if pdpcontextcharging == 255:
            # Enhanced PDP context charging characteristics value (Numeral 0 - 65535), 2 Byte
            out.append('PDPECH-' + _field(line, base + 14, 4) + ',')
            pos = base + 18
        else:
            pos = base + 14
    # The End of Record EXTSUBSDATA byte follows here; it is not output.
    return "".join(out)


def getRecordSUBSDATA(in_path, out_path, offset):
    """
    Function that returns the SUBSDATA Record.
    A subscriber data block consists of two types of records per connected Subscriber:
    1. one SUBSDATA record with permanent subscriber data, and
    2. one optional EXTSUBSDATA record with extended subscriber data.
     - in_path: the hex coded input file full-path + extension, one
       subscriber data block per line.
     - out_path: CSV file that receives one appended row per input line.
     - offset: byte offset in in_path where reading starts.
    Returns True on success; errors are printed and then re-raised.
    """
    try:
        # Open a file for read in binary mode
        with open(in_path, 'rb') as inFile:
            # Set the file's current position
            inFile.seek(offset, 0)
            print('>>>call file.read(%s)' % in_path)
            print('>>>extract data')
            print('>>>call file.write(%s)' % out_path)
            # Open a file for write
            with open(out_path, 'a') as outFile:
                for line in inFile:
                    # Binary mode yields bytes on Python 3
                    if not isinstance(line, str):
                        line = line.decode('ascii')
                    line = line.rstrip('\r\n')
                    if not line:
                        # Blank lines (e.g. at end of file) carry no record
                        continue
                    # Write to File
                    outFile.write(_parse_subsdata_line(line))
                    outFile.write('\n')
        print('>>>call file.read(%s)\t[DONE]' % in_path)
        print('>>>extract data\t[DONE]')
        print('>>>call file.write(%s)\t[DONE]' % out_path)
        return True
    except Exception:
        _report_error()
        # Raise the Exception and stop the program execution
        raise


def ExtractDump(dumps, hexFiles):
    """
    Decode every dump into its own CSV file: ADM record, then the SUBSDATA
    rows, then the ENDOFPRINTOUT record.
     - dumps: list of HLR binary dump paths.
     - hexFiles: list of hex coded SUBSDATA files, hexFiles[i] belongs to dumps[i].
    Raises RuntimeError when a step reports failure; errors of the individual
    steps propagate unchanged.
    """
    # Loop through each dump file
    for index, dump in enumerate(dumps):
        # Set output file
        outPath = dump.replace('.dat', '_parsed.csv')
        if outPath == dump:
            # No '.dat' in the path: never append the CSV to the dump itself
            outPath = dump + '_parsed.csv'

        # Extract AMD record
        print('>>invoke getRecordAMD(%s)' % dump)
        if not getRecordAMD(dump, outPath, 0):
            raise RuntimeError('getRecordAMD(%s) failed' % dump)
        print('>>invoke getRecordAMD(%s)\t[DONE]' % dump)

        # Extract SUBDATA record
        print('>>invoke getRecordSUBSDATA(%s)' % dump)
        if not getRecordSUBSDATA(hexFiles[index], outPath, 0):
            raise RuntimeError('getRecordSUBSDATA(%s) failed' % dump)
        print('>>invoke getRecordSUBSDATA(%s)\t[DONE]' % dump)

        # Extract ENDPRINT record: the last record before the 0x00 padding
        print('>>invoke getRecordEND(%s)' % dump)
        if not getRecordEND(dump, outPath, getLastPosition(dump)):
            raise RuntimeError('getRecordEND(%s) failed' % dump)
        print('>>invoke getRecordEND(%s)\t[DONE]' % dump)


def main(argv=sys.argv):
    """
    Command line entry point.
     - argv: [program, dump file, hex coded SUBSDATA file]; None means sys.argv.
    Returns the process exit status: 0 on success, 1 when decoding failed,
    2 when the two file arguments are missing.
    """
    if argv is None:
        argv = sys.argv
    try:
        if len(argv) <= 2:
            program = os.path.basename(argv[0]) if argv else 'cmd_decode_gprs.py'
            print('Usage: python %s <dump.dat> <subsdata.hex>' % program)
            return 2
        print('\n$ time:\t%s\n' % datetime.now())
        print('>invoke ExtractDump(%s)' % argv[1])
        ExtractDump([argv[1]], [argv[2]])
        print('>invoke ExtractDump(%s)\t[DONE]' % argv[1])
        print('\n$ time:\t%s\n' % datetime.now())
        return 0
    except Exception:
        print("\nException:\n$ Unexpected error:%s\n" % sys.exc_info()[0])
        return 1
    finally:
        print('sys.exit(main(sys.argv))')
        ##raw_input("Press any key to continue...")


if __name__ == "__main__":
    sys.exit(main(sys.argv))