require "./ring.cr"
|
|
require "json"
|
|
require "mutex"
|
|
require "openssl"
|
|
|
|
module Crystal::Scatter
|
|
|
|
class Daemon
|
|
getter weight : UInt64
|
|
getter location : Array(Int32)
|
|
getter url : String
|
|
def initialize(@weight : UInt64, @location : Array(Int32), @url : String)
|
|
end
|
|
end
|
|
|
|
class SliceInfo
|
|
JSON.mapping(
|
|
slices: Array(Array(Slice)),
|
|
last: Int32,
|
|
shards: UInt32
|
|
)
|
|
|
|
def initialize(@slices, @last, @shards)
|
|
|
|
end
|
|
end
|
|
|
|
class MetaRing
|
|
getter rings : Array(Ring)
|
|
getter ring_graph : RingGraph
|
|
|
|
def initialize(@shards : UInt32, @split_ratio : BigRational = BigRational.new(1,2))
|
|
@rings = Array(Ring).new
|
|
@ring_graph = RingGraph.new 0
|
|
@lock = Mutex.new
|
|
end
|
|
|
|
private def add_impl(element : Daemon)
|
|
@ring_graph.add(element.weight, element.location, element.url)
|
|
end
|
|
|
|
private def update_impl
|
|
@ring_graph.generate_ring @split_ratio
|
|
ring = Ring.new
|
|
@ring_graph.snapshot do |daemon|
|
|
ring.push Slice.new(daemon.range_effector.not_nil!.[0],daemon.range_effector.not_nil!.[1],daemon.reach_at,daemon.self_id)
|
|
end
|
|
@rings.push ring
|
|
end
|
|
|
|
private def hash_impl(data : String)
|
|
ctx = OpenSSL::Digest.new("md4")
|
|
ctx << data
|
|
digest = ctx.digest
|
|
tag = IO::ByteFormat::BigEndian.decode(UInt64, digest)
|
|
tag ^= IO::ByteFormat::BigEndian.decode(UInt64, digest+8)
|
|
return tag
|
|
end
|
|
|
|
private def hash_and_split(data)
|
|
h = hash_impl data
|
|
space=UInt64::MAX/@shards
|
|
targets = Array(UInt64).new
|
|
(1..@shards).each do
|
|
targets << h
|
|
h+=space
|
|
end
|
|
return targets
|
|
end
|
|
|
|
private def get_slice_from_hash(ring : Ring, h : UInt64)
|
|
value = ring.find do |slice|
|
|
h>=slice.s_begin && slice.s_end>=h
|
|
end
|
|
return value.not_nil!
|
|
end
|
|
|
|
private def unfold_ring(ring : Ring, targets : Array(UInt64))
|
|
slices = Array(Slice).new
|
|
targets.each do |h|
|
|
slices << get_slice_from_hash(ring, h)
|
|
end
|
|
return slices
|
|
end
|
|
|
|
def add(element : Daemon)
|
|
@lock.synchronize do
|
|
add_impl element
|
|
update_impl
|
|
end
|
|
end
|
|
|
|
def add(elements : Array(Daemon))
|
|
@lock.synchronize do
|
|
elements.each do |element|
|
|
add_impl element
|
|
end
|
|
update_impl
|
|
end
|
|
end
|
|
|
|
def hash(data : String)
|
|
hash_impl data
|
|
end
|
|
|
|
def get_slices_for(data : String)
|
|
ret = Array(Slice).new(@shards)
|
|
ring? = nil : Ring | Nil
|
|
@lock.synchronize do
|
|
ring? = @rings.last
|
|
end
|
|
ring = ring?.not_nil!
|
|
targets = hash_and_split(data)
|
|
slices = unfold_ring(ring,targets)
|
|
return slices
|
|
end
|
|
|
|
def get_slices_for(data : String, n_last : UInt32)
|
|
n_rings = Array(Ring).new
|
|
sllast = 0
|
|
@lock.synchronize do
|
|
sllast = @rings.size - 1
|
|
t = @rings.size-n_last
|
|
n_rings = @rings.skip( t >= 0 ? t : 1 )
|
|
end
|
|
targets = hash_and_split(data)
|
|
ret = Array(Array(Slice)).new
|
|
n_rings.each do |ring|
|
|
ret << unfold_ring(ring,targets)
|
|
end
|
|
return SliceInfo.new ret, sllast, @shards
|
|
end
|
|
end
|
|
end
|