Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification worker #46

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class WorkerFeatureFlipper {
public final static String BLACKLIST_SENSE = "blacklist_sense";
public static final String CLEAR_ALL_CACHE = "worker_clear_all_cache";
public static final String PG_CACHE = "worker_pg_cache";
public static final String PUSH_NOTIFICATIONS_ENABLED = "push_notifications_enabled";
public static final String SEND_TO_SEGMENT = "send_to_segment";
public static final String SEND_TO_SEGMENT_WAVE = "send_to_segment_wave";
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.hello.suripu.core.models.MobilePushRegistration;
import com.hello.suripu.core.notifications.NotificationSubscriptionsReadDAO;
import org.slf4j.Logger;
Expand All @@ -25,8 +26,8 @@ class MobilePushNotificationProcessor {
private final NotificationSubscriptionsReadDAO dao;
private final PushNotificationEventDynamoDB pushNotificationEventDynamoDB;

public MobilePushNotificationProcessor(final AmazonSNS sns, final NotificationSubscriptionsReadDAO dao,
final PushNotificationEventDynamoDB pushNotificationEventDynamoDB) {
MobilePushNotificationProcessor(final AmazonSNS sns, final NotificationSubscriptionsReadDAO dao,
final PushNotificationEventDynamoDB pushNotificationEventDynamoDB) {
this.sns = sns;
this.dao = dao;
this.pushNotificationEventDynamoDB = pushNotificationEventDynamoDB;
Expand All @@ -37,7 +38,6 @@ public PushNotificationEventDynamoDB getPushNotificationEventDynamoDB() {
}

public void push(final PushNotificationEvent event) {

// We often want at-most-once delivery of push notifications, so we insert the record to DDB first.
// That way if something later in this method fails, we won't accidentally send the same notification twice.
final boolean successfullyInserted = pushNotificationEventDynamoDB.insert(event);
Expand Down Expand Up @@ -71,21 +71,20 @@ public void push(final PushNotificationEvent event) {
}
}

// https://developer.apple.com/library/ios/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/Chapters/TheNotificationPayload.html#//apple_ref/doc/uid/TP40008194-CH107-SW1
private Optional<String> makeAPNSMessage(final HelloPushMessage message) {
final Map<String, String> messageMap = new HashMap<>();
final Map<String, String> content = new HashMap<>();
final Map<String, Object> appleMessageMap = new HashMap<>();
final Map<String, Object> appMessageMap = new HashMap<>();

content.put("body", message.body);
content.put("target", message.target);
content.put("details", message.details);


appMessageMap.put("alert", content);
appMessageMap.put("sound", "default");

appleMessageMap.put("aps", appMessageMap);
final Map<String, Object> appleMessageMap = ImmutableMap.<String, Object>of(
"view", message.target,
"aps", ImmutableMap.of(
"sound", "default",
"alert", ImmutableMap.of(
"title", message.body,
"body", message.details
)
)
);

final ObjectMapper mapper = new ObjectMapper();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.hello.suripu.workers.notifications;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.dropwizard.Configuration;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

/**
* Created by jakepiccolo on 5/17/16.
*/
public class NotificationConfig extends Configuration {

@Valid
@JsonProperty("days_between_notifications")
private Integer daysBetweenNotifications = 7;
public Integer getDaysBetweenNotifications() { return daysBetweenNotifications; }

@Valid
@JsonProperty("min_hour_of_day")
@Min(0)
@Max(24)
private Integer minHourOfDay = 11;
public Integer getMinHourOfDay() { return minHourOfDay; }

@Valid
@JsonProperty("max_hour_of_day")
@Min(0)
@Max(24)
private Integer maxHourOfDay = 20;
public Integer getMaxHourOfDay() { return maxHourOfDay; }
}
Original file line number Diff line number Diff line change
@@ -1,66 +1,80 @@
package com.hello.suripu.workers.notifications;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import com.hello.suripu.api.input.DataInputProtos;
import com.hello.suripu.core.db.MergedUserInfoDynamoDB;
import com.hello.suripu.core.models.Calibration;
import com.hello.suripu.core.models.CurrentRoomState;
import com.hello.suripu.core.models.Sensor;
import com.hello.suripu.core.db.responses.Response;
import com.hello.suripu.core.models.UserInfo;
import com.hello.suripu.core.preferences.AccountPreferencesDynamoDB;
import com.hello.suripu.core.preferences.PreferenceName;
import com.hello.suripu.core.util.DateTimeUtil;
import com.hello.suripu.workers.WorkerFeatureFlipper;
import com.hello.suripu.workers.framework.HelloBaseRecordProcessor;
import com.hello.suripu.workers.protobuf.notifications.PushNotification;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Collections;
import java.util.List;
import java.util.Set;

public class PushNotificationsProcessor extends HelloBaseRecordProcessor {

private final static Logger LOGGER = LoggerFactory.getLogger(PushNotificationsProcessor.class);

private final MobilePushNotificationProcessor mobilePushNotificationProcessor;
private final MergedUserInfoDynamoDB mergedUserInfoDynamoDB;

private final AccountPreferencesDynamoDB accountPreferencesDynamoDB;
private final ImmutableSet<Integer> activeHours;
private final Set<String> sent = Sets.newHashSet();
private final PushNotificationsWorkerConfiguration pushNotificationsWorkerConfiguration;

public PushNotificationsProcessor(
final MobilePushNotificationProcessor mobilePushNotificationProcessor,
final MergedUserInfoDynamoDB mergedUserInfoDynamoDB,
final AccountPreferencesDynamoDB accountPreferencesDynamoDB,
final Set<Integer> activeHours) {
final PushNotificationsWorkerConfiguration pushNotificationsWorkerConfiguration)
{
this.mobilePushNotificationProcessor = mobilePushNotificationProcessor;
this.mergedUserInfoDynamoDB = mergedUserInfoDynamoDB;
this.accountPreferencesDynamoDB = accountPreferencesDynamoDB;
this.activeHours = ImmutableSet.copyOf(activeHours);
this.pushNotificationsWorkerConfiguration = pushNotificationsWorkerConfiguration;
}


private enum NotificationType {
PILL_BATTERY
}


@Override
public void initialize(String s) {

}


private PushNotification.UserPushNotification parseProtobuf(final Record record) throws InvalidProtocolBufferException {
final PushNotification.UserPushNotification pushNotification = PushNotification.UserPushNotification.parseFrom(record.getData().array());
if (!pushNotification.hasAccountId()) {
throw new IllegalArgumentException("Protobuf must contain accountId");
}
if (!pushNotification.hasSenseId()) {
throw new IllegalArgumentException("Protobuf must contain senseId");
}
return pushNotification;
}


@Override
public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {

for(final Record record : records) {
DataInputProtos.BatchPeriodicDataWorker batchPeriodicDataWorker;
try {
batchPeriodicDataWorker = DataInputProtos.BatchPeriodicDataWorker.parseFrom(record.getData().array());
sendMessage(batchPeriodicDataWorker.getData());
final PushNotification.UserPushNotification pushNotification = parseProtobuf(record);
sendNotification(pushNotification);
} catch (InvalidProtocolBufferException e) {
LOGGER.error("Failed parsing protobuf: {}", e.getMessage());
LOGGER.error("Moving to next record");
Expand All @@ -70,89 +84,95 @@ public void processRecords(final List<Record> records, final IRecordProcessorChe
}
}

// We do not checkpoint since we are using LATEST strategy, only going through new messages
try {
iRecordProcessorCheckpointer.checkpoint();
} catch (InvalidStateException e) {
LOGGER.error("error=InvalidStateException exception={}", e);
} catch (ShutdownException e) {
LOGGER.error("error=ShutdownException exception={}", e);
}
}

private void sendNotification(final PushNotification.UserPushNotification userPushNotification) {
if (!userPushNotification.hasAccountId() || !userPushNotification.hasSenseId()) {
LOGGER.warn("warning=require_sense_id_and_account_id account_id={} sense_id={}",
userPushNotification.getAccountId(), userPushNotification.getSenseId());
return;
}

/**
* Send push notifications if conditions warrant it and within the hours
* @param batched_periodic_data
*/
private void sendMessage(final DataInputProtos.batched_periodic_data batched_periodic_data) {
final String senseId = batched_periodic_data.getDeviceId();
final List<UserInfo> userInfos = mergedUserInfoDynamoDB.getInfo(senseId);
for(final UserInfo userInfo : userInfos) {
final Long accountId = userPushNotification.getAccountId();
final String senseId = userPushNotification.getSenseId();
final Optional<UserInfo> userInfoOptional = mergedUserInfoDynamoDB.getInfo(senseId, accountId);

if(!userHasPushNotificationsEnabled(userInfo.accountId)) {
continue;
}

final Optional<DateTimeZone> dateTimeZoneOptional = userInfo.timeZone;
if(!dateTimeZoneOptional.isPresent()) {
LOGGER.warn("No timezone for account: {} paired to Sense: {}", userInfo.accountId, senseId);
continue;
}
if(!accountPreferencesDynamoDB.isEnabled(accountId, PreferenceName.PUSH_ALERT_CONDITIONS)) {
LOGGER.debug("push_notification_status=disabled account_id={}", accountId);
return;
}

final DateTime nowInLocalTimeZone = DateTime.now().withZone(dateTimeZoneOptional.get());
if(!activeHours.contains(nowInLocalTimeZone.getHourOfDay())) {
continue;
}
if (!userInfoOptional.isPresent() || !userInfoOptional.get().timeZone.isPresent()) {
LOGGER.error("error=could_not_get_timezone account_id={} sense_id={}", accountId, senseId);
return;
}

final String key = String.format("%s-%s", String.valueOf(userInfo.accountId), nowInLocalTimeZone.toString(DateTimeFormat.forPattern(DateTimeUtil.DYNAMO_DB_DATE_FORMAT)));
if(sent.contains(key)) {
LOGGER.info("Account {}, already received push notification: {}", userInfo.accountId, key);
continue;
}
final boolean pushNotificationsEnabled = flipper.userFeatureActive(
WorkerFeatureFlipper.PUSH_NOTIFICATIONS_ENABLED, accountId, Collections.<String>emptyList());
if (!pushNotificationsEnabled) {
LOGGER.debug("push_notifications_enabled=false account_id={}", accountId);
return;
}

if(!accountPreferencesDynamoDB.isEnabled(userInfo.accountId, PreferenceName.PUSH_ALERT_CONDITIONS)) {
continue;
LOGGER.info("push_notifications_enabled=true account_id={}", accountId);

final DateTimeZone userTimeZone = userInfoOptional.get().timeZone.get();
final DateTime nowUserTime = DateTime.now(userTimeZone);
final DateTime nowUTC = new DateTime(nowUserTime, DateTimeZone.UTC);

PushNotificationEvent pushNotificationEvent = null;

if (userPushNotification.hasPillBatteryLow()) {
final NotificationConfig pillBatteryConfig = pushNotificationsWorkerConfiguration.getPillBatteryConfig();
if (shouldSendNotification(accountId, nowUserTime, pillBatteryConfig, NotificationType.PILL_BATTERY)) {
pushNotificationEvent = PushNotificationEvent.newBuilder()
.withAccountId(accountId)
.withType(NotificationType.PILL_BATTERY.name())
.withTimestamp(nowUTC)
// TODO figure out what body/target/details to use
// TODO externalize this text to a DB
.withHelloPushMessage(new HelloPushMessage("body", "target", "details"))
.build();
}
}
// TODO rest of the push notifications

// TODO: write to cache to avoid sending multiple notifications
for(DataInputProtos.periodic_data data: batched_periodic_data.getDataList()) {
final Long timestampMillis = data.getUnixTime() * 1000L;
final DateTime roundedDateTime = new DateTime(timestampMillis, DateTimeZone.UTC).withSecondOfMinute(0);
final DateTime now = DateTime.now(DateTimeZone.UTC);
final CurrentRoomState currentRoomState = CurrentRoomState.fromRawData(data.getTemperature(), data.getHumidity(), data.getDustMax(), data.getLight(), data.getAudioPeakBackgroundEnergyDb(), data.getAudioPeakDisturbanceEnergyDb(),
roundedDateTime.getMillis(),
data.getFirmwareVersion(),
now,
10,
Optional.of(Calibration.createDefault(data.getDeviceId()))); // TODO: adjust threshold
final Optional<HelloPushMessage> messageOptional = getMostImportantSensorState(currentRoomState);
if(messageOptional.isPresent()) {
LOGGER.info("Sending push notifications to user: {}. Message: {}", userInfo.accountId, messageOptional.get());
final PushNotificationEvent event = PushNotificationEvent.newBuilder()
.withAccountId(userInfo.accountId)
.withHelloPushMessage(messageOptional.get())
.withSenseId(senseId)
.withType("room_conditions")
.build();
mobilePushNotificationProcessor.push(event);
sent.add(key);
}
break;
}
if (pushNotificationEvent != null) {
mobilePushNotificationProcessor.push(pushNotificationEvent);
}
}

/**
* Prioritizes conditions alerts based on Sensors
* @param currentRoomState
* @return
*/
private Optional<HelloPushMessage> getMostImportantSensorState(final CurrentRoomState currentRoomState) {
final HashSet<CurrentRoomState.State.Condition> notificationStates = Sets.newHashSet(CurrentRoomState.State.Condition.ALERT, CurrentRoomState.State.Condition.WARNING);

if(notificationStates.contains(currentRoomState.temperature.condition)) {
return Optional.of(HelloPushMessage.fromSensors(currentRoomState.temperature.message, Sensor.TEMPERATURE));
protected boolean shouldSendNotification(final Long accountId,
final DateTime nowUserLocalTime,
final NotificationConfig notificationConfig,
final NotificationType notificationType)
{
final Integer minHour = notificationConfig.getMinHourOfDay();
final Integer maxHour = notificationConfig.getMaxHourOfDay();
final Integer daysBetweenNotifications = notificationConfig.getDaysBetweenNotifications();
final DateTime nowUTC = new DateTime(nowUserLocalTime, DateTimeZone.UTC);

// Ensure we send within the correct window
if (nowUserLocalTime.hourOfDay().get() < minHour || nowUserLocalTime.hourOfDay().get() > maxHour) {
LOGGER.debug("battery_notification_status=not_sent reason=wrong_time_of_day local_hour={} account_id={}",
nowUserLocalTime.hourOfDay().get(), accountId);
return false;
}

if(notificationStates.contains(currentRoomState.humidity.condition)) {
return Optional.of(HelloPushMessage.fromSensors(currentRoomState.humidity.message, Sensor.HUMIDITY));
}
final Response<List<PushNotificationEvent>> sentNotificationsResponse = mobilePushNotificationProcessor.
getPushNotificationEventDynamoDB()
.query(accountId, nowUTC.minusDays(daysBetweenNotifications), nowUTC.plusMinutes(10), notificationType.name());

return Optional.absent();
// If we couldn't successfully query, don't send a notification, just to be safe.
// Also only send a notification if we haven't sent one within a specified time.
return (sentNotificationsResponse.status == Response.Status.SUCCESS) && sentNotificationsResponse.data.isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,6 @@ public IRecordProcessor createProcessor() {
final AmazonDynamoDB accountPreferencesDynamoDBClient = dynamoDBClientFactory.getForTable(DynamoDBTableName.PREFERENCES);
final AccountPreferencesDynamoDB accountPreferencesDynamoDB = AccountPreferencesDynamoDB.create(accountPreferencesDynamoDBClient, tableNames.get(DynamoDBTableName.PREFERENCES));

return new PushNotificationsProcessor(pushNotificationProcessor, mergedUserInfoDynamoDB, accountPreferencesDynamoDB, configuration.getActiveHours());
return new PushNotificationsProcessor(pushNotificationProcessor, mergedUserInfoDynamoDB, accountPreferencesDynamoDB, configuration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ protected void run(Environment environment, Namespace namespace, final PushNotif
final ImmutableMap<QueueName, String> queueNames = configuration.getQueues();

LOGGER.debug("{}", queueNames);
final String queueName = queueNames.get(QueueName.SENSE_SENSORS_DATA_FANOUT_ONE);
final String queueName = queueNames.get(QueueName.PUSH_NOTIFICATIONS);
LOGGER.info("\n\n\n!!! This worker is using the following queue: {} !!!\n\n\n", queueName);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public NewDynamoDBConfiguration dynamoDBConfiguration(){
@JsonProperty("active_hours")
private Set<Integer> activeHours = Sets.newHashSet();
public Set<Integer> getActiveHours() {return activeHours;}

@Valid
@JsonProperty("pill_battery")
private NotificationConfig pillBatteryConfig;
public NotificationConfig getPillBatteryConfig() { return pillBatteryConfig; }
}