Merge pull request #7 from AlmogBaku/feat/struct_handling
feat: #1 add support for complex struct parsing with streaming
AlmogBaku authored May 14, 2024
2 parents b467043 + 54c82df commit b9542ff
Showing 15 changed files with 1,275 additions and 49 deletions.
52 changes: 52 additions & 0 deletions .github/workflows/pr-triage.yaml
@@ -0,0 +1,52 @@
name: "Pull Request Triage"
on:
# NB: using `pull_request_target` runs this in the context of
# the base repository, so it has permission to upload to the checks API.
# This means changes won't kick in to this file until merged onto the
# main branch.
pull_request_target:
types: [ opened, edited, reopened, synchronize ]

permissions:
contents: read
pull-requests: write
issues: write

jobs:
triage:
name: "Triage Pull Request"
runs-on: ubuntu-latest
steps:
- uses: codelytv/pr-size-labeler@v1
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
xs_label: 'size/xs'
xs_max_size: '15'
s_label: 'size/s'
s_max_size: '100'
m_label: 'size/m'
m_max_size: '500'
l_label: 'size/l'
l_max_size: '1000'
xl_label: 'size/xl'
fail_if_xl: 'false'
message_if_xl: >
This PR exceeds the recommended size of 1000 lines.
Please make sure you are NOT addressing multiple issues with one PR.
Note this PR might be rejected due to its size.
files_to_ignore: ''
# - name: "Check for PR body length"
# shell: bash
# env:
# PR_BODY: ${{ github.event.pull_request.body }}
# run: |
# if [ ${#PR_BODY} -lt 80 ]; then
# echo "::error title=PR body is too short::Your PR is probably isn't descriptive enough.\nYou should give a description that highlights both what you're doing it and *why* you're doing it. Someone reading the PR description without clicking any issue links should be able to roughly understand what's going on."
# exit 1
# fi
- uses: amannn/action-semantic-pull-request@v5
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
disallowScopes: |
release
16 changes: 2 additions & 14 deletions .github/workflows/publish.yaml
@@ -16,20 +16,8 @@ jobs:
     name: "Run tests"
     runs-on: ubuntu-latest
     steps:
-      - name: Checkout repository
-        uses: actions/checkout@v4
-      - name: Set up Python
-        uses: actions/setup-python@v5
-        with:
-          python-version: '3.9'
-      - name: Install dependencies
-        run: |
-          python -m pip install --upgrade pip
-          pip install ruff pytest
-          if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
-      - name: Test with pytest
-        run: |
-          pytest --ignore=tests/example.py --doctest-modules --junitxml=junit/test-results.xml
+      - name: Test
+        uses: ./.github/workflows/test.yaml
   version:
     runs-on: ubuntu-latest
     outputs:
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -27,4 +27,4 @@ jobs:
           if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
       - name: Test with pytest
         run: |
-          pytest --ignore=tests/example.py --doctest-modules --junitxml=junit/test-results.xml
+          pytest --ignore=tests/example.py --ignore=tests/example_struct.py --doctest-modules --junitxml=junit/test-results.xml
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@ dist
 openai_streaming.egg-info/
 .benchmarks
 junit
+.venv
115 changes: 114 additions & 1 deletion docs/reference.md
@@ -2,12 +2,28 @@
 
 # decorator
 
+<a id="decorator.OpenAIStreamingFunction"></a>
+
+## OpenAIStreamingFunction Objects
+
+```python
+class OpenAIStreamingFunction(Protocol)
+```
+
+A Protocol that represents a function that can be used with OpenAI Streaming.
+
+<a id="decorator.OpenAIStreamingFunction.openai_schema"></a>
+
+#### openai\_schema
+
+The OpenAI Schema for the function.
+
 <a id="decorator.openai_streaming_function"></a>
 
 #### openai\_streaming\_function
 
 ```python
-def openai_streaming_function(func: FunctionType) -> Any
+def openai_streaming_function(func: F) -> OpenAIStreamingFunction
 ```
 
 Decorator that creates an OpenAI Schema for your function, while supporting the use of Generators for Streaming.
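
For orientation, a minimal usage sketch of the decorator (hedged: the function name, body, and parameters are illustrative, not part of this diff; only the decorator and its `openai_schema` attribute are documented above):

```python
from typing import AsyncGenerator

from openai_streaming import openai_streaming_function


@openai_streaming_function
async def error_message(typ: str, description: AsyncGenerator[str, None]):
    """
    You MUST use this function when requested to do something that you cannot do.

    :param typ: The error's type
    :param description: The error description
    """
    print(f"Error type: {typ}")
    async for part in description:
        print(part, end="")  # stream the description as it arrives


# The decorator attaches the generated schema, which can then be passed to the
# Chat Completions API (exact payload shape depends on your OpenAI client version):
# functions=[error_message.openai_schema]
```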
@@ -189,3 +205,100 @@ dictionary of arguments

A set of function names that were invoked
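
Since the diff collapses most of the `process_response` documentation above, here is a hedged usage sketch (the handler body, model name, and prompt are illustrative assumptions; the call shape mirrors the library's documented usage):

```python
import asyncio
from typing import AsyncGenerator

from openai import AsyncOpenAI
from openai_streaming import process_response


async def content_handler(content: AsyncGenerator[str, None]):
    # stream plain-text content to the user as it arrives
    async for token in content:
        print(token, end="")


async def main():
    client = AsyncOpenAI()
    resp = await client.chat.completions.create(
        model="gpt-4o",  # any streaming-capable chat model
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    )
    # returns the set of invoked function names and the reassembled response
    invoked, full_resp = await process_response(resp, content_handler)
    print(invoked)


asyncio.run(main())
```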

<a id="struct.handler"></a>

# struct.handler

<a id="struct.handler.BaseHandler"></a>

## BaseHandler Objects

```python
class BaseHandler(Protocol[TModel])
```

The base handler for the structured response from OpenAI.

<a id="struct.handler.BaseHandler.model"></a>

#### model

```python
def model() -> Type[TModel]
```

The Pydantic Data Model that we parse

**Returns**:

type of the Pydantic model

<a id="struct.handler.BaseHandler.handle_partially_parsed"></a>

#### handle\_partially\_parsed

```python
async def handle_partially_parsed(data: TModel) -> Optional[Terminate]
```

Handle partially parsed model

**Arguments**:

- `data`: The partially parsed object

**Returns**:

None or Terminate if we want to terminate the parsing

<a id="struct.handler.BaseHandler.terminated"></a>

#### terminated

```python
async def terminated()
```

Called when the parsing was terminated

<a id="struct.handler.process_struct_response"></a>

#### process\_struct\_response

```python
async def process_struct_response(
response: OAIResponse,
handler: BaseHandler,
output_serialization: OutputSerialization = "json"
) -> Tuple[Optional[Union[TModel, Terminate]], Dict[str, Any]]
```

Process the structured response from OpenAI.

This is useful when we want to parse a structured response from OpenAI in streaming mode. For example, our response
contains reasoning and content, but we want to stream only the content to the user.

**Arguments**:

- `response`: The response from OpenAI
- `handler`: The handler for the response. It should be a subclass of `BaseHandler`
- `output_serialization`: The output serialization of the response. It should be either "json" or "yaml"

**Returns**:

A tuple of the last parsed response, and a dictionary containing the OpenAI response

<a id="struct.yaml_parser"></a>

# struct.yaml\_parser

<a id="struct.yaml_parser.YamlParser"></a>

## YamlParser Objects

```python
class YamlParser(Parser)
```

Parse partial YAML

1 change: 1 addition & 0 deletions openai_streaming/struct/__init__.py
@@ -0,0 +1 @@
from .handler import process_struct_response, Terminate, BaseHandler
134 changes: 134 additions & 0 deletions openai_streaming/struct/handler.py
@@ -0,0 +1,134 @@
from typing import Protocol, Literal, AsyncGenerator, Optional, Type, TypeVar, Union, Dict, Any, Tuple

from pydantic import BaseModel

from json_streamer import Parser, JsonParser
from .yaml_parser import YamlParser
from ..stream_processing import OAIResponse, process_response

TModel = TypeVar('TModel', bound=BaseModel)


class Terminate:
    pass


class BaseHandler(Protocol[TModel]):
    """
    The base handler for the structured response from OpenAI.
    """

    def model(self) -> Type[TModel]:
        """
        The Pydantic Data Model that we parse
        :return: type of the Pydantic model
        """
        pass

    async def handle_partially_parsed(self, data: TModel) -> Optional[Terminate]:
        """
        Handle partially parsed model
        :param data: The partially parsed object
        :return: None or Terminate if we want to terminate the parsing
        """
        pass

    async def terminated(self):
        """
        Called when the parsing was terminated
        """


OutputSerialization = Literal["json", "yaml"]


class _ContentHandler:
    parser: Parser = None
    _last_resp: Optional[Union[TModel, Terminate]] = None

    def __init__(self, handler: BaseHandler, output_serialization: OutputSerialization = "yaml"):
        self.handler = handler
        if output_serialization == "json":
            self.parser = JsonParser()
        elif output_serialization == "yaml":
            self.parser = YamlParser()

    async def handle_content(self, content: AsyncGenerator[str, None]):
        """
        Handle the content of the response from OpenAI.
        :param content: A generator that yields the content of the response from OpenAI
        :return: None
        """

        loader = self.parser()  # create a streaming loader
        next(loader)  # prime the loader generator

        last_resp = None

        async for token in content:
            parsed = loader.send(token)  # send the token to the streaming loader
            while parsed:  # loop through the parsed parts as the loader yields them
                last_resp = await self._handle_parsed(parsed[1])  # handle the parsed dict of the response
                if isinstance(last_resp, Terminate):
                    break
                try:
                    parsed = next(loader)
                except StopIteration:
                    break
            if isinstance(last_resp, Terminate):
                break

        if not last_resp:
            return
        if isinstance(last_resp, Terminate):
            await self.handler.terminated()

        self._last_resp = last_resp

    async def _handle_parsed(self, part) -> Optional[Union[TModel, Terminate]]:
        """
        Handle a parsed part of the response from OpenAI.
        It parses the "parsed dictionary" as a type of `TModel` object and processes it with the handler.
        :param part: A dictionary containing the parsed part of the response
        :return: The parsed part of the response as a `TModel` object, `Terminate` to terminate the handling,
          or `None` if the part is not valid
        """
        try:
            parsed = self.handler.model()(**part)
        except (TypeError, ValueError):
            return

        ret = await self.handler.handle_partially_parsed(parsed)
        return ret if ret else parsed

    def get_last_response(self) -> Optional[Union[TModel, Terminate]]:
        """
        Get the last response from OpenAI.
        :return: The last response from OpenAI
        """
        return self._last_resp


async def process_struct_response(
        response: OAIResponse,
        handler: BaseHandler,
        output_serialization: OutputSerialization = "json"
) -> Tuple[Optional[Union[TModel, Terminate]], Dict[str, Any]]:
    """
    Process the structured response from OpenAI.
    This is useful when we want to parse a structured response from OpenAI in streaming mode. For example, our
    response contains reasoning and content, but we want to stream only the content to the user.
    :param response: The response from OpenAI
    :param handler: The handler for the response. It should be a subclass of `BaseHandler`
    :param output_serialization: The output serialization of the response. It should be either "json" or "yaml"
    :return: A tuple of the last parsed response, and a dictionary containing the OpenAI response
    """

    handler = _ContentHandler(handler, output_serialization)
    _, result = await process_response(response, handler.handle_content, self=handler)
    if not handler.get_last_response():
        raise ValueError("Probably invalid response from OpenAI")

    return handler.get_last_response(), result
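
For orientation, a minimal usage sketch of `process_struct_response` with a custom handler (hedged: the model name, prompt, and the `MathProblem` fields are illustrative assumptions, not part of this diff):

```python
import asyncio
from typing import List, Optional, Type

from openai import AsyncOpenAI
from pydantic import BaseModel

from openai_streaming.struct import BaseHandler, Terminate, process_struct_response


class MathProblem(BaseModel):
    steps: Optional[List[str]] = None  # hypothetical fields for illustration
    answer: Optional[int] = None


class MathHandler(BaseHandler[MathProblem]):
    def model(self) -> Type[MathProblem]:
        return MathProblem

    async def handle_partially_parsed(self, data: MathProblem) -> Optional[Terminate]:
        if data.steps:
            print("steps so far:", data.steps)
        if data.answer is not None:
            return Terminate()  # stop parsing once the final answer arrives

    async def terminated(self):
        print("parsing terminated")


async def main():
    client = AsyncOpenAI()
    resp = await client.chat.completions.create(
        model="gpt-4o",  # any streaming-capable chat model
        messages=[
            {"role": "system",
             "content": "Solve the problem. Respond in YAML with the keys `steps` and `answer` only."},
            {"role": "user", "content": "1 + 1 = ?"},
        ],
        stream=True,
    )
    last, _ = await process_struct_response(resp, MathHandler(), output_serialization="yaml")
    print("last parsed response:", last)


asyncio.run(main())
```

Returning `Terminate()` from `handle_partially_parsed` exercises the termination path shown above: the content loop breaks and `terminated()` is invoked.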
27 changes: 27 additions & 0 deletions openai_streaming/struct/yaml_parser.py
@@ -0,0 +1,27 @@
from typing import List, Dict, Tuple, Generator, Optional
from json_streamer import Parser, ParseState


class YamlParser(Parser):
    """
    Parse partial YAML
    """

    @staticmethod
    def opening_symbols() -> List[str]:
        return ['{', '[', '"']

    def raw_decode(self, s: str) -> Tuple[Dict, int]:
        try:
            from yaml import safe_load
        except ImportError:
            raise ImportError("You must install PyYAML to use the YamlParser: pip install PyYAML")
        return safe_load(s), -1

    def parse_part(self, part: str) -> Generator[Tuple[ParseState, dict], None, None]:
        for y in super().parse_part(part):
            yield ParseState.UNKNOWN, y[1]


def loads(s: Optional[Generator[str, None, None]] = None) -> Generator[Tuple[ParseState, dict], Optional[str], None]:
    return YamlParser()(s)
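
A small sketch of driving the partial-YAML loader by hand, mirroring how `_ContentHandler.handle_content` consumes it above (the streamed chunks and printed outputs are made-up illustrations; PyYAML must be installed):

```python
from openai_streaming.struct.yaml_parser import YamlParser

loader = YamlParser()()  # calling the parser instance returns a streaming loader
next(loader)  # prime the loader generator

for chunk in ('name: "Al', 'mog"\n', 'age: 42\n'):
    parsed = loader.send(chunk)  # feed one streamed chunk at a time
    while parsed:
        state, data = parsed
        print(data)  # partial dicts, e.g. {'name': 'Almog'} then {'name': 'Almog', 'age': 42}
        try:
            parsed = next(loader)
        except StopIteration:
            break
```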
1 change: 1 addition & 0 deletions pydoc-markdown.yml
@@ -1,3 +1,4 @@
+# build with: pipx run pydoc-markdown
 loaders:
   - type: python
     search_path: