Skip to content

Commit

Permalink
Merge branch 'dev' into pr@dev@sftp_web
Browse files Browse the repository at this point in the history
  • Loading branch information
feng626 committed Oct 11, 2024
2 parents dcb4930 + de372d8 commit 40f3b4a
Show file tree
Hide file tree
Showing 28 changed files with 258 additions and 27 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM jumpserver/koko-base:20241010_100502 AS stage-build
FROM jumpserver/koko-base:20241009_024227 AS stage-build

WORKDIR /opt/koko
ARG TARGETARCH
COPY . .
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile-base
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ WORKDIR /opt
ARG HELM_VERSION=v3.14.3
ARG KUBECTL_VERSION=v1.29.3
ARG CHECK_VERSION=v1.0.3
ARG USQL_VERSION=v0.0.3
ARG USQL_VERSION=v0.0.4

RUN set -ex \
&& mkdir -p /opt/koko/bin \
&& wget -O kubectl.tar.gz https://dl.k8s.io/${KUBECTL_VERSION}/kubernetes-client-linux-${TARGETARCH}.tar.gz \
Expand Down
3 changes: 3 additions & 0 deletions pkg/exchange/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (

ActionEvent = "Action"

PermExpiredEvent = "PermExpired"
PermValidEvent = "PermValid"

ShareRemoveUser = "Share_REMOVE_USER"
)

