How to Build an Automated Recon Pipeline with Python and Luigi - Part II (Port Scanning I)

Jan 22, 2020 | 17 minutes read

Tags: how-to, bug bounty, hack the box, python, recon, luigi

Welcome back! If you found your way here without reading Part I, you may want to start there. This post is part two of a multi-part series demonstrating how to build an automated pipeline for target reconnaissance. The target in question could be the target of a pentest, bug bounty, or capture the flag challenge (shout out to my HTB peoples!). By the end of the series, we’ll have built a functional recon pipeline that can be tailored to fit your own needs.

Part II will:

  • Add masscan to our pipeline
  • Parse masscan results for further use in the pipeline

Part II’s git tags:

  • stage-1
  • stage-2

To get the repository to the point at which we’ll start, we can run one of the following commands. Which command to use depends on whether the repository is already present.

git clone --branch stage-0
git checkout tags/stage-0


  • Target scope
  • Port scanning I <– this post
  • Port scanning II
  • Subdomain enumeration
  • Web scanning
    • Screenshots
    • Subdomain takeover
    • CORS misconfiguration
    • Forced browsing
    • Tech stack identification
  • Data storage
  • Visualization / reporting
  • Slack integration

Stage 1 - masscan Scanning

If you would like to skip to this point in the code, run the following git command from within the cloned repository: git checkout tags/stage-0

In this post, we’ll add masscan to our pipeline. Because we’ve already covered a lot of the basics during Part I, this post focuses on code more than background.

Let’s begin by adding a new file named masscan.py to our recon module with the following contents.

import logging

import luigi
from luigi.util import inherits
from luigi.contrib.external_program import ExternalProgramTask

from recon.targets import TargetList


@inherits(TargetList)
class Masscan(ExternalProgramTask):
    -------------8<-------------

There are a couple of items of interest already in our file. Let’s take a look at the @inherits(TargetList) decorator first.

inherits Decorator

The inherits decorator is made specifically to avoid what luigi calls parameter explosion. Consider this example from the luigi documentation:

class TaskA(luigi.ExternalTask):
    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/log-{t.param_a}'.format(t=self))

class TaskB(luigi.Task):
    param_b = luigi.Parameter()
    param_a = luigi.Parameter()

    def requires(self):
        return TaskA(param_a=self.param_a)

class TaskC(luigi.Task):
    param_c = luigi.Parameter()
    param_b = luigi.Parameter()
    param_a = luigi.Parameter()

    def requires(self):
        return TaskB(param_b=self.param_b, param_a=self.param_a)

Each successive Task must include the parameters of each previous Task on which it depends. In workflows requiring many Tasks chained together, handling parameters can quickly get out of hand. Each downstream Task becomes more burdensome than the last, and refactoring becomes increasingly difficult. Luigi’s creators came up with inherits as the solution to this particular problem.

The inherits class decorator copies ONLY parameters from one Task class to another and avoids direct inheritance. This approach eliminates the need to repeat Parameters, prevents inheritance issues, and keeps the Task command-line interface as simple as possible. As a result, refactoring Task parameters is much more straightforward when changes are required.
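To make the idea concrete, here is a toy, standard-library-only sketch of a parameter-copying class decorator. It is not luigi’s implementation; Param, copy_params, Upstream, and Downstream are hypothetical names used purely to illustrate copying parameter attributes from one class to another without subclassing.

```python
class Param:
    """Hypothetical stand-in for a task parameter with a default value."""

    def __init__(self, default=None):
        self.default = default


def copy_params(source_cls):
    """Class decorator: copy Param attributes from source_cls onto the decorated class."""

    def decorator(target_cls):
        for name, value in vars(source_cls).items():
            # copy only parameters, and never clobber one the target already defines
            if isinstance(value, Param) and name not in vars(target_cls):
                setattr(target_cls, name, value)
        return target_cls

    return decorator


class Upstream:
    target_file = Param()

    def helper(self):
        return "not a parameter, so it is not copied"


@copy_params(Upstream)
class Downstream:
    rate = Param(default="1000")


param_names = sorted(n for n, v in vars(Downstream).items() if isinstance(v, Param))
print(param_names)
```

Downstream ends up with both parameters but is not a subclass of Upstream, which is the property the paragraph above describes.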


The next piece of our code that we haven’t seen before is the ExternalProgramTask. The ExternalProgramTask is a template Task for running an external program in a subprocess. The program is run using subprocess.Popen. Luigi’s creators provided a simple Task wrapper for running commands as subprocesses. We’ll be using ExternalProgramTasks frequently in our pipeline.
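As a rough illustration of the pattern being wrapped, here is a simplified standard-library sketch of running an argument list in a subprocess and checking its exit code. It is not luigi’s code; the argument list is a made-up example that uses the current Python interpreter so the snippet runs anywhere.

```python
import subprocess
import sys

# Hypothetical argument list; the first element is the executable to run.
# The current Python interpreter is used here so the example is portable.
program_args = [sys.executable, "-c", "print('scan complete')"]

proc = subprocess.Popen(program_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = proc.communicate()  # wait for the program to finish

if proc.returncode != 0:
    raise RuntimeError(f"program failed with exit code {proc.returncode}")

result = stdout.decode().strip()
print(result)
```

In our pipeline, the list handed to the subprocess is whatever our Task’s program_args method returns.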

We’ve covered the unfamiliar pieces of code up to our class definition; let’s take a look at our Parameters next.

masscan.Masscan Parameters

One of our goals when writing pieces of our pipeline is to reduce the number of command line arguments required to execute commands. We’ll accomplish this by specifying sane defaults for our Tasks that we’ll override as required. Our target masscan command will look something like the command below:

masscan -v --open-only --banners --rate 1000 -e tun0 -oJ masscan.tesla.json --ports 80,443,22,21 -iL tesla.ips

masscan options used:

    -v
        increase verbosity!
    --open-only
        report only open ports, not closed ports.
    --banners
        instructs masscan to grab banners, ... only a few protocols are supported
    --rate <packets-per-second>
        specifies the desired rate for transmitting packets (packets per second)
    -e <iface>
        use the named raw network interface, such as "eth0"; If not specified, 
        the first network interface found with a default gateway is used
    -oJ <filename>
        Sets the output format to JSON and saves the output in the given filename.
    --top-ports <n_ports>
        Scan top 1000 most popular ports (1000 hard-coded into masscan, more on this later)
    --ports <ports>
        specifies the port(s) to be scanned
    -iL <inputfilename> (nmap option)
        Input from list of hosts/networks

Of the things sent to masscan, -v, --open-only, and --banners can be hard-coded. Additionally, --rate and -e can be programmed with a sane default. We’ll receive the input to -iL from the previous Task; the file name passed to -oJ is based on the same information, meaning we don’t need to ask the user for it. In reality, there are only four Parameters that we need to concern ourselves with to generate the commands above. Let’s write those out.

class Masscan(ExternalProgramTask):
    rate = luigi.Parameter(default="1000")
    interface = luigi.Parameter(default="tun0")
    top_ports = luigi.IntParameter(default=0)
    ports = luigi.Parameter(default="")

I’ve used tun0 as my default interface because Synack and Hack The Box both use tun0. Enter whatever interface makes the most sense for you.

There we have our four Parameters with defaults. Next up, we’ll specify our value used in conjunction with -oJ.

    ports = luigi.Parameter(default="")

    def __init__(self, *args, **kwargs):
        super(Masscan, self).__init__(*args, **kwargs)
        self.masscan_output = f"masscan.{self.target_file}.json"

With that complete, we’re ready to tackle a problem related to --top-ports; let’s check that out now.

top-ports Woes

For better or worse, masscan includes a hard-coded array of integers representing its --top-ports.


config_top_ports(struct Masscan *masscan, unsigned n)
    unsigned i;
    static const unsigned short top_tcp_ports[] = {

This matters for us because the array is only 1000 ports long. Nmap has weights that are greater than 0 for 4262 tcp ports and a lot of udp ports.

grep '/tcp' /usr/share/nmap/nmap-services | grep -v 0.000000 | wc -l


What we want is the ability to include at least up to 4262 tcp ports when specifying the --top-ports option. We’ll do this with a config file. It’s simple and effective, plus, we can remove our hard-coded defaults in our Parameters above and instead specify them in the same config file! Let’s get started.

First, we’ll grab all of the weighted tcp ports with a “simple” bash command. The main steps here are to grep out tcp ports, sort them by weight with the highest value at the top, grab the first 5000 entries, then print the port numbers as a comma-separated list.

grep '/tcp' /usr/share/nmap/nmap-services | sort -rk 3,3 | head -5000 | awk '{print $2}' | cut -f 1 -d / | xargs | sed 's/ /,/g'
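For anyone who would rather do this step in python, here is a minimal sketch of the same idea. It assumes the nmap-services layout of name, port/protocol, frequency, and an optional comment; the sample text and its frequency values are made up for illustration, and top_ports is a hypothetical helper name.

```python
# Illustrative sample in nmap-services layout: name, port/protocol, frequency, comment.
# The frequency values below are made up for the example.
SAMPLE_SERVICES = """\
# comment lines start with '#'
ftp\t21/tcp\t0.19\t# File Transfer
ssh\t22/tcp\t0.18\t# Secure Shell
domain\t53/udp\t0.21\t# DNS
http\t80/tcp\t0.48\t# World Wide Web HTTP
https\t443/tcp\t0.20\t# secure http
"""


def top_ports(services_text, protocol, limit):
    """Return up to `limit` port numbers for `protocol`, highest frequency first."""
    weighted = []
    for line in services_text.splitlines():
        if not line.strip() or line.startswith("#"):
            continue  # skip blank lines and comments
        fields = line.split()
        port, proto = fields[1].split("/")
        if proto == protocol:
            weighted.append((float(fields[2]), int(port)))
    weighted.sort(reverse=True)  # numeric sort on frequency, largest first
    return [port for _, port in weighted[:limit]]


top_tcp = top_ports(SAMPLE_SERVICES, "tcp", 3)
print(",".join(str(p) for p in top_tcp))
```

To run it against real data, read /usr/share/nmap/nmap-services into a string and pass that in place of SAMPLE_SERVICES.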


Let’s create our config file and copy this value into it as a python list.

top_tcp_ports = [80,23,443,21,22,25,3389,110,445,139,143,53...]

And repeat the process for udp ports by changing the value for grep from /tcp to /udp.

top_tcp_ports = [80,23,443,21,22,25,3389,110,445,139,143,53...]
top_udp_ports = [631,161,137,123,138,1434,445,135,67,53,139...]

There we have our top ports saved off in a config file for easy use later on. While we’re on the subject of the config file, let’s update it with our default masscan values.

masscan_config = {
    'iface': 'tun0',
    'rate': '1000',
}
Let’s also update masscan.py to make use of our new config file.

from recon.config import top_tcp_ports, top_udp_ports, masscan_config


@inherits(TargetList)
class Masscan(ExternalProgramTask):
    rate = luigi.Parameter(default=masscan_config.get("rate"))
    interface = luigi.Parameter(default=masscan_config.get("iface"))
    top_ports = luigi.IntParameter(default=0)
    ports = luigi.Parameter(default="")

Excellent; we’ve done a bit of refactoring and solved the --top-ports problem, go team! Let’s check out handling user input next.

Handling Parameters

Alright, let’s take care of our Parameter input handling. We want to allow the use of either --top-ports on the command line or --ports, but not both. A scenario like this is known as a mutually exclusive set of options. The argparse library has a straightforward way of managing mutual exclusivity, but in this case, we’re on our own.

    def program_args(self):
        if self.ports and self.top_ports:
            # can't have both
            logging.error("Only --ports or --top-ports is permitted, not both.")
            exit(1)

        if not self.ports and not self.top_ports:
            # need at least one
            logging.error("Must specify either --top-ports or --ports.")
            exit(2)

        if self.top_ports < 0:
            # sanity check
            logging.error("--top-ports must be greater than 0")
            exit(3)

        if self.top_ports:
            # if --top-ports used, take the first N entries of the top_*_ports lists, format them as
            # strings, and then build a proper masscan --ports option
            top_tcp_ports_str = ",".join(str(x) for x in top_tcp_ports[: self.top_ports])
            top_udp_ports_str = ",".join(str(x) for x in top_udp_ports[: self.top_ports])

            self.ports = f"{top_tcp_ports_str},U:{top_udp_ports_str}"
            self.top_ports = 0

        command = [
            "masscan",
            "-v",
            "--open",
            "--banners",
            "--rate",
            self.rate,
            "-e",
            self.interface,
            "-oJ",
            self.masscan_output,
            "--ports",
            self.ports,
            "-iL",
            self.input().get("target_list").path,
        ]

        return command

We’re putting the Parameter parsing logic in the program_args method. What we return from this function ultimately determines which options and arguments get passed into subprocess.Popen, which makes it a great candidate for Parameter processing. There is a reason that we use exit() when we receive erroneous input: ExternalProgramTask’s program_args method can return None, and the pipeline will continue execution, which would most likely result in the pipeline’s catastrophic failure. Exiting the program stops execution at that point, and because this Task’s output was never written, luigi can resume execution from this same Task.
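The validation and port-string logic can also be sketched as a standalone function, which makes it easy to exercise outside of luigi. This is a hedged sketch rather than the pipeline’s code: resolve_ports is a hypothetical helper, it raises ValueError instead of calling exit(), and the two short lists stand in for the much longer ones in the config file.

```python
def resolve_ports(ports, top_ports, tcp_ports, udp_ports):
    """Return the string to pass to masscan's --ports option.

    Raises ValueError instead of exiting, so the checks are easy to test.
    """
    if ports and top_ports:
        raise ValueError("Only --ports or --top-ports is permitted, not both.")
    if not ports and not top_ports:
        raise ValueError("Must specify either --top-ports or --ports.")
    if top_ports < 0:
        raise ValueError("--top-ports must be greater than 0")
    if ports:
        return ports  # explicit ports are passed through untouched
    # take the first N entries of each list; U: marks the udp ports
    tcp = ",".join(str(p) for p in tcp_ports[:top_ports])
    udp = ",".join(str(p) for p in udp_ports[:top_ports])
    return f"{tcp},U:{udp}"


# shortened stand-ins for the lists kept in the config file
tcp_sample = [80, 23, 443, 21]
udp_sample = [631, 161, 137]

print(resolve_ports("", 2, tcp_sample, udp_sample))
```

With top_ports set to 2, the sketch prints 80,23,U:631,161, the same shape of string that program_args hands to --ports.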

All the Rest

The remaining code in our Masscan Task class is pretty trivial. We need to specify dependencies and our Task’s Target (take a look at luigi’s core concepts from Part I if you need a refresher on Targets).

The requires method contains the Tasks on which our Task depends. We can see that this Task relies on the TargetList Task from Part I. Our Task will only run if all the Tasks it requires complete successfully.

    def requires(self):
        # returned as a dict so program_args can look up self.input().get("target_list")
        return {"target_list": TargetList(target_file=self.target_file)}

The output method returns the Target output for this Task. The LocalTarget in this case corresponds to the output JSON file generated by masscan.

    def output(self):
        return luigi.LocalTarget(self.masscan_output)

Finalized Code

Here’s our finalized Masscan class with comments/docstrings etc…

import logging

import luigi
from luigi.util import inherits
from luigi.contrib.external_program import ExternalProgramTask

from recon.targets import TargetList
from recon.config import top_tcp_ports, top_udp_ports, masscan_config


@inherits(TargetList)
class Masscan(ExternalProgramTask):
    """ Run masscan against a target specified via the TargetList Task.

    Masscan commands are structured like the example below.  When specified, --top-ports is processed and
    then ultimately passed to --ports.

    masscan -v --open-only --banners --rate 1000 -e tun0 -oJ masscan.tesla.json --ports 80,443,22,21 -iL tesla.ips

    The corresponding luigi command is shown below.

    PYTHONPATH=$(pwd) luigi --local-scheduler --module recon.masscan Masscan --target-file tesla --ports 80,443,22,21

    Args:
        rate: desired rate for transmitting packets (packets per second)
        interface: use the named raw network interface, such as "eth0"
        top_ports: Scan top N most popular ports
        ports: specifies the port(s) to be scanned
        target_file: specifies the file on disk containing a list of ips or domains *--* Required by upstream Task
    """

    rate = luigi.Parameter(default=masscan_config.get("rate"))
    interface = luigi.Parameter(default=masscan_config.get("iface"))
    top_ports = luigi.IntParameter(default=0)  # IntParameter -> top_ports expected as int
    ports = luigi.Parameter(default="")

    def __init__(self, *args, **kwargs):
        super(Masscan, self).__init__(*args, **kwargs)
        self.masscan_output = f"masscan.{self.target_file}.json"

    def requires(self):
        """ Masscan depends on TargetList to run.

        TargetList expects target_file as a parameter.

        Returns:
            dict(str: TargetList)
        """
        return {"target_list": TargetList(target_file=self.target_file)}

    def output(self):
        """ Returns the target output for this task.

        Naming convention for the output file is masscan.TARGET_FILE.json.

        Returns:
            luigi.local_target.LocalTarget
        """
        return luigi.LocalTarget(self.masscan_output)

    def program_args(self):
        """ Defines the options/arguments sent to masscan after processing.

        Returns:
            list: list of options/arguments, beginning with the name of the executable to run
        """
        if self.ports and self.top_ports:
            # can't have both
            logging.error("Only --ports or --top-ports is permitted, not both.")
            exit(1)

        if not self.ports and not self.top_ports:
            # need at least one
            logging.error("Must specify either --top-ports or --ports.")
            exit(2)

        if self.top_ports < 0:
            # sanity check
            logging.error("--top-ports must be greater than 0")
            exit(3)

        if self.top_ports:
            # if --top-ports used, format the top_*_ports lists as strings and then into a proper masscan --ports option
            top_tcp_ports_str = ",".join(str(x) for x in top_tcp_ports[: self.top_ports])
            top_udp_ports_str = ",".join(str(x) for x in top_udp_ports[: self.top_ports])

            self.ports = f"{top_tcp_ports_str},U:{top_udp_ports_str}"
            self.top_ports = 0

        command = [
            "masscan",
            "-v",
            "--open",
            "--banners",
            "--rate",
            self.rate,
            "-e",
            self.interface,
            "-oJ",
            self.masscan_output,
            "--ports",
            self.ports,
            "-iL",
            self.input().get("target_list").path,
        ]

        return command

Also, here’s a depiction of our repository’s directory.

├── Pipfile
├── Pipfile.lock
└── recon

Stage 2 - Processing masscan Output

If you would like to skip to this point in the code, run the following git command from within the cloned repository: git checkout tags/stage-1


With masscan execution complete, we now need to process the results. Our goal in this section is to take the ports identified as open and pass them along to a targeted nmap scan. Unfortunately, nmap doesn’t know how to interpret masscan generated results. That’s where this Task comes into play. We’ll parse masscan’s JSON output and store it in a pickled object for later use. Let’s begin.

We’ll start with our class definition.

@inherits(Masscan)
class ParseMasscanOutput(luigi.Task):
    -------------8<-------------

We’ve already seen inherits; we’re using it to copy in the Parameters from our Masscan class and our TargetList class (discussed above). There’s only one new item here, luigi.Task. Even though Tasks are luigi’s bread and butter, and were covered in Part I, we’ve only used sub-classes of Tasks so far. We’ll see some of the differences associated with using a Task shortly.

Next, we’ll look at the requires function.

    def requires(self):
        args = {
            "rate": self.rate,
            "target_file": self.target_file,
            "top_ports": self.top_ports,
            "interface": self.interface,
            "ports": self.ports,
        }
        return Masscan(**args)

All we’re saying with this function is that the Masscan Task must run before initiating this Task. Additionally, the Parameters that we copied via inherits are passed to the Masscan Task via dictionary unpacking.

Now for the output function.

    def output(self):
        return luigi.LocalTarget(f"masscan.{self.target_file}.parsed.pickle")

It’s a simple function: we’re specifying the naming convention of our pickled object. All future Tasks that rely on this one will expect this file to be present on the filesystem. If it’s missing, the immediate downstream Task will fail, breaking the pipeline.

Storing Our Results

The meat of our class resides in the run function. As I’m sure you’ve guessed, the run function is where the business logic of our Task lies. In our case, we need to perform two primary actions: parse JSON and save it for later. run is the function where we implement that logic.

    def run(self):
        ip_dict = defaultdict(lambda: defaultdict(set))  # nested defaultdict
        -------------8<-------------

ip_dict is the data structure we’ll use to store the results of parsing masscan’s JSON output. We’re using a nested defaultdict to store our results. A defaultdict acts like an ordinary python dictionary, but we initialize it with a function known as a default factory. The default factory takes no arguments but provides the default value for a nonexistent key.

When we use defaultdict, instead of a code pattern like this

d = {}

if "k" not in d:
    d["k"] = list()

We can write code like this.

d = defaultdict(list)

It’s a simple improvement that is more concise and less error-prone than the first example since every key is automatically initialized. That means that we get the added benefit of not worrying about a defaultdict raising a KeyError exception.

Our use of defaultdict is slightly more complicated than what’s above, but here’s how it will work.

>>> from pprint import pprint
>>> from collections import defaultdict
>>> d = defaultdict(lambda: defaultdict(set))
>>> d['']['tcp'].add(80)
>>> d['']['udp'].add(53)
>>> d['']['tcp'].add(445)
>>> pprint(d)
defaultdict(<function <lambda> at 0x7f5629e44440>,
            {'': defaultdict(<class 'set'>, {'tcp': {445}}),
             '': defaultdict(<class 'set'>,
                                      {'tcp': {80},
                                       'udp': {53}})})

The top-level keys will be initialized with an empty defaultdict(set) as their values. These top-level keys will be our ip addresses. Each defaultdict(set) underneath the top-level will correspond to a protocol. The set within the sub-level defaultdict will store open ports. We use a set as our container for ports because, by definition, a set is an unordered collection of unique values. So, if for any reason we see the same port/protocol while parsing, it won’t result in additional overhead for the rest of the pipeline.
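Here is a small runnable version of that layout. The addresses 192.0.2.10 and 192.0.2.20 are placeholders from the documentation range, chosen only for this sketch, and the conversion to plain dicts at the end is just there to make the result easy to print and compare.

```python
from collections import defaultdict

# 192.0.2.10 and 192.0.2.20 are placeholder addresses from the documentation range
d = defaultdict(lambda: defaultdict(set))

d["192.0.2.10"]["tcp"].add(80)
d["192.0.2.10"]["udp"].add(53)
d["192.0.2.20"]["tcp"].add(445)
d["192.0.2.10"]["tcp"].add(80)  # duplicate: the set silently ignores it

# convert to plain dicts for easy comparison and printing
plain = {ip: dict(protos) for ip, protos in d.items()}
print(plain)
```

Neither the ip keys nor the protocol keys had to be created ahead of time, and the repeated port 80 appears only once.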

The snippet below attempts to load masscan’s JSON file. self.input() corresponds to the return value from the Masscan Task’s output function. If the JSON can’t be decoded, we’ll print the exception and return None. When we return at this point due to an exception, we will not have created the pickle file, so luigi will run this Task again the next time the pipeline is started.

        try:
            entries = json.load(self.input().open())
        except json.decoder.JSONDecodeError as e:
            return print(e)
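The same load-or-bail pattern can be tried outside of luigi with a few lines of standard-library python. This is only a sketch: load_entries is a hypothetical helper, and the truncated string simulates a scan that was interrupted before masscan finished writing its file.

```python
import json


def load_entries(raw_text):
    """Return parsed JSON, or None when the text is not valid JSON."""
    try:
        return json.loads(raw_text)
    except json.JSONDecodeError as e:
        print(f"could not parse scan results: {e}")
        return None


good = load_entries('[{"ip": "192.0.2.10", "ports": []}]')
bad = load_entries('[{"ip": "192.0.2.10", "ports": [')  # truncated file
```

A None result is the signal to stop before writing any output, which is what lets the Task run again later.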

After we’ve loaded the JSON file, we begin our loop over its contents. Here’s an example JSON file produced by masscan.

{   "ip": "",   "timestamp": "1568233934", "ports": [ {"port": 53, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 63} ] }
{   "ip": "",   "timestamp": "1568233935", "ports": [ {"port": 21, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 63} ] }
{   "ip": "",   "timestamp": "1568233935", "ports": [ {"port": 53, "proto": "udp", "status": "open", "reason": "none", "ttl": 0} ] }
{   "ip": "",   "timestamp": "1568233936", "ports": [ {"port": 22, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 63} ] }

As we loop through, we grab the ip address first. From there, we snag the list of ports and iterate over it. In doing so, we make a note of the protocol. With all of that information, we can add the entry to ip_dict.

        for entry in entries:
            single_target_ip = entry.get("ip")
            for port_entry in entry.get("ports"):
                protocol = port_entry.get("proto")
                ip_dict[single_target_ip][protocol].add(str(port_entry.get("port")))

Here is what our ip_dict looks like after having parsed the example above.

{'': defaultdict(<class 'set'>, {'tcp': {'21', '22', '53'}, 'udp': {'53'}})}
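To see the loop in action without running a scan, here is a self-contained sketch that feeds it entries shaped like the records above. The address 192.0.2.10 is a documentation placeholder, and group_open_ports is a hypothetical name for what is otherwise the same logic as our run function.

```python
from collections import defaultdict

# entries shaped like masscan's JSON records; the address is a documentation placeholder
entries = [
    {"ip": "192.0.2.10", "ports": [{"port": 53, "proto": "tcp", "status": "open"}]},
    {"ip": "192.0.2.10", "ports": [{"port": 21, "proto": "tcp", "status": "open"}]},
    {"ip": "192.0.2.10", "ports": [{"port": 53, "proto": "udp", "status": "open"}]},
    {"ip": "192.0.2.10", "ports": [{"port": 22, "proto": "tcp", "status": "open"}]},
]


def group_open_ports(entries):
    """Group ports by ip address and protocol, storing ports as strings."""
    ip_dict = defaultdict(lambda: defaultdict(set))
    for entry in entries:
        ip = entry.get("ip")
        for port_entry in entry.get("ports", []):
            protocol = port_entry.get("proto")
            ip_dict[ip][protocol].add(str(port_entry.get("port")))
    # plain dicts are easier to print and compare
    return {ip: dict(protos) for ip, protos in ip_dict.items()}


grouped = group_open_ports(entries)
print(grouped)
```

Ports are stored as strings here, matching the str() call in the loop above.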

Our final step is to write ip_dict to disk as a pickled object.

        with open(self.output().path, "wb") as f:
            pickle.dump(dict(ip_dict), f)
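A side note on the dict(ip_dict) conversion: a defaultdict whose default factory is a lambda can’t be pickled, because pickle has no way to serialize the lambda. Converting the outer level to a plain dict drops the lambda, while the inner defaultdict(set) values pickle without trouble. The sketch below demonstrates this in memory; the address is a documentation placeholder.

```python
import pickle
from collections import defaultdict

ip_dict = defaultdict(lambda: defaultdict(set))
ip_dict["192.0.2.10"]["tcp"].add("80")  # placeholder documentation address

# the outer defaultdict holds a lambda as its factory, which pickle cannot serialize
try:
    pickle.dumps(ip_dict)
    outer_pickled = True
except (pickle.PicklingError, AttributeError, TypeError):
    outer_pickled = False

# dict() drops the lambda; the inner defaultdict(set) pickles fine
blob = pickle.dumps(dict(ip_dict))
restored = pickle.loads(blob)
print(outer_pickled, restored)
```

A downstream Task would read the file back with pickle.load in the same way.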

Finalized Code

Thus completes our ParseMasscanOutput class; here it is in its entirety with accompanying comments et al.

# imports required at the top of masscan.py for this Task
import json
import pickle
from collections import defaultdict


@inherits(Masscan)
class ParseMasscanOutput(luigi.Task):
    """ Read masscan JSON results and create a pickled dictionary of pertinent information for processing.

    Args:
        rate: desired rate for transmitting packets (packets per second) *--* Required by upstream Task
        interface: use the named raw network interface, such as "eth0" *--* Required by upstream Task
        top_ports: Scan top N most popular ports *--* Required by upstream Task
        ports: specifies the port(s) to be scanned *--* Required by upstream Task
        target_file: specifies the file on disk containing a list of ips or domains *--* Required by upstream Task
    """

    def requires(self):
        """ ParseMasscanOutput depends on Masscan to run.

        Masscan expects rate, target_file, interface, and either ports or top_ports as parameters.

        Returns:
            luigi.Task - Masscan
        """
        args = {
            "rate": self.rate,
            "target_file": self.target_file,
            "top_ports": self.top_ports,
            "interface": self.interface,
            "ports": self.ports,
        }
        return Masscan(**args)

    def output(self):
        """ Returns the target output for this task.

        Naming convention for the output file is masscan.TARGET_FILE.parsed.pickle.

        Returns:
            luigi.local_target.LocalTarget
        """
        return luigi.LocalTarget(f"masscan.{self.target_file}.parsed.pickle")

    def run(self):
        """ Reads masscan JSON results and creates a pickled dictionary of pertinent information for processing. """
        ip_dict = defaultdict(lambda: defaultdict(set))  # nested defaultdict

        try:
            entries = json.load(self.input().open())  # load masscan results from Masscan Task
        except json.decoder.JSONDecodeError as e:
            # return on exception; no output file created; pipeline should start again from
            # this task if restarted because we never hit pickle.dump
            return print(e)

        """
        build out ip_dictionary from the loaded JSON

        masscan JSON structure over which we're looping
        [
        {   "ip": "",   "timestamp": "1567856130", "ports": [ {"port": 22, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 63} ] }
        ,
        {   "ip": "",   "timestamp": "1567856130", "ports": [ {"port": 80, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 63} ] }
        ]

        ip_dictionary structure that is built out from each JSON entry
        {
            "IP_ADDRESS":
                {'udp': {"161", "5000", ... },
                ...
                i.e. {protocol: set(ports) }
        }
        """
        for entry in entries:
            single_target_ip = entry.get("ip")
            for port_entry in entry.get("ports"):
                protocol = port_entry.get("proto")
                ip_dict[single_target_ip][protocol].add(str(port_entry.get("port")))

        with open(self.output().path, "wb") as f:
            pickle.dump(dict(ip_dict), f)

Finally, we can test out our new addition to the pipeline with the following command. The command below isn’t one that we’re likely ever to run directly, but may be useful for testing the current codebase.

PYTHONPATH=$(pwd) luigi --local-scheduler --module recon.masscan ParseMasscanOutput --target-file scavenger --top-ports 1000

That wraps things up for this post. In the next installment, we’ll take a look at integrating nmap into our pipeline!

Additional Resources

  1. Luigi - ExternalProgramTask
  2. Luigi - inherits and requires
  3. masscan
  4. defaultdict
