--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,5 @@
+syntax: glob
+*~
+.*.sw*
+*.class
+target
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/build.sbt Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,21 @@
+name := "spray-couchdb-changes"
+
+organization := "tz"
+
+version := "0.1-SNAPSHOT"
+
+scalaVersion := "2.11.7"
+
+resolvers += "spray repo" at "http://repo.spray.io"
+
+val sprayVer = "1.3.2"
+val akkaVer = "2.3.12"
+
+libraryDependencies ++= Seq(
+ "io.spray" %% "spray-client" % sprayVer
+, "io.spray" %% "spray-json" % sprayVer
+, "com.typesafe.akka" %% "akka-actor" % akkaVer
+, "com.typesafe.akka" %% "akka-slf4j" % akkaVer
+)
+
+Revolver.settings
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/project/build.properties Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,1 @@
+sbt.version=0.13.8
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/project/plugins.sbt Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,1 @@
+addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2")
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/resources/application.conf Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,9 @@
+spray {
+ can.client {
+ idle-timeout = 90 s
+ request-timeout = 80 s
+ connection-timeout = 90 s
+ response-chunk-aggregation-limit = 0
+ }
+ io.confirm-sends = on
+}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/scala/CouchStream.scala Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,52 @@
+package couchdb.changes
+
+import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
+import spray.http._
+import spray.json._
+import spray.client.pipelining._
+import akka.actor.{ActorRef, Actor, ActorLogging}
+import scala.util.Try
+import spray.can.Http
+import akka.io.IO
+
+case class ResultRow(seq: Int, id: String, doc: JsObject)
+
+trait ChangesMarshaller {
+
+ implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
+ import DefaultJsonProtocol._
+ implicit val changesFormat = jsonFormat3(ResultRow)
+
+ def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try {
+ Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow])
+ }).getOrElse(Left(MalformedContent("bad json")))
+
+ }
+}
+
+class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
+ with ChangesMarshaller {
+
+ val io = IO(Http)(context.system)
+
+ def receive: Receive = {
+ case uri: Uri =>
+ val rq = HttpRequest(HttpMethods.GET, uri = uri)
+ log.debug("Sending request {}", rq)
+ sendTo(io).withResponsesReceivedBy(self)(rq)
+ case x@ChunkedResponseStart(_) =>
+ log.debug("Received {}", x)
+ case MessageChunk(data, _) =>
+ log.debug("Received data chunk {}", data)
+ if (data.length > 2) // skip heartbeat messages
+ ChangesMarshaller(data).fold(
+ _ => log.warning("Failed to deserialize entity {}", data),
+ processor !
+ )
+ case x =>
+ log.debug("Received {}", x)
+ }
+}
+
+
+// vim: set ts=2 sw=2 et:
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/scala/Example.scala Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,27 @@
+package couchdb.changes
+
+import akka.actor._
+import spray.http.Uri
+
+object LoggingActor extends Actor with ActorLogging {
+ log.info("Initializing LoggingActor")
+ def receive: Receive = {
+ case x => log.info("Received {}", x)
+ }
+}
+
+object Example extends App {
+
+ val system = ActorSystem()
+ val stream = system.actorOf(Props(new ChangesStreamActor(
+ system.actorOf(Props(LoggingActor)))))
+
+ stream ! Uri("http://localhost:5984/example/_changes").withQuery(
+ "feed" -> "continuous"
+ , "heartbeat" -> "5000"
+ , "include_docs" -> "true"
+ )
+
+}
+
+// vim: set ts=2 sw=2 et: