Skip to content

Commit

Permalink
更新Check Point防火墙封禁模块
Browse files Browse the repository at this point in the history
  • Loading branch information
sec-report committed Jul 25, 2024
1 parent 487de21 commit b20cf8e
Show file tree
Hide file tree
Showing 31 changed files with 272 additions and 28 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ Docker全部运行后访问 [http://127.0.0.1/](http://127.0.0.1/) 访问管理
| [长亭WAF社区版](./device/alarm/chaitin_waf_ce) | [旁路阻断](./device/block/tcp_reset) |
| [微步蜜罐HFish](./device/alarm/threatbook_hfish) | [OPNsense](./device/block/opnsense) |
| [奇安信天眼](./device/alarm/qianxin_skyeye) |[RouterOS](./device/block/router_os) |
| [奇安信椒图](./device/alarm/qianxin_jowtolock) | |
| [奇安信椒图](./device/alarm/qianxin_jowtolock) | [CheckPoint](./check_point) |
| [绿盟WAF](./device/alarm/nsfocus_waf) | |

## 黑/白名单说明
Expand Down
2 changes: 1 addition & 1 deletion device/alarm/chaitin_waf_ce/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pip3 install SecAutoBan requests PyJWT

```
server_ip = "127.0.0.1",
server_port = 8080,
server_port = 80,
sk = "sk-xxx",
```

Expand Down
2 changes: 1 addition & 1 deletion device/alarm/chaitin_waf_ce/chaitin_waf_ce.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def alarm_analysis(ws_client):
}
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=8000,
server_port=80,
sk="sk-*****",
client_type="alarm",
alarm_analysis = alarm_analysis
Expand Down
4 changes: 2 additions & 2 deletions device/alarm/example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pip3 install SecAutoBan

```
server_ip = "127.0.0.1", # 平台IP
server_port = 8080, # 平台端口
server_port = 80, # 平台端口
sk = "sk-xxx", # 回连密钥
```

