In this week's exercises, your group will try out the various tasks for performing threat intelligence using LLMs. Begin by completing the setup parts of the codelab. Then, attempt the exercise your group has been assigned in the following Google Slide presentation:

Add screenshots that you can use to walk through how you performed the exercise. Your group will present its results for the exercise during the last hour of class. After completing the exercise you've been assigned, continue to the rest of the exercises. Change into the source directory containing the examples, create a virtual environment, activate it, and install the packages.

cd cs410g-src
git pull
cd 09*
virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt

Bring up an ssh session on your course VM.

Rapid API account

Rapid API provides a streamlined mechanism for accessing a large number of third-party application programming interfaces that can be integrated into your application. It supports many security-related APIs that we can use within our agents. Visit https://rapidapi.com/ and sign up for an account with Google using your pdx.edu email address. Then, visit your "Apps" and find the "Authorization" section of your default application. Within the application, an application key has been defined. Copy the key and go back to the course VM.

Set up an environment variable to utilize Rapid API.

export RAPID_API_KEY="<FMI>"

Note that you can add this line to your .bashrc file so that the key is set automatically each time you log in.
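Inside a Python program, the exported key can be read back from the environment. The following is a minimal sketch, assuming a hypothetical helper named require_env (it is not part of the course repository) that fails early with a readable message when a variable is missing or empty.

```python
import os


def require_env(name, environ=None):
    """Return the value of an environment variable or raise a clear error.

    require_env is a hypothetical helper used for illustration only.
    """
    # Allow a plain dict to be passed in so the helper is easy to test
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(
            f"{name} is not set; export it in your shell or add it to ~/.bashrc"
        )
    return value
```

A call such as require_env("RAPID_API_KEY") at the top of a script surfaces a missing key before any request is sent.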

Enable Rapid API services

There are many intelligence APIs available through Rapid API. In this set of exercises, we'll utilize a set of APIs that provide free subscriptions. The first API is the mailcheck service at https://rapidapi.com/Top-Rated/api/e-mail-check-invalid-or-disposable-domain. The second is the OOP Spam Filter at https://rapidapi.com/oopspam/api/oopspam-spam-filter. Navigate to each service's landing page on Rapid API, enable the API, and then subscribe to it using its free plan.

Google Safe Browsing

Google's Safe Browsing API lets you gather intelligence about URLs so that potentially dangerous content can be avoided. To enable it on the GOOGLE_API_KEY you have previously set up, visit the web interface to your GCP project and bring up Cloud Shell. Enable the API via the following command:

gcloud services enable safebrowsing.googleapis.com

Then, navigate to your project's API credentials page at https://console.cloud.google.com/apis/credentials. Find the API key you've set up previously, then edit it.

Within the API restrictions for the key, scroll down and add Safe Browsing to the APIs allowed. Then, click on "Save" to persist the changes.

OpenCVE

OpenCVE provides a real-time feed for the latest application vulnerabilities and weaknesses. Visit https://opencve.io and create an account using your pdx.edu email address. When utilizing OpenCVE's APIs, we'll need to supply our account credentials. Set up environment variables that contain them.

export OPENCVE_USERNAME="<FMI>"
export OPENCVE_PASSWORD="<FMI>"

Note that you can add these lines to your .bashrc file so that the credentials are set automatically each time you log in.

Obtaining information about particular IP addresses can be helpful in responding to potential attacks. In this exercise, two sources of IP intelligence are integrated as tools using an agent. The first is from IPWhois. Begin by visiting the URL https://ipwho.is/ to see information about the IP address you're connecting from.

Another source of information is VirusTotal. VirusTotal provides comprehensive information across many domains. Visit the VirusTotal documentation site at https://docs.virustotal.com/reference/overview to examine the kinds of intelligence available for access. Navigate to the site's IP address API.

An agent with custom tools for calling each service based on a user query is provided in the repository. It will not run properly unless each tool is given a detailed description for what it does and how the results it returns should be interpreted.

@tool
def ip_loc(address):
    url = f"http://ipwho.is/{address}"
    response = requests.get(url)
    return response.json()

@tool
def ip_report(address):
    url = f"https://www.virustotal.com/api/v3/ip_addresses/{address}"
    headers = {"x-apikey": VIRUSTOTAL_API_KEY}
    response = requests.get(url, headers=headers)
    return response.json()

View the agent provided (01_net_int.py) and modify each tool's description given in Python docstring comments to better reflect the information you have seen each tool return. Run the modified agent:

python3 01_net_int.py
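Both tools interpolate the user-supplied address directly into a request URL. One option is to check the input before calling either service. The following is a minimal sketch using Python's standard ipaddress module; the helper name describe_ip is hypothetical and not part of the provided agent.

```python
import ipaddress


def describe_ip(address):
    """Classify an address string before it is sent to an external service."""
    try:
        ip = ipaddress.ip_address(address.strip())
    except ValueError:
        return {"valid": False, "reason": "not an IPv4 or IPv6 address"}
    return {
        "valid": True,
        "version": ip.version,
        # Private, loopback, and similar ranges are not globally routable,
        # so external intelligence services have little to say about them
        "public": ip.is_global,
    }
```

A tool could return the "reason" text to the agent when the input is invalid, rather than forwarding a malformed address to the remote API.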

Obtaining information about particular DNS names can also be helpful in responding to potential attacks. In this exercise, three sources of Domain Name System intelligence are integrated as tools using an agent. The first is from the mailcheck service set up previously via Rapid API. Log in to Rapid API and visit the playground for the service (https://rapidapi.com/Top-Rated/api/e-mail-check-invalid-or-disposable-domain). Click on the "Params" tab and test the endpoint with a DNS name of your choice.

Another source of information on DNS names is the Certificate Transparency log for TLS certificates. Visit https://certificate.transparency.dev/ to find out more about it. Then, visit https://crt.sh and lookup a DNS name to examine the kinds of information available for access.

A third source of information on DNS names is the whois registry. Visit https://en.wikipedia.org/wiki/WHOIS to find out more about what is stored in the registry. Then, in the course VM, run the following command on a DNS name to examine the kinds of information available for access.

whois pdx.edu

An agent with custom tools for calling each service based on a user query is provided in the repository. It will not run properly unless each tool is given a detailed description for what it does and how the results it returns should be interpreted.

@tool
def cert_domain_search(domain):
    url = f"""https://crt.sh/?Identity={domain}&output=json"""
    response = requests.get(url)
    return response.json()

@tool
def email_domain_search(domain):
    url = "https://mailcheck.p.rapidapi.com/"
    querystring = {"domain": domain}
    headers = {
        "X-RapidAPI-Key": RAPID_API_KEY,
        "X-RapidAPI-Host": "mailcheck.p.rapidapi.com"
    }
    response = requests.get(url, headers=headers, params=querystring)
    return response.json()

@tool
def whois_domain_search(domain):
    result = subprocess.run(['whois', domain], capture_output=True, text=True, check=True)
    # Return the command's text output rather than the CompletedProcess object
    return result.stdout

View the agent provided (02_dns_int.py) and modify each tool's description given in Python docstring comments to better reflect the information you have seen each tool return. Run the modified agent:

python3 02_dns_int.py
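The JSON returned by crt.sh can be long and repetitive, which matters when the result is passed back to an LLM. The following is a minimal sketch of condensing it into a list of distinct names. It assumes each entry carries a name_value field that may hold several newline-separated names; verify this against the output you saw from crt.sh. The helper name unique_cert_names is hypothetical.

```python
def unique_cert_names(entries):
    """Collect the distinct DNS names found in a list of crt.sh JSON entries."""
    names = set()
    for entry in entries:
        # Assumption: name_value may hold several names separated by newlines
        for name in entry.get("name_value", "").splitlines():
            name = name.strip().lower()
            if name:
                names.add(name)
    return sorted(names)
```

Returning a short, de-duplicated list instead of the raw response is one way to keep the tool's output within the model's context window.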

Obtaining information about particular URLs is useful in preventing the spread of malicious content. In this exercise, three sources of URL-based intelligence are integrated as tools using an agent. The first is from VirusTotal. Visit the VirusTotal documentation site at https://docs.virustotal.com/reference/overview and navigate to the site's URL API. This API offers a wealth of information on URLs, including what a range of malware detection engines think of the URL. To show this functionality, visit the course VM. Run the following command to submit https://pdx.edu to the urls API for analysis using your API key.

curl --request POST --url https://www.virustotal.com/api/v3/urls \
    --form url=https://pdx.edu \
    --header "x-apikey: ${VIRUSTOTAL_API_KEY}"

Pull out the report link from the response, then retrieve the report using your API key by filling in the command below.

curl --url <URL_for_report> \
    --header "x-apikey: ${VIRUSTOTAL_API_KEY}"
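As an aside, the VirusTotal v3 documentation also describes a URL identifier formed by base64-encoding the URL (URL-safe alphabet) and stripping the trailing "=" padding. The following is a minimal sketch of computing that identifier and reversing it. The helper names are hypothetical, and you should confirm the identifier format against the documentation before relying on it.

```python
import base64


def vt_url_id(url):
    """Return the unpadded URL-safe base64 form of a URL."""
    return base64.urlsafe_b64encode(url.encode()).decode().rstrip("=")


def vt_url_from_id(url_id):
    """Reverse vt_url_id by restoring the stripped padding before decoding."""
    padding = "=" * (-len(url_id) % 4)
    return base64.urlsafe_b64decode(url_id + padding).decode()
```

Under that assumption, the identifier could be appended to https://www.virustotal.com/api/v3/urls/ to look up an existing report without first submitting the URL.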

Another API that can identify problematic URLs is Google's Safe Browsing API. Visit the documentation for the URL lookup endpoint at https://developers.google.com/safe-browsing/v4/lookup-api.
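When interpreting a lookup result, it helps to know that the v4 threatMatches:find endpoint returns an empty JSON object when nothing matches and a "matches" list otherwise. The following is a minimal sketch of reducing a response to its distinct threat types. The helper name is hypothetical, and the sample shapes in the usage note are illustrative rather than captured output.

```python
def safebrowsing_threat_types(result):
    """List the distinct threat types in a threatMatches:find response."""
    # An empty response body ({}) means no match was found
    matches = result.get("matches", [])
    return sorted({match.get("threatType", "UNKNOWN") for match in matches})
```

For example, safebrowsing_threat_types({}) yields an empty list, which a tool description can tell the agent to read as "no known threat" rather than as an error.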

Finally, the PhishTank site provides a real-time database of known phishing URLs. A query for a particular URL indicates whether that URL is in the database. View the API information at: https://www.phishtank.com/api_info.php. Then, find a recently reported phishing URL at https://www.phishtank.com/phish_archive.php. Use the following command to query the PhishTank API for the URL.

curl --request POST --url https://checkurl.phishtank.com/checkurl/ \
    --form url=<Phishing_URL> --form format=json \
    --header "User-Agent: phishtank/$USER"

An agent with custom tools for calling each service based on a user query is provided in the repository. It will not run properly unless each tool is given a detailed description for what it does and how the results it returns should be interpreted.

@tool
def safebrowsing_url_report(url):
    api_url = "https://safebrowsing.googleapis.com/v4/threatMatches:find"
    payload = {
        ...
        "threatInfo": {
            "threatTypes": ["THREAT_TYPE_UNSPECIFIED", "MALWARE", "SOCIAL_ENGINEERING", "UNWANTED_SOFTWARE", "POTENTIALLY_HARMFUL_APPLICATION"],
            "platformTypes": ["ANY_PLATFORM"],
            "threatEntryTypes": ["URL", "THREAT_ENTRY_TYPE_UNSPECIFIED", "EXECUTABLE"],
            "threatEntries": [
                {"url": url}
            ]
        }
    }
    ...
    response = requests.post(f"{api_url}?key={GOOGLE_API_KEY}", headers=headers, data=json.dumps(payload))
    return response.json()

@tool
def virustotal_url_report(url):
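    # Keep the API endpoint separate from the url argument being submitted;
    # reusing the name url would send the endpoint itself for analysis
    api_url = "https://www.virustotal.com/api/v3/urls"
    headers = {"x-apikey": VIRUSTOTAL_API_KEY}
    response = requests.post(api_url, headers=headers, data={"url": url})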
    if response.status_code == 200:
        response_dict = response.json()
        link = response_dict['data']['links']['self']
        response = requests.get(link, headers=headers)
        return response.json()

@tool
def phishtank_url_report(url):
    api_url = f"https://checkurl.phishtank.com/checkurl/"
    payload = {
                 "format" : "json",
                 "url" : url
    }
    headers = {"User-Agent": f"phishtank/{os.getenv('USER')}"}
    response = requests.post(api_url, headers=headers, data=payload)
    return response.json()['results']['in_database']

View the agent provided (03_url_int.py) and modify each tool's description given in Python docstring comments to better reflect the information you have seen each tool return. Run the modified agent:

python3 03_url_int.py

Another common area for threat intelligence is e-mail addresses and payloads. In this exercise, two sources of e-mail intelligence are integrated as tools using an agent. The first is from the OOPSpam Spam Filter. Log in to Rapid API and visit the playground for the service (https://rapidapi.com/oopspam/api/oopspam-spam-filter). Test the endpoint with the default payload given.

Another useful e-mail service is one that detects addresses or domains that have either previously sent spam or that are disposable. One service for performing this detection can be found at: https://eva.pingutil.com/. To test the service, fill in the command below with your e-mail address and view the results.

curl "https://api.eva.pingutil.com/email?email=OdinId@pdx.edu" 

An agent with custom tools for calling each service based on a user query is provided in the repository. It will not run properly unless each tool is given a detailed description for what it does and how the results it returns should be interpreted.

@tool
def oop_spam_search(content):
    url = "https://oopspam.p.rapidapi.com/v1/spamdetection"
    payload = {
        "content": content,
        "allowedLanguages": ["en"]
    }
    headers = {
        "content-type": "application/json",
        "X-RapidAPI-Key": RAPID_API_KEY,
        "X-RapidAPI-Host": "oopspam.p.rapidapi.com"
    }
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code == 200:
        return response.json()

@tool
def email_is_spammer(address):
    url = f"http://api.eva.pingutil.com/email?email={address}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()

View the agent provided (04_email_int.py) and modify each tool's description given in Python docstring comments to better reflect the information you have seen each tool return. Run the modified agent:

python3 04_email_int.py
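The tools above return nothing when the status code is not 200, so the agent sees None with no explanation. The following is a minimal sketch of an alternative that hands the agent a short error description instead. The helper name json_or_error is hypothetical; it only assumes a response object with status_code, text, and json(), as provided by the requests library.

```python
def json_or_error(response):
    """Return the decoded JSON body, or a small error dict on a non-200 status."""
    if response.status_code == 200:
        return response.json()
    return {
        "error": f"request failed with HTTP status {response.status_code}",
        # Truncate the body so a large error page does not flood the prompt
        "body": response.text[:200],
    }
```

A tool could end with return json_or_error(response), which lets the model report, for instance, that a subscription is missing or a rate limit was hit.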

Common Vulnerabilities and Exposures (CVEs) and Common Weakness Enumerations (CWEs) provide a reference method to identify, categorize, and describe security vulnerabilities and software weaknesses. CVEs provide unique identifiers for publicly known cybersecurity vulnerabilities while CWEs classify common software weaknesses that might lead to vulnerabilities. Real-time databases for both CVEs and CWEs can be queried to understand what threats are present in one's software. In this exercise, we will see how one can utilize OpenCVE's API to pull down relevant information on both CVEs and CWEs. Begin by visiting the API documentation for OpenCVE's API at https://docs.opencve.io/api/. We'll be utilizing two API endpoints (/cve/ and /cwe/) using the username and password we have set up previously. Examine the responses that are returned from each API.

An agent with custom tools for calling each service based on a user query is provided in the repository. It will not run properly unless each tool is given a detailed description for what it does and how the results it returns should be interpreted.

@tool
def cve_by_id(cve_id):
    url = f'https://www.opencve.io/api/cve/{cve_id}'
    response = requests.get(url, auth=(OPENCVE_USERNAME, OPENCVE_PASSWORD))
    if response.status_code == 200:
        return response.json()

@tool
def cwe_by_id(cwe_id):
    url = f'https://www.opencve.io/api/cwe/{cwe_id}'
    response = requests.get(url, auth=(OPENCVE_USERNAME, OPENCVE_PASSWORD))
    if response.status_code == 200:
        return response.json()

View the agent provided (05_app_int.py) and modify each tool's description given in Python docstring comments to better reflect the information you have seen each tool return. Run the modified agent:

python3 05_app_int.py
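Because the identifier is placed directly into the request path, it can be worth normalizing it first. The following is a minimal sketch, assuming the common "CVE-YYYY-NNNN" (four or more trailing digits) and "CWE-N" forms; the helper names are hypothetical and not part of the provided agent.

```python
import re

# CVE IDs: "CVE-", a four-digit year, then a sequence of four or more digits
CVE_PATTERN = re.compile(r"CVE-\d{4}-\d{4,}", re.IGNORECASE)
# CWE IDs: "CWE-" followed by a number
CWE_PATTERN = re.compile(r"CWE-\d+", re.IGNORECASE)


def normalize_cve_id(text):
    """Return an upper-cased CVE ID, or None if the text is not a CVE ID."""
    match = CVE_PATTERN.fullmatch(text.strip())
    return match.group(0).upper() if match else None


def normalize_cwe_id(text):
    """Return an upper-cased CWE ID, or None if the text is not a CWE ID."""
    match = CWE_PATTERN.fullmatch(text.strip())
    return match.group(0).upper() if match else None
```

Whether the API expects the "CWE-" prefix or a bare number is worth checking against the responses you examined earlier.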

In this exercise, we'll examine an application of retrieval-augmented generation (RAG) to the problem of threat intelligence. The Trusted Automated Exchange of Intelligence Information (TAXII™) is an application protocol for exchanging cyber-threat intelligence (CTI) over HTTPS. An example of a production RAG application is shown below from the Open Threat Research Forge's GenAI Security Adventures repository. In the application, CTI documents from MITRE ATT&CK's repository are downloaded, chunked, and tokenized before being embedded and inserted into a vector database. The application then allows a user to query the data via a prompt that is used to retrieve the relevant documents. The prompt and documents are then sent to the LLM to answer the user's query.

To demonstrate this application, change into the directory containing the application and install its packages.

cd cs410g-src
git pull
cd 10*
pip install -r requirements.txt

The application is broken into four separate parts.

attackcti download

The first downloads information on techniques used by attackers from the attackcti package, then goes through each technique and adds the technique to groups that utilize it (via all_groups) as shown in the code snippet below:

from attackcti import attack_client
lift = attack_client()
techniques_used_by_groups = lift.get_techniques_used_by_all_groups()

all_groups = dict()
for technique in techniques_used_by_groups:
    ...
    technique_used = dict()
    technique_used['matrix'] = technique['technique_matrix']
    ...
    all_groups[technique['id']]['techniques'].append(technique_used)
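The general pattern of collecting per-technique records under the group that uses them can be sketched independently of attackcti. The record fields below (group_id, group_name, technique) are hypothetical stand-ins chosen for illustration; they are not the field names returned by attackcti, and this is not the code elided above.

```python
def group_techniques(records):
    """Group flat (group, technique) records into one entry per group."""
    groups = {}
    for record in records:
        # Create the group entry the first time its ID is seen
        group = groups.setdefault(
            record["group_id"],
            {"group_name": record["group_name"], "techniques": []},
        )
        group["techniques"].append(record["technique"])
    return groups
```

The resulting dictionary has one key per group, each holding the list of techniques attributed to it, which is a convenient shape to hand to a template.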

After ingesting the techniques and adding them to the groups that use them, the program then generates Markdown files for each group that includes the techniques they use from a Markdown template as shown in the code snippet below:

group_template = os.path.join(current_directory, "group_template.md")
markdown_template = Template(open(group_template).read())
for key in list(all_groups.keys()):
    group = all_groups[key]
    group_for_render = copy.deepcopy(group)
    markdown = markdown_template.render(metadata=group_for_render, group_name=group['group_name'], group_id=group['group_id'])
    file_name = (group['group_name']).replace(' ','_')
    open(f'{documents_directory}/{file_name}.md', encoding='utf-8', mode='w').write(markdown)

Run the download script.

python3 01_attack_download.py

Find the Markdown file that has been generated for APT 28.

Document loading and embedding

The next part of the application takes the generated Markdown files for all of the threat groups, loads each one using the UnstructuredMarkdownLoader, and splits it into chunks as shown in the code snippet below:

from langchain_community.document_loaders import UnstructuredMarkdownLoader
group_files = glob.glob(os.path.join(documents_directory, "*.md"))
md_docs = []
for group in group_files:
    loader = UnstructuredMarkdownLoader(group)
    md_docs.extend(loader.load_and_split())

It then loads the chunks into the vector database using an embedding model as shown in the code snippet below:

from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_community.vectorstores import Chroma
embedding_function = GoogleGenerativeAIEmbeddings(model="models/embedding-001",
      task_type="retrieval_query")
vectorstore = Chroma.from_documents(md_docs,
      embedding_function, collection_name="groups_collection",     
      persist_directory=f"{current_directory}/.chromadb")

Run the loading script.

python3 02_loaddb.py

Document searching

With the documents inserted into our vector database, we can now perform document similarity searches. The code snippet below returns the most similar documents that are retrieved from the vector database given a user's query.

relevant_docs = vectorstore.similarity_search(query)
for doc in relevant_docs:
    print(f"  {doc.metadata['source']}")

Run the document searching script.

python3 03_docsearch.py
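Conceptually, a similarity search embeds the query and ranks stored chunks by how close their vectors are to it. The following is a toy sketch of that idea using cosine similarity over hand-written two-dimensional vectors. The vectors and file names are made up, the function names are hypothetical, and the vector database may use a different distance measure internally.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def most_similar(query_vec, doc_vecs, k=2):
    """Return the names of the k documents whose vectors best match the query."""
    ranked = sorted(
        doc_vecs.items(),
        key=lambda item: cosine_similarity(query_vec, item[1]),
        reverse=True,
    )
    return [name for name, _ in ranked[:k]]
```

Real embeddings have hundreds of dimensions, but the ranking step works the same way.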

Document query

With the ability to retrieve the relevant documents on particular threat groups from the downloaded MITRE ATT&CK CTI data, we can ask questions about particular threat groups. The code below utilizes a built-in question-answering chain within LangChain to take the user's prompt, retrieve the relevant content from the vector database, and send their concatenation to the LLM to handle.

from langchain.chains.question_answering import load_qa_chain
from langchain_google_genai import GoogleGenerativeAI
query = "Write a short summary about APT 28"
llm = GoogleGenerativeAI(model="gemini-pro")
chain = load_qa_chain(llm, chain_type="stuff")

relevant_docs = retriever.get_relevant_documents(query)
results = chain.invoke({'input_documents':relevant_docs, 'question':query})
print(results['output_text'])

Run the document querying script.

python3 04_rag_query.py
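The "stuff" strategy amounts to placing all retrieved documents into a single prompt alongside the question. The following is a minimal sketch of that concatenation. The function name and the prompt wording are made up for illustration and are not LangChain's actual template.

```python
def build_stuff_prompt(documents, question):
    """Concatenate retrieved document texts and the question into one prompt."""
    # Separate documents with blank lines so their boundaries stay visible
    context = "\n\n".join(documents)
    return (
        "Use the following context to answer the question.\n\n"
        f"{context}\n\n"
        f"Question: {question}\n"
        "Answer:"
    )
```

Because every retrieved document is included in full, this approach is simple but limited by the model's context window.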

The Microsoft Threat Intelligence Center (MSTIC) is a group of experts who work to discover ongoing threats and deliver information about them in real time to security practitioners. While initially written for Microsoft Azure, the msticpy Python package allows one to access a variety of external threat information sources including VirusTotal. While VirusTotal has its own Python package (vt-py) for enabling access to its information feed, in this exercise, we'll be using the VirusTotal support within msticpy to implement a custom LangChain agent to answer questions about particular IP addresses.

The agent for the exercise implements custom tools that query specific APIs. Change into the directory containing the application and install its packages.

cd cs410g-src
git pull
cd 11*
pip install -r requirements.txt

The agent accesses two different external calls described below.

TILookup.lookup_ioc

This particular call performs a lookup on indicators of compromise that have been seen for a particular observable such as an IP address. Examine its documentation here. The following code accesses this call to find different indicators of compromise for an IP address, returning a JSON result that contains hashes of any communicating samples it has found.

import json
from msticpy.sectools.tilookup import TILookup

ti_lookup = TILookup()

def ip_info(ip_address):
    result = ti_lookup.lookup_ioc(observable=ip_address, ioc_type="ipv4", providers=["VirusTotal"])
    details = result.at[0, 'RawResult']
    comm_samples = details['detected_communicating_samples']
    return json.dumps(comm_samples)
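The list of communicating samples can be long, so a tool may want to pass only the most relevant hashes on to the agent. The following is a minimal sketch, assuming each entry is a dictionary with sha256 and positives fields. Those field names are an assumption to verify against the RawResult you actually receive, and the helper name is hypothetical.

```python
def top_sample_hashes(samples, limit=5):
    """Return the hashes of the samples flagged by the most detection engines."""
    # Assumption: "positives" counts the engines that flagged the sample
    ranked = sorted(samples, key=lambda s: s.get("positives", 0), reverse=True)
    return [s["sha256"] for s in ranked[:limit] if "sha256" in s]
```

The returned hashes are the kind of identifier that the second tool accepts when looking up a sample's attributes.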

VTLookupV3.get_object

Given a sample's hash, this call returns the attributes associated with the sample. Examine the documentation for the call here. When used within the agent, we look up the sample and then return its attributes in a JSON result.

def samples_identification(id):
    result = vt_lookup.get_object(id, "file")
    json_result = result.to_json(orient='records')
    return json_result

An agent with custom tools for calling each API based on a user query is provided in the repository. It will not run properly unless each tool is given a description for what it does and how the results it returns should be interpreted.

tools = [
    Tool(
        name="Retrieve_IP_Info",
        func=ti_tool.ip_info,
        description="(CHANGE ME)",
    ),
    Tool(
        name="Retrieve_hash_information",
        func=ti_tool.samples_identification,
        description="(CHANGE ME)",
    ),
]

View the agent provided (mstic_gemini.py) and modify each tool's description to better reflect the information you have seen each tool return. Run the modified agent:

python3 mstic_gemini.py