diff --git a/src/main/java/com/hello/suripu/workers/WorkerFeatureFlipper.java b/src/main/java/com/hello/suripu/workers/WorkerFeatureFlipper.java index 96d27256..1c90e0de 100644 --- a/src/main/java/com/hello/suripu/workers/WorkerFeatureFlipper.java +++ b/src/main/java/com/hello/suripu/workers/WorkerFeatureFlipper.java @@ -5,6 +5,7 @@ public class WorkerFeatureFlipper { public final static String BLACKLIST_SENSE = "blacklist_sense"; public static final String CLEAR_ALL_CACHE = "worker_clear_all_cache"; public static final String PG_CACHE = "worker_pg_cache"; + public static final String PUSH_NOTIFICATIONS_ENABLED = "push_notifications_enabled"; public static final String SEND_TO_SEGMENT = "send_to_segment"; public static final String SEND_TO_SEGMENT_WAVE = "send_to_segment_wave"; } diff --git a/src/main/java/com/hello/suripu/workers/notifications/MobilePushNotificationProcessor.java b/src/main/java/com/hello/suripu/workers/notifications/MobilePushNotificationProcessor.java index 7fb3a96d..fb66e066 100644 --- a/src/main/java/com/hello/suripu/workers/notifications/MobilePushNotificationProcessor.java +++ b/src/main/java/com/hello/suripu/workers/notifications/MobilePushNotificationProcessor.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.hello.suripu.core.models.MobilePushRegistration; import com.hello.suripu.core.notifications.NotificationSubscriptionsReadDAO; import org.slf4j.Logger; @@ -25,8 +26,8 @@ class MobilePushNotificationProcessor { private final NotificationSubscriptionsReadDAO dao; private final PushNotificationEventDynamoDB pushNotificationEventDynamoDB; - public MobilePushNotificationProcessor(final AmazonSNS sns, final NotificationSubscriptionsReadDAO dao, - final PushNotificationEventDynamoDB pushNotificationEventDynamoDB) { + MobilePushNotificationProcessor(final AmazonSNS sns, final NotificationSubscriptionsReadDAO dao, + final PushNotificationEventDynamoDB pushNotificationEventDynamoDB) { this.sns = sns; this.dao = dao; this.pushNotificationEventDynamoDB = pushNotificationEventDynamoDB; @@ -37,7 +38,6 @@ public PushNotificationEventDynamoDB getPushNotificationEventDynamoDB() { } public void push(final PushNotificationEvent event) { - // We often want at-most-once delivery of push notifications, so we insert the record to DDB first. // That way if something later in this method fails, we won't accidentally send the same notification twice. final boolean successfullyInserted = pushNotificationEventDynamoDB.insert(event); @@ -71,21 +71,20 @@ public void push(final PushNotificationEvent event) { } } + // https://developer.apple.com/library/ios/documentation/NetworkingInternet/Conceptual/RemoteNotificationsPG/Chapters/TheNotificationPayload.html#//apple_ref/doc/uid/TP40008194-CH107-SW1 private Optional makeAPNSMessage(final HelloPushMessage message) { final Map messageMap = new HashMap<>(); - final Map content = new HashMap<>(); - final Map appleMessageMap = new HashMap<>(); - final Map appMessageMap = new HashMap<>(); - - content.put("body", message.body); - content.put("target", message.target); - content.put("details", message.details); - - - appMessageMap.put("alert", content); - appMessageMap.put("sound", "default"); - appleMessageMap.put("aps", appMessageMap); + final Map appleMessageMap = ImmutableMap.of( + "view", message.target, + "aps", ImmutableMap.of( + "sound", "default", + "alert", ImmutableMap.of( + "title", message.body, + "body", message.details + ) + ) + ); final ObjectMapper mapper = new ObjectMapper(); diff --git a/src/main/java/com/hello/suripu/workers/notifications/NotificationConfig.java b/src/main/java/com/hello/suripu/workers/notifications/NotificationConfig.java new file mode 100644 index 00000000..9637f66d --- /dev/null +++ b/src/main/java/com/hello/suripu/workers/notifications/NotificationConfig.java @@ -0,0 +1,33 @@ +package com.hello.suripu.workers.notifications; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.dropwizard.Configuration; + +import javax.validation.Valid; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; + +/** + * Created by jakepiccolo on 5/17/16. + */ +public class NotificationConfig extends Configuration { + + @Valid + @JsonProperty("days_between_notifications") + private Integer daysBetweenNotifications = 7; + public Integer getDaysBetweenNotifications() { return daysBetweenNotifications; } + + @Valid + @JsonProperty("min_hour_of_day") + @Min(0) + @Max(24) + private Integer minHourOfDay = 11; + public Integer getMinHourOfDay() { return minHourOfDay; } + + @Valid + @JsonProperty("max_hour_of_day") + @Min(0) + @Max(24) + private Integer maxHourOfDay = 20; + public Integer getMaxHourOfDay() { return maxHourOfDay; } +} diff --git a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessor.java b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessor.java index 6974032e..073a282c 100644 --- a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessor.java +++ b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessor.java @@ -1,31 +1,27 @@ package com.hello.suripu.workers.notifications; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason; import com.amazonaws.services.kinesis.model.Record; import com.google.common.base.Optional; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; import com.google.protobuf.InvalidProtocolBufferException; -import com.hello.suripu.api.input.DataInputProtos; import com.hello.suripu.core.db.MergedUserInfoDynamoDB; -import com.hello.suripu.core.models.Calibration; -import com.hello.suripu.core.models.CurrentRoomState; -import com.hello.suripu.core.models.Sensor; +import com.hello.suripu.core.db.responses.Response; import com.hello.suripu.core.models.UserInfo; import com.hello.suripu.core.preferences.AccountPreferencesDynamoDB; import com.hello.suripu.core.preferences.PreferenceName; -import com.hello.suripu.core.util.DateTimeUtil; +import com.hello.suripu.workers.WorkerFeatureFlipper; import com.hello.suripu.workers.framework.HelloBaseRecordProcessor; +import com.hello.suripu.workers.protobuf.notifications.PushNotification; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import org.joda.time.format.DateTimeFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashSet; +import java.util.Collections; import java.util.List; -import java.util.Set; public class PushNotificationsProcessor extends HelloBaseRecordProcessor { @@ -33,34 +29,52 @@ public class PushNotificationsProcessor extends HelloBaseRecordProcessor { private final MobilePushNotificationProcessor mobilePushNotificationProcessor; private final MergedUserInfoDynamoDB mergedUserInfoDynamoDB; - private final AccountPreferencesDynamoDB accountPreferencesDynamoDB; - private final ImmutableSet activeHours; - private final Set sent = Sets.newHashSet(); + private final PushNotificationsWorkerConfiguration pushNotificationsWorkerConfiguration; public PushNotificationsProcessor( final MobilePushNotificationProcessor mobilePushNotificationProcessor, final MergedUserInfoDynamoDB mergedUserInfoDynamoDB, final AccountPreferencesDynamoDB accountPreferencesDynamoDB, - final Set activeHours) { + final PushNotificationsWorkerConfiguration pushNotificationsWorkerConfiguration) + { this.mobilePushNotificationProcessor = mobilePushNotificationProcessor; this.mergedUserInfoDynamoDB = mergedUserInfoDynamoDB; this.accountPreferencesDynamoDB = accountPreferencesDynamoDB; - this.activeHours = ImmutableSet.copyOf(activeHours); + this.pushNotificationsWorkerConfiguration = pushNotificationsWorkerConfiguration; + } + + + private enum NotificationType { + PILL_BATTERY } + @Override public void initialize(String s) { } + + private PushNotification.UserPushNotification parseProtobuf(final Record record) throws InvalidProtocolBufferException { + final PushNotification.UserPushNotification pushNotification = PushNotification.UserPushNotification.parseFrom(record.getData().array()); + if (!pushNotification.hasAccountId()) { + throw new IllegalArgumentException("Protobuf must contain accountId"); + } + if (!pushNotification.hasSenseId()) { + throw new IllegalArgumentException("Protobuf must contain senseId"); + } + return pushNotification; + } + + @Override public void processRecords(final List records, final IRecordProcessorCheckpointer iRecordProcessorCheckpointer) { + for(final Record record : records) { - DataInputProtos.BatchPeriodicDataWorker batchPeriodicDataWorker; try { - batchPeriodicDataWorker = DataInputProtos.BatchPeriodicDataWorker.parseFrom(record.getData().array()); - sendMessage(batchPeriodicDataWorker.getData()); + final PushNotification.UserPushNotification pushNotification = parseProtobuf(record); + sendNotification(pushNotification); } catch (InvalidProtocolBufferException e) { LOGGER.error("Failed parsing protobuf: {}", e.getMessage()); LOGGER.error("Moving to next record"); @@ -70,89 +84,95 @@ public void processRecords(final List records, final IRecordProcessorChe } } - // We do not checkpoint since we are using LATEST strategy, only going through new messages + try { + iRecordProcessorCheckpointer.checkpoint(); + } catch (InvalidStateException e) { + LOGGER.error("error=InvalidStateException exception={}", e); + } catch (ShutdownException e) { + LOGGER.error("error=ShutdownException exception={}", e); + } } + private void sendNotification(final PushNotification.UserPushNotification userPushNotification) { + if (!userPushNotification.hasAccountId() || !userPushNotification.hasSenseId()) { + LOGGER.warn("warning=require_sense_id_and_account_id account_id={} sense_id={}", + userPushNotification.getAccountId(), userPushNotification.getSenseId()); + return; + } - /** - * Send push notifications if conditions warrant it and within the hours - * @param batched_periodic_data - */ - private void sendMessage(final DataInputProtos.batched_periodic_data batched_periodic_data) { - final String senseId = batched_periodic_data.getDeviceId(); - final List userInfos = mergedUserInfoDynamoDB.getInfo(senseId); - for(final UserInfo userInfo : userInfos) { + final Long accountId = userPushNotification.getAccountId(); + final String senseId = userPushNotification.getSenseId(); + final Optional userInfoOptional = mergedUserInfoDynamoDB.getInfo(senseId, accountId); - if(!userHasPushNotificationsEnabled(userInfo.accountId)) { - continue; - } - - final Optional dateTimeZoneOptional = userInfo.timeZone; - if(!dateTimeZoneOptional.isPresent()) { - LOGGER.warn("No timezone for account: {} paired to Sense: {}", userInfo.accountId, senseId); - continue; - } + if(!accountPreferencesDynamoDB.isEnabled(accountId, PreferenceName.PUSH_ALERT_CONDITIONS)) { + LOGGER.debug("push_notification_status=disabled account_id={}", accountId); + return; + } - final DateTime nowInLocalTimeZone = DateTime.now().withZone(dateTimeZoneOptional.get()); - if(!activeHours.contains(nowInLocalTimeZone.getHourOfDay())) { - continue; - } + if (!userInfoOptional.isPresent() || !userInfoOptional.get().timeZone.isPresent()) { + LOGGER.error("error=could_not_get_timezone account_id={} sense_id={}", accountId, senseId); + return; + } - final String key = String.format("%s-%s", String.valueOf(userInfo.accountId), nowInLocalTimeZone.toString(DateTimeFormat.forPattern(DateTimeUtil.DYNAMO_DB_DATE_FORMAT))); - if(sent.contains(key)) { - LOGGER.info("Account {}, already received push notification: {}", userInfo.accountId, key); - continue; - } + final boolean pushNotificationsEnabled = flipper.userFeatureActive( + WorkerFeatureFlipper.PUSH_NOTIFICATIONS_ENABLED, accountId, Collections.emptyList()); + if (!pushNotificationsEnabled) { + LOGGER.debug("push_notifications_enabled=false account_id={}", accountId); + return; + } - if(!accountPreferencesDynamoDB.isEnabled(userInfo.accountId, PreferenceName.PUSH_ALERT_CONDITIONS)) { - continue; + LOGGER.info("push_notifications_enabled=true account_id={}", accountId); + + final DateTimeZone userTimeZone = userInfoOptional.get().timeZone.get(); + final DateTime nowUserTime = DateTime.now(userTimeZone); + final DateTime nowUTC = new DateTime(nowUserTime, DateTimeZone.UTC); + + PushNotificationEvent pushNotificationEvent = null; + + if (userPushNotification.hasPillBatteryLow()) { + final NotificationConfig pillBatteryConfig = pushNotificationsWorkerConfiguration.getPillBatteryConfig(); + if (shouldSendNotification(accountId, nowUserTime, pillBatteryConfig, NotificationType.PILL_BATTERY)) { + pushNotificationEvent = PushNotificationEvent.newBuilder() + .withAccountId(accountId) + .withType(NotificationType.PILL_BATTERY.name()) + .withTimestamp(nowUTC) + // TODO figure out what body/target/details to use + // TODO externalize this text to a DB + .withHelloPushMessage(new HelloPushMessage("body", "target", "details")) + .build(); } + } + // TODO rest of the push notifications - // TODO: write to cache to avoid sending multiple notifications - for(DataInputProtos.periodic_data data: batched_periodic_data.getDataList()) { - final Long timestampMillis = data.getUnixTime() * 1000L; - final DateTime roundedDateTime = new DateTime(timestampMillis, DateTimeZone.UTC).withSecondOfMinute(0); - final DateTime now = DateTime.now(DateTimeZone.UTC); - final CurrentRoomState currentRoomState = CurrentRoomState.fromRawData(data.getTemperature(), data.getHumidity(), data.getDustMax(), data.getLight(), data.getAudioPeakBackgroundEnergyDb(), data.getAudioPeakDisturbanceEnergyDb(), - roundedDateTime.getMillis(), - data.getFirmwareVersion(), - now, - 10, - Optional.of(Calibration.createDefault(data.getDeviceId()))); // TODO: adjust threshold - final Optional messageOptional = getMostImportantSensorState(currentRoomState); - if(messageOptional.isPresent()) { - LOGGER.info("Sending push notifications to user: {}. Message: {}", userInfo.accountId, messageOptional.get()); - final PushNotificationEvent event = PushNotificationEvent.newBuilder() - .withAccountId(userInfo.accountId) - .withHelloPushMessage(messageOptional.get()) - .withSenseId(senseId) - .withType("room_conditions") - .build(); - mobilePushNotificationProcessor.push(event); - sent.add(key); - } - break; - } + if (pushNotificationEvent != null) { + mobilePushNotificationProcessor.push(pushNotificationEvent); } } - /** - * Prioritizes conditions alerts based on Sensors - * @param currentRoomState - * @return - */ - private Optional getMostImportantSensorState(final CurrentRoomState currentRoomState) { - final HashSet notificationStates = Sets.newHashSet(CurrentRoomState.State.Condition.ALERT, CurrentRoomState.State.Condition.WARNING); - - if(notificationStates.contains(currentRoomState.temperature.condition)) { - return Optional.of(HelloPushMessage.fromSensors(currentRoomState.temperature.message, Sensor.TEMPERATURE)); + protected boolean shouldSendNotification(final Long accountId, + final DateTime nowUserLocalTime, + final NotificationConfig notificationConfig, + final NotificationType notificationType) + { + final Integer minHour = notificationConfig.getMinHourOfDay(); + final Integer maxHour = notificationConfig.getMaxHourOfDay(); + final Integer daysBetweenNotifications = notificationConfig.getDaysBetweenNotifications(); + final DateTime nowUTC = new DateTime(nowUserLocalTime, DateTimeZone.UTC); + + // Ensure we send within the correct window + if (nowUserLocalTime.hourOfDay().get() < minHour || nowUserLocalTime.hourOfDay().get() > maxHour) { + LOGGER.debug("battery_notification_status=not_sent reason=wrong_time_of_day local_hour={} account_id={}", + nowUserLocalTime.hourOfDay().get(), accountId); + return false; } - if(notificationStates.contains(currentRoomState.humidity.condition)) { - return Optional.of(HelloPushMessage.fromSensors(currentRoomState.humidity.message, Sensor.HUMIDITY)); - } + final Response> sentNotificationsResponse = mobilePushNotificationProcessor. + getPushNotificationEventDynamoDB() + .query(accountId, nowUTC.minusDays(daysBetweenNotifications), nowUTC.plusMinutes(10), notificationType.name()); - return Optional.absent(); + // If we couldn't successfully query, don't send a notification, just to be safe. + // Also only send a notification if we haven't sent one within a specified time. + return (sentNotificationsResponse.status == Response.Status.SUCCESS) && sentNotificationsResponse.data.isEmpty(); } @Override diff --git a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessorFactory.java b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessorFactory.java index bb6ed3b2..6f3b8820 100644 --- a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessorFactory.java +++ b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsProcessorFactory.java @@ -80,6 +80,6 @@ public IRecordProcessor createProcessor() { final AmazonDynamoDB accountPreferencesDynamoDBClient = dynamoDBClientFactory.getForTable(DynamoDBTableName.PREFERENCES); final AccountPreferencesDynamoDB accountPreferencesDynamoDB = AccountPreferencesDynamoDB.create(accountPreferencesDynamoDBClient, tableNames.get(DynamoDBTableName.PREFERENCES)); - return new PushNotificationsProcessor(pushNotificationProcessor, mergedUserInfoDynamoDB, accountPreferencesDynamoDB, configuration.getActiveHours()); + return new PushNotificationsProcessor(pushNotificationProcessor, mergedUserInfoDynamoDB, accountPreferencesDynamoDB, configuration); } } diff --git a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerCommand.java b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerCommand.java index 2a267b21..9f72a863 100644 --- a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerCommand.java +++ b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerCommand.java @@ -33,7 +33,7 @@ protected void run(Environment environment, Namespace namespace, final PushNotif final ImmutableMap queueNames = configuration.getQueues(); LOGGER.debug("{}", queueNames); - final String queueName = queueNames.get(QueueName.SENSE_SENSORS_DATA_FANOUT_ONE); + final String queueName = queueNames.get(QueueName.PUSH_NOTIFICATIONS); LOGGER.info("\n\n\n!!! This worker is using the following queue: {} !!!\n\n\n", queueName); diff --git a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerConfiguration.java b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerConfiguration.java index f874cc65..a97fb83b 100644 --- a/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerConfiguration.java +++ b/src/main/java/com/hello/suripu/workers/notifications/PushNotificationsWorkerConfiguration.java @@ -53,4 +53,9 @@ public NewDynamoDBConfiguration dynamoDBConfiguration(){ @JsonProperty("active_hours") private Set activeHours = Sets.newHashSet(); public Set getActiveHours() {return activeHours;} + + @Valid + @JsonProperty("pill_battery") + private NotificationConfig pillBatteryConfig; + public NotificationConfig getPillBatteryConfig() { return pillBatteryConfig; } }