Skip to main content


------------------------------
Mike Rajkowski
support
Rocket Internal - All Brands
DENVER CO US
------------------------------


------------------------------
Mike Rajkowski
support
Rocket Internal - All Brands
DENVER CO US
------------------------------
There are another UOPY sample codes to export / Import UniData or UniVerse data via JSON format file.

How to Export or Import U2 File & Dictionary using UOPY



------------------------------
Paul Chang
Principal Technical Support Engineer
Rocket Internal - All Brands
------------------------------


There are another UOPY sample codes to export / Import UniData or UniVerse data via JSON format file.

How to Export or Import U2 File & Dictionary using UOPY



------------------------------
Paul Chang
Principal Technical Support Engineer
Rocket Internal - All Brands
------------------------------

UOPY Example - Working with JSON to export and import U2 File or Dictionary

Export U2 File to a JSON text file

The following code is a simple example to export a U2 data file or dictionary to the output folder.

"""
This uopyfileexportcmd.py program is working with UniData 8.2.1 and UniVerse 11.3.1 against XDEMO account.
Version: 2.0.1 [Q01603]
Date: 6/9/2021, 5/5/2021, 10/2/2020

Functions:

- Input a Json text file for importing to U2 file
- Find all connection information and U2 file name
- Open a UO connection using uopy.py driver
- UO Error handling
- export data file to U2 server

- Syntax: 
python uopyfileexportcmd.py <server> <account> <username> <password> <service> <filename> <outpath> <export_type>

"""

glversion = "2.0.1"

import os, sys
import datetime
from datetime import date
import time

import uopy
from uopy import Session, Command
from uopy import File, List, Dictionary
from uopy import DynArray
from uopy import UOError

import configparser
import base64
import pathlib
import json
import getpass 

config = configparser.ConfigParser()
dbmstype = ""
serverx = ""
accountx = ""
userx = ""
passwordx = ""
servicex = ""
filename  = ""
outputpath = "c:/temp"
export_type = "file"

def export_u2file(serverx, accountx, userx, passwordx, servicex, filename, outputpath, export_type):
    try:
        Session1 = uopy.connect(host=serverx,user=userx, password=passwordx, account=accountx, service=servicex)
        F = File(filename, session=Session1)
    except UOError as e:	
        print(e)
        exit()
    #
    if export_type == "dict":
        filex = outputpath + "/" + filename + "_u2dict.json"
    else:
        filex = outputpath + "/" + filename + "_u2file.json"
    log = '","log":"' + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") 
    connect_json = '{"connect":{"server":"' + serverx + '","account":"'  + accountx + '","user":"' + userx + '","password":"","service":"' + servicex + log + '"}}'
    count_records = 0
    try:
        if export_type == "dict":
            dict_all = '{"dictfilename":"' + filename + '","dict":"all",}\\n'
            F = Dictionary(filename, session=Session1)
            #
            id_list = List().select(F).read_list()
            # select_list = List().select(file)
            read_rs = F.read_records(id_list)
            read_rs_key = read_rs[2]
            read_rs_data = read_rs[3]
            osf = open(filex, "w")
            osf.write(connect_json + "\\n")
            osf.write(dict_all)
            for x in read_rs_data:
                #out = '{"' + read_rs_key[count_records] + '":' + str(x) + '}' + "\\n"
                # str(x.list)
                if type(x) == str:
                    out = '{"' + read_rs_key[count_records] + '":\\'' + str(x) + '\\'}' + "\\n"
                else:
                    for i in range(len(x)):
                        # remove the I_type field binary data
                        if i >= 15:
                            x[i] = ''                    
                    out = '{"' + read_rs_key[count_records] + '":' + str(x) + '}' + "\\n"
                osf.write(out)
                count_records = count_records + 1
            osf.close()
        else:
            dict_all = '{"filename":"' + filename + '","dict":"all",}\\n'
            F = File(filename, session=Session1)
            #
            id_list = List().select(F).read_list()
            # select_list = List().select(file)
            read_rs = F.read_records(id_list)
            read_rs_key = read_rs[2]
            read_rs_data = read_rs[3]
            osf = open(filex, "w")
            osf.write(connect_json + "\\n")
            osf.write(dict_all)
            for x in read_rs_data:
                if type(x) == str:
                    out = '{"' + read_rs_key[count_records] + '":\\'' + str(x) + '\\'}' + "\\n"
                else:
                    out = '{"' + read_rs_key[count_records] + '":' + str(x) + '}' + "\\n"
                osf.write(out)
                count_records = count_records + 1
            osf.close()
        Session1.close()
        #
    except Exception as e:
        print(e)
        exit()
    #
    print("Exported " + str(count_records) + " records to a json text file.")
    return True

