Event Processing Pipeline with AWS/CBR

TStillz
8 min readNov 19, 2018

In my previous blog post, we covered how to extract raw network connections from real time process executions using the Carbon Black APIs. After extracting out each network connection, we enriched the network data with some GeoIP information to aid in detecting suspicious network connections. The downside to this method is you need to specify the processes you wish to analyze. This means you may miss events if you fail to specify the correct process; you also miss out on key metrics that, by themselves may point to something suspicious. Let’s take a looks at how we can use some AWS resources and the Carbon Black Event Forwarder to handle all network connections. For reference, the event schema can be found at the following link below:

https://developer.carbonblack.com/reference/enterprise-response/event-forwarder/event-schema/

While the content below is far from perfect (prototyping an idea), it shows how we can use AWS resources to handle any JSON object containing an IP address or domain name.

Our base ingress.event.netconn event from Carbon Black Event Forwarder should look like the following:

Next, let’s create a simple AWS setup to handle a stream of network events, originating from the event forwarder:

  1. Ingress.event.netconn is forwarded to s3 from the event forwarder. (https://developer.carbonblack.com/reference/enterprise-response/event-forwarder)
  2. S3 bucket has an event notification set up to send a message to SQS any time a new object is created in this bucket.
  3. Once the new message is available on the SQS queue, a Lambda function will be executed
  4. Lambda function will read the message off the SQS queue. This message tells Lambda the location of the ingress.event.netconn json in s3. (CBR forwards the messages in groups, so one file should have many netconn JSON items inside)
  5. Lambda function will read each JSON item and perform a GeoIP lookups on the “remote_ip” field using an API lookup (assuming its not in the RFC1918 range). The geoip results are permanently appended to the JSON object.
  6. The JSON is saved to a database such as Mongo/ElasticSearch/DynamoDB

If we review our processed JSON, will now have a new key called “geoip”, as outlined below:

This is a very simple event processing pipeline where network events are forwarded to s3 and flow through this tiny pipeline to eventually have GeoIP data appended to each JSON object. This simple setup should allow you to process significantly more network connections than our previous script. But why stop there? With this pipeline, we could process ANY JSON document that has an IP address and append GeoIP information to it. We could also add more API lookups to help enrich the data further.

Additional Data Enrichment

Let’s say you’re collecting network events for 10 customers. A few additional metrics we created to assist in analysis at scale and on an ongoing basis are:

  • ip_domain_observations
  • ip_domain_reputation
  • intel
  • netconn hashes

ip_domain_observations is a collection of metrics for the IP address or domain categorized in three categories:

These metrics are incremental counters for each ip address over a given span of time (maybe 3–6 months or more). Some example questions you may ask within this dataset could be:

  • How many times have we observed this ip address or domain on the host “WIN-OTEMNUTBS23”?
  • How many times have we observed this ip address or domain at customer01?
  • How many times have we observed this ip address or domain at our financial customers?

We can get more complex by adding in port/protocol/direction into the mix or even track DNS resolutions if you so desire. For this example, we’re keeping it simple. You can see an example of this output below.

Line 29: Observations to date for 23.4.187.27 on host “WIN-OTEMNUTBS23”

Line 30: Observations to date for 23.4.187.27 for the customer “CUST01”

Line 31: Observations to date for 23.4.187.27 for all customers or subset of customers (ex: Financial or Energy sectors)

Ip_domain_reputation is a more focused dataset. While the data is still categorized into the three categories above, the reputation is tracking any alerts/investigations you may have performed that include either the ip address or domain. In our example, let’s say we have two result statuses (reported and false positive) for any alert or investigation performed on the ip address or domain:

Line 43: Internal reputation for the IP address 23.4.187.27 on the host “WIN-OTEMNUTBS23”.

Line 47: Internal reputation for the IP address 23.4.187.27 for the customer “CUST01”.

Line 43: Internal reputation for the IP address 23.4.187.27 for all customers or subset of customers (Financial or Energy sectors).

Intel comes in all shapes and sizes, so for this data, lets just assume you have three different vendors that provide intel on the ip addresses or domains. Your resulting JSON after lookup may look like below:

Netconn hashes is a collection of SHA1 hashes comprised of various parts of the JSON document itself. For this event type ingress.event.netconn, we could create three different hashes by concatenating the following pieces of information together:

  • HASH1: SHA1(remote_ip + port + protocol)
  • HASH2: SHA1(remote_ip + direction)
  • HASH3: SHA1(remote_ip + port + protocol + domain)

Once generated, you can quickly query the database for each hash and increment its value anytime that hash is observed. In the end, your JSON may look like the below:

To add in further context, we could also factor in the three categories mentioned above into the hash. This is just a fun example of how you can mux data together to find anomalies. I suspect hashing various parts of a network isn’t the most efficient thing, but it’s an interesting way to view the data. While off topic, a great example of using a hash to summarize a subset of data would be imphash: https://www.fireeye.com/blog/threat-research/2014/01/tracking-malware-import-hashing.html

To enable this workflow, we can create an API gateway that exposes all these data sources to Lambda, as outlined below:

Notice we have an SQS queue and Lambda function for each API endpoint so we can modularize the pipeline in the future. To better understand how the pipeline looks from an AWS perspective, checkout the image below:

While I’m sure there’s many other ways to build this out, the flow of data through this prototype pipeline is as follows:

  1. Ingress.event.netconn is sent from the event forwarder to your “netconns” s3 bucket. (group of events inside a single file)
  2. After successful transfer, an S3 object is created.
  3. Since an event notification is set up on the s3 bucket, a message about this Ingress.event.netconn object is sent to your SQS queue.
  4. Since a Lambda function is configured as an event source, a Lambda function will be started.
  5. The newly executed Lambda function will read the message from the SQS queue. This message will tell the Lambda function where the netconn object is stored in s3 for processing.
  6. Lambda function will retrieve the referenced JSON object from s3, perform any formatting or pre-lookups then place another message on the “geoip” SQS queue to begin the pipeline. The JSON document will traverse each queue and associated Lambda function. Each Lambda function is assigned a specific API call to use and will append its output to the JSON.
  7. After all Lambda functions have processed the JSON (a single netconn event), the newly enriched JSON document is saved to the database/
  • If the JSON document contains specific fields/tags at the end of processing, it could also be routed to an “alerts” queue for analyst review or further analysis.

Let’s take a look at an example final output after processing:

We can see that our newly processed network event now has 5 new keys:

  • Geoip
  • Observations
  • Netconn_hashes
  • Reputation
  • Intel

Once you get a grasp on how to enrich the event data, you can extend out this capability further by forwarding other raw endpoint events such as:

  • Ingress.event.filemod: Use case: If an attacker creates a staging directory in the ProgramData directory called “Python6”, you could identify any file mods made inside this “Python6” staging directory and kick off file acquisitions to collect these files in real time.
  • Ingress.event.regmod: Use case: If malware is identified creating a specific registry key, you could kick off a command to dump the contents of the new registry key to review the data.

Let’s not stop there! We can also blend in a few other server events as well to perform additional enrichment such as:

  • binaryinfo.observed: Use case: As stated in the CBR documentation, “This event happens when a new binary is observed for the first time anywhere in the environment”. When a new binary is observed, you could automatically download the binary from the CBR instance and perform some additional actions:
  • Toss the sample in a sandbox of your choice, appending key results to the JSON
  • Run the sample through Xori (https://github.com/endgameinc/xori)
  • Perform OSINT intel lookups on hashes/imphash and/or other attributes of the binary such as PE section names, unique strings, compile time or observed locations on disk.

While I hand picked only a subset of event types, the event forwarder supports many other event types outlined here: https://developer.carbonblack.com/reference/enterprise-response/event-forwarder/event-schema/. Prior to enabling all types, I would recommend you turn on one event at a time to assess load and ensure they’re generating value for your team.

Carbon Black also provides its own message bus. If you wish to learn more about this, you can follow the link below:

https://developer.carbonblack.com/reference/enterprise-response/5.1/message-bus-api/

While the content and workflow provided in this post are far from perfect (prototyping is key), I hope the concepts outlined in this post were helpful and parts of it are useful to your team. Happy Hunting!

Acknowledgements

Special thanks to Mike Scutt (@OMGAPT), Jason Garman and the CB team for all the help.

--

--

TStillz

Posting on various topics including incident response, malware analysis, development and finance/investing automation.