Reconnect feature
authorTomas Zeman <tzeman@volny.cz>
Thu, 19 Nov 2015 19:08:18 +0100
changeset 4 b25c02bd6b11
parent 3 e8daf3ff497d
child 5 141e76d946e3
Reconnect feature
build.sbt
src/main/resources/settings.conf
src/main/scala/CouchStream.scala
src/main/scala/Example.scala
--- a/build.sbt	Thu Nov 19 18:36:56 2015 +0100
+++ b/build.sbt	Thu Nov 19 19:08:18 2015 +0100
@@ -6,6 +6,8 @@
 
 scalaVersion := "2.11.7"
 
+scalacOptions += "-feature"
+
 resolvers += "spray repo" at "http://repo.spray.io"
 
 val sprayVer = "1.3.2"
--- a/src/main/resources/settings.conf	Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/resources/settings.conf	Thu Nov 19 19:08:18 2015 +0100
@@ -2,5 +2,6 @@
   couchConf {
     url = "http://localhost:5984/example"
     heartbeat = 5s
+    reconnect = 45s
   }
 }
--- a/src/main/scala/CouchStream.scala	Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/scala/CouchStream.scala	Thu Nov 19 19:08:18 2015 +0100
@@ -1,13 +1,19 @@
 package couchdb.changes
 
+import akka.actor.{ActorRef, Actor, ActorLogging, Status}
+import akka.io.IO
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.language.postfixOps
+import scala.language.reflectiveCalls
+import scala.util.Try
+
+import spray.can.Http
+import spray.can.Http.ConnectionAttemptFailedException
+import spray.client.pipelining._
 import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
 import spray.http._
 import spray.json._
-import spray.client.pipelining._
-import akka.actor.{ActorRef, Actor, ActorLogging}
-import scala.util.Try
-import spray.can.Http
-import akka.io.IO
 
 case class ResultRow(seq: Int, id: String, doc: JsObject)
 
@@ -25,12 +31,19 @@
 }
 
 class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
-  with ChangesMarshaller {
+  with ChangesMarshaller with settings {
 
   val io = IO(Http)(context.system)
 
+  def config = context.system.settings.config
+
   def receive: Receive = {
-    case uri: Uri =>
+    case Start =>
+      val uri = Uri(s"${couchConf.url}/_changes").withQuery(
+          "feed" -> "continuous"
+        , "heartbeat" -> couchConf.heartbeat.toMillis.toString
+        , "include_docs" -> "true"
+      )
       val rq = HttpRequest(HttpMethods.GET, uri = uri)
       log.debug("Sending request {}", rq)
       sendTo(io).withResponsesReceivedBy(self)(rq)
@@ -43,6 +56,15 @@
           _ => log.warning("Failed to deserialize entity {}", data),
           processor !
         )
+    case Status.Failure(t) =>
+      log.error("Received failure {}", t)
+      t match {
+        case x:ConnectionAttemptFailedException =>
+          log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
+          context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
+            Start)
+        case _ =>
+      }
     case x =>
       log.debug("Received {}", x)
   }
--- a/src/main/scala/Example.scala	Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/scala/Example.scala	Thu Nov 19 19:08:18 2015 +0100
@@ -2,7 +2,6 @@
 
 import akka.actor._
 import spray.http.Uri
-import com.typesafe.config.ConfigFactory
 
 object LoggingActor extends Actor with ActorLogging {
   log.info("Initializing LoggingActor")
@@ -11,28 +10,25 @@
   }
 }
 
-object Example extends App with settings {
+case object Start
+
+object Example extends App {
 
   val system = ActorSystem()
   val stream = system.actorOf(Props(new ChangesStreamActor(
     system.actorOf(Props(LoggingActor)))))
 
-  def config = ConfigFactory.load()
-
-  stream ! Uri(s"${couchConf.url}/_changes").withQuery(
-    "feed" -> "continuous"
-  , "heartbeat" -> couchConf.heartbeat.toMillis.toString
-  , "include_docs" -> "true"
-  )
-
+  stream ! Start
 }
 
 import com.wacai.config.annotation._
 import scala.concurrent.duration._
+
 @conf trait settings extends Configurable {
   val couchConf = new {
     val url = "http://localhost:5984/example"
     val heartbeat = 5 seconds
+    val reconnect = 45 seconds
   }
 }