Expand All @@ -34,7 +34,7 @@ syslog模版,自带了一个syslog服务器(默认监听567端口),需
listen_syslog_udp_port = 567 # syslog监听端口
...
server_ip = "127.0.0.1", # 平台IP
server_port = 8080, # 平台端口
server_port = 80, # 平台端口
sk = "sk-xxx", # 回连密钥
```

Expand Down
2 changes: 1 addition & 1 deletion device/alarm/example/base_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def alarm_analysis(ws_client):
if __name__ == "__main__":
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=8000,
server_port=80,
sk="sk-*****",
client_type="alarm",
alarm_analysis = alarm_analysis
Expand Down
2 changes: 1 addition & 1 deletion device/alarm/example/syslog_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def alarm_analysis(ws_client):
listen_syslog_udp_port = 567
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=8000,
server_port=80,
sk="sk-*****",
client_type="alarm",
alarm_analysis = alarm_analysis
Expand Down
2 changes: 1 addition & 1 deletion device/alarm/nsfocus_waf/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pip3 install SecAutoBan

```
server_ip = "127.0.0.1",
server_port = 8080,
server_port = 80,
sk = "sk-xxx",
```

Expand Down
2 changes: 1 addition & 1 deletion device/alarm/nsfocus_waf/nsfocus_waf.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def alarm_analysis(ws_client):
listen_syslog_udp_port = 567
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=8000,
server_port=80,
sk="sk-*****",
client_type="alarm",
alarm_analysis = alarm_analysis
Expand Down
2 changes: 1 addition & 1 deletion device/alarm/qianxin_jowtolock/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pip3 install SecAutoBan

```
server_ip = "127.0.0.1",
server_port = 8080,
server_port = 80,
sk = "sk-xxx",
```

Expand Down
2 changes: 1 addition & 1 deletion device/alarm/qianxin_jowtolock/qianxin_jowtolock.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def alarm_analysis(ws_client):
bypass_lan = True # 过滤内网攻击,True 开启 | False 关闭
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=8000,
server_port=80,
sk="sk-*****",
client_type="alarm",
alarm_analysis = alarm_analysis
Expand Down
2 changes: 1 addition & 1 deletion device/alarm/qianxin_skyeye/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pip3 install SecAutoBan ipaddress

```
server_ip = "127.0.0.1",
server_port = 8080,
server_port = 80,
sk = "sk-xxx",
```

Expand Down
2 changes: 1 addition & 1 deletion device/alarm/qianxin_skyeye/qianxin_skyeye.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def alarm_analysis(ws_client):
bypass_lan = True # 过滤内网攻击,True 开启 | False 关闭
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=8000,
server_port=80,
sk="sk-*****",
client_type="alarm",
alarm_analysis = alarm_analysis
Expand Down
2 changes: 1 addition & 1 deletion device/alarm/threatbook_hfish/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pip3 install SecAutoBan requests

```
server_ip = "127.0.0.1",
server_port = 8080,
server_port = 80,
sk = "sk-xxx",
```

Expand Down
2 changes: 1 addition & 1 deletion device/alarm/threatbook_hfish/threatbook_hfish.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def alarm_analysis(ws_client):
}
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=8000,
server_port=80,
sk="sk-*****",
client_type="alarm",
alarm_analysis = alarm_analysis
Expand Down
1 change: 1 addition & 0 deletions device/block/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
* [tcp_reset](./tcp_reset): 旁路阻断
* [opnsense](./opnsense): OPNsense
* [router_os](./router_os): RouterOS
* [check_point](./check_point): CheckPoint
70 changes: 70 additions & 0 deletions device/block/check_point/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# CheckPoint

CheckPoint封禁模块

## 下载模块

```
wget https://raw.githubusercontent.com/sec-report/SecAutoBan/main/device/block/check_point/check_point.py
```

## 配置CheckPoint

### 开启Management API

![](./img/1.jpg)

### 新建网络分组

`新建-网络分组`弹出框新建分组`sec_auto_ban`并保存:

![](./img/2.jpg)

### 为网络分组创建封禁规则

`安全策略-访问控制-策略`页面新建两条规则,分别为阻止源为`sec_auto_ban`及目的为`sec_auto_ban`,图例:

![](./img/3.jpg)

## 配置模块

### 安装依赖

```
pip3 install SecAutoBan requests
```

### 修改配置

#### 修改回连核心模块配置

更改脚本第`159`-`161`

```
server_ip = "127.0.0.1",
server_port = 80,
sk = "sk-xxx",
```

#### 修改与CheckPoint连接的地址

更改脚本第`153`

```
"url": "http://xxx.xxx.xxx.xxx",
```

#### 填写CheckPoint用户名密码

更改脚本第`154`-`155`

```
"username": "admin",
"password": "",
```

## 运行

```shell
python3 check_point.py
```
171 changes: 171 additions & 0 deletions device/block/check_point/check_point.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import time
import signal
import requests
from SecAutoBan import SecAutoBan
from multiprocessing.pool import ThreadPool
requests.packages.urllib3.disable_warnings()


def signal_handler(signal, frame):
sec_auto_ban.print("[+] 注销Session")
publish()
logout()
exit()


def login():
post_json = {
"user": check_point_conf["username"],
"password": check_point_conf["password"],
}
r = requests.post(check_point_conf["url"] + "/web_api/login", json=post_json, verify=False)
if r.status_code == 200 and "sid" in r.json():
sec_auto_ban.print("[+] 防火墙登录成功")
else:
sec_auto_ban.print("[-] 防火墙登录失败")
exit()
global check_point_session_id
check_point_session_id = r.json()["sid"]


def discard():
header = {
"X-chkp-sid": check_point_session_id
}
requests.post(check_point_conf["url"] + "/web_api/discard", json={}, headers=header, verify=False)


def publish():
header = {
"X-chkp-sid": check_point_session_id
}
r = requests.post(check_point_conf["url"] + "/web_api/publish", json={}, headers=header, verify=False)
if "task-id" not in r.json():
sec_auto_ban.print("[-] 推送失败,回退session")
discard()


def logout():
header = {
"X-chkp-sid": check_point_session_id
}
requests.post(check_point_conf["url"] + "/web_api/logout", json={}, headers=header, verify=False)


def keepalive():
time.sleep(60)
header = {
"X-chkp-sid": check_point_session_id
}
r = requests.post(check_point_conf["url"] + "/web_api/keepalive", json={}, headers=header, verify=False)
if r.status_code != 200:
login()
keepalive()


def check_host(ip: str) -> str:
post_json = {
"filter": ip
}
header = {
"X-chkp-sid": check_point_session_id
}
r = requests.post(check_point_conf["url"] + "/web_api/show-hosts", json=post_json, headers=header, verify=False)
if "total" in r.json():
if r.json()["total"] == 0:
return ""
return r.json()["objects"][0]["uid"]
return ""


def get_host_uid(ip: str) -> str:
uid = check_host(ip)
if len(uid) != 0:
return uid
post_json = {
"name": "block_" + ip,
"ip-address": ip
}
header = {
"X-chkp-sid": check_point_session_id
}
r = requests.post(check_point_conf["url"] + "/web_api/add-host", json=post_json, headers=header, verify=False)
return r.json()["uid"]


def block_ip(ip):
if check_exist_ip(ip):
return
host_uid = get_host_uid(ip)
if len(host_uid) == 0:
sec_auto_ban.print("[-] IP: " + ip + " 添加失败")
post_json = {
"name": check_point_conf["group_name"],
"members": {
"add": host_uid
},
"details-level": "uid"
}
header = {
"X-chkp-sid": check_point_session_id
}
requests.post(check_point_conf["url"] + "/web_api/set-group", json=post_json, headers=header, verify=False)
publish()


def unblock_ip(ip):
if not check_exist_ip(ip):
return
host_uid = get_host_uid(ip)
if len(host_uid) == 0:
sec_auto_ban.print("[-] IP: " + ip + " 删除失败")
post_json = {
"name": check_point_conf["group_name"],
"members": {
"remove": host_uid
},
}
header = {
"X-chkp-sid": check_point_session_id
}
requests.post(check_point_conf["url"] + "/web_api/set-group", json=post_json, headers=header, verify=False)
publish()


def get_all_block_ip() -> list:
post_json = {
"name": check_point_conf["group_name"]
}
header = {
"X-chkp-sid": check_point_session_id
}
r = requests.post(check_point_conf["url"] + "/web_api/show-group", json=post_json, headers=header, verify=False)
return [i["ipv4-address"]for i in r.json()["members"]]


def check_exist_ip(ip) -> bool:
return ip in get_all_block_ip()


if __name__ == "__main__":
check_point_session_id = ""
check_point_conf = {
"url": "https://xxx.xxx.xxx.xxx",
"username": "admin",
"password": "",
"group_name": "sec_auto_ban"
}
sec_auto_ban = SecAutoBan(
server_ip="127.0.0.1",
server_port=80,
sk="sk-*****",
client_type="block",
block_ip = block_ip,
unblock_ip = unblock_ip,
get_all_block_ip= get_all_block_ip
)
pool = ThreadPool(processes=1)
login()
pool.apply_async(keepalive)
signal.signal(signal.SIGINT, signal_handler)
sec_auto_ban.run()
Binary file added device/block/check_point/img/1.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added device/block/check_point/img/2.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added device/block/check_point/img/3.jpg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit b20cf8e

Please sign in to comment.