For this set of exercises, you will leverage LLM agents to pull together network and application intelligence to perform tasks.
In the course VM, change into the directory for the exercises in the repository.
cd cs475-src/10
git pull
Rapid API provides a streamlined mechanism for accessing a large number of third-party application programming interfaces (APIs) that can be integrated into your application. It supports many security-related APIs that we can use within our agents. Visit https://rapidapi.com/ and sign up for an account with Google using your pdx.edu email address. Then, visit your "Apps" and find the "Authorization" section of your default application. An application key has already been defined for that application. Copy the key and go back to the course VM.
Set up an environment variable to utilize Rapid API.
export RAPID_API_KEY="<FMI>"
Note that you can add this line to your .bashrc file to set the key automatically each time you log in.
There are many intelligence APIs available through Rapid API. In this set of exercises, we'll utilize a set of APIs that provide free subscriptions. The first API is the mailcheck service at https://rapidapi.com/Top-Rated/api/e-mail-check-invalid-or-disposable-domain. The second is the OOP Spam Filter at https://rapidapi.com/oopspam/api/oopspam-spam-filter. Navigate to each service's landing page on Rapid API, enable the API, and then subscribe to it using its free plan.
Google's Safe Browsing API provides intelligence about URLs so that potentially dangerous content can be avoided. To enable it for the GOOGLE_API_KEY you have previously set up, visit the web interface to your GCP project and bring up Cloud Shell. Enable the API via the following command:
gcloud services enable safebrowsing.googleapis.com
Then, navigate to your project's API credentials page at https://console.cloud.google.com/apis/credentials. Find the API key you've set up previously, then edit the key.
Within the API restrictions for the key, scroll down and add Safe Browsing to the APIs allowed. Then, click on "Save" to persist the changes.
OpenCVE provides a real-time feed for the latest application vulnerabilities and weaknesses. Visit https://opencve.io and create an account using your pdx.edu email address. When utilizing OpenCVE's APIs, we'll need to supply our account credentials. Set up environment variables that contain them.
export OPENCVE_USERNAME="<FMI>"
export OPENCVE_PASSWORD="<FMI>"
Note that you can add these lines to your .bashrc file to set the credentials automatically each time you log in.
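Since the agents in these exercises read their keys from the environment, it can help to confirm the variables are set before running anything. The sketch below is a hypothetical helper (`missing_env` and `REQUIRED_VARS` are illustrative names, not part of the course repository) that reports which of the variables from the setup steps are unset or empty.

```python
import os

# Variable names taken from the setup steps in this lab.
REQUIRED_VARS = ["RAPID_API_KEY", "OPENCVE_USERNAME", "OPENCVE_PASSWORD"]


def missing_env(names, env=None):
    """Return the names that are unset or empty in the given environment."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env(REQUIRED_VARS)
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

Passing a plain dictionary as `env` makes the helper easy to try without touching your real shell environment.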
Obtaining information about particular IP addresses can be helpful in responding to potential attacks. In this exercise, two sources of IP intelligence are integrated as tools using an agent. The first is from IPWhois. Begin by visiting the URL https://ipwho.is/ to see information about the IP address you're connecting from.
Another source of information is VirusTotal. VirusTotal provides comprehensive information across many domains. Visit the VirusTotal documentation site at https://docs.virustotal.com/reference/overview to examine the kinds of intelligence available for access. Navigate to the site's IP address API.
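When reading a VirusTotal IP address report, a small summary is often easier to reason about than the full JSON. The sketch below assumes the v3 response layout, in which the report's fields sit under `data.attributes` and the per-engine verdict counts under `last_analysis_stats`. The helper name `summarize_ip_report` and the sample values are hypothetical, so compare them against a real response.

```python
def summarize_ip_report(report):
    """Condense a VirusTotal v3 IP address report into a few key fields."""
    attributes = report.get("data", {}).get("attributes", {})
    stats = attributes.get("last_analysis_stats", {})
    return {
        "owner": attributes.get("as_owner"),
        "country": attributes.get("country"),
        "malicious": stats.get("malicious", 0),
        "suspicious": stats.get("suspicious", 0),
        "harmless": stats.get("harmless", 0),
    }


# Hypothetical, heavily trimmed response used only for illustration.
sample_report = {
    "data": {
        "id": "192.0.2.1",
        "attributes": {
            "as_owner": "Example Networks",
            "country": "US",
            "last_analysis_stats": {"malicious": 2, "suspicious": 1, "harmless": 60},
        },
    }
}

summary = summarize_ip_report(sample_report)
print(summary)
```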
Change into the exercise directory, create a virtual environment, activate it, and install the packages.
cd 01_net_int
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt
An agent with custom tools for calling each service based on a user query is provided in the repository. It will not run properly unless each tool is given a detailed description of what it does and how the results it returns should be interpreted. View the tools provided in the MCP server code and modify each tool's description, given in Python docstring comments as "(CHANGE ME)", to better reflect the information you have seen each tool return. Having the LLM generate the description from sample output, and attaching sample output to the description, can help produce accurate execution.
@mcp.tool("ip_loc")
async def ip_loc(address:str):
    """(CHANGE ME)"""
    url = f"http://ipwho.is/{address}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()

@mcp.tool("ip_report")
async def ip_report(address:str):
    """(CHANGE ME)"""
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{address}"
    request_headers = {"x-apikey": VIRUSTOTAL_API_KEY}
    response = requests.get(url, headers=request_headers)
    if response.status_code == 200:
        return response.json()
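One way to follow the advice about attaching sample output is to assemble the description text from a response you have actually observed. The sketch below is only an illustration: `build_description` is a hypothetical helper, and the sample dictionary is made up rather than real service output.

```python
import json


def build_description(purpose, sample, max_chars=400):
    """Compose a tool description from a purpose statement and sample output."""
    fields = ", ".join(sorted(sample))
    # Truncate the example so the description stays short.
    example = json.dumps(sample, sort_keys=True)[:max_chars]
    return (
        f"{purpose}\n"
        f"Returns a JSON object with the fields: {fields}.\n"
        f"Sample output: {example}"
    )


# Hypothetical sample; replace it with output you observed from the service.
sample_output = {"ip": "192.0.2.1", "country": "Exampleland", "success": True}
description = build_description(
    "Look up the geographic location of an IP address.", sample_output
)
print(description)
```

The resulting text can be pasted into a tool's docstring in place of "(CHANGE ME)" and then edited by hand to explain how each field should be interpreted.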
Run the modified agent:
python3 net_int_mcp_client.py
Deactivate the virtual environment and delete it.
deactivate
rm -rf env
Obtaining information about particular DNS names can also be helpful in responding to potential attacks. In this exercise, three sources of Domain Name System intelligence are integrated as tools using an agent. The first is from the mailcheck service set up previously via Rapid API. Log in to Rapid API and visit the playground for the service (https://rapidapi.com/Top-Rated/api/e-mail-check-invalid-or-disposable-domain). Click on the "Params" tab and test the endpoint with a DNS name of your choice.
Another source of information on DNS names is the Certificate Transparency log for TLS certificates. Visit https://certificate.transparency.dev/ to find out more about it. Then, visit https://crt.sh and look up a DNS name to examine the kinds of information available for access.
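A common use of Certificate Transparency results is enumerating the host names that certificates have been issued for. The sketch below assumes each entry in the crt.sh JSON output carries a `name_value` field holding one or more newline-separated names. The helper name `unique_names` and the sample entries are hypothetical.

```python
def unique_names(entries):
    """Collect the distinct DNS names listed across certificate entries."""
    names = set()
    for entry in entries:
        # One certificate can cover several names, separated by newlines.
        for name in entry.get("name_value", "").splitlines():
            name = name.strip().lower()
            if name:
                names.add(name)
    return sorted(names)


# Hypothetical entries shaped like crt.sh JSON output.
sample_entries = [
    {"name_value": "example.com\nwww.example.com"},
    {"name_value": "WWW.example.com"},
    {"name_value": "mail.example.com"},
]

names = unique_names(sample_entries)
print(names)
```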
A third source of information on DNS names is the whois registry. Visit https://en.wikipedia.org/wiki/WHOIS to find out more about what is stored in the registry. Then, in the course VM, run the whois command on a DNS name to examine the kinds of information available for access.
whois pdx.edu
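The raw whois text is meant for humans, so a little parsing can make it easier to work with. The sketch below is a rough, hypothetical parser (`parse_whois` is not part of the repository) that assumes "Key: value" lines. Whois formats vary by registry, so treat it as a starting point rather than a general solution.

```python
def parse_whois(text):
    """Collect 'Key: value' lines from whois output into a dict of lists."""
    record = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blanks and comment lines, which commonly start with % or #.
        if not line or line[0] in "%#":
            continue
        key, sep, value = line.partition(":")
        if sep and value.strip():
            # Keys such as name servers can repeat, so keep every value.
            record.setdefault(key.strip(), []).append(value.strip())
    return record


# Hypothetical whois output used only for illustration.
sample_text = """% Sample registry banner
Domain Name: EXAMPLE.EDU
Name Server: NS1.EXAMPLE.EDU
Name Server: NS2.EXAMPLE.EDU
Registrant:
"""

record = parse_whois(sample_text)
print(record)
```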
Change into the exercise directory, create a virtual environment, activate it, and install the packages.
cd 02_dns_int
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt
View the tools provided in the MCP server code and modify each tool's description, given in Python docstring comments as "(CHANGE ME)", to better reflect the information you have seen each tool return.
@mcp.tool("cert_domain_search")
async def cert_domain_search(domain):
    """(CHANGE ME)"""
    url = f"""https://crt.sh/?Identity={domain}&output=json"""
    response = requests.get(url)
    return response.json()

@mcp.tool("email_domain_search")
async def email_domain_search(domain):
    """(CHANGE ME)"""
    url = "https://mailcheck.p.rapidapi.com/"
    querystring = {"domain": domain}
    request_headers = {
        "X-RapidAPI-Key": RAPID_API_KEY,
        "X-RapidAPI-Host": "mailcheck.p.rapidapi.com"
    }
    response = requests.get(url, headers=request_headers, params=querystring)
    return response.json()

@mcp.tool("whois_domain_search")
async def whois_domain_search(domain):
    """(CHANGE ME)"""
    result = subprocess.run(['whois',domain], capture_output=True, text=True, check=True)
    return result
Run the modified agent:
python3 dns_int_mcp_client.py
Deactivate the virtual environment and delete it.
deactivate
rm -rf env
Obtaining information about particular URLs is useful in preventing the spread of malicious content. In this exercise, three sources of URL-based intelligence are integrated as tools using an agent. The first is from VirusTotal. Visit the VirusTotal documentation site at https://docs.virustotal.com/reference/overview and navigate to the site's URL API. There is a wealth of information on URLs available via this API, including what a range of malware detection engines think of the URL. To see this functionality, return to the course VM. Run the following command to submit https://pdx.edu to the urls API using your API key.
curl --request POST --url https://www.virustotal.com/api/v3/urls \
--form url=https://pdx.edu \
--header "x-apikey: ${VIRUSTOTAL_API_KEY}"
Pull out the report link from the response, then retrieve the report using your API key by filling in the command below.
curl --url <URL_for_report> \
--header "x-apikey: ${VIRUSTOTAL_API_KEY}"
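Pulling the report link out by hand works, but the same step can be done in code. The sketch below assumes the submission response places the link under `data.links.self` and the analysis identifier under `data.id`. The helper name `report_link` and the sample response are hypothetical, so verify the layout against the response you received.

```python
def report_link(submission):
    """Return the analysis report URL from a URL-submission response."""
    data = submission.get("data", {})
    link = data.get("links", {}).get("self")
    if link:
        return link
    # Fall back to building the analyses URL from the returned id.
    return f"https://www.virustotal.com/api/v3/analyses/{data['id']}"


# Hypothetical response with a placeholder identifier.
sample_submission = {
    "data": {
        "type": "analysis",
        "id": "u-EXAMPLE-ID",
        "links": {"self": "https://www.virustotal.com/api/v3/analyses/u-EXAMPLE-ID"},
    }
}

link = report_link(sample_submission)
print(link)
```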
Another API that can identify problematic URLs is Google's Safe Browsing API. Visit the documentation for the URL lookup endpoint at https://developers.google.com/safe-browsing/v4/lookup-api.
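The lookup endpoint expects a JSON body that names the client and lists the threat types and URLs to check. The sketch below shows one plausible way to build that body and interpret the reply. The function names, the client identifier, and the choice of threat types are assumptions for illustration, not the repository's actual code.

```python
import json


def build_lookup_payload(url, client_id="cs475-lab", client_version="1.0"):
    """Build a request body for the v4 threatMatches:find endpoint."""
    return {
        # client_id and client_version are placeholder values.
        "client": {"clientId": client_id, "clientVersion": client_version},
        "threatInfo": {
            "threatTypes": ["MALWARE", "SOCIAL_ENGINEERING"],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL"],
            "threatEntries": [{"url": url}],
        },
    }


def is_flagged(response_json):
    """Treat a response without a "matches" list as no threat found."""
    return bool(response_json.get("matches"))


payload = build_lookup_payload("http://example.com/")
print(json.dumps(payload, indent=2))
```

An empty JSON object in the reply therefore reads as "no match", which is worth stating in the tool's description so the agent does not mistake it for an error.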
Finally, the PhishTank site provides a real-time database feed of known phishing URLs. If a particular URL is in the database, the response will indicate it. View the API information at https://www.phishtank.com/api_info.php. Then, find a recently reported phishing URL at https://www.phishtank.com/phish_archive.php. Use the following command to query the PhishTank API for the URL.
curl --request POST --url https://checkurl.phishtank.com/checkurl/ \
--form url=<Phishing_URL> --form format=json \
--header "User-Agent: phishtank/$USER"
Change into the exercise directory, create a virtual environment, activate it, and install the packages.
cd 03_url_int
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt
View the tools provided in the MCP server code and modify each tool's description, given in Python docstring comments as "(CHANGE ME)", to better reflect the information you have seen each tool return.
@mcp.tool("safebrowsing_url_report")
def safebrowsing_url_report(url):
    """(CHANGE ME)"""
    api_url = "https://safebrowsing.googleapis.com/v4/threatMatches:find"
    response = requests.post(f"{api_url}?key={GOOGLE_API_KEY}", headers=request_headers, data=json.dumps(payload))
    ...

@mcp.tool("virustotal_url_report")
def virustotal_url_report(url):
    """(CHANGE ME)"""
    api_url = f"https://www.virustotal.com/api/v3/urls"
    response = requests.post(api_url, headers=request_headers, data={"url":url})
    ...

@mcp.tool("phishtank_url_report")
def phishtank_url_report(url):
    """(CHANGE ME)"""
    api_url = f"https://checkurl.phishtank.com/checkurl/"
    response = requests.post(api_url, headers=request_headers, data=payload)
    ...
Run the modified agent:
python3 url_int_mcp_client.py
Deactivate the virtual environment and delete it.
deactivate
rm -rf env
Another common area for threat intelligence is e-mail addresses and payloads. In this exercise, two sources of e-mail intelligence are integrated as tools using an agent. The first is from the OOPSpam Spam Filter. Log in to Rapid API and visit the playground for the service (https://rapidapi.com/oopspam/api/oopspam-spam-filter). Test the endpoint with the default payload given.
View the documentation for the filter at: https://www.oopspam.com/docs.
Another useful e-mail service is one that detects addresses or domains that are invalid or disposable. One service for performing this detection can be found at https://disify.com/. To test the service, fill in the command below with your e-mail address and view the results.
curl "https://disify.com/api/email/OdinID@pdx.edu"
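Once you have seen the JSON that the service returns, it is worth deciding how an agent should read it. The sketch below assumes the response contains boolean `format`, `dns`, and `disposable` fields. Those field names, the helper `classify_address`, and the verdict strings are assumptions, so check them against the output of the command above.

```python
def classify_address(result):
    """Turn a validation result into a short human-readable verdict."""
    if not result.get("format", False):
        return "invalid format"
    if not result.get("dns", False):
        return "domain failed DNS check"
    if result.get("disposable", False):
        return "disposable address"
    return "no problems detected"


# Hypothetical results used only for illustration.
samples = [
    {"format": True, "dns": True, "disposable": False},
    {"format": True, "dns": True, "disposable": True},
    {"format": False},
]

verdicts = [classify_address(sample) for sample in samples]
print(verdicts)
```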
Change into the exercise directory, create a virtual environment, activate it, and install the packages.
cd 04_email_int
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt
View the tools provided in the MCP server code and modify each tool's description, given in Python docstring comments as "(CHANGE ME)", to better reflect the information you have seen each tool return.
@mcp.tool("oop_spam_search")
def oop_spam_search(content):
    """(CHANGE ME)"""
    url = "https://oopspam.p.rapidapi.com/v1/spamdetection"
    payload = {
        "content": content,
    }
    request_headers = {
        "X-RapidAPI-Key": RAPID_API_KEY,
        "X-RapidAPI-Host": "oopspam.p.rapidapi.com"
    }
    response = requests.post(url, json=payload, headers=request_headers)
    ...

@mcp.tool("email_is_spammer")
def email_is_spammer(address):
    """(CHANGE ME)"""
    url = f"https://disify.com/api/email/{address}"
    response = requests.get(url)
    ...
Run the modified agent:
python3 email_int_mcp_client.py
Deactivate the virtual environment and delete it.
deactivate
rm -rf env
Common Vulnerabilities and Exposures (CVEs) and Common Weakness Enumerations (CWEs) provide a reference method to identify, categorize, and describe security vulnerabilities and software weaknesses. CVEs provide unique identifiers for publicly known cybersecurity vulnerabilities, while CWEs classify common software weaknesses that might lead to vulnerabilities. Real-time databases for both CVEs and CWEs can be queried to understand what threats are present in one's software. In this exercise, we will see how one can utilize OpenCVE's API to pull down relevant information on both CVEs and CWEs. Begin by visiting the documentation for OpenCVE's API at https://docs.opencve.io/api/. We'll be utilizing two API endpoints (/cve/ and /weaknesses/) using the username and password we have set up previously. Examine the responses that are returned from each API.
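Supplying a username and password to an HTTP API like this is typically done with HTTP Basic authentication, which is what passing an `auth=(username, password)` tuple to the requests library produces. The sketch below shows the header that results. The helper name `basic_auth_header` and the credentials are hypothetical.

```python
import base64


def basic_auth_header(username, password):
    """Build the Authorization header used by HTTP Basic authentication."""
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials; real values come from OPENCVE_USERNAME and
# OPENCVE_PASSWORD in the environment.
header = basic_auth_header("student", "not-a-real-password")
print(header)
```

Because Base64 is an encoding rather than encryption, the credentials are only protected when the request is sent over HTTPS.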
Change into the exercise directory, create a virtual environment, activate it, and install the packages.
cd 05_app_int
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt
View the tools provided in the MCP server code and modify each tool's description, given in Python docstring comments as "(CHANGE ME)", to better reflect the information you have seen each tool return.
@mcp.tool("cve_by_id")
def cve_by_id(cve_id):
    """(CHANGE ME)"""
    url = f'https://www.opencve.io/api/cve/{cve_id}'
    response = requests.get(url, auth=(OPENCVE_USERNAME, OPENCVE_PASSWORD))
    ...

@mcp.tool("cwe_by_id")
def cwe_by_id(cwe_id):
    """(CHANGE ME)"""
    url = f'https://www.opencve.io/api/weaknesses/{cwe_id}'
    response = requests.get(url, auth=(OPENCVE_USERNAME, OPENCVE_PASSWORD))
    ...
Run the modified agent:
python3 app_int_mcp_client.py
Run a query about CVE-2025-23475 and ask the agent to describe it and its associated CWE.
Find another CVE from this year and repeat the query, finding its associated CWE.
Deactivate the virtual environment and delete it.
deactivate
rm -rf env
In this exercise, we'll examine an application of retrieval-augmented generation (RAG) to the problem of threat intelligence. The Trusted Automated Exchange of Intelligence Information (TAXII™) is an application protocol for exchanging cyber-threat intelligence (CTI) over HTTPS. An example of a production RAG application is shown below from the Open Threat Research Forge's GenAI Security Adventures repository. In the application, CTI documents from MITRE ATT&CK's repository are downloaded, chunked, and tokenized before being embedded and inserted into a vector database. The application then allows a user to query the data via a prompt that is used to retrieve the relevant documents. The prompt and documents are then sent to the LLM to answer the user's query.
Change into the exercise directory, create a virtual environment, activate it, and install the packages.
cd 06_MitreAttack_RAG
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt
The application is broken into four separate parts.
The first part, the attackcti download, retrieves information on techniques used by attackers via the attackcti package, then goes through each technique and adds it to the groups that utilize it (via all_groups) as shown in the code snippet below:
from attackcti import attack_client
lift = attack_client()
techniques_used_by_groups = lift.get_techniques_used_by_all_groups()
all_groups = dict()
for technique in techniques_used_by_groups:
    ...
    technique_used = dict()
    technique_used['matrix'] = technique['technique_matrix']
    ...
    all_groups[technique['id']]['techniques'].append(technique_used)
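The grouping pattern in the snippet above can be tried without downloading anything. The sketch below is a simplified stand-in that uses made-up records. The `name` and `technique` keys, the helper `group_techniques`, and the placeholder identifiers are assumptions for illustration and may not match the fields returned by attackcti.

```python
def group_techniques(records):
    """Index technique records by the id of the group that uses them."""
    all_groups = {}
    for record in records:
        # Create the group entry the first time its id is seen.
        group = all_groups.setdefault(
            record["id"],
            {"group_name": record["name"], "group_id": record["id"], "techniques": []},
        )
        group["techniques"].append(
            {"matrix": record["technique_matrix"], "technique": record["technique"]}
        )
    return all_groups


# Made-up records with placeholder identifiers.
sample_records = [
    {"id": "G-EX-1", "name": "Example Group", "technique": "Phishing", "technique_matrix": "enterprise"},
    {"id": "G-EX-1", "name": "Example Group", "technique": "Valid Accounts", "technique_matrix": "enterprise"},
    {"id": "G-EX-2", "name": "Other Group", "technique": "Phishing", "technique_matrix": "mobile"},
]

groups = group_techniques(sample_records)
print(groups["G-EX-1"])
```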
After ingesting the techniques and adding them to the groups that use them, the program then generates Markdown files for each group that includes the techniques they use from a Markdown template as shown in the code snippet below:
group_template = os.path.join(current_directory, "group_template.md")
markdown_template = Template(open(group_template).read())
for key in list(all_groups.keys()):
    group = all_groups[key]
    group_for_render = copy.deepcopy(group)
    markdown = markdown_template.render(metadata=group_for_render, group_name=group['group_name'], group_id=group['group_id'])
    file_name = (group['group_name']).replace(' ','_')
    open(f'{documents_directory}/{file_name}.md', encoding='utf-8', mode='w').write(markdown)
Run the download script.
python3 01_attackcti_download.py
Find the Markdown file that has been generated for APT 28.
The next part of the application takes the generated Markdown files for all of the threat groups, loads each one using the UnstructuredMarkdownLoader, and splits it into chunks as shown in the code snippet below:
from langchain_community.document_loaders import UnstructuredMarkdownLoader
group_files = glob.glob(os.path.join(documents_directory, "*.md"))
md_docs = []
for group in group_files:
    loader = UnstructuredMarkdownLoader(group)
    md_docs.extend(loader.load_and_split())
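To make the idea of chunking concrete, the sketch below splits text into fixed-size pieces that overlap slightly so that sentences cut at a boundary still appear whole in one chunk. This is a simplified stand-in, not the splitting logic that the loader actually applies. The function name and the default sizes are hypothetical.

```python
def split_into_chunks(text, chunk_size=200, overlap=40):
    """Split text into chunks of at most chunk_size characters that overlap."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    if not text:
        return []
    step = chunk_size - overlap
    chunks = []
    start = 0
    while True:
        chunks.append(text[start:start + chunk_size])
        # Stop once the current chunk reaches the end of the text.
        if start + chunk_size >= len(text):
            break
        start += step
    return chunks


chunks = split_into_chunks("abcdefghij", chunk_size=4, overlap=1)
print(chunks)
```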
It then loads the chunks into the vector database using an embedding model as shown in the code snippet below:
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import Chroma
embedding_function = GoogleGenerativeAIEmbeddings(model="models/embedding-001",
    task_type="retrieval_query")
vectorstore = Chroma.from_documents(md_docs,
    embedding_function, collection_name="groups_collection",
    persist_directory=f"{current_directory}/.chromadb")
Run the loading script.
python3 02_loaddb.py
With the documents inserted into our vector database, we can now perform document similarity searches. The code snippet below returns the most similar documents that are retrieved from the vector database given a user's query.
relevant_docs = vectorstore.similarity_search(query)
for doc in relevant_docs:
    print(f" {doc.metadata['source']}")
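Conceptually, a similarity search compares the embedding of the query with the embedding of each stored chunk and returns the closest ones. The sketch below illustrates this with cosine similarity over tiny made-up vectors. It is not how the vector database is implemented, which may use a different distance metric and index, and the file names are placeholders.

```python
import math


def cosine_similarity(a, b):
    """Return the cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def most_similar(query_vector, document_vectors, k=2):
    """Return the names of the k documents closest to the query vector."""
    ranked = sorted(
        document_vectors.items(),
        key=lambda item: cosine_similarity(query_vector, item[1]),
        reverse=True,
    )
    return [name for name, _ in ranked[:k]]


# Toy two-dimensional "embeddings"; real ones have hundreds of dimensions.
toy_vectors = {
    "group_a.md": [1.0, 0.0],
    "group_b.md": [0.0, 1.0],
    "group_c.md": [0.9, 0.1],
}

closest = most_similar([1.0, 0.0], toy_vectors)
print(closest)
```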
Run the document searching script.
python3 03_docsearch.py
With the ability to retrieve the relevant documents on particular threat groups from the downloaded MITRE ATT&CK CTI data, we can ask questions about those groups. The code below utilizes a built-in question-answering chain within LangChain to take the user's prompt, retrieve the relevant content from the vector database, and send their concatenation to the LLM to handle.
from langchain.chains.question_answering import load_qa_chain
query = "Write a short summary about APT 28"
llm = GoogleGenerativeAI(model="...")
chain = load_qa_chain(llm, chain_type="stuff")
relevant_docs = retriever.get_relevant_documents(query)
results = chain.invoke({'input_documents':relevant_docs, 'question':query})
print(results['output_text'])
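The "stuff" chain type amounts to placing all of the retrieved text into a single prompt alongside the question. The sketch below shows that concatenation in plain Python. The prompt wording and the helper `build_stuff_prompt` are hypothetical and do not reproduce LangChain's actual template.

```python
def build_stuff_prompt(documents, question):
    """Concatenate retrieved documents and the question into one prompt."""
    context = "\n\n".join(documents)
    return (
        "Use the following context to answer the question.\n\n"
        f"{context}\n\n"
        f"Question: {question}\nAnswer:"
    )


# Placeholder document text standing in for retrieved chunks.
retrieved = ["First retrieved chunk.", "Second retrieved chunk."]
prompt = build_stuff_prompt(retrieved, "Write a short summary about APT 28")
print(prompt)
```

Because every retrieved chunk is sent in full, this approach only works while the combined text fits within the model's context window.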
Run the document querying script.
python3 04_rag_query.py
The Microsoft Threat Intelligence Center (MSTIC) is a group of experts that works to discover ongoing threats and deliver information about them in real time to security practitioners. While initially written for Microsoft Azure, the msticpy Python package allows one to access a variety of external threat information sources, including VirusTotal. While VirusTotal has its own Python package (vt-py) for enabling access to its information feed, in this exercise we'll be using the VirusTotal support within msticpy to implement a custom MCP server that answers questions about particular IP addresses.
Change into the exercise directory, create a virtual environment, activate it, and install the packages.
cd 07_MSTIC
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt
View the tools provided in the MCP server code. The MCP tools for the exercise query the external APIs described below.
TILookup.lookup_ioc
This call performs a lookup on indicators of compromise that have been seen for a particular observable such as an IP address. Examine its documentation in the msticpy documentation. The following code uses this call to find indicators of compromise for an IP address, returning a JSON result that contains hashes of any communicating samples it has found.
from msticpy.context.tilookup import TILookup
@mcp.tool("ip_info")
async def ip_info(ip_address):
    """(CHANGE ME)"""
    result = TILookup().lookup_ioc(observable=ip_address, ioc_type="ipv4", providers=["VirusTotal"])
    details = result.at[0, 'RawResult']
    comm_samples = details['detected_communicating_samples']
    return json.dumps(comm_samples)
VTLookupV3.get_object
Given the hash of a sample, this call returns the attributes associated with it. Examine the documentation for the call in the msticpy documentation. When used within the agent, we look up the sample and then return its attributes in a JSON result.
from msticpy.context.vtlookupv3.vtlookupv3 import VTLookupV3
from msticpy.common.provider_settings import get_provider_settings
vt_key = get_provider_settings("TIProviders")["VirusTotal"].args["AuthKey"]
@mcp.tool("samples_hash_identification")
async def samples_hash_identification(hash_string:str, ctx: Context = None):
    """(CHANGE ME)"""
    vt_lookup = VTLookupV3(vt_key=vt_key, force_nestasyncio=True)
    result = vt_lookup.get_object(hash_string, "file")
    json_result = result.to_json(orient='records')
    return json_result
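The two tools are designed to be chained: the hashes returned for an IP address become the inputs to the hash lookup. The sketch below shows one way to choose which hashes to follow up on. It assumes each communicating-sample entry has `sha256` and `positives` fields. Those field names, the helper `top_sample_hashes`, and the sample data are assumptions, so confirm them against real tool output.

```python
def top_sample_hashes(comm_samples, limit=3):
    """Return hashes of the samples flagged by the most detection engines."""
    ranked = sorted(
        comm_samples, key=lambda sample: sample.get("positives", 0), reverse=True
    )
    return [sample["sha256"] for sample in ranked[:limit] if "sha256" in sample]


# Made-up entries with placeholder hashes.
sample_entries = [
    {"sha256": "a" * 64, "positives": 5, "total": 70},
    {"sha256": "b" * 64, "positives": 40, "total": 70},
    {"sha256": "c" * 64, "positives": 12, "total": 70},
]

hashes = top_sample_hashes(sample_entries, limit=2)
print(hashes)
```

Describing this relationship in both docstrings can help the agent decide to call the second tool with the output of the first.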
Modify each tool's description, given in Python docstring comments as "(CHANGE ME)", to better reflect the information you have seen each tool return. Having the LLM generate the description from sample output, and attaching sample output to the description, can help produce accurate execution.
Run the agent:
python3 mstic_mcp_client.py
Deactivate the virtual environment and delete it.
deactivate
rm -rf env