Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try batch inserts #1533

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
46 changes: 44 additions & 2 deletions app/celery/scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
from datetime import timedelta

from flask import current_app
from sqlalchemy import between
from sqlalchemy.exc import SQLAlchemyError

from app import notify_celery, zendesk_client
from app import notify_celery, redis_store, zendesk_client
from app.celery.tasks import (
get_recipient_csv_and_template_and_sender_id,
process_incomplete_jobs,
Expand All @@ -24,6 +25,7 @@
find_missing_row_for_job,
)
from app.dao.notifications_dao import (
dao_batch_insert_notifications,
dao_close_out_delivery_receipts,
dao_update_delivery_receipts,
notifications_not_yet_sent,
Expand All @@ -34,7 +36,7 @@
)
from app.dao.users_dao import delete_codes_older_created_more_than_a_day_ago
from app.enums import JobStatus, NotificationType
from app.models import Job
from app.models import Job, Notification
from app.notifications.process_notifications import send_notification_to_queue
from app.utils import utc_now
from notifications_utils import aware_utcnow
Expand Down Expand Up @@ -286,3 +288,43 @@ def process_delivery_receipts(self):
)
def cleanup_delivery_receipts(self):
dao_close_out_delivery_receipts()


@notify_celery.task(bind=True, name="batch-insert-notifications")
def batch_insert_notifications(self):
current_app.logger.info("ENTER SCHEDULED TASK")
batch = []
# with redis_store.pipeline():
# while redis_store.llen("message_queue") > 0:
# redis_store.lpop("message_queue")
# current_app.logger.info("EMPTY!")
# return
with redis_store.pipeline():
current_app.logger.info("PIPELINE")
# since this list is always growing, just grab what is available when
# this call is made and process that.
current_len = redis_store.llen("message_queue")
count = 0
while count < current_len:
count = count + 1
notification_bytes = redis_store.lpop("message_queue")
notification_dict = json.loads(notification_bytes.decode("utf-8"))
notification_dict["status"] = notification_dict.pop("notification_status")
notification_dict["created_at"] = utc_now()
notification = Notification(**notification_dict)
current_app.logger.info(
f"WHAT IS THIS NOTIFICATION {type(notification)} {notification}"
)
if notification is not None:
current_app.logger.info(
f"SCHEDULED adding notification {notification.id} to batch"
)
batch.append(notification)
try:
current_app.logger.info("GOING TO DO BATCH INSERT")
dao_batch_insert_notifications(batch)
except Exception as e:
current_app.logger.exception(f"Notification batch insert failed {e}")

for msg in batch:
redis_store.rpush("notification_queue", json.dumps(msg))
2 changes: 1 addition & 1 deletion app/celery/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def save_sms(self, service_id, notification_id, encrypted_notification, sender_i
)
)
provider_tasks.deliver_sms.apply_async(
[str(saved_notification.id)], queue=QueueNames.SEND_SMS
[str(saved_notification.id)], queue=QueueNames.SEND_SMS, countdown=30
)

