Dealing With API Hashing Using Qiling in Ghidra
API hashing is a technique used to obfuscate the API calls a sample makes. When reverse engineering a sample, API calls are useful for deciphering its capabilities, so a sample that uses API hashing is harder to fully reverse engineer. There are a few techniques for dealing with API hashing:
- HashDB: A database that maps hashes to API names. Querying the database with a hash gives the name of the API call.
- Logging in a debugger: Set breakpoints and log the calls resolved by the API hash resolving function.
- Logging via instrumentation: Use instrumentation frameworks such as Frida or Tiny-tracer to log the calls resolved by the API hash resolving function.
- Using an emulator: Emulate the API hash resolving function and resolve all the API calls by passing the hash values to it.
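To make the idea concrete, here is a minimal sketch of a hash-based lookup and of why a precomputed table (the HashDB approach) can reverse it. The hash function is a generic rotate-right-by-13 additive hash chosen purely for illustration; it is not claimed to be the algorithm used by any of the samples in this post, and the export names are just example inputs.

```python
def ror32(value, bits):
    """Rotate a 32-bit value right by `bits`."""
    value &= 0xFFFFFFFF
    return ((value >> bits) | (value << (32 - bits))) & 0xFFFFFFFF


def toy_api_hash(name):
    """Illustrative hash: rotate right by 13, then add each character."""
    h = 0
    for ch in name.encode("ascii"):
        h = (ror32(h, 13) + ch) & 0xFFFFFFFF
    return h


def build_lookup(export_names):
    """Precompute hash -> name, which is what a hash database does."""
    return {toy_api_hash(n): n for n in export_names}


exports = ["CreateFileW", "VirtualAlloc", "LoadLibraryA", "GetProcAddress"]
lookup = build_lookup(exports)

# The sample only stores the hash; the analyst recovers the name by lookup.
wanted = toy_api_hash("VirtualAlloc")
print(hex(wanted), "->", lookup[wanted])
```

A lookup table only works when the hashing algorithm is known and reimplemented. Emulation avoids that step by letting the sample's own routine do the resolving.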
This post explores the emulator approach. An older blog post, "Automated dynamic import resolving using binary emulation", explored the same approach, but it is somewhat outdated now.
The Qiling Emulation Framework (version 1.4.6) is used to emulate the API hash resolving function of a malware sample. The Ghidra scripting API (Ghidra version 11.0.3) is used to find all the hashes and to write the results back. To use the Ghidra scripting API from Python 3, ghidra-bridge (version 1.0.0) is used. The following malware samples are used:
- REvil
- Zloader
- Dridex
REvil malware sample - 32 bit
MD5: 890a58f200dfff23165df9e1b088e58f
SHA256: 5f56d5748940e4039053f85978074bde16d64bd5ba97f6f0026ba8172cb29e93
MalwareBazaar Link: https://bazaar.abuse.ch/sample/5f56d5748940e4039053f85978074bde16d64bd5ba97f6f0026ba8172cb29e93/
In REvil samples, the hashes passed to the API hashing routine are stored at a particular address. In this sample, that address is 0x0041c9f8 and the API hashing routine is at 0x00405dcf. The number of hashes is 140.
Using the Qiling emulator, the API hashing routine can be emulated. The Ghidra API exposed via ghidra-bridge can then be used to add labels.
- Note the addresses 00405bd4, 00405bf3 and 00405be1. The addresses 00405bd4 and 00405bf3 bound the loop in which the API hash resolving function FUN_00405dcf is called. The hash stored at 0x0041c9f8 is passed to it.
- In the decompiled view, the loop can be seen clearly.
- At address 00405be1, the value in the eax register is written back.
Python Script for resolving api hash and adding labels in Ghidra
The Python3 script emulates the loop using Qiling and adds the labels in Ghidra.
import ghidra_bridge
from qiling import *
from qiling.const import QL_VERBOSE,QL_INTERCEPT

def extract_eax_resolver(ql):
    # eax holds the resolved api address, esi the offset of the current hash
    eax_value = ql.arch.regs.eax
    esi_value = ql.arch.regs.esi
    func = ql.loader.import_symbols[eax_value]
    func_dll = func["dll"]
    func_name = func["name"].decode("ascii")
    print(f"resolves {func_dll}.{func_name}")
    # adding labels in ghidra
    start_addr = '0x41c9f8'
    addr_accessed = int(start_addr,16) + esi_value
    addr_accessed_hex = hex(addr_accessed)
    label_to_add = f"{func_dll}.{func_name}"
    addr_in_ghidra = currentProgram.getAddressFactory().getAddress(addr_accessed_hex)
    start()
    createLabel(addr_in_ghidra,label_to_add,True,ghidra.program.model.symbol.SourceType.USER_DEFINED)
    end(True)

def sandbox(path, rootfs):
    # create a sandbox for windows x86 (32 bit)
    ql = Qiling([path], rootfs,verbose=QL_VERBOSE.DISABLED)
    print("Setting up parameters. Starting emulation")
    ql.hook_address(extract_eax_resolver,0x00405be1)
    ql.run(begin=0x00405bd4,end=0x00405bf3)
    print("\n\n\nDone! resolving IAT")

if __name__ == "__main__":
    b = ghidra_bridge.GhidraBridge(namespace=globals(),response_timeout=-1)
    sandbox("C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows\\bin\\revil.exe", "C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
- b = ghidra_bridge.GhidraBridge(namespace=globals(),response_timeout=-1) exposes the Ghidra API so that it can be used in the script. response_timeout is set to -1, which means that any call made to Ghidra waits until it gets a response. Be careful when setting it to -1, since the script might then hang forever.
- ql.hook_address(extract_eax_resolver,0x00405be1) creates a hook at 0x00405be1, where the function extract_eax_resolver is executed.
- ql.run(begin=0x00405bd4,end=0x00405bf3) emulates the code between the addresses 0x00405bd4 and 0x00405bf3.
- In the function extract_eax_resolver:
  - eax_value = ql.arch.regs.eax
    func = ql.loader.import_symbols[eax_value]
    func_dll = func["dll"]
    func_name = func["name"].decode("ascii")
    The value in eax is extracted, and the corresponding DLL name and function name are found by looking it up in the import symbols loaded in Qiling.
  - esi_value = ql.arch.regs.esi
    start_addr = '0x41c9f8'
    addr_accessed = int(start_addr,16) + esi_value
    addr_accessed_hex = hex(addr_accessed)
    The value in esi is extracted and added to the address 0x41c9f8 to get the location of the hash that was resolved in this iteration of the loop. It is then converted to a hex string.
  - label_to_add = f"{func_dll}.{func_name}"
    addr_in_ghidra = currentProgram.getAddressFactory().getAddress(addr_accessed_hex)
    start()
    createLabel(addr_in_ghidra,label_to_add,True,ghidra.program.model.symbol.SourceType.USER_DEFINED)
    end(True)
    The address is first converted to a Ghidra address object. Then a transaction is started using start(), and the createLabel API is used to create the label. Finally, end(True) indicates that the transaction is over.
    Note: start() and end(True) are always used when changes need to be made in Ghidra.
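Because every modification has to sit between start() and end(True), it can be convenient to wrap the pair in a context manager so that the transaction is always closed, even if the label call raises. The following is a minimal sketch, not part of the original script: ghidra_transaction is a hypothetical helper name, the start and end functions are passed in explicitly so the helper can be tried without a running Ghidra instance, and it assumes that end(False) discards the changes made in the transaction.

```python
from contextlib import contextmanager


@contextmanager
def ghidra_transaction(start_fn, end_fn):
    """Run a block inside a Ghidra transaction.

    start_fn / end_fn are expected to be the start() and end() functions
    that ghidra_bridge places in globals().
    """
    start_fn()
    try:
        yield
    except Exception:
        end_fn(False)   # assumption: end(False) discards the changes
        raise
    else:
        end_fn(True)    # commit, as the script does with end(True)


# Stand-ins that only record calls, used here instead of a live bridge.
calls = []
fake_start = lambda: calls.append("start")
fake_end = lambda commit: calls.append(("end", commit))

with ghidra_transaction(fake_start, fake_end):
    calls.append("createLabel")   # placeholder for createLabel(...)

print(calls)
```

With a live bridge, the same block would be entered as `with ghidra_transaction(start, end):` around the createLabel call.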
Result
Once the script is executed, api calls are resolved as seen below:
Additionally, changes can be seen in Ghidra in Fig 3 and Fig 4.
A total of 140 hashes were present. Using the script, all 140 hashes are resolved without any errors. However, there are other functions where FUN_00405dcf is called with a hash value as a parameter, as seen in Fig 5. The script above can be extended to cover these with small changes; this is left to the reader.
Additional References
To learn more about API hashing in REvil, refer to:
- https://blag.nullteilerfrei.de/2019/11/09/api-hashing-why-and-how/
- https://www.youtube.com/watch?v=QYQQUUpU04s
Zloader malware sample - 64 bit
MD5: 961a84c3f929074136f54a59810168e6
SHA256: f03b9dce7b701d874ba95293c9274782fceb85d55b276fd28a67b9e419114fdb
In the Zloader sample, the API hash resolving function is at 0x140007be0 and it accepts 2 arguments. The first argument is an integer and the second argument is the hash. However, there are a few variations in this sample:
- Case 1:
  res = api_resolver(<int>, <hash value>)
  This type of call is the most common in the sample. Extracting the arguments passed to it with Ghidra scripting is straightforward.
- Case 2:
  param1 = another_function(<hash value>)
  res = api_resolver(param1, <hash value>)
  This is more of a challenge, as the value of param1 needs to be calculated.
- Case 3:
  param2 = another_function(<hash value>)
  res = api_resolver(<int>, param2)
  Same as above, except that here it is param2 that needs to be calculated.
- Case 4:
  param1 = another_function(<hash value>)
  param2 = another_function(<hash value>)
  res = api_resolver(param1, param2)
  In this case, both param1 and param2 need to be calculated.
To emulate the API hash resolving function, its arguments need to be either fetched directly or obtained by emulating another function with its own arguments.
Ghidra Python Snippets
To solve this challenge in Ghidra, Pcode can be used. Two forms of Pcode are available in Ghidra: raw Pcode and refined Pcode. Raw Pcode is accessible as soon as the sample is loaded in Ghidra, whereas refined Pcode is accessible only after running the decompiler. Raw Pcode contains a lot more information than refined Pcode. However, for extracting the arguments passed to the API hash resolving function, refined Pcode is much better suited. Extracting refined Pcode means running the decompiler, which is a time-consuming operation; the time taken varies with the size and complexity of the code being decompiled.
Python3 snippet to extract addresses of references and the function where the references are made:
addr = currentProgram.getAddressFactory().getAddress(iat_function_addr) #casting it to ghidraAddress type
listing = currentProgram.getListing()
func = listing.getFunctionContaining(addr)
# sets up the decompiler
options = ghidra.app.decompiler.DecompileOptions()
decompifc = ghidra.app.decompiler.DecompInterface()
decompifc.setOptions(options)
decompifc.toggleCCode(True)
decompifc.toggleSyntaxTree(True)
decompifc.setSimplificationStyle("decompile")
# get references to the function
reference_manager = currentProgram.getReferenceManager()
code_manager = currentProgram.getCodeManager()
references_to = reference_manager.getReferencesTo(addr)
references_count = reference_manager.getReferenceCountTo(addr)
reference_func_dict = dict()
while(references_to.hasNext()):
    ref_addr = references_to.next().getFromAddress() #reference address
    refFunc = currentProgram.getFunctionManager().getFunctionContaining(ref_addr) #function containing the reference
    if refFunc is None:
        # this means that there are undefined functions in ghidra
        # we need to create the function then decompile it
        submodel = ghidra.program.model.block.IsolatedEntrySubModel(currentProgram)
        monitor = ghidra.util.task.ConsoleTaskMonitor() # need to pass this for codeblock
        entry_func = submodel.getCodeBlocksContaining(ref_addr, monitor)[0].getFirstStartAddress()
        # creating the name of the function
        func_name = 'FUN_' + str(entry_func)
        # to create the function, we can use flatprogram api
        fapi = ghidra.program.flatapi.FlatProgramAPI(currentProgram)
        start() #starting a transaction. used while modifying ghidra db
        func = fapi.createFunction(entry_func,func_name)
        end(True)
        if func is None:
            print(f"creating function failed at {ref_addr}")
            continue
        #this should fetch the function
        refFunc = currentProgram.getFunctionManager().getFunctionContaining(ref_addr)
    if refFunc in reference_func_dict.keys():
        reference_func_dict[refFunc].append(ref_addr)
    else:
        reference_func_dict[refFunc] = [ref_addr]
- addr = currentProgram.getAddressFactory().getAddress(iat_function_addr): iat_function_addr is the address of the API hash resolving function. It is converted to a Ghidra address object.
- Then the decompiler interface is set up; it is used later to invoke the decompiler.
- Then the reference manager is used to get all the addresses where the API hash resolving function is called.
- Then a dictionary is built which looks like this: Ref_func : [ref_addr1 , ref_addr2]
- If no containing function is found for a reference address, a function is created at the entry of the code block containing it. In Ghidra, a reference can lack a containing function when that function is never called anywhere in the sample.
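The reason for grouping references by their containing function is that each function then needs to be decompiled only once, however many calls to the resolver it contains. As a small illustration of the same grouping outside Ghidra, here is a sketch using collections.defaultdict; the function names and addresses are made up for the example.

```python
from collections import defaultdict


def group_by_function(references):
    """Group (function_name, reference_address) pairs by function."""
    grouped = defaultdict(list)
    for func_name, ref_addr in references:
        grouped[func_name].append(ref_addr)
    return dict(grouped)


# Hypothetical function names and addresses, for illustration only.
refs = [
    ("FUN_a", 0x1000),
    ("FUN_b", 0x2000),
    ("FUN_a", 0x1010),
]
grouped = group_by_function(refs)
for name, addrs in grouped.items():
    print(name, [hex(a) for a in addrs])
```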
Python3 snippet to iterate over the dictionary and find the arguments and addresses for emulation :
list_addr_list = []
for refFunc,ref_addr_list in reference_func_dict.items():
    # getting the decompilation of the function
    # openProgram returns False on failure; report the reason if there is one
    if not decompifc.openProgram(currentProgram):
        if decompifc.getLastMessage() != "":
            print(decompifc.getLastMessage())
    decompRes = decompifc.decompileFunction(refFunc,-1,None) # None because using ghidra_bridge
    hfunction = decompRes.getHighFunction()
    docroot = decompRes.getCCodeMarkup()
    for ref_addr in ref_addr_list:
        addr_list = []
        pCode_ref = hfunction.getPcodeOps(ref_addr)
        ins = pCode_ref.next()
        if (ins.getOpcode() == ghidra.program.model.pcode.PcodeOp.CALL):
            num_args = len(ins.inputs)
            i = 0
            value_pushed = []
            # input 0 is the call target, inputs 1..n are the arguments
            while(i < num_args-1):
                i = i + 1
                if (ins.getInput(i).isConstant()):
                    value_pushed.append(ins.getInput(i).getOffset())
                else:
                    # accessing the parameter of the function whose output is later passed
                    # to api hashing function
                    # in this case, we know that the only 1 input is passed and is a constant
                    pre_opcode = ins.getInput(i).getDef()
                    value_list_addr = []
                    if (pre_opcode.getOpcode() == ghidra.program.model.pcode.PcodeOp.CALL):
                        pre_value = pre_opcode.getInput(1).getOffset()
                        pre_addr = pre_opcode.getInput(0).getPCAddress()
                        pre_addr_1 = code_manager.getInstructionAfter(pre_addr).getAddress()
                        pre_addr_2 = code_manager.getInstructionAfter(pre_addr_1).getAddress()
                        value_list_addr.extend((pre_value,str(pre_addr),str(pre_addr_1),str(pre_addr_2)))
                        value_pushed.append(value_list_addr)
                    else:
                        value_pushed = []
                        break
            if value_pushed : #checking if the list has values
                ref_addr_1 = code_manager.getInstructionAfter(ref_addr).getAddress()
                ref_addr_2 = code_manager.getInstructionAfter(ref_addr_1).getAddress()
                addr_list.extend((str(ref_addr),str(ref_addr_1),str(ref_addr_2),value_pushed))
        if addr_list :
            list_addr_list.append(addr_list)
print("\nDone\n")
print(f"Total references found: {references_count} number of extracted values and addr : {len(list_addr_list)}")
- Running the emulation requires 3 addresses: the begin address, the address to hook at and the end address. Before the emulation is run, the arguments need to be set up.
- In a 32-bit sample, arguments are pushed to the stack. In a 64-bit sample, arguments are passed in registers first and then on the stack.
- The Python3 snippet above iterates over the dictionary. In each iteration, it walks the Pcode 'tree' to find the arguments. If an argument is the result of another function, it finds that function and the arguments passed to it, as well as the addresses required to emulate it.
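The difference in argument passing can be sketched in plain Python. On 64-bit Windows the first four integer arguments go in rcx, rdx, r8 and r9 and the rest go on the stack, which is why the Zloader emulation only needs to set rcx and rdx. For 32-bit stack-based calls, arguments are pushed right to left. The helper names and the argument values below are hypothetical and only illustrate the layout.

```python
WIN64_INT_ARG_REGS = ("rcx", "rdx", "r8", "r9")


def place_args_win64(args):
    """Split integer arguments into register and stack slots (Windows x64)."""
    regs = dict(zip(WIN64_INT_ARG_REGS, args))
    stack = list(args[len(WIN64_INT_ARG_REGS):])
    return regs, stack


def push_order_x86(args):
    """Return the order in which a 32-bit stack-based call pushes its args."""
    return list(reversed(args))


# Two arguments, as taken by the Zloader resolver (values are placeholders).
regs, stack = place_args_win64([0x0C, 0x1234ABCD])
print(regs, stack)

# Two arguments for a 32-bit call: the second one is pushed first.
print([hex(v) for v in push_order_x86([0x1111, 0x2222])])
```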
As mentioned above, there are 4 cases. This is what the output looks like for each case:
- Case 1:
  res = api_resolver(<int>, <hash value>)
  Output:
  [ <begin address>, <address to hook>, <end address>, [ <int value>, <hash value> ] ]
- Case 2:
  param1 = another_function(<hash value>)
  res = api_resolver(param1, <hash value>)
  Output:
  [ <begin address>, <address to hook>, <end address>, [ [ <hash value>, <begin address>, <address to hook>, <end address> ], <hash value> ] ]
- Case 3:
  param2 = another_function(<hash value>)
  res = api_resolver(<int>, param2)
  Output:
  [ <begin address>, <address to hook>, <end address>, [ <int value>, [ <hash value>, <begin address>, <address to hook>, <end address> ] ] ]
- Case 4:
  param1 = another_function(<hash value>)
  param2 = another_function(<hash value>)
  res = api_resolver(param1, param2)
  Output:
  [ <begin address>, <address to hook>, <end address>, [ [ <hash value>, <begin address>, <address to hook>, <end address> ], [ <hash value>, <begin address>, <address to hook>, <end address> ] ] ]
Once output is generated in the above format, it can be iterated over and emulated using Qiling.
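Since the four cases differ only in whether each argument is a plain value or a nested list, an entry can be classified by inspecting its last element. The sketch below shows this; classify_entry is a hypothetical helper and the addresses and values in the example entries are placeholders, not values taken from the sample.

```python
def classify_entry(entry):
    """Return the case number (1-4) for one extracted entry."""
    _begin, _hook, _end, args = entry
    first_is_call = isinstance(args[0], list)
    second_is_call = isinstance(args[1], list)
    if first_is_call and second_is_call:
        return 4
    if first_is_call:
        return 2
    if second_is_call:
        return 3
    return 1


# Placeholder entries in the output format described above.
helper = [0xAAAA, "140001000", "140001005", "14000100a"]
entries = [
    ["140002000", "140002005", "14000200a", [1, 0xBBBB]],
    ["140002100", "140002105", "14000210a", [helper, 0xBBBB]],
    ["140002200", "140002205", "14000220a", [1, helper]],
    ["140002300", "140002305", "14000230a", [helper, helper]],
]
print([classify_entry(e) for e in entries])
```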
Python3 snippet for emulation:
for addr_list in list_addr_list:
    if type(addr_list[3][0]) is list:
        try:
            hook_handle = ql.hook_address(extract_rax,int(addr_list[3][0][2],16))
            ql.arch.regs.rcx = addr_list[3][0][0]
            ql.run(begin=int(addr_list[3][0][1],16),end=int(addr_list[3][0][3],16))
            rcx_param = param_value
            #del hook
            ql.hook_del(hook_handle)
        except KeyboardInterrupt:
            sys.exit()
        except:
            logging.info(addr_list)
            pass
    else:
        rcx_param = addr_list[3][0]
    if type(addr_list[3][1]) is list:
        try:
            hook_handle = ql.hook_address(extract_rax,int(addr_list[3][1][2],16))
            ql.arch.regs.rcx = addr_list[3][1][0]
            ql.run(begin=int(addr_list[3][1][1],16),end=int(addr_list[3][1][3],16))
            rdx_param = param_value
            #del hook
            ql.hook_del(hook_handle)
            # in this case, need to re-initialize qiling emulator.
            # to fix the error in resolving
            ql = Qiling([path], rootfs,verbose=QL_VERBOSE.DISABLED)
        except KeyboardInterrupt:
            sys.exit()
        except:
            logging.info(addr_list)
            pass
    else:
        rdx_param = addr_list[3][1]
    try:
        ql.arch.regs.rcx = rcx_param
        ql.arch.regs.rdx = rdx_param
        hook_handle = ql.hook_address(extract_rax_resolvor,int(addr_list[1],16))
        ql.run(begin=int(addr_list[0],16),end=int(addr_list[2],16))
        #del hook
        ql.hook_del(hook_handle)
    except KeyboardInterrupt:
        sys.exit()
    except:
        logging.info(addr_list)
        pass
- In the above snippet, the first argument is checked to see whether it is a list. If it is, the helper function is emulated and the value to be passed to the API hash resolving function is extracted.
- The second argument is handled the same way, except that after its helper function has been emulated, the Qiling object needs to be re-initialized. Otherwise, an error occurs when the API hash resolving function is executed.
- Lastly, once the argument values are extracted, the API hash resolving function is emulated. The output is the resolved API call.
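The per-argument logic in the snippet is the same for both arguments: use a constant as-is, or emulate the helper call described by a nested list. One way to express that once is sketched below. resolve_argument and the emulate_helper callback are hypothetical names; in a real script the callback would wrap the ql.hook_address / ql.run / ql.hook_del sequence shown above, whereas here a stand-in is used so the sketch runs on its own.

```python
def resolve_argument(arg, emulate_helper):
    """Return the concrete value for one argument.

    A plain integer is used as-is. A list of the form
    [hash_value, begin, hook, end] is handed to emulate_helper, which is
    expected to run the helper function and return its result.
    """
    if isinstance(arg, list):
        hash_value, begin, hook, end = arg
        return emulate_helper(hash_value, int(begin, 16), int(hook, 16), int(end, 16))
    return arg


def fake_emulate_helper(hash_value, begin, hook, end):
    """Stand-in for real emulation: just echoes a value derived from the input."""
    return hash_value + 1


# Placeholder values, not taken from the sample.
constant_arg = 12
computed_arg = [0x1000, "140001000", "140001005", "14000100a"]
print(resolve_argument(constant_arg, fake_emulate_helper))
print(hex(resolve_argument(computed_arg, fake_emulate_helper)))
```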
Python Script for resolving api hash and adding comments in Ghidra
The complete Python3 script for emulating the API hash resolving function and adding comments in Ghidra:
Note: Unlike the previous script in the REvil section, this script is for a 64-bit sample, and instead of adding labels in Ghidra, it adds comments.
import ghidra_bridge
import logging
import sys
from qiling import *
from qiling.const import QL_VERBOSE,QL_INTERCEPT
logging.basicConfig(filename="IAT.log",level=logging.INFO)
def get_addr_ghidra(iat_function_addr):
    print("\n getting referenced addresses and params passed\n")
    addr = currentProgram.getAddressFactory().getAddress(iat_function_addr) #casting it to ghidraAddress type
    listing = currentProgram.getListing()
    func = listing.getFunctionContaining(addr)
    # sets up the decompiler
    options = ghidra.app.decompiler.DecompileOptions()
    decompifc = ghidra.app.decompiler.DecompInterface()
    decompifc.setOptions(options)
    decompifc.toggleCCode(True)
    decompifc.toggleSyntaxTree(True)
    decompifc.setSimplificationStyle("decompile")
    # get references to the function
    reference_manager = currentProgram.getReferenceManager()
    code_manager = currentProgram.getCodeManager()
    references_to = reference_manager.getReferencesTo(addr)
    references_count = reference_manager.getReferenceCountTo(addr)
    reference_func_dict = dict()
    while(references_to.hasNext()):
        ref_addr = references_to.next().getFromAddress() #reference address
        refFunc = currentProgram.getFunctionManager().getFunctionContaining(ref_addr) #function containing the reference
        if refFunc is None:
            # this means that there are undefined functions in ghidra
            # we need to create the function then decompile it
            submodel = ghidra.program.model.block.IsolatedEntrySubModel(currentProgram)
            monitor = ghidra.util.task.ConsoleTaskMonitor() # need to pass this for codeblock
            entry_func = submodel.getCodeBlocksContaining(ref_addr, monitor)[0].getFirstStartAddress()
            # creating the name of the function
            func_name = 'FUN_' + str(entry_func)
            # to create the function, we can use flatprogram api
            fapi = ghidra.program.flatapi.FlatProgramAPI(currentProgram)
            start() #starting a transaction. used while modifying ghidra db
            func = fapi.createFunction(entry_func,func_name)
            end(True)
            if func is None:
                print(f"creating function failed at {ref_addr}")
                continue
            #this should fetch the newly created function
            refFunc = currentProgram.getFunctionManager().getFunctionContaining(ref_addr)
        if refFunc in reference_func_dict.keys():
            reference_func_dict[refFunc].append(ref_addr)
        else:
            reference_func_dict[refFunc] = [ref_addr]
    list_addr_list = []
    for refFunc,ref_addr_list in reference_func_dict.items():
        # getting the decompilation of the function
        # openProgram returns False on failure; report the reason if there is one
        if not decompifc.openProgram(currentProgram):
            if decompifc.getLastMessage() != "":
                print(decompifc.getLastMessage())
        decompRes = decompifc.decompileFunction(refFunc,-1,None) # None because using ghidra_bridge
        hfunction = decompRes.getHighFunction()
        docroot = decompRes.getCCodeMarkup()
        for ref_addr in ref_addr_list:
            addr_list = []
            pCode_ref = hfunction.getPcodeOps(ref_addr)
            ins = pCode_ref.next()
            if (ins.getOpcode() == ghidra.program.model.pcode.PcodeOp.CALL):
                num_args = len(ins.inputs)
                i = 0
                value_pushed = []
                # input 0 is the call target, inputs 1..n are the arguments
                while(i < num_args-1):
                    i = i + 1
                    if (ins.getInput(i).isConstant()):
                        value_pushed.append(ins.getInput(i).getOffset())
                    else:
                        # accessing the parameter of the function whose output is later passed
                        # to api hashing function
                        # in this case, we know that the only 1 input is passed and is a constant
                        pre_opcode = ins.getInput(i).getDef()
                        value_list_addr = []
                        if (pre_opcode.getOpcode() == ghidra.program.model.pcode.PcodeOp.CALL):
                            pre_value = pre_opcode.getInput(1).getOffset()
                            pre_addr = pre_opcode.getInput(0).getPCAddress()
                            pre_addr_1 = code_manager.getInstructionAfter(pre_addr).getAddress()
                            pre_addr_2 = code_manager.getInstructionAfter(pre_addr_1).getAddress()
                            value_list_addr.extend((pre_value,str(pre_addr),str(pre_addr_1),str(pre_addr_2)))
                            value_pushed.append(value_list_addr)
                        else:
                            value_pushed = []
                            break
                if value_pushed : #checking if the list has values
                    ref_addr_1 = code_manager.getInstructionAfter(ref_addr).getAddress()
                    ref_addr_2 = code_manager.getInstructionAfter(ref_addr_1).getAddress()
                    addr_list.extend((str(ref_addr),str(ref_addr_1),str(ref_addr_2),value_pushed))
            if addr_list :
                list_addr_list.append(addr_list)
    print("\nDone\n")
    print(f"Total references found: {references_count} number of extracted values and addr : {len(list_addr_list)}")
    return list_addr_list

def extract_rax_resolvor(ql):
    rax_value = ql.arch.regs.rax
    rip_value = ql.arch.regs.rip
    func = ql.loader.import_symbols[rax_value]
    func_dll = func["dll"]
    func_name = func["name"].decode("ascii")
    print(f"resolved {func_dll}.{func_name}")
    print("Adding Comment in ghidra")
    #convert to ghidra address
    addr = currentProgram.getAddressFactory().getAddress(str(hex(rip_value)))
    code = currentProgram.getListing().getCodeUnitAt(addr)
    comment_msg = "{}.{}".format(func_dll,func_name)
    # starting a transaction which modifies the ghidra project
    start()
    code.setComment(code.PRE_COMMENT, comment_msg)
    end(True)
    # ending the transaction once it is over
    print(f"Comment added at : {hex(rip_value)}")
    logging.info(f" address : {hex(rip_value)} | API : {func_dll}.{func_name}")

def extract_rax(ql):
    # stores the return value of the helper function for use in sandbox()
    global param_value
    param_value = ql.arch.regs.rax

def sandbox(path, rootfs,list_addr_list):
    global param_value
    rdx_param = 0
    rcx_param = 0
    # create a sandbox for windows x86_64
    print("\nInvoking emulator\n")
    ql = Qiling([path], rootfs,verbose=QL_VERBOSE.DISABLED)
    # first element is rcx. second element is rdx
    # check if the both values are constant.
    # if not, then call the function whose return value
    # is passed to the function.
    print("\nstart emulation\n")
    for addr_list in list_addr_list:
        if type(addr_list[3][0]) is list:
            try:
                hook_handle = ql.hook_address(extract_rax,int(addr_list[3][0][2],16))
                ql.arch.regs.rcx = addr_list[3][0][0]
                ql.run(begin=int(addr_list[3][0][1],16),end=int(addr_list[3][0][3],16))
                rcx_param = param_value
                #del hook
                ql.hook_del(hook_handle)
            except KeyboardInterrupt:
                sys.exit()
            except:
                logging.info(addr_list)
                pass
        else:
            rcx_param = addr_list[3][0]
        if type(addr_list[3][1]) is list:
            try:
                hook_handle = ql.hook_address(extract_rax,int(addr_list[3][1][2],16))
                ql.arch.regs.rcx = addr_list[3][1][0]
                ql.run(begin=int(addr_list[3][1][1],16),end=int(addr_list[3][1][3],16))
                rdx_param = param_value
                #del hook
                ql.hook_del(hook_handle)
                # in this case, need to re-initialize qiling emulator.
                # to fix the error in resolving
                ql = Qiling([path], rootfs,verbose=QL_VERBOSE.DISABLED)
            except KeyboardInterrupt:
                sys.exit()
            except:
                logging.info(addr_list)
                pass
        else:
            rdx_param = addr_list[3][1]
        try:
            ql.arch.regs.rcx = rcx_param
            ql.arch.regs.rdx = rdx_param
            hook_handle = ql.hook_address(extract_rax_resolvor,int(addr_list[1],16))
            ql.run(begin=int(addr_list[0],16),end=int(addr_list[2],16))
            #del hook
            ql.hook_del(hook_handle)
        except KeyboardInterrupt:
            sys.exit()
        except:
            logging.info(addr_list)
            pass

if __name__ == "__main__":
    # module-level variable shared with extract_rax() and sandbox()
    param_value = 0
    b = ghidra_bridge.GhidraBridge(namespace=globals(),response_timeout=-1)
    iat_function_addr = '0x140007be0'
    list_addr = get_addr_ghidra(iat_function_addr)
    sandbox("C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x8664_windows\\Windows\\Temp\\CyberMesh.exe", "C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x8664_windows",list_addr)
Result
After executing the script, the following output can be seen:
In Ghidra, a comment is added after each call to the API hash resolving function.
There are 254 references to the API hash resolving function; however, this script resolves only 252 of them. An error occurred in 2 places. The logs show that the error occurs when the first parameter is 12 (0xC).
To debug this, the specific address is emulated.
In Fig 10, it is seen that ole32.dll is loaded first. Then api-ms-win-core-com-l1-1-0.dll is loaded. Since this DLL is not available in Qiling, it causes an error. Later on, GetProcAddress is used to get the address of CoCreateInstance.
In Zloader, a first argument of 12 (0xC) means that an API from ole32.dll is being resolved. In the Qiling emulator, this error can be fixed by adding api-ms-win-core-com-l1-1-0.dll to the x8664_windows system32 folder. However, this DLL is not available in Windows 10, so the error could not be fixed here. One might get this DLL from an older or different version of Windows.
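The observation that the failures share the same first argument can be made directly from IAT.log. The script logs the whole address list when an emulation fails, so failed entries can be told apart from resolved ones and grouped by their first argument. The sketch below assumes the default logging.basicConfig line format ("INFO:root:" followed by the message); the log lines in the example are invented for illustration and are not output from the sample.

```python
import ast
from collections import Counter

PREFIX = "INFO:root:"


def failed_entries(log_lines):
    """Yield the address lists that were logged when emulation failed."""
    for line in log_lines:
        if not line.startswith(PREFIX):
            continue
        message = line[len(PREFIX):].strip()
        if message.startswith("["):
            yield ast.literal_eval(message)


def failures_by_first_arg(log_lines):
    """Count failures by first argument (only when it is a constant)."""
    counts = Counter()
    for entry in failed_entries(log_lines):
        first = entry[3][0]
        if isinstance(first, int):
            counts[first] += 1
    return counts


# Invented log lines in the shape the script would write.
sample_log = [
    "INFO:root: address : 0x140001005 | API : kernel32.dll.Sleep",
    "INFO:root:['140002000', '140002005', '14000200a', [12, 1111]]",
    "INFO:root:['140003000', '140003005', '14000300a', [12, 2222]]",
]
print(failures_by_first_arg(sample_log))
```

Against a real log, the lines would be read with `open("IAT.log").read().splitlines()`.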
Additional References
- API Hashing in Zloader: https://blag.nullteilerfrei.de/2020/06/11/api-hashing-in-the-zloader-malware/
- Zloader analysis: https://www.zscaler.com/blogs/security-research/zloader-no-longer-silent-night
- Zloader reversing: https://aaqeel01.wordpress.com/2021/10/18/zloader-reversing/
Dridex malware sample - 32 bit
MD5: 6a8401448a5bd2b540850f811b20a66d
SHA256: 178ba564b39bd07577e974a9b677dfd86ffa1f1d0299dfd958eb883c5ef6c3e1
VT Link: https://www.virustotal.com/gui/file/178ba564b39bd07577e974a9b677dfd86ffa1f1d0299dfd958eb883c5ef6c3e1
MalwareBazaar Link: https://bazaar.abuse.ch/sample/178ba564b39bd07577e974a9b677dfd86ffa1f1d0299dfd958eb883c5ef6c3e1/
The sample is a packed Dridex malware sample. It is first unpacked and dumped to disk. The blog post "UnpackIt: Dridex" shows all the steps for unpacking and fixing the dumped sample.
The Dridex sample uses Vectored Exception Handling (VEH). This is a problem, since Qiling has limited support for VEH. To work around it, the sample is patched so that the bytes 0xCC 0xC3 in the text section are replaced with 0xFF 0xD0. With this patch, VEH is no longer used, and the overall behaviour of the sample does not change.
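The byte pair 0xCC 0xC3 decodes as int3 followed by ret, and 0xFF 0xD0 decodes as call eax, so the patch turns each exception-triggering stub into a direct call. A minimal sketch of such a patch on a raw byte buffer is shown below. patch_int3_ret is a hypothetical helper, and the offset and size of the text section are assumed to be known already (for example from the section headers). Note that a blind byte replacement like this can also hit the same byte pair inside an unrelated instruction or immediate, so the result should be reviewed.

```python
def patch_int3_ret(data, start, size):
    """Replace every CC C3 pair inside [start, start+size) with FF D0."""
    patched = bytearray(data)
    region = bytes(patched[start:start + size])
    count = region.count(b"\xCC\xC3")
    # Same-length replacement, so the buffer size and all offsets are kept.
    patched[start:start + size] = region.replace(b"\xCC\xC3", b"\xFF\xD0")
    return bytes(patched), count


# Toy buffer: the first pair lies outside the patched region and is kept.
buf = bytes.fromhex("cc c3 90 cc c3 50 cc c3")
patched, count = patch_int3_ret(buf, start=2, size=6)
print(patched.hex(), count)
```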
Note: The sample can be unpacked, patched, fixed and dumped using the Qiling emulator. This is left to the reader to explore.
Once the sample is unpacked, fixed and patched, it can be used in the Qiling emulator. Hashes of the sample that is used:
MD5: a3defcb7c3ff21ca5013076b12e993eb
SHA256: 86560a5589cdfbbdfb09f122cb925584cb26983078bff195d0dfdf921e8cc438
These will differ depending on the method used to unpack the sample.
Ghidra Python Snippet
The API hash resolving function is at address 0x1000d0d0. It requires 2 arguments: the first argument is a hash of the DLL name and the second argument is a hash of the API name. In this sample, there are 344 references to the API hash resolving function.
The same Ghidra Python snippet used for Zloader can be used here; however, it is time-consuming. So instead of relying on refined Pcode, the disassembled instructions can be used to find all the arguments passed.
addr = currentProgram.getAddressFactory().getAddress(iat_function_addr)
reference_manager = currentProgram.getReferenceManager()
code_manager = currentProgram.getCodeManager()
references_to = reference_manager.getReferencesTo(addr)
list_addr_list = []
print("Getting the references and arguments")
while(references_to.hasNext()):
    addr_list = []
    ref_addr = references_to.next().getFromAddress()
    # running a loop till it finds 2 push operations
    # which push the hashes to the stack
    ref_addr_0 = ref_addr
    arg = 0
    hash_list = []
    while( arg < 2):
        ref_addr_pre = code_manager.getInstructionBefore(ref_addr_0).getAddress()
        if 'PUSH' == code_manager.getInstructionAt(ref_addr_pre).getMnemonicString():
            ins = code_manager.getInstructionAt(ref_addr_pre)
            hash_value = str(ins).split(" ")[1]
            if hash_value[0:2] == '0x':
                hash_list.append(hash_value)
                arg = arg + 1
            else:
                break
        ref_addr_0 = ref_addr_pre
    if arg != 2 :
        continue
    # getting address of next 2 instructions
    ref_addr_1 = code_manager.getInstructionAfter(ref_addr).getAddress()
    ref_addr_2 = code_manager.getInstructionAfter(ref_addr_1).getAddress()
    addr_list.append(str(ref_addr))
    addr_list.append(str(ref_addr_1))
    addr_list.append(str(ref_addr_2))
    addr_list.append(hash_list)
    list_addr_list.append(addr_list)
- iat_function_addr contains the address of the API hash resolving function as a hex string, which is converted to a Ghidra address object.
- Then all the references made to the API hash resolving function are iterated over.
- At each location, the preceding instructions are checked for a PUSH operation. If a PUSH is found, an additional check ensures that the value pushed is a hex string.
- Once 2 pushed hex strings are found, they are added to the list.
- Finally, the output looks like this:
[ < begin address> , < address to hook >, < end address >, [ < hash value1 > , < hash value2 > ] ]
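The backwards scan can be tried outside Ghidra on a plain list of disassembly strings. The sketch below walks back from a CALL and collects immediate PUSH operands; it gives up when it meets a PUSH whose operand is not an immediate and skips over other instructions, which is one possible policy rather than the only reasonable one. find_pushed_hashes and the instruction listing are hypothetical and only illustrate the idea. Since the last value pushed is the first argument, the nearest PUSH is returned first.

```python
def find_pushed_hashes(instructions, call_index, wanted=2):
    """Walk backwards from a CALL and collect immediate PUSH operands.

    `instructions` is a list of disassembly strings such as "PUSH 0x1234".
    Returns the operands in the order they are found (nearest PUSH first),
    or None if a PUSH with a non-immediate operand is met first.
    """
    found = []
    index = call_index - 1
    while index >= 0 and len(found) < wanted:
        parts = instructions[index].split()
        if parts and parts[0].upper() == "PUSH":
            operand = parts[1] if len(parts) > 1 else ""
            if operand.startswith("0x"):
                found.append(operand)
            else:
                return None
        index -= 1
    return found if len(found) == wanted else None


# Made-up listing; the hash values are placeholders.
listing = [
    "MOV ESI,ECX",
    "PUSH 0x11111111",
    "PUSH 0x22222222",
    "CALL 0x1000d0d0",
    "MOV EDI,EAX",
]
print(find_pushed_hashes(listing, call_index=3))
```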
Solving Emulation Issues
The Dridex sample is quite complex, and so is its API hash resolving function.
It does the following:
- XORs the hash value with 0x438df952 (the value differs between samples).
- Allocates a heap using the API RtlCreateHeap and calculates the CRC checksum of the names of the DLLs loaded in its memory, and of their exports, using the API RtlComputeCrc32.
- If a match is found, the API is resolved using LdrGetProcedureAddress.
- After that, RtlFreeHeap is invoked.
If a match is not found, which happens when resolving an API whose DLL is not loaded into memory at runtime, the function does the following:
- It accesses the system32 path and iterates over it using FindFirstFileExW and FindNextFileW.
- The CRC32 checksum of the DLL name in uppercase is calculated using RtlComputeCrc32 and compared.
- If a match is found, it checks its privileges using OpenProcessToken, GetTokenInformation and AllocateAndInitializeSid, and compares them using EqualSid.
- Then the DLL is loaded into memory using LdrLoadDll.
- Then the CRC32 checksum of each API name in the export table of the loaded DLL is calculated using RtlComputeCrc32 and compared. Once a match is found, it is resolved using LdrGetProcedureAddress.
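The matching step described above (undo the XOR, then compare against the CRC32 of each candidate name) can be sketched in a few lines. This is an illustration under stated assumptions, not a verified reimplementation: it assumes that the CRC routine used by the sample produces the same value as zlib.crc32 with a zero initial value, that names are hashed as ASCII, and that upper-casing applies as described for DLL names. The helper names are hypothetical, and the candidate list is just example input.

```python
import zlib

XOR_KEY = 0x438DF952  # value quoted above for this sample


def crc_name(name, upper=True):
    """CRC32 of a name; upper-casing is an assumption based on the DLL case."""
    text = name.upper() if upper else name
    return zlib.crc32(text.encode("ascii")) & 0xFFFFFFFF


def make_hash(name, key=XOR_KEY, upper=True):
    """Build the value the sample would embed for `name` under these assumptions."""
    return crc_name(name, upper) ^ key


def resolve(hash_value, candidates, key=XOR_KEY, upper=True):
    """Return the first candidate whose CRC matches, or None."""
    target = hash_value ^ key   # undo the XOR, as the routine does first
    for name in candidates:
        if crc_name(name, upper) == target:
            return name
    return None


dlls = ["ntdll.dll", "kernel32.dll", "ole32.dll"]
embedded = make_hash("ole32.dll")
print(hex(embedded), "->", resolve(embedded, dlls))
```

Emulation sidesteps all of these assumptions, since the sample's own routine performs the hashing and comparison.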
Most of the API calls mentioned above either have no implementation or have a buggy implementation in Qiling. So, in order to emulate the API hash resolving function of Dridex, these API calls need to be implemented.
The following code snippet contains the implementation of the API calls required for emulation.
Note: This API implementation should ideally be submitted to the Qiling project. However, it was written with the Dridex sample in mind and may not work for other use cases.
# standard-library modules used by the hooks below
import glob
import os
from datetime import datetime

# NTSYSAPI PVOID RtlCreateHeap(
#   ULONG                Flags,
#   PVOID                HeapBase,
#   SIZE_T               ReserveSize,
#   SIZE_T               CommitSize,
#   PVOID                Lock,
#   PRTL_HEAP_PARAMETERS Parameters
# );
@winsdkapi(cc=STDCALL, params={
    'Flags' : ULONG,
    'HeapBase' : PVOID,
    'ReserveSize' : SIZE_T,
    'CommitSize' : SIZE_T,
    'Lock' : PVOID,
    'Parameters' : DWORD #pointer to structure containing parameters
})
def hook_RtlCreateHeap(ql: Qiling, address: int, params):
    # in this case, both reserveSize and commitSize are zero
    # windows then reserves 64 pages and commits 1 page in memory
    # a fixed 64 KB allocation is enough here; no need to implement other cases.
    return ql.os.heap.alloc(64 * 1024)

# NTSYSAPI PVOID RtlFreeHeap (
#   PVOID HeapHandle,
#   ULONG Flags,
#   PVOID HeapBase
# );
@winsdkapi(cc=STDCALL, params={
    'HeapHandle' : PVOID,
    'Flags' : ULONG,
    'HeapBase' : PVOID
})
def hook_RtlFreeHeap(ql,address,params):
    return ql.os.heap.free(params['HeapBase'])

def get_dir_size(path='.'):
    # total size in bytes of all files below path, recursing into subfolders
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total
# HANDLE FindFirstFileExW(
#   LPCWSTR lpFileName,
#   FINDEX_INFO_LEVELS fInfoLevelId,
#   LPVOID lpFindFileData,
#   FINDEX_SEARCH_OPS fSearchOp,
#   LPVOID lpSearchFilter,
#   DWORD dwAdditionalFlags
# );
@winsdkapi(cc=STDCALL, params={
    'lpFileName' : LPCWSTR,
    'fInfoLevelId' : DWORD, # FINDEX_INFO_LEVELS
    'lpFindFileData' : LPVOID,
    'fSearchOp' : DWORD, # FINDEX_SEARCH_OPS
    'lpSearchFilter' : LPVOID,
    'dwAdditionalFlags' : DWORD
})
def hook_FindFirstFileExW(ql: Qiling, address: int, params):
    file_iterator = 0 # this will be passed to findnextfile via handle obj
    filename = params['lpFileName']
    pointer = params['lpFindFileData']
    if not filename:
        return INVALID_HANDLE_VALUE
    if len(filename) >= MAX_PATH:
        return ERROR_INVALID_PARAMETER
    # Check if path exists
    filesize = 0
    try:
        # if filename is a path
        path_check = os.path.isdir(filename)
        if path_check :
            path = filename.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            #path_item = os.listdir(path)[file_iterator]
            #filename = params['lpFileName'] + "\\" + path_item
            f = path
            filesize = get_dir_size(path)
        # check if the string contains wild character *
        # checking only for * in this case. In future, might want to add more
        elif filename.__contains__("*"):
            path_pattern = filename.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            path_items = glob.glob(path_pattern) # gets the matching files as full path
            path = path_pattern.split("*")[0]
            f = path_pattern
            filesize = os.path.getsize(path_items[file_iterator])
        else:
            f = ql.os.fs_mapper.open(filename, "r")
            filesize = os.path.getsize(f.name)
    except FileNotFoundError:
        ql.os.last_error = ERROR_FILE_NOT_FOUND
        return INVALID_HANDLE_VALUE
    # Create a handle for the path
    if path_check:
        obj_list = [params['lpFileName'],file_iterator]
    else:
        obj_list = [filename,file_iterator]
    new_handle = Handle(obj=obj_list)
    ql.os.handle_manager.append(new_handle)
    # calculate file time
    epoch = datetime(1601, 1, 1)
    elapsed = datetime.now() - epoch
    # number of 100-nanosecond intervals since Jan 1, 1601 utc
    # where: (10 ** 9) / 100 -> (10 ** 7)
    hnano = int(elapsed.total_seconds() * (10 ** 7))
    mask = (1 << 32) - 1
    ftime = FILETIME(
        (hnano >> 0) & mask,
        (hnano >> 32) & mask
    )
    fdata_struct = make_win32_find_data(ql.arch.bits, wide=True)
    if path_check:
        with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
            fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_DIRECTORY
            fdata_obj.ftCreationTime = ftime
            fdata_obj.ftLastAccessTime = ftime
            fdata_obj.ftLastWriteTime = ftime
            fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
            fdata_obj.nFileSizeLow = (filesize >> 0) & mask
            fdata_obj.dwReserved0 = 0
            fdata_obj.dwReserved1 = 0
            fdata_obj.cFileName = ".".encode('utf-16-le').decode('utf-16-le')
            fdata_obj.cAlternateFileName = ".".encode('utf-16-le').decode('utf-16-le')
            fdata_obj.dwFileType = 0
            fdata_obj.dwCreatorType = 0
            fdata_obj.wFinderFlags = 0
    elif filename.__contains__("*"):
        with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
            fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_DIRECTORY
            fdata_obj.ftCreationTime = ftime
            fdata_obj.ftLastAccessTime = ftime
            fdata_obj.ftLastWriteTime = ftime
            fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
            fdata_obj.nFileSizeLow = (filesize >> 0) & mask
            fdata_obj.dwReserved0 = 0
            fdata_obj.dwReserved1 = 0
            fdata_obj.cFileName = path_items[file_iterator].split("\\")[-1].encode('utf-16-le').decode('utf-16-le')
            fdata_obj.cAlternateFileName = path_items[file_iterator].split("\\")[-1].upper() if len(path_items[file_iterator].split("\\")[-1].upper()) <= 14 else path_items[file_iterator].split("\\")[-1].upper()[0:14]
            fdata_obj.dwFileType = 0
            fdata_obj.dwCreatorType = 0
            fdata_obj.wFinderFlags = 0
    else:
        with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
            fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_NORMAL
            fdata_obj.ftCreationTime = ftime
            fdata_obj.ftLastAccessTime = ftime
            fdata_obj.ftLastWriteTime = ftime
            fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
            fdata_obj.nFileSizeLow = (filesize >> 0) & mask
            fdata_obj.dwReserved0 = 0
            fdata_obj.dwReserved1 = 0
            fdata_obj.cFileName = filename.encode('utf-16-le').decode('utf-16-le')
            fdata_obj.cAlternateFileName = filename.upper().encode('utf-16-le').decode('utf-16-le')
            fdata_obj.dwFileType = 0
            fdata_obj.dwCreatorType = 0
            fdata_obj.wFinderFlags = 0
    return new_handle.id
# BOOL FindNextFileW(
#   HANDLE hFindFile,
#   LPWIN32_FIND_DATAW lpFindFileData
# );
@winsdkapi(cc=STDCALL, params={
    'hFindFile' : HANDLE,
    'lpFindFileData' : LPWIN32_FIND_DATAA
})
def hook_FindNextFileW(ql: Qiling, address: int, params):
    ERROR_NO_MORE_FILES = 0x12
    handle = params['hFindFile']
    pointer = params['lpFindFileData']
    search_handle = ql.os.handle_manager.get(handle)
    path = search_handle.obj[0]
    file_iterator = search_handle.obj[1]
    file_iterator = file_iterator + 1
    # update it in the handle too
    search_handle.obj[1] = file_iterator
    # Check if path exists
    filesize = 0
    # the hard-coded rootfs path below should be replaced instead of being written manually
    try:
        # if filename is a path
        path_check = os.path.isdir(path)
        if path_check :
            path = path.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            #path_item = os.listdir(path)[file_iterator]
            #filename = params['lpFileName'] + "\\" + path_item
            filesize = get_dir_size(path)
        # check if the string contains wild character *
        # checking only for * in this case. In future, might want to add more
        elif path.__contains__("*"):
            path_pattern = path.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            path_items = glob.glob(path_pattern) # gets the matching files as full path
            path = path_pattern.split("*")[0]
            if file_iterator == len(path_items):
                ql.os.last_error = ERROR_NO_MORE_FILES
                return 0
            filesize = os.path.getsize(path_items[file_iterator])
        else:
            f = ql.os.fs_mapper.open(path, "r")
            filesize = os.path.getsize(f.name)
    except FileNotFoundError:
        ql.os.last_error = ERROR_FILE_NOT_FOUND
        return INVALID_HANDLE_VALUE
    # calculate file time
    epoch = datetime(1601, 1, 1)
    elapsed = datetime.now() - epoch
    # number of 100-nanosecond intervals since Jan 1, 1601 utc
    # where: (10 ** 9) / 100 -> (10 ** 7)
    hnano = int(elapsed.total_seconds() * (10 ** 7))
    mask = (1 << 32) - 1
    ftime = FILETIME(
        (hnano >> 0) & mask,
        (hnano >> 32) & mask
    )
    fdata_struct = make_win32_find_data(ql.arch.bits, wide=True)
    with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
        fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_NORMAL
        fdata_obj.ftCreationTime = ftime
        fdata_obj.ftLastAccessTime = ftime
        fdata_obj.ftLastWriteTime = ftime
        fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
        fdata_obj.nFileSizeLow = (filesize >> 0) & mask
        fdata_obj.dwReserved0 = 0
        fdata_obj.dwReserved1 = 0
        fdata_obj.cFileName = path_items[file_iterator].split("\\")[-1].encode('utf-16-le').decode('utf-16-le')
        fdata_obj.cAlternateFileName = path_items[file_iterator].split("\\")[-1].upper() if len(path_items[file_iterator].split("\\")[-1].upper()) <= 14 else path_items[file_iterator].split("\\")[-1].upper()[0:14]
        fdata_obj.dwFileType = 0
        fdata_obj.dwCreatorType = 0
        fdata_obj.wFinderFlags = 0
    return 1
@winsdkapi(cc=STDCALL, params={
    'PathToFile' : PWCH,
    'Flags' : ULONG,
    'ModuleFileName' : PUNICODE_STRING,
    'ModuleHandle' : PHANDLE
})
def hook_ldrloaddll(ql:Qiling,address:int,params):
    #logging.info("load_dll called")
    filename_addr = params["ModuleFileName"]
    module_handle = params["ModuleHandle"]
    wcstr = bytearray()
    ch = 1
    while ch != b'\x00\x00':
        ch = ql.mem.read(filename_addr,2)
        wcstr.extend(ch)
        filename_addr += 2
    #logging.info(wcstr)
    lpLibFileName = wcstr[0:-2].decode('utf-16-le').split("\\")[-1]
    dll = ql.loader.get_image_by_name(lpLibFileName, casefold=True)
    if dll is not None:
        return dll.base
    dllhandle = ql.loader.load_dll(lpLibFileName)
    ql.mem.write_ptr(module_handle,dllhandle)
    #returning success even if it fails
    return STATUS_SUCCESS

@winsdkapi(cc=STDCALL, params={
    'ModuleHandle' : HMODULE,
    'FunctionName' : PANSI_STRING,
    'Ordinal' : WORD,
    'FunctionAddress' : POINTER
})
def hook_LdrGetProcedureAddress(ql: Qiling, address: int, params):
    ModuleHandle = params['ModuleHandle']
    FunctionName = params['FunctionName']
    Ordinal = params['Ordinal']
    FunctionAddress = params['FunctionAddress']
    # Check if dll is loaded
    dll_name = next((os.path.basename(path).casefold() for base, _, path in ql.loader.images if base == ModuleHandle), None)
    if dll_name is None:
        ql.log.debug(f'Could not find specified handle {ModuleHandle} in loaded DLL')
        return STATUS_DLL_NOT_FOUND
    identifier = utils.read_pansi_string(ql, FunctionName) if FunctionName else Ordinal
    iat = ql.loader.import_address_table[dll_name]
    identifier = identifier.encode()
    if not identifier:
        return STATUS_INVALID_PARAMETER
    if identifier not in iat:
        return STATUS_PROCEDURE_NOT_FOUND
    ql.mem.write_ptr(FunctionAddress, iat[identifier])
    return STATUS_SUCCESS

# BOOL EqualSid(
#   PSID pSid1,
#   PSID pSid2
# );
@winsdkapi(cc=STDCALL, params={
    'pSid1' : PSID,
    'pSid2' : PSID
})
def hook_EqualSid(ql: Qiling, address: int, params):
    ql.os.last_error = ERROR_SUCCESS
    # return sid1 == sid2
    return 1

# BOOL GetHandleInformation(
#   HANDLE hObject,
#   LPDWORD lpdwFlags
# );
@winsdkapi(cc=STDCALL, params={
    'hObject' : HANDLE,
    'lpdwFlags' : LPDWORD
})
def hook_GetHandleInformation(ql: Qiling, address: int, params):
    HANDLE_FLAG_INHERIT = 0x00000001
    ql.mem.write_ptr(params['lpdwFlags'],HANDLE_FLAG_INHERIT)
    return 1
- The implementation of EqualSid is incorrect. Ideally, it would compare both SIDs and return the result, but this is not possible due to a bug in Qiling itself. Refer to the comments at: https://github.com/qilingframework/qiling/blob/9a78d186c97d6ff42d7df31155dda2cd9e1a7fe3/qiling/os/windows/dlls/advapi32.py#L833
- In this case, EqualSid always returns true, which means that the Dridex sample thinks that it has admin privileges.
Additionally, changes need to be made in the qiling/os/windows/structs.py file, which is missing some structs. In the __init__ method of class Token, the missing structs are defined as follows:
self.struct[Token.TokenInformationClass.TokenElevation.value] = ql.pack(0x1)
after TokenGroups.value is defined, and
self.struct[Token.TokenInformationClass.TokenUser] = ql.pack(sid_addr)
after TokenIntegrityLevel is defined.
Once these additional structs and api calls are implemented, the api hash resolving function can be emulated without any issues.
Python Script for resolving api hash and adding comments in Ghidra
Complete python3 script for emulating api hash resolving function and adding comments in Ghidra:
import sys
import os
import binascii
import logging
import glob
from datetime import datetime
from unicorn import UC_PROT_ALL
import ghidra_bridge
from qiling import *
from qiling.const import QL_VERBOSE,QL_INTERCEPT
from qiling.os.windows.const import *
from qiling.os.windows.api import *
from qiling.os.const import *
from qiling.os.windows.fncc import *
from qiling.os.windows.utils import *
from qiling.os.windows.thread import *
from qiling.os.windows.handle import *
from qiling.exception import *
from qiling.os.windows.api import *
from qiling.os.windows.fncc import *
from qiling.const import QL_ARCH
from qiling.os.windows.const import *
from qiling.os.windows import structs
from qiling.os.windows import utils
logging.basicConfig(filename="IAT.log",level=logging.INFO)
def get_addr_ghidra(iat_function_addr):
print("\n getting referenced addresses and params passed\n")
addr = currentProgram.getAddressFactory().getAddress(iat_function_addr)
reference_manager = currentProgram.getReferenceManager()
code_manager = currentProgram.getCodeManager()
references_to = reference_manager.getReferencesTo(addr)
list_addr_list = []
print("Getting the references and arguments")
while(references_to.hasNext()):
addr_list = []
ref_addr = references_to.next().getFromAddress()
# running a loop till it finds 2 push operations
# which pushs the hash to the stack
ref_addr_0 = ref_addr
arg = 0
hash_list = []
while( arg < 2):
ref_addr_pre = code_manager.getInstructionBefore(ref_addr_0).getAddress()
if 'PUSH' == code_manager.getInstructionAt(ref_addr_pre).getMnemonicString():
ins = code_manager.getInstructionAt(ref_addr_pre)
hash_value = str(ins).split(" ")[1]
if hash_value[0:2] == '0x':
hash_list.append(hash_value)
arg = arg + 1
else:
break
ref_addr_0 = ref_addr_pre
if arg != 2 :
continue
# getting address of next 2 instructions
ref_addr_1 = code_manager.getInstructionAfter(ref_addr).getAddress()
ref_addr_2 = code_manager.getInstructionAfter(ref_addr_1).getAddress()
addr_list.append(str(ref_addr))
addr_list.append(str(ref_addr_1))
addr_list.append(str(ref_addr_2))
addr_list.append(hash_list)
list_addr_list.append(addr_list)
return list_addr_list
@winsdkapi(cc=STDCALL, params={
    'Flags' : ULONG,
    'HeapBase' : PVOID,
    'ReserveSize' : SIZE_T,
    'CommitSize' : SIZE_T,
    'Lock' : PVOID,
    'Parameters' : DWORD #pointer to structure containing parameters
})
def hook_RtlCreateHeap(ql: Qiling, address: int, params):
    # in this case, both reserveSize and commitSize are zero
    # so 64 pages needs to be created. 1 page is committed in memory
    # no need to implement other cases.
    return ql.os.heap.alloc(64 * 1024)

# NTSYSAPI PVOID RtlFreeHeap (
#   PVOID HeapHandle,
#   ULONG Flags,
#   PVOID HeapBase
# );
@winsdkapi(cc=STDCALL, params={
    'HeapHandle' : PVOID,
    'Flags' : ULONG,
    'HeapBase' : PVOID
})
def hook_RtlFreeHeap(ql,address,params):
    return ql.os.heap.free(params['HeapBase'])

def get_dir_size(path='.'):
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file():
                total += entry.stat().st_size
            elif entry.is_dir():
                total += get_dir_size(entry.path)
    return total
# HANDLE FindFirstFileExW(
#   LPCWSTR lpFileName,
#   FINDEX_INFO_LEVELS fInfoLevelId,
#   LPVOID lpFindFileData,
#   FINDEX_SEARCH_OPS fSearchOp,
#   LPVOID lpSearchFilter,
#   DWORD dwAdditionalFlags
# );
@winsdkapi(cc=STDCALL, params={
    'lpFileName' : LPCWSTR,
    'fInfoLevelId' : DWORD, # FINDEX_INFO_LEVELS
    'lpFindFileData' : LPVOID,
    'fSearchOp' : DWORD, # FINDEX_SEARCH_OPS
    'lpSearchFilter' : LPVOID,
    'dwAdditionalFlags' : DWORD
})
def hook_FindFirstFileExW(ql: Qiling, address: int, params):
    file_iterator = 0 # this will be passed to findnextfile via handle obj
    filename = params['lpFileName']
    pointer = params['lpFindFileData']
    if not filename:
        return INVALID_HANDLE_VALUE
    if len(filename) >= MAX_PATH:
        return ERROR_INVALID_PARAMETER
    # Check if path exists
    filesize = 0
    try:
        # if filename is a path
        path_check = os.path.isdir(filename)
        if path_check :
            path = filename.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            #path_item = os.listdir(path)[file_iterator]
            #filename = params['lpFileName'] + "\\" + path_item
            f = path
            filesize = get_dir_size(path)
        # check if the string contains wild character *
        # checking only for * in this case. In future, might want to add more
        elif filename.__contains__("*"):
            path_pattern = filename.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            path_items = glob.glob(path_pattern) # gets the matching files as full path
            path = path_pattern.split("*")[0]
            f = path_pattern
            filesize = os.path.getsize(path_items[file_iterator])
        else:
            f = ql.os.fs_mapper.open(filename, "r")
            filesize = os.path.getsize(f.name)
    except FileNotFoundError:
        ql.os.last_error = ERROR_FILE_NOT_FOUND
        return INVALID_HANDLE_VALUE
    # Create a handle for the path
    if path_check:
        obj_list = [params['lpFileName'],file_iterator]
    else:
        obj_list = [filename,file_iterator]
    new_handle = Handle(obj=obj_list)
    ql.os.handle_manager.append(new_handle)
    # calculate file time
    epoch = datetime(1601, 1, 1)
    elapsed = datetime.now() - epoch
    # number of 100-nanosecond intervals since Jan 1, 1601 utc
    # where: (10 ** 9) / 100 -> (10 ** 7)
    hnano = int(elapsed.total_seconds() * (10 ** 7))
    mask = (1 << 32) - 1
    ftime = FILETIME(
        (hnano >> 0) & mask,
        (hnano >> 32) & mask
    )
    fdata_struct = make_win32_find_data(ql.arch.bits, wide=True)
    if path_check:
        with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
            fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_DIRECTORY
            fdata_obj.ftCreationTime = ftime
            fdata_obj.ftLastAccessTime = ftime
            fdata_obj.ftLastWriteTime = ftime
            fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
            fdata_obj.nFileSizeLow = (filesize >> 0) & mask
            fdata_obj.dwReserved0 = 0
            fdata_obj.dwReserved1 = 0
            fdata_obj.cFileName = ".".encode('utf-16-le').decode('utf-16-le')
            fdata_obj.cAlternateFileName = ".".encode('utf-16-le').decode('utf-16-le')
            fdata_obj.dwFileType = 0
            fdata_obj.dwCreatorType = 0
            fdata_obj.wFinderFlags = 0
    elif filename.__contains__("*"):
        with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
            fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_DIRECTORY
            fdata_obj.ftCreationTime = ftime
            fdata_obj.ftLastAccessTime = ftime
            fdata_obj.ftLastWriteTime = ftime
            fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
            fdata_obj.nFileSizeLow = (filesize >> 0) & mask
            fdata_obj.dwReserved0 = 0
            fdata_obj.dwReserved1 = 0
            fdata_obj.cFileName = path_items[file_iterator].split("\\")[-1].encode('utf-16-le').decode('utf-16-le')
            fdata_obj.cAlternateFileName = path_items[file_iterator].split("\\")[-1].upper() if len(path_items[file_iterator].split("\\")[-1].upper()) <= 14 else path_items[file_iterator].split("\\")[-1].upper()[0:14]
            fdata_obj.dwFileType = 0
            fdata_obj.dwCreatorType = 0
            fdata_obj.wFinderFlags = 0
    else:
        with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
            fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_NORMAL
            fdata_obj.ftCreationTime = ftime
            fdata_obj.ftLastAccessTime = ftime
            fdata_obj.ftLastWriteTime = ftime
            fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
            fdata_obj.nFileSizeLow = (filesize >> 0) & mask
            fdata_obj.dwReserved0 = 0
            fdata_obj.dwReserved1 = 0
            fdata_obj.cFileName = filename.encode('utf-16-le').decode('utf-16-le')
            fdata_obj.cAlternateFileName = filename.upper().encode('utf-16-le').decode('utf-16-le')
            fdata_obj.dwFileType = 0
            fdata_obj.dwCreatorType = 0
            fdata_obj.wFinderFlags = 0
    return new_handle.id
# BOOL FindNextFileW(
#   HANDLE hFindFile,
#   LPWIN32_FIND_DATAW lpFindFileData
# );
@winsdkapi(cc=STDCALL, params={
    'hFindFile' : HANDLE,
    'lpFindFileData' : LPWIN32_FIND_DATAA
})
def hook_FindNextFileW(ql: Qiling, address: int, params):
    ERROR_NO_MORE_FILES = 0x12
    handle = params['hFindFile']
    pointer = params['lpFindFileData']
    search_handle = ql.os.handle_manager.get(handle)
    path = search_handle.obj[0]
    file_iterator = search_handle.obj[1]
    file_iterator = file_iterator + 1
    # update it in the handle too
    search_handle.obj[1] = file_iterator
    # Check if path exists
    filesize = 0
    # the hard-coded rootfs path below should be replaced instead of being written manually
    try:
        # if filename is a path
        path_check = os.path.isdir(path)
        if path_check :
            path = path.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            #path_item = os.listdir(path)[file_iterator]
            #filename = params['lpFileName'] + "\\" + path_item
            filesize = get_dir_size(path)
        # check if the string contains wild character *
        # checking only for * in this case. In future, might want to add more
        elif path.__contains__("*"):
            path_pattern = path.replace("C:","C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows")
            path_items = glob.glob(path_pattern) # gets the matching files as full path
            path = path_pattern.split("*")[0]
            if file_iterator == len(path_items):
                ql.os.last_error = ERROR_NO_MORE_FILES
                return 0
            filesize = os.path.getsize(path_items[file_iterator])
        else:
            f = ql.os.fs_mapper.open(path, "r")
            filesize = os.path.getsize(f.name)
    except FileNotFoundError:
        ql.os.last_error = ERROR_FILE_NOT_FOUND
        return INVALID_HANDLE_VALUE
    # calculate file time
    epoch = datetime(1601, 1, 1)
    elapsed = datetime.now() - epoch
    # number of 100-nanosecond intervals since Jan 1, 1601 utc
    # where: (10 ** 9) / 100 -> (10 ** 7)
    hnano = int(elapsed.total_seconds() * (10 ** 7))
    mask = (1 << 32) - 1
    ftime = FILETIME(
        (hnano >> 0) & mask,
        (hnano >> 32) & mask
    )
    fdata_struct = make_win32_find_data(ql.arch.bits, wide=True)
    with fdata_struct.ref(ql.mem, pointer) as fdata_obj:
        fdata_obj.dwFileAttributes = FILE_ATTRIBUTE_NORMAL
        fdata_obj.ftCreationTime = ftime
        fdata_obj.ftLastAccessTime = ftime
        fdata_obj.ftLastWriteTime = ftime
        fdata_obj.nFileSizeHigh = (filesize >> 32) & mask
        fdata_obj.nFileSizeLow = (filesize >> 0) & mask
        fdata_obj.dwReserved0 = 0
        fdata_obj.dwReserved1 = 0
        fdata_obj.cFileName = path_items[file_iterator].split("\\")[-1].encode('utf-16-le').decode('utf-16-le')
        fdata_obj.cAlternateFileName = path_items[file_iterator].split("\\")[-1].upper() if len(path_items[file_iterator].split("\\")[-1].upper()) <= 14 else path_items[file_iterator].split("\\")[-1].upper()[0:14]
        fdata_obj.dwFileType = 0
        fdata_obj.dwCreatorType = 0
        fdata_obj.wFinderFlags = 0
    return 1
@winsdkapi(cc=STDCALL, params={
    'PathToFile' : PWCH,
    'Flags' : ULONG,
    'ModuleFileName' : PUNICODE_STRING,
    'ModuleHandle' : PHANDLE
})
def hook_ldrloaddll(ql:Qiling,address:int,params):
    #logging.info("load_dll called")
    filename_addr = params["ModuleFileName"]
    module_handle = params["ModuleHandle"]
    wcstr = bytearray()
    ch = 1
    while ch != b'\x00\x00':
        ch = ql.mem.read(filename_addr,2)
        wcstr.extend(ch)
        filename_addr += 2
    #logging.info(wcstr)
    lpLibFileName = wcstr[0:-2].decode('utf-16-le').split("\\")[-1]
    dll = ql.loader.get_image_by_name(lpLibFileName, casefold=True)
    if dll is not None:
        return dll.base
    dllhandle = ql.loader.load_dll(lpLibFileName)
    ql.mem.write_ptr(module_handle,dllhandle)
    #returning success even if it fails
    return STATUS_SUCCESS

@winsdkapi(cc=STDCALL, params={
    'ModuleHandle' : HMODULE,
    'FunctionName' : PANSI_STRING,
    'Ordinal' : WORD,
    'FunctionAddress' : POINTER
})
def hook_LdrGetProcedureAddress(ql: Qiling, address: int, params):
    ModuleHandle = params['ModuleHandle']
    FunctionName = params['FunctionName']
    Ordinal = params['Ordinal']
    FunctionAddress = params['FunctionAddress']
    # Check if dll is loaded
    dll_name = next((os.path.basename(path).casefold() for base, _, path in ql.loader.images if base == ModuleHandle), None)
    if dll_name is None:
        ql.log.debug(f'Could not find specified handle {ModuleHandle} in loaded DLL')
        return STATUS_DLL_NOT_FOUND
    identifier = utils.read_pansi_string(ql, FunctionName) if FunctionName else Ordinal
    iat = ql.loader.import_address_table[dll_name]
    identifier = identifier.encode()
    if not identifier:
        return STATUS_INVALID_PARAMETER
    if identifier not in iat:
        return STATUS_PROCEDURE_NOT_FOUND
    ql.mem.write_ptr(FunctionAddress, iat[identifier])
    return STATUS_SUCCESS

# BOOL EqualSid(
#   PSID pSid1,
#   PSID pSid2
# );
@winsdkapi(cc=STDCALL, params={
    'pSid1' : PSID,
    'pSid2' : PSID
})
def hook_EqualSid(ql: Qiling, address: int, params):
    ql.os.last_error = ERROR_SUCCESS
    # return sid1 == sid2
    return 1

# BOOL GetHandleInformation(
#   HANDLE hObject,
#   LPDWORD lpdwFlags
# );
@winsdkapi(cc=STDCALL, params={
    'hObject' : HANDLE,
    'lpdwFlags' : LPDWORD
})
def hook_GetHandleInformation(ql: Qiling, address: int, params):
    HANDLE_FLAG_INHERIT = 0x00000001
    ql.mem.write_ptr(params['lpdwFlags'],HANDLE_FLAG_INHERIT)
    return 1
def extract_eax_resolvor(ql):
    eax_value = ql.arch.regs.eax
    eip_value = ql.arch.regs.eip
    func = ql.loader.import_symbols[eax_value]
    func_dll = func["dll"]
    func_name = func["name"].decode("ascii")
    print("\n\n")
    print(f"resolves {func_dll}.{func_name}")
    print("Adding Comment in ghidra")
    #convert to ghidra address
    addr = currentProgram.getAddressFactory().getAddress(str(hex(eip_value)))
    code = currentProgram.getListing().getCodeUnitAt(addr)
    comment_msg = "{}.{}".format(func_dll,func_name)
    # starting a transaction which modifies the ghidra project
    start()
    code.setComment(code.PRE_COMMENT, comment_msg)
    end(True)
    # ending the transaction once it is over
    print(f"Comment added at : {hex(eip_value)}")
    logging.info(f" address : {hex(eip_value)} | API : {func_dll}.{func_name}")
def sandbox(path, rootfs,list_of_addr):
    # create a sandbox for Windows x86 (32-bit)
    ql = Qiling([path], rootfs,verbose=QL_VERBOSE.DISABLED)
    print("Adding missing API...")
    # Implementing missing API by hooking it
    ql.os.set_api("RtlCreateHeap",hook_RtlCreateHeap,QL_INTERCEPT.CALL)
    ql.os.set_api("RtlFreeHeap",hook_RtlFreeHeap,QL_INTERCEPT.CALL)
    ql.os.set_api("FindFirstFileExW",hook_FindFirstFileExW,QL_INTERCEPT.CALL)
    ql.os.set_api("FindNextFileW",hook_FindNextFileW,QL_INTERCEPT.CALL)
    ql.os.set_api("LdrLoadDll",hook_ldrloaddll,QL_INTERCEPT.CALL)
    ql.os.set_api("LdrGetProcedureAddress",hook_LdrGetProcedureAddress,QL_INTERCEPT.CALL)
    ql.os.set_api("EqualSid",hook_EqualSid,QL_INTERCEPT.CALL)
    ql.os.set_api("GetHandleInformation",hook_GetHandleInformation,QL_INTERCEPT.CALL)
    print("Setting up parameters. Starting emulation")
    for addr_list in list_of_addr:
        # push both hash arguments in reverse order, as the call site would
        ql.arch.stack_push(int(addr_list[3][1],16))
        ql.arch.stack_push(int(addr_list[3][0],16))
        # read eax at the instruction right after the resolver call
        hook_handle = ql.hook_address(extract_eax_resolvor,int(addr_list[1],16))
        try:
            ql.run(begin=int(addr_list[0],16),end=int(addr_list[2],16))
        except KeyboardInterrupt:
            sys.exit()
        except:
            #ql.hook_del(hook_handle)
            pass
        ql.hook_del(hook_handle)
    print("\n\n\nDone! resolving IAT")
if __name__ == "__main__":
    b = ghidra_bridge.GhidraBridge(namespace=globals())
    iat_function_addr = '0x1000d0d0'
    list_addr_list = get_addr_ghidra(iat_function_addr)
    print("Done! Invoking Emulator")
    sandbox("C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows\\bin\\patched.bin", "C:\\Users\\admin\\Desktop\\qiling\\examples\\rootfs\\x86_windows",list_addr_list)
Result
After executing the script, the following output can be seen:
In Ghidra, a comment is added after the api hash resolving function is called.
A total of 344 references were found; however, the script resolved only 343 api calls. This is because at address 0x100011bd, the second argument passed to the api hash resolving function is a value read from a struct rather than an immediate value, so the script skips that call site.
This case can also be emulated in Qiling, so that the api calls resolved in that loop are recovered.
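The skipped call site follows from how get_addr_ghidra collects arguments: it only accepts PUSH instructions whose operand is an immediate starting with 0x. The following is a simplified, Ghidra-free sketch of that backward scan. The disassembly strings and hash values are made up for illustration, collect_push_immediates is a hypothetical helper, and the sketch assumes the scan gives up at the first instruction that is not a PUSH.

```python
from typing import List, Optional


def collect_push_immediates(instructions: List[str], call_index: int,
                            wanted: int = 2) -> Optional[List[str]]:
    """Walk backwards from a call and collect `wanted` immediate PUSH operands.

    Returns None when fewer than `wanted` immediates are found before the
    scan hits a non-PUSH instruction or the start of the listing.
    """
    found = []
    index = call_index - 1
    while len(found) < wanted and index >= 0:
        parts = instructions[index].split()
        # Assumption: stop at the first instruction that is not a PUSH.
        if not parts or parts[0] != "PUSH":
            break
        # Only immediates such as "0x588ab3ea" count as hash arguments;
        # "PUSH dword ptr [...]" or "PUSH EAX" are skipped.
        if len(parts) > 1 and parts[1].startswith("0x"):
            found.append(parts[1])
        index -= 1
    return found if len(found) == wanted else None


# Hypothetical call site with two immediate hashes: resolvable.
plain_site = [
    "MOV EBX,ECX",
    "PUSH 0x588ab3ea",
    "PUSH 0x5a3b6e1c",
    "CALL 0x1000d0d0",
]

# Hypothetical call site where one argument is read from a struct: skipped.
struct_site = [
    "MOV EBX,ECX",
    "PUSH dword ptr [ESI + 0x8]",
    "PUSH 0x588ab3ea",
    "CALL 0x1000d0d0",
]

print(collect_push_immediates(plain_site, 3))
print(collect_push_immediates(struct_site, 3))
```

For the second listing only one immediate is found, so the helper returns None, which mirrors the `if arg != 2: continue` check in the script.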
Additional References
Blogs that analyzed Dridex malware:
- https://cyber-anubis.github.io/malware%20analysis/dridex/
- https://www.0ffset.net/reverse-engineering/malware-analysis/dridex-veh-api-obfuscation/
- https://0xk4n3ki.github.io/posts/Dridex/
- https://web.archive.org/web/20230530201929/https://vk-intel.org/2018/09/10/lets-learn-dissecting-dridex-banking-malware-part-1-loader-and-avast-snxk-dll-hooking-lib/
- https://blog.lexfo.fr/dridex-malware.html
Thoughts
Comparison
- Using HashDB is still the easiest and fastest method to resolve api calls. The emulator approach can be used in case the hash is not present in the database. This can happen if the hashing algorithm is new or is a modified version of a known hashing algorithm. In that case, one can submit a Python implementation of the hashing algorithm to HashDB on GitHub so that the hashes are added to the database.
- Using a debugger or instrumentation to resolve api calls is much faster than using an emulator; however, it won't resolve all the api calls. Only the api calls actually used by the sample during that run will be resolved.
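The idea behind a hash database can be sketched in a few lines: compute the hash of every known export name once, then look hashes up in reverse. The sketch below uses a ROR13-style additive hash purely as a stand-in. It is not the algorithm used by Dridex or by any other sample in this post, the export list is a made-up three-entry example, and it does not reflect how HashDB itself is implemented.

```python
from typing import Dict, Iterable


def ror32(value: int, bits: int) -> int:
    """Rotate a 32-bit value right by `bits`."""
    return ((value >> bits) | (value << (32 - bits))) & 0xFFFFFFFF


def ror13_hash(name: str) -> int:
    """Stand-in hashing algorithm: rotate right by 13, then add each byte."""
    h = 0
    for byte in name.encode("ascii"):
        h = ror32(h, 13)
        h = (h + byte) & 0xFFFFFFFF
    return h


def build_lookup(export_names: Iterable[str]) -> Dict[int, str]:
    """Precompute hash -> name for every known export name."""
    return {ror13_hash(name): name for name in export_names}


# Hypothetical, tiny export list; a real database covers whole DLLs.
exports = ["CreateFileW", "VirtualAlloc", "LoadLibraryA"]
table = build_lookup(exports)

seen_in_sample = ror13_hash("VirtualAlloc")
print(hex(seen_in_sample), "->", table.get(seen_in_sample, "<unknown hash>"))
```

When the sample uses a new or modified algorithm, ror13_hash above would simply produce different values than the sample does and every lookup would miss, which is the case where emulating the resolver becomes useful.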
Further Changes
- The comments in Ghidra are added only for demonstration purposes. The script can be updated to modify the variable shown in the decompiler itself. This can be done by accessing the HighSymbol at that address using getLocalSymbolMap() and getSymbols, and then using the method updateDBVariable to modify the variable. However, this will be a time-consuming operation since the decompiler needs to be invoked in order to access the HighSymbol.
- The script runs the emulation at every address where the api hash resolver is called. It can be optimised by keeping track of each hash value and the api call it resolved to. Then, when the same hash value needs to be resolved again, the previously resolved api call is reused instead of running the emulation again.
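A minimal sketch of the caching idea from the last point, assuming the two pushed hash values are treated as an opaque key. CachingResolver and fake_emulate are hypothetical names introduced only for illustration; in the real script the emulate callback would wrap the ql.run call made in sandbox.

```python
from typing import Callable, Dict, Tuple


class CachingResolver:
    """Remember which api name each pair of hash arguments resolved to."""

    def __init__(self, emulate: Callable[[int, int], str]):
        self._emulate = emulate
        self._cache: Dict[Tuple[int, int], str] = {}
        self.emulations = 0  # how many times the emulator was actually run

    def resolve(self, first_hash: int, second_hash: int) -> str:
        key = (first_hash, second_hash)
        if key not in self._cache:
            # Only emulate when this pair has not been seen before.
            self._cache[key] = self._emulate(first_hash, second_hash)
            self.emulations += 1
        return self._cache[key]


def fake_emulate(first_hash: int, second_hash: int) -> str:
    """Stand-in for the emulator run; returns a made-up api name."""
    return f"api_{first_hash:08x}_{second_hash:08x}"


resolver = CachingResolver(fake_emulate)
# Three hypothetical call sites, two of which pass the same hashes.
call_sites = [(0x1, 0xA), (0x1, 0xB), (0x1, 0xA)]
names = [resolver.resolve(a, b) for a, b in call_sites]

print(names)
print("emulator runs:", resolver.emulations)
```

The Ghidra comment would still be added at every call site; only the emulator run is skipped for repeated hashes.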
Additional Notes
- To interact with Ghidra using Python, Ghidrathon was considered. However, it didn't work. It is possible to run it, but only in headless Ghidra mode. Refer to: Update Qiling to work within Ghidra (using Ghidrathon) #1301
- To get a better understanding of the Ghidra API calls, one can go through the scripts which are bundled with Ghidra. One can also refer to the snippets at:
- In the future, the Dumpulator project will be explored for api hash resolving. Hopefully, it works with the Ghidrathon plugin.