24 days of Hackage, 2015: day 18: vector, vector-algorithms: unleash your inner C programmer!
Dec 18, 2015 · 11 minute read · CommentsHaskellHackagevectorvector-algorithmsarrayrepaarraysmutationC
Table of contents for the whole series
A table of contents is at the top of the article for day 1.
Day 18
Often in programming, the right data structure to use is a C-style array, whether treated as immutable or mutable.
Ideally, I didn’t need to write this article, but I’ve gotten the
impression that many newcomers to Haskell don’t know about
vector
, the library for
zero-indexed C-style arrays, and use lists where arrays are better suited to
the problem. Arrays are efficient for indexing and are memory and
cache efficient also, being allocated in a block of memory rather than
spread out over pointer chasing.
The terminology regarding arrays in the Haskell ecosystem is confusing
because Haskell in the 1990s originally came with a data structure
called an Array
, and there’s even a supporting
array
package, but in
practice I never use it because it’s more generic and weird than the
simple data structure later provided called “vectors” (for lack of a
better name).
There are good tutorials on vector
I recommend:
I thought it would be useful to give a fully worked out example of using both immutable and mutable vectors. In particular, we’ll use unboxed vectors that are specialized and therefore contain no indirection and are basically equivalent to C-style arrays. We’ll do some low-level C-style programming.
The problem to solve: finding the median of an array of 8-bit integers
Here is a problem found at Programming Praxis:
“The median of an array is the value in the middle if the array
was sorted; if the array has an odd number of items n
, the median
is the (n+1)/2
’th largest item in the array (which is also the
(n+1)/2
’th smallest item in the array), and if the array has an
even number of items n, the median is the arithmetic average of the
n/2
’th smallest item in the array and the n/2
’th largest item in
the array. For instance, the median of the array [2,4,5,7,3,6,1]
is
4
and the median of the array [5,2,1,6,3,4]
is 3.5
.
Your task is to write a program that takes an array of 8-bit integers (possibly but not necessarily in sort) and finds the median value in the array; you should find an algorithm that takes linear time and constant space.”
vector
installation
Stackage LTS 3 is still on vector-0.10
, but I wanted to use
version 0.11
because the API changed slightly, so I added a bound as appropriate in
the Cabal file.
(Update of 2016-01-06)
Stackage LTS 4 now uses this updated version of vector
.
The importance of going unboxed
Because we are given that we should write code specific to an array of 8-bit integers, we will be using unboxed rather than boxed (generic) vectors: boxed vectors contain elements that are pointers to values, while unboxed vectors have elements are the actual values themselves. This is pretty important for efficiency if you know up front the type of your elements.
Some tests
Let’s write some tests first, assuming we will later write a module
VectorExample
.
Some imports first. We will be using the constant-time vector indexing
operator !
.
module VectorExampleSpec where
import VectorExample
( Value, MedianValue, averageValue
, spaceInefficientMedian, constantSpaceMedian
)
import Data.Vector.Unboxed ((!))
import qualified Data.Vector.Unboxed as V
import qualified Data.Vector.Algorithms.Radix as Radix
We will be using HSpec and QuickCheck, and writing our own random vector generators:
import Test.Hspec (Spec, hspec, describe, it, shouldBe)
import Test.Hspec.QuickCheck (prop)
import Test.QuickCheck (Arbitrary(arbitrary, shrink), choose, sized, shrinkList)
We will be implementing both a simple reference implementation and the desired clever one, so we will test both:
spec :: Spec
spec = do
describe "compute median of vector of 8-bit unsigned integers" $ do
medianSpec "inefficientSpaceMedian" inefficientSpaceMedian
medianSpec "constantSpaceMedian" constantSpaceMedian
We test the examples given, as well as general properties. For
sorting, we conveniently use the radix sort from the
vector-algorithms
package:
medianSpec :: String
-> (V.Vector Value -> Maybe MedianValue)
-> Spec
medianSpec description findMedian =
describe description $ do
describe "some examples" $ do
it "handles odd number of elements" $ do
findMedian (V.fromList [2, 4, 5, 7, 3, 6, 1]) `shouldBe` Just 4.0
it "handles nonzero even number of elements" $ do
findMedian (V.fromList [5, 2, 1, 6, 3, 4]) `shouldBe` Just 3.5
describe "properties" $ do
it "handles no elements" $ do
findMedian V.empty `shouldBe` Nothing
prop "handles one element" $ \v ->
findMedian (V.singleton v) == Just (fromIntegral v)
prop "handles odd number of elements" $
\(VectorWithOdd values) ->
let len = V.length values
midIndex = pred (succ len `div` 2)
sorted = V.modify Radix.sort values
in findMedian values == Just (fromIntegral (sorted ! midIndex))
prop "handles positive even number of elements" $
\(VectorWithPositiveEven values) ->
let len = V.length values
midIndex = pred (succ len `div` 2)
sorted = V.modify Radix.sort values
in findMedian values ==
Just (averageValue (fromIntegral (sorted ! midIndex))
(sorted ! succ midIndex))
The property tests specify what a median is supposed to be, for all possible cases.
A note on mutation
The radix sort operates on a mutable vector, but we are using an
immutable vector, so we use the
modify
function
that has a fancy type
modify :: Unbox a => (forall s. MVector s a -> ST s ()) -> Vector a -> Vector a
which takes a callback taking a mutable vector, and transforms an
immutable vector into another immutable vector (possibly the same one
“secretly” mutated underneath). It uses higher-rank types
and the
ST
state transformer monad,
but all you have to know is that
Radix.sort
has the compatible callback type and therefore can be plugged into modify
:
sort :: forall e m v. (PrimMonad m, MVector v e, Radix e) => v (PrimState m) e -> m ()
Custom QuickCheck generators
Here are some quick and dirty generators of vectors of the kinds we want for our tests. If we were doing this for real, we’d probably want to write custom shrinkers rather than go through lists.
-- | Generate a vector with an odd number of elements.
newtype VectorWithOdd a =
VectorWithOdd { getVectorWithOdd :: V.Vector a }
deriving (Show)
instance (Arbitrary a, V.Unbox a) => Arbitrary (VectorWithOdd a) where
arbitrary = sized $ \n -> do
k <- choose (0, n `div` 2)
VectorWithOdd <$> V.replicateM (2*k+1) arbitrary
shrink = map (VectorWithOdd . V.fromList)
. shrinkList shrink
. V.toList
. getVectorWithOdd
-- | Generate a vector with a nonzero even number of elements.
newtype VectorWithPositiveEven a =
VectorWithPositiveEven { getVectorWithPositiveEven :: V.Vector a }
deriving (Show)
instance (Arbitrary a, V.Unbox a) => Arbitrary (VectorWithPositiveEven a) where
arbitrary = sized $ \n -> do
k <- choose (1, 1 `max` (n `div` 2))
VectorWithPositiveEven <$> V.replicateM (2*k) arbitrary
shrink = map (VectorWithPositiveEven . V.fromList)
. shrinkList shrink
. V.toList
. getVectorWithPositiveEven
An inefficient implementation that violates our space requirement
For reference (we could read it off from the QuickCheck tests almost):
{-# LANGUAGE BangPatterns #-}
module VectorExample where
import qualified Data.Word as Word
import qualified Data.Vector.Unboxed as V
import qualified Data.Vector.Unboxed.Mutable as M
import qualified Data.Vector.Algorithms.Radix as Radix
-- | Assume 8-bit unsigned integer.
type Value = Word.Word8
-- | What to return as median when there is one.
type MedianValue = Double
-- | This is an obvious implementation, basically the specification.
-- The use of radix sort makes this linear in time, but a copy of
-- the original array is required.
inefficientSpaceMedian :: V.Vector Value -> Maybe MedianValue
inefficientSpaceMedian values
| V.null values = Nothing
| odd len = Just (fromIntegral (atSorted midIndex))
| otherwise = Just (averageValue (atSorted midIndex)
(atSorted (succ midIndex)))
where
len = V.length values
midIndex = pred (succ len `div` 2)
-- Make a local mutable copy to sort.
atSorted = V.unsafeIndex (V.modify Radix.sort values)
-- | Average of two values.
averageValue :: Value -> Value -> MedianValue
averageValue a b = (fromIntegral a + fromIntegral b) / 2
A note on unsafe operations
I promised we were going to do C-style programming. We could use
bounds-checked indexing (which throws a runtime exception if accessing
outside the bounds of a vector), but let’s have some “fun”, and use
unsafeIndex
. Note that this is truly unsafe: let’s get the element
at index 100 of a 1-element Int
vector:
Data.Vector.Unboxed> unsafeIndex (singleton (5 :: Int)) 100
4931475852
Why use unsafe operations? For illustration and also because it happens that we can manually prove (outside the type system) that we will never go out of bounds. In real life, I’d hesitate to use the unsafe operations unless maximum performance was critical though! But we can opt in the C world of potential disaster if we want to.
I do not want to promote using unsafe operations!! Globally search
and replace all unsafeXyz
functions in today’s sample code with
xyz
instead for peace of mind.
The clever solution
The clever solution allocates a table of every possible count (because we know our values are 8-bit unsigned integers) and fills it, then iterates through the table to find out where the median must lie, and when it hits the right place, returns the middle value (if an odd number of values) or the middle two values (if an even number of values).
-- | Number of occurrences of an 8-bit unsigned value.
-- We assume no overflow beyond 'Int' range.
type CountOfValue = Int
-- | Create a table of counts for each possible value, since we know
-- the number of values is small and finite.
constantSpaceMedian :: V.Vector Value -> Maybe MedianValue
constantSpaceMedian values
| V.null values = Nothing
| odd len = Just (findMid 0 (numToCount - countOf 0))
| otherwise = Just (findMid2 0 (numToCount - countOf 0))
where
len = V.length values
-- How many sorted elements to count off, to reach first median value.
numToCount = succ len `div` 2
-- Make efficient table of counts for each possible value.
countOf = V.unsafeIndex (makeCountsMutably values)
findMid !i !numRemaining
| numRemaining <= 0 = fromIntegral i
| otherwise = findMid (succ i) (numRemaining - countOf (succ i))
findMid2 !i !numRemaining =
case numRemaining `compare` 0 of
LT -> fromIntegral i -- median is duplicated, don't need average
GT -> findMid2 (succ i) (numRemaining - countOf (succ i))
EQ -> midAverage (succ i) (countOf (succ i))
where
midAverage j 0 = midAverage (succ j) (countOf (succ j))
midAverage j _ = averageValue (fromIntegral i) (fromIntegral j)
Note the use of the GHC extension BangPatterns
, covered in a
2014 Day of GHC Extensions,
to ensure strict evaluation in the tail-recursive loops.
Mutable versus immutable ways to create the counts table
What is makeCountsMutably
?
Mutable
The version of the function that uses mutation does it using the
create
function:
-- | Use local mutation for efficiency in creating a table of counts,
-- looping through to update it, and freezing the result to return.
makeCountsMutably
:: V.Vector Value -- ^ values seen
-> V.Vector CountOfValue -- ^ value => count
makeCountsMutably values = V.create $ do
counts <- M.replicate numPossibleValues 0
V.forM_ values $
M.unsafeModify counts succ . fromIntegral
return counts
-- | 256, in our case.
numPossibleValues :: Int
numPossibleValues = fromIntegral (maxBound :: Value) + 1
create
has type
create :: Unbox a => (forall s. ST s (MVector s a)) -> Vector a
and cleverly “freezes”, in place, the mutable vector returned within
ST
monad context, into an immutable vector, avoiding any copying
because the mutation happening inside the callback is never visible
outside it.
Immutable
But wait, there’s also a way to create the table using only immmutable
Vector
s, no intermediate MVector
needed!
-- | Make table of counts without using 'MVector'.
makeCountsPurely
:: V.Vector Value
-> V.Vector CountOfValue
makeCountsPurely =
V.unsafeAccumulate (+) (V.replicate numPossibleValues 0)
. V.map (\v -> (fromIntegral v, 1))
This is straightforward and elegant but seems to create an
intermediate vector using map
, which is not what we want to do
given that the whole point is to be space efficient.
Fusion
But vector
comes with sophisticated
support for fusion.
A later Day of Hackage (it’s now here at day 19) will explain what is going on, but GHC actually
rewrites the calls to unsafeAccumulate
and map
in order to avoid
creating an intermediate vector!
Conclusion
Fixed-size arrays are a very important low-level building block for
many computations. I use vector
when doing (sequential) array
operations. Hopefully this little example shows how to write efficient
vector-based imperative code in Haskell while hiding as much of the
state as possible, through selected type-oriented APIs that operate
between the two worlds of mutable and immutable. It is even possible
to write code relying on fusion that does not explicitly build a
mutable vector.
(These are sequential arrays. For parallel arrays, see a
2013 Day of Hackage
which covered repa
, a library
for “high performance, regular, multi-dimensional, shape polymorphic
parallel arrays”.)
All the code
All my code for my article series are at this GitHub repo.