--- a/src/main/scala/CouchStream.scala Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/scala/CouchStream.scala Thu Nov 19 19:08:18 2015 +0100
@@ -1,13 +1,19 @@
package couchdb.changes
+import akka.actor.{ActorRef, Actor, ActorLogging, Status}
+import akka.io.IO
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.language.postfixOps
+import scala.language.reflectiveCalls
+import scala.util.Try
+
+import spray.can.Http
+import spray.can.Http.ConnectionAttemptFailedException
+import spray.client.pipelining._
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
import spray.http._
import spray.json._
-import spray.client.pipelining._
-import akka.actor.{ActorRef, Actor, ActorLogging}
-import scala.util.Try
-import spray.can.Http
-import akka.io.IO
case class ResultRow(seq: Int, id: String, doc: JsObject)
@@ -25,12 +31,19 @@
}
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
- with ChangesMarshaller {
+ with ChangesMarshaller with settings {
val io = IO(Http)(context.system)
+ def config = context.system.settings.config
+
def receive: Receive = {
- case uri: Uri =>
+ case Start =>
+ val uri = Uri(s"${couchConf.url}/_changes").withQuery(
+ "feed" -> "continuous"
+ , "heartbeat" -> couchConf.heartbeat.toMillis.toString
+ , "include_docs" -> "true"
+ )
val rq = HttpRequest(HttpMethods.GET, uri = uri)
log.debug("Sending request {}", rq)
sendTo(io).withResponsesReceivedBy(self)(rq)
@@ -43,6 +56,15 @@
_ => log.warning("Failed to deserialize entity {}", data),
processor !
)
+ case Status.Failure(t) =>
+ log.error("Received failure {}", t)
+ t match {
+ case x:ConnectionAttemptFailedException =>
+ log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
+ context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
+ Start)
+ case _ =>
+ }
case x =>
log.debug("Received {}", x)
}