我需要一个包含 tnsnames.ora 文件中所有数据库连接的字典。
我需要从这里开始:

(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com)))
对此:
{'DESCRIPTION': [{'ADDRESS_LIST': [{'ADDRESS': [{'PROTOCOL': 'TCP'},
                                               {'HOST': 'mydbserver.mydomain.com'},
                                               {'PORT': '1521'}
                                               ]
                                   }]
                  },
                  {'CONNECT_DATA': [{'SID': 'CATAL'},
                                    {'SERVER': 'DEDICATED'},
                                    {'SERVICE_NAME': 'mydb.mydomain.com'}
                                   ]
                  }
                ]
}
到目前为止,我的代码是:
def get_param(param_string):
    print("get_param input:", param_string)
    if param_string.count("(") != param_string.count(")"):
        raise Exception("Number of '(' is not egal to number of ')' : " + str(param_string.count("(")) + " and " + str(param_string.count(")")))
    else:
        param_string = param_string[1:-1]
        splitted     = param_string.split("=")
        keywork      = splitted[0]
        if len(splitted) == 2:
            return {keywork: splitted[1]}
        else:
            splitted.remove(keywork)
            values       = "=".join(splitted)
            return {keywork: get_value_list(values)}

def get_value_list(value_string):
    print("get_value_list input:", value_string)
    to_return = list()
    if "=" not in value_string and "(" not in value_string and ")" not in value_string:
        to_return.append(value_string)
    elif value_string[0] != "(":
        raise Exception("[ERROR] Format error '(' is not the first char: " + repr(value_string))
    else:
        parenth_count = 0
        strlen        = len(value_string)
        current_value = ""
        for i in range(0,strlen):
            current_char = value_string[i]
            current_value += current_char
            if current_char == "(":
                parenth_count += 1
            elif current_char == ")":
                parenth_count += -1
                if parenth_count == 0:
                    to_return.append(get_param(current_value))
                    if i != (strlen - 1):
                        if value_string[i+1] == "(":
                           to_return += get_value_list(value_string[i+1:])
                        else:
                            raise Exception("Format error - Next char should be a '('. value_string[i+1]:" + repr(value_string[i+1]) )
                    break
    print("get_value_list return:", to_return)
    if len(to_return) == 0:
        to_return = ""
    elif len(to_return) == 1:
        to_return = to_return[0]
    return to_return

connection_infos   = "(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=mydbserver.mydomain.com)(PORT=1521)))(CONNECT_DATA=(SID=CATAL)(SERVER=DEDICATED)(SERVICE_NAME=mydb.mydomain.com)))"
current_connection = get_param(connection_infos)
print("current_connection:", current_connection)
pprint(current_connection)
我得到了这个:
{'DESCRIPTION': [{'ADDRESS_LIST': {'ADDRESS': [{'PROTOCOL': 'TCP'},
                                               {'HOST': 'mydbserver.mydomain.com'},
                                                'PORT']
                                   }
                  },
                  'CONNECT_DATA'
                ]
}
所以我做错了什么。而且我觉得我做的事情太复杂了。有人会指出我犯的一些错误或帮助我找到一种更简单的方法来做到这一点吗?

最佳答案

我现在有一个工作代码,但我对它并不满意。它太长,不灵活,并且不适用于其他一些 tnsnames.ora 可能的格式:

