File: //usr/bin/firewall-cmd
#!/usr/bin/python3
# SPDX-License-Identifier: GPL-2.0-or-later
#
# Copyright (C) 2009-2016 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
# Jiri Popelka <jpopelka@redhat.com>
from gi.repository import GObject
import sys
sys.modules["gobject"] = GObject
import argparse
import os
from firewall.client import (
FirewallClient,
FirewallClientIPSetSettings,
FirewallClientZoneSettings,
FirewallClientServiceSettings,
FirewallClientIcmpTypeSettings,
FirewallClientHelperSettings,
FirewallClientPolicySettings,
)
from firewall.errors import FirewallError
from firewall import errors
from firewall.functions import joinArgs, splitArgs, getPortRange
from firewall.core.fw_nm import (
nm_is_imported,
nm_get_connection_of_interface,
nm_get_zone_of_connection,
nm_set_zone_of_connection,
nm_get_interfaces_in_zone,
)
from firewall.core.io.zone import zone_reader
from firewall.core.io.policy import policy_reader
from firewall.core.io.service import service_reader
from firewall.core.io.ipset import ipset_reader
from firewall.core.io.icmptype import icmptype_reader
from firewall.core.io.helper import helper_reader
from firewall.command import FirewallCommand
def __usage():
sys.stdout.write(
"""
Usage: firewall-cmd [OPTIONS...]
General Options
-h, --help Prints a short help text and exits
-V, --version Print the version string of firewalld
-q, --quiet Do not print status messages
Status Options
--state Return and print firewalld state
--reload Reload firewall and keep state information
--complete-reload Reload firewall and lose state information
--runtime-to-permanent
Create permanent from runtime configuration
--reset-to-defaults
Reset configuration to firewalld's default configuration
--check-config Check permanent configuration for errors
Log Denied Options
--get-log-denied Print the log denied value
--set-log-denied=<value>
Set log denied value
Permanent Options
--permanent Set an option permanently
Usable for options marked with [P]
Zone Options
--get-default-zone Print default zone for connections and interfaces
--set-default-zone=<zone>
Set default zone
--get-active-zones Print currently active zones
--get-zones Print predefined zones [P]
--get-services Print predefined services [P]
--get-icmptypes Print predefined icmptypes [P]
--get-zone-of-interface=<interface>
Print name of the zone the interface is bound to [P]
--get-zone-of-source=<source>[/<mask>]|<MAC>|ipset:<ipset>
Print name of the zone the source is bound to [P]
--list-all-zones List everything added for or enabled in all zones [P]
--new-zone=<zone> Add a new zone [P only]
--new-zone-from-file=<filename> [--name=<zone>]
Add a new zone from file with optional name [P only]
--delete-zone=<zone> Delete an existing zone [P only]
--load-zone-defaults=<zone>
Load zone default settings [P only]
--zone=<zone> Use this zone to set or query options, else default zone
Usable for options marked with [Z]
--info-zone=<zone> Print information about a zone
--path-zone=<zone> Print file path of a zone [P only]
Policy Options
--get-policies Print predefined policies
--get-active-policies
Print currently active policies
--list-all-policies List everything added for or enabled in all policies
--new-policy=<policy>
Add a new empty policy
--new-policy-from-file=<filename> [--name=<policy>]
Add a new policy from file with optional name override [P only]
--delete-policy=<policy>
Delete an existing policy
--load-policy-defaults=<policy>
Load policy default settings
--policy=<policy> Use this policy to set or query options
Usable for options marked with [O]
--info-policy=<policy>
Print information about a policy
--path-policy=<policy>
Print file path of a policy
IPSet Options
--get-ipset-types Print the supported ipset types
--new-ipset=<ipset> --type=<ipset type> [--option=<key>[=<value>]]..
Add a new ipset [P only]
--new-ipset-from-file=<filename> [--name=<ipset>]
Add a new ipset from file with optional name [P only]
--delete-ipset=<ipset>
Delete an existing ipset [P only]
--load-ipset-defaults=<ipset>
Load ipset default settings [P only]
--info-ipset=<ipset> Print information about an ipset
--path-ipset=<ipset> Print file path of an ipset [P only]
--get-ipsets Print predefined ipsets
--ipset=<ipset> --set-description=<description>
Set new description to ipset [P only]
--ipset=<ipset> --get-description
Print description for ipset [P only]
--ipset=<ipset> --set-short=<description>
Set new short description to ipset [P only]
--ipset=<ipset> --get-short
Print short description for ipset [P only]
--ipset=<ipset> --add-entry=<entry>
Add a new entry to an ipset [P]
--ipset=<ipset> --remove-entry=<entry>
Remove an entry from an ipset [P]
--ipset=<ipset> --query-entry=<entry>
Return whether ipset has an entry [P]
--ipset=<ipset> --get-entries
List entries of an ipset [P]
--ipset=<ipset> --add-entries-from-file=<entry>
Add a new entries to an ipset [P]
--ipset=<ipset> --remove-entries-from-file=<entry>
Remove entries from an ipset [P]
IcmpType Options
--new-icmptype=<icmptype>
Add a new icmptype [P only]
--new-icmptype-from-file=<filename> [--name=<icmptype>]
Add a new icmptype from file with optional name [P only]
--delete-icmptype=<icmptype>
Delete an existing icmptype [P only]
--load-icmptype-defaults=<icmptype>
Load icmptype default settings [P only]
--info-icmptype=<icmptype>
Print information about an icmptype
--path-icmptype=<icmptype>
Print file path of an icmptype [P only]
--icmptype=<icmptype> --set-description=<description>
Set new description to icmptype [P only]
--icmptype=<icmptype> --get-description
Print description for icmptype [P only]
--icmptype=<icmptype> --set-short=<description>
Set new short description to icmptype [P only]
--icmptype=<icmptype> --get-short
Print short description for icmptype [P only]
--icmptype=<icmptype> --add-destination=<ipv>
Enable destination for ipv in icmptype [P only]
--icmptype=<icmptype> --remove-destination=<ipv>
Disable destination for ipv in icmptype [P only]
--icmptype=<icmptype> --query-destination=<ipv>
Return whether destination ipv is enabled in icmptype [P only]
--icmptype=<icmptype> --get-destinations
List destinations in icmptype [P only]
Service Options
--new-service=<service>
Add a new service [P only]
--new-service-from-file=<filename> [--name=<service>]
Add a new service from file with optional name [P only]
--delete-service=<service>
Delete an existing service [P only]
--load-service-defaults=<service>
Load icmptype default settings [P only]
--info-service=<service>
Print information about a service
--path-service=<service>
Print file path of a service [P only]
--service=<service> --set-description=<description>
Set new description to service [P only]
--service=<service> --get-description
Print description for service [P only]
--service=<service> --set-short=<description>
Set new short description to service [P only]
--service=<service> --get-short
Print short description for service [P only]
--service=<service> --add-port=<portid>[-<portid>]/<protocol>
Add a new port to service [P only]
--service=<service> --remove-port=<portid>[-<portid>]/<protocol>
Remove a port from service [P only]
--service=<service> --query-port=<portid>[-<portid>]/<protocol>
Return whether the port has been added for service [P only]
--service=<service> --get-ports
List ports of service [P only]
--service=<service> --add-protocol=<protocol>
Add a new protocol to service [P only]
--service=<service> --remove-protocol=<protocol>
Remove a protocol from service [P only]
--service=<service> --query-protocol=<protocol>
Return whether the protocol has been added for service [P only]
--service=<service> --get-protocols
List protocols of service [P only]
--service=<service> --add-source-port=<portid>[-<portid>]/<protocol>
Add a new source port to service [P only]
--service=<service> --remove-source-port=<portid>[-<portid>]/<protocol>
Remove a source port from service [P only]
--service=<service> --query-source-port=<portid>[-<portid>]/<protocol>
Return whether the source port has been added for service [P only]
--service=<service> --get-source-ports
List source ports of service [P only]
--service=<service> --add-helper=<helper>
Add a new helper to service [P only]
--service=<service> --remove-helper=<helper>
Remove a helper from service [P only]
--service=<service> --query-helper=<helper>
Return whether the helper has been added for service [P only]
--service=<service> --get-service-helpers
List helpers of service [P only]
--service=<service> --set-destination=<ipv>:<address>[/<mask>]
Set destination for ipv to address in service [P only]
--service=<service> --remove-destination=<ipv>
Disable destination for ipv i service [P only]
--service=<service> --query-destination=<ipv>:<address>[/<mask>]
Return whether destination ipv is set for service [P only]
--service=<service> --get-destinations
List destinations in service [P only]
--service=<service> --add-include=<service>
Add a new include to service [P only]
--service=<service> --remove-include=<service>
Remove a include from service [P only]
--service=<service> --query-include=<service>
Return whether the include has been added for service [P only]
--service=<service> --get-includes
List includes of service [P only]
Options to Adapt and Query Zones and Policies
--list-all List everything added for or enabled [P] [Z] [O]
--timeout=<timeval> Enable an option for timeval time, where timeval is
a number followed by one of letters 's' or 'm' or 'h'
Usable for options marked with [T]
--set-description=<description>
Set new description [P only] [Z] [O]
--get-description Print description [P only] [Z] [O]
--get-target Get the target [P only] [Z] [O]
--set-target=<target>
Set the target [P only] [Z] [O]
--set-short=<description>
Set new short description [Z] [O]
--get-short Print short description [P only] [Z] [O]
--list-services List services added [P] [Z]
--add-service=<service>
Add a service [P] [Z] [O] [T]
--remove-service=<service>
Remove a service [P] [Z] [O]
--query-service=<service>
Return whether service has been added [P] [Z] [O]
--list-ports List ports added [P] [Z] [O]
--add-port=<portid>[-<portid>]/<protocol>
Add the port [P] [Z] [O] [T]
--remove-port=<portid>[-<portid>]/<protocol>
Remove the port [P] [Z] [O]
--query-port=<portid>[-<portid>]/<protocol>
Return whether the port has been added [P] [Z] [O]
--list-protocols List protocols added [P] [Z] [O]
--add-protocol=<protocol>
Add the protocol [P] [Z] [O] [T]
--remove-protocol=<protocol>
Remove the protocol [P] [Z] [O]
--query-protocol=<protocol>
Return whether the protocol has been added [P] [Z] [O]
--list-source-ports List source ports added [P] [Z] [O]
--add-source-port=<portid>[-<portid>]/<protocol>
Add the source port [P] [Z] [O] [T]
--remove-source-port=<portid>[-<portid>]/<protocol>
Remove the source port [P] [Z] [O]
--query-source-port=<portid>[-<portid>]/<protocol>
Return whether the source port has been added [P] [Z] [O]
--list-icmp-blocks List Internet ICMP type blocks added [P] [Z] [O]
--add-icmp-block=<icmptype>
Add an ICMP block [P] [Z] [O] [T]
--remove-icmp-block=<icmptype>
Remove the ICMP block [P] [Z] [O]
--query-icmp-block=<icmptype>
Return whether an ICMP block has been added [P] [Z] [O]
--list-forward-ports List IPv4 forward ports added [P] [Z] [O]
--add-forward-port=port=<portid>[-<portid>]:proto=<protocol>[:toport=<portid>[-<portid>]][:toaddr=<address>[/<mask>]]
Add the IPv4 forward port [P] [Z] [O] [T]
--remove-forward-port=port=<portid>[-<portid>]:proto=<protocol>[:toport=<portid>[-<portid>]][:toaddr=<address>[/<mask>]]
Remove the IPv4 forward port [P] [Z] [O]
--query-forward-port=port=<portid>[-<portid>]:proto=<protocol>[:toport=<portid>[-<portid>]][:toaddr=<address>[/<mask>]]
Return whether the IPv4 forward port has been added [P] [Z] [O]
--add-masquerade Enable IPv4 masquerade [P] [Z] [O] [T]
--remove-masquerade Disable IPv4 masquerade [P] [Z] [O]
--query-masquerade Return whether IPv4 masquerading has been enabled [P] [Z] [O]
--list-rich-rules List rich language rules added [P] [Z] [O]
--add-rich-rule=<rule>
Add rich language rule 'rule' [P] [Z] [O] [T]
--remove-rich-rule=<rule>
Remove rich language rule 'rule' [P] [Z] [O]
--query-rich-rule=<rule>
Return whether a rich language rule 'rule' has been
added [P] [Z] [O]
Options to Adapt and Query Zones
--add-icmp-block-inversion
Enable inversion of icmp blocks for a zone [P] [Z]
--remove-icmp-block-inversion
Disable inversion of icmp blocks for a zone [P] [Z]
--query-icmp-block-inversion
Return whether inversion of icmp blocks has been enabled
for a zone [P] [Z]
--add-forward Enable forwarding of packets between interfaces and
sources in a zone [P] [Z] [T]
--remove-forward Disable forwarding of packets between interfaces and
sources in a zone [P] [Z]
--query-forward Return whether forwarding of packets between interfaces
and sources has been enabled for a zone [P] [Z]
--get-priority Get the priority [P only] [Z]
--set-priority=<priority>
Set the priority [P only] [Z]
--get-ingress-priority
Get the ingress priority [P only] [Z]
--set-ingress-priority=<priority>
Set the ingress priority [P only] [Z]
--get-egress-priority
Get the egress priority [P only] [Z]
--set-egress-priority=<priority>
Set the egress priority [P only] [Z]
Options to Adapt and Query Policies
--get-priority Get the priority [P only] [O]
--set-priority=<priority>
Set the priority [P only] [O]
--list-ingress-zones
List ingress zones that are bound to a policy [P] [O]
--add-ingress-zone=<zone>
Add the ingress zone to a policy [P] [O]
--remove-ingress-zone=<zone>
Remove the ingress zone from a policy [P] [O]
--query-ingress-zone=<zone>
Query whether the ingress zone has been adedd to a
policy [P] [O]
--list-egress-zones
List egress zones that are bound to a policy [P] [O]
--add-egress-zone=<zone>
Add the egress zone to a policy [P] [O]
--remove-egress-zone=<zone>
Remove the egress zone from a policy [P] [O]
--query-egress-zone=<zone>
Query whether the egress zone has been adedd to a
policy [P] [O]
Options to Handle Bindings of Interfaces
--list-interfaces List interfaces that are bound to a zone [P] [Z]
--add-interface=<interface>
Bind the <interface> to a zone [P] [Z]
--change-interface=<interface>
Change zone the <interface> is bound to [P] [Z]
--query-interface=<interface>
Query whether <interface> is bound to a zone [P] [Z]
--remove-interface=<interface>
Remove binding of <interface> from a zone [P] [Z]
Options to Handle Bindings of Sources
--list-sources List sources that are bound to a zone [P] [Z]
--add-source=<source>[/<mask>]|<MAC>|ipset:<ipset>
Bind the source to a zone [P] [Z]
--change-source=<source>[/<mask>]|<MAC>|ipset:<ipset>
Change zone the source is bound to [Z]
--query-source=<source>[/<mask>]|<MAC>|ipset:<ipset>
Query whether the source is bound to a zone [P] [Z]
--remove-source=<source>[/<mask>]|<MAC>|ipset:<ipset>
Remove binding of the source from a zone [P] [Z]
Helper Options
--new-helper=<helper> --module=<module> [--family=<family>]
Add a new helper [P only]
--new-helper-from-file=<filename> [--name=<helper>]
Add a new helper from file with optional name [P only]
--delete-helper=<helper>
Delete an existing helper [P only]
--load-helper-defaults=<helper>
Load helper default settings [P only]
--info-helper=<helper> Print information about an helper
--path-helper=<helper> Print file path of an helper [P only]
--get-helpers Print predefined helpers
--helper=<helper> --set-description=<description>
Set new description to helper [P only]
--helper=<helper> --get-description
Print description for helper [P only]
--helper=<helper> --set-short=<description>
Set new short description to helper [P only]
--helper=<helper> --get-short
Print short description for helper [P only]
--helper=<helper> --add-port=<portid>[-<portid>]/<protocol>
Add a new port to helper [P only]
--helper=<helper> --remove-port=<portid>[-<portid>]/<protocol>
Remove a port from helper [P only]
--helper=<helper> --query-port=<portid>[-<portid>]/<protocol>
Return whether the port has been added for helper [P only]
--helper=<helper> --get-ports
List ports of helper [P only]
--helper=<helper> --set-module=<module>
Set module to helper [P only]
--helper=<helper> --get-module
Get module from helper [P only]
--helper=<helper> --set-family={ipv4|ipv6|}
Set family for helper [P only]
--helper=<helper> --get-family
Get module from helper [P only]
Direct Options
--direct First option for all direct options
--get-all-chains
Get all chains [P]
--get-chains {ipv4|ipv6|eb} <table>
Get all chains added to the table [P]
--add-chain {ipv4|ipv6|eb} <table> <chain>
Add a new chain to the table [P]
--remove-chain {ipv4|ipv6|eb} <table> <chain>
Remove the chain from the table [P]
--query-chain {ipv4|ipv6|eb} <table> <chain>
Return whether the chain has been added to the table [P]
--get-all-rules
Get all rules [P]
--get-rules {ipv4|ipv6|eb} <table> <chain>
Get all rules added to chain in table [P]
--add-rule {ipv4|ipv6|eb} <table> <chain> <priority> <arg>...
Add rule to chain in table [P]
--remove-rule {ipv4|ipv6|eb} <table> <chain> <priority> <arg>...
Remove rule with priority from chain in table [P]
--remove-rules {ipv4|ipv6|eb} <table> <chain>
Remove rules from chain in table [P]
--query-rule {ipv4|ipv6|eb} <table> <chain> <priority> <arg>...
Return whether a rule with priority has been added to
chain in table [P]
--passthrough {ipv4|ipv6|eb} <arg>...
Pass a command through (untracked by firewalld)
--get-all-passthroughs
Get all tracked passthrough rules [P]
--get-passthroughs {ipv4|ipv6|eb} <arg>...
Get tracked passthrough rules [P]
--add-passthrough {ipv4|ipv6|eb} <arg>...
Add a new tracked passthrough rule [P]
--remove-passthrough {ipv4|ipv6|eb} <arg>...
Remove a tracked passthrough rule [P]
--query-passthrough {ipv4|ipv6|eb} <arg>...
Return whether the tracked passthrough rule has been
added [P]
Lockdown Options
--lockdown-on Enable lockdown.
--lockdown-off Disable lockdown.
--query-lockdown Query whether lockdown is enabled
Lockdown Whitelist Options
--list-lockdown-whitelist-commands
List all command lines that are on the whitelist [P]
--add-lockdown-whitelist-command=<command>
Add the command to the whitelist [P]
--remove-lockdown-whitelist-command=<command>
Remove the command from the whitelist [P]
--query-lockdown-whitelist-command=<command>
Query whether the command is on the whitelist [P]
--list-lockdown-whitelist-contexts
List all contexts that are on the whitelist [P]
--add-lockdown-whitelist-context=<context>
Add the context context to the whitelist [P]
--remove-lockdown-whitelist-context=<context>
Remove the context from the whitelist [P]
--query-lockdown-whitelist-context=<context>
Query whether the context is on the whitelist [P]
--list-lockdown-whitelist-uids
List all user ids that are on the whitelist [P]
--add-lockdown-whitelist-uid=<uid>
Add the user id uid to the whitelist [P]
--remove-lockdown-whitelist-uid=<uid>
Remove the user id uid from the whitelist [P]
--query-lockdown-whitelist-uid=<uid>
Query whether the user id uid is on the whitelist [P]
--list-lockdown-whitelist-users
List all user names that are on the whitelist [P]
--add-lockdown-whitelist-user=<user>
Add the user name user to the whitelist [P]
--remove-lockdown-whitelist-user=<user>
Remove the user name user from the whitelist [P]
--query-lockdown-whitelist-user=<user>
Query whether the user name user is on the whitelist [P]
Panic Options
--panic-on Enable panic mode
--panic-off Disable panic mode
--query-panic Query whether panic mode is enabled
"""
)
def try_set_zone_of_interface(_zone, interface):
if nm_is_imported():
try:
connection = nm_get_connection_of_interface(interface)
except Exception:
pass
else:
if connection is not None:
if _zone == nm_get_zone_of_connection(connection):
if _zone == "":
cmd.print_warning(
"The interface is under control of NetworkManager and already bound to the default zone"
)
else:
cmd.print_warning(
"The interface is under control of NetworkManager and already bound to '%s'"
% _zone
)
if _zone == "":
cmd.print_msg(
"The interface is under control of NetworkManager, setting zone to default."
)
else:
cmd.print_msg(
"The interface is under control of NetworkManager, setting zone to '%s'."
% _zone
)
nm_set_zone_of_connection(_zone, connection)
return True
return False
def try_get_zone_of_interface(interface):
if nm_is_imported():
try:
connection = nm_get_connection_of_interface(interface)
except Exception:
pass
else:
if connection is not None:
return nm_get_zone_of_connection(connection)
return False
def try_nm_get_interfaces_in_zone(zone):
if nm_is_imported():
try:
return nm_get_interfaces_in_zone(zone)
except Exception:
pass
return []
parser = argparse.ArgumentParser(
usage="'firewall-cmd --help' for usage information or see firewall-cmd(1) man page",
add_help=False,
)
parser_group_output = parser.add_mutually_exclusive_group()
parser_group_output.add_argument("-v", "--verbose", action="store_true")
parser_group_output.add_argument("-q", "--quiet", action="store_true")
parser_group_standalone = parser.add_mutually_exclusive_group()
parser_group_standalone.add_argument("-h", "--help", action="store_true")
parser_group_standalone.add_argument("-V", "--version", action="store_true")
parser_group_standalone.add_argument("--state", action="store_true")
parser_group_standalone.add_argument("--reload", action="store_true")
parser_group_standalone.add_argument("--complete-reload", action="store_true")
parser_group_standalone.add_argument("--runtime-to-permanent", action="store_true")
parser_group_standalone.add_argument("--reset-to-defaults", action="store_true")
parser_group_standalone.add_argument("--check-config", action="store_true")
parser_group_standalone.add_argument("--get-ipset-types", action="store_true")
parser_group_standalone.add_argument("--get-log-denied", action="store_true")
parser_group_standalone.add_argument("--set-log-denied", metavar="<value>")
parser_group_standalone.add_argument("--get-automatic-helpers", action="store_true")
parser_group_standalone.add_argument("--set-automatic-helpers", metavar="<value>")
parser_group_standalone.add_argument("--panic-on", action="store_true")
parser_group_standalone.add_argument("--panic-off", action="store_true")
parser_group_standalone.add_argument("--query-panic", action="store_true")
parser_group_standalone.add_argument("--lockdown-on", action="store_true")
parser_group_standalone.add_argument("--lockdown-off", action="store_true")
parser_group_standalone.add_argument("--query-lockdown", action="store_true")
parser_group_standalone.add_argument("--get-default-zone", action="store_true")
parser_group_standalone.add_argument("--set-default-zone", metavar="<zone>")
parser_group_standalone.add_argument("--get-zones", action="store_true")
parser_group_standalone.add_argument("--get-policies", action="store_true")
parser_group_standalone.add_argument("--get-services", action="store_true")
parser_group_standalone.add_argument("--get-icmptypes", action="store_true")
parser_group_standalone.add_argument("--get-active-zones", action="store_true")
parser_group_standalone.add_argument("--get-active-policies", action="store_true")
parser_group_standalone.add_argument(
"--get-zone-of-interface", metavar="<iface>", action="append"
)
parser_group_standalone.add_argument(
"--get-zone-of-source", metavar="<source>", action="append"
)
parser_group_standalone.add_argument("--list-all-zones", action="store_true")
parser_group_standalone.add_argument("--list-all-policies", action="store_true")
parser_group_standalone.add_argument("--info-zone", metavar="<zone>")
parser_group_standalone.add_argument("--info-policy", metavar="<policy>")
parser_group_standalone.add_argument("--info-service", metavar="<service>")
parser_group_standalone.add_argument("--info-icmptype", metavar="<icmptype>")
parser_group_standalone.add_argument("--info-ipset", metavar="<ipset>")
parser_group_standalone.add_argument("--info-helper", metavar="<helper>")
parser_group_config = parser.add_mutually_exclusive_group()
parser_group_config.add_argument("--new-icmptype", metavar="<icmptype>")
parser_group_config.add_argument("--new-icmptype-from-file", metavar="<filename>")
parser_group_config.add_argument("--delete-icmptype", metavar="<icmptype>")
parser_group_config.add_argument("--load-icmptype-defaults", metavar="<icmptype>")
parser_group_config.add_argument("--new-service", metavar="<service>")
parser_group_config.add_argument("--new-service-from-file", metavar="<filename>")
parser_group_config.add_argument("--delete-service", metavar="<service>")
parser_group_config.add_argument("--load-service-defaults", metavar="<service>")
parser_group_config.add_argument("--new-zone", metavar="<zone>")
parser_group_config.add_argument("--new-zone-from-file", metavar="<filename>")
parser_group_config.add_argument("--delete-zone", metavar="<zone>")
parser_group_config.add_argument("--load-zone-defaults", metavar="<zone>")
parser_group_config.add_argument("--new-policy", metavar="<policy>")
parser_group_config.add_argument("--new-policy-from-file", metavar="<filename>")
parser_group_config.add_argument("--delete-policy", metavar="<policy>")
parser_group_config.add_argument("--load-policy-defaults", metavar="<policy>")
parser_group_config.add_argument("--new-ipset", metavar="<ipset>")
parser_group_config.add_argument("--new-ipset-from-file", metavar="<filename>")
parser_group_config.add_argument("--delete-ipset", metavar="<ipset>")
parser_group_config.add_argument("--load-ipset-defaults", metavar="<ipset>")
parser_group_config.add_argument("--new-helper", metavar="<helper>")
parser_group_config.add_argument("--new-helper-from-file", metavar="<filename>")
parser_group_config.add_argument("--delete-helper", metavar="<helper>")
parser_group_config.add_argument("--load-helper-defaults", metavar="<helper>")
parser_group_config.add_argument("--path-zone", metavar="<zone>")
parser_group_config.add_argument("--path-policy", metavar="<policy>")
parser_group_config.add_argument("--path-service", metavar="<service>")
parser_group_config.add_argument("--path-icmptype", metavar="<icmptype>")
parser_group_config.add_argument("--path-ipset", metavar="<ipset>")
parser_group_config.add_argument("--path-helper", metavar="<helper>")
parser.add_argument("--name", default="", metavar="<name>")
parser_group_lockdown_whitelist = parser.add_mutually_exclusive_group()
parser_group_lockdown_whitelist.add_argument(
"--list-lockdown-whitelist-commands", action="store_true"
)
parser_group_lockdown_whitelist.add_argument(
"--add-lockdown-whitelist-command", metavar="<command>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--remove-lockdown-whitelist-command", metavar="<command>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--query-lockdown-whitelist-command", metavar="<command>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--list-lockdown-whitelist-contexts", action="store_true"
)
parser_group_lockdown_whitelist.add_argument(
"--add-lockdown-whitelist-context", metavar="<context>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--remove-lockdown-whitelist-context", metavar="<context>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--query-lockdown-whitelist-context", metavar="<context>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--list-lockdown-whitelist-uids", action="store_true"
)
parser_group_lockdown_whitelist.add_argument(
"--add-lockdown-whitelist-uid", metavar="<uid>", type=int, action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--remove-lockdown-whitelist-uid", metavar="<uid>", type=int, action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--query-lockdown-whitelist-uid", metavar="<uid>", type=int, action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--list-lockdown-whitelist-users", action="store_true"
)
parser_group_lockdown_whitelist.add_argument(
"--add-lockdown-whitelist-user", metavar="<user>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--remove-lockdown-whitelist-user", metavar="<user>", action="append"
)
parser_group_lockdown_whitelist.add_argument(
"--query-lockdown-whitelist-user", metavar="<user>", action="append"
)
parser.add_argument("--permanent", action="store_true")
parser.add_argument("--zone", default="", metavar="<zone>")
parser.add_argument("--policy", default="", metavar="<policy>")
parser.add_argument("--timeout", default="0", metavar="<seconds>")
parser_group_zone_or_policy = parser.add_mutually_exclusive_group()
parser_group_zone_or_policy.add_argument(
"--add-interface", metavar="<iface>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-interface", metavar="<iface>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-interface", metavar="<iface>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--change-interface", "--change-zone", metavar="<iface>", action="append"
)
parser_group_zone_or_policy.add_argument("--list-interfaces", action="store_true")
parser_group_zone_or_policy.add_argument(
"--add-source", metavar="<source>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-source", metavar="<source>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-source", metavar="<source>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--change-source", metavar="<source>", action="append"
)
parser_group_zone_or_policy.add_argument("--list-sources", action="store_true")
parser_group_zone_or_policy.add_argument(
"--add-ingress-zone", metavar="<zone>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-ingress-zone", metavar="<zone>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-ingress-zone", metavar="<zone>", action="append"
)
parser_group_zone_or_policy.add_argument("--list-ingress-zones", action="store_true")
parser_group_zone_or_policy.add_argument(
"--add-egress-zone", metavar="<zone>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-egress-zone", metavar="<zone>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-egress-zone", metavar="<zone>", action="append"
)
parser_group_zone_or_policy.add_argument("--list-egress-zones", action="store_true")
parser_group_zone_or_policy.add_argument(
"--add-rich-rule", metavar="<rule>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-rich-rule", metavar="<rule>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-rich-rule", metavar="<rule>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--add-service", metavar="<service>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-service", metavar="<zone_or_policy>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-service", metavar="<zone_or_policy>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--add-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--add-protocol", metavar="<protocol>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-protocol", metavar="<protocol>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-protocol", metavar="<protocol>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--add-source-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-source-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-source-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument("--add-forward", action="store_true")
parser_group_zone_or_policy.add_argument("--remove-forward", action="store_true")
parser_group_zone_or_policy.add_argument("--query-forward", action="store_true")
parser_group_zone_or_policy.add_argument("--add-masquerade", action="store_true")
parser_group_zone_or_policy.add_argument("--remove-masquerade", action="store_true")
parser_group_zone_or_policy.add_argument("--query-masquerade", action="store_true")
parser_group_zone_or_policy.add_argument(
"--add-icmp-block", metavar="<icmptype>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-icmp-block", metavar="<icmptype>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-icmp-block", metavar="<icmptype>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--add-icmp-block-inversion", action="store_true"
)
parser_group_zone_or_policy.add_argument(
"--remove-icmp-block-inversion", action="store_true"
)
parser_group_zone_or_policy.add_argument(
"--query-icmp-block-inversion", action="store_true"
)
parser_group_zone_or_policy.add_argument(
"--add-forward-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--remove-forward-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument(
"--query-forward-port", metavar="<port>", action="append"
)
parser_group_zone_or_policy.add_argument("--list-rich-rules", action="store_true")
parser_group_zone_or_policy.add_argument("--list-services", action="store_true")
parser_group_zone_or_policy.add_argument("--list-ports", action="store_true")
parser_group_zone_or_policy.add_argument("--list-protocols", action="store_true")
parser_group_zone_or_policy.add_argument("--list-icmp-blocks", action="store_true")
parser_group_zone_or_policy.add_argument("--list-forward-ports", action="store_true")
parser_group_zone_or_policy.add_argument("--list-source-ports", action="store_true")
parser_group_zone_or_policy.add_argument("--list-all", action="store_true")
parser_group_zone_or_policy.add_argument("--get-target", action="store_true")
parser_group_zone_or_policy.add_argument("--set-target", metavar="<target>")
parser_group_zone_or_policy.add_argument("--get-priority", action="store_true")
parser_group_zone_or_policy.add_argument("--set-priority", metavar="<priority>")
parser_group_zone_or_policy.add_argument("--get-ingress-priority", action="store_true")
parser_group_zone_or_policy.add_argument("--set-ingress-priority", metavar="<priority>")
parser_group_zone_or_policy.add_argument("--get-egress-priority", action="store_true")
parser_group_zone_or_policy.add_argument("--set-egress-priority", metavar="<priority>")
parser.add_argument("--option", metavar="<key>[=<value>]", action="append")
parser.add_argument("--type", metavar="<ipsettype>")
parser.add_argument("--ipset", metavar="<ipset>")
parser_ipset = parser.add_mutually_exclusive_group()
# parser_ipset.add_argument("--add-option", metavar="<key>[=<value>]")
# parser_ipset.add_argument("--remove-option", metavar="<key>[=<value>]")
# parser_ipset.add_argument("--query-option", metavar="<key>[=<value>]")
# parser_ipset.add_argument("--get-options", action="store_true")
parser_ipset.add_argument("--get-ipsets", action="store_true")
parser_ipset.add_argument("--add-entry", metavar="<entry>", action="append")
parser_ipset.add_argument("--remove-entry", metavar="<entry>", action="append")
parser_ipset.add_argument("--query-entry", metavar="<entry>", action="append")
parser_ipset.add_argument("--get-entries", action="store_true")
parser_ipset.add_argument(
"--add-entries-from-file", metavar="<filename>", action="append"
)
parser_ipset.add_argument(
"--remove-entries-from-file", metavar="<filename>", action="append"
)
parser.add_argument("--icmptype", metavar="<icmptype>")
parser_icmptype = parser.add_mutually_exclusive_group()
parser_icmptype.add_argument("--add-destination", metavar="<ipv>", action="append")
parser_icmptype.add_argument("--remove-destination", metavar="<ipv>", action="append")
parser_icmptype.add_argument("--query-destination", metavar="<ipv>", action="append")
parser_icmptype.add_argument("--get-destinations", action="store_true")
parser.add_argument("--service", metavar="<service>")
parser_service = parser.add_mutually_exclusive_group()
parser_service.add_argument("--get-ports", action="store_true")
parser_service.add_argument("--get-source-ports", action="store_true")
parser_service.add_argument("--get-protocols", action="store_true")
parser_service.add_argument("--add-module", metavar="<module>", action="append")
parser_service.add_argument("--remove-module", metavar="<module>", action="append")
parser_service.add_argument("--query-module", metavar="<module>", action="append")
parser_service.add_argument("--get-modules", action="store_true")
parser_service.add_argument("--add-helper", metavar="<helper>", action="append")
parser_service.add_argument("--remove-helper", metavar="<helper>", action="append")
parser_service.add_argument("--query-helper", metavar="<helper>", action="append")
parser_service.add_argument("--get-service-helpers", action="store_true")
parser_service.add_argument("--add-include", metavar="<service>", action="append")
parser_service.add_argument("--remove-include", metavar="<service>", action="append")
parser_service.add_argument("--query-include", metavar="<service>", action="append")
parser_service.add_argument("--get-includes", action="store_true")
parser_service.add_argument(
"--set-destination", metavar="<destination>", action="append"
)
parser_service.add_argument("--get-destination", action="store_true")
parser_service.add_argument("--set-description", metavar="<description>")
parser_service.add_argument("--get-description", action="store_true")
parser_service.add_argument("--set-short", metavar="<description>")
parser_service.add_argument("--get-short", action="store_true")
parser.add_argument("--helper", metavar="<helper>")
parser.add_argument("--family", metavar="<family>")
parser.add_argument("--module", metavar="<module>")
parser_helper = parser.add_mutually_exclusive_group()
# parser_helper.add_argument("--get-ports", action="store_true")
parser_helper.add_argument("--get-helpers", action="store_true")
parser_helper.add_argument("--set-module", metavar="<module>")
parser_helper.add_argument("--get-module", action="store_true")
# parser_helper.add_argument("--query-module", metavar="<module>")
parser_helper.add_argument("--set-family", metavar="<family>|''", nargs="*")
parser_helper.add_argument("--get-family", action="store_true")
parser.add_argument("--direct", action="store_true")
# not possible to have sequences of options here
parser_direct = parser.add_mutually_exclusive_group()
parser_direct.add_argument(
"--passthrough",
nargs=argparse.REMAINDER,
metavar=("{ ipv4 | ipv6 | eb }", "<args>"),
)
parser_direct.add_argument(
"--add-passthrough",
nargs=argparse.REMAINDER,
metavar=("{ ipv4 | ipv6 | eb }", "<args>"),
)
parser_direct.add_argument(
"--remove-passthrough",
nargs=argparse.REMAINDER,
metavar=("{ ipv4 | ipv6 | eb }", "<args>"),
)
parser_direct.add_argument(
"--query-passthrough",
nargs=argparse.REMAINDER,
metavar=("{ ipv4 | ipv6 | eb }", "<args>"),
)
parser_direct.add_argument(
"--get-passthroughs", nargs=1, metavar=("{ ipv4 | ipv6 | eb }")
)
parser_direct.add_argument("--get-all-passthroughs", action="store_true")
parser_direct.add_argument(
"--add-chain", nargs=3, metavar=("{ ipv4 | ipv6 | eb }", "<table>", "<chain>")
)
parser_direct.add_argument(
"--remove-chain", nargs=3, metavar=("{ ipv4 | ipv6 | eb }", "<table>", "<chain>")
)
parser_direct.add_argument(
"--query-chain", nargs=3, metavar=("{ ipv4 | ipv6 | eb }", "<table>", "<chain>")
)
parser_direct.add_argument("--get-all-chains", action="store_true")
parser_direct.add_argument(
"--get-chains", nargs=2, metavar=("{ ipv4 | ipv6 | eb }", "<table>")
)
parser_direct.add_argument(
"--add-rule",
nargs=argparse.REMAINDER,
metavar=("{ ipv4 | ipv6 | eb }", "<table> <chain> <priority> <args>"),
)
parser_direct.add_argument(
"--remove-rule",
nargs=argparse.REMAINDER,
metavar=("{ ipv4 | ipv6 | eb }", "<table> <chain> <priority> <args>"),
)
parser_direct.add_argument(
"--remove-rules", nargs=3, metavar=("{ ipv4 | ipv6 | eb }", "<table> <chain>")
)
parser_direct.add_argument(
"--query-rule",
nargs=argparse.REMAINDER,
metavar=("{ ipv4 | ipv6 | eb }", "<table> <chain> <priority> <args>"),
)
parser_direct.add_argument(
"--get-rules", nargs=3, metavar=("{ ipv4 | ipv6 | eb }", "<table>", "<chain>")
)
parser_direct.add_argument("--get-all-rules", action="store_true")
##############################################################################
args = sys.argv[1:]
if len(sys.argv) > 1:
i = -1
if "--passthrough" in args:
i = args.index("--passthrough") + 1
elif "--add-passthrough" in args:
i = args.index("--add-passthrough") + 1
elif "--remove-passthrough" in args:
i = args.index("--remove-passthrough") + 1
elif "--query-passthrough" in args:
i = args.index("--query-passthrough") + 1
elif "--add-rule" in args:
i = args.index("--add-rule") + 4
elif "--remove-rule" in args:
i = args.index("--remove-rule") + 4
elif "--query-rule" in args:
i = args.index("--query-rule") + 4
# join <args> into one argument to prevent parser from parsing each iptables
# option, because they can conflict with firewall-cmd options
# # e.g. --delete (iptables) and --delete-* (firewall-cmd)
if (i > -1) and (i < len(args) - 1):
aux_args = args[:]
args = aux_args[: i + 1] # all but not <args>
args.append(joinArgs(aux_args[i + 1 :])) # add <args> as one arg
a = parser.parse_args(args)
options_standalone = (
a.help
or a.version
or a.state
or a.reload
or a.complete_reload
or a.runtime_to_permanent
or a.reset_to_defaults
or a.panic_on
or a.panic_off
or a.query_panic
or a.lockdown_on
or a.lockdown_off
or a.query_lockdown
or a.get_default_zone
or a.set_default_zone
or a.get_active_zones
or a.get_ipset_types
or a.get_log_denied
or a.set_log_denied
or a.get_automatic_helpers
or a.set_automatic_helpers
or a.check_config
or a.get_active_policies
)
options_desc_xml_file = (
a.set_description or a.get_description or a.set_short or a.get_short
)
options_lockdown_whitelist = (
a.list_lockdown_whitelist_commands
or a.add_lockdown_whitelist_command
or a.remove_lockdown_whitelist_command
or a.query_lockdown_whitelist_command
or a.list_lockdown_whitelist_contexts
or a.add_lockdown_whitelist_context
or a.remove_lockdown_whitelist_context
or a.query_lockdown_whitelist_context
or a.list_lockdown_whitelist_uids
or a.add_lockdown_whitelist_uid is not None
or a.remove_lockdown_whitelist_uid is not None
or a.query_lockdown_whitelist_uid is not None
or a.list_lockdown_whitelist_users
or a.add_lockdown_whitelist_user
or a.remove_lockdown_whitelist_user
or a.query_lockdown_whitelist_user
)
options_config = (
a.get_zones
or a.get_services
or a.get_icmptypes
or options_lockdown_whitelist
or a.list_all_zones
or a.get_zone_of_interface
or a.get_zone_of_source
or a.info_zone
or a.info_icmptype
or a.info_service
or a.info_ipset
or a.info_policy
or a.get_ipsets
or a.info_helper
or a.get_helpers
or a.get_policies
or a.list_all_policies
)
options_zone_and_policy_adapt_query = (
a.add_service
or a.remove_service
or a.query_service
or a.add_port
or a.remove_port
or a.query_port
or a.add_protocol
or a.remove_protocol
or a.query_protocol
or a.add_source_port
or a.remove_source_port
or a.query_source_port
or a.add_icmp_block
or a.remove_icmp_block
or a.query_icmp_block
or a.add_forward_port
or a.remove_forward_port
or a.query_forward_port
or a.add_rich_rule
or a.remove_rich_rule
or a.query_rich_rule
or a.add_masquerade
or a.remove_masquerade
or a.query_masquerade
or a.list_services
or a.list_ports
or a.list_protocols
or a.list_source_ports
or a.list_icmp_blocks
or a.list_forward_ports
or a.list_rich_rules
or a.list_all
or a.get_target
or a.set_target
or a.set_priority
or a.get_priority
)
options_zone_unique = (
a.add_icmp_block_inversion
or a.remove_icmp_block_inversion
or a.query_icmp_block_inversion
or a.add_forward
or a.remove_forward
or a.query_forward
or a.list_interfaces
or a.change_interface
or a.add_interface
or a.remove_interface
or a.query_interface
or a.list_sources
or a.change_source
or a.add_source
or a.remove_source
or a.query_source
or a.set_ingress_priority
or a.get_ingress_priority
or a.set_egress_priority
or a.get_egress_priority
)
options_zone_ops = options_zone_unique or options_zone_and_policy_adapt_query
options_policy_unique = (
a.list_ingress_zones
or a.add_ingress_zone
or a.remove_ingress_zone
or a.query_ingress_zone
or a.list_egress_zones
or a.add_egress_zone
or a.remove_egress_zone
or a.query_egress_zone
)
options_policy_ops = options_policy_unique or options_zone_and_policy_adapt_query
options_zone = a.zone or a.timeout != "0" or options_zone_ops or options_desc_xml_file
options_policy = (
a.policy or a.timeout != "0" or options_policy_ops or options_desc_xml_file
)
options_ipset = (
a.add_entry
or a.remove_entry
or a.query_entry
or a.get_entries
or a.add_entries_from_file
or a.remove_entries_from_file
or options_desc_xml_file
)
options_icmptype = (
a.add_destination
or a.remove_destination
or a.query_destination
or a.get_destinations
or options_desc_xml_file
)
options_service = (
a.add_port
or a.remove_port
or a.query_port
or a.get_ports
or a.add_protocol
or a.remove_protocol
or a.query_protocol
or a.get_protocols
or a.add_source_port
or a.remove_source_port
or a.query_source_port
or a.get_source_ports
or a.add_module
or a.remove_module
or a.query_module
or a.get_modules
or a.set_destination
or a.remove_destination
or a.query_destination
or a.get_destinations
or options_desc_xml_file
or a.add_include
or a.remove_include
or a.query_include
or a.get_includes
or a.add_helper
or a.remove_helper
or a.query_helper
or a.get_service_helpers
)
options_helper = (
a.add_port
or a.remove_port
or a.query_port
or a.get_ports
or a.set_module
or a.get_module
or a.set_family
or a.get_family
or options_desc_xml_file
)
options_permanent = (
a.permanent
or options_config
or a.zone
or options_zone_ops
or a.policy
or options_policy_ops
or a.ipset
or options_ipset
or a.helper
or options_helper
)
options_permanent_only = (
a.new_icmptype
or a.delete_icmptype
or a.new_icmptype_from_file
or a.load_icmptype_defaults
or a.new_service
or a.delete_service
or a.new_service_from_file
or a.load_service_defaults
or a.new_zone
or a.delete_zone
or a.new_zone_from_file
or a.load_zone_defaults
or a.new_policy
or a.delete_policy
or a.new_policy_from_file
or a.load_policy_defaults
or a.new_ipset
or a.delete_ipset
or a.new_ipset_from_file
or a.load_ipset_defaults
or a.new_helper
or a.delete_helper
or a.new_helper_from_file
or a.load_helper_defaults
or (a.icmptype and options_icmptype)
or (a.service and options_service)
or (a.helper and options_helper)
or a.path_zone
or a.path_icmptype
or a.path_service
or a.path_ipset
or a.path_helper
or options_desc_xml_file
or a.set_priority
or a.set_ingress_priority
or a.set_egress_priority
or a.get_priority
or a.get_ingress_priority
or a.get_egress_priority
or a.path_policy
or a.get_target
or a.set_target
)
options_direct = (
a.passthrough
or a.add_chain
or a.remove_chain
or a.query_chain
or a.get_chains
or a.get_all_chains
or a.add_rule
or a.remove_rule
or a.remove_rules
or a.query_rule
or a.get_rules
or a.get_all_rules
or a.add_passthrough
or a.remove_passthrough
or a.query_passthrough
or a.get_passthroughs
or a.get_all_passthroughs
)
# these are supposed to only write out some output
options_list_get = (
a.help
or a.version
or a.list_all
or a.list_all_zones
or a.list_lockdown_whitelist_commands
or a.list_lockdown_whitelist_contexts
or a.list_lockdown_whitelist_uids
or a.list_lockdown_whitelist_users
or a.list_services
or a.list_ports
or a.list_protocols
or a.list_icmp_blocks
or a.list_forward_ports
or a.list_rich_rules
or a.list_interfaces
or a.list_sources
or a.get_default_zone
or a.get_active_zones
or a.get_zone_of_interface
or a.get_zone_of_source
or a.get_zones
or a.get_services
or a.get_icmptypes
or a.get_target
or a.info_zone
or a.info_icmptype
or a.info_service
or a.info_ipset
or a.get_ipsets
or a.get_entries
or a.info_helper
or a.get_helpers
or a.get_destinations
or a.get_description
or a.list_all_policies
or a.info_policy
or a.get_policies
or a.get_active_policies
)
# Set quiet and verbose
cmd = FirewallCommand(a.quiet, a.verbose)
def myexcepthook(exctype, value, traceback):
cmd.exception_handler(str(value))
sys.excepthook = myexcepthook
# Check various impossible combinations of options
if options_standalone and (
options_zone
or options_permanent
or options_direct
or options_permanent_only
or options_ipset
or options_policy
):
cmd.fail(
parser.format_usage() + "Can't use stand-alone options with other options."
)
if options_ipset and not options_desc_xml_file and not a.ipset:
cmd.fail(parser.format_usage() + "No ipset specified.")
if (
(options_icmptype and not a.icmptype)
and not (options_service and a.service)
and not options_desc_xml_file
):
cmd.fail(parser.format_usage() + "No icmptype specified.")
if (
(options_helper and not a.helper)
and not (options_service and a.service)
and not options_zone
and not options_desc_xml_file
and not options_policy
):
cmd.fail(parser.format_usage() + "No helper specified.")
if (
(options_direct or options_permanent_only)
and (options_zone and not a.zone)
and (options_service and not a.service)
and (options_icmptype and a.icmptype)
and not options_desc_xml_file
):
cmd.fail(parser.format_usage() + "Can't be used with --zone.")
if (a.direct and not options_direct) or (options_direct and not a.direct):
cmd.fail(parser.format_usage() + "Wrong usage of 'direct' options.")
if a.zone and a.direct:
cmd.fail(parser.format_usage() + "--zone is an invalid option with --direct")
if a.name and not (
a.new_zone_from_file
or a.new_service_from_file
or a.new_ipset_from_file
or a.new_icmptype_from_file
or a.new_helper_from_file
or a.new_policy_from_file
):
cmd.fail(parser.format_usage() + "Wrong usage of '--name' option.")
if options_permanent_only and not a.permanent:
cmd.fail(parser.format_usage() + "Option can be used only with --permanent.")
if options_config and (options_zone or options_policy):
cmd.fail(
parser.format_usage()
+ "Wrong usage of --get-zones | --get-services | --get-icmptypes | --get-policies."
)
if a.timeout != "0":
value = 0
unit = "s"
if len(a.timeout) < 1:
cmd.fail(
parser.format_usage()
+ "'%s' is wrong timeout value. Use for example '2m' or '1h'" % a.timeout
)
elif len(a.timeout) == 1:
if a.timeout.isdigit():
value = int(a.timeout[0])
else:
cmd.fail(
parser.format_usage()
+ "'%s' is wrong timeout value. Use for example '2m' or '1h'"
% a.timeout
)
elif len(a.timeout) > 1:
if a.timeout.isdigit():
value = int(a.timeout)
unit = "s"
else:
if a.timeout[:-1].isdigit():
value = int(a.timeout[:-1])
else:
cmd.fail(
parser.format_usage()
+ "'%s' is wrong timeout value. Use for example '2m' or '1h'"
% a.timeout
)
unit = a.timeout[-1:].lower()
if unit == "s":
a.timeout = value
elif unit == "m":
a.timeout = value * 60
elif unit == "h":
a.timeout = value * 60 * 60
else:
cmd.fail(
parser.format_usage()
+ "'%s' is wrong timeout value. Use for example '2m' or '1h'" % a.timeout
)
else:
a.timeout = 0
if a.timeout and not (
a.add_service
or a.add_port
or a.add_protocol
or a.add_icmp_block
or a.add_forward_port
or a.add_source_port
or a.add_forward
or a.add_masquerade
or a.add_rich_rule
):
cmd.fail(parser.format_usage() + "Wrong --timeout usage")
if a.permanent:
if a.timeout:
cmd.fail(parser.format_usage() + "Can't specify timeout for permanent action.")
if options_config and not a.zone:
pass
elif options_permanent:
pass
else:
cmd.fail(parser.format_usage() + "Wrong --permanent usage.")
if a.quiet and options_list_get:
# it makes no sense to use --quiet with these options
a.quiet = False
cmd.set_quiet(a.quiet)
cmd.fail("-q/--quiet can't be used with this option(s)")
if a.zone and a.policy:
cmd.fail(parser.format_usage() + "Can't use --zone with --policy.")
if a.policy and options_zone_unique:
cmd.fail(parser.format_usage() + "Can't use --policy with zone only options.")
if a.zone and options_policy_unique:
cmd.fail(parser.format_usage() + "Can't use --zone with policy only options.")
if not a.policy and options_policy_unique:
cmd.fail(parser.format_usage() + "Must use --policy with policy only options.")
if a.help:
__usage()
sys.exit(0)
zone = a.zone
try:
fw = FirewallClient()
except FirewallError as msg:
code = FirewallError.get_code(str(msg))
cmd.print_and_exit("Error: %s" % msg, code)
fw.setExceptionHandler(cmd.exception_handler)
if not (
options_standalone
or options_ipset
or options_icmptype
or options_service
or options_helper
or options_config
or options_zone_ops
or options_policy
or options_direct
or options_permanent_only
):
state = fw.get_property("state")
if state == "RUNNING":
state = "State: running\n\n"
elif state == "FAILED":
state = "State: failed\n\n"
else:
state = "State: not running\n\n"
cmd.fail(state + "No options specified\n" + parser.format_usage()[:-1])
if not fw.connected:
if a.state:
cmd.print_and_exit("not running", errors.NOT_RUNNING)
else:
cmd.print_and_exit("FirewallD is not running", errors.NOT_RUNNING)
cmd.set_fw(fw)
if (
options_zone_ops
and not zone
and not a.policy
and not (a.service and options_service)
and not (a.helper and options_helper)
):
default = fw.getDefaultZone()
cmd.print_if_verbose("No zone specified, using default zone, i.e. '%s'" % default)
active = list(fw.getActiveZones().keys())
if active and default not in active:
cmd.print_msg(
"""You're performing an operation over default zone ('%s'),
but your connections/interfaces are in zone '%s' (see --get-active-zones)
You most likely need to use --zone=%s option.\n"""
% (default, ",".join(active), active[0])
)
if a.permanent:
if a.get_ipsets:
cmd.print_and_exit(" ".join(fw.config().getIPSetNames()))
elif a.new_ipset:
if not a.type:
cmd.fail(parser.format_usage() + "No type specified.")
if a.type == "hash:mac" and a.family:
cmd.fail(
parser.format_usage()
+ "--family is not compatible with the hash:mac type"
)
settings = FirewallClientIPSetSettings()
settings.setType(a.type)
if a.option:
for opt in a.option:
settings.addOption(*cmd.parse_ipset_option(opt))
if a.family:
settings.addOption("family", a.family)
config = fw.config()
config.addIPSet(a.new_ipset, settings)
elif a.new_ipset_from_file:
filename = os.path.basename(a.new_ipset_from_file)
dirname = os.path.dirname(a.new_ipset_from_file)
if dirname == "":
dirname = "./"
try:
obj = ipset_reader(filename, dirname)
except FirewallError as msg:
cmd.fail(
"Failed to load ipset file '%s': %s" % (a.new_ipset_from_file, msg)
)
except IOError as msg:
cmd.fail("Failed to load ipset file: %s" % msg)
if a.name:
obj.name = a.name
config = fw.config()
config.addIPSet(obj.name, obj.export_config())
elif a.delete_ipset:
ipset = fw.config().getIPSetByName(a.delete_ipset)
ipset.remove()
elif a.load_ipset_defaults:
ipset = fw.config().getIPSetByName(a.load_ipset_defaults)
ipset.loadDefaults()
elif a.info_ipset:
ipset = fw.config().getIPSetByName(a.info_ipset)
cmd.print_ipset_info(a.info_ipset, ipset.getSettings())
sys.exit(0)
elif a.path_ipset:
ipset = fw.config().getIPSetByName(a.path_ipset)
cmd.print_and_exit(
"%s/%s" % (ipset.get_property("path"), ipset.get_property("filename"))
)
elif a.ipset:
ipset = fw.config().getIPSetByName(a.ipset)
settings = ipset.getSettings()
if a.add_entry:
cmd.add_sequence(
a.add_entry, settings.addEntry, settings.queryEntry, None, "'%s'"
)
ipset.update(settings)
elif a.remove_entry:
cmd.remove_sequence(
a.remove_entry, settings.removeEntry, settings.queryEntry, None, "'%s'"
)
ipset.update(settings)
elif a.query_entry:
cmd.query_sequence(a.query_entry, settings.queryEntry, None, "'%s'")
elif a.get_entries:
l = settings.getEntries()
cmd.print_and_exit("\n".join(l))
elif a.add_entries_from_file:
changed = False
for filename in a.add_entries_from_file:
try:
entries = cmd.get_ipset_entries_from_file(filename)
except IOError as msg:
message = "Failed to read file '%s': %s" % (filename, msg)
if len(a.add_entries_from_file) > 1:
cmd.print_warning(message)
else:
cmd.print_and_exit(message)
else:
old_entries = settings.getEntries()
entries_set = set()
for entry in old_entries:
entries_set.add(entry)
for entry in entries:
if entry not in entries_set:
old_entries.append(entry)
entries_set.add(entry)
changed = True
else:
cmd.print_if_verbose("Warning: ALREADY_ENABLED: %s" % entry)
if changed:
settings.setEntries(old_entries)
if changed:
ipset.update(settings)
elif a.remove_entries_from_file:
changed = False
for filename in a.remove_entries_from_file:
try:
entries = cmd.get_ipset_entries_from_file(filename)
except IOError as msg:
message = "Failed to read file '%s': %s" % (filename, msg)
if len(a.remove_entries_from_file) > 1:
cmd.print_warning(message)
else:
cmd.print_and_exit(message)
else:
old_entries = settings.getEntries()
entries_set = set()
for entry in old_entries:
entries_set.add(entry)
for entry in entries:
if entry in entries_set:
old_entries.remove(entry)
entries_set.discard(entry)
changed = True
else:
cmd.print_if_verbose("Warning: NOT_ENABLED: %s" % entry)
if changed:
settings.setEntries(old_entries)
if changed:
ipset.update(settings)
elif a.set_description:
settings.setDescription(a.set_description)
ipset.update(settings)
elif a.get_description:
cmd.print_and_exit(settings.getDescription())
elif a.set_short:
settings.setShort(a.set_short)
ipset.update(settings)
elif a.get_short:
cmd.print_and_exit(settings.getShort())
else:
cmd.fail(parser.format_usage() + "Unknown option")
elif a.get_zones:
cmd.print_and_exit(" ".join(fw.config().getZoneNames()))
elif a.new_zone:
config = fw.config()
config.addZone(a.new_zone, FirewallClientZoneSettings())
elif a.new_zone_from_file:
filename = os.path.basename(a.new_zone_from_file)
dirname = os.path.dirname(a.new_zone_from_file)
if dirname == "":
dirname = "./"
try:
obj = zone_reader(filename, dirname)
except FirewallError as msg:
cmd.fail("Failed to load zone file '%s': %s" % (a.new_zone_from_file, msg))
except IOError as msg:
cmd.fail("Failed to load zone file: %s" % msg)
if a.name:
obj.name = a.name
config = fw.config()
config.addZone(obj.name, obj.export_config())
elif a.delete_zone:
zone = fw.config().getZoneByName(a.delete_zone)
zone.remove()
elif a.load_zone_defaults:
zone = fw.config().getZoneByName(a.load_zone_defaults)
zone.loadDefaults()
elif a.info_zone:
zone = fw.config().getZoneByName(a.info_zone)
cmd.print_zone_info(
a.info_zone, zone.getSettings(), default_zone=fw.getDefaultZone()
)
sys.exit(0)
elif a.path_zone:
zone = fw.config().getZoneByName(a.path_zone)
cmd.print_and_exit(
"%s/%s" % (zone.get_property("path"), zone.get_property("filename"))
)
elif a.get_policies:
cmd.print_and_exit(" ".join(fw.config().getPolicyNames()))
elif a.new_policy:
config = fw.config()
config.addPolicy(a.new_policy, FirewallClientPolicySettings())
elif a.new_policy_from_file:
filename = os.path.basename(a.new_policy_from_file)
dirname = os.path.dirname(a.new_policy_from_file)
if dirname == "":
dirname = "./"
try:
obj = policy_reader(filename, dirname)
except FirewallError as msg:
cmd.fail(
"Failed to load policy file '%s': %s" % (a.new_policy_from_file, msg)
)
except IOError as msg:
cmd.fail("Failed to load policy file: %s" % msg)
if a.name:
obj.name = a.name
config = fw.config()
config.addPolicy(obj.name, obj.export_config_dict())
elif a.delete_policy:
policy = fw.config().getPolicyByName(a.delete_policy)
policy.remove()
elif a.load_policy_defaults:
policy = fw.config().getPolicyByName(a.load_policy_defaults)
policy.loadDefaults()
elif a.info_policy:
policy = fw.config().getPolicyByName(a.info_policy)
cmd.print_policy_info(a.info_policy, policy.getSettings())
sys.exit(0)
elif a.path_policy:
policy = fw.config().getPolicyByName(a.path_policy)
cmd.print_and_exit(
"%s/%s" % (policy.get_property("path"), policy.get_property("filename"))
)
elif a.get_services:
cmd.print_and_exit(" ".join(fw.config().getServiceNames()))
elif a.new_service:
config = fw.config()
config.addService(a.new_service, FirewallClientServiceSettings())
elif a.new_service_from_file:
filename = os.path.basename(a.new_service_from_file)
dirname = os.path.dirname(a.new_service_from_file)
if dirname == "":
dirname = "./"
try:
obj = service_reader(filename, dirname)
except FirewallError as msg:
cmd.fail(
"Failed to load service file '%s': %s" % (a.new_service_from_file, msg)
)
except IOError as msg:
cmd.fail("Failed to load service file: %s" % msg)
if a.name:
obj.name = a.name
config = fw.config()
config.addService(obj.name, obj.export_config())
elif a.delete_service:
service = fw.config().getServiceByName(a.delete_service)
service.remove()
elif a.load_service_defaults:
service = fw.config().getServiceByName(a.load_service_defaults)
service.loadDefaults()
elif a.info_service:
service = fw.config().getServiceByName(a.info_service)
cmd.print_service_info(a.info_service, service.getSettings())
sys.exit(0)
elif a.path_service:
service = fw.config().getServiceByName(a.path_service)
cmd.print_and_exit(
"%s/%s" % (service.get_property("path"), service.get_property("filename"))
)
elif a.get_helpers:
cmd.print_and_exit(" ".join(fw.config().getHelperNames()))
elif a.new_helper:
if not a.module:
cmd.fail(parser.format_usage() + "No module specified.")
settings = FirewallClientHelperSettings()
settings.setModule(a.module)
if a.family:
settings.setFamily(a.family)
config = fw.config()
config.addHelper(a.new_helper, settings)
elif a.new_helper_from_file:
filename = os.path.basename(a.new_helper_from_file)
dirname = os.path.dirname(a.new_helper_from_file)
if dirname == "":
dirname = "./"
try:
obj = helper_reader(filename, dirname)
except FirewallError as msg:
cmd.fail(
"Failed to load helper file '%s': %s" % (a.new_helper_from_file, msg)
)
except IOError as msg:
cmd.fail("Failed to load helper file: %s" % msg)
if a.name:
obj.name = a.name
config = fw.config()
config.addHelper(obj.name, obj.export_config())
elif a.delete_helper:
helper = fw.config().getHelperByName(a.delete_helper)
helper.remove()
elif a.load_helper_defaults:
helper = fw.config().getHelperByName(a.load_helper_defaults)
helper.loadDefaults()
elif a.info_helper:
helper = fw.config().getHelperByName(a.info_helper)
cmd.print_helper_info(a.info_helper, helper.getSettings())
sys.exit(0)
elif a.path_helper:
helper = fw.config().getHelperByName(a.path_helper)
cmd.print_and_exit(
"%s/%s" % (helper.get_property("path"), helper.get_property("filename"))
)
elif a.helper:
helper = fw.config().getHelperByName(a.helper)
settings = helper.getSettings()
if a.add_port:
cmd.add_sequence(
a.add_port,
settings.addPort,
settings.queryPort,
cmd.parse_port,
"%s/%s",
)
helper.update(settings)
elif a.remove_port:
cmd.remove_sequence(
a.remove_port,
settings.removePort,
settings.queryPort,
cmd.parse_port,
"%s/%s",
)
helper.update(settings)
elif a.query_port:
cmd.query_sequence(
a.query_port, settings.queryPort, cmd.parse_port, "%s/%s"
)
elif a.get_ports:
l = helper.getPorts()
cmd.print_and_exit(" ".join(["%s/%s" % (port[0], port[1]) for port in l]))
elif a.get_module:
cmd.print_and_exit(settings.getModule())
elif a.set_module:
settings.setModule(cmd.check_module(a.set_module))
helper.update(settings)
elif a.get_family:
cmd.print_and_exit(settings.getFamily())
elif a.set_family:
settings.setFamily(cmd.check_helper_family(a.set_family[0]))
helper.update(settings)
elif a.set_description:
settings.setDescription(a.set_description)
helper.update(settings)
elif a.get_description:
cmd.print_and_exit(settings.getDescription())
elif a.set_short:
settings.setShort(a.set_short)
helper.update(settings)
elif a.get_short:
cmd.print_and_exit(settings.getShort())
else:
cmd.fail(parser.format_usage() + "Unknown option")
elif a.get_icmptypes:
cmd.print_and_exit(" ".join(fw.config().getIcmpTypeNames()))
elif a.new_icmptype:
config = fw.config()
config.addIcmpType(a.new_icmptype, FirewallClientIcmpTypeSettings())
elif a.new_icmptype_from_file:
filename = os.path.basename(a.new_icmptype_from_file)
dirname = os.path.dirname(a.new_icmptype_from_file)
if dirname == "":
dirname = "./"
try:
obj = icmptype_reader(filename, dirname)
except FirewallError as msg:
cmd.fail(
"Failed to load icmptype file '%s': %s"
% (a.new_icmptype_from_file, msg)
)
except IOError as msg:
cmd.fail("Failed to load icmptype file: %s" % msg)
if a.name:
obj.name = a.name
config = fw.config()
config.addIcmpType(obj.name, obj.export_config())
elif a.delete_icmptype:
icmptype = fw.config().getIcmpTypeByName(a.delete_icmptype)
icmptype.remove()
elif a.load_icmptype_defaults:
icmptype = fw.config().getIcmpTypeByName(a.load_icmptype_defaults)
icmptype.loadDefaults()
elif a.info_icmptype:
icmptype = fw.config().getIcmpTypeByName(a.info_icmptype)
cmd.print_icmptype_info(a.info_icmptype, icmptype.getSettings())
sys.exit(0)
elif a.path_icmptype:
icmptype = fw.config().getIcmpTypeByName(a.path_icmptype)
cmd.print_and_exit(
"%s/%s" % (icmptype.get_property("path"), icmptype.get_property("filename"))
)
elif a.icmptype:
icmptype = fw.config().getIcmpTypeByName(a.icmptype)
settings = icmptype.getSettings()
if a.add_destination:
cmd.add_sequence(
a.add_destination,
settings.addDestination,
settings.queryDestination,
cmd.check_destination_ipv,
"'%s'",
)
icmptype.update(settings)
elif a.remove_destination:
cmd.remove_sequence(
a.remove_destination,
settings.removeDestination,
settings.queryDestination,
cmd.check_destination_ipv,
"'%s'",
)
icmptype.update(settings)
elif a.query_destination:
cmd.query_sequence(
a.query_destination,
settings.queryDestination,
cmd.check_destination_ipv,
"'%s'",
)
elif a.get_destinations:
l = settings.getDestinations()
if len(l) == 0:
l = ["ipv4", "ipv6"]
cmd.print_and_exit("\n".join(l))
elif a.set_description:
settings.setDescription(a.set_description)
icmptype.update(settings)
elif a.get_description:
cmd.print_and_exit(settings.getDescription())
elif a.set_short:
settings.setShort(a.set_short)
icmptype.update(settings)
elif a.get_short:
cmd.print_and_exit(settings.getShort())
else:
cmd.fail(parser.format_usage() + "Unknown option")
elif a.service:
service = fw.config().getServiceByName(a.service)
settings = service.getSettings()
if a.add_port:
cmd.add_sequence(
a.add_port,
settings.addPort,
settings.queryPort,
cmd.parse_port,
"%s/%s",
)
service.update(settings)
elif a.remove_port:
cmd.remove_sequence(
a.remove_port,
settings.removePort,
settings.queryPort,
cmd.parse_port,
"%s/%s",
)
service.update(settings)
elif a.query_port:
cmd.query_sequence(
a.query_port, settings.queryPort, cmd.parse_port, "%s/%s"
)
elif a.get_ports:
l = settings.getPorts()
cmd.print_and_exit(" ".join(["%s/%s" % (port[0], port[1]) for port in l]))
elif a.add_protocol:
cmd.add_sequence(
a.add_protocol,
settings.addProtocol,
settings.queryProtocol,
None,
"'%s'",
)
service.update(settings)
elif a.remove_protocol:
cmd.remove_sequence(
a.remove_protocol,
settings.removeProtocol,
settings.queryProtocol,
None,
"'%s'",
)
service.update(settings)
elif a.query_protocol:
cmd.query_sequence(a.query_protocol, settings.queryProtocol, None, "'%s'")
elif a.get_protocols:
l = settings.getProtocols()
cmd.print_and_exit(" ".join(["%s" % protocol for protocol in l]))
elif a.add_source_port:
cmd.add_sequence(
a.add_source_port,
settings.addSourcePort,
settings.querySourcePort,
cmd.parse_port,
"%s/%s",
)
service.update(settings)
elif a.remove_source_port:
cmd.remove_sequence(
a.remove_source_port,
settings.removeSourcePort,
settings.querySourcePort,
cmd.parse_port,
"%s/%s",
)
service.update(settings)
elif a.query_source_port:
cmd.query_sequence(
a.query_source_port, settings.querySourcePort, cmd.parse_port, "%s/%s"
)
elif a.get_source_ports:
l = settings.getSourcePorts()
cmd.print_and_exit(" ".join(["%s/%s" % (port[0], port[1]) for port in l]))
elif a.add_module:
cmd.add_sequence(
a.add_module, settings.addModule, settings.queryModule, None, "'%s'"
)
service.update(settings)
elif a.remove_module:
cmd.remove_sequence(
a.remove_module,
settings.removeModule,
settings.queryModule,
None,
"'%s'",
)
service.update(settings)
elif a.query_module:
cmd.query_sequence(a.query_module, settings.queryModule, None, "'%s'")
elif a.get_modules:
l = settings.getModules()
cmd.print_and_exit(" ".join(["%s" % module for module in l]))
elif a.set_destination:
cmd.add_sequence(
a.set_destination,
settings.setDestination,
settings.queryDestination,
cmd.parse_service_destination,
"%s:%s",
)
service.update(settings)
elif a.remove_destination:
# special case for removeDestination: Only ipv, no address
for ipv in a.remove_destination:
cmd.check_destination_ipv(ipv)
if ipv not in settings.getDestinations():
if len(a.remove_destination) > 1:
cmd.print_warning("Warning: NOT_ENABLED: '%s'" % ipv)
else:
code = FirewallError.get_code("NOT_ENABLED")
cmd.print_and_exit("Error: NOT_ENABLED: '%s'" % ipv, code)
else:
settings.removeDestination(ipv)
service.update(settings)
elif a.query_destination:
cmd.query_sequence(
a.query_destination,
settings.queryDestination,
cmd.parse_service_destination,
"'%s'",
)
elif a.get_destinations:
l = settings.getDestinations()
cmd.print_and_exit(
" ".join(["%s:%s" % (dest[0], dest[1]) for dest in l.items()])
)
elif a.add_include:
cmd.add_sequence(
a.add_include, settings.addInclude, settings.queryInclude, None, "'%s'"
)
service.update(settings)
elif a.remove_include:
cmd.remove_sequence(
a.remove_include,
settings.removeInclude,
settings.queryInclude,
None,
"'%s'",
)
service.update(settings)
elif a.query_include:
cmd.query_sequence(a.query_include, settings.queryInclude, None, "'%s'")
elif a.get_includes:
l = settings.getIncludes()
cmd.print_and_exit(" ".join(["%s" % include for include in sorted(l)]))
elif a.add_helper:
cmd.add_sequence(
a.add_helper, settings.addHelper, settings.queryHelper, None, "'%s'"
)
service.update(settings)
elif a.remove_helper:
cmd.remove_sequence(
a.remove_helper,
settings.removeHelper,
settings.queryHelper,
None,
"'%s'",
)
service.update(settings)
elif a.query_helper:
cmd.query_sequence(a.query_helper, settings.queryHelper, None, "'%s'")
elif a.get_service_helpers:
l = settings.getHelpers()
cmd.print_and_exit(" ".join(["%s" % helper for helper in sorted(l)]))
elif a.set_description:
settings.setDescription(a.set_description)
service.update(settings)
elif a.get_description:
cmd.print_and_exit(settings.getDescription())
elif a.set_short:
settings.setShort(a.set_short)
service.update(settings)
elif a.get_short:
cmd.print_and_exit(settings.getShort())
else:
cmd.fail(parser.format_usage() + "Unknown option")
# lockdown whitelist
elif options_lockdown_whitelist:
policies = fw.config().policies()
# commands
if a.list_lockdown_whitelist_commands:
l = policies.getLockdownWhitelistCommands()
cmd.print_and_exit("\n".join(l))
elif a.add_lockdown_whitelist_command:
cmd.add_sequence(
a.add_lockdown_whitelist_command,
policies.addLockdownWhitelistCommand,
policies.queryLockdownWhitelistCommand,
None,
"'%s'",
)
elif a.remove_lockdown_whitelist_command:
cmd.remove_sequence(
a.remove_lockdown_whitelist_command,
policies.removeLockdownWhitelistCommand,
policies.queryLockdownWhitelistCommand,
None,
"'%s'",
)
elif a.query_lockdown_whitelist_command:
cmd.query_sequence(
a.query_lockdown_whitelist_command,
policies.queryLockdownWhitelistCommand,
None,
"'%s'",
)
# contexts
elif a.list_lockdown_whitelist_contexts:
l = policies.getLockdownWhitelistContexts()
cmd.print_and_exit("\n".join(l))
elif a.add_lockdown_whitelist_context:
cmd.add_sequence(
a.add_lockdown_whitelist_context,
policies.addLockdownWhitelistContext,
policies.queryLockdownWhitelistContext,
None,
"'%s'",
)
elif a.remove_lockdown_whitelist_context:
cmd.remove_sequence(
a.remove_lockdown_whitelist_context,
policies.removeLockdownWhitelistContext,
policies.queryLockdownWhitelistContext,
None,
"'%s'",
)
elif a.query_lockdown_whitelist_context:
cmd.query_sequence(
a.query_lockdown_whitelist_context,
policies.queryLockdownWhitelistContext,
None,
"'%s'",
)
# uids
elif a.list_lockdown_whitelist_uids:
l = policies.getLockdownWhitelistUids()
cmd.print_and_exit(" ".join(map(str, l)))
elif a.add_lockdown_whitelist_uid is not None:
cmd.add_sequence(
a.add_lockdown_whitelist_uid,
policies.addLockdownWhitelistUid,
policies.queryLockdownWhitelistUid,
None,
"%s",
)
elif a.remove_lockdown_whitelist_uid is not None:
cmd.remove_sequence(
a.remove_lockdown_whitelist_uid,
policies.removeLockdownWhitelistUid,
policies.queryLockdownWhitelistUid,
None,
"%s",
)
elif a.query_lockdown_whitelist_uid is not None:
cmd.query_sequence(
a.query_lockdown_whitelist_uid,
policies.queryLockdownWhitelistUid,
None,
"%s",
)
# users
elif a.list_lockdown_whitelist_users:
l = policies.getLockdownWhitelistUsers()
cmd.print_and_exit("\n".join(l))
elif a.add_lockdown_whitelist_user:
cmd.add_sequence(
a.add_lockdown_whitelist_user,
policies.addLockdownWhitelistUser,
policies.queryLockdownWhitelistUser,
None,
"%s",
)
elif a.remove_lockdown_whitelist_user:
cmd.remove_sequence(
a.remove_lockdown_whitelist_user,
policies.removeLockdownWhitelistUser,
policies.queryLockdownWhitelistUser,
None,
"%s",
)
elif a.query_lockdown_whitelist_user:
cmd.query_sequence(
a.query_lockdown_whitelist_user,
policies.queryLockdownWhitelistUser,
None,
"'%s'",
)
elif options_direct:
direct = fw.config().direct()
if a.passthrough:
if len(a.passthrough) < 2:
cmd.fail(
"usage: --permanent --direct --passthrough { ipv4 | ipv6 | eb } <args>"
)
cmd.print_msg(
direct.addPassthrough(
cmd.check_ipv(a.passthrough[0]), splitArgs(a.passthrough[1])
)
)
if a.add_passthrough:
if len(a.add_passthrough) < 2:
cmd.fail(
"usage: --permanent --direct --add-passthrough { ipv4 | ipv6 | eb } <args>"
)
cmd.print_msg(
direct.addPassthrough(
cmd.check_ipv(a.add_passthrough[0]), splitArgs(a.add_passthrough[1])
)
)
elif a.remove_passthrough:
if len(a.remove_passthrough) < 2:
cmd.fail(
"usage: --permanent --direct --remove-passthrough { ipv4 | ipv6 | eb } <args>"
)
direct.removePassthrough(
cmd.check_ipv(a.remove_passthrough[0]),
splitArgs(a.remove_passthrough[1]),
)
elif a.query_passthrough:
if len(a.query_passthrough) < 2:
cmd.fail(
"usage: --permanent --direct --query-passthrough { ipv4 | ipv6 | eb } <args>"
)
cmd.print_query_result(
direct.queryPassthrough(
cmd.check_ipv(a.query_passthrough[0]),
splitArgs(a.query_passthrough[1]),
)
)
sys.exit(0)
elif a.get_passthroughs:
rules = direct.getPassthroughs(cmd.check_ipv(a.get_passthroughs[0]))
for rule in rules:
cmd.print_msg(joinArgs(rule))
sys.exit(0)
elif a.get_all_passthroughs:
for ipv, rule in direct.getAllPassthroughs():
cmd.print_msg("%s %s" % (ipv, joinArgs(rule)))
sys.exit(0)
elif a.add_chain:
direct.addChain(
cmd.check_ipv(a.add_chain[0]), a.add_chain[1], a.add_chain[2]
)
elif a.remove_chain:
direct.removeChain(
cmd.check_ipv(a.remove_chain[0]), a.remove_chain[1], a.remove_chain[2]
)
elif a.query_chain:
cmd.print_query_result(
direct.queryChain(
cmd.check_ipv(a.query_chain[0]), a.query_chain[1], a.query_chain[2]
)
)
sys.exit(0)
elif a.get_chains:
cmd.print_and_exit(
" ".join(
direct.getChains(cmd.check_ipv(a.get_chains[0]), a.get_chains[1])
)
)
sys.exit(0)
elif a.get_all_chains:
chains = direct.getAllChains()
for ipv, table, chain in chains:
cmd.print_msg("%s %s %s" % (ipv, table, chain))
sys.exit(0)
elif a.add_rule:
if len(a.add_rule) < 5:
cmd.fail(
"usage: --permanent --direct --add-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
try:
priority = int(a.add_rule[3])
except ValueError:
cmd.fail(
"usage: --permanent --direct --add-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
direct.addRule(
cmd.check_ipv(a.add_rule[0]),
a.add_rule[1],
a.add_rule[2],
priority,
splitArgs(a.add_rule[4]),
)
elif a.remove_rule:
if len(a.remove_rule) < 5:
cmd.fail(
"usage: --permanent --direct --remove-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
try:
priority = int(a.remove_rule[3])
except ValueError:
cmd.fail(
"usage: --permanent --direct --remove-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
direct.removeRule(
cmd.check_ipv(a.remove_rule[0]),
a.remove_rule[1],
a.remove_rule[2],
priority,
splitArgs(a.remove_rule[4]),
)
elif a.remove_rules:
if len(a.remove_rules) < 3:
cmd.fail(
"usage: --permanent --direct --remove-rules { ipv4 | ipv6 | eb } <table> <chain>"
)
direct.removeRules(
cmd.check_ipv(a.remove_rules[0]), a.remove_rules[1], a.remove_rules[2]
)
elif a.query_rule:
if len(a.query_rule) < 5:
cmd.fail(
"usage: --permanent --direct --query-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
try:
priority = int(a.query_rule[3])
except ValueError:
cmd.fail(
"usage: --permanent --direct --query-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
cmd.print_query_result(
direct.queryRule(
cmd.check_ipv(a.query_rule[0]),
a.query_rule[1],
a.query_rule[2],
priority,
splitArgs(a.query_rule[4]),
)
)
sys.exit(0)
elif a.get_rules:
rules = direct.getRules(
cmd.check_ipv(a.get_rules[0]), a.get_rules[1], a.get_rules[2]
)
for priority, rule in rules:
cmd.print_msg("%d %s" % (priority, joinArgs(rule)))
sys.exit(0)
elif a.get_all_rules:
rules = direct.getAllRules()
for ipv, table, chain, priority, rule in rules:
cmd.print_msg(
"%s %s %s %d %s" % (ipv, table, chain, priority, joinArgs(rule))
)
sys.exit(0)
elif a.list_all_policies:
names = fw.config().getPolicyNames()
for policy in sorted(names):
settings = fw.config().getPolicyByName(policy).getSettings()
cmd.print_policy_info(policy, settings)
cmd.print_msg("")
sys.exit(0)
elif a.policy:
fw_policy = fw.config().getPolicyByName(a.policy)
settings = fw_policy.getSettings()
# list all policy settings
if a.list_all:
cmd.print_policy_info(a.policy, settings)
sys.exit(0)
# ingress zones
elif a.list_ingress_zones:
l = settings.getIngressZones()
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_ingress_zone:
cmd.add_sequence(
a.add_ingress_zone,
settings.addIngressZone,
settings.queryIngressZone,
None,
"'%s'",
)
elif a.remove_ingress_zone:
cmd.remove_sequence(
a.remove_ingress_zone,
settings.removeIngressZone,
settings.queryIngressZone,
None,
"'%s'",
)
elif a.query_ingress_zone:
cmd.query_sequence(
a.query_ingress_zone, settings.queryIngressZone, None, "'%s'"
)
# egress zones
elif a.list_egress_zones:
l = settings.getEgressZones()
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_egress_zone:
cmd.add_sequence(
a.add_egress_zone,
settings.addEgressZone,
settings.queryEgressZone,
None,
"'%s'",
)
elif a.remove_egress_zone:
cmd.remove_sequence(
a.remove_egress_zone,
settings.removeEgressZone,
settings.queryEgressZone,
None,
"'%s'",
)
elif a.query_egress_zone:
cmd.query_sequence(
a.query_egress_zone, settings.queryEgressZone, None, "'%s'"
)
# priority
elif a.get_priority:
cmd.print_and_exit(str(settings.getPriority()))
elif a.set_priority:
settings.setPriority(a.set_priority)
# rich rules
elif a.list_rich_rules:
l = settings.getRichRules()
cmd.print_and_exit("\n".join(l))
elif a.add_rich_rule:
cmd.add_sequence(
a.add_rich_rule,
settings.addRichRule,
settings.queryRichRule,
None,
"'%s'",
)
elif a.remove_rich_rule:
cmd.remove_sequence(
a.remove_rich_rule,
settings.removeRichRule,
settings.queryRichRule,
None,
"'%s'",
)
elif a.query_rich_rule:
cmd.query_sequence(a.query_rich_rule, settings.queryRichRule, None, "'%s'")
# service
elif a.list_services:
l = settings.getServices()
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_service:
cmd.add_sequence(
a.add_service, settings.addService, settings.queryService, None, "'%s'"
)
elif a.remove_service:
cmd.remove_sequence(
a.remove_service,
settings.removeService,
settings.queryService,
None,
"'%s'",
)
elif a.query_service:
cmd.query_sequence(a.query_service, settings.queryService, None, "'%s'")
# port
elif a.list_ports:
l = settings.getPorts()
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(
l, key=lambda x: (x[1], getPortRange(x[0])[0])
)
]
)
)
elif a.add_port:
cmd.add_sequence(
a.add_port,
settings.addPort,
settings.queryPort,
cmd.parse_port,
"%s/%s",
)
elif a.remove_port:
cmd.remove_sequence(
a.remove_port,
settings.removePort,
settings.queryPort,
cmd.parse_port,
"%s/%s",
)
elif a.query_port:
cmd.query_sequence(
a.query_port, settings.queryPort, cmd.parse_port, "%s/%s"
)
# protocol
elif a.list_protocols:
l = settings.getProtocols()
cmd.print_and_exit(" ".join(["%s" % protocol for protocol in sorted(l)]))
elif a.add_protocol:
cmd.add_sequence(
a.add_protocol,
settings.addProtocol,
settings.queryProtocol,
None,
"'%s'",
)
elif a.remove_protocol:
cmd.remove_sequence(
a.remove_protocol,
settings.removeProtocol,
settings.queryProtocol,
None,
"'%s'",
)
elif a.query_protocol:
cmd.query_sequence(a.query_protocol, settings.queryProtocol, None, "'%s'")
# source port
elif a.list_source_ports:
l = settings.getSourcePorts()
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(
l, key=lambda x: (x[1], getPortRange(x[0])[0])
)
]
)
)
elif a.add_source_port:
cmd.add_sequence(
a.add_source_port,
settings.addSourcePort,
settings.querySourcePort,
cmd.parse_port,
"%s/%s",
)
elif a.remove_source_port:
cmd.remove_sequence(
a.remove_source_port,
settings.removeSourcePort,
settings.querySourcePort,
cmd.parse_port,
"%s/%s",
)
elif a.query_source_port:
cmd.query_sequence(
a.query_source_port, settings.querySourcePort, cmd.parse_port, "%s/%s"
)
# masquerade
elif a.add_masquerade:
settings.addMasquerade()
elif a.remove_masquerade:
settings.removeMasquerade()
elif a.query_masquerade:
cmd.print_query_result(settings.queryMasquerade())
# forward port
elif a.list_forward_ports:
l = settings.getForwardPorts()
cmd.print_and_exit(
"\n".join(
[
"port=%s:proto=%s:toport=%s:toaddr=%s"
% (port, protocol, toport, toaddr)
for (port, protocol, toport, toaddr) in l
]
)
)
elif a.add_forward_port:
cmd.add_sequence(
a.add_forward_port,
settings.addForwardPort,
settings.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
elif a.remove_forward_port:
cmd.remove_sequence(
a.remove_forward_port,
settings.removeForwardPort,
settings.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
elif a.query_forward_port:
cmd.query_sequence(
a.query_forward_port,
settings.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
# block icmp
elif a.list_icmp_blocks:
l = settings.getIcmpBlocks()
cmd.print_and_exit(" ".join(l))
elif a.add_icmp_block:
cmd.add_sequence(
a.add_icmp_block,
settings.addIcmpBlock,
settings.queryIcmpBlock,
None,
"'%s'",
)
elif a.remove_icmp_block:
cmd.remove_sequence(
a.remove_icmp_block,
settings.removeIcmpBlock,
settings.queryIcmpBlock,
None,
"'%s'",
)
elif a.query_icmp_block:
cmd.query_sequence(
a.query_icmp_block, settings.queryIcmpBlock, None, "'%s'"
)
# target
elif a.get_target:
target = settings.getTarget()
cmd.print_and_exit(target)
elif a.set_target:
settings.setTarget(a.set_target)
# set description
elif a.set_description:
settings = fw.config().getPolicyByName(a.policy).getSettings()
settings.setDescription(a.set_description)
# get description
elif a.get_description:
settings = fw.config().getPolicyByName(a.policy).getSettings()
cmd.print_and_exit(settings.getDescription())
# set short description
elif a.set_short:
settings = fw.config().getPolicyByName(a.policy).getSettings()
settings.setShort(a.set_short)
# get short description
elif a.get_short:
settings = fw.config().getPolicyByName(a.policy).getSettings()
cmd.print_and_exit(settings.getShort())
fw_policy.update(settings)
else:
if zone == "":
zone = fw.getDefaultZone()
fw_zone = fw.config().getZoneByName(zone)
settings = fw_zone.getSettings()
# interface
if a.list_interfaces:
interfaces = sorted(
set(try_nm_get_interfaces_in_zone(zone)) | set(fw_zone.getInterfaces())
)
cmd.print_and_exit(" ".join(interfaces))
elif a.get_zone_of_interface:
for interface in a.get_zone_of_interface:
# ask NM before checking our config
zone = try_get_zone_of_interface(interface)
if not zone:
zone = fw.config().getZoneOfInterface(interface)
if zone:
if len(a.get_zone_of_interface) > 1:
cmd.print_warning("%s: %s" % (interface, zone))
else:
cmd.print_and_exit(zone)
else:
if len(a.get_zone_of_interface) > 1:
cmd.print_warning("%s: no zone" % interface)
else:
cmd.fail("no zone")
elif a.change_interface:
interfaces = []
for interface in a.change_interface:
if not try_set_zone_of_interface(zone, interface):
interfaces.append(interface)
for interface in interfaces:
old_zone_name = fw.config().getZoneOfInterface(interface)
if old_zone_name != zone:
if old_zone_name:
old_zone_obj = fw.config().getZoneByName(old_zone_name)
old_zone_obj.removeInterface(interface) # remove from old
fw_zone.addInterface(interface) # add to new
elif a.add_interface:
interfaces = []
for interface in a.add_interface:
if not try_set_zone_of_interface(a.zone, interface):
interfaces.append(interface)
cmd.add_sequence(
interfaces, fw_zone.addInterface, fw_zone.queryInterface, None, "'%s'"
)
elif a.remove_interface:
interfaces = []
for interface in a.remove_interface:
if not try_set_zone_of_interface("", interface):
interfaces.append(interface)
cmd.remove_sequence(
interfaces,
fw_zone.removeInterface,
fw_zone.queryInterface,
None,
"'%s'",
)
elif a.query_interface:
cmd.query_sequence(a.query_interface, fw_zone.queryInterface, None, "'%s'")
# source
if a.list_sources:
sources = fw_zone.getSources()
cmd.print_and_exit(" ".join(sources))
elif a.get_zone_of_source:
for source in a.get_zone_of_source:
zone = fw.config().getZoneOfSource(source)
if zone:
if len(a.get_zone_of_source) > 1:
cmd.print_warning("%s: %s" % (source, zone))
else:
cmd.print_and_exit(zone)
else:
if len(a.get_zone_of_source) > 1:
cmd.print_warning("%s: no zone" % source)
else:
cmd.fail("no zone")
elif a.change_source:
for source in a.change_source:
old_zone_name = fw.config().getZoneOfSource(source)
if old_zone_name != zone:
if old_zone_name:
old_zone_obj = fw.config().getZoneByName(old_zone_name)
old_zone_obj.removeSource(source) # remove from old
fw_zone.addSource(source) # add to new
elif a.add_source:
cmd.add_sequence(
a.add_source, fw_zone.addSource, fw_zone.querySource, None, "'%s'"
)
elif a.remove_source:
cmd.remove_sequence(
a.remove_source, fw_zone.removeSource, fw_zone.querySource, None, "'%s'"
)
elif a.query_source:
cmd.query_sequence(a.query_source, fw_zone.querySource, None, "'%s'")
# rich rules
if a.list_rich_rules:
l = fw_zone.getRichRules()
cmd.print_and_exit("\n".join(l))
elif a.add_rich_rule:
cmd.add_sequence(
a.add_rich_rule,
fw_zone.addRichRule,
fw_zone.queryRichRule,
None,
"'%s'",
)
elif a.remove_rich_rule:
cmd.remove_sequence(
a.remove_rich_rule,
fw_zone.removeRichRule,
fw_zone.queryRichRule,
None,
"'%s'",
)
elif a.query_rich_rule:
cmd.query_sequence(a.query_rich_rule, fw_zone.queryRichRule, None, "'%s'")
# service
if a.list_services:
l = fw_zone.getServices()
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_service:
cmd.add_sequence(
a.add_service, fw_zone.addService, fw_zone.queryService, None, "'%s'"
)
elif a.remove_service:
cmd.remove_sequence(
a.remove_service,
fw_zone.removeService,
fw_zone.queryService,
None,
"'%s'",
)
elif a.query_service:
cmd.query_sequence(a.query_service, fw_zone.queryService, None, "'%s'")
# port
elif a.list_ports:
l = fw_zone.getPorts()
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(
l, key=lambda x: (x[1], getPortRange(x[0])[0])
)
]
)
)
elif a.add_port:
cmd.add_sequence(
a.add_port, fw_zone.addPort, fw_zone.queryPort, cmd.parse_port, "%s/%s"
)
elif a.remove_port:
cmd.remove_sequence(
a.remove_port,
fw_zone.removePort,
fw_zone.queryPort,
cmd.parse_port,
"%s/%s",
)
elif a.query_port:
cmd.query_sequence(a.query_port, fw_zone.queryPort, cmd.parse_port, "%s/%s")
# protocol
elif a.list_protocols:
l = fw_zone.getProtocols()
cmd.print_and_exit(" ".join(["%s" % protocol for protocol in sorted(l)]))
elif a.add_protocol:
cmd.add_sequence(
a.add_protocol, fw_zone.addProtocol, fw_zone.queryProtocol, None, "'%s'"
)
elif a.remove_protocol:
cmd.remove_sequence(
a.remove_protocol,
fw_zone.removeProtocol,
fw_zone.queryProtocol,
None,
"'%s'",
)
elif a.query_protocol:
cmd.query_sequence(a.query_protocol, fw_zone.queryProtocol, None, "'%s'")
# source port
elif a.list_source_ports:
l = fw_zone.getSourcePorts()
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(
l, key=lambda x: (x[1], getPortRange(x[0])[0])
)
]
)
)
elif a.add_source_port:
cmd.add_sequence(
a.add_source_port,
fw_zone.addSourcePort,
fw_zone.querySourcePort,
cmd.parse_port,
"%s/%s",
)
elif a.remove_source_port:
cmd.remove_sequence(
a.remove_source_port,
fw_zone.removeSourcePort,
fw_zone.querySourcePort,
cmd.parse_port,
"%s/%s",
)
elif a.query_source_port:
cmd.query_sequence(
a.query_source_port, fw_zone.querySourcePort, cmd.parse_port, "%s/%s"
)
# forward
elif a.add_forward:
fw_zone.addForward()
elif a.remove_forward:
fw_zone.removeForward()
elif a.query_forward:
cmd.print_query_result(fw_zone.queryForward())
# masquerade
elif a.add_masquerade:
fw_zone.addMasquerade()
elif a.remove_masquerade:
fw_zone.removeMasquerade()
elif a.query_masquerade:
cmd.print_query_result(fw_zone.queryMasquerade())
# forward port
elif a.list_forward_ports:
l = fw_zone.getForwardPorts()
cmd.print_and_exit(
"\n".join(
[
"port=%s:proto=%s:toport=%s:toaddr=%s"
% (port, protocol, toport, toaddr)
for (port, protocol, toport, toaddr) in l
]
)
)
elif a.add_forward_port:
cmd.add_sequence(
a.add_forward_port,
fw_zone.addForwardPort,
fw_zone.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
elif a.remove_forward_port:
cmd.remove_sequence(
a.remove_forward_port,
fw_zone.removeForwardPort,
fw_zone.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
elif a.query_forward_port:
cmd.query_sequence(
a.query_forward_port,
fw_zone.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
# block icmp
elif a.list_icmp_blocks:
l = fw_zone.getIcmpBlocks()
cmd.print_and_exit(" ".join(l))
elif a.add_icmp_block:
cmd.add_sequence(
a.add_icmp_block,
fw_zone.addIcmpBlock,
fw_zone.queryIcmpBlock,
None,
"'%s'",
)
elif a.remove_icmp_block:
cmd.remove_sequence(
a.remove_icmp_block,
fw_zone.removeIcmpBlock,
fw_zone.queryIcmpBlock,
None,
"'%s'",
)
elif a.query_icmp_block:
cmd.query_sequence(a.query_icmp_block, fw_zone.queryIcmpBlock, None, "'%s'")
# icmp block inversion
elif a.add_icmp_block_inversion:
fw_zone.addIcmpBlockInversion()
elif a.remove_icmp_block_inversion:
fw_zone.removeIcmpBlockInversion()
elif a.query_icmp_block_inversion:
cmd.print_query_result(fw_zone.queryIcmpBlockInversion())
# zone target
elif a.get_target:
target = fw_zone.getTarget()
cmd.print_and_exit(target if target != "%%REJECT%%" else "REJECT")
elif a.set_target:
fw_zone.setTarget(
a.set_target if a.set_target != "REJECT" else "%%REJECT%%"
)
# list all zone settings
elif a.list_all:
interfaces = try_nm_get_interfaces_in_zone(zone)
cmd.print_zone_info(
zone,
settings,
extra_interfaces=interfaces,
default_zone=fw.getDefaultZone(),
)
sys.exit(0)
# list everything
elif a.list_all_zones:
names = fw.config().getZoneNames()
default_zone = fw.getDefaultZone()
for zone in sorted(names):
interfaces = try_nm_get_interfaces_in_zone(zone)
cmd.print_zone_info(
zone,
fw.config().getZoneByName(zone).getSettings(),
default_zone=default_zone,
extra_interfaces=interfaces,
)
cmd.print_msg("")
sys.exit(0)
# set zone description
elif a.set_description:
settings.setDescription(a.set_description)
fw_zone.update(settings)
# get zone description
elif a.get_description:
cmd.print_and_exit(settings.getDescription())
# set zone short description
elif a.set_short:
settings.setShort(a.set_short)
fw_zone.update(settings)
# get zone short description
elif a.get_short:
cmd.print_and_exit(settings.getShort())
# priority
elif a.get_priority:
cmd.print_and_exit(str(settings.getPriority()))
elif a.set_priority:
settings.setPriority(a.set_priority)
fw_zone.update(settings)
elif a.get_ingress_priority:
cmd.print_and_exit(str(settings.getIngressPriority()))
elif a.set_ingress_priority:
settings.setIngressPriority(a.set_ingress_priority)
fw_zone.update(settings)
elif a.get_egress_priority:
cmd.print_and_exit(str(settings.getEgressPriority()))
elif a.set_egress_priority:
settings.setEgressPriority(a.set_egress_priority)
fw_zone.update(settings)
elif a.version:
cmd.print_and_exit(fw.get_property("version"))
elif a.state:
state = fw.get_property("state")
if state == "RUNNING":
cmd.print_and_exit("running")
elif state == "FAILED":
cmd.print_and_exit("failed", errors.RUNNING_BUT_FAILED)
else:
cmd.print_and_exit("not running", errors.NOT_RUNNING)
elif a.get_log_denied:
cmd.print_and_exit(fw.getLogDenied())
elif a.set_log_denied:
fw.setLogDenied(a.set_log_denied)
elif a.get_automatic_helpers:
cmd.print_and_exit(fw.getAutomaticHelpers())
elif a.set_automatic_helpers:
fw.setAutomaticHelpers(a.set_automatic_helpers)
elif a.get_ipset_types:
types = fw.get_property("IPSetTypes")
cmd.print_and_exit(" ".join(sorted(types)))
elif a.reload:
fw.reload()
elif a.complete_reload:
fw.complete_reload()
elif a.runtime_to_permanent:
fw.runtimeToPermanent()
elif a.reset_to_defaults:
fw.resetToDefaults()
elif a.check_config:
fw.checkPermanentConfig()
elif a.direct:
if a.passthrough:
if len(a.passthrough) < 2:
cmd.fail("usage: --direct --passthrough { ipv4 | ipv6 | eb } <args>")
msg = fw.passthrough(
cmd.check_ipv(a.passthrough[0]), splitArgs(a.passthrough[1])
)
if msg:
sys.stdout.write(msg + "\n")
elif a.add_passthrough:
if len(a.add_passthrough) < 2:
cmd.fail("usage: --direct --add-passthrough { ipv4 | ipv6 | eb } <args>")
fw.addPassthrough(
cmd.check_ipv(a.add_passthrough[0]), splitArgs(a.add_passthrough[1])
)
elif a.remove_passthrough:
if len(a.remove_passthrough) < 2:
cmd.fail("usage: --direct --remove-passthrough { ipv4 | ipv6 | eb } <args>")
fw.removePassthrough(
cmd.check_ipv(a.remove_passthrough[0]), splitArgs(a.remove_passthrough[1])
)
elif a.query_passthrough:
if len(a.query_passthrough) < 2:
cmd.fail("usage: --direct --query-passthrough { ipv4 | ipv6 | eb } <args>")
cmd.print_query_result(
fw.queryPassthrough(
cmd.check_ipv(a.query_passthrough[0]), splitArgs(a.query_passthrough[1])
)
)
elif a.get_passthroughs:
rules = fw.getPassthroughs(cmd.check_ipv(a.get_passthroughs[0]))
for rule in rules:
cmd.print_msg(joinArgs(rule))
sys.exit(0)
elif a.get_all_passthroughs:
for ipv, rule in fw.getAllPassthroughs():
cmd.print_msg("%s %s" % (ipv, joinArgs(rule)))
sys.exit(0)
elif a.add_chain:
fw.addChain(cmd.check_ipv(a.add_chain[0]), a.add_chain[1], a.add_chain[2])
elif a.remove_chain:
fw.removeChain(
cmd.check_ipv(a.remove_chain[0]), a.remove_chain[1], a.remove_chain[2]
)
elif a.query_chain:
cmd.print_query_result(
fw.queryChain(
cmd.check_ipv(a.query_chain[0]), a.query_chain[1], a.query_chain[2]
)
)
elif a.get_chains:
cmd.print_and_exit(
" ".join(fw.getChains(cmd.check_ipv(a.get_chains[0]), a.get_chains[1]))
)
elif a.get_all_chains:
chains = fw.getAllChains()
for ipv, table, chain in chains:
cmd.print_msg("%s %s %s" % (ipv, table, chain))
sys.exit(0)
elif a.add_rule:
if len(a.add_rule) < 5:
cmd.fail(
"usage: --direct --add-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
try:
priority = int(a.add_rule[3])
except ValueError:
cmd.fail(
"usage: --direct --add-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
fw.addRule(
cmd.check_ipv(a.add_rule[0]),
a.add_rule[1],
a.add_rule[2],
priority,
splitArgs(a.add_rule[4]),
)
elif a.remove_rule:
if len(a.remove_rule) < 5:
cmd.fail(
"usage: --direct --remove-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
try:
priority = int(a.remove_rule[3])
except ValueError:
cmd.fail(
"usage: --direct --remove-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
fw.removeRule(
cmd.check_ipv(a.remove_rule[0]),
a.remove_rule[1],
a.remove_rule[2],
priority,
splitArgs(a.remove_rule[4]),
)
elif a.remove_rules:
if len(a.remove_rules) < 3:
cmd.fail(
"usage: --direct --remove-rules { ipv4 | ipv6 | eb } <table> <chain>"
)
fw.removeRules(
cmd.check_ipv(a.remove_rules[0]), a.remove_rules[1], a.remove_rules[2]
)
elif a.query_rule:
if len(a.query_rule) < 5:
cmd.fail(
"usage: --direct --query-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
try:
priority = int(a.query_rule[3])
except ValueError:
cmd.fail(
"usage: --direct --query-rule { ipv4 | ipv6 | eb } <table> <chain> <priority> <args>"
)
cmd.print_query_result(
fw.queryRule(
cmd.check_ipv(a.query_rule[0]),
a.query_rule[1],
a.query_rule[2],
priority,
splitArgs(a.query_rule[4]),
)
)
elif a.get_rules:
rules = fw.getRules(
cmd.check_ipv(a.get_rules[0]), a.get_rules[1], a.get_rules[2]
)
for priority, rule in rules:
cmd.print_msg("%d %s" % (priority, joinArgs(rule)))
sys.exit(0)
elif a.get_all_rules:
rules = fw.getAllRules()
for ipv, table, chain, priority, rule in rules:
cmd.print_msg(
"%s %s %s %d %s" % (ipv, table, chain, priority, joinArgs(rule))
)
sys.exit(0)
elif a.get_default_zone:
cmd.print_and_exit(fw.getDefaultZone())
elif a.set_default_zone:
fw.setDefaultZone(a.set_default_zone)
elif a.get_zones:
cmd.print_and_exit(" ".join(fw.getZones()))
elif a.get_active_zones:
zones = fw.getActiveZones()
default_zone = fw.getDefaultZone()
for zone in zones:
cmd.print_msg("%s%s" % (zone, " (default)" if zone == default_zone else ""))
for x in ["interfaces", "sources"]:
if x in zones[zone] and zones[zone][x]:
cmd.print_msg(" %s: %s" % (x, " ".join(zones[zone][x])))
sys.exit(0)
elif a.get_policies:
cmd.print_and_exit(" ".join(fw.getPolicies()))
elif a.get_active_policies:
policies = fw.getActivePolicies()
for policy in policies:
cmd.print_msg("%s" % policy)
for x in ["ingress_zones", "egress_zones"]:
if x in policies[policy]:
cmd.print_msg(
" %s: %s" % (x.replace("_", "-"), " ".join(policies[policy][x]))
)
sys.exit(0)
elif a.get_services:
l = fw.listServices()
cmd.print_and_exit(" ".join(l))
elif a.get_icmptypes:
l = fw.listIcmpTypes()
cmd.print_and_exit(" ".join(l))
# panic
elif a.panic_on:
fw.enablePanicMode()
elif a.panic_off:
fw.disablePanicMode()
elif a.query_panic:
cmd.print_query_result(fw.queryPanicMode())
# ipset
elif a.get_ipsets:
ipsets = fw.getIPSets()
cmd.print_and_exit(" ".join(sorted(ipsets)))
elif a.info_ipset:
cmd.print_ipset_info(a.info_ipset, fw.getIPSetSettings(a.info_ipset))
sys.exit(0)
elif a.add_entry:
cmd.x_add_sequence(a.ipset, a.add_entry, fw.addEntry, fw.queryEntry, None, "'%s'")
elif a.remove_entry:
cmd.x_remove_sequence(
a.ipset, a.remove_entry, fw.removeEntry, fw.queryEntry, None, "'%s'"
)
elif a.query_entry:
cmd.x_query_sequence(a.ipset, a.query_entry, fw.queryEntry, None, "'%s'")
elif a.get_entries:
l = fw.getEntries(a.ipset)
cmd.print_and_exit("\n".join(l))
elif a.add_entries_from_file:
old_entries = fw.getEntries(a.ipset)
changed = False
for filename in a.add_entries_from_file:
try:
entries = cmd.get_ipset_entries_from_file(filename)
except IOError as msg:
message = "Failed to read file '%s': %s" % (filename, msg)
if len(a.add_entries_from_file) > 1:
cmd.print_warning(message)
else:
cmd.print_and_exit(message)
else:
entries_set = set()
for entry in old_entries:
entries_set.add(entry)
for entry in entries:
if entry not in entries_set:
old_entries.append(entry)
entries_set.add(entry)
changed = True
else:
cmd.print_if_verbose("Warning: ALREADY_ENABLED: %s" % entry)
if changed:
fw.setEntries(a.ipset, old_entries)
elif a.remove_entries_from_file:
old_entries = fw.getEntries(a.ipset)
changed = False
for filename in a.remove_entries_from_file:
try:
entries = cmd.get_ipset_entries_from_file(filename)
except IOError as msg:
message = "Failed to read file '%s': %s" % (filename, msg)
if len(a.remove_entries_from_file) > 1:
cmd.print_warning(message)
else:
cmd.print_and_exit(message)
else:
entries_set = set()
for entry in old_entries:
entries_set.add(entry)
for entry in entries:
if entry in entries_set:
old_entries.remove(entry)
entries_set.discard(entry)
changed = True
else:
cmd.print_if_verbose("Warning: NOT_ENABLED: %s" % entry)
if changed:
fw.setEntries(a.ipset, old_entries)
# helper
elif a.get_helpers:
helpers = fw.getHelpers()
cmd.print_and_exit(" ".join(sorted(helpers)))
elif a.info_helper:
cmd.print_helper_info(a.info_helper, fw.getHelperSettings(a.info_helper))
sys.exit(0)
# lockdown
elif a.lockdown_on:
fw.config().set_property("Lockdown", "yes") # permanent
fw.enableLockdown() # runtime
elif a.lockdown_off:
fw.config().set_property("Lockdown", "no") # permanent
fw.disableLockdown() # runtime
elif a.query_lockdown:
cmd.print_query_result(fw.queryLockdown()) # runtime
# lockdown = fw.config().get_property("Lockdown")
# cmd.print_query_result(lockdown.lower() in [ "yes", "true" ])
# lockdown whitelist
# commands
elif a.list_lockdown_whitelist_commands:
l = fw.getLockdownWhitelistCommands()
cmd.print_and_exit("\n".join(l))
elif a.add_lockdown_whitelist_command:
cmd.add_sequence(
a.add_lockdown_whitelist_command,
fw.addLockdownWhitelistCommand,
fw.queryLockdownWhitelistCommand,
None,
"'%s'",
)
elif a.remove_lockdown_whitelist_command:
cmd.remove_sequence(
a.remove_lockdown_whitelist_command,
fw.removeLockdownWhitelistCommand,
fw.queryLockdownWhitelistCommand,
None,
"'%s'",
)
elif a.query_lockdown_whitelist_command:
cmd.query_sequence(
a.query_lockdown_whitelist_command,
fw.queryLockdownWhitelistCommand,
None,
"'%s'",
)
# contexts
elif a.list_lockdown_whitelist_contexts:
l = fw.getLockdownWhitelistContexts()
cmd.print_and_exit("\n".join(l))
elif a.add_lockdown_whitelist_context:
cmd.add_sequence(
a.add_lockdown_whitelist_context,
fw.addLockdownWhitelistContext,
fw.queryLockdownWhitelistContext,
None,
"'%s'",
)
elif a.remove_lockdown_whitelist_context:
cmd.remove_sequence(
a.remove_lockdown_whitelist_context,
fw.removeLockdownWhitelistContext,
fw.queryLockdownWhitelistContext,
None,
"'%s'",
)
elif a.query_lockdown_whitelist_context:
cmd.query_sequence(
a.query_lockdown_whitelist_context,
fw.queryLockdownWhitelistContext,
None,
"'%s'",
)
# uids
elif a.list_lockdown_whitelist_uids:
l = fw.getLockdownWhitelistUids()
cmd.print_and_exit(" ".join(map(str, l)))
elif a.add_lockdown_whitelist_uid is not None:
cmd.add_sequence(
a.add_lockdown_whitelist_uid,
fw.addLockdownWhitelistUid,
fw.queryLockdownWhitelistUid,
None,
"'%s'",
)
elif a.remove_lockdown_whitelist_uid is not None:
cmd.remove_sequence(
a.remove_lockdown_whitelist_uid,
fw.removeLockdownWhitelistUid,
fw.queryLockdownWhitelistUid,
None,
"'%s'",
)
elif a.query_lockdown_whitelist_uid is not None:
cmd.query_sequence(
a.query_lockdown_whitelist_uid, fw.queryLockdownWhitelistUid, None, "'%s'"
)
# users
elif a.list_lockdown_whitelist_users:
l = fw.getLockdownWhitelistUsers()
cmd.print_and_exit(" ".join(l))
elif a.add_lockdown_whitelist_user:
cmd.add_sequence(
a.add_lockdown_whitelist_user,
fw.addLockdownWhitelistUser,
fw.queryLockdownWhitelistUser,
None,
"'%s'",
)
elif a.remove_lockdown_whitelist_user:
cmd.remove_sequence(
a.remove_lockdown_whitelist_user,
fw.removeLockdownWhitelistUser,
fw.queryLockdownWhitelistUser,
None,
"'%s'",
)
elif a.query_lockdown_whitelist_user:
cmd.query_sequence(
a.query_lockdown_whitelist_user, fw.queryLockdownWhitelistUser, None, "'%s'"
)
# interface
elif a.list_interfaces:
l = fw.getInterfaces(zone)
cmd.print_and_exit(" ".join(l))
elif a.get_zone_of_interface:
for interface in a.get_zone_of_interface:
zone = fw.getZoneOfInterface(interface)
if zone:
if len(a.get_zone_of_interface) > 1:
cmd.print_warning("%s: %s" % (interface, zone))
else:
cmd.print_and_exit(zone)
else:
if len(a.get_zone_of_interface) > 1:
cmd.print_warning("%s: no zone" % interface)
else:
cmd.fail("no zone")
elif a.add_interface:
interfaces = []
for interface in a.add_interface:
interfaces.append(interface)
cmd.x_add_sequence(
zone, interfaces, fw.addInterface, fw.queryInterface, None, "'%s'"
)
elif a.change_interface:
interfaces = []
for interface in a.change_interface:
interfaces.append(interface)
cmd.x_add_sequence(
zone, interfaces, fw.changeZoneOfInterface, fw.queryInterface, None, "'%s'"
)
elif a.remove_interface:
interfaces = []
for interface in a.remove_interface:
interfaces.append(interface)
cmd.x_remove_sequence(
zone, interfaces, fw.removeInterface, fw.queryInterface, None, "'%s'"
)
elif a.query_interface:
cmd.x_query_sequence(zone, a.query_interface, fw.queryInterface, None, "'%s'")
# source
elif a.list_sources:
sources = fw.getSources(zone)
cmd.print_and_exit(" ".join(sources))
elif a.get_zone_of_source:
for source in a.get_zone_of_source:
zone = fw.getZoneOfSource(source)
if zone:
if len(a.get_zone_of_source) > 1:
cmd.print_warning("%s: %s" % (source, zone))
else:
cmd.print_and_exit(zone)
else:
if len(a.get_zone_of_source) > 1:
cmd.print_warning("%s: no zone" % source)
else:
cmd.fail("no zone")
sys.exit(0)
elif a.add_source:
cmd.x_add_sequence(zone, a.add_source, fw.addSource, fw.querySource, None, "'%s'")
elif a.change_source:
cmd.x_add_sequence(
zone, a.change_source, fw.changeZoneOfSource, fw.querySource, None, "'%s'"
)
elif a.remove_source:
cmd.x_remove_sequence(
zone, a.remove_source, fw.removeSource, fw.querySource, None, "'%s'"
)
elif a.query_source:
cmd.x_query_sequence(zone, a.query_source, fw.querySource, None, "'%s'")
# policy
elif a.policy:
settings = fw.getPolicySettings(a.policy)
if a.list_all:
cmd.print_policy_info(
a.policy, settings, active_policies=fw.getActivePolicies()
)
sys.exit(0)
# ingress zones
elif a.list_ingress_zones:
l = settings.getIngressZones()
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_ingress_zone:
cmd.add_sequence(
a.add_ingress_zone,
settings.addIngressZone,
settings.queryIngressZone,
None,
"'%s'",
)
elif a.remove_ingress_zone:
cmd.remove_sequence(
a.remove_ingress_zone,
settings.removeIngressZone,
settings.queryIngressZone,
None,
"'%s'",
)
elif a.query_ingress_zone:
cmd.query_sequence(
a.query_ingress_zone, settings.queryIngressZone, None, "'%s'"
)
# egress zones
elif a.list_egress_zones:
l = settings.getEgressZones()
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_egress_zone:
cmd.add_sequence(
a.add_egress_zone,
settings.addEgressZone,
settings.queryEgressZone,
None,
"'%s'",
)
elif a.remove_egress_zone:
cmd.remove_sequence(
a.remove_egress_zone,
settings.removeEgressZone,
settings.queryEgressZone,
None,
"'%s'",
)
elif a.query_egress_zone:
cmd.query_sequence(a.query_egress_zone, settings.queryEgressZone, None, "'%s'")
# rich rules
elif a.list_rich_rules:
l = settings.getRichRules()
cmd.print_and_exit("\n".join(l))
elif a.add_rich_rule:
cmd.add_sequence(
a.add_rich_rule, settings.addRichRule, settings.queryRichRule, None, "'%s'"
)
elif a.remove_rich_rule:
cmd.remove_sequence(
a.remove_rich_rule,
settings.removeRichRule,
settings.queryRichRule,
None,
"'%s'",
)
elif a.query_rich_rule:
cmd.query_sequence(a.query_rich_rule, settings.queryRichRule, None, "'%s'")
# service
if a.list_services:
l = settings.getServices()
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_service:
cmd.add_sequence(
a.add_service, settings.addService, settings.queryService, None, "'%s'"
)
elif a.remove_service:
cmd.remove_sequence(
a.remove_service,
settings.removeService,
settings.queryService,
None,
"'%s'",
)
elif a.query_service:
cmd.query_sequence(a.query_service, settings.queryService, None, "'%s'")
# port
elif a.list_ports:
l = settings.getPorts()
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(l, key=lambda x: (x[1], getPortRange(x[0])[0]))
]
)
)
elif a.add_port:
cmd.add_sequence(
a.add_port, settings.addPort, settings.queryPort, cmd.parse_port, "%s/%s"
)
elif a.remove_port:
cmd.remove_sequence(
a.remove_port,
settings.removePort,
settings.queryPort,
cmd.parse_port,
"%s/%s",
)
elif a.query_port:
cmd.query_sequence(a.query_port, settings.queryPort, cmd.parse_port, "%s/%s")
# protocol
elif a.list_protocols:
l = settings.getProtocols()
cmd.print_and_exit(" ".join(["%s" % protocol for protocol in sorted(l)]))
elif a.add_protocol:
cmd.add_sequence(
a.add_protocol, settings.addProtocol, settings.queryProtocol, None, "'%s'"
)
elif a.remove_protocol:
cmd.remove_sequence(
a.remove_protocol,
settings.removeProtocol,
settings.queryProtocol,
None,
"'%s'",
)
elif a.query_protocol:
cmd.query_sequence(a.query_protocol, settings.queryProtocol, None, "'%s'")
# source port
elif a.list_source_ports:
l = settings.getSourcePorts()
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(l, key=lambda x: (x[1], getPortRange(x[0])[0]))
]
)
)
elif a.add_source_port:
cmd.add_sequence(
a.add_source_port,
settings.addSourcePort,
settings.querySourcePort,
cmd.parse_port,
"%s/%s",
)
elif a.remove_source_port:
cmd.remove_sequence(
a.remove_source_port,
settings.removeSourcePort,
settings.querySourcePort,
cmd.parse_port,
"%s/%s",
)
elif a.query_source_port:
cmd.query_sequence(
a.query_source_port, settings.querySourcePort, cmd.parse_port, "%s/%s"
)
# masquerade
elif a.add_masquerade:
settings.addMasquerade()
elif a.remove_masquerade:
settings.removeMasquerade()
elif a.query_masquerade:
cmd.print_query_result(settings.queryMasquerade())
# forward port
elif a.list_forward_ports:
l = settings.getForwardPorts()
cmd.print_and_exit(
"\n".join(
[
"port=%s:proto=%s:toport=%s:toaddr=%s"
% (port, protocol, toport, toaddr)
for (port, protocol, toport, toaddr) in l
]
)
)
elif a.add_forward_port:
cmd.add_sequence(
a.add_forward_port,
settings.addForwardPort,
settings.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
elif a.remove_forward_port:
cmd.remove_sequence(
a.remove_forward_port,
settings.removeForwardPort,
settings.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
elif a.query_forward_port:
cmd.query_sequence(
a.query_forward_port,
settings.queryForwardPort,
cmd.parse_forward_port,
"port=%s:proto=%s:toport=%s:toaddr=%s",
)
# block icmp
elif a.list_icmp_blocks:
l = settings.getIcmpBlocks()
cmd.print_and_exit(" ".join(l))
elif a.add_icmp_block:
cmd.add_sequence(
a.add_icmp_block,
settings.addIcmpBlock,
settings.queryIcmpBlock,
None,
"'%s'",
)
elif a.remove_icmp_block:
cmd.remove_sequence(
a.remove_icmp_block,
settings.removeIcmpBlock,
settings.queryIcmpBlock,
None,
"'%s'",
)
elif a.query_icmp_block:
cmd.query_sequence(a.query_icmp_block, settings.queryIcmpBlock, None, "'%s'")
fw.setPolicySettings(a.policy, settings)
# endif a.policy
#
# else zone:
# rich rules
elif a.list_rich_rules:
l = fw.getRichRules(zone)
cmd.print_and_exit("\n".join(l))
elif a.add_rich_rule:
cmd.zone_add_timeout_sequence(
zone, a.add_rich_rule, fw.addRichRule, fw.queryRichRule, None, "'%s'", a.timeout
)
elif a.remove_rich_rule:
cmd.x_remove_sequence(
zone, a.remove_rich_rule, fw.removeRichRule, fw.queryRichRule, None, "'%s'"
)
elif a.query_rich_rule:
cmd.x_query_sequence(zone, a.query_rich_rule, fw.queryRichRule, None, "'%s'")
# service
elif a.list_services:
l = fw.getServices(zone)
cmd.print_and_exit(" ".join(sorted(l)))
elif a.add_service:
cmd.zone_add_timeout_sequence(
zone, a.add_service, fw.addService, fw.queryService, None, "'%s'", a.timeout
)
elif a.remove_service:
cmd.x_remove_sequence(
zone, a.remove_service, fw.removeService, fw.queryService, None, "'%s'"
)
elif a.query_service:
cmd.x_query_sequence(zone, a.query_service, fw.queryService, None, "'%s'")
# port
elif a.list_ports:
l = fw.getPorts(zone)
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(l, key=lambda x: (x[1], getPortRange(x[0])[0]))
]
)
)
elif a.add_port:
cmd.zone_add_timeout_sequence(
zone, a.add_port, fw.addPort, fw.queryPort, cmd.parse_port, "'%s/%s'", a.timeout
)
elif a.remove_port:
cmd.x_remove_sequence(
zone, a.remove_port, fw.removePort, fw.queryPort, cmd.parse_port, "'%s/%s'"
)
elif a.query_port:
cmd.x_query_sequence(zone, a.query_port, fw.queryPort, cmd.parse_port, "'%s/%s'")
# protocol
elif a.list_protocols:
l = fw.getProtocols(zone)
cmd.print_and_exit(" ".join(["%s" % protocol for protocol in sorted(l)]))
elif a.add_protocol:
cmd.zone_add_timeout_sequence(
zone, a.add_protocol, fw.addProtocol, fw.queryProtocol, None, "'%s'", a.timeout
)
elif a.remove_protocol:
cmd.x_remove_sequence(
zone, a.remove_protocol, fw.removeProtocol, fw.queryProtocol, None, "'%s'"
)
elif a.query_protocol:
cmd.x_query_sequence(zone, a.query_protocol, fw.queryProtocol, None, "'%s'")
# source port
elif a.list_source_ports:
l = fw.getSourcePorts(zone)
cmd.print_and_exit(
" ".join(
[
"%s/%s" % (port[0], port[1])
for port in sorted(l, key=lambda x: (x[1], getPortRange(x[0])[0]))
]
)
)
elif a.add_source_port:
cmd.zone_add_timeout_sequence(
zone,
a.add_source_port,
fw.addSourcePort,
fw.querySourcePort,
cmd.parse_port,
"'%s/%s'",
a.timeout,
)
elif a.remove_source_port:
cmd.x_remove_sequence(
zone,
a.remove_source_port,
fw.removeSourcePort,
fw.querySourcePort,
cmd.parse_port,
"'%s/%s'",
)
elif a.query_source_port:
cmd.x_query_sequence(
zone, a.query_source_port, fw.querySourcePort, cmd.parse_port, "'%s/%s'"
)
# forward
elif a.add_forward:
fw.addForward(zone)
elif a.remove_forward:
fw.removeForward(zone)
elif a.query_forward:
cmd.print_query_result(fw.queryForward(zone))
# masquerade
elif a.add_masquerade:
fw.addMasquerade(zone, a.timeout)
elif a.remove_masquerade:
fw.removeMasquerade(zone)
elif a.query_masquerade:
cmd.print_query_result(fw.queryMasquerade(zone))
# forward port
elif a.list_forward_ports:
l = fw.getForwardPorts(zone)
cmd.print_and_exit(
"\n".join(
[
"port=%s:proto=%s:toport=%s:toaddr=%s"
% (port, protocol, toport, toaddr)
for (port, protocol, toport, toaddr) in l
]
)
)
elif a.add_forward_port:
cmd.zone_add_timeout_sequence(
zone,
a.add_forward_port,
fw.addForwardPort,
fw.queryForwardPort,
cmd.parse_forward_port,
"'port=%s:proto=%s:toport=%s:toaddr=%s'",
a.timeout,
)
elif a.remove_forward_port:
cmd.x_remove_sequence(
zone,
a.remove_forward_port,
fw.removeForwardPort,
fw.queryForwardPort,
cmd.parse_forward_port,
"'port=%s:proto=%s:toport=%s:toaddr=%s'",
)
elif a.query_forward_port:
cmd.x_query_sequence(
zone,
a.query_forward_port,
fw.queryForwardPort,
cmd.parse_forward_port,
"'port=%s:proto=%s:toport=%s:toaddr=%s'",
)
# block icmp
elif a.list_icmp_blocks:
l = fw.getIcmpBlocks(zone)
cmd.print_and_exit(" ".join(l))
elif a.add_icmp_block:
cmd.zone_add_timeout_sequence(
zone,
a.add_icmp_block,
fw.addIcmpBlock,
fw.queryIcmpBlock,
None,
"'%s'",
a.timeout,
)
elif a.remove_icmp_block:
cmd.x_remove_sequence(
zone, a.remove_icmp_block, fw.removeIcmpBlock, fw.queryIcmpBlock, None, "'%s'"
)
elif a.query_icmp_block:
cmd.x_query_sequence(zone, a.query_icmp_block, fw.queryIcmpBlock, None, "'%s'")
# icmp block inversion
elif a.add_icmp_block_inversion:
fw.addIcmpBlockInversion(zone)
elif a.remove_icmp_block_inversion:
fw.removeIcmpBlockInversion(zone)
elif a.query_icmp_block_inversion:
cmd.print_query_result(fw.queryIcmpBlockInversion(zone))
# list all
elif a.list_all:
z = zone if zone else fw.getDefaultZone()
cmd.print_zone_info(
z,
fw.getZoneSettings(z),
default_zone=fw.getDefaultZone(),
active_zones=fw.getActiveZones(),
)
sys.exit(0)
# list everything
elif a.list_all_zones:
default_zone = fw.getDefaultZone()
active_zones = fw.getActiveZones()
for zone in fw.getZones():
cmd.print_zone_info(
zone,
fw.getZoneSettings(zone),
default_zone=default_zone,
active_zones=active_zones,
)
cmd.print_msg("")
sys.exit(0)
elif a.list_all_policies:
active_policies = fw.getActivePolicies()
for policy in fw.getPolicies():
cmd.print_policy_info(
policy, fw.getPolicySettings(policy), active_policies=active_policies
)
cmd.print_msg("")
sys.exit(0)
elif a.info_zone:
cmd.print_zone_info(
a.info_zone,
fw.getZoneSettings(a.info_zone),
default_zone=fw.getDefaultZone(),
active_zones=fw.getActiveZones(),
)
sys.exit(0)
elif a.info_policy:
cmd.print_policy_info(
a.info_policy,
fw.getPolicySettings(a.info_policy),
active_policies=fw.getActivePolicies(),
)
sys.exit(0)
elif a.info_service:
cmd.print_service_info(a.info_service, fw.getServiceSettings(a.info_service))
sys.exit(0)
elif a.info_icmptype:
cmd.print_icmptype_info(a.info_icmptype, fw.getIcmpTypeSettings(a.info_icmptype))
sys.exit(0)
cmd.print_and_exit("success")