Expand Down
18 changes: 18 additions & 0 deletions pkg/handler/server_ssh.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,17 @@ func (s *Server) proxyAssetCommand(sess ssh.Session, sshClient *srvconn.SSHClien
switch task.Name {
case model.TaskKillSession:
cancel()
logger.Infof("User %s end command request %s as task kill_session",
tokenInfo.User.String(), sshClient)
return nil
case model.TaskPermExpired:
cancel()
logger.Infof("User %s end command request %s as task permission has expired",
tokenInfo.User.String(), sshClient)
return nil
case model.TaskPermValid:
return nil

}
return fmt.Errorf("ssh proxy not support task: %s", task.Name)
})
Expand Down Expand Up @@ -534,7 +544,15 @@ func (s *Server) proxyVscodeShell(sess ssh.Session, vsReq *vscodeReq, sshClient
switch task.Name {
case model.TaskKillSession:
cancel()
logger.Infof("User %s end vscode request %s as task kill_session", vsReq.user, sshClient)
return nil
case model.TaskPermExpired:
cancel()
logger.Infof("User %s end vscode request %s as permission has expired", vsReq.user, sshClient)
return nil
case model.TaskPermValid:
return nil

}
return fmt.Errorf("ssh proxy not support task: %s", task.Name)
})
Expand Down
7 changes: 7 additions & 0 deletions pkg/handler/server_ssh_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func (s *Server) HandleSSHRequest(ctx ssh.Context, srv *ssh.Server, req *gossh.R
switch task.Name {
case model.TaskKillSession:
cancel()
logger.Info("ide session killed as task kill session")
return nil
case model.TaskPermExpired:
cancel()
logger.Info("ide session killed as task perm expired")
return nil
case model.TaskPermValid:
return nil
}
return fmt.Errorf("ssh proxy not support task: %s", task.Name)
Expand Down
11 changes: 10 additions & 1 deletion pkg/httpd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"bytes"
"context"
"encoding/json"
"github.com/gliderlabs/ssh"
"io"
"sync"
"time"

"github.com/gliderlabs/ssh"

"github.com/jumpserver/koko/pkg/exchange"
"github.com/jumpserver/koko/pkg/logger"
)
Expand Down Expand Up @@ -162,6 +163,14 @@ func (c *Client) HandleRoomEvent(event string, roomMsg *exchange.RoomMessage) {
msgType = TerminalSessionResume
msgData = string(roomMsg.Body)
logger.Debugf("Resume terminal session : %+v", roomMsg)
case exchange.PermValidEvent:
msgType = TerminalPermValid
msgData = string(roomMsg.Body)
logger.Debugf("Terminal perm is valid : %+v", roomMsg)
case exchange.PermExpiredEvent:
msgType = TerminalPermExpired
msgData = string(roomMsg.Body)
logger.Debugf("Terminal perm is expired : %+v", roomMsg)
default:
logger.Infof("unsupported room msg %+v", roomMsg)
return
Expand Down
3 changes: 3 additions & 0 deletions pkg/httpd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const (
TerminalSessionPause = "TERMINAL_SESSION_PAUSE"
TerminalSessionResume = "TERMINAL_SESSION_RESUME"

TerminalPermValid = "TERMINAL_PERM_VALID"
TerminalPermExpired = "TERMINAL_PERM_EXPIRED"

TerminalShare = "TERMINAL_SHARE"
TerminalShareJoin = "TERMINAL_SHARE_JOIN"
TerminalShareLeave = "TERMINAL_SHARE_LEAVE"
Expand Down
1 change: 1 addition & 0 deletions pkg/jms-sdk-go/model/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Session struct {
AccountID string `json:"account_id"`
Type LabelField `json:"type"`
ErrReason LabelField `json:"error_reason,omitempty"`
TokenId string `json:"token_id,omitempty"`
}

type ReplayVersion string
Expand Down
14 changes: 9 additions & 5 deletions pkg/jms-sdk-go/model/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,22 @@ type Terminal struct {
}

type TerminalTask struct {
ID string `json:"id"`
Name string `json:"name"`
Args string `json:"args"`
Kwargs TaskKwargs `json:"kwargs"`
IsFinished bool
ID string `json:"id"`
Name string `json:"name"`
Args string `json:"args"`
Kwargs TaskKwargs `json:"kwargs"`
}

const (
TaskKillSession = "kill_session"

TaskLockSession = "lock_session"
TaskUnlockSession = "unlock_session"

// TaskPermExpired TaskPermValid 非 api 数据,仅用于内部处理

TaskPermExpired = "perm_expired"
TaskPermValid = "perm_valid"
)

type TaskKwargs struct {
Expand Down
15 changes: 15 additions & 0 deletions pkg/jms-sdk-go/model/token.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (c *ConnectToken) CreateSession(addr string,
LoginFrom: loginFrom,
Type: SessionType,
ErrReason: LabelField(SessionReplayErrUnsupported),
TokenId: c.Id,
}
}

Expand Down Expand Up @@ -77,3 +78,17 @@ type ConnectOptions struct {
FilenameConflictResolution string `json:"file_name_conflict_resolution,omitempty"`
TerminalThemeName string `json:"terminal_theme_name,omitempty"`
}

// token 授权和过期状态

type TokenCheckStatus struct {
Detail string `json:"detail"`
Code string `json:"code"`
Expired bool `json:"expired"`
}

const (
CodePermOk = "perm_ok"
CodePermAccountInvalid = "perm_account_invalid"
CodePermExpired = "perm_expired"
)
6 changes: 6 additions & 0 deletions pkg/jms-sdk-go/service/jms_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,9 @@ type SuperConnectTokenReq struct {

Params map[string]string `json:"-"`
}

func (s *JMService) CheckTokenStatus(tokenId string) (res model.TokenCheckStatus, err error) {
reqURL := fmt.Sprintf(SuperConnectTokenCheckURL, tokenId)
_, err = s.authClient.Get(reqURL, &res)
return
}
2 changes: 2 additions & 0 deletions pkg/jms-sdk-go/service/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ const (
SuperConnectTokenSecretURL = "/api/v1/authentication/super-connection-token/secret/"
SuperConnectTokenInfoURL = "/api/v1/authentication/super-connection-token/"

SuperConnectTokenCheckURL = "/api/v1/authentication/super-connection-token/%s/check/"

UserPermsAssetAccountsURL = "/api/v1/perms/users/%s/assets/%s/"
AccountSecretURL = "/api/v1/assets/account-secrets/%s/"
UserPermsAssetsURL = "/api/v1/perms/users/%s/assets/"
Expand Down
2 changes: 2 additions & 0 deletions pkg/koko/koko.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func runTasks(jmsService *service.JMService) {
go uploadRemainFTPFile(jmsService)
}
go keepHeartbeat(jmsService)

go RunConnectTokensCheck(jmsService)
}

func MustJMService() *service.JMService {
Expand Down
6 changes: 3 additions & 3 deletions pkg/koko/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,13 @@ func KeepWsHeartbeat(jmsService *service.JMService) {
}

func GetStatusData() interface{} {
sessions := session.GetAliveSessions()
ids := session.GetAliveSessionIds()
payload := model.HeartbeatData{
SessionOnlineIds: sessions,
SessionOnlineIds: ids,
CpuUsed: common.CpuLoad1Usage(),
MemoryUsed: common.MemoryUsagePercent(),
DiskUsed: common.DiskUsagePercent(),
SessionOnline: len(sessions),
SessionOnline: len(ids),
}
return map[string]interface{}{
"type": "status",
Expand Down
53 changes: 53 additions & 0 deletions pkg/koko/token_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package koko

import (
"time"

"github.com/jumpserver/koko/pkg/jms-sdk-go/model"
"github.com/jumpserver/koko/pkg/jms-sdk-go/service"
"github.com/jumpserver/koko/pkg/logger"
"github.com/jumpserver/koko/pkg/session"
)

// RunConnectTokensCheck every 5 minutes check token status
func RunConnectTokensCheck(jmsService *service.JMService) {
for {
time.Sleep(5 * time.Minute)
sessions := session.GetSessions()
tokens := make(map[string]model.TokenCheckStatus, len(sessions))
for _, s := range sessions {
ret, ok := tokens[s.TokenId]
if ok {
handleTokenCheck(s, &ret)
continue
}
ret, err := jmsService.CheckTokenStatus(s.TokenId)
if err != nil && ret.Code == "" {
logger.Errorf("Check token status failed: %s", err)
continue
}
tokens[s.TokenId] = ret
handleTokenCheck(s, &ret)
}
}
}

func handleTokenCheck(session *session.Session, tokenStatus *model.TokenCheckStatus) {
var task model.TerminalTask
switch tokenStatus.Code {
case model.CodePermOk:
task = model.TerminalTask{
Name: model.TaskPermValid,
Args: tokenStatus.Detail,
}
default:
task = model.TerminalTask{
Name: model.TaskPermExpired,
Args: tokenStatus.Detail,
}
}
if err := session.HandleTask(&task); err != nil {
logger.Errorf("Handle token check task failed: %s", err)
}

}
5 changes: 5 additions & 0 deletions pkg/proxy/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,11 @@ func (r *FTPFileRecorder) ChunkedRecord(ftpLog *model.FTPLog, readerAt io.Reader
return err
}

if info.isExceedWrittenSize() {
logger.Errorf("FTP file %s is exceeds the max limit and discard it", ftpLog.ID)
return nil
}

if err1 := common.ChunkedFileTransfer(info.fd, readerAt, offset, totalSize); err1 != nil {
logger.Errorf("FTP file %s write err: %s", ftpLog.ID, err1)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func NewServer(conn UserConnection, jmsService *service.JMService, opts ...Conne
AccountID: account.ID,
OrgID: connOpts.authInfo.OrgId,
Type: model.NORMALType,
TokenId: connOpts.authInfo.Id,
}

if !connOpts.authInfo.Actions.EnableConnect() {
Expand Down Expand Up @@ -939,6 +940,10 @@ func (s *Server) Proxy() {
sw.PauseOperation(task.Kwargs.CreatedByUser)
case model.TaskUnlockSession:
sw.ResumeOperation(task.Kwargs.CreatedByUser)
case model.TaskPermExpired:
sw.PermBecomeExpired(task.Name, task.Args)
case model.TaskPermValid:
sw.PermBecomeValid(task.Name, task.Args)
default:
return fmt.Errorf("ssh session unknown task %s", task.Name)
}
Expand Down
42 changes: 41 additions & 1 deletion pkg/proxy/switch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type SwitchSession struct {
notifyMsgChan chan *exchange.RoomMessage

MaxSessionTime time.Time

invalidPerm atomic.Bool
invalidPermData []byte
invalidPermTime time.Time
}

func (s *SwitchSession) Terminate(username string) {
Expand Down Expand Up @@ -71,6 +75,42 @@ func (s *SwitchSession) ResumeOperation(username string) {
}
}

func (s *SwitchSession) PermBecomeExpired(code, detail string) {
if s.invalidPerm.Load() {
return
}
s.invalidPerm.Store(true)
p, _ := json.Marshal(map[string]string{"code": code, "detail": detail})
s.invalidPermData = p
s.invalidPermTime = time.Now()
s.notifyMsgChan <- &exchange.RoomMessage{
Event: exchange.PermExpiredEvent, Body: p}
}

func (s *SwitchSession) PermBecomeValid(code, detail string) {
if !s.invalidPerm.Load() {
return
}
s.invalidPerm.Store(false)
s.invalidPermTime = s.MaxSessionTime
p, _ := json.Marshal(map[string]string{"code": code, "detail": detail})
s.invalidPermData = p
s.notifyMsgChan <- &exchange.RoomMessage{
Event: exchange.PermValidEvent, Body: p}
}

func (s *SwitchSession) CheckPermissionExpired(now time.Time) bool {
if s.p.CheckPermissionExpired(now) {
return true
}
if s.invalidPerm.Load() {
if now.After(s.invalidPermTime.Add(10 * time.Minute)) {
return true
}
}
return false
}

func (s *SwitchSession) setOperator(username string) {
s.currentOperator.Store(username)
}
Expand Down Expand Up @@ -301,7 +341,7 @@ func (s *SwitchSession) Bridge(userConn UserConnection, srvConn srvconn.ServerCo
s.recordSessionFinished(model.ReasonErrIdleDisconnect)
return
}
if s.p.CheckPermissionExpired(now) {
if s.CheckPermissionExpired(now) {
msg := lang.T("Permission has expired, disconnect")
logger.Infof("Session[%s] permission has expired, disconnect", s.ID)
msg = utils.WrapperWarn(msg)
Expand Down
14 changes: 13 additions & 1 deletion pkg/session/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ func GetSessionById(id string) (s *Session, ok bool) {
return
}

func GetAliveSessions() []string {
func GetAliveSessionIds() []string {
return sessManager.Range()
}

func GetSessions() []*Session {
return sessManager.GetSessions()
}

func AddSession(s *Session) {
sessManager.Add(s.ID, s)
}
Expand Down Expand Up @@ -68,3 +72,11 @@ func (s *sessionManager) Range() []string {

return sids
}

func (s *sessionManager) GetSessions() []*Session {
sessions := make([]*Session, 0, len(s.data))
for _, sess := range s.data {
sessions = append(sessions, sess)
}
return sessions
}
Loading

0 comments on commit 40f3b4a

Please sign in to comment.