# HG changeset patch # User Tomas Zeman # Date 1447956498 -3600 # Node ID b25c02bd6b11725d375bde671ade83f93802c225 # Parent e8daf3ff497dab355f811553cc1afd960b089b15 Reconnect feature diff -r e8daf3ff497d -r b25c02bd6b11 build.sbt --- a/build.sbt Thu Nov 19 18:36:56 2015 +0100 +++ b/build.sbt Thu Nov 19 19:08:18 2015 +0100 @@ -6,6 +6,8 @@ scalaVersion := "2.11.7" +scalacOptions += "-feature" + resolvers += "spray repo" at "http://repo.spray.io" val sprayVer = "1.3.2" diff -r e8daf3ff497d -r b25c02bd6b11 src/main/resources/settings.conf --- a/src/main/resources/settings.conf Thu Nov 19 18:36:56 2015 +0100 +++ b/src/main/resources/settings.conf Thu Nov 19 19:08:18 2015 +0100 @@ -2,5 +2,6 @@ couchConf { url = "http://localhost:5984/example" heartbeat = 5s + reconnect = 45s } } diff -r e8daf3ff497d -r b25c02bd6b11 src/main/scala/CouchStream.scala --- a/src/main/scala/CouchStream.scala Thu Nov 19 18:36:56 2015 +0100 +++ b/src/main/scala/CouchStream.scala Thu Nov 19 19:08:18 2015 +0100 @@ -1,13 +1,19 @@ package couchdb.changes +import akka.actor.{ActorRef, Actor, ActorLogging, Status} +import akka.io.IO + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.language.postfixOps +import scala.language.reflectiveCalls +import scala.util.Try + +import spray.can.Http +import spray.can.Http.ConnectionAttemptFailedException +import spray.client.pipelining._ import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized} import spray.http._ import spray.json._ -import spray.client.pipelining._ -import akka.actor.{ActorRef, Actor, ActorLogging} -import scala.util.Try -import spray.can.Http -import akka.io.IO case class ResultRow(seq: Int, id: String, doc: JsObject) @@ -25,12 +31,19 @@ } class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging - with ChangesMarshaller { + with ChangesMarshaller with settings { val io = IO(Http)(context.system) + def config = context.system.settings.config + def receive: Receive = { - case uri: Uri => + case Start => + val uri = Uri(s"${couchConf.url}/_changes").withQuery( + "feed" -> "continuous" + , "heartbeat" -> couchConf.heartbeat.toMillis.toString + , "include_docs" -> "true" + ) val rq = HttpRequest(HttpMethods.GET, uri = uri) log.debug("Sending request {}", rq) sendTo(io).withResponsesReceivedBy(self)(rq) @@ -43,6 +56,15 @@ _ => log.warning("Failed to deserialize entity {}", data), processor ! ) + case Status.Failure(t) => + log.error("Received failure {}", t) + t match { + case x:ConnectionAttemptFailedException => + log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect) + context.system.scheduler.scheduleOnce(couchConf.reconnect, self, + Start) + case _ => + } case x => log.debug("Received {}", x) } diff -r e8daf3ff497d -r b25c02bd6b11 src/main/scala/Example.scala --- a/src/main/scala/Example.scala Thu Nov 19 18:36:56 2015 +0100 +++ b/src/main/scala/Example.scala Thu Nov 19 19:08:18 2015 +0100 @@ -2,7 +2,6 @@ import akka.actor._ import spray.http.Uri -import com.typesafe.config.ConfigFactory object LoggingActor extends Actor with ActorLogging { log.info("Initializing LoggingActor") @@ -11,28 +10,25 @@ } } -object Example extends App with settings { +case object Start + +object Example extends App { val system = ActorSystem() val stream = system.actorOf(Props(new ChangesStreamActor( system.actorOf(Props(LoggingActor))))) - def config = ConfigFactory.load() - - stream ! Uri(s"${couchConf.url}/_changes").withQuery( - "feed" -> "continuous" - , "heartbeat" -> couchConf.heartbeat.toMillis.toString - , "include_docs" -> "true" - ) - + stream ! Start } import com.wacai.config.annotation._ import scala.concurrent.duration._ + @conf trait settings extends Configurable { val couchConf = new { val url = "http://localhost:5984/example" val heartbeat = 5 seconds + val reconnect = 45 seconds } }