Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eric/cus 9 support cli updates #14

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
[build-system]
requires = ["setuptools >= 61.0"]
requires = [
"setuptools >= 61.0",
"requests"
]
build-backend = "setuptools.build_meta"

[project]
name = "socket-sdk-python"
dynamic = ["version"]
requires-python = ">= 3.9"
dependencies = [
'requests'
'requests',
'typing-extensions>=4.12.2'
]
readme = "README.rst"
license = {file = "LICENSE"}
Expand Down Expand Up @@ -57,6 +61,7 @@ include = [
"socketdev.sbom",
"socketdev.settings",
"socketdev.tools",
"socketdev.utils",
]

[tool.setuptools.dynamic]
Expand Down
119 changes: 25 additions & 94 deletions socketdev/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import logging
import requests
import base64

from socketdev.core.classes import Response
from socketdev.core.api import API
from socketdev.dependencies import Dependencies
from socketdev.exceptions import APIKeyMissing, APIFailure, APIAccessDenied, APIInsufficientQuota, APIResourceNotFound
from socketdev.export import Export
from socketdev.fullscans import FullScans
from socketdev.npm import NPM
Expand All @@ -17,106 +14,40 @@
from socketdev.repositories import Repositories
from socketdev.sbom import Sbom
from socketdev.settings import Settings
from socketdev.version import __version__
from socketdev.utils import Utils, IntegrationType, INTEGRATION_TYPES


__version__ = __version__
__author__ = "socket.dev"
__version__ = "1.0.14"
__all__ = ["socketdev"]


global encoded_key
encoded_key: str
__all__ = ["socketdev", "Utils", "IntegrationType", "INTEGRATION_TYPES"]

api_url = "https://api.socket.dev/v0"
request_timeout = 30
log = logging.getLogger("socketdev")
log.addHandler(logging.NullHandler())


def encode_key(token: str):
global encoded_key
encoded_key = base64.b64encode(token.encode()).decode("ascii")


def do_request(
path: str, headers: dict = None, payload: [dict, str] = None, files: list = None, method: str = "GET"
) -> Response:
"""
Shared function for performing the requests against the API.
:param path: String path of the URL
:param headers: Optional dictionary of the headers to include in the request. Defaults to None
:param payload: Optional dictionary or string of the payload to POST. Defaults to None
:param files: Optional list of files to send. Defaults to None
:param method: Optional string of the method for the Request. Defaults to GET
"""

if encoded_key is None or encoded_key == "":
raise APIKeyMissing

if headers is None:
headers = {
"Authorization": f"Basic {encoded_key}",
"User-Agent": f"SocketPythonScript/{__version__}",
"accept": "application/json",
}
url = f"{api_url}/{path}"
try:
response = requests.request(
method.upper(), url, headers=headers, data=payload, files=files, timeout=request_timeout
)
if response.status_code >= 400:
raise APIFailure("Bad Request")
elif response.status_code == 401:
raise APIAccessDenied("Unauthorized")
elif response.status_code == 403:
raise APIInsufficientQuota("Insufficient max_quota for API method")
elif response.status_code == 404:
raise APIResourceNotFound(f"Path not found {path}")
elif response.status_code == 429:
raise APIInsufficientQuota("Insufficient quota for API route")
except Exception as error:
response = Response(text=f"{error}", error=True, status_code=500)
raise APIFailure(response)
return response


class socketdev:
token: str
timeout: int
dependencies: Dependencies
npm: NPM
openapi: OpenAPI
org: Orgs
quota: Quota
report: Report
sbom: Sbom
purl: Purl
fullscans: FullScans
export: Export
repositories: Repositories
settings: Settings
repos: Repos

def __init__(self, token: str, timeout: int = 30):
self.api = API()
self.token = token + ":"
encode_key(self.token)
self.timeout = timeout
socketdev.set_timeout(self.timeout)
self.dependencies = Dependencies()
self.npm = NPM()
self.openapi = OpenAPI()
self.org = Orgs()
self.quota = Quota()
self.report = Report()
self.sbom = Sbom()
self.purl = Purl()
self.fullscans = FullScans()
self.export = Export()
self.repositories = Repositories()
self.repos = Repos()
self.settings = Settings()
self.api.encode_key(self.token)
self.api.set_timeout(timeout)

self.dependencies = Dependencies(self.api)
self.npm = NPM(self.api)
self.openapi = OpenAPI(self.api)
self.org = Orgs(self.api)
self.quota = Quota(self.api)
self.report = Report(self.api)
self.sbom = Sbom(self.api)
self.purl = Purl(self.api)
self.fullscans = FullScans(self.api)
self.export = Export(self.api)
self.repositories = Repositories(self.api)
self.repos = Repos(self.api)
self.settings = Settings(self.api)
self.utils = Utils()

@staticmethod
def set_timeout(timeout: int):
global request_timeout
request_timeout = timeout
# Kept for backwards compatibility
pass
53 changes: 53 additions & 0 deletions socketdev/core/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import base64
import requests
from socketdev.core.classes import Response
from socketdev.exceptions import APIKeyMissing, APIFailure, APIAccessDenied, APIInsufficientQuota, APIResourceNotFound
from socketdev.version import __version__


class API:
def __init__(self):
self.encoded_key = None
self.api_url = "https://api.socket.dev/v0"
self.request_timeout = 30

def encode_key(self, token: str):
self.encoded_key = base64.b64encode(token.encode()).decode("ascii")

def set_timeout(self, timeout: int):
self.request_timeout = timeout

def do_request(
self, path: str, headers: dict | None = None, payload: [dict, str] = None, files: list = None, method: str = "GET"
) -> Response:
if self.encoded_key is None or self.encoded_key == "":
raise APIKeyMissing

if headers is None:
headers = {
"Authorization": f"Basic {self.encoded_key}",
"User-Agent": f"SocketPythonScript/{__version__}",
"accept": "application/json",
}
url = f"{self.api_url}/{path}"
try:
response = requests.request(
method.upper(), url, headers=headers, data=payload, files=files, timeout=self.request_timeout
)

if response.status_code == 401:
raise APIAccessDenied("Unauthorized")
if response.status_code == 403:
raise APIInsufficientQuota("Insufficient max_quota for API method")
if response.status_code == 404:
raise APIResourceNotFound(f"Path not found {path}")
if response.status_code == 429:
raise APIInsufficientQuota("Insufficient quota for API route")
if response.status_code >= 400:
raise APIFailure("Bad Request")

return response

except Exception as error:
response = Response(text=f"{error}", error=True, status_code=500)
raise APIFailure(response)
35 changes: 13 additions & 22 deletions socketdev/dependencies/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,18 @@
import socketdev
from socketdev.tools import load_files
from urllib.parse import urlencode
import json
from urllib.parse import urlencode

from socketdev.tools import load_files


class Dependencies:
@staticmethod
def post(files: list, params: dict) -> dict:
def __init__(self, api):
self.api = api

def post(self, files: list, params: dict) -> dict:
loaded_files = []
loaded_files = load_files(files, loaded_files)
path = "dependencies/upload?" + urlencode(params)
response = socketdev.do_request(
path=path,
files=loaded_files,
method="POST"
)
response = self.api.do_request(path=path, files=loaded_files, method="POST")
if response.status_code == 200:
result = response.json()
else:
Expand All @@ -23,22 +21,15 @@ def post(files: list, params: dict) -> dict:
print(response.text)
return result

@staticmethod
def get(
limit: int = 50,
offset: int = 0,
self,
limit: int = 50,
offset: int = 0,
) -> dict:
path = "dependencies/search"
payload = {
"limit": limit,
"offset": offset
}
payload = {"limit": limit, "offset": offset}
payload_str = json.dumps(payload)
response = socketdev.do_request(
path=path,
method="POST",
payload=payload_str
)
response = self.api.do_request(path=path, method="POST", payload=payload_str)
if response.status_code == 200:
result = response.json()
else:
Expand Down
18 changes: 9 additions & 9 deletions socketdev/export/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from urllib.parse import urlencode
from dataclasses import dataclass, asdict
from typing import Optional
import socketdev


@dataclass
Expand All @@ -21,8 +20,10 @@ def to_query_params(self) -> str:


class Export:
@staticmethod
def cdx_bom(org_slug: str, id: str, query_params: Optional[ExportQueryParams] = None) -> dict:
def __init__(self, api):
self.api = api

def cdx_bom(self, org_slug: str, id: str, query_params: Optional[ExportQueryParams] = None) -> dict:
"""
Export a Socket SBOM as a CycloneDX SBOM
:param org_slug: String - The slug of the organization
Expand All @@ -33,16 +34,15 @@ def cdx_bom(org_slug: str, id: str, query_params: Optional[ExportQueryParams] =
path = f"orgs/{org_slug}/export/cdx/{id}"
if query_params:
path += query_params.to_query_params()
result = socketdev.do_request(path=path)
response = self.api.do_request(path=path)
try:
sbom = result.json()
sbom = response.json()
sbom["success"] = True
except Exception as error:
sbom = {"success": False, "message": str(error)}
return sbom

@staticmethod
def spdx_bom(org_slug: str, id: str, query_params: Optional[ExportQueryParams] = None) -> dict:
def spdx_bom(self, org_slug: str, id: str, query_params: Optional[ExportQueryParams] = None) -> dict:
"""
Export a Socket SBOM as an SPDX SBOM
:param org_slug: String - The slug of the organization
Expand All @@ -53,9 +53,9 @@ def spdx_bom(org_slug: str, id: str, query_params: Optional[ExportQueryParams] =
path = f"orgs/{org_slug}/export/spdx/{id}"
if query_params:
path += query_params.to_query_params()
result = socketdev.do_request(path=path)
response = self.api.do_request(path=path)
try:
sbom = result.json()
sbom = response.json()
sbom["success"] = True
except Exception as error:
sbom = {"success": False, "message": str(error)}
Expand Down
Loading