class Tnsnames():
    def __init__(self, file_path, file_name='tnsnames.ora'):
        self.file_path  = file_path
        self.file_name  = file_name
        self.load_file()
    def load_file(self):
        try:
            fhd = open(os.path.join(self.file_path, self.file_name), 'rt', encoding='utf-8')
        except:
            raise
        else:
            #Oracle doc : https://docs.oracle.com/cd/B28359_01/network.111/b28317/tnsnames.htm#NETRF007
            file_content      = list()
            for l in fhd:
                l = " ".join(l.split()).strip(" \n")
                if len(l) > 0:
                    if "#" not in l:
                        file_content.append(l)
            fhd.close()
            file_content           = " ".join(file_content)
            connections_list       = dict()
            current_depth          = 0
            current_word           = ""
            current_keyword        = ""
            name_to_register       = ""
            is_in_add_list         = False
            current_addr           = dict()
            connections_aliases    = dict()
            stop_registering       = False
            connections_duplicates = list()
            for c in file_content:
                if c == " ":
                    pass
                elif c == "=":
                    current_keyword = str(current_word)
                    current_word    = ""
                    if current_keyword == "ADDRESS_LIST":
                        is_in_add_list = True
                elif c == "(":
                    if current_depth == 0:
                        current_keyword = current_keyword.upper()
                        names_list      = current_keyword.replace(" ","").split(",")
                        if len(names_list) == 1:
                            name_to_register = names_list[0]
                        else:
                            name_to_register = None
                            # We use either the first name with at least
                            # a dot in it, or the longest one.
                            for n in names_list:
                                if "." in n:
                                    name_to_register = n
                                    break
                            else:
                                name_to_register = max(names_list, key=len)
                            names_list.remove(name_to_register)
                            for n in names_list:
                                if n in connections_aliases.keys():
                                    print("[ERROR] already registered alias:", n,
                                          ". Registered to:", connections_aliases[n],
                                          ". New:", name_to_register,
                                          ". This possible duplicate will not be registered.")
                                    connections_duplicates.append(n)
                                    stop_registering = True
                                else:
                                    connections_aliases[n] = name_to_register
                        if not stop_registering:
                            connections_list[name_to_register] = {"ADDRESS_LIST": list(),
                                                                  "CONNECT_DATA": dict(),
                                                                  "LAST_TEST_TS": None}
                        current_depth += 1
                    elif current_depth in [1,2,3]:
                        current_depth += 1
                    else:
                        print("[ERROR] Incorrect depth:", repr(current_depth), ". Current connection will not be registered" )
                        del connections_list[name_to_register]
                        stop_registering = True
                elif c == ")":
                    if current_depth == 1:
                        if stop_registering:
                            stop_registering = False
                        else:
                            # Before moving to next connection,
                            # we check that current connection
                            # have at least a HOST, and a SID or
                            # SERVICE_NAME
                            connection_is_valid = True
                            if isinstance(connections_list[name_to_register]["ADDRESS_LIST"], dict):
                                if "HOST" not in connections_list[name_to_register]["ADDRESS_LIST"].keys():
                                    print("[ERROR] Only one address defined, and no HOST defined. Current connection will not be registered:", name_to_register)
                                    connection_is_valid = False
                            elif isinstance(connections_list[name_to_register]["ADDRESS_LIST"], list):
                                for current_address in connections_list[name_to_register]["ADDRESS_LIST"]:
                                    if "HOST" in current_address.keys():
                                        break
                                else:
                                    print("[ERROR] Multiple addresses but none with HOST. Current connection will not be registered:", name_to_register)
                                    connection_is_valid = False
                            else:
                                print("[ERROR] Incorrect address format:", connections_list[name_to_register]["ADDRESS_LIST"], " Connection:", name_to_register)
                                connection_is_valid = False
                            if not connection_is_valid:
                                del connections_list[name_to_register]
                            else:
                                if "SERVICE_NAME" not in connections_list[name_to_register]["CONNECT_DATA"].keys() and \
                                   "SID" not in connections_list[name_to_register]["CONNECT_DATA"].keys():
                                    print("[ERROR] Missing SERVICE_NAME / SID for connection:", name_to_register)
                                    del connections_list[name_to_register]
                    elif current_depth == 2:
                        if is_in_add_list:
                            is_in_add_list = False
                            if not stop_registering:
                                if len(connections_list[name_to_register]["ADDRESS_LIST"]) == 1:
                                    connections_list[name_to_register]["ADDRESS_LIST"] = connections_list[name_to_register]["ADDRESS_LIST"][0]
                    elif current_depth == 3:
                        if is_in_add_list:
                            if not stop_registering:
                                connections_list[name_to_register]["ADDRESS_LIST"].append(current_addr)
                            current_addr   = dict()
                        elif current_keyword.upper() in ["SID", "SERVER", "SERVICE_NAME"]:
                            if not stop_registering:
                                connections_list[name_to_register]["CONNECT_DATA"][current_keyword.upper()] = current_word.upper()
                    elif current_depth == 4:
                        if is_in_add_list:
                            if not stop_registering:
                                current_addr[current_keyword.upper()] = current_word.upper()
                    current_keyword = ""
                    current_word    = ""
                    current_depth  += -1
                else:
                    current_word += c
        self.connections = connections_list
        self.aliases     = connections_aliases
        self.duplicates  = connections_duplicates

测试 tnsnames.ora :
########################################
# This is a sample tnsnames.ora        #
########################################


###################################################
#  PRODDB
###################################################

proddb.mydbs.domain.com, PRODDB =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb1.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb2.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb3.mydbs.domain.com)(PORT = 1522))
      (ADDRESS = (PROTOCOL = TCP)(HOST = proddb4.mydbs.domain.com)(PORT = 1522))
    )
    (CONNECT_DATA =
      (SID = PRODDB)
      (SERVER = DEDICATED)
      (SERVICE_NAME = proddb.mydbs.domain.com)
    )
  )

###################################################
#  DEVDBA : Test database for DBA usage
###################################################

devdba.mydbs.domain.com, DEVDBA =
  (DESCRIPTION =
    (ADDRESS_LIST =
      (ADDRESS = (PROTOCOL = TCP)(HOST = devdba.mydbs.domain.com)(PORT = 1521))
    )
    (CONNECT_DATA =
      (SID = DEVDBA)
    )
  )

测试代码:
from pprint       import pprint
from lib_database import Tnsnames

tnsnnames = Tnsnames('/usr/lib/oracle/12.2/client64/network/admin')
print('Connexions:')
pprint(tnsnnames.connections)
print('Aliases:')
pprint(tnsnnames.aliases)
print('Duplicates:')
pprint(tnsnnames.duplicates)

输出 :
Connexions:
    {'DEVDBA.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': {'HOST': 'DEVDBA.MYDBS.DOMAIN.COM',
                                                  'PORT': '1521',
                                                  'PROTOCOL': 'TCP'},
                                 'CONNECT_DATA': {'SID': 'DEVDBA'},
     'PRODDB.MYDBS.DOMAIN.COM': {'ADDRESS_LIST': [{'HOST': 'PRODDB1.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB2.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB3.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'},
                                                  {'HOST': 'PRODDB4.MYDBS.DOMAIN.COM',
                                                   'PORT': '1522',
                                                   'PROTOCOL': 'TCP'}],
                                 'CONNECT_DATA': {'SERVER': 'DEDICATED',
                                                  'SERVICE_NAME': 'PRODDB.MYDBS.DOMAIN.COM',
                                                  'SID': 'PRODDB'}}
Aliases:
    {'DEVDBA': 'DEVDBA.MYDBS.DOMAIN.COM', 'PRODDB': 'PRODDB.MYDBS.DOMAIN.COM'}
Duplicates:
    []

我找不到 tnsnames.ora 文件的其他 Python 解析器。如果你知道一个,请指点我。

10-07 15:17