Initial import
authorTomas Zeman <tzeman@volny.cz>
Tue, 21 Jul 2015 09:24:52 +0200
changeset 0 a279a342bc78
child 1 1e78bb2c0c84
Initial import
.hgignore
build.sbt
project/build.properties
project/plugins.sbt
src/main/resources/application.conf
src/main/scala/CouchStream.scala
src/main/scala/Example.scala
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,5 @@
+syntax: glob
+*~
+.*.sw*
+*.class
+target
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/build.sbt	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,21 @@
+name := "spray-couchdb-changes"
+
+organization := "tz"
+
+version := "0.1-SNAPSHOT"
+
+scalaVersion := "2.11.7"
+
+resolvers += "spray repo" at "http://repo.spray.io"
+
+val sprayVer = "1.3.2"
+val akkaVer = "2.3.12"
+
+libraryDependencies ++= Seq(
+  "io.spray" %% "spray-client" % sprayVer
+, "io.spray" %% "spray-json" % sprayVer
+, "com.typesafe.akka" %% "akka-actor" % akkaVer
+, "com.typesafe.akka" %% "akka-slf4j" % akkaVer
+)
+
+Revolver.settings
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/project/build.properties	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,1 @@
+sbt.version=0.13.8
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/project/plugins.sbt	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,1 @@
+addSbtPlugin("io.spray" % "sbt-revolver" % "0.7.2")
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/resources/application.conf	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,9 @@
+spray {
+  can.client {
+    idle-timeout = 90 s
+    request-timeout = 80 s
+    connection-timeout = 90 s
+    response-chunk-aggregation-limit = 0
+  }
+  io.confirm-sends = on
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/scala/CouchStream.scala	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,52 @@
+package couchdb.changes
+
+import spray.httpx.unmarshalling.{MalformedContent, Unmarshaller, Deserialized}
+import spray.http._
+import spray.json._
+import spray.client.pipelining._
+import akka.actor.{ActorRef, Actor, ActorLogging}
+import scala.util.Try
+import spray.can.Http
+import akka.io.IO
+
+case class ResultRow(seq: Int, id: String, doc: JsObject)
+
+trait ChangesMarshaller {
+
+  implicit object ChangesMarshaller extends Unmarshaller[ResultRow] {
+    import DefaultJsonProtocol._
+    implicit val changesFormat = jsonFormat3(ResultRow)
+
+    def apply(entity: HttpEntity): Deserialized[ResultRow] = (Try {
+      Right(JsonParser(entity.asString).asJsObject.convertTo[ResultRow])
+    }).getOrElse(Left(MalformedContent("bad json")))
+
+  }
+}
+
+class ChangesStreamActor(processor: ActorRef) extends Actor with ActorLogging
+  with ChangesMarshaller {
+
+  val io = IO(Http)(context.system)
+
+  def receive: Receive = {
+    case uri: Uri =>
+      val rq = HttpRequest(HttpMethods.GET, uri = uri)
+      log.debug("Sending request {}", rq)
+      sendTo(io).withResponsesReceivedBy(self)(rq)
+    case x@ChunkedResponseStart(_) =>
+      log.debug("Received {}", x)
+    case MessageChunk(data, _) =>
+      log.debug("Received data chunk {}", data)
+      if (data.length > 2) // skip heartbeat messages
+        ChangesMarshaller(data).fold(
+          _ => log.warning("Failed to deserialize entity {}", data),
+          processor !
+        )
+    case x =>
+      log.debug("Received {}", x)
+  }
+}
+
+
+// vim: set ts=2 sw=2 et:
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/main/scala/Example.scala	Tue Jul 21 09:24:52 2015 +0200
@@ -0,0 +1,27 @@
+package couchdb.changes
+
+import akka.actor._
+import spray.http.Uri
+
+object LoggingActor extends Actor with ActorLogging {
+  log.info("Initializing LoggingActor")
+  def receive: Receive = {
+    case x => log.info("Received {}", x)
+  }
+}
+
+object Example extends App {
+
+  val system = ActorSystem()
+  val stream = system.actorOf(Props(new ChangesStreamActor(
+    system.actorOf(Props(LoggingActor)))))
+
+  stream ! Uri("http://localhost:5984/example/_changes").withQuery(
+    "feed" -> "continuous"
+  , "heartbeat" -> "5000"
+  , "include_docs" -> "true"
+  )
+
+}
+
+// vim: set ts=2 sw=2 et: