--- a/build.sbt Thu Nov 19 18:36:56 2015 +0100
+++ b/build.sbt Thu Nov 19 19:08:18 2015 +0100
@@ -6,6 +6,8 @@
scalaVersion := "2.11.7"
+scalacOptions += "-feature"
+
resolvers += "spray repo" at "http://repo.spray.io"
val sprayVer = "1.3.2"
--- a/src/main/resources/settings.conf Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/resources/settings.conf Thu Nov 19 19:08:18 2015 +0100
@@ -2,5 +2,6 @@
couchConf {
url = "http://localhost:5984/example"
heartbeat = 5s
+ reconnect = 45s
}
}
--- a/src/main/scala/CouchStream.scala Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/scala/CouchStream.scala Thu Nov 19 19:08:18 2015 +0100
@@ -1,13 +1,19 @@
package couchdb.changes
+import akka.actor.{ActorRef, Actor, ActorLogging, Status}
+import akka.io.IO
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.language.postfixOps
+import scala.language.reflectiveCalls
+import scala.util.Try
+
+import spray.can.Http
+import spray.can.Http.ConnectionAttemptFailedException
+import spray.client.pipelining._
import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
import spray.http._
import spray.json._
-import spray.client.pipelining._
-import akka.actor.{ActorRef, Actor, ActorLogging}
-import scala.util.Try
-import spray.can.Http
-import akka.io.IO
case class ResultRow(seq: Int, id: String, doc: JsObject)
@@ -25,12 +31,19 @@
}
class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
- with ChangesMarshaller {
+ with ChangesMarshaller with settings {
val io = IO(Http)(context.system)
+ def config = context.system.settings.config
+
def receive: Receive = {
- case uri: Uri =>
+ case Start =>
+ val uri = Uri(s"${couchConf.url}/_changes").withQuery(
+ "feed" -> "continuous"
+ , "heartbeat" -> couchConf.heartbeat.toMillis.toString
+ , "include_docs" -> "true"
+ )
val rq = HttpRequest(HttpMethods.GET, uri = uri)
log.debug("Sending request {}", rq)
sendTo(io).withResponsesReceivedBy(self)(rq)
@@ -43,6 +56,15 @@
_ => log.warning("Failed to deserialize entity {}", data),
processor !
)
+ case Status.Failure(t) =>
+ log.error("Received failure {}", t)
+ t match {
+ case x:ConnectionAttemptFailedException =>
+ log.info("Scheduling reconnect to couchdb in {}", couchConf.reconnect)
+ context.system.scheduler.scheduleOnce(couchConf.reconnect, self,
+ Start)
+ case _ =>
+ }
case x =>
log.debug("Received {}", x)
}
--- a/src/main/scala/Example.scala Thu Nov 19 18:36:56 2015 +0100
+++ b/src/main/scala/Example.scala Thu Nov 19 19:08:18 2015 +0100
@@ -2,7 +2,6 @@
import akka.actor._
import spray.http.Uri
-import com.typesafe.config.ConfigFactory
object LoggingActor extends Actor with ActorLogging {
log.info("Initializing LoggingActor")
@@ -11,28 +10,25 @@
}
}
-object Example extends App with settings {
+case object Start
+
+object Example extends App {
val system = ActorSystem()
val stream = system.actorOf(Props(new ChangesStreamActor(
system.actorOf(Props(LoggingActor)))))
- def config = ConfigFactory.load()
-
- stream ! Uri(s"${couchConf.url}/_changes").withQuery(
- "feed" -> "continuous"
- , "heartbeat" -> couchConf.heartbeat.toMillis.toString
- , "include_docs" -> "true"
- )
-
+ stream ! Start
}
import com.wacai.config.annotation._
import scala.concurrent.duration._
+
@conf trait settings extends Configurable {
val couchConf = new {
val url = "http://localhost:5984/example"
val heartbeat = 5 seconds
+ val reconnect = 45 seconds
}
}