current_app.logger.debug(
Expand Down
5 changes: 5 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ class Config(object):
"schedule": timedelta(minutes=82),
"options": {"queue": QueueNames.PERIODIC},
},
"batch-insert-notifications": {
"task": "batch-insert-notifications",
"schedule": 10.0,
"options": {"queue": QueueNames.PERIODIC},
},
"expire-or-delete-invitations": {
"task": "expire-or-delete-invitations",
"schedule": timedelta(minutes=66),
Expand Down
12 changes: 12 additions & 0 deletions app/dao/notifications_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from datetime import timedelta
from time import time

import sqlalchemy
from flask import current_app
from sqlalchemy import (
TIMESTAMP,
Expand Down Expand Up @@ -799,3 +800,14 @@ def dao_close_out_delivery_receipts():
current_app.logger.info(
f"Marked {result.rowcount} notifications as technical failures"
)


def dao_batch_insert_notifications(batch):
current_app.logger.info("DOING BATCH INSERT IN DAO")
try:
db.session.bulk_save_objects(batch)
db.session.commit()
current_app.logger.info(f"SUCCESSFULLY INSERTED: {len(batch)}")
return len(batch)
except sqlalchemy.exc.SQLAlchemyError as e:
current_app.logger.exception(f"Error during batch insert {e}")
28 changes: 27 additions & 1 deletion app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from sqlalchemy import CheckConstraint, Index, UniqueConstraint
from sqlalchemy.dialects.postgresql import JSON, JSONB, UUID
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.declarative import DeclarativeMeta, declared_attr
from sqlalchemy.orm import validates
from sqlalchemy.orm.collections import attribute_mapped_collection

Expand Down Expand Up @@ -1694,6 +1694,32 @@ def get_created_by_email_address(self):
else:
return None

def serialize_for_redis(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
fields = {}
for column in obj.__table__.columns:
if column.name == "notification_status":
new_name = "status"
value = getattr(obj, new_name)
elif column.name == "created_at":
value = (obj.created_at.strftime("%Y-%m-%d %H:%M:%S"),)
elif column.name in ["sent_at", "completed_at"]:
value = None
elif column.name.endswith("_id"):
value = getattr(obj, column.name)
value = str(value)
else:
value = getattr(obj, column.name)
if column.name in ["message_id", "api_key_id"]:
pass # do nothing because we don't have the message id yet
else:
fields[column.name] = value
current_app.logger.warning(f"FIELDS {fields}")
print(f"FIELDS {fields}", flush=True)

return fields
raise ValueError("Provided object is not a SQLAlchemy instance")

def serialize_for_csv(self):
serialized = {
"row_number": (
Expand Down
26 changes: 14 additions & 12 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import json
import os
import uuid

from flask import current_app
Expand All @@ -11,7 +13,7 @@
dao_notification_exists,
get_notification_by_id,
)
from app.enums import KeyType, NotificationStatus, NotificationType
from app.enums import NotificationStatus, NotificationType
from app.errors import BadRequestError
from app.models import Notification
from app.utils import hilite, utc_now
Expand Down Expand Up @@ -139,18 +141,18 @@ def persist_notification(

# if simulated create a Notification model to return but do not persist the Notification to the dB
if not simulated:
current_app.logger.info("Firing dao_create_notification")
dao_create_notification(notification)
if key_type != KeyType.TEST and current_app.config["REDIS_ENABLED"]:
current_app.logger.info(
"Redis enabled, querying cache key for service id: {}".format(
service.id
if notification.notification_type == NotificationType.SMS:
# it's just too hard with redis and timing to test this here
if os.getenv("NOTIFY_ENVIRONMENT") == "test":
dao_create_notification(notification)
else:
redis_store.rpush(
"message_queue",
json.dumps(notification.serialize_for_redis(notification)),
)
)
else:
dao_create_notification(notification)

current_app.logger.info(
f"{notification_type} {notification_id} created at {notification_created_at}"
)
return notification


Expand All @@ -172,7 +174,7 @@ def send_notification_to_queue_detached(
deliver_task = provider_tasks.deliver_email

try:
deliver_task.apply_async([str(notification_id)], queue=queue)
deliver_task.apply_async([str(notification_id)], queue=queue, countdown=30)
except Exception:
dao_delete_notifications_by_id(notification_id)
raise
Expand Down
16 changes: 16 additions & 0 deletions notifications_utils/clients/redis/redis_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class RedisClient:
active = False
scripts = {}

@classmethod
def pipeline(cls):
return cls.redis_store.pipeline()

def init_app(self, app):
self.active = app.config.get("REDIS_ENABLED")
if self.active:
Expand Down Expand Up @@ -156,6 +160,18 @@ def get(self, key, raise_exception=False):

return None

def rpush(self, key, value):
if self.active:
self.redis_store.rpush(key, value)

def lpop(self, key):
if self.active:
return self.redis_store.lpop(key)

def llen(self, key):
if self.active:
return self.redis_store.llen(key)

def delete(self, *keys, raise_exception=False):
keys = [prepare_value(k) for k in keys]
if self.active:
Expand Down
4 changes: 2 additions & 2 deletions tests/app/celery/test_scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ def test_replay_created_notifications(notify_db_session, sample_service, mocker)

replay_created_notifications()
email_delivery_queue.assert_called_once_with(
[str(old_email.id)], queue="send-email-tasks"
[str(old_email.id)], queue="send-email-tasks", countdown=30
)
sms_delivery_queue.assert_called_once_with(
[str(old_sms.id)], queue="send-sms-tasks"
[str(old_sms.id)], queue="send-sms-tasks", countdown=30
)


Expand Down
8 changes: 4 additions & 4 deletions tests/app/celery/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ def test_should_send_template_to_correct_sms_task_and_persist(
assert persisted_notification.personalisation == {}
assert persisted_notification.notification_type == NotificationType.SMS
mocked_deliver_sms.assert_called_once_with(
[str(persisted_notification.id)], queue="send-sms-tasks"
[str(persisted_notification.id)], queue="send-sms-tasks", countdown=30
)


Expand Down Expand Up @@ -470,7 +470,7 @@ def test_should_save_sms_if_restricted_service_and_valid_number(
assert not persisted_notification.personalisation
assert persisted_notification.notification_type == NotificationType.SMS
provider_tasks.deliver_sms.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-sms-tasks"
[str(persisted_notification.id)], queue="send-sms-tasks", countdown=30
)


Expand Down Expand Up @@ -598,7 +598,7 @@ def test_should_save_sms_template_to_and_persist_with_job_id(sample_job, mocker)
assert persisted_notification.notification_type == NotificationType.SMS

provider_tasks.deliver_sms.apply_async.assert_called_once_with(
[str(persisted_notification.id)], queue="send-sms-tasks"
[str(persisted_notification.id)], queue="send-sms-tasks", countdown=30
)


Expand Down Expand Up @@ -938,7 +938,7 @@ def test_save_sms_uses_sms_sender_reply_to_text(mocker, notify_db_session):
notification = _notification_json(template, to="2028675301")
mocker.patch("app.celery.provider_tasks.deliver_sms.apply_async")

notification_id = uuid.uuid4()
notification_id = str(uuid.uuid4())
save_sms(
service.id,
notification_id,
Expand Down
7 changes: 4 additions & 3 deletions tests/app/notifications/test_process_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ def test_send_notification_to_queue(

send_notification_to_queue(notification=notification, queue=requested_queue)

mocked.assert_called_once_with([str(notification.id)], queue=expected_queue)
mocked.assert_called_once_with(
[str(notification.id)], queue=expected_queue, countdown=30
)


def test_send_notification_to_queue_throws_exception_deletes_notification(
Expand All @@ -276,8 +278,7 @@ def test_send_notification_to_queue_throws_exception_deletes_notification(
with pytest.raises(Boto3Error):
send_notification_to_queue(sample_notification, False)
mocked.assert_called_once_with(
[(str(sample_notification.id))],
queue="send-sms-tasks",
[(str(sample_notification.id))], queue="send-sms-tasks", countdown=30
)

assert _get_notification_query_count() == 0
Expand Down
2 changes: 1 addition & 1 deletion tests/app/organization/test_invite_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_create_invited_org_user(
# assert len(notification.personalisation["url"]) > len(expected_start_of_invite_url)

mocked.assert_called_once_with(
[(str(notification.id))], queue="notify-internal-tasks"
[(str(notification.id))], queue="notify-internal-tasks", countdown=30
)


Expand Down
Loading
Loading