src/main/scala/CouchStream.scala
changeset 4 b25c02bd6b11
parent 0 a279a342bc78
child 5 141e76d946e3
--- a/src/main/scala/CouchStream.scala	Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/scala/CouchStream.scala	Thu Nov 19 19:08:18 2015 +0100
@@ -1,13 +1,19 @@
 package couchdb.changes
 
+import akka.actor.{ActorRef, Actor, ActorLogging, Status}
+import akka.io.IO
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.language.postfixOps
+import scala.language.reflectiveCalls
+import scala.util.Try
+
+import spray.can.Http
+import spray.can.Http.ConnectionAttemptFailedException
+import spray.client.pipelining._
 import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
 import spray.http._
 import spray.json._
-import spray.client.pipelining._
-import akka.actor.{ActorRef, Actor, ActorLogging}
-import scala.util.Try
-import spray.can.Http
-import akka.io.IO
 
 case class ResultRow(seq: Int, id: String, doc: JsObject)
 
@@ -25,12 +31,19 @@
 }
 
 class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
-  with ChangesMarshaller {
+  with ChangesMarshaller with settings {
 
   val io = IO(Http)(context.system)
 
+  def config = context.system.settings.config
+
   def receive: Receive = {
-    case uri: Uri =>
+    case Start =>
+      val uri = Uri(s"${couchConf.url}/_changes").withQuery(
+          "feed" -> "continuous"
+        , "heartbeat" -> couchConf.heartbeat.toMillis.toString
+        , "include_docs" -> "true"
+      )
       val rq = HttpRequest(HttpMethods.GET, uri = uri)
       log.debug("Sending request {}", rq)
       sendTo(io).withResponsesReceivedBy(self)(rq)
@@ -43,6 +56,15 @@
           _ => log.warning("Failed to deserialize entity {}", data),
           processor !
         )
+    case Status.Failure(t) =>
+      log.error("Received failure {}", t)
+      t match {
+        case x:ConnectionAttemptFailedException =>
+          log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
+          context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
+            Start)
+        case _ =>
+      }
     case x =>
       log.debug("Received {}", x)
   }