try:
    config.read('uopyfileexportcmd.cfg')
    default_setting = config['appSettings']
    try:
        serverx = default_setting['server']
    except:
        pass
    try:
        servicex = default_setting['service']
    except:
        pass
    try:
        dbmstype = default_setting['dbmstype'].lower()
    except:
        dbmstype = ""
        pass
    try:
        accountx = default_setting['account_path']
    except:
        pass
    try:
        userx = default_setting['username']
    except:
        pass
    try:
        passwordx = default_setting['password']
        if len(passwordx) > 3 and passwordx[0:3] == "!@#":
            passwordx = base64.b64decode(passwordx[3:].encode("utf-8")).decode("utf-8")
    except:
        pass
    try:
        filename = default_setting['u2file']
    except:
        pass
    try:
        outputpath = default_setting['outputpath']
    except:
        pass
    try:
        export_type = default_setting['export_type']
    except:
        pass
except:
    print("Error1")
    pass

try:
    # export_u2file(serverx, accountx, userx, passwordx, servicex, filename, outputpath)
    #print(accountx, servicex)
    start_time_all = time.time()
    if len(sys.argv) == 1:
        export_u2file(serverx, accountx, userx, passwordx, servicex, filename, outputpath, export_type)
    else:
        #json_file = input("Enter a Json file for importing to U2 server: \\n")
        if len(sys.argv) < 8:
            print("Syntax: python uopyfileexportcmd.py <server> <account> <username> <password> <service> <filename> <outpath> <export_type>")
            exit()
        elif len(sys.argv) == 8:
            serverx = sys.argv[1]
            accountx = sys.argv[2]
            userx = sys.argv[3]
            passwordx = sys.argv[4]
            servicex = sys.argv[5]
            filename = sys.argv[6]
            outputpath = sys.argv[7]
            if len(passwordx) > 3 and passwordx[0:3] == "!@#":
                passwordx = base64.b64decode(passwordx[3:].encode("utf-8")).decode("utf-8")
            #
            export_u2file(serverx, accountx, userx, passwordx, servicex, filename, outputpath, "file")
        else:
            serverx = sys.argv[1]
            accountx = sys.argv[2]
            userx = sys.argv[3]
            passwordx = sys.argv[4]
            servicex = sys.argv[5]
            filename = sys.argv[6]
            outputpath = sys.argv[7]
            export_type = sys.argv[8]
            if len(passwordx) > 3 and passwordx[0:3] == "!@#":
                passwordx = base64.b64decode(passwordx[3:].encode("utf-8")).decode("utf-8")
            #
            export_u2file(serverx, accountx, userx, passwordx, servicex, filename, outputpath, export_type)
    #
    process_time_all = time.time() - start_time_all
    process_time_all = round(process_time_all, 6)
    print("Exporting U2 file (seconds): " + str(process_time_all))
    print("End of exporting!")
    #
except FileNotFoundError:
    print("file {} does not exist".format(filename))
    exit()
​


Import U2 File from a JSON text file

The following code is a simple example to import a U2 data file or dictionary from a JSON file in the temp folder.

"""
This uopyfileimportcmd.py program is working with UniData 8.2.1 and UniVerse 11.3.1 against XDEMO account.
Version: 2.0.0 [Q01605]
Date: 5/8/2021, 10/3/2020

Functions:

- Input a Json text file or mulitple Json files for importing to U2 file
- Find all connection information and U2 file name
- Open a UO connection using uopy.py driver
- UO Error handling
- Import data file to U2 server

SYntax: python uopyfileimportcmd c:\\\\temp\\STATES_uv.json localhost 

"""

glversion = "2.0.0"

import os, sys
import uopy
from uopy import Session, Command
from uopy import File, List, Dictionary
from uopy import DynArray
from uopy import UOError

import base64
import pathlib
import json
import getpass 
import time
import glob

def import_u2file(json_file, servert, accountt, usert, passwordt, servicet, filenamet, use_args):
    try:
        file1 = open(json_file, 'r')
        ## connection part
        line = file1.readline()
        # connect information:
        connect_jsonx = eval(line)
        connect_json = connect_jsonx["connect"]
        serverx = connect_json["server"]
        accountx = connect_json["account"]
        userx = connect_json["user"]
        servicex = connect_json["service"]
        passwordx = connect_json["password"]
        #
        line = file1.readline()
        dict_json2 = eval(line)
        dictfile_check=False
        try:
            filename = dict_json2['filename']
            dictx = dict_json2['dict']
            if dictx != "all":
                print("The Json text file is not for a U2 data file")
                exit()
        except:
            #print("The Json text file is not for a U2 data file")
            #exit()
            try:
                filename = dict_json2['dictfilename']
                #dictx = dict_json2['dictfile']
                dictx = dict_json2['dict']
                if dictx != "all":
                    print("The Json text file is not for a U2 data file")
                    exit()
                dictfile_check=True
            except:
                print("The Json text file is not for a U2 dictionary file")
                exit()
        #
        if use_args == True:
            serverx = servert
            accountx = accountt
            userx = usert
            passwordx = passwordt
            servicex = servicet
            filename = filenamet
        # 
        if userx == "":
            try: 
                userx = input("Input the username against the '" + serverx + "' server: ")
            except Exception as error: 
                print('ERROR', error)
        #
        if passwordx == "":
            try: 
                print("Input the password for the user '" + userx + "' against the '" + serverx + "' server: \\n")
                passwordx = getpass.getpass()
            except Exception as error: 
                print('ERROR', error)
        else:
            if len(passwordx) > 3 and passwordx[0:3] == "!@#":
                passwordx = base64.b64decode(passwordx[3:].encode("utf-8")).decode("utf-8")        
        #
        try:
            Session1 = uopy.connect(host=serverx,user=userx, password=passwordx, account=accountx, service=servicex)
            if dictfile_check == True:
                F = Dictionary(filename, session=Session1)
            else:
                F = File(filename, session=Session1)
        except UOError as e:	
            print(e)
            exit()
        #
        # Need to update to write the whole file
        id_list_all = []
        record_value_list_all = []
        count_records = 0
        while True:
            line = file1.readline()
            if not line:
                break
            else:
                data_json = eval(line)
                try:
                    id_list = []
                    record_list = []
                    for key in data_json:
                        #print(key + ":" + str(data_json[key]))
                        ##F.write(key,data_json[key])
                        #id_list.append(key)
                        #record_list.append(data_json[key])
                        record_value_list = data_json[key]
                    count_records = count_records + 1
                    #
                    #F.write_records(id_list, record_list)
                    id_list_all.append(key)
                    record_value_list_all.append(record_value_list)
                except UOError as ex:
                    print(ex)
                    break
        #
        write_rs = F.write_records(id_list_all, record_value_list_all, lock_flag=0)
        print("Importing " + json_file + " json file - "+ str(count_records) + " records to the U2 " + filename + " dictionary or file successfully.")
        file1.close()
        Session1.close()
        #
    except FileNotFoundError:
        print("file {} does not exist".format(filename))
        exit()
    return True

servert = "localhost"
accountt = ""
usert = ""
passwordt = ""
servicet = ""
filenamet  = ""

try:
    use_args = False
    if len(sys.argv) == 2:
        # python uopyfileimportcmd.py C:\\tmp\\Workfiles_u2\\CLOG200730-002_u2file.json
        json_file = sys.argv[1]
    elif len(sys.argv) == 1:
        json_file = input("Enter a Json file for importing to U2 server: \\n")
        if not os.path.exists(json_file):
            print("Import Json file not found - "+ json_file)
            exit()
    else:
        #json_file = input("Enter a Json file for importing to U2 server: \\n")
        if len(sys.argv) != 8:
            print("Syntax: python uopyfileimportcmd.py imported_files.json <server> <account> <username> <password> <service> <filename>")
            exit()
        else:
            json_file = sys.argv[1]
            servert = sys.argv[2]
            accountt = sys.argv[3]
            usert = sys.argv[4]
            passwordt = sys.argv[5]
            if len(passwordt) > 3 and passwordt[0:3] == "!@#":
                passwordt = base64.b64decode(passwordt[3:].encode("utf-8")).decode("utf-8")
            servicet = sys.argv[6]
            filenamet = sys.argv[7]
            use_args = True
    #
    if json_file.find('*') > 0:
        xlist=glob.glob(json_file)
        start_time_all = time.time()
        for xfile in xlist:
            start_time = time.time()
            import_u2file(xfile, servert, accountt, usert, passwordt, servicet, filenamet, use_args)
            process_time = time.time() - start_time
            print("Importing time (seconds): " + str(round(process_time,6)))
        #
        process_time_all = time.time() - start_time_all
        print("Importing all files (seconds): " + str(round(process_time_all,6)))
    else:
        if len(json_file)>0 and os.path.exists(json_file):
            start_time = time.time()
            import_u2file(json_file, servert, accountt, usert, passwordt, servicet, filenamet, use_args)
            process_time = time.time() - start_time
            print("Importing time (seconds): " + str(round(process_time,6)))
        else:
            print("File does not exist - " + json_file)
            exit()
    #
    print("End of importing to U2 server!")
    #	
except FileNotFoundError:
    print("file {} does not exist".format(filename))
    exit()






------------------------------
Paul Chang
Principal Technical Support Engineer
Rocket Internal - All Brands
